/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.commit;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.FlinkLazyInsertIterable;
import org.apache.hudi.io.ExplicitWriteHandleFactory;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.BaseCommitActionExecutor;
import org.apache.hudi.table.action.commit.BucketType;
import org.apache.hudi.table.action.commit.HoodieMergeHelper;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public abstract class BaseFlinkCommitActionExecutor<T>
extends BaseCommitActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>, HoodieWriteMetadata> {
    private static final Logger LOG = LogManager.getLogger(BaseFlinkCommitActionExecutor.class);
    protected HoodieWriteHandle<?, ?, ?, ?> writeHandle;

    public BaseFlinkCommitActionExecutor(HoodieEngineContext context, HoodieWriteHandle<?, ?, ?, ?> writeHandle, HoodieWriteConfig config, HoodieTable table, String instantTime, WriteOperationType operationType) {
        this(context, writeHandle, config, table, instantTime, operationType, Option.empty());
    }

    public BaseFlinkCommitActionExecutor(HoodieEngineContext context, HoodieWriteHandle<?, ?, ?, ?> writeHandle, HoodieWriteConfig config, HoodieTable table, String instantTime, WriteOperationType operationType, Option extraMetadata) {
        super(context, config, table, instantTime, operationType, extraMetadata);
        this.writeHandle = writeHandle;
    }

    public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
        HoodieWriteMetadata result = new HoodieWriteMetadata();
        LinkedList<WriteStatus> writeStatuses = new LinkedList<WriteStatus>();
        HoodieRecord<T> record = inputRecords.get(0);
        String partitionPath = record.getPartitionPath();
        String fileId = record.getCurrentLocation().getFileId();
        BucketType bucketType = record.getCurrentLocation().getInstantTime().equals("I") ? BucketType.INSERT : BucketType.UPDATE;
        this.handleUpsertPartition(this.instantTime, partitionPath, fileId, bucketType, inputRecords.iterator()).forEachRemaining(writeStatuses::addAll);
        this.setUpWriteMetadata(writeStatuses, (HoodieWriteMetadata<List<WriteStatus>>)result);
        return result;
    }

    protected void setUpWriteMetadata(List<WriteStatus> statuses, HoodieWriteMetadata<List<WriteStatus>> result) {
        result.setWriteStatuses(statuses);
        result.setIndexUpdateDuration(Duration.ZERO);
    }

    protected String getCommitActionType() {
        return this.table.getMetaClient().getCommitActionType();
    }

    protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<List<WriteStatus>> result) {
        this.commit(extraMetadata, result, ((List)result.getWriteStatuses()).stream().map(WriteStatus::getStat).collect(Collectors.toList()));
    }

    protected void setCommitMetadata(HoodieWriteMetadata<List<WriteStatus>> result) {
        result.setCommitMetadata(Option.of((Object)CommitUtils.buildMetadata(((List)result.getWriteStatuses()).stream().map(WriteStatus::getStat).collect(Collectors.toList()), (Map)result.getPartitionToReplaceFileIds(), (Option)this.extraMetadata, (WriteOperationType)this.operationType, (String)this.getSchemaToStoreInCommit(), (String)this.getCommitActionType())));
    }

    protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<List<WriteStatus>> result, List<HoodieWriteStat> writeStats) {
        String actionType = this.getCommitActionType();
        LOG.info((Object)("Committing " + this.instantTime + ", action Type " + actionType));
        result.setCommitted(true);
        result.setWriteStats(writeStats);
        this.finalizeWrite(this.instantTime, writeStats, result);
        try {
            LOG.info((Object)("Committing " + this.instantTime + ", action Type " + this.getCommitActionType()));
            HoodieActiveTimeline activeTimeline = this.table.getActiveTimeline();
            HoodieCommitMetadata metadata = (HoodieCommitMetadata)result.getCommitMetadata().get();
            this.writeTableMetadata(metadata, actionType);
            activeTimeline.saveAsComplete(new HoodieInstant(true, this.getCommitActionType(), this.instantTime), Option.of((Object)metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
            LOG.info((Object)("Committed " + this.instantTime));
            result.setCommitMetadata(Option.of((Object)metadata));
        }
        catch (IOException e) {
            throw new HoodieCommitException("Failed to complete commit " + this.config.getBasePath() + " at time " + this.instantTime, (Throwable)e);
        }
    }

    protected boolean isWorkloadProfileNeeded() {
        return true;
    }

    protected Iterator<List<WriteStatus>> handleUpsertPartition(String instantTime, String partitionPath, String fileIdHint, BucketType bucketType, Iterator recordItr) {
        try {
            if (this.writeHandle instanceof HoodieCreateHandle) {
                return this.handleInsert(fileIdHint, recordItr);
            }
            if (this.writeHandle instanceof HoodieMergeHandle) {
                return this.handleUpdate(partitionPath, fileIdHint, recordItr);
            }
            switch (bucketType) {
                case INSERT: {
                    return this.handleInsert(fileIdHint, recordItr);
                }
                case UPDATE: {
                    return this.handleUpdate(partitionPath, fileIdHint, recordItr);
                }
            }
            throw new AssertionError();
        }
        catch (Throwable t) {
            String msg = "Error upsetting bucketType " + bucketType + " for partition :" + partitionPath;
            LOG.error((Object)msg, t);
            throw new HoodieUpsertException(msg, t);
        }
    }

    public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) throws IOException {
        if (!recordItr.hasNext()) {
            LOG.info((Object)("Empty partition with fileId => " + fileId));
            return Collections.singletonList(Collections.EMPTY_LIST).iterator();
        }
        HoodieMergeHandle upsertHandle = (HoodieMergeHandle)this.writeHandle;
        return this.handleUpdateInternal(upsertHandle, fileId);
    }

    protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?, ?, ?, ?> upsertHandle, String fileId) throws IOException {
        if (upsertHandle.getOldFilePath() == null) {
            throw new HoodieUpsertException("Error in finding the old file path at commit " + this.instantTime + " for fileId: " + fileId);
        }
        HoodieMergeHelper.newInstance().runMerge(this.table, upsertHandle);
        if (upsertHandle.getPartitionPath() == null) {
            LOG.info((Object)("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", " + upsertHandle.writeStatuses()));
        }
        return Collections.singletonList(upsertHandle.writeStatuses()).iterator();
    }

    public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr) throws Exception {
        if (!recordItr.hasNext()) {
            LOG.info((Object)"Empty partition");
            return Collections.singletonList(Collections.EMPTY_LIST).iterator();
        }
        return new FlinkLazyInsertIterable<T>(recordItr, true, this.config, this.instantTime, this.table, idPfx, this.taskContextSupplier, new ExplicitWriteHandleFactory(this.writeHandle));
    }
}

