/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.hadoop.realtime;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SizeEstimator;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
import org.apache.hudi.hadoop.RecordReaderValueIterator;
import org.apache.hudi.hadoop.SafeParquetRecordReaderWrapper;
import org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader;
import org.apache.hudi.hadoop.realtime.RealtimeSplit;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;

/**
 * Record reader that emits the records of the base (parquet) reader and the records found in
 * the split's delta log files without merging them with each other.
 *
 * <p>Two producers feed a single bounded in-memory queue owned by {@link #executor}: one drains
 * the wrapped parquet reader, the other runs the log scanner, whose callback converts each log
 * record to an {@link ArrayWritable} and inserts it into the same queue. {@link #next} consumes
 * from that queue.
 */
class RealtimeUnmergedRecordReader
extends AbstractRealtimeRecordReader
implements RecordReader<NullWritable, ArrayWritable> {
    /** Job configuration key controlling whether log blocks are read lazily. */
    private static final String LAZY_BLOCK_READ_ENABLED_PROP = "compaction.lazy.block.read.enabled";
    /** Value used when {@link #LAZY_BLOCK_READ_ENABLED_PROP} is not set. */
    private static final String DEFAULT_LAZY_BLOCK_READ_ENABLED = "true";
    /** Job configuration key for the buffer size handed to the log scanner. */
    private static final String MAX_DFS_STREAM_BUFFER_SIZE_PROP = "hoodie.memory.dfs.buffer.max.size";
    /** Value used when {@link #MAX_DFS_STREAM_BUFFER_SIZE_PROP} is not set (1 MiB). */
    private static final int DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE = 1024 * 1024;

    /** Scanner over the split's delta log files; pushes records through a callback. */
    private final HoodieUnMergedLogRecordScanner logRecordScanner;
    /** The reader passed to the constructor, wrapped in a {@link SafeParquetRecordReaderWrapper}. */
    private final RecordReader<NullWritable, ArrayWritable> parquetReader;
    /** Iterator view over the values of {@link #parquetReader}. */
    private final RecordReaderValueIterator<NullWritable, ArrayWritable> parquetRecordsIterator;
    /** Executor owning the bounded queue that both producers write to. */
    private final BoundedInMemoryExecutor<ArrayWritable, ArrayWritable, ?> executor;
    /** Consumer-side iterator over the executor's queue. */
    private final Iterator<ArrayWritable> iterator;

    /**
     * Wires up the parquet reader, the log scanner and the executor, then starts the producers.
     *
     * @param split      the realtime split providing the base path, delta log paths and max commit time
     * @param job        the job configuration
     * @param realReader the underlying reader for the base file
     */
    public RealtimeUnmergedRecordReader(RealtimeSplit split, JobConf job, RecordReader<NullWritable, ArrayWritable> realReader) {
        super(split, job);
        this.parquetReader = new SafeParquetRecordReaderWrapper(realReader);
        this.parquetRecordsIterator = new RecordReaderValueIterator<>(this.parquetReader);
        // getParallelProducers() is called while logRecordScanner is still unassigned. That is
        // safe only because the producer lambda reads the field when it is invoked, and the
        // producers are not started until startProducers() at the end of this constructor.
        this.executor = new BoundedInMemoryExecutor<>(
                HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(this.jobConf),
                this.getParallelProducers(),
                Option.empty(),
                Function.identity(),
                new DefaultSizeEstimator<>(),
                Functions.noop());
        this.iterator = this.executor.getQueue().iterator();
        this.logRecordScanner = HoodieUnMergedLogRecordScanner.newBuilder()
                .withFileSystem(FSUtils.getFs(split.getPath().toString(), this.jobConf))
                .withBasePath(split.getBasePath())
                .withLogFilePaths(split.getDeltaLogPaths())
                .withReaderSchema(this.getReaderSchema())
                .withLatestInstantTime(split.getMaxCommitTime())
                .withReadBlocksLazily(Boolean.parseBoolean(
                        this.jobConf.get(LAZY_BLOCK_READ_ENABLED_PROP, DEFAULT_LAZY_BLOCK_READ_ENABLED)))
                .withReverseReader(false)
                .withBufferSize(this.jobConf.getInt(MAX_DFS_STREAM_BUFFER_SIZE_PROP, DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
                .withLogRecordScannerCallback(record -> {
                    // NOTE(review): get() is called unconditionally on the insert value;
                    // presumably an empty value (e.g. a delete payload) would throw here - confirm.
                    GenericRecord rec = (GenericRecord)((HoodieRecordPayload)record.getData())
                            .getInsertValue(this.getReaderSchema(), this.payloadProps).get();
                    ArrayWritable aWritable = (ArrayWritable)HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, this.getHiveSchema());
                    // Log records go into the same queue the parquet producer writes to.
                    this.executor.getQueue().insertRecord(aWritable);
                })
                .build();
        this.executor.startProducers();
    }

    /**
     * Builds the two producers handed to the executor: one that runs the log scanner and one that
     * iterates the parquet records.
     *
     * @return the producers, log scanner first
     */
    private List<BoundedInMemoryQueueProducer<ArrayWritable>> getParallelProducers() {
        List<BoundedInMemoryQueueProducer<ArrayWritable>> producers = new ArrayList<>();
        producers.add(new FunctionBasedQueueProducer<>(buffer -> {
            // The scanner inserts into the queue through its callback, so the buffer
            // argument is not used here.
            this.logRecordScanner.scan();
            return null;
        }));
        producers.add(new IteratorBasedQueueProducer<>(this.parquetRecordsIterator));
        return producers;
    }

    /**
     * Copies the next buffered record into {@code value}.
     *
     * @param key   unused
     * @param value writable that receives the contents of the next record
     * @return {@code false} when the queue iterator has no more records, {@code true} otherwise
     */
    @Override
    public boolean next(NullWritable key, ArrayWritable value) {
        if (!this.iterator.hasNext()) {
            return false;
        }
        value.set(this.iterator.next().get());
        return true;
    }

    /** Delegates key creation to the wrapped parquet reader. */
    @Override
    public NullWritable createKey() {
        return this.parquetReader.createKey();
    }

    /** Delegates value creation to the wrapped parquet reader. */
    @Override
    public ArrayWritable createValue() {
        return this.parquetReader.createValue();
    }

    /**
     * Position is not tracked by this reader.
     *
     * @return always {@code 0}
     */
    @Override
    public long getPos() {
        return 0L;
    }

    /**
     * Closes the parquet records iterator and shuts the executor down.
     *
     * @throws IOException if closing the parquet records iterator fails; the executor is shut
     *                     down regardless
     */
    @Override
    public void close() throws IOException {
        try {
            this.parquetRecordsIterator.close();
        } finally {
            // Always stop the executor, even when closing the parquet iterator fails,
            // so that it is not left running.
            this.executor.shutdownNow();
        }
    }

    /**
     * Reports the smaller of the parquet reader's progress and the log scanner's progress.
     *
     * @return the minimum of the two progress values
     * @throws IOException if the parquet reader fails to report its progress
     */
    @Override
    public float getProgress() throws IOException {
        return Math.min(this.parquetReader.getProgress(), this.logRecordScanner.getProgress());
    }
}

