/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.common.util.queue;

import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SizeEstimator;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.queue.HoodieMessageQueue;
import org.apache.hudi.exception.HoodieException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public class BoundedInMemoryQueue<I, O>
implements HoodieMessageQueue<I, O>,
Iterable<O> {
    public static final int RECORD_POLL_INTERVAL_SEC = 1;
    public static final int RECORD_SAMPLING_RATE = 64;
    private static final int RECORD_CACHING_LIMIT = 131072;
    private static final Logger LOG = LogManager.getLogger(BoundedInMemoryQueue.class);
    public final Semaphore rateLimiter = new Semaphore(1);
    public final AtomicLong samplingRecordCounter = new AtomicLong(-1L);
    private final LinkedBlockingQueue<Option<O>> queue = new LinkedBlockingQueue();
    private final long memoryLimit;
    private final AtomicReference<Throwable> hasFailed = new AtomicReference<Object>(null);
    private final AtomicBoolean isReadDone = new AtomicBoolean(false);
    private final AtomicBoolean isWriteDone = new AtomicBoolean(false);
    private final Function<I, O> transformFunction;
    private final SizeEstimator<O> payloadSizeEstimator;
    private final QueueIterator iterator;
    public int currentRateLimit = 1;
    public long avgRecordSizeInBytes = 0L;
    private long numSamples = 0L;

    public BoundedInMemoryQueue(long memoryLimit, Function<I, O> transformFunction) {
        this(memoryLimit, transformFunction, new DefaultSizeEstimator(){});
    }

    public BoundedInMemoryQueue(long memoryLimit, Function<I, O> transformFunction, SizeEstimator<O> payloadSizeEstimator) {
        this.memoryLimit = memoryLimit;
        this.transformFunction = transformFunction;
        this.payloadSizeEstimator = payloadSizeEstimator;
        this.iterator = new QueueIterator();
    }

    @Override
    public long size() {
        return this.queue.size();
    }

    private void adjustBufferSizeIfNeeded(O payload) throws InterruptedException {
        if (this.samplingRecordCounter.incrementAndGet() % 64L != 0L) {
            return;
        }
        long recordSizeInBytes = this.payloadSizeEstimator.sizeEstimate(payload);
        long newAvgRecordSizeInBytes = Math.max(1L, (this.avgRecordSizeInBytes * this.numSamples + recordSizeInBytes) / (this.numSamples + 1L));
        int newRateLimit = (int)Math.min(131072L, Math.max(1L, this.memoryLimit / newAvgRecordSizeInBytes));
        if (newRateLimit > this.currentRateLimit) {
            this.rateLimiter.release(newRateLimit - this.currentRateLimit);
        } else if (newRateLimit < this.currentRateLimit) {
            this.rateLimiter.acquire(this.currentRateLimit - newRateLimit);
        }
        this.currentRateLimit = newRateLimit;
        this.avgRecordSizeInBytes = newAvgRecordSizeInBytes;
        ++this.numSamples;
    }

    @Override
    public void insertRecord(I t) throws Exception {
        if (this.isWriteDone.get()) {
            throw new IllegalStateException("Queue closed for enqueueing new entries");
        }
        this.throwExceptionIfFailed();
        this.rateLimiter.acquire();
        O payload = this.transformFunction.apply(t);
        this.adjustBufferSizeIfNeeded(payload);
        this.queue.put(Option.of(payload));
    }

    private boolean expectMoreRecords() {
        return !this.isWriteDone.get() || this.isWriteDone.get() && !this.queue.isEmpty();
    }

    @Override
    public Option<O> readNextRecord() {
        if (this.isReadDone.get()) {
            return Option.empty();
        }
        this.rateLimiter.release();
        Option<Object> newRecord = Option.empty();
        while (this.expectMoreRecords()) {
            try {
                this.throwExceptionIfFailed();
                newRecord = this.queue.poll(1L, TimeUnit.SECONDS);
                if (newRecord == null) continue;
                break;
            }
            catch (InterruptedException e) {
                LOG.error((Object)"error reading records from queue", (Throwable)e);
                throw new HoodieException(e);
            }
        }
        this.throwExceptionIfFailed();
        if (newRecord != null && newRecord.isPresent()) {
            return newRecord;
        }
        this.isReadDone.set(true);
        return Option.empty();
    }

    @Override
    public void seal() {
        this.isWriteDone.set(true);
    }

    @Override
    public void close() {
    }

    private void throwExceptionIfFailed() {
        if (this.hasFailed.get() != null) {
            this.close();
            throw new HoodieException("operation has failed", this.hasFailed.get());
        }
    }

    @Override
    public void markAsFailed(Throwable e) {
        this.hasFailed.set(e);
        this.rateLimiter.release(131073);
    }

    @Override
    public boolean isEmpty() {
        return this.queue.size() == 0;
    }

    @Override
    public Iterator<O> iterator() {
        return this.iterator;
    }

    private final class QueueIterator
    implements Iterator<O> {
        private O nextRecord;

        private QueueIterator() {
        }

        @Override
        public boolean hasNext() {
            if (this.nextRecord == null) {
                Option res = BoundedInMemoryQueue.this.readNextRecord();
                this.nextRecord = res.orElse(null);
            }
            return this.nextRecord != null;
        }

        @Override
        public O next() {
            ValidationUtils.checkState(this.hasNext() && this.nextRecord != null);
            Object ret = this.nextRecord;
            this.nextRecord = null;
            return ret;
        }
    }
}

