/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.awssdk.core.internal.async;

import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.reactivestreams.Subscriber;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.exception.NonRetryableException;
import software.amazon.awssdk.core.internal.async.SubAsyncRequestBody;
import software.amazon.awssdk.core.internal.async.SubAsyncRequestBodyConfiguration;
import software.amazon.awssdk.core.internal.util.NoopSubscription;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.async.SimplePublisher;

@SdkInternalApi
public final class NonRetryableSubAsyncRequestBody
implements SubAsyncRequestBody {
    private static final Logger log = Logger.loggerFor(NonRetryableSubAsyncRequestBody.class);
    private final SubAsyncRequestBodyConfiguration configuration;
    private final int partNumber;
    private final boolean contentLengthKnown;
    private final String sourceBodyName;
    private final SimplePublisher<ByteBuffer> delegate = new SimplePublisher();
    private final AtomicBoolean subscribeCalled = new AtomicBoolean(false);
    private volatile long bufferedLength = 0L;
    private final Consumer<Long> onNumBytesReceived;
    private final Consumer<Long> onNumBytesConsumed;

    public NonRetryableSubAsyncRequestBody(SubAsyncRequestBodyConfiguration configuration) {
        this.configuration = Validate.paramNotNull(configuration, "configuration");
        this.partNumber = configuration.partNumber();
        this.contentLengthKnown = configuration.contentLengthKnown();
        this.sourceBodyName = configuration.sourceBodyName();
        this.onNumBytesReceived = configuration.onNumBytesReceived();
        this.onNumBytesConsumed = configuration.onNumBytesConsumed();
    }

    @Override
    public Optional<Long> contentLength() {
        return this.contentLengthKnown ? Optional.of(this.configuration.maxLength()) : Optional.of(this.bufferedLength);
    }

    @Override
    public void send(ByteBuffer data) {
        log.debug(() -> String.format("Sending bytebuffer %s to part %d", data, this.partNumber));
        long length = data.remaining();
        this.bufferedLength += length;
        this.onNumBytesReceived.accept(length);
        this.delegate.send(data).whenComplete((r, t) -> {
            this.onNumBytesConsumed.accept(length);
            if (t != null) {
                this.error((Throwable)t);
            }
        });
    }

    @Override
    public void complete() {
        log.debug(() -> "Received complete() for part number: " + this.partNumber);
        this.delegate.complete().whenComplete((r, t) -> {
            if (t != null) {
                this.error((Throwable)t);
            }
        });
    }

    @Override
    public long maxLength() {
        return this.configuration.maxLength();
    }

    @Override
    public long receivedBytesLength() {
        return this.bufferedLength;
    }

    @Override
    public int partNumber() {
        return this.partNumber;
    }

    public void error(Throwable error) {
        this.delegate.error(error);
    }

    @Override
    public void subscribe(Subscriber<? super ByteBuffer> s) {
        if (this.subscribeCalled.compareAndSet(false, true)) {
            this.delegate.subscribe(s);
        } else {
            s.onSubscribe(new NoopSubscription(s));
            s.onError(NonRetryableException.create("Multiple subscribers detected. This could happen due to a retry attempt. The AsyncRequestBody implementation provided does not support splitting to retryable/resubscribable AsyncRequestBody. If you need retry capability or multiple subscriptions, consider using BufferedSplittableAsyncRequestBody to wrap your AsyncRequestBody."));
        }
    }

    @Override
    public String body() {
        return this.sourceBodyName;
    }
}

