/*
 * Decompiled with CFR 0.152.
 */
package com.ververica.cdc.debezium.internal;

import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.internal.DebeziumOffset;
import com.ververica.cdc.debezium.internal.DebeziumOffsetSerializer;
import com.ververica.cdc.debezium.internal.Handover;
import io.debezium.connector.SnapshotRecord;
import io.debezium.engine.ChangeEvent;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class DebeziumChangeFetcher<T> {
    private static final Logger LOG = LoggerFactory.getLogger(DebeziumChangeFetcher.class);
    private final SourceFunction.SourceContext<T> sourceContext;
    private final Object checkpointLock;
    private final DebeziumDeserializationSchema<T> deserialization;
    private final DebeziumCollector debeziumCollector;
    private final DebeziumOffset debeziumOffset;
    private final DebeziumOffsetSerializer stateSerializer;
    private final String heartbeatTopicPrefix;
    private boolean isInDbSnapshotPhase;
    private final Handover handover;
    private volatile boolean isRunning = true;
    private volatile long messageTimestamp = 0L;
    private volatile long processTime = 0L;
    private volatile long fetchDelay = 0L;
    private volatile long emitDelay = 0L;

    public DebeziumChangeFetcher(SourceFunction.SourceContext<T> sourceContext, DebeziumDeserializationSchema<T> deserialization, boolean isInDbSnapshotPhase, String heartbeatTopicPrefix, Handover handover) {
        this.sourceContext = sourceContext;
        this.checkpointLock = sourceContext.getCheckpointLock();
        this.deserialization = deserialization;
        this.isInDbSnapshotPhase = isInDbSnapshotPhase;
        this.heartbeatTopicPrefix = heartbeatTopicPrefix;
        this.debeziumCollector = new DebeziumCollector();
        this.debeziumOffset = new DebeziumOffset();
        this.stateSerializer = DebeziumOffsetSerializer.INSTANCE;
        this.handover = handover;
    }

    public byte[] snapshotCurrentState() throws Exception {
        assert (Thread.holdsLock(this.checkpointLock));
        if (this.debeziumOffset.sourceOffset == null || this.debeziumOffset.sourcePartition == null) {
            return null;
        }
        return this.stateSerializer.serialize(this.debeziumOffset);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void runFetchLoop() throws Exception {
        try {
            if (this.isInDbSnapshotPhase) {
                List<ChangeEvent<SourceRecord, SourceRecord>> events = this.handover.pollNext();
                Object object = this.checkpointLock;
                synchronized (object) {
                    LOG.info("Database snapshot phase can't perform checkpoint, acquired Checkpoint lock.");
                    this.handleBatch(events);
                    while (this.isRunning && this.isInDbSnapshotPhase) {
                        this.handleBatch(this.handover.pollNext());
                    }
                }
                LOG.info("Received record from streaming binlog phase, released checkpoint lock.");
            }
            while (this.isRunning) {
                this.handleBatch(this.handover.pollNext());
            }
        }
        catch (Handover.ClosedException events) {
        }
        catch (RetriableException e) {
            LOG.info("Ignore the RetriableException, the underlying DebeziumEngine will restart automatically", (Throwable)e);
        }
    }

    public void close() {
        this.isRunning = false;
        this.handover.close();
    }

    public long getFetchDelay() {
        return this.fetchDelay;
    }

    public long getEmitDelay() {
        return this.emitDelay;
    }

    public long getIdleTime() {
        return System.currentTimeMillis() - this.processTime;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> changeEvents) throws Exception {
        if (CollectionUtils.isEmpty(changeEvents)) {
            return;
        }
        this.processTime = System.currentTimeMillis();
        for (ChangeEvent<SourceRecord, SourceRecord> event : changeEvents) {
            SourceRecord record = (SourceRecord)event.value();
            this.updateMessageTimestamp(record);
            long l = this.fetchDelay = this.isInDbSnapshotPhase ? 0L : this.processTime - this.messageTimestamp;
            if (this.isHeartbeatEvent(record)) {
                Object object = this.checkpointLock;
                synchronized (object) {
                    this.debeziumOffset.setSourcePartition(record.sourcePartition());
                    this.debeziumOffset.setSourceOffset(record.sourceOffset());
                    continue;
                }
            }
            this.deserialization.deserialize(record, this.debeziumCollector);
            if (this.isInDbSnapshotPhase && !this.isSnapshotRecord(record)) {
                LOG.debug("Snapshot phase finishes.");
                this.isInDbSnapshotPhase = false;
            }
            this.emitRecordsUnderCheckpointLock(this.debeziumCollector.records, record.sourcePartition(), record.sourceOffset());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void emitRecordsUnderCheckpointLock(Queue<T> records, Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
        Object object = this.checkpointLock;
        synchronized (object) {
            T record;
            while ((record = records.poll()) != null) {
                this.emitDelay = this.isInDbSnapshotPhase ? 0L : System.currentTimeMillis() - this.messageTimestamp;
                this.sourceContext.collect(record);
            }
            this.debeziumOffset.setSourcePartition(sourcePartition);
            this.debeziumOffset.setSourceOffset(sourceOffset);
        }
    }

    private void updateMessageTimestamp(SourceRecord record) {
        Schema schema = record.valueSchema();
        Struct value = (Struct)record.value();
        if (schema.field("source") == null) {
            return;
        }
        Struct source = value.getStruct("source");
        if (source.schema().field("ts_ms") == null) {
            return;
        }
        Long tsMs = source.getInt64("ts_ms");
        if (tsMs != null) {
            this.messageTimestamp = tsMs;
        }
    }

    private boolean isHeartbeatEvent(SourceRecord record) {
        String topic = record.topic();
        return topic != null && topic.startsWith(this.heartbeatTopicPrefix);
    }

    private boolean isSnapshotRecord(SourceRecord record) {
        Struct value = (Struct)record.value();
        if (value != null) {
            Struct source = value.getStruct("source");
            SnapshotRecord snapshotRecord = SnapshotRecord.fromSource((Struct)source);
            return SnapshotRecord.TRUE == snapshotRecord;
        }
        return false;
    }

    private class DebeziumCollector
    implements Collector<T> {
        private final Queue<T> records = new ArrayDeque();

        private DebeziumCollector() {
        }

        public void collect(T record) {
            this.records.add(record);
        }

        public void close() {
        }
    }
}

