/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.commit;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.DeletePartitionUtils;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieDeletePartitionException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.FlinkInsertOverwriteCommitActionExecutor;

public class FlinkDeletePartitionCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends FlinkInsertOverwriteCommitActionExecutor<T> {
    private final List<String> partitions;

    public FlinkDeletePartitionCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<?, ?, ?, ?> table, String instantTime, List<String> partitions) {
        super(context, null, config, table, instantTime, null, WriteOperationType.DELETE_PARTITION);
        this.partitions = partitions;
    }

    @Override
    public HoodieWriteMetadata<List<WriteStatus>> execute() {
        DeletePartitionUtils.checkForPendingTableServiceActions(this.table, this.partitions);
        try {
            HoodieTimer timer = HoodieTimer.start();
            this.context.setJobStatus(this.getClass().getSimpleName(), "Gather all file ids from all deleting partitions.");
            Map<String, List<String>> partitionToReplaceFileIds = this.context.parallelize(this.partitions).distinct().collectAsList().stream().collect(Collectors.toMap(partitionPath -> partitionPath, this::getAllExistingFileIds));
            HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<List<WriteStatus>>();
            result.setPartitionToReplaceFileIds(partitionToReplaceFileIds);
            result.setIndexUpdateDuration(Duration.ofMillis(timer.endTimer()));
            result.setWriteStatuses(Collections.emptyList());
            HoodieInstant dropPartitionsInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, "replacecommit", this.instantTime);
            if (!this.table.getMetaClient().getFs().exists(new Path(this.table.getMetaClient().getMetaPath(), dropPartitionsInstant.getFileName()))) {
                HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder().setOperationType(WriteOperationType.DELETE_PARTITION.name()).setExtraMetadata(this.extraMetadata.orElse(Collections.emptyMap())).build();
                this.table.getMetaClient().getActiveTimeline().saveToPendingReplaceCommit(dropPartitionsInstant, TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata));
            }
            this.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(Pair.of(new HashMap(), new WorkloadStat())), this.instantTime);
            this.commitOnAutoCommit(result);
            return result;
        }
        catch (Exception e) {
            throw new HoodieDeletePartitionException("Failed to drop partitions for commit time " + this.instantTime, e);
        }
    }

    private List<String> getAllExistingFileIds(String partitionPath) {
        return this.table.getSliceView().getLatestFileSlices(partitionPath).map(FileSlice::getFileId).distinct().collect(Collectors.toList());
    }
}

