/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.common.table.timeline;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers that diff two {@link HoodieTimeline} snapshots and report whether the newer one
 * can be applied incrementally on top of the older one.
 */
public class TimelineDiffHelper {
    private static final Logger LOG = LoggerFactory.getLogger(TimelineDiffHelper.class);

    // Action names used when probing the new timeline for the follow-up state of a pending instant.
    private static final String COMMIT_ACTION = "commit";
    private static final String COMPACTION_ACTION = "compaction";
    private static final String DELTA_COMMIT_ACTION = "deltacommit";
    private static final String LOG_COMPACTION_ACTION = "logcompaction";

    /**
     * Compares an old and a new timeline and collects the instants that differ between them.
     *
     * <p>Both timelines are first narrowed with
     * {@link HoodieTimeline#filterCompletedAndCompactionInstants()}. The result is
     * {@link TimelineDiffResult#UNSAFE_SYNC_RESULT} when:
     * <ul>
     *   <li>either filtered timeline has no instants;</li>
     *   <li>the last instant of the filtered old timeline is strictly older than the first
     *       instant of the filtered new timeline; or</li>
     *   <li>a pending compaction of the old timeline has no matching instant in the new one.</li>
     * </ul>
     *
     * @param oldTimeline the previously seen timeline
     * @param newTimeline the freshly loaded timeline
     * @return the computed diff, or {@link TimelineDiffResult#UNSAFE_SYNC_RESULT} when an
     *         incremental sync must not be attempted
     */
    public static TimelineDiffResult getNewInstantsForIncrementalSync(HoodieTimeline oldTimeline, HoodieTimeline newTimeline) {
        HoodieTimeline oldT = oldTimeline.filterCompletedAndCompactionInstants();
        HoodieTimeline newT = newTimeline.filterCompletedAndCompactionInstants();
        Option<HoodieInstant> lastSeenInstant = oldT.lastInstant();
        Option<HoodieInstant> firstInstantInNewTimeline = newT.firstInstant();

        if (!lastSeenInstant.isPresent() || !firstInstantInNewTimeline.isPresent()) {
            LOG.warn("One or more timelines is empty");
            return TimelineDiffResult.UNSAFE_SYNC_RESULT;
        }

        // The two timelines do not overlap: the last instant we saw precedes everything in the new timeline.
        if (HoodieTimeline.compareTimestamps(lastSeenInstant.get().getTimestamp(), HoodieTimeline.LESSER_THAN, firstInstantInNewTimeline.get().getTimestamp())) {
            return TimelineDiffResult.UNSAFE_SYNC_RESULT;
        }

        Set<HoodieInstant> oldTimelineInstants = oldT.getInstantsAsStream().collect(Collectors.toSet());

        // A pending compaction with no counterpart in the new timeline makes incremental sync unsafe.
        List<Pair<HoodieInstant, HoodieInstant>> compactionInstants = getPendingCompactionTransitions(oldT, newT);
        List<HoodieInstant> lostPendingCompactions = compactionInstants.stream()
            .filter(instantPair -> instantPair.getValue() == null)
            .map(Pair::getKey)
            .collect(Collectors.toList());
        if (!lostPendingCompactions.isEmpty()) {
            LOG.warn("Some pending compactions are no longer in new timeline (unscheduled ?). They are :{}", lostPendingCompactions);
            return TimelineDiffResult.UNSAFE_SYNC_RESULT;
        }

        // Every pair has a non-null value here, because lost compactions returned above.
        List<HoodieInstant> finishedCompactionInstants = compactionInstants.stream()
            .filter(instantPair -> instantPair.getValue().getAction().equals(COMMIT_ACTION) && instantPair.getValue().isCompleted())
            .map(Pair::getKey)
            .collect(Collectors.toList());

        // Diff the FILTERED new timeline against the filtered old set. Streaming the unfiltered
        // timeline here would report every instant that the filter drops as newly seen on each
        // call, since such instants can never be members of oldTimelineInstants.
        List<HoodieInstant> newInstants = newT.getInstantsAsStream()
            .filter(instant -> !oldTimelineInstants.contains(instant))
            .collect(Collectors.toCollection(ArrayList::new));

        // Log compaction transitions are resolved against the unfiltered timelines.
        // NOTE(review): presumably because the filtered views drop pending log compactions - confirm.
        List<Pair<HoodieInstant, HoodieInstant>> logCompactionInstants = getPendingLogCompactionTransitions(oldTimeline, newTimeline);
        List<HoodieInstant> finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream()
            .filter(instantPair -> !instantPair.getKey().isCompleted()
                && (instantPair.getValue() == null || instantPair.getValue().isCompleted()))
            .map(Pair::getKey)
            .collect(Collectors.toList());

        return new TimelineDiffResult(newInstants, finishedCompactionInstants, finishedOrRemovedLogCompactionInstants, true);
    }

