/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.bucket;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.ConsistentHashingNode;
import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieLockException;
import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
import org.apache.hudi.index.bucket.ConsistentBucketIndexUtils;
import org.apache.hudi.util.FlinkWriteClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsistentBucketAssignFunction
extends ProcessFunction<HoodieRecord, HoodieRecord>
implements CheckpointedFunction {
    private static final Logger LOG = LoggerFactory.getLogger(ConsistentBucketAssignFunction.class);
    private final Configuration config;
    private final List<String> indexKeyFields;
    private final int bucketNum;
    private transient HoodieFlinkWriteClient writeClient;
    private transient Map<String, ConsistentBucketIdentifier> partitionToIdentifier;
    private transient String lastRefreshInstant = "00000000000000";
    private final int maxRetries = 10;
    private final long maxWaitTimeInMs = 1000L;

    public ConsistentBucketAssignFunction(Configuration conf) {
        this.config = conf;
        this.indexKeyFields = Arrays.asList(OptionsResolver.getIndexKeyField(conf).split(","));
        this.bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
    }

    public void open(Configuration parameters) throws Exception {
        try {
            this.writeClient = FlinkWriteClients.createWriteClient(this.config, this.getRuntimeContext());
            this.partitionToIdentifier = new HashMap<String, ConsistentBucketIdentifier>();
        }
        catch (Throwable e) {
            LOG.error("Fail to initialize consistent bucket assigner", e);
            throw new RuntimeException(e);
        }
    }

    public void processElement(HoodieRecord record, ProcessFunction.Context context, Collector<HoodieRecord> collector) throws Exception {
        HoodieKey hoodieKey = record.getKey();
        String partition = hoodieKey.getPartitionPath();
        ConsistentHashingNode node = this.getBucketIdentifier(partition).getBucket(hoodieKey, this.indexKeyFields);
        Preconditions.checkArgument((boolean)StringUtils.nonEmpty(node.getFileIdPrefix()), (Object)("Consistent hashing node has no file group, partition: " + partition + ", meta: " + this.partitionToIdentifier.get(partition).getMetadata().getFilename() + ", record_key: " + hoodieKey));
        record.unseal();
        record.setCurrentLocation(new HoodieRecordLocation("U", FSUtils.createNewFileId(node.getFileIdPrefix(), 0)));
        record.seal();
        collector.collect((Object)record);
    }

    private ConsistentBucketIdentifier getBucketIdentifier(String partition) {
        return this.partitionToIdentifier.computeIfAbsent(partition, p -> {
            int retryCount = 0;
            HoodieConsistentHashingMetadata metadata = null;
            while (retryCount <= 10) {
                try {
                    metadata = ConsistentBucketIndexUtils.loadOrCreateMetadata(this.writeClient.getHoodieTable(), p, this.bucketNum);
                    break;
                }
                catch (Exception e) {
                    if (retryCount >= 10) {
                        throw new HoodieLockException("Fail to load or create metadata for partition " + partition, e);
                    }
                    try {
                        TimeUnit.MILLISECONDS.sleep(1000L);
                    }
                    catch (InterruptedException interruptedException) {
                        // empty catch block
                    }
                    LOG.info("Retrying to load or create metadata for partition {} for {} times", (Object)partition, (Object)(retryCount + 1));
                }
                finally {
                    ++retryCount;
                }
            }
            ValidationUtils.checkState(metadata != null);
            return new ConsistentBucketIdentifier(metadata);
        });
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        HoodieTimeline timeline = this.writeClient.getHoodieTable().getActiveTimeline().getCompletedReplaceTimeline().findInstantsAfter(this.lastRefreshInstant);
        if (!timeline.empty()) {
            for (HoodieInstant instant : timeline.getInstants()) {
                HoodieReplaceCommitMetadata commitMetadata = HoodieReplaceCommitMetadata.fromBytes(timeline.getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
                Set<String> affectedPartitions = commitMetadata.getPartitionToReplaceFileIds().keySet();
                LOG.info("Clear up cached hashing metadata because find a new replace commit.\n Instant: {}.\n Effected Partitions: {}.", (Object)this.lastRefreshInstant, affectedPartitions);
                affectedPartitions.forEach(this.partitionToIdentifier::remove);
            }
            this.lastRefreshInstant = timeline.lastInstant().get().getTimestamp();
        }
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
    }
}

