/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.internals;

import java.io.Serializable;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ByteSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

@Internal
public class KafkaShuffleFetcher<T>
extends KafkaFetcher<T> {
    /** Combines the watermarks reported by the individual producer subtasks. */
    private final WatermarkHandler watermarkHandler;

    /** Decodes raw Kafka record values into shuffle records or shuffle watermarks. */
    private final KafkaShuffleElementDeserializer<T> kafkaShuffleDeserializer;

    /**
     * Creates a fetcher that reads shuffle elements (records and watermarks) from Kafka.
     *
     * <p>All arguments except {@code typeSerializer} and {@code producerParallelism} are
     * forwarded unchanged to the {@link KafkaFetcher} constructor.
     *
     * @param typeSerializer serializer used to decode the record payload carried in each Kafka
     *     message
     * @param producerParallelism number of distinct producer subtasks that must have reported a
     *     watermark before a combined watermark is emitted
     * @throws Exception if the {@link KafkaFetcher} constructor fails
     */
    public KafkaShuffleFetcher(
            SourceFunction.SourceContext<T> sourceContext,
            Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
            SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
            ProcessingTimeService processingTimeProvider,
            long autoWatermarkInterval,
            ClassLoader userCodeClassLoader,
            String taskNameWithSubtasks,
            KafkaDeserializationSchema<T> deserializer,
            Properties kafkaProperties,
            long pollTimeout,
            MetricGroup subtaskMetricGroup,
            MetricGroup consumerMetricGroup,
            boolean useMetrics,
            TypeSerializer<T> typeSerializer,
            int producerParallelism)
            throws Exception {
        super(
                sourceContext,
                assignedPartitionsWithInitialOffsets,
                watermarkStrategy,
                processingTimeProvider,
                autoWatermarkInterval,
                userCodeClassLoader,
                taskNameWithSubtasks,
                deserializer,
                kafkaProperties,
                pollTimeout,
                subtaskMetricGroup,
                consumerMetricGroup,
                useMetrics);
        this.kafkaShuffleDeserializer = new KafkaShuffleElementDeserializer<>(typeSerializer);
        this.watermarkHandler = new WatermarkHandler(producerParallelism);
    }

    /**
     * Returns the fixed name identifying this fetcher implementation.
     *
     * @return the constant string {@code "Kafka Shuffle Fetcher"}
     */
    @Override
    protected String getFetcherName() {
        final String fetcherName = "Kafka Shuffle Fetcher";
        return fetcherName;
    }

    /**
     * Decodes every Kafka record of one partition into a shuffle element and forwards it
     * downstream.
     *
     * <ul>
     *   <li>A record element is emitted with a timestamp and the partition offset is advanced to
     *       the Kafka record's offset; both happen while holding the checkpoint lock. The
     *       timestamp carried inside the element is used when present, otherwise the Kafka
     *       record's own timestamp is used.
     *   <li>A watermark element is handed to the watermark handler; a watermark is emitted only
     *       when the handler returns one. The partition offset is not touched for watermark
     *       elements.
     * </ul>
     *
     * @param partitionRecords the records fetched for {@code partition}, processed in list order
     * @param partition state holder whose offset is updated for each emitted record
     * @throws Exception if an element cannot be deserialized or the watermark handler rejects a
     *     watermark
     */
    @Override
    protected void partitionConsumerRecordsHandler(
            List<ConsumerRecord<byte[], byte[]>> partitionRecords,
            KafkaTopicPartitionState<T, TopicPartition> partition)
            throws Exception {
        for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
            final KafkaShuffleElement element = this.kafkaShuffleDeserializer.deserialize(record);

            if (element.isRecord()) {
                // Emitting the element and advancing the offset happen under the same lock.
                synchronized (this.checkpointLock) {
                    final KafkaShuffleRecord<T> shuffleRecord = element.asRecord();
                    final long timestamp =
                            shuffleRecord.timestamp == null
                                    ? record.timestamp()
                                    : shuffleRecord.timestamp;
                    this.sourceContext.collectWithTimestamp(shuffleRecord.value, timestamp);
                    partition.setOffset(record.offset());
                }
            } else if (element.isWatermark()) {
                final Optional<Watermark> newWatermark =
                        this.watermarkHandler.checkAndGetNewWatermark(element.asWatermark());
                newWatermark.ifPresent(this.sourceContext::emitWatermark);
            }
        }
    }

    /**
     * Tracks the most recent watermark received from each producer subtask and derives a combined
     * watermark as the minimum over all subtasks.
     *
     * <p>No combined watermark is produced until watermarks from {@code producerParallelism}
     * distinct subtask ids have been recorded.
     */
    private static class WatermarkHandler {
        /** Number of distinct subtask ids that must have reported before a watermark is emitted. */
        private final int producerParallelism;

        /** Latest watermark seen per producer subtask id. */
        private final Map<Integer, Long> subtaskWatermark;

        /** Last combined watermark handed out; starts at {@link Long#MIN_VALUE}. */
        private long currentMinWatermark = Long.MIN_VALUE;

        WatermarkHandler(int producerParallelism) {
            this.producerParallelism = producerParallelism;
            this.subtaskWatermark = new HashMap<>(producerParallelism);
        }

        /**
         * Records a subtask's watermark and returns the new combined watermark if it advanced.
         *
         * @param newWatermark the watermark just received from one producer subtask
         * @return the new minimum watermark across all subtasks when it is strictly greater than
         *     the previously returned one; empty when not all subtasks have reported yet or the
         *     minimum did not advance
         * @throws IllegalStateException if the watermark is not strictly greater than the
         *     previous watermark recorded for the same subtask
         */
        private Optional<Watermark> checkAndGetNewWatermark(KafkaShuffleWatermark newWatermark) {
            final Long previous = subtaskWatermark.get(newWatermark.subtask);
            // Template overload: the message is only formatted when the check actually fails.
            Preconditions.checkState(
                    previous == null || previous < newWatermark.watermark,
                    "Watermark should always increase: current : new %s:%s",
                    previous,
                    newWatermark.watermark);
            subtaskWatermark.put(newWatermark.subtask, newWatermark.watermark);

            if (subtaskWatermark.size() < producerParallelism) {
                return Optional.empty();
            }

            // The map holds at least the entry stored above, so the loop always finds a minimum.
            long minWatermark = Long.MAX_VALUE;
            for (long reported : subtaskWatermark.values()) {
                minWatermark = Math.min(minWatermark, reported);
            }

            if (currentMinWatermark < minWatermark) {
                currentMinWatermark = minWatermark;
                return Optional.of(new Watermark(minWatermark));
            }
            return Optional.empty();
        }
    }

    /**
     * Turns the value bytes of a Kafka record into a {@link KafkaShuffleElement}.
     *
     * <p>Layout read by {@link #deserialize}: one byte that is read and discarded, one tag byte,
     * then a tag-specific body.
     */
    @VisibleForTesting
    public static class KafkaShuffleElementDeserializer<T> implements Serializable {
        private static final long serialVersionUID = 1000001L;

        /** Tag of a record whose body is a {@code long} timestamp followed by the value. */
        private static final byte TAG_RECORD_WITH_TIMESTAMP = 0;

        /** Tag of a record whose body is the value only. */
        private static final byte TAG_RECORD_WITHOUT_TIMESTAMP = 1;

        /** Tag of a watermark whose body is an {@code int} subtask and a {@code long} watermark. */
        private static final byte TAG_WATERMARK = 2;

        private final TypeSerializer<T> typeSerializer;

        /** Reusable input view; created on first use and re-pointed at each new record value. */
        private transient DataInputDeserializer dis;

        @VisibleForTesting
        public KafkaShuffleElementDeserializer(TypeSerializer<T> typeSerializer) {
            this.typeSerializer = typeSerializer;
        }

        /**
         * Decodes the value of the given Kafka record.
         *
         * @param record Kafka record whose value holds one serialized shuffle element
         * @return a {@link KafkaShuffleRecord} or a {@link KafkaShuffleWatermark}, depending on
         *     the tag byte
         * @throws UnsupportedOperationException if the tag byte is none of the known tags
         * @throws Exception if reading from the underlying buffer fails
         */
        @VisibleForTesting
        public KafkaShuffleElement deserialize(ConsumerRecord<byte[], byte[]> record)
                throws Exception {
            final byte[] payload = record.value();
            if (dis == null) {
                dis = new DataInputDeserializer(payload);
            } else {
                dis.setBuffer(payload);
            }

            // Leading byte is consumed but unused here.
            // NOTE(review): presumably a format/version marker written by the producer - confirm.
            ByteSerializer.INSTANCE.deserialize(dis);
            final byte tag = ByteSerializer.INSTANCE.deserialize(dis);

            switch (tag) {
                case TAG_RECORD_WITHOUT_TIMESTAMP:
                    return new KafkaShuffleRecord<T>(typeSerializer.deserialize(dis));
                case TAG_RECORD_WITH_TIMESTAMP: {
                    // Read order matters: timestamp precedes the value in the buffer.
                    final long timestamp = LongSerializer.INSTANCE.deserialize(dis);
                    final T value = typeSerializer.deserialize(dis);
                    return new KafkaShuffleRecord<T>(timestamp, value);
                }
                case TAG_WATERMARK: {
                    // Read order matters: subtask id precedes the watermark in the buffer.
                    final int subtask = IntSerializer.INSTANCE.deserialize(dis);
                    final long watermark = LongSerializer.INSTANCE.deserialize(dis);
                    return new KafkaShuffleWatermark(subtask, watermark);
                }
                default:
                    throw new UnsupportedOperationException("Unsupported tag format");
            }
        }
    }

    /**
     * A shuffle element carrying a user value and, optionally, a timestamp.
     *
     * @param <T> type of the carried value
     */
    @VisibleForTesting
    public static class KafkaShuffleRecord<T> extends KafkaShuffleElement {
        /** The carried value. */
        final T value;

        /** Timestamp attached to the value, or {@code null} when none was supplied. */
        final Long timestamp;

        /** Creates a record without a timestamp ({@link #getTimestamp()} returns {@code null}). */
        KafkaShuffleRecord(T value) {
            this.timestamp = null;
            this.value = value;
        }

        /** Creates a record with the given timestamp and value. */
        KafkaShuffleRecord(long timestamp, T value) {
            this.timestamp = Long.valueOf(timestamp);
            this.value = value;
        }

        /** @return the carried value */
        public T getValue() {
            return value;
        }

        /** @return the attached timestamp, or {@code null} if the record has none */
        public Long getTimestamp() {
            return timestamp;
        }
    }

    /** A shuffle element carrying a watermark value together with the subtask id it belongs to. */
    @VisibleForTesting
    public static class KafkaShuffleWatermark extends KafkaShuffleElement {
        /** Id of the subtask this watermark is attributed to. */
        final int subtask;

        /** The watermark value. */
        final long watermark;

        /** Creates a watermark element for the given subtask id and watermark value. */
        KafkaShuffleWatermark(int subtask, long watermark) {
            this.watermark = watermark;
            this.subtask = subtask;
        }

        /** @return the subtask id */
        public int getSubtask() {
            return subtask;
        }

        /** @return the watermark value */
        public long getWatermark() {
            return watermark;
        }
    }

    /**
     * Common base of the elements read from a shuffle topic: either a {@link KafkaShuffleRecord}
     * or a {@link KafkaShuffleWatermark}.
     */
    @VisibleForTesting
    public abstract static class KafkaShuffleElement {
        /** @return {@code true} only if this object's class is exactly {@link KafkaShuffleRecord} */
        public boolean isRecord() {
            return KafkaShuffleRecord.class == getClass();
        }

        /**
         * @return {@code true} only if this object's class is exactly {@link
         *     KafkaShuffleWatermark}
         */
        public boolean isWatermark() {
            return KafkaShuffleWatermark.class == getClass();
        }

        /**
         * Views this element as a record.
         *
         * @param <T> value type expected by the caller; not verified at runtime
         * @return this element cast to {@link KafkaShuffleRecord}
         * @throws ClassCastException if this element is not a {@link KafkaShuffleRecord}
         */
        @SuppressWarnings("unchecked") // the value type is erased; callers choose T
        public <T> KafkaShuffleRecord<T> asRecord() {
            return (KafkaShuffleRecord<T>) this;
        }

        /**
         * Views this element as a watermark.
         *
         * @return this element cast to {@link KafkaShuffleWatermark}
         * @throws ClassCastException if this element is not a {@link KafkaShuffleWatermark}
         */
        public KafkaShuffleWatermark asWatermark() {
            return (KafkaShuffleWatermark) this;
        }
    }
}

