/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.common.table.view;

import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hudi.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.hudi.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hudi.com.fasterxml.jackson.module.afterburner.AfterburnerModule;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.dto.BaseFileDTO;
import org.apache.hudi.common.table.timeline.dto.ClusteringOpDTO;
import org.apache.hudi.common.table.timeline.dto.CompactionOpDTO;
import org.apache.hudi.common.table.timeline.dto.DTOUtils;
import org.apache.hudi.common.table.timeline.dto.FileGroupDTO;
import org.apache.hudi.common.table.timeline.dto.FileSliceDTO;
import org.apache.hudi.common.table.timeline.dto.InstantDTO;
import org.apache.hudi.common.table.timeline.dto.TimelineDTO;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.RetryHelper;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieRemoteException;
import org.apache.hudi.org.apache.http.Consts;
import org.apache.hudi.org.apache.http.client.fluent.Request;
import org.apache.hudi.org.apache.http.client.fluent.Response;
import org.apache.hudi.org.apache.http.client.utils.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RemoteHoodieTableFileSystemView
implements SyncableFileSystemView,
Serializable {
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().registerModule(new AfterburnerModule());
    private static final String BASE_URL = "/v1/hoodie/view";
    public static final String LATEST_PARTITION_SLICES_URL = String.format("%s/%s", "/v1/hoodie/view", "slices/partition/latest/");
    public static final String LATEST_PARTITION_SLICES_STATELESS_URL = String.format("%s/%s", "/v1/hoodie/view", "slices/partition/latest/stateless/");
    public static final String LATEST_PARTITION_SLICE_URL = String.format("%s/%s", "/v1/hoodie/view", "slices/file/latest/");
    public static final String LATEST_PARTITION_UNCOMPACTED_SLICES_URL = String.format("%s/%s", "/v1/hoodie/view", "slices/uncompacted/partition/latest/");
    public static final String ALL_SLICES_URL = String.format("%s/%s", "/v1/hoodie/view", "slices/all");
    public static final String LATEST_SLICES_MERGED_BEFORE_ON_INSTANT_URL = String.format("%s/%s", "/v1/hoodie/view", "slices/merged/beforeoron/latest/");
    public static final String LATEST_SLICES_RANGE_INSTANT_URL = String.format("%s/%s", "/v1/hoodie/view", "slices/range/latest/");
    public static final String LATEST_SLICES_BEFORE_ON_INSTANT_URL = String.format("%s/%s", "/v1/hoodie/view", "slices/beforeoron/latest/");
    public static final String ALL_LATEST_SLICES_BEFORE_ON_INSTANT_URL = String.format("%s/%s", "/v1/hoodie/view", "slices/all/beforeoron/latest/");
    public static final String PENDING_COMPACTION_OPS = String.format("%s/%s", "/v1/hoodie/view", "compactions/pending/");
    public static final String PENDING_LOG_COMPACTION_OPS = String.format("%s/%s", "/v1/hoodie/view", "logcompactions/pending/");
    public static final String LATEST_PARTITION_DATA_FILES_URL = String.format("%s/%s", "/v1/hoodie/view", "datafiles/latest/partition");
    public static final String LATEST_PARTITION_DATA_FILE_URL = String.format("%s/%s", "/v1/hoodie/view", "datafile/latest/partition");
    public static final String ALL_DATA_FILES = String.format("%s/%s", "/v1/hoodie/view", "datafiles/all");
    public static final String LATEST_ALL_DATA_FILES = String.format("%s/%s", "/v1/hoodie/view", "datafiles/all/latest/");
    public static final String LATEST_DATA_FILE_ON_INSTANT_URL = String.format("%s/%s", "/v1/hoodie/view", "datafile/on/latest/");
    public static final String LATEST_DATA_FILES_RANGE_INSTANT_URL = String.format("%s/%s", "/v1/hoodie/view", "datafiles/range/latest/");
    public static final String LATEST_DATA_FILES_BEFORE_ON_INSTANT_URL = String.format("%s/%s", "/v1/hoodie/view", "datafiles/beforeoron/latest/");
    public static final String ALL_LATEST_BASE_FILES_BEFORE_ON_INSTANT_URL = String.format("%s/%s", "/v1/hoodie/view", "basefiles/all/beforeoron/");
    public static final String ALL_FILEGROUPS_FOR_PARTITION_URL = String.format("%s/%s", "/v1/hoodie/view", "filegroups/all/partition/");
    public static final String ALL_FILEGROUPS_FOR_PARTITION_STATELESS_URL = String.format("%s/%s", "/v1/hoodie/view", "filegroups/all/partition/stateless/");
    public static final String ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON = String.format("%s/%s", "/v1/hoodie/view", "filegroups/replaced/beforeoron/");
    public static final String ALL_REPLACED_FILEGROUPS_BEFORE = String.format("%s/%s", "/v1/hoodie/view", "filegroups/replaced/before/");
    public static final String ALL_REPLACED_FILEGROUPS_AFTER_OR_ON = String.format("%s/%s", "/v1/hoodie/view", "filegroups/replaced/afteroron/");
    public static final String ALL_REPLACED_FILEGROUPS_PARTITION = String.format("%s/%s", "/v1/hoodie/view", "filegroups/replaced/partition/");
    public static final String PENDING_CLUSTERING_FILEGROUPS = String.format("%s/%s", "/v1/hoodie/view", "clustering/pending/");
    public static final String LAST_INSTANT = String.format("%s/%s", "/v1/hoodie/view", "timeline/instant/last");
    public static final String LAST_INSTANTS = String.format("%s/%s", "/v1/hoodie/view", "timeline/instants/last");
    public static final String TIMELINE = String.format("%s/%s", "/v1/hoodie/view", "timeline/instants/all");
    public static final String REFRESH_TABLE = String.format("%s/%s", "/v1/hoodie/view", "refresh/");
    public static final String LOAD_ALL_PARTITIONS_URL = String.format("%s/%s", "/v1/hoodie/view", "loadallpartitions/");
    public static final String PARTITION_PARAM = "partition";
    public static final String BASEPATH_PARAM = "basepath";
    public static final String INSTANT_PARAM = "instant";
    public static final String MAX_INSTANT_PARAM = "maxinstant";
    public static final String MIN_INSTANT_PARAM = "mininstant";
    public static final String INSTANTS_PARAM = "instants";
    public static final String FILEID_PARAM = "fileid";
    public static final String LAST_INSTANT_TS = "lastinstantts";
    public static final String TIMELINE_HASH = "timelinehash";
    public static final String REFRESH_OFF = "refreshoff";
    public static final String INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM = "includependingcompaction";
    private static final Logger LOG = LoggerFactory.getLogger(RemoteHoodieTableFileSystemView.class);
    private static final TypeReference<List<FileSliceDTO>> FILE_SLICE_DTOS_REFERENCE = new TypeReference<List<FileSliceDTO>>(){};
    private static final TypeReference<List<FileGroupDTO>> FILE_GROUP_DTOS_REFERENCE = new TypeReference<List<FileGroupDTO>>(){};
    private static final TypeReference<Boolean> BOOLEAN_TYPE_REFERENCE = new TypeReference<Boolean>(){};
    private static final TypeReference<List<CompactionOpDTO>> COMPACTION_OP_DTOS_REFERENCE = new TypeReference<List<CompactionOpDTO>>(){};
    private static final TypeReference<List<ClusteringOpDTO>> CLUSTERING_OP_DTOS_REFERENCE = new TypeReference<List<ClusteringOpDTO>>(){};
    private static final TypeReference<List<InstantDTO>> INSTANT_DTOS_REFERENCE = new TypeReference<List<InstantDTO>>(){};
    private static final TypeReference<TimelineDTO> TIMELINE_DTO_REFERENCE = new TypeReference<TimelineDTO>(){};
    private static final TypeReference<List<BaseFileDTO>> BASE_FILE_DTOS_REFERENCE = new TypeReference<List<BaseFileDTO>>(){};
    private static final TypeReference<Map<String, List<BaseFileDTO>>> BASE_FILE_MAP_REFERENCE = new TypeReference<Map<String, List<BaseFileDTO>>>(){};
    private static final TypeReference<Map<String, List<FileSliceDTO>>> FILE_SLICE_MAP_REFERENCE = new TypeReference<Map<String, List<FileSliceDTO>>>(){};
    private final String serverHost;
    private final int serverPort;
    private final String basePath;
    private final HoodieTableMetaClient metaClient;
    private HoodieTimeline timeline;
    private final int timeoutMs;
    private boolean closed = false;
    private RetryHelper<Response, IOException> retryHelper;

    public RemoteHoodieTableFileSystemView(String server, int port, HoodieTableMetaClient metaClient) {
        this(metaClient, FileSystemViewStorageConfig.newBuilder().withRemoteServerHost(server).withRemoteServerPort(port).build());
    }

    public RemoteHoodieTableFileSystemView(HoodieTableMetaClient metaClient, FileSystemViewStorageConfig viewConf) {
        this.basePath = metaClient.getBasePath();
        this.metaClient = metaClient;
        this.timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
        this.serverHost = viewConf.getRemoteViewServerHost();
        this.serverPort = viewConf.getRemoteViewServerPort();
        this.timeoutMs = viewConf.getRemoteTimelineClientTimeoutSecs() * 1000;
        if (viewConf.isRemoteTimelineClientRetryEnabled()) {
            this.retryHelper = new RetryHelper(viewConf.getRemoteTimelineClientMaxRetryIntervalMs(), viewConf.getRemoteTimelineClientMaxRetryNumbers(), viewConf.getRemoteTimelineInitialRetryIntervalMs(), viewConf.getRemoteTimelineClientRetryExceptions(), "Sending request");
        }
    }

    private <T> T executeRequest(String requestPath, Map<String, String> queryParameters, TypeReference<T> reference, RequestMethod method) throws IOException {
        ValidationUtils.checkArgument(!this.closed, "View already closed");
        URIBuilder builder = new URIBuilder().setHost(this.serverHost).setPort(this.serverPort).setPath(requestPath).setScheme("http");
        queryParameters.forEach(builder::addParameter);
        this.timeline.lastInstant().ifPresent(instant -> builder.addParameter(LAST_INSTANT_TS, instant.getTimestamp()));
        builder.addParameter(TIMELINE_HASH, this.timeline.getTimelineHash());
        String url2 = builder.toString();
        LOG.info("Sending request : (" + url2 + ")");
        Response response = this.retryHelper != null ? this.retryHelper.start(() -> this.get(this.timeoutMs, url2, method)) : this.get(this.timeoutMs, url2, method);
        String content = response.returnContent().asString(Consts.UTF_8);
        return OBJECT_MAPPER.readValue(content, reference);
    }

    private Map<String, String> getParamsWithPartitionPath(String partitionPath) {
        HashMap<String, String> paramsMap = new HashMap<String, String>();
        paramsMap.put(BASEPATH_PARAM, this.basePath);
        paramsMap.put(PARTITION_PARAM, partitionPath);
        return paramsMap;
    }

    private Map<String, String> getParams() {
        HashMap<String, String> paramsMap = new HashMap<String, String>();
        paramsMap.put(BASEPATH_PARAM, this.basePath);
        return paramsMap;
    }

    private Map<String, String> getParams(String paramName, String instant) {
        HashMap<String, String> paramsMap = new HashMap<String, String>();
        paramsMap.put(BASEPATH_PARAM, this.basePath);
        paramsMap.put(paramName, instant);
        return paramsMap;
    }

    private Map<String, String> getParamsWithAdditionalParam(String partitionPath, String paramName, String paramVal) {
        HashMap<String, String> paramsMap = new HashMap<String, String>();
        paramsMap.put(BASEPATH_PARAM, this.basePath);
        paramsMap.put(PARTITION_PARAM, partitionPath);
        paramsMap.put(paramName, paramVal);
        return paramsMap;
    }

    private Map<String, String> getParamsWithAdditionalParams(String partitionPath, String[] paramNames, String[] paramVals) {
        HashMap<String, String> paramsMap = new HashMap<String, String>();
        paramsMap.put(BASEPATH_PARAM, this.basePath);
        paramsMap.put(PARTITION_PARAM, partitionPath);
        ValidationUtils.checkArgument(paramNames.length == paramVals.length);
        for (int i = 0; i < paramNames.length; ++i) {
            paramsMap.put(paramNames[i], paramVals[i]);
        }
        return paramsMap;
    }

    @Override
    public Stream<HoodieBaseFile> getLatestBaseFiles(String partitionPath) {
        Map<String, String> paramsMap = this.getParamsWithPartitionPath(partitionPath);
        return this.getLatestBaseFilesFromParams(paramsMap, LATEST_PARTITION_DATA_FILES_URL);
    }

    @Override
    public Stream<HoodieBaseFile> getLatestBaseFiles() {
        Map<String, String> paramsMap = this.getParams();
        return this.getLatestBaseFilesFromParams(paramsMap, LATEST_ALL_DATA_FILES);
    }

    private Stream<HoodieBaseFile> getLatestBaseFilesFromParams(Map<String, String> paramsMap, String requestPath) {
        try {
            List<BaseFileDTO> dataFiles = this.executeRequest(requestPath, paramsMap, BASE_FILE_DTOS_REFERENCE, RequestMethod.GET);
            return dataFiles.stream().map(BaseFileDTO::toHoodieBaseFile);
        }
        catch (IOException e) {
            throw new HoodieRemoteException(e);
        }
    }

    @Override
    public Stream<HoodieBaseFile> getLatestBaseFilesBeforeOrOn(String partitionPath, String maxCommitTime) {
        Map<String, String> paramsMap = this.getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime);
        return this.getLatestBaseFilesFromParams(paramsMap, LATEST_DATA_FILES_BEFORE_ON_INSTANT_URL);
    }

    @Override
    public Map<String, Stream<HoodieBaseFile>> getAllLatestBaseFilesBeforeOrOn(String maxCommitTime) {
        HashMap<String, String> paramsMap = new HashMap<String, String>();
        paramsMap.put(BASEPATH_PARAM, this.basePath);
        paramsMap.put(MAX_INSTANT_PARAM, maxCommitTime);
        try {
            Map<String, List<BaseFileDTO>> dataFileMap = this.executeRequest(ALL_LATEST_BASE_FILES_BEFORE_ON_INSTANT_URL, paramsMap, BASE_FILE_MAP_REFERENCE, RequestMethod.GET);
            return dataFileMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> ((List)entry.getValue()).stream().map(BaseFileDTO::toHoodieBaseFile)));
        }
        catch (IOException e) {
            throw new HoodieRemoteException(e);
        }
    }

    @Override
    public Option<HoodieBaseFile> getBaseFileOn(String partitionPath, String instantTime, String fileId) {
        Map<String, String> paramsMap = this.getParamsWithAdditionalParams(partitionPath, new String[]{INSTANT_PARAM, FILEID_PARAM}, new String[]{instantTime, fileId});
        try {
            List<BaseFileDTO> dataFiles = this.executeRequest(LATEST_DATA_FILE_ON_INSTANT_URL, paramsMap, BASE_FILE_DTOS_REFERENCE, RequestMethod.GET);
            return Option.fromJavaOptional(dataFiles.stream().map(BaseFileDTO::toHoodieBaseFile).findFirst());
        }
        catch (IOException e) {
            throw new HoodieRemoteException(e);
        }
    }

    @Override
    public Stream<HoodieBaseFile> getLatestBaseFilesInRange(List<String> commitsToReturn) {
        Map<String, String> paramsMap = this.getParams(INSTANTS_PARAM, StringUtils.join(commitsToReturn.toArray(new String[0]), ","));
        return this.getLatestBaseFilesFromParams(paramsMap, LATEST_DATA_FILES_RANGE_INSTANT_URL);
    }

    @Override
    public Stream<HoodieBaseFile> getAllBaseFiles(String partitionPath) {
        Map<String, String> paramsMap = this.getParamsWithPartitionPath(partitionPath);
        return this.getLatestBaseFilesFromParams(paramsMap, ALL_DATA_FILES);
    }

    @Override
    public Stream<FileSlice> getLatestFileSlices(String partitionPath) {
        Map<String, String> paramsMap = this.getParamsWithPartitionPath(partitionPath);
        try {
            List<FileSliceDTO> dataFiles = this.executeRequest(LATEST_PARTITION_SLICES_URL, paramsMap, FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
            return dataFiles.stream().map(FileSliceDTO::toFileSlice);
        }
        catch (IOException e) {
            throw new HoodieRemoteException(e);
        }
    }

    @Override
    public Stream<FileSlice> getLatestFileSlicesStateless(String partitionPath) {
        Map<String, String> paramsMap = this.getParamsWithPartitionPath(partitionPath);
        try {
            List<FileSliceDTO> dataFiles = this.executeRequest(LATEST_PARTITION_SLICES_STATELESS_URL, paramsMap, new TypeReference<List<FileSliceDTO>>(){}, RequestMethod.GET);
            return dataFiles.stream().map(FileSliceDTO::toFileSlice);
        }
        catch (IOException e) {
            throw new HoodieRemoteException(e);
        }
    }

    @Override
    public Option<FileSlice> getLatestFileSlice(String partitionPath, String fileId) {
        Map<String, String> paramsMap = this.getParamsWithAdditionalParam(partitionPath, FILEID_PARAM, fileId);
        try {
            List<FileSliceDTO> dataFiles = this.executeRequest(LATEST_PARTITION_SLICE_URL, paramsMap, FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
            return Option.fromJavaOptional(dataFiles.stream().map(FileSliceDTO::toFileSlice).findFirst());
        }
        catch (IOException e) {
            throw new HoodieRemoteException(e);
        }
    }

    @Override
    public Stream<FileSlice> getLatestUnCompactedFileSlices(String partitionPath) {
        Map<String, String> paramsMap = this.getParamsWithPartitionPath(partitionPath);
        try {
            List<FileSliceDTO> dataFiles = this.executeRequest(LATEST_PARTITION_UNCOMPACTED_SLICES_URL, paramsMap, FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
            return dataFiles.stream().map(FileSliceDTO::toFileSlice);
        }
        catch (IOException e) {
            throw new HoodieRemoteException(e);
        }
    }

    @Override
    public Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, String maxCommitTime, boolean includeFileSlicesInPendingCompaction) {
        Map<String, String> paramsMap = this.getParamsWithAdditionalParams(partitionPath, new String[]{MAX_INSTANT_PARAM, INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM}, new String[]{maxCommitTime, String.valueOf(includeFileSlicesInPendingCompaction)});
        try {
            List<FileSliceDTO> dataFiles = this.executeRequest(LATEST_SLICES_BEFORE_ON_INSTANT_URL, paramsMap, FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
            return dataFiles.stream().map(FileSliceDTO::toFileSlice);
        }
        catch (IOException e) {
            throw new HoodieRemoteException(e);
        }
    }

    @Override
    public Map<String, Stream<FileSlice>> getAllLatestFileSlicesBeforeOrOn(String maxCommitTime) {
        HashMap<String, String> paramsMap = new HashMap<String, String>();
        paramsMap.put(BASEPATH_PARAM, this.basePath);
        paramsMap.put(MAX_INSTANT_PARAM, maxCommitTime);
        try {
            Map<String, List<FileSliceDTO>> fileSliceMap = this.executeRequest(ALL_LATEST_SLICES_BEFORE_ON_INSTANT_URL, paramsMap, FILE_SLICE_MAP_REFERENCE, RequestMethod.GET);
            return fileSliceMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> ((List)entry.getValue()).stream().map(FileSliceDTO::toFileSlice)));
        }
        catch (IOException e) {
            throw new HoodieRemoteException(e);
        }
    }

    @Override
    public Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String partitionPath, String maxInstantTime) {
        Map<String, String> paramsMap = this.getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxInstantTime);
        try {
            List<FileSliceDTO> dataFiles = this.executeRequest(LATEST_SLICES_MERGED_BEFORE_ON_INSTANT_URL, paramsMap, FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
            return dataFiles.stream().map(FileSliceDTO::toFileSlice);
        }
        catch (IOException e) {
            throw new HoodieRemoteException(e);
        }
    }

    @Override
    public Stream<FileSlice> getLatestFileSliceInRange(List<String> commitsToReturn) {
        Map<String, String> paramsMap = this.getParams(INSTANTS_PARAM, StringUtils.join(commitsToReturn.toArray(new String[0]), ","));
        try {
            List<FileSliceDTO> dataFiles = this.executeRequest(LATEST_SLICES_RANGE_INSTANT_URL, paramsMap, FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
            return dataFiles.stream().map(FileSliceDTO::toFileSlice);
        }
        catch (IOException e) {
            throw new HoodieRemoteException(e);
        }
    }

    @Override
    public Stream<FileSlice> getAllFileSlices(String partitionPath) {
        Map<String, String> paramsMap = this.getParamsWithPartitionPath(partitionPath);
        try {
            List<FileSliceDTO> dataFiles = this.executeRequest(ALL_SLICES_URL, paramsMap, FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
            return dataFiles.stream().map(FileSliceDTO::toFileSlice);
        }
        catch (IOException e) {
            throw new HoodieRemoteException(e);
        }
    }

    @Override
    public Stream<HoodieFileGroup> getAllFileGroups(String partitionPath) {
        Map<String, String> paramsMap = this.getParamsWithPartitionPath(partitionPath);
        try {
            List<FileGroupDTO> fileGroups = this.executeRequest(ALL_FILEGROUPS_FOR_PARTITION_URL, paramsMap, FILE_GROUP_DTOS_REFERENCE, RequestMethod.GET);
            return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, this.metaClient);
        }
        catch (IOException e) {
            throw new HoodieRemoteException(e);
        }
    }

    @Override
    public Stream<HoodieFileGroup> getAllFileGroupsStateless(String partitionPath) {
        Map<String, String> paramsMap = this.getParamsWithPartitionPath(partitionPath);
        try {
            List<FileGroupDTO> fileGroups = this.executeRequest(ALL_FILEGROUPS_FOR_PARTITION_STATELESS_URL, paramsMap, new TypeReference<List<FileGroupDTO>>(){}, RequestMethod.GET);
            return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, this.metaClient);
        }
        catch (IOException e) {
            throw new HoodieRemoteException(e);
        }
    }

    @Override
    public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) {
        Map<String, String> paramsMap = this.getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime);
        try {
            List<FileGroupDTO> fileGroups = this.executeRequest(ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON, paramsMap, FILE_GROUP_DTOS_REFERENCE, RequestMethod.GET);
            return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, this.metaClient);
        }
        catch (IOException e) {
            throw new HoodieRemoteException(e);
        }
    }

    @Override
    public Stream<HoodieFileGroup> getReplacedFileGroupsBefore(String maxCommitTime, String partitionPath) {
        Map<String, String> paramsMap = this.getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime);
        try {
            List<FileGroupDTO> fileGroups = this.executeRequest(ALL_REPLACED_FILEGROUPS_BEFORE, paramsMap, FILE_GROUP_DTOS_REFERENCE, RequestMethod.GET);
            return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, this.metaClient);
        }
        catch (IOException e) {
            throw new HoodieRemoteException(e);
        }
    }

    @Override
    public Stream<HoodieFileGroup> getReplacedFileGroupsAfterOrOn(String minCommitTime, String partitionPath) {
        Map<String, String> paramsMap = this.getParamsWithAdditionalParam(partitionPath, MIN_INSTANT_PARAM, minCommitTime);
        try {
            List<FileGroupDTO> fileGroups = this.executeRequest(ALL_REPLACED_FILEGROUPS_AFTER_OR_ON, paramsMap, FILE_GROUP_DTOS_REFERENCE, RequestMethod.GET);
            return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, this.metaClient);
        }
        catch (IOException e) {
            throw new HoodieRemoteException(e);
        }
    }

    @Override
    public Stream<HoodieFileGroup> getAllReplacedFileGroups(String partitionPath) {
        Map<String, String> paramsMap = this.getParamsWithPartitionPath(partitionPath);
        try {
            List<FileGroupDTO> fileGroups = this.executeRequest(ALL_REPLACED_FILEGROUPS_PARTITION, paramsMap, FILE_GROUP_DTOS_REFERENCE, RequestMethod.GET);
            return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, this.metaClient);
        }
        catch (IOException e) {
            throw new HoodieRemoteException(e);
        }
    }

    public boolean refresh() {
        Map<String, String> paramsMap = this.getParams();
        try {
            this.timeline = this.metaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants();
            return this.executeRequest(REFRESH_TABLE, paramsMap, BOOLEAN_TYPE_REFERENCE, RequestMethod.POST);
        }
        catch (IOException e) {
            throw new HoodieRemoteException(e);
        }
    }

    @Override
    public Void loadAllPartitions() {
        Map<String, String> paramsMap = this.getParams();
        try {
            this.executeRequest(LOAD_ALL_PARTITIONS_URL, paramsMap, BOOLEAN_TYPE_REFERENCE, RequestMethod.POST);
            return null;
        }
        catch (IOException e) {
            throw new HoodieRemoteException(e);
        }
    }

    @Override
    public Stream<Pair<String, CompactionOperation>> getPendingCompactionOperations() {
        Map<String, String> paramsMap = this.getParams();
        try {
            List<CompactionOpDTO> dtos = this.executeRequest(PENDING_COMPACTION_OPS, paramsMap, COMPACTION_OP_DTOS_REFERENCE, RequestMethod.GET);
            return dtos.stream().map(CompactionOpDTO::toCompactionOperation);
        }
        catch (IOException e) {
            throw new HoodieRemoteException(e);
        }
    }

    @Override
    public Stream<Pair<String, CompactionOperation>> getPendingLogCompactionOperations() {
        Map<String, String> paramsMap = this.getParams();
        try {
            List<CompactionOpDTO> dtos = this.executeRequest(PENDING_LOG_COMPACTION_OPS, paramsMap, COMPACTION_OP_DTOS_REFERENCE, RequestMethod.GET);
            return dtos.stream().map(CompactionOpDTO::toCompactionOperation);
        }
        catch (IOException e) {
            throw new HoodieRemoteException(e);
        }
    }

    @Override
    public Stream<Pair<HoodieFileGroupId, HoodieInstant>> getFileGroupsInPendingClustering() {
        Map<String, String> paramsMap = this.getParams();
        try {
            List<ClusteringOpDTO> dtos = this.executeRequest(PENDING_CLUSTERING_FILEGROUPS, paramsMap, CLUSTERING_OP_DTOS_REFERENCE, RequestMethod.GET);
            return dtos.stream().map(ClusteringOpDTO::toClusteringOperation);
        }
        catch (IOException e) {
            throw new HoodieRemoteException(e);
        }
    }

    @Override
    public void close() {
        this.closed = true;
    }

    @Override
    public void reset() {
        this.refresh();
    }

    @Override
    public Option<HoodieInstant> getLastInstant() {
        Map<String, String> paramsMap = this.getParams();
        try {
            List<InstantDTO> instants = this.executeRequest(LAST_INSTANT, paramsMap, INSTANT_DTOS_REFERENCE, RequestMethod.GET);
            return Option.fromJavaOptional(instants.stream().map(InstantDTO::toInstant).findFirst());
        }
        catch (IOException e) {
            throw new HoodieRemoteException(e);
        }
    }

    @Override
    public HoodieTimeline getTimeline() {
        Map<String, String> paramsMap = this.getParams();
        try {
            TimelineDTO timeline = this.executeRequest(TIMELINE, paramsMap, TIMELINE_DTO_REFERENCE, RequestMethod.GET);
            return TimelineDTO.toTimeline(timeline, this.metaClient);
        }
        catch (IOException e) {
            throw new HoodieRemoteException(e);
        }
    }

    @Override
    public void sync() {
        this.refresh();
    }

    @Override
    public Option<HoodieBaseFile> getLatestBaseFile(String partitionPath, String fileId) {
        Map<String, String> paramsMap = this.getParamsWithAdditionalParam(partitionPath, FILEID_PARAM, fileId);
        try {
            List<BaseFileDTO> dataFiles = this.executeRequest(LATEST_PARTITION_DATA_FILE_URL, paramsMap, BASE_FILE_DTOS_REFERENCE, RequestMethod.GET);
            return Option.fromJavaOptional(dataFiles.stream().map(BaseFileDTO::toHoodieBaseFile).findFirst());
        }
        catch (IOException e) {
            throw new HoodieRemoteException(e);
        }
    }

    private Response get(int timeoutMs, String url2, RequestMethod method) throws IOException {
        switch (method) {
            case GET: {
                return Request.Get(url2).connectTimeout(timeoutMs).socketTimeout(timeoutMs).execute();
            }
        }
        return Request.Post(url2).connectTimeout(timeoutMs).socketTimeout(timeoutMs).execute();
    }

    private static enum RequestMethod {
        GET,
        POST;

    }
}

