/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.async;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;
import org.apache.hudi.async.HoodieAsyncTableService;
import org.apache.hudi.client.BaseClusterer;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.CustomizedThreadFactory;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Asynchronous table service that runs clustering for instants handed to it,
 * using a fixed pool of worker threads.
 *
 * <p>Each worker repeatedly fetches the next instant via
 * {@code fetchNextAsyncServiceInstant()} and passes it to the
 * {@link BaseClusterer} created by {@link #createClusteringClient(BaseHoodieWriteClient)},
 * until {@code isShutdownRequested()} returns {@code true} or a failure occurs.
 */
public abstract class AsyncClusteringService
extends HoodieAsyncTableService {
    /** Value set for {@link EngineProperty#CLUSTERING_POOL_NAME} before clustering starts. */
    public static final String CLUSTERING_POOL_NAME = "hoodiecluster";
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(AsyncClusteringService.class);
    /** Number of worker threads (and worker futures) created by {@link #startService()}. */
    private final int maxConcurrentClustering;
    protected transient HoodieEngineContext context;
    private transient BaseClusterer clusteringClient;

    /**
     * Creates a service that does not run in daemon mode.
     *
     * @param context     engine context on which the clustering pool name is set
     * @param writeClient write client supplying the config and used to build the clusterer
     */
    public AsyncClusteringService(HoodieEngineContext context, BaseHoodieWriteClient writeClient) {
        this(context, writeClient, false);
    }

    /**
     * Creates a service with a single clustering worker.
     *
     * @param context         engine context on which the clustering pool name is set
     * @param writeClient     write client supplying the config and used to build the clusterer
     * @param runInDaemonMode forwarded to the parent service; also passed to the worker thread factory
     */
    public AsyncClusteringService(HoodieEngineContext context, BaseHoodieWriteClient writeClient, boolean runInDaemonMode) {
        super(writeClient.getConfig(), runInDaemonMode);
        // NOTE(review): this invokes an overridable method from the constructor, so
        // subclass implementations must not rely on their own fields being initialized.
        this.clusteringClient = this.createClusteringClient(writeClient);
        this.maxConcurrentClustering = 1;
        this.context = context;
    }

    /**
     * Builds the engine-specific clusterer used to execute clustering.
     * Called from the constructor, before subclass field initializers have run.
     *
     * @param writeClient write client passed to the constructor
     * @return the clusterer that {@link #startService()} workers will invoke
     */
    protected abstract BaseClusterer createClusteringClient(BaseHoodieWriteClient writeClient);

    /**
     * Starts the clustering workers on a new fixed-size thread pool.
     *
     * @return a pair of (future completing when all workers have finished, the executor running them)
     */
    @Override
    protected Pair<CompletableFuture, ExecutorService> startService() {
        ExecutorService executor = Executors.newFixedThreadPool(
            this.maxConcurrentClustering,
            new CustomizedThreadFactory("async_clustering_thread", this.isRunInDaemonMode()));
        CompletableFuture[] workers = IntStream.range(0, this.maxConcurrentClustering)
            .mapToObj(i -> CompletableFuture.supplyAsync(this::runClusteringLoop, executor))
            .toArray(CompletableFuture[]::new);
        return Pair.of(CompletableFuture.allOf(workers), executor);
    }

    /**
     * Body of a single clustering worker: sets the clustering pool name on the
     * engine context, then clusters fetched instants until shutdown is requested.
     *
     * <p>On any failure {@code hasError} is set. An interrupt ends the loop
     * quietly (with the thread's interrupt status restored); an
     * {@link IOException} is rethrown wrapped in {@link HoodieIOException};
     * any other exception is rethrown unchanged.
     *
     * @return {@code true} when the loop ended without rethrowing
     */
    private boolean runClusteringLoop() {
        try {
            LOG.info("Setting pool name for clustering to {}", CLUSTERING_POOL_NAME);
            this.context.setProperty(EngineProperty.CLUSTERING_POOL_NAME, CLUSTERING_POOL_NAME);
            while (!this.isShutdownRequested()) {
                HoodieInstant instant = this.fetchNextAsyncServiceInstant();
                // A null instant means there is nothing to cluster right now; re-check shutdown and retry.
                if (instant != null) {
                    LOG.info("Starting clustering for instant {}", instant);
                    this.clusteringClient.cluster(instant);
                    LOG.info("Finished clustering for instant {}", instant);
                }
            }
            LOG.info("Clustering executor shutting down properly");
        }
        catch (InterruptedException ie) {
            this.hasError = true;
            LOG.warn("Clustering executor got interrupted exception! Stopping", ie);
            // Restore the interrupt status so code further up the stack can observe it.
            Thread.currentThread().interrupt();
        }
        catch (IOException e) {
            this.hasError = true;
            LOG.error("Clustering executor failed due to IOException", e);
            throw new HoodieIOException(e.getMessage(), e);
        }
        catch (Exception e) {
            // Only unchecked exceptions can reach here; the checked ones are handled above.
            this.hasError = true;
            LOG.error("Clustering executor failed", e);
            throw e;
        }
        return true;
    }

    /**
     * Forwards a new write client to the underlying clusterer.
     *
     * @param writeClient the write client the clusterer should use from now on
     */
    public synchronized void updateWriteClient(BaseHoodieWriteClient writeClient) {
        this.clusteringClient.updateWriteClient(writeClient);
    }
}