    /**
     * Pairs every pending log compaction instant of {@code oldTimeline} with its counterpart in
     * {@code newTimeline}; the pair value is {@code null} when no counterpart exists.
     */
    private static List<Pair<HoodieInstant, HoodieInstant>> getPendingLogCompactionTransitions(HoodieTimeline oldTimeline, HoodieTimeline newTimeline) {
        Set<HoodieInstant> newTimelineInstants = newTimeline.getInstantsAsStream().collect(Collectors.toSet());
        return oldTimeline.filterPendingLogCompactionTimeline().getInstantsAsStream()
            .map(instant -> resolveTransition(instant, newTimelineInstants, DELTA_COMMIT_ACTION, LOG_COMPACTION_ACTION))
            .collect(Collectors.toList());
    }

    /**
     * Pairs every pending compaction instant of {@code oldTimeline} with its counterpart in
     * {@code newTimeline}; the pair value is {@code null} when no counterpart exists.
     */
    private static List<Pair<HoodieInstant, HoodieInstant>> getPendingCompactionTransitions(HoodieTimeline oldTimeline, HoodieTimeline newTimeline) {
        Set<HoodieInstant> newTimelineInstants = newTimeline.getInstantsAsStream().collect(Collectors.toSet());
        return oldTimeline.filterPendingCompactionTimeline().getInstantsAsStream()
            .map(instant -> resolveTransition(instant, newTimelineInstants, COMMIT_ACTION, COMPACTION_ACTION))
            .collect(Collectors.toList());
    }

    /**
     * Looks up what a pending instant has become in the new timeline. Candidates are tried in
     * order: the unchanged instant, a COMPLETED instant with {@code completedAction}, then an
     * INFLIGHT instant with {@code inflightAction}, all at the same timestamp.
     *
     * @return a pair of the pending instant and the first candidate found, or a pair with a
     *         {@code null} value when none of the candidates is present
     */
    private static Pair<HoodieInstant, HoodieInstant> resolveTransition(HoodieInstant pendingInstant, Set<HoodieInstant> newTimelineInstants, String completedAction, String inflightAction) {
        if (newTimelineInstants.contains(pendingInstant)) {
            return Pair.of(pendingInstant, pendingInstant);
        }
        HoodieInstant completed = new HoodieInstant(HoodieInstant.State.COMPLETED, completedAction, pendingInstant.getTimestamp());
        if (newTimelineInstants.contains(completed)) {
            return Pair.of(pendingInstant, completed);
        }
        HoodieInstant inflight = new HoodieInstant(HoodieInstant.State.INFLIGHT, inflightAction, pendingInstant.getTimestamp());
        if (newTimelineInstants.contains(inflight)) {
            return Pair.of(pendingInstant, inflight);
        }
        return Pair.<HoodieInstant, HoodieInstant>of(pendingInstant, null);
    }

    /**
     * Immutable holder for the outcome of a timeline diff.
     */
    public static class TimelineDiffResult {
        private final List<HoodieInstant> newlySeenInstants;
        private final List<HoodieInstant> finishedCompactionInstants;
        private final List<HoodieInstant> finishedOrRemovedLogCompactionInstants;
        private final boolean canSyncIncrementally;

        /** Shared result meaning "do not sync incrementally"; all of its instant lists are {@code null}. */
        public static final TimelineDiffResult UNSAFE_SYNC_RESULT = new TimelineDiffResult(null, null, null, false);

        /**
         * @param newlySeenInstants instants present in the new timeline but not in the old one
         * @param finishedCompactionInstants pending compactions of the old timeline that are completed in the new one
         * @param finishedOrRemovedLogCompactionInstants pending log compactions of the old timeline that are completed in, or absent from, the new one
         * @param canSyncIncrementally whether an incremental sync may be performed
         */
        public TimelineDiffResult(List<HoodieInstant> newlySeenInstants, List<HoodieInstant> finishedCompactionInstants, List<HoodieInstant> finishedOrRemovedLogCompactionInstants, boolean canSyncIncrementally) {
            this.newlySeenInstants = newlySeenInstants;
            this.finishedCompactionInstants = finishedCompactionInstants;
            this.finishedOrRemovedLogCompactionInstants = finishedOrRemovedLogCompactionInstants;
            this.canSyncIncrementally = canSyncIncrementally;
        }

        /** @return the newly seen instants; {@code null} for {@link #UNSAFE_SYNC_RESULT} */
        public List<HoodieInstant> getNewlySeenInstants() {
            return this.newlySeenInstants;
        }

        /** @return the finished compaction instants; {@code null} for {@link #UNSAFE_SYNC_RESULT} */
        public List<HoodieInstant> getFinishedCompactionInstants() {
            return this.finishedCompactionInstants;
        }

        /** @return the finished or removed log compaction instants; {@code null} for {@link #UNSAFE_SYNC_RESULT} */
        public List<HoodieInstant> getFinishedOrRemovedLogCompactionInstants() {
            return this.finishedOrRemovedLogCompactionInstants;
        }

        /** @return {@code true} when the diff can be applied incrementally */
        public boolean canSyncIncrementally() {
            return this.canSyncIncrementally;
        }

        @Override
        public String toString() {
            return "TimelineDiffResult{newlySeenInstants=" + this.newlySeenInstants + ", finishedCompactionInstants=" + this.finishedCompactionInstants + ", finishedOrRemovedLogCompactionInstants=" + this.finishedOrRemovedLogCompactionInstants + ", canSyncIncrementally=" + this.canSyncIncrementally + '}';
        }
    }
}

