/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.BaseHoodieClient;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.table.action.compact.OperationResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CompactionAdminClient
extends BaseHoodieClient {
    /** Logger shared by all instances; named after this class. */
    private static final Logger LOG = LoggerFactory.getLogger(CompactionAdminClient.class);

    /**
     * Creates an admin client for the table located at {@code basePath}.
     *
     * <p>A write config is built from the base path alone and passed, together with the engine
     * context, to the parent client.
     *
     * @param context  engine context handed to the parent client
     * @param basePath base path used to build the write config
     */
    public CompactionAdminClient(HoodieEngineContext context, String basePath) {
        super(
            context,
            HoodieWriteConfig.newBuilder()
                .withPath(basePath)
                .build());
    }

    /**
     * Validates every operation recorded in the compaction plan of the given instant.
     *
     * <p>The plan is looked up for {@code compactionInstant}, each Avro operation is converted to a
     * {@link CompactionOperation}, and the operations are validated through the engine context using a
     * single shared file-system view.
     *
     * @param metaClient        meta client of the table being inspected
     * @param compactionInstant instant time of the compaction plan to validate
     * @param parallelism       parallelism passed to the engine context's {@code map}
     * @return one result per operation; an empty list when the plan has no operation list
     * @throws IOException declared by the compaction-plan lookup
     */
    public List<ValidationOpResult> validateCompactionPlan(HoodieTableMetaClient metaClient, String compactionInstant, int parallelism) throws IOException {
        HoodieCompactionPlan compactionPlan = CompactionAdminClient.getCompactionPlan(metaClient, compactionInstant);
        HoodieTableFileSystemView sharedView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());

        if (compactionPlan.getOperations() == null) {
            return new ArrayList<>();
        }

        List<CompactionOperation> operations = compactionPlan.getOperations().stream()
            .map(CompactionOperation::convertFromAvroRecordInstance)
            .collect(Collectors.toList());
        this.context.setJobStatus(this.getClass().getSimpleName(), "Validate compaction operations: " + this.config.getTableName());
        return this.context.map(operations, operation -> {
            try {
                return this.validateCompactionOperation(metaClient, compactionInstant, operation, Option.of(sharedView));
            }
            catch (IOException ioe) {
                // The engine callback cannot surface the checked exception; wrap it and keep the cause.
                throw new HoodieIOException(ioe.getMessage(), ioe);
            }
        }, parallelism);
    }

    /**
     * Un-schedules a whole compaction plan by renaming the affected log files and then removing the
     * requested compaction instant file.
     *
     * <p>The instant file is only removed when this is not a dry run, at least one rename result was
     * produced, and every rename was both executed and successful. If an inflight file for the same
     * instant exists at that point, an {@link IllegalStateException} is thrown instead.
     *
     * @param compactionInstant instant time of the compaction to un-schedule
     * @param skipValidation    forwarded to the rename-action computation
     * @param parallelism       parallelism used for computing and running the renames
     * @param dryRun            when {@code true}, renames are only reported and the instant file is kept
     * @return the result of each rename operation
     * @throws Exception if computing or applying the renames fails
     */
    public List<RenameOpResult> unscheduleCompactionPlan(String compactionInstant, boolean skipValidation, int parallelism, boolean dryRun) throws Exception {
        HoodieTableMetaClient metaClient = this.createMetaClient(false);
        List<Pair<HoodieLogFile, HoodieLogFile>> renames =
            this.getRenamingActionsForUnschedulingCompactionPlan(metaClient, compactionInstant, parallelism, Option.empty(), skipValidation);
        List<RenameOpResult> results = this.runRenamingOps(metaClient, renames, parallelism, dryRun);

        // No results at all counts as "not successful", exactly like an empty reduction would.
        boolean everyRenameSucceeded = !results.isEmpty()
            && results.stream().allMatch(r -> r.isExecuted() && r.isSuccess());
        if (dryRun || !everyRenameSucceeded) {
            return results;
        }

        HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, "compaction", compactionInstant);
        StoragePath inflightFile = new StoragePath(metaClient.getMetaPath(), inflightInstant.getFileName());
        if (metaClient.getStorage().exists(inflightFile)) {
            throw new IllegalStateException("Please rollback the inflight compaction before unscheduling");
        }

        HoodieInstant requestedInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, "compaction", compactionInstant);
        StoragePath requestedFile = new StoragePath(metaClient.getMetaPath(), requestedInstant.getFileName());
        boolean removed = metaClient.getStorage().deleteFile(requestedFile);
        ValidationUtils.checkArgument(removed, "Unable to delete compaction instant.");
        return results;
    }

    /**
     * Removes a single file group from the pending compaction plan it belongs to.
     *
     * <p>The log files of the file group are renamed first. When this is not a dry run and every
     * rename was executed successfully, the compaction plan is rewritten without the operation of
     * {@code fgId}; an existing inflight instant file is reverted to requested beforehand.
     *
     * <p>Only the operation whose file id <em>and</em> partition path both match {@code fgId} is
     * dropped from the plan. All other operations, including those in the same partition or with the
     * same file id in another partition, are preserved.
     *
     * @param fgId           file group to remove from its pending compaction
     * @param skipValidation forwarded to the rename-action computation
     * @param dryRun         when {@code true}, renames are only reported and the plan is left untouched
     * @return the result of each rename operation
     * @throws Exception if computing or applying the renames, or rewriting the plan, fails
     */
    public List<RenameOpResult> unscheduleCompactionFileId(HoodieFileGroupId fgId, boolean skipValidation, boolean dryRun) throws Exception {
        HoodieTableMetaClient metaClient = this.createMetaClient(false);
        List<Pair<HoodieLogFile, HoodieLogFile>> renameActions = this.getRenamingActionsForUnschedulingCompactionForFileId(metaClient, fgId, Option.empty(), skipValidation);
        List<RenameOpResult> res = this.runRenamingOps(metaClient, renameActions, 1, dryRun);

        // Rewrite the plan only when every rename went through; a partially renamed file group must
        // stay in the plan so that it can still be repaired.
        boolean allRenamed = !res.isEmpty() && res.stream().allMatch(r -> r.isExecuted() && r.isSuccess());
        if (dryRun || !allRenamed) {
            return res;
        }

        Pair<String, HoodieCompactionOperation> compactionOperationWithInstant = CompactionUtils.getAllPendingCompactionOperations(metaClient).get(fgId);
        String pendingInstant = compactionOperationWithInstant.getKey();
        HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(metaClient, pendingInstant);

        // Keep everything except the exact (fileId, partitionPath) match.
        List<HoodieCompactionOperation> newOps = plan.getOperations().stream()
            .filter(op -> !(op.getFileId().equals(fgId.getFileId()) && op.getPartitionPath().equals(fgId.getPartitionPath())))
            .collect(Collectors.toList());
        HoodieCompactionPlan newPlan = HoodieCompactionPlan.newBuilder().setOperations(newOps).setExtraMetadata(plan.getExtraMetadata()).build();

        HoodieInstant inflight = new HoodieInstant(HoodieInstant.State.INFLIGHT, "compaction", pendingInstant);
        StoragePath inflightPath = new StoragePath(metaClient.getMetaPath(), inflight.getFileName());
        if (metaClient.getStorage().exists(inflightPath)) {
            metaClient.getActiveTimeline().revertInstantFromInflightToRequested(inflight);
        }
        metaClient.getActiveTimeline().saveToCompactionRequested(new HoodieInstant(HoodieInstant.State.REQUESTED, "compaction", pendingInstant), TimelineMetadataUtils.serializeCompactionPlan(newPlan), true);
        return res;
    }

    /**
     * Repairs a compaction plan whose operations no longer pass validation.
     *
     * <p>The plan is validated first; for every failed operation the renames needed to align the log
     * files with the operation are collected and then run (or only reported in dry-run mode).
     *
     * @param compactionInstant instant time of the compaction to repair
     * @param parallelism       parallelism used for validation and for running the renames
     * @param dryRun            when {@code true}, renames are only reported
     * @return the result of each rename; an empty list when no operation failed validation
     * @throws Exception if validation or renaming fails
     */
    public List<RenameOpResult> repairCompaction(String compactionInstant, int parallelism, boolean dryRun) throws Exception {
        HoodieTableMetaClient metaClient = this.createMetaClient(false);
        List<ValidationOpResult> failures = this.validateCompactionPlan(metaClient, compactionInstant, parallelism).stream()
            .filter(outcome -> !outcome.isSuccess())
            .collect(Collectors.toList());
        if (failures.isEmpty()) {
            return new ArrayList<>();
        }

        HoodieTableFileSystemView sharedView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
        List<Pair<HoodieLogFile, HoodieLogFile>> renames = new ArrayList<>();
        for (ValidationOpResult failure : failures) {
            renames.addAll(CompactionAdminClient.getRenamingActionsToAlignWithCompactionOperation(
                metaClient, compactionInstant, failure.getOperation(), Option.of(sharedView)));
        }
        return this.runRenamingOps(metaClient, renames, parallelism, dryRun);
    }

    /**
     * Reads and deserializes the compaction plan stored for the requested compaction instant.
     *
     * @param metaClient        meta client whose active timeline holds the plan
     * @param compactionInstant instant time of the compaction
     * @return the deserialized compaction plan
     * @throws IOException declared by the plan deserialization
     */
    private static HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient, String compactionInstant) throws IOException {
        HoodieInstant requestedInstant = HoodieTimeline.getCompactionRequestedInstant(compactionInstant);
        byte[] serializedPlan = metaClient.getActiveTimeline().readCompactionPlanAsBytes(requestedInstant).get();
        return TimelineMetadataUtils.deserializeCompactionPlan(serializedPlan);
    }

    /**
     * Computes the renames that move log files not covered by a compaction operation onto the
     * compaction instant.
     *
     * <p>Every log file of the latest merged file slice whose version is greater than the highest
     * version listed in the operation is paired with a new log file. The new file lives in the same
     * directory, uses {@code compactionInstant} as its instant, and gets the version offset by that
     * highest version.
     *
     * @param metaClient        meta client of the table
     * @param compactionInstant instant time of the compaction the log files are aligned with
     * @param op                compaction operation to align with
     * @param fsViewOpt         file-system view to reuse; a fresh one is built when absent
     * @return (existing log file, renamed log file) pairs
     */
    protected static List<Pair<HoodieLogFile, HoodieLogFile>> getRenamingActionsToAlignWithCompactionOperation(HoodieTableMetaClient metaClient, String compactionInstant, CompactionOperation op, Option<HoodieTableFileSystemView> fsViewOpt) {
        HoodieTableFileSystemView view;
        if (fsViewOpt.isPresent()) {
            view = fsViewOpt.get();
        } else {
            view = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
        }

        HoodieInstant latestInstant = metaClient.getCommitsAndCompactionTimeline().lastInstant().get();
        FileSlice mergedSlice = view.getLatestMergedFileSlicesBeforeOrOn(op.getPartitionPath(), latestInstant.getTimestamp())
            .filter(slice -> slice.getFileId().equals(op.getFileId()))
            .findFirst()
            .get();

        // Highest log version named in the operation; 0 when the operation lists no log files.
        int highestPlannedVersion = op.getDeltaFileNames().stream()
            .mapToInt(name -> FSUtils.getFileVersionFromLog(new StoragePath(name)))
            .max()
            .orElse(0);

        List<HoodieLogFile> newerLogFiles = mergedSlice.getLogFiles()
            .filter(logFile -> logFile.getLogVersion() > highestPlannedVersion)
            .collect(Collectors.toList());

        List<Pair<HoodieLogFile, HoodieLogFile>> renames = new ArrayList<>();
        for (HoodieLogFile existing : newerLogFiles) {
            int shiftedVersion = existing.getLogVersion() - highestPlannedVersion;
            ValidationUtils.checkArgument(shiftedVersion > 0, "Expect new log version to be sane");
            String renamedFileName = FSUtils.makeLogFileName(existing.getFileId(), "." + existing.getFileExtension(), compactionInstant, shiftedVersion, "1-0-1");
            HoodieLogFile renamed = new HoodieLogFile(new StoragePath(existing.getPath().getParent(), renamedFileName));
            renames.add(Pair.of(existing, renamed));
        }
        return renames;
    }

    /**
     * Renames a log file in place after checking that the move is well-formed.
     *
     * <p>The source must list as exactly one entry, that entry must be a file, and source and target
     * must share the same parent directory; otherwise the argument check fails before any rename.
     *
     * @param metaClient meta client providing the storage used for listing and renaming
     * @param oldLogFile log file to rename
     * @param newLogFile target log file
     * @throws IOException if listing or renaming fails
     */
    protected static void renameLogFile(HoodieTableMetaClient metaClient, HoodieLogFile oldLogFile, HoodieLogFile newLogFile) throws IOException {
        List<StoragePathInfo> sourceEntries = metaClient.getStorage().listDirectEntries(oldLogFile.getPath());
        ValidationUtils.checkArgument(sourceEntries.size() == 1, "Only one status must be present");
        ValidationUtils.checkArgument(sourceEntries.get(0).isFile(), "Source File must exist");

        StoragePath source = oldLogFile.getPath();
        StoragePath target = newLogFile.getPath();
        boolean sameDirectory = source.getParent().equals(target.getParent());
        ValidationUtils.checkArgument(sameDirectory, "Log file must only be moved within the parent directory");

        metaClient.getStorage().rename(source, target);
    }

    /**
     * Checks that a single compaction operation is consistent with the current file-system view.
     *
     * <p>The checks performed, in order:
     * <ol>
     *   <li>the commits-and-compaction timeline has a last instant;</li>
     *   <li>an un-compacted file slice exists for the operation's file id;</li>
     *   <li>if the operation names a data file, the slice has a base file with the same path;</li>
     *   <li>every log file named in the operation exists and is part of the slice;</li>
     *   <li>any additional log file in the slice has {@code compactionInstant} as base commit time.</li>
     * </ol>
     * A failed check is reported through the returned result rather than thrown.
     *
     * @param metaClient        meta client of the table
     * @param compactionInstant instant time of the compaction the operation belongs to
     * @param operation         operation to validate
     * @param fsViewOpt         file-system view to reuse; a fresh one is built when absent
     * @return a successful result, or a failed result carrying the validation exception
     * @throws IOException declared for storage access; I/O failures while listing log files surface
     *                     as {@link HoodieIOException}
     */
    private ValidationOpResult validateCompactionOperation(HoodieTableMetaClient metaClient, String compactionInstant, CompactionOperation operation, Option<HoodieTableFileSystemView> fsViewOpt) throws IOException {
        HoodieTableFileSystemView view;
        if (fsViewOpt.isPresent()) {
            view = fsViewOpt.get();
        } else {
            view = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
        }
        Option<HoodieInstant> latestInstant = metaClient.getCommitsAndCompactionTimeline().lastInstant();

        try {
            if (!latestInstant.isPresent()) {
                throw new CompactionValidationException("Unable to find any committed instant. Compaction Operation may be pointing to stale file-slices");
            }

            Option<FileSlice> matchingSlice = Option.fromJavaOptional(
                view.getLatestUnCompactedFileSlices(operation.getPartitionPath())
                    .filter(slice -> slice.getFileId().equals(operation.getFileId()))
                    .findFirst());
            if (!matchingSlice.isPresent()) {
                throw new CompactionValidationException("Unable to find file-slice for file-id (" + operation.getFileId() + " Compaction operation is invalid.");
            }
            FileSlice fileSlice = matchingSlice.get();

            Option<HoodieBaseFile> baseFile = fileSlice.getBaseFile();
            if (operation.getDataFileName().isPresent()) {
                String expPath = metaClient.getStorage()
                    .getPathInfo(new StoragePath(FSUtils.constructAbsolutePath(metaClient.getBasePath(), operation.getPartitionPath()), operation.getDataFileName().get()))
                    .getPath()
                    .toString();
                ValidationUtils.checkArgument(baseFile.isPresent(), "Data File must be present. File Slice was : " + fileSlice + ", operation :" + operation);
                ValidationUtils.checkArgument(baseFile.get().getPath().equals(expPath), "Base Path in operation is specified as " + expPath + " but got path " + baseFile.get().getPath());
            }

            Set<HoodieLogFile> logFilesInFileSlice = fileSlice.getLogFiles().collect(Collectors.toSet());
            Set<HoodieLogFile> logFilesInCompactionOp = operation.getDeltaFileNames().stream()
                .map(deltaFileName -> {
                    try {
                        List<StoragePathInfo> entries = metaClient.getStorage().listDirectEntries(
                            new StoragePath(FSUtils.constructAbsolutePath(metaClient.getBasePath(), operation.getPartitionPath()), deltaFileName));
                        ValidationUtils.checkArgument(entries.size() == 1, "Expect only 1 file-status");
                        return new HoodieLogFile(entries.get(0));
                    }
                    catch (FileNotFoundException notFound) {
                        // A missing log file is a validation failure, not an I/O error.
                        throw new CompactionValidationException(notFound.getMessage());
                    }
                    catch (IOException ioe) {
                        throw new HoodieIOException(ioe.getMessage(), ioe);
                    }
                })
                .collect(Collectors.toSet());

            Set<HoodieLogFile> missing = logFilesInCompactionOp.stream()
                .filter(logFile -> !logFilesInFileSlice.contains(logFile))
                .collect(Collectors.toSet());
            ValidationUtils.checkArgument(missing.isEmpty(), "All log files specified in compaction operation is not present. Missing :" + missing + ", Exp :" + logFilesInCompactionOp + ", Got :" + logFilesInFileSlice);

            Set<HoodieLogFile> diff = logFilesInFileSlice.stream()
                .filter(logFile -> !logFilesInCompactionOp.contains(logFile))
                .collect(Collectors.toSet());
            ValidationUtils.checkArgument(diff.stream().allMatch(logFile -> logFile.getBaseCommitTime().equals(compactionInstant)), "There are some log-files which are neither specified in compaction plan nor present after compaction request instant. Some of these :" + diff);

            return new ValidationOpResult(operation, true, Option.empty());
        }
        catch (IllegalArgumentException | CompactionValidationException e) {
            // Both failed argument checks and explicit validation errors become a failed result.
            return new ValidationOpResult(operation, false, Option.of(e));
        }
    }

    /**
     * Applies the given log-file renames, or only reports them in dry-run mode.
     *
     * <p>In dry-run mode every action is returned as not executed and not successful. Otherwise each
     * rename runs through the engine context; an {@link IOException} during a rename is logged and
     * recorded as a failed result instead of being propagated.
     *
     * @param metaClient    meta client providing the storage used for renaming
     * @param renameActions (source, target) log-file pairs
     * @param parallelism   parallelism passed to the engine context's {@code map}
     * @param dryRun        when {@code true}, nothing is renamed
     * @return one result per rename action; an empty list when there is nothing to rename
     */
    private List<RenameOpResult> runRenamingOps(HoodieTableMetaClient metaClient, List<Pair<HoodieLogFile, HoodieLogFile>> renameActions, int parallelism, boolean dryRun) {
        if (renameActions.isEmpty()) {
            LOG.info("No renaming of log-files needed. Proceeding to removing file-id from compaction-plan");
            return new ArrayList<>();
        }

        LOG.info("The following compaction renaming operations needs to be performed to un-schedule");
        if (dryRun) {
            LOG.info("Dry-Run Mode activated for rename operations");
            return renameActions.parallelStream()
                .map(action -> new RenameOpResult(action, false, false, Option.empty()))
                .collect(Collectors.toList());
        }

        this.context.setJobStatus(this.getClass().getSimpleName(), "Execute unschedule operations: " + this.config.getTableName());
        return this.context.map(renameActions, action -> {
            try {
                LOG.info("RENAME " + action.getLeft().getPath() + " => " + action.getRight().getPath());
                CompactionAdminClient.renameLogFile(metaClient, action.getLeft(), action.getRight());
                return new RenameOpResult(action, true, Option.empty());
            }
            catch (IOException ioe) {
                LOG.error("Error renaming log file", (Throwable)ioe);
                LOG.error("\n\n\n***NOTE Compaction is in inconsistent state. Try running \"compaction repair " + action.getLeft().getBaseCommitTime() + "\" to recover from failure ***\n\n\n");
                return new RenameOpResult(action, false, Option.of(ioe));
            }
        }, parallelism);
    }

    /**
     * Computes the log-file renames needed to un-schedule every operation of a compaction plan.
     *
     * @param metaClient        meta client of the table
     * @param compactionInstant instant time of the compaction plan
     * @param parallelism       parallelism passed to the engine context's {@code flatMap}
     * @param fsViewOpt         file-system view to reuse; a fresh one is built when absent
     * @param skipValidation    forwarded to the per-operation computation
     * @return (existing log file, renamed log file) pairs; an empty list when the plan has no
     *         operation list
     * @throws IOException declared by the compaction-plan lookup
     */
    public List<Pair<HoodieLogFile, HoodieLogFile>> getRenamingActionsForUnschedulingCompactionPlan(HoodieTableMetaClient metaClient, String compactionInstant, int parallelism, Option<HoodieTableFileSystemView> fsViewOpt, boolean skipValidation) throws IOException {
        HoodieTableFileSystemView sharedView = fsViewOpt.isPresent()
            ? fsViewOpt.get()
            : new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
        HoodieCompactionPlan compactionPlan = CompactionAdminClient.getCompactionPlan(metaClient, compactionInstant);

        if (compactionPlan.getOperations() == null) {
            LOG.warn("No operations for compaction instant : " + compactionInstant);
            return new ArrayList<>();
        }

        LOG.info("Number of Compaction Operations :" + compactionPlan.getOperations().size() + " for instant :" + compactionInstant);
        List<CompactionOperation> operations = compactionPlan.getOperations().stream()
            .map(CompactionOperation::convertFromAvroRecordInstance)
            .collect(Collectors.toList());
        this.context.setJobStatus(this.getClass().getSimpleName(), "Generate compaction unscheduling operations: " + this.config.getTableName());
        return this.context.flatMap(operations, operation -> {
            try {
                return this.getRenamingActionsForUnschedulingCompactionOperation(metaClient, compactionInstant, operation, Option.of(sharedView), skipValidation).stream();
            }
            catch (IOException ioe) {
                throw new HoodieIOException(ioe.getMessage(), ioe);
            }
            catch (CompactionValidationException ve) {
                throw new HoodieException(ve);
            }
        }, parallelism);
    }

    /**
     * Computes the log-file renames needed to un-schedule a single compaction operation.
     *
     * <p>Log files of the latest merged file slice whose base commit time equals
     * {@code compactionInstant} are re-pointed to the operation's base instant. They are given
     * consecutive versions that continue after the version of the first log file returned for the
     * slice being compacted, or start at {@code LOGFILE_BASE_VERSION} when that slice has no log file.
     *
     * <p>Unless {@code skipValidation} is set, the operation is validated first and a failed
     * validation aborts the computation.
     *
     * @param metaClient        meta client of the table
     * @param compactionInstant instant time of the compaction being un-scheduled
     * @param operation         compaction operation to un-schedule
     * @param fsViewOpt         file-system view to reuse; a fresh one is built when absent
     * @param skipValidation    when {@code true}, the operation is not validated
     * @return (existing log file, renamed log file) pairs, in log-file order
     * @throws IOException                   if validation hits a storage error
     * @throws CompactionValidationException if validation is requested and the operation fails it
     */
    public List<Pair<HoodieLogFile, HoodieLogFile>> getRenamingActionsForUnschedulingCompactionOperation(HoodieTableMetaClient metaClient, String compactionInstant, CompactionOperation operation, Option<HoodieTableFileSystemView> fsViewOpt, boolean skipValidation) throws IOException {
        List<Pair<HoodieLogFile, HoodieLogFile>> result = new ArrayList<>();
        HoodieTableFileSystemView fileSystemView = fsViewOpt.isPresent() ? fsViewOpt.get() : new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());

        if (!skipValidation) {
            // Validation failures are reported through the result object, so it must be inspected;
            // ignoring it would make validation a no-op.
            ValidationOpResult validation = this.validateCompactionOperation(metaClient, compactionInstant, operation, Option.of(fileSystemView));
            if (!validation.isSuccess()) {
                throw new CompactionValidationException("Compaction operation " + operation + " failed validation for compaction instant " + compactionInstant);
            }
        }

        HoodieInstant lastInstant = metaClient.getCommitsAndCompactionTimeline().lastInstant().get();
        FileSlice merged = fileSystemView.getLatestMergedFileSlicesBeforeOrOn(operation.getPartitionPath(), lastInstant.getTimestamp())
            .filter(fs -> fs.getFileId().equals(operation.getFileId()))
            .findFirst()
            .get();
        List<HoodieLogFile> logFilesToRepair = merged.getLogFiles()
            .filter(lf -> lf.getBaseCommitTime().equals(compactionInstant))
            .sorted(HoodieLogFile.getLogFileComparator())
            .collect(Collectors.toList());

        FileSlice fileSliceForCompaction = fileSystemView.getLatestFileSlicesBeforeOrOn(operation.getPartitionPath(), operation.getBaseInstantTime(), true)
            .filter(fs -> fs.getFileId().equals(operation.getFileId()))
            .findFirst()
            .get();
        int maxUsedVersion = fileSliceForCompaction.getLogFiles().findFirst().map(HoodieLogFile::getLogVersion).orElse(HoodieLogFile.LOGFILE_BASE_VERSION - 1);
        String logExtn = fileSliceForCompaction.getLogFiles().findFirst().map(lf -> "." + lf.getFileExtension()).orElse(".log");

        // Resolve the directory lazily: the log-file fallback must only be evaluated when there is no
        // base file, otherwise a slice with a base file but no log files would fail on the fallback.
        Option<HoodieBaseFile> baseFile = fileSliceForCompaction.getBaseFile();
        String parentPath;
        if (baseFile.isPresent()) {
            parentPath = new StoragePath(baseFile.get().getPath()).getParent().toString();
        } else {
            parentPath = fileSliceForCompaction.getLogFiles().findFirst().map(lf -> lf.getPath().getParent().toString()).get();
        }

        for (HoodieLogFile toRepair : logFilesToRepair) {
            int version = maxUsedVersion + 1;
            HoodieLogFile newLf = new HoodieLogFile(new StoragePath(parentPath, FSUtils.makeLogFileName(operation.getFileId(), logExtn, operation.getBaseInstantTime(), version, "1-0-1")));
            result.add(Pair.of(toRepair, newLf));
            maxUsedVersion = version;
        }
        return result;
    }

    /**
     * Computes the log-file renames needed to remove one file group from its pending compaction.
     *
     * @param metaClient     meta client of the table
     * @param fgId           file group to un-schedule
     * @param fsViewOpt      file-system view to reuse; forwarded as-is
     * @param skipValidation forwarded to the per-operation computation
     * @return (existing log file, renamed log file) pairs
     * @throws IOException     declared by the per-operation computation
     * @throws HoodieException if {@code fgId} is not part of any pending compaction
     */
    public List<Pair<HoodieLogFile, HoodieLogFile>> getRenamingActionsForUnschedulingCompactionForFileId(HoodieTableMetaClient metaClient, HoodieFileGroupId fgId, Option<HoodieTableFileSystemView> fsViewOpt, boolean skipValidation) throws IOException {
        Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> pendingByFileGroup = CompactionUtils.getAllPendingCompactionOperations(metaClient);
        if (!pendingByFileGroup.containsKey(fgId)) {
            throw new HoodieException("FileGroupId " + fgId + " not in pending compaction");
        }

        Pair<String, HoodieCompactionOperation> instantAndOperation = pendingByFileGroup.get(fgId);
        String pendingInstant = instantAndOperation.getKey();
        CompactionOperation pendingOperation = CompactionOperation.convertFromAvroRecordInstance(instantAndOperation.getValue());
        return this.getRenamingActionsForUnschedulingCompactionOperation(metaClient, pendingInstant, pendingOperation, fsViewOpt, skipValidation);
    }

    /**
     * Unchecked exception describing why a compaction operation failed validation.
     */
    public static class CompactionValidationException extends RuntimeException {

        /**
         * @param msg description of the validation failure
         */
        public CompactionValidationException(String msg) {
            super(msg);
        }
    }

    /**
     * Serializable description of a single log-file rename.
     */
    public static class RenameInfo implements Serializable {

        /** File id of the log file being renamed. */
        public String fileId;

        /** Path of the log file before the rename. */
        public String srcPath;

        /** Path of the log file after the rename. */
        public String destPath;

        /**
         * @param fileId   file id of the log file being renamed
         * @param srcPath  path before the rename
         * @param destPath path after the rename
         */
        public RenameInfo(String fileId, String srcPath, String destPath) {
            this.fileId = fileId;
            this.srcPath = srcPath;
            this.destPath = destPath;
        }
    }

    /**
     * Outcome of validating a single {@link CompactionOperation}.
     */
    public static class ValidationOpResult extends OperationResult<CompactionOperation> {

        /**
         * @param operation operation that was validated
         * @param success   whether validation passed
         * @param exception validation failure, if any
         */
        public ValidationOpResult(CompactionOperation operation, boolean success, Option<Exception> exception) {
            super(operation, success, exception);
        }
    }

    /**
     * Outcome of a single log-file rename, described by a {@link RenameInfo}.
     */
    public static class RenameOpResult extends OperationResult<RenameInfo> {

        /** No-argument constructor; delegates to the parent's no-argument constructor. */
        public RenameOpResult() {
        }

        /**
         * @param op        (source, target) log-file pair
         * @param success   whether the rename succeeded
         * @param exception failure raised by the rename, if any
         */
        public RenameOpResult(Pair<HoodieLogFile, HoodieLogFile> op, boolean success, Option<Exception> exception) {
            super(describe(op), success, exception);
        }

        /**
         * @param op        (source, target) log-file pair
         * @param executed  whether the rename was actually attempted
         * @param success   whether the rename succeeded
         * @param exception failure raised by the rename, if any
         */
        public RenameOpResult(Pair<HoodieLogFile, HoodieLogFile> op, boolean executed, boolean success, Option<Exception> exception) {
            super(describe(op), executed, success, exception);
        }

        /** Builds the rename description from the source file's id and the two paths. */
        private static RenameInfo describe(Pair<HoodieLogFile, HoodieLogFile> op) {
            String fileId = op.getKey().getFileId();
            String sourcePath = op.getKey().getPath().toString();
            String targetPath = op.getRight().getPath().toString();
            return new RenameInfo(fileId, sourcePath, targetPath);
        }
    }
}

