/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
import org.apache.hudi.table.HoodieTableSink;
import org.apache.hudi.table.HoodieTableSource;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.DataTypeUtils;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flink dynamic table factory for the {@code hudi} connector.
 *
 * <p>Builds a {@link HoodieTableSource} or {@link HoodieTableSink} from the catalog table
 * options. Before handing the {@link Configuration} to the source/sink, the factory:
 * <ul>
 *   <li>back-fills selected options from an existing table config found under the base path,</li>
 *   <li>validates record key and pre-combine fields against the table schema (sink only),</li>
 *   <li>derives key generator, archive, hive sync, read and write options,</li>
 *   <li>infers the Avro schema from the table row type when none is configured.</li>
 * </ul>
 */
public class HoodieTableFactory
implements DynamicTableSourceFactory,
DynamicTableSinkFactory {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieTableFactory.class);
    public static final String FACTORY_ID = "hudi";

    /** Marker value stored in the pre-combine option when no pre-combine field is in use. */
    private static final String NO_PRE_COMBINE = "no_precombine";

    // Property keys written for the timestamp based key generator.
    private static final String KEYGEN_TIMESTAMP_TYPE = "hoodie.deltastreamer.keygen.timebased.timestamp.type";
    private static final String KEYGEN_SCALAR_TIME_UNIT = "hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit";
    private static final String KEYGEN_INPUT_DATEFORMAT = "hoodie.deltastreamer.keygen.timebased.input.dateformat";
    private static final String KEYGEN_OUTPUT_DATEFORMAT = "hoodie.deltastreamer.keygen.timebased.output.dateformat";
    private static final String KEYGEN_OUTPUT_TIMEZONE = "hoodie.deltastreamer.keygen.timebased.output.timezone";

    /**
     * Creates the table source for the given catalog table.
     *
     * @param context factory context carrying the catalog table and its identifier
     * @return a {@link HoodieTableSource} configured from the table options
     * @throws ValidationException if the {@code path} option is not set
     */
    @Override
    public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
        Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
        String basePath = conf.getOptional(FlinkOptions.PATH)
                .orElseThrow(() -> new ValidationException("Option [path] should not be empty."));
        Path path = new Path(basePath);
        this.setupTableOptions(basePath, conf);
        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
        HoodieTableFactory.setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);
        return new HoodieTableSource(
                schema,
                path,
                context.getCatalogTable().getPartitionKeys(),
                conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
                conf);
    }

    /**
     * Creates the table sink for the given catalog table.
     *
     * <p>Unlike the source path, the sink additionally runs {@link #sanityCheck} on the
     * record key and pre-combine options before deriving the remaining options.
     *
     * @param context factory context carrying the catalog table and its identifier
     * @return a {@link HoodieTableSink} configured from the table options
     */
    @Override
    public DynamicTableSink createDynamicTableSink(DynamicTableFactory.Context context) {
        Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
        String basePath = conf.getString(FlinkOptions.PATH);
        ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(basePath), "Option [path] should not be empty.");
        this.setupTableOptions(basePath, conf);
        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
        this.sanityCheck(conf, schema);
        HoodieTableFactory.setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);
        return new HoodieTableSink(conf, schema);
    }

    /**
     * Back-fills record key, pre-combine field and hive-style partitioning from the table
     * config returned by {@code StreamerUtil.getTableConfig} (if any). Options that are
     * already present in {@code conf} are never overwritten.
     *
     * @param basePath table base path
     * @param conf     configuration to complete in place
     */
    private void setupTableOptions(String basePath, Configuration conf) {
        StreamerUtil.getTableConfig(basePath, HadoopConfigurations.getHadoopConf(conf)).ifPresent(tableConfig -> {
            if (tableConfig.contains(HoodieTableConfig.RECORDKEY_FIELDS) && !conf.contains(FlinkOptions.RECORD_KEY_FIELD)) {
                conf.setString(FlinkOptions.RECORD_KEY_FIELD, tableConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS));
            }
            if (tableConfig.contains(HoodieTableConfig.PRECOMBINE_FIELD) && !conf.contains(FlinkOptions.PRECOMBINE_FIELD)) {
                conf.setString(FlinkOptions.PRECOMBINE_FIELD, tableConfig.getString(HoodieTableConfig.PRECOMBINE_FIELD));
            }
            if (tableConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE) && !conf.contains(FlinkOptions.HIVE_STYLE_PARTITIONING)) {
                conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, tableConfig.getBoolean(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE).booleanValue());
            }
        });
    }

    /** Returns the connector identifier, {@value #FACTORY_ID}. */
    @Override
    public String factoryIdentifier() {
        return FACTORY_ID;
    }

    /** Returns the required options; only {@code FlinkOptions.PATH} is mandatory. */
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.singleton(FlinkOptions.PATH);
    }

    /** Returns the optional options, as enumerated by {@code FlinkOptions.optionalOptions()}. */
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return FlinkOptions.optionalOptions();
    }

    /**
     * Validates the record key and pre-combine options against the schema. Skipped entirely
     * when {@code OptionsResolver.isAppendMode(conf)} is true.
     */
    private void sanityCheck(Configuration conf, ResolvedSchema schema) {
        if (OptionsResolver.isAppendMode(conf)) {
            return;
        }
        this.checkRecordKey(conf, schema);
        this.checkPreCombineKey(conf, schema);
    }

    /**
     * Validates the record key option when the schema declares no primary key.
     *
     * @throws HoodieValidationException if the option still holds its default value and that
     *         column is absent from the schema (i.e. no key was defined at all), or if any
     *         configured key field is absent from the schema
     */
    private void checkRecordKey(Configuration conf, ResolvedSchema schema) {
        if (schema.getPrimaryKey().isPresent()) {
            // A PRIMARY KEY constraint is declared; nothing to validate here.
            return;
        }
        List<String> fields = schema.getColumnNames();
        String[] recordKeys = conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",");
        boolean isDefaultKey = recordKeys.length == 1
                && FlinkOptions.RECORD_KEY_FIELD.defaultValue().equals(recordKeys[0]);
        if (isDefaultKey && !fields.contains(recordKeys[0])) {
            throw new HoodieValidationException("Primary key definition is required, use either PRIMARY KEY syntax or option '" + FlinkOptions.RECORD_KEY_FIELD.key() + "' to specify.");
        }
        for (String recordKey : recordKeys) {
            if (!fields.contains(recordKey)) {
                throw new HoodieValidationException("Field '" + recordKey + "' specified in option '" + FlinkOptions.RECORD_KEY_FIELD.key() + "' does not exist in the table schema.");
            }
        }
    }

    /**
     * Validates the pre-combine field option against the schema.
     *
     * <p>When the configured field is not a schema column: the default payload class makes
     * this an error; otherwise the default field name is silently replaced by the
     * {@code no_precombine} marker, and any other unknown name except that marker is an error.
     *
     * @throws HoodieValidationException if the pre-combine field is required or unknown
     */
    private void checkPreCombineKey(Configuration conf, ResolvedSchema schema) {
        List<String> fields = schema.getColumnNames();
        String preCombineField = conf.getString(FlinkOptions.PRECOMBINE_FIELD);
        if (fields.contains(preCombineField)) {
            return;
        }
        if (OptionsResolver.isDefaultHoodieRecordPayloadClazz(conf)) {
            throw new HoodieValidationException("Option '" + FlinkOptions.PRECOMBINE_FIELD.key() + "' is required for payload class: " + DefaultHoodieRecordPayload.class.getName());
        }
        if (preCombineField.equals(FlinkOptions.PRECOMBINE_FIELD.defaultValue())) {
            conf.setString(FlinkOptions.PRECOMBINE_FIELD, NO_PRE_COMBINE);
        } else if (!preCombineField.equals(NO_PRE_COMBINE)) {
            // Message previously lacked the space between the two sentences.
            throw new HoodieValidationException("Field " + preCombineField + " does not exist in the table schema. Please check '" + FlinkOptions.PRECOMBINE_FIELD.key() + "' option.");
        }
    }

    /**
     * Derives all dependent options in {@code conf} from the table definition.
     *
     * @param conf      configuration to complete in place
     * @param tablePath identifier of the table; its object name becomes the table name
     * @param table     catalog table (primary key and partition keys are read from it)
     * @param schema    resolved schema used for Avro schema inference
     */
    private static void setupConfOptions(Configuration conf, ObjectIdentifier tablePath, CatalogTable table, ResolvedSchema schema) {
        conf.setString(FlinkOptions.TABLE_NAME.key(), tablePath.getObjectName());
        HoodieTableFactory.setupHoodieKeyOptions(conf, table);
        HoodieTableFactory.setupCompactionOptions(conf);
        HoodieTableFactory.setupHiveOptions(conf, tablePath);
        HoodieTableFactory.setupReadOptions(conf);
        HoodieTableFactory.setupWriteOptions(conf);
        HoodieTableFactory.inferAvroSchema(conf, schema.toPhysicalRowDataType().notNull().getLogicalType());
    }

    /**
     * Sets record key, partition path, bucket index key and key generator class.
     *
     * <p>Key generator selection, in order:
     * <ol>
     *   <li>empty partition field: {@link NonpartitionedAvroKeyGenerator};</li>
     *   <li>single partition field of datetime type with at most one record key field:
     *       delegated to {@link #setupTimestampKeygenOptions};</li>
     *   <li>more than one record key or partition field: {@link ComplexAvroKeyGenerator},
     *       but only if {@code FlinkOptions.isDefaultValueDefined} reports true for the
     *       key generator option (presumably: the user did not override it).</li>
     * </ol>
     *
     * @throws HoodieValidationException if the single partition field is not in the schema,
     *         or the bucket index key is not a subset of the record key fields
     */
    private static void setupHoodieKeyOptions(Configuration conf, CatalogTable table) {
        // NOTE(review): getSchema() is the legacy TableSchema accessor; kept because the
        // CatalogTable parameter type offers no resolved-schema alternative here.
        List<String> pkColumns = table.getSchema().getPrimaryKey()
                .map(UniqueConstraint::getColumns)
                .orElse(Collections.emptyList());
        if (!pkColumns.isEmpty()) {
            // A declared primary key takes precedence over the option value.
            conf.setString(FlinkOptions.RECORD_KEY_FIELD, String.join(",", pkColumns));
        }
        List<String> partitionKeys = table.getPartitionKeys();
        if (!partitionKeys.isEmpty()) {
            conf.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",", partitionKeys));
        }
        HoodieTableFactory.setupBucketIndexKey(conf);

        String[] partitions = conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",");
        String[] pks = conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",");
        if (partitions.length == 1) {
            String partitionField = partitions[0];
            if (partitionField.isEmpty()) {
                conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, NonpartitionedAvroKeyGenerator.class.getName());
                LOG.info("Table option [{}] is reset to {} because this is a non-partitioned table", FlinkOptions.KEYGEN_CLASS_NAME.key(), NonpartitionedAvroKeyGenerator.class.getName());
                return;
            }
            DataType partitionFieldType = table.getSchema().getFieldDataType(partitionField)
                    .orElseThrow(() -> new HoodieValidationException("Field " + partitionField + " does not exist"));
            if (pks.length <= 1 && DataTypeUtils.isDatetimeType(partitionFieldType)) {
                HoodieTableFactory.setupTimestampKeygenOptions(conf, partitionFieldType);
                return;
            }
        }
        boolean complexHoodieKey = pks.length > 1 || partitions.length > 1;
        if (complexHoodieKey && FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.KEYGEN_CLASS_NAME)) {
            conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, ComplexAvroKeyGenerator.class.getName());
            LOG.info("Table option [{}] is reset to {} because record key or partition path has two or more fields", FlinkOptions.KEYGEN_CLASS_NAME.key(), ComplexAvroKeyGenerator.class.getName());
        }
    }

    /**
     * For the BUCKET index type only: defaults the index key to the record key when it is
     * empty, otherwise verifies that every index key field is also a record key field.
     */
    private static void setupBucketIndexKey(Configuration conf) {
        if (!conf.getString(FlinkOptions.INDEX_TYPE).equals(HoodieIndex.IndexType.BUCKET.name())) {
            return;
        }
        if (conf.getString(FlinkOptions.INDEX_KEY_FIELD).isEmpty()) {
            conf.setString(FlinkOptions.INDEX_KEY_FIELD, conf.getString(FlinkOptions.RECORD_KEY_FIELD));
            return;
        }
        Set<String> recordKeySet = Arrays.stream(conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","))
                .collect(Collectors.toSet());
        Set<String> indexKeySet = Arrays.stream(conf.getString(FlinkOptions.INDEX_KEY_FIELD).split(","))
                .collect(Collectors.toSet());
        if (!recordKeySet.containsAll(indexKeySet)) {
            // Use the option key, not the ConfigOption object, so the message names the option
            // the same way every other validation message in this class does.
            throw new HoodieValidationException(FlinkOptions.INDEX_KEY_FIELD.key() + " should be a subset of or equal to the recordKey fields");
        }
    }

    /**
     * Configures {@link TimestampBasedAvroKeyGenerator} for a datetime partition field.
     * Does nothing when the key generator class option is already present in {@code conf}.
     *
     * <p>For timestamp types, the timestamp type property is only set for precision 0
     * (UNIX_TIMESTAMP) and precision 3 (EPOCHMILLISECONDS); other precisions leave it unset.
     * For all other datetime types the value is treated as a SCALAR in {@code DAYS}.
     * The output timezone is always set to {@code UTC}.
     *
     * @param conf      configuration to update in place
     * @param fieldType data type of the partition field
     */
    public static void setupTimestampKeygenOptions(Configuration conf, DataType fieldType) {
        if (conf.contains(FlinkOptions.KEYGEN_CLASS_NAME)) {
            return;
        }
        conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, TimestampBasedAvroKeyGenerator.class.getName());
        LOG.info("Table option [{}] is reset to {} because datetime partitioning turns on", FlinkOptions.KEYGEN_CLASS_NAME.key(), TimestampBasedAvroKeyGenerator.class.getName());
        if (DataTypeUtils.isTimestampType(fieldType)) {
            int precision = DataTypeUtils.precision(fieldType.getLogicalType());
            if (precision == 0) {
                conf.setString(KEYGEN_TIMESTAMP_TYPE, TimestampBasedAvroKeyGenerator.TimestampType.UNIX_TIMESTAMP.name());
            } else if (precision == 3) {
                conf.setString(KEYGEN_TIMESTAMP_TYPE, TimestampBasedAvroKeyGenerator.TimestampType.EPOCHMILLISECONDS.name());
            }
            String outputPartitionFormat = conf.getOptional(FlinkOptions.PARTITION_FORMAT).orElse("yyyyMMddHH");
            conf.setString(KEYGEN_OUTPUT_DATEFORMAT, outputPartitionFormat);
        } else {
            conf.setString(KEYGEN_TIMESTAMP_TYPE, TimestampBasedAvroKeyGenerator.TimestampType.SCALAR.name());
            conf.setString(KEYGEN_SCALAR_TIME_UNIT, TimeUnit.DAYS.toString());
            String outputPartitionFormat = conf.getOptional(FlinkOptions.PARTITION_FORMAT).orElse("yyyyMMdd");
            conf.setString(KEYGEN_OUTPUT_DATEFORMAT, outputPartitionFormat);
            conf.setString(KEYGEN_INPUT_DATEFORMAT, "yyyyMMdd");
        }
        conf.setString(KEYGEN_OUTPUT_TIMEZONE, "UTC");
    }

    /**
     * Keeps the archive thresholds above the cleaner retention: when the retained commit
     * count is not strictly below the archive minimum, the archive minimum and maximum are
     * reset to {@code retained + 10} and {@code retained + 20}.
     */
    private static void setupCompactionOptions(Configuration conf) {
        int commitsToRetain = conf.getInteger(FlinkOptions.CLEAN_RETAIN_COMMITS);
        int minCommitsToKeep = conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS);
        if (commitsToRetain < minCommitsToKeep) {
            return;
        }
        LOG.info("Table option [{}] is reset to {} to be greater than {}={},\nto avoid risk of missing data from few instants in incremental pull", FlinkOptions.ARCHIVE_MIN_COMMITS.key(), commitsToRetain + 10, FlinkOptions.CLEAN_RETAIN_COMMITS.key(), commitsToRetain);
        conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, commitsToRetain + 10);
        conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, commitsToRetain + 20);
    }

    /** Defaults the hive sync database and table to the catalog identifier when not set. */
    private static void setupHiveOptions(Configuration conf, ObjectIdentifier tablePath) {
        if (!conf.contains(FlinkOptions.HIVE_SYNC_DB)) {
            conf.setString(FlinkOptions.HIVE_SYNC_DB, tablePath.getDatabaseName());
        }
        if (!conf.contains(FlinkOptions.HIVE_SYNC_TABLE)) {
            conf.setString(FlinkOptions.HIVE_SYNC_TABLE, tablePath.getObjectName());
        }
    }

    /** Sets the query type to {@code incremental} when the options resolve to an incremental query. */
    private static void setupReadOptions(Configuration conf) {
        if (OptionsResolver.isIncrementalQuery(conf)) {
            conf.setString(FlinkOptions.QUERY_TYPE, "incremental");
        }
    }

    /**
     * Enables pre-combine for copy-on-write tables when
     * {@code FlinkOptions.isDefaultValueDefined} reports true for the operation option
     * (presumably: the write operation was left at its default).
     */
    private static void setupWriteOptions(Configuration conf) {
        if (FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.OPERATION) && OptionsResolver.isCowTable(conf)) {
            conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
        }
    }

    /**
     * Stores an Avro schema converted from {@code rowType} in the source Avro schema option,
     * unless a schema or a schema path has already been configured.
     */
    private static void inferAvroSchema(Configuration conf, LogicalType rowType) {
        boolean schemaConfigured = conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH).isPresent()
                || conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA).isPresent();
        if (!schemaConfigured) {
            String inferredSchema = AvroSchemaConverter.convertToSchema(rowType).toString();
            conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, inferredSchema);
        }
    }
}

