/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.rollback;

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieRollbackRequest;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFileWriteCallback;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.rollback.RollbackUtils;
import org.apache.hudi.table.action.rollback.SerializableHoodieRollbackRequest;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Executes (or dry-runs) the rollback requests planned for an instant and reports the outcome
 * as one {@link HoodieRollbackStat} per partition path.
 *
 * <p>Each request is handled in one of three ways, checked in this order:
 * <ol>
 *   <li>it lists files to delete: the files are deleted (or only reported when not deleting);</li>
 *   <li>it lists log blocks to delete: a rollback command block is appended to the file group's
 *       log file (or only the current log file is reported when not deleting);</li>
 *   <li>otherwise an empty stat for the request's partition is emitted.</li>
 * </ol>
 */
public class BaseRollbackHelper
implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(BaseRollbackHelper.class);
    protected static final String EMPTY_STRING = "";
    protected final HoodieTable table;
    protected final HoodieTableMetaClient metaClient;
    protected final HoodieWriteConfig config;

    /**
     * @param table  table being rolled back; its meta client is cached in {@link #metaClient}
     * @param config write config supplying rollback parallelism, marker type and table name
     */
    public BaseRollbackHelper(HoodieTable table, HoodieWriteConfig config) {
        this.table = table;
        this.metaClient = table.getMetaClient();
        this.config = config;
    }

    /**
     * Performs the rollback: deletes files / appends rollback blocks for every request, merges the
     * resulting stats per partition path, and finally adds any log files recorded by append
     * markers of {@code instantTime} to the stats.
     *
     * @param context           engine context used to run the work in parallel
     * @param instantTime       instant time of this rollback; used to locate its write markers
     * @param instantToRollback instant whose changes are being rolled back
     * @param rollbackRequests  planned rollback requests
     * @return one merged stat per partition path
     * @throws HoodieRollbackException if the append markers cannot be listed
     */
    public List<HoodieRollbackStat> performRollback(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback, List<HoodieRollbackRequest> rollbackRequests) {
        int parallelism = this.rollbackParallelism(rollbackRequests);
        context.setJobStatus(this.getClass().getSimpleName(), "Perform rollback actions: " + this.config.getTableName());
        List<SerializableHoodieRollbackRequest> serializableRequests = BaseRollbackHelper.toSerializableRequests(rollbackRequests);
        WriteMarkers markers = WriteMarkersFactory.get(this.config.getMarkersType(), this.table, instantTime);
        // Markers are listed BEFORE this attempt runs, so only log files appended by an earlier
        // attempt with the same instant time are picked up here.
        Set<String> logPaths = new HashSet<>();
        try {
            logPaths = markers.getAppendedLogPaths(context, this.config.getFinalizeWriteParallelism());
        }
        catch (FileNotFoundException fnf) {
            // A missing marker directory is not an error: there is simply nothing to add.
            LOG.warn("Rollback never failed and hence no marker dir was found. Safely moving on");
        }
        catch (IOException e) {
            throw new HoodieRollbackException("Failed to list log file markers for previous attempt of rollback ", e);
        }
        List<Pair<String, HoodieRollbackStat>> rollbackStats = this.maybeDeleteAndCollectStats(context, instantTime, instantToRollback, serializableRequests, true, parallelism);
        List<HoodieRollbackStat> mergedRollbackStatByPartitionPath = context.reduceByKey(rollbackStats, RollbackUtils::mergeRollbackStat, parallelism);
        return this.addLogFilesFromPreviousFailedRollbacksToStat(context, mergedRollbackStatByPartitionPath, logPaths);
    }

    /**
     * Collects the same per-partition stats as {@link #performRollback} but without deleting
     * files or appending rollback blocks ({@code doDelete == false}).
     *
     * @param context           engine context used to run the work in parallel
     * @param instantTime       instant time of the rollback
     * @param instantToRollback instant whose changes would be rolled back
     * @param rollbackRequests  planned rollback requests
     * @return one merged stat per partition path
     */
    public List<HoodieRollbackStat> collectRollbackStats(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback, List<HoodieRollbackRequest> rollbackRequests) {
        int parallelism = this.rollbackParallelism(rollbackRequests);
        context.setJobStatus(this.getClass().getSimpleName(), "Collect rollback stats for upgrade/downgrade: " + this.config.getTableName());
        List<SerializableHoodieRollbackRequest> serializableRequests = BaseRollbackHelper.toSerializableRequests(rollbackRequests);
        List<Pair<String, HoodieRollbackStat>> rollbackStats = this.maybeDeleteAndCollectStats(context, instantTime, instantToRollback, serializableRequests, false, parallelism);
        return context.reduceByKey(rollbackStats, RollbackUtils::mergeRollbackStat, parallelism);
    }

    /** Parallelism for a batch: the configured rollback parallelism capped by the request count, never below 1. */
    private int rollbackParallelism(List<HoodieRollbackRequest> rollbackRequests) {
        return Math.max(Math.min(rollbackRequests.size(), this.config.getRollbackParallelism()), 1);
    }

    /** Wraps every request in its serializable counterpart so it can be shipped to the engine. */
    private static List<SerializableHoodieRollbackRequest> toSerializableRequests(List<HoodieRollbackRequest> rollbackRequests) {
        return rollbackRequests.stream().map(SerializableHoodieRollbackRequest::new).collect(Collectors.toList());
    }

    /**
     * Processes every rollback request and emits {@code (partitionPath, stat)} pairs.
     *
     * @param context           engine context used to run the work in parallel
     * @param instantTime       instant time of the rollback (used for write markers)
     * @param instantToRollback instant whose changes are rolled back
     * @param rollbackRequests  serializable rollback requests
     * @param doDelete          {@code true} to actually delete files / append rollback blocks,
     *                          {@code false} to only collect the stats
     * @param numPartitions     parallelism handed to the engine
     * @return un-merged stats keyed by partition path
     */
    List<Pair<String, HoodieRollbackStat>> maybeDeleteAndCollectStats(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback, List<SerializableHoodieRollbackRequest> rollbackRequests, boolean doDelete, int numPartitions) {
        return context.flatMap(rollbackRequests, rollbackRequest -> {
            List<String> filesToBeDeleted = rollbackRequest.getFilesToBeDeleted();
            if (!filesToBeDeleted.isEmpty()) {
                List<Pair<String, HoodieRollbackStat>> partitionToRollbackStats = new ArrayList<>();
                for (HoodieRollbackStat stat : this.deleteFiles(this.metaClient, filesToBeDeleted, doDelete)) {
                    partitionToRollbackStats.add(Pair.of(stat.getPartitionPath(), stat));
                }
                return partitionToRollbackStats.stream();
            }
            if (!rollbackRequest.getLogBlocksToBeDeleted().isEmpty()) {
                HoodieRollbackStat logBlockStat = this.rollbackLogBlocks(instantTime, instantToRollback, rollbackRequest, doDelete);
                return Collections.singletonList(Pair.of(rollbackRequest.getPartitionPath(), logBlockStat)).stream();
            }
            // Nothing to delete for this request: still report its partition with an empty stat.
            return Collections.singletonList(Pair.of(rollbackRequest.getPartitionPath(), HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()).build())).stream();
        }, numPartitions);
    }

    /**
     * Handles a request that carries log blocks to delete: opens a log writer for the request's
     * file group, appends a rollback command block when {@code doDelete} is set (otherwise only
     * reads the writer's current log file), and builds the stat for the partition.
     *
     * @throws HoodieRollbackException if opening the writer or appending the block fails
     * @throws HoodieIOException       if only closing the writer fails
     * @throws IOException             if the path info of the log file cannot be read
     */
    private HoodieRollbackStat rollbackLogBlocks(String instantTime, HoodieInstant instantToRollback, SerializableHoodieRollbackRequest rollbackRequest, boolean doDelete) throws IOException {
        String partitionPath = rollbackRequest.getPartitionPath();
        String fileId = rollbackRequest.getFileId();
        StoragePath filePath;
        HoodieLogFormat.Writer writer = null;
        // Remembered so that a failing close() does not replace the real cause of the rollback failure.
        HoodieRollbackException failure = null;
        try {
            String latestBaseInstant = rollbackRequest.getLatestBaseInstant();
            WriteMarkers writeMarkers = WriteMarkersFactory.get(this.config.getMarkersType(), this.table, instantTime);
            writer = HoodieLogFormat.newWriterBuilder().onParentPath(FSUtils.constructAbsolutePath(this.metaClient.getBasePathV2().toString(), partitionPath)).withFileId(fileId).overBaseCommit(latestBaseInstant).withStorage(this.metaClient.getStorage()).withLogWriteCallback(this.getRollbackLogMarkerCallback(writeMarkers, partitionPath, fileId)).withFileExtension(".log").build();
            if (doDelete) {
                Map<HoodieLogBlock.HeaderMetadataType, String> header = this.generateHeader(instantToRollback.getTimestamp());
                // Take the path from the append result rather than from the writer.
                filePath = writer.appendBlock(new HoodieCommandBlock(header)).logFile().getPath();
            } else {
                filePath = writer.getLogFile().getPath();
            }
        }
        catch (IOException io) {
            failure = new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);
            throw failure;
        }
        catch (InterruptedException ie) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            failure = new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, ie);
            throw failure;
        }
        finally {
            if (writer != null) {
                try {
                    writer.close();
                }
                catch (IOException io) {
                    if (failure == null) {
                        throw new HoodieIOException("Error appending rollback block", io);
                    }
                    failure.addSuppressed(io);
                }
            }
        }
        // Path info is read only after the writer has been closed (order kept from the original code).
        Map<StoragePathInfo, Long> filesToNumBlocksRollback = Collections.singletonMap(this.metaClient.getStorage().getPathInfo(Objects.requireNonNull(filePath)), 1L);
        // Drop entries whose key is nothing but the partition directory, i.e. carries no log file name.
        String partitionFullPath = FSUtils.constructAbsolutePath(this.metaClient.getBasePathV2().toString(), partitionPath).toString();
        Map<String, Long> validLogBlocksToDelete = new HashMap<>();
        for (Map.Entry<String, Long> logFileAndSize : rollbackRequest.getLogBlocksToBeDeleted().entrySet()) {
            String logFileName = logFileAndSize.getKey().replace(partitionFullPath, EMPTY_STRING);
            if (!StringUtils.isNullOrEmpty(logFileName)) {
                validLogBlocksToDelete.put(logFileAndSize.getKey(), logFileAndSize.getValue());
            }
        }
        return HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath).withRollbackBlockAppendResults(filesToNumBlocksRollback).withLogFilesFromFailedCommit(validLogBlocksToDelete).build();
    }

    /**
     * Builds the callback that creates an APPEND marker for every log file the rollback writer
     * opens or creates.
     */
    private HoodieLogFileWriteCallback getRollbackLogMarkerCallback(final WriteMarkers writeMarkers, final String partitionPath, final String fileId) {
        return new HoodieLogFileWriteCallback(){

            @Override
            public boolean preLogFileOpen(HoodieLogFile logFileToAppend) {
                // The marker result is deliberately ignored here: opening always proceeds.
                this.createAppendMarker(logFileToAppend);
                return true;
            }

            @Override
            public boolean preLogFileCreate(HoodieLogFile logFileToCreate) {
                return this.createAppendMarker(logFileToCreate);
            }

            /** @return {@code true} when {@code createIfNotExists} returned a present value */
            private boolean createAppendMarker(HoodieLogFile logFileToAppend) {
                return writeMarkers.createIfNotExists(partitionPath, logFileToAppend.getFileName(), IOType.APPEND, BaseRollbackHelper.this.config, fileId, BaseRollbackHelper.this.metaClient.getActiveTimeline()).isPresent();
            }
        };
    }

    /**
     * Adds the log files found through append markers to the stats of their partitions.
     * Returns {@code originalRollbackStats} untouched when no such log paths exist.
     */
    private List<HoodieRollbackStat> addLogFilesFromPreviousFailedRollbacksToStat(HoodieEngineContext context, List<HoodieRollbackStat> originalRollbackStats, Set<String> logPaths) {
        if (logPaths.isEmpty()) {
            return originalRollbackStats;
        }
        String basePathStr = this.metaClient.getBasePathV2().toString();
        List<String> logFiles = new ArrayList<>(logPaths);
        HoodiePairData<String, List<String>> partitionPathToLogFilesHoodieData = this.populatePartitionToLogFilesHoodieData(context, basePathStr, logFiles);
        HoodiePairData<String, HoodieRollbackStat> partitionPathToRollbackStatsHoodieData = context.parallelize(originalRollbackStats).mapToPair(t -> Pair.of(t.getPartitionPath(), t));
        StorageConfiguration<?> storageConf = context.getStorageConf();
        return this.addMissingLogFilesAndGetRollbackStats(partitionPathToRollbackStatsHoodieData, partitionPathToLogFilesHoodieData, basePathStr, storageConf);
    }

    /**
     * Groups the given log file paths (relative to {@code basePathStr}) by partition path,
     * yielding the list of log file names per partition.
     */
    private HoodiePairData<String, List<String>> populatePartitionToLogFilesHoodieData(HoodieEngineContext context, String basePathStr, List<String> logFiles) {
        return context.parallelize(logFiles).mapToPair(t -> {
            Path logFilePath = new Path(basePathStr, t);
            String partitionPath = HadoopFSUtils.getRelativePartitionPath(new Path(basePathStr), logFilePath.getParent());
            return Pair.of(partitionPath, logFilePath.getName());
        }).groupByKey().mapToPair(t -> {
            List<String> allFiles = new ArrayList<>();
            t.getRight().forEach(allFiles::add);
            return Pair.of(t.getKey(), allFiles);
        });
    }

    /**
     * Left-joins the stats with the per-partition log file names. For partitions that have log
     * files, every log file that still exists under the partition is added to the stat's
     * command-block map, keyed by its path info with the file length as value. Stats of
     * partitions without log files are returned unchanged.
     */
    private List<HoodieRollbackStat> addMissingLogFilesAndGetRollbackStats(HoodiePairData<String, HoodieRollbackStat> partitionPathToRollbackStatsHoodieData, HoodiePairData<String, List<String>> partitionPathToLogFilesHoodieData, String basePathStr, StorageConfiguration<?> storageConf) {
        return partitionPathToRollbackStatsHoodieData.leftOuterJoin(partitionPathToLogFilesHoodieData).map(joined -> {
            Pair<HoodieRollbackStat, Option<List<String>>> statAndLogFiles = joined.getValue();
            HoodieRollbackStat rollbackStat = statAndLogFiles.getKey();
            Option<List<String>> missingLogFilesOpt = statAndLogFiles.getValue();
            if (!missingLogFilesOpt.isPresent()) {
                return rollbackStat;
            }
            String partition = joined.getKey();
            List<String> missingLogFiles = missingLogFilesOpt.get();
            // An empty partition path means the files live directly under the base path.
            StoragePath fullPartitionPath = StringUtils.isNullOrEmpty(partition) ? new StoragePath(basePathStr) : new StoragePath(basePathStr, partition);
            HoodieStorage storage = HoodieStorageUtils.getStorage(fullPartitionPath, storageConf);
            List<Option<StoragePathInfo>> pathInfoOptList = FSUtils.getPathInfoUnderPartition(storage, fullPartitionPath, new HashSet<String>(missingLogFiles), true);
            Map<StoragePathInfo, Long> commandBlocksCount = new HashMap<>(rollbackStat.getCommandBlocksCount());
            for (Option<StoragePathInfo> pathInfoOpt : pathInfoOptList) {
                // Files that no longer exist come back as empty options and are skipped.
                if (pathInfoOpt.isPresent()) {
                    StoragePathInfo pathInfo = pathInfoOpt.get();
                    commandBlocksCount.put(pathInfo, pathInfo.getLength());
                }
            }
            return new HoodieRollbackStat(rollbackStat.getPartitionPath(), rollbackStat.getSuccessDeleteFiles(), rollbackStat.getFailedDeleteFiles(), commandBlocksCount, rollbackStat.getLogFilesFromFailedCommit());
        }).collectAsList();
    }

    /**
     * Deletes the given files (when {@code doDelete} is set) and reports one stat per file.
     *
     * <p>A file that is already gone ({@link FileNotFoundException}) counts as deleted. When
     * {@code doDelete} is {@code false} nothing is touched and every file is reported as deleted.
     *
     * @param metaClient       meta client supplying the base path and the file system
     * @param filesToBeDeleted full paths of the files to delete
     * @param doDelete         whether to really delete the files
     * @return one stat per file, carrying its partition path and the delete result
     * @throws HoodieIOException if a delete fails with an {@link IOException}
     * @throws IOException       declared for overriding implementations; not thrown by this one
     */
    protected List<HoodieRollbackStat> deleteFiles(HoodieTableMetaClient metaClient, List<String> filesToBeDeleted, boolean doDelete) throws IOException {
        Path basePath = new Path(metaClient.getBasePathV2().toString());
        return filesToBeDeleted.stream().map(fileToDelete -> {
            try {
                Path fullDeletePath = new Path(fileToDelete);
                String partitionPath = HadoopFSUtils.getRelativePartitionPath(basePath, fullDeletePath.getParent());
                boolean isDeleted = true;
                if (doDelete) {
                    try {
                        // NOTE(review): single-argument delete kept as-is to preserve behaviour.
                        isDeleted = ((FileSystem)metaClient.getStorage().getFileSystem()).delete(fullDeletePath);
                    }
                    catch (FileNotFoundException ignored) {
                        // The file is already absent, which is the desired end state.
                        isDeleted = true;
                    }
                }
                return HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath).withDeletedFileResult(fullDeletePath.toString(), isDeleted).build();
            }
            catch (IOException e) {
                LOG.error("Fetching file status for {} failed", fileToDelete, e);
                throw new HoodieIOException("Fetching file status for " + fileToDelete + " failed ", e);
            }
        }).collect(Collectors.toList());
    }

    /**
     * Builds the header of a rollback command block.
     *
     * <p>{@code INSTANT_TIME} is the timestamp of the last instant on the active timeline (this
     * calls {@code get()} on the option without a presence check), {@code TARGET_INSTANT_TIME} is
     * {@code commit}, and {@code COMMAND_BLOCK_TYPE} is the ordinal of {@code ROLLBACK_BLOCK}.
     *
     * @param commit timestamp of the instant being rolled back
     * @return mutable header map with the three entries above
     */
    protected Map<HoodieLogBlock.HeaderMetadataType, String> generateHeader(String commit) {
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>(3);
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, this.metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
        header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, commit);
        header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal()));
        return header;
    }
}

