/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.common.util;

import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Date;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetReaderIterator;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.MetadataNotFoundException;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.DecimalMetadata;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveStringifier;
import org.apache.parquet.schema.PrimitiveType;

public class ParquetUtils
extends BaseFileUtils {
    private static final Logger LOG = LogManager.getLogger(ParquetUtils.class);

    @Override
    public Set<String> filterRowKeys(Configuration configuration, Path filePath, Set<String> filter) {
        return ParquetUtils.filterParquetRowKeys(configuration, filePath, filter, HoodieAvroUtils.getRecordKeySchema());
    }

    public static ParquetMetadata readMetadata(Configuration conf, Path parquetFilePath) {
        ParquetMetadata footer;
        try {
            footer = ParquetFileReader.readFooter((Configuration)FSUtils.getFs(parquetFilePath.toString(), conf).getConf(), (Path)parquetFilePath);
        }
        catch (IOException e) {
            throw new HoodieIOException("Failed to read footer for parquet " + parquetFilePath, e);
        }
        return footer;
    }

    private static Set<String> filterParquetRowKeys(Configuration configuration, Path filePath, Set<String> filter, Schema readSchema) {
        Option<Object> filterFunction = Option.empty();
        if (filter != null && !filter.isEmpty()) {
            filterFunction = Option.of(new RecordKeysFilterFunction(filter));
        }
        Configuration conf = new Configuration(configuration);
        conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf());
        AvroReadSupport.setAvroReadSchema((Configuration)conf, (Schema)readSchema);
        AvroReadSupport.setRequestedProjection((Configuration)conf, (Schema)readSchema);
        HashSet<String> rowKeys = new HashSet<String>();
        try (ParquetReader reader = AvroParquetReader.builder((Path)filePath).withConf(conf).build();){
            Object obj = reader.read();
            while (obj != null) {
                if (obj instanceof GenericRecord) {
                    String recordKey = ((GenericRecord)obj).get("_hoodie_record_key").toString();
                    if (!filterFunction.isPresent() || ((RecordKeysFilterFunction)filterFunction.get()).apply(recordKey).booleanValue()) {
                        rowKeys.add(recordKey);
                    }
                }
                obj = reader.read();
            }
        }
        catch (IOException e) {
            throw new HoodieIOException("Failed to read row keys from Parquet " + filePath, e);
        }
        return rowKeys;
    }

    @Override
    public List<HoodieKey> fetchHoodieKeys(Configuration configuration, Path filePath) {
        return this.fetchHoodieKeys(configuration, filePath, Option.empty());
    }

    @Override
    public ClosableIterator<HoodieKey> getHoodieKeyIterator(Configuration configuration, Path filePath) {
        return this.getHoodieKeyIterator(configuration, filePath, Option.empty());
    }

    @Override
    public ClosableIterator<HoodieKey> getHoodieKeyIterator(Configuration configuration, Path filePath, Option<BaseKeyGenerator> keyGeneratorOpt) {
        try {
            Configuration conf = new Configuration(configuration);
            conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf());
            Schema readSchema = keyGeneratorOpt.map(keyGenerator -> {
                ArrayList<String> fields = new ArrayList<String>();
                fields.addAll(keyGenerator.getRecordKeyFieldNames());
                fields.addAll(keyGenerator.getPartitionPathFields());
                return HoodieAvroUtils.getSchemaForFields(this.readAvroSchema(conf, filePath), fields);
            }).orElse(HoodieAvroUtils.getRecordKeyPartitionPathSchema());
            AvroReadSupport.setAvroReadSchema((Configuration)conf, (Schema)readSchema);
            AvroReadSupport.setRequestedProjection((Configuration)conf, (Schema)readSchema);
            ParquetReader reader = AvroParquetReader.builder((Path)filePath).withConf(conf).build();
            return HoodieKeyIterator.getInstance(new ParquetReaderIterator<GenericRecord>(reader), keyGeneratorOpt);
        }
        catch (IOException e) {
            throw new HoodieIOException("Failed to read from Parquet file " + filePath, e);
        }
    }

    @Override
    public List<HoodieKey> fetchHoodieKeys(Configuration configuration, Path filePath, Option<BaseKeyGenerator> keyGeneratorOpt) {
        ArrayList<HoodieKey> hoodieKeys = new ArrayList<HoodieKey>();
        try (ClosableIterator<HoodieKey> iterator = this.getHoodieKeyIterator(configuration, filePath, keyGeneratorOpt);){
            iterator.forEachRemaining(hoodieKeys::add);
            ArrayList<HoodieKey> arrayList = hoodieKeys;
            return arrayList;
        }
    }

    public MessageType readSchema(Configuration configuration, Path parquetFilePath) {
        return ParquetUtils.readMetadata(configuration, parquetFilePath).getFileMetaData().getSchema();
    }

    @Override
    public Map<String, String> readFooter(Configuration configuration, boolean required, Path parquetFilePath, String ... footerNames) {
        HashMap<String, String> footerVals = new HashMap<String, String>();
        ParquetMetadata footer = ParquetUtils.readMetadata(configuration, parquetFilePath);
        Map metadata = footer.getFileMetaData().getKeyValueMetaData();
        for (String footerName : footerNames) {
            if (metadata.containsKey(footerName)) {
                footerVals.put(footerName, (String)metadata.get(footerName));
                continue;
            }
            if (!required) continue;
            throw new MetadataNotFoundException("Could not find index in Parquet footer. Looked for key " + footerName + " in " + parquetFilePath);
        }
        return footerVals;
    }

    @Override
    public Schema readAvroSchema(Configuration configuration, Path parquetFilePath) {
        MessageType parquetSchema = this.readSchema(configuration, parquetFilePath);
        return new AvroSchemaConverter(configuration).convert(parquetSchema);
    }

    @Override
    public HoodieFileFormat getFormat() {
        return HoodieFileFormat.PARQUET;
    }

    @Override
    public List<GenericRecord> readAvroRecords(Configuration configuration, Path filePath) {
        ArrayList<GenericRecord> records = new ArrayList<GenericRecord>();
        try (ParquetReader reader = AvroParquetReader.builder((Path)filePath).withConf(configuration).build();){
            Object obj = reader.read();
            while (obj != null) {
                if (obj instanceof GenericRecord) {
                    records.add((GenericRecord)obj);
                }
                obj = reader.read();
            }
        }
        catch (IOException e) {
            throw new HoodieIOException("Failed to read avro records from Parquet " + filePath, e);
        }
        return records;
    }

    @Override
    public List<GenericRecord> readAvroRecords(Configuration configuration, Path filePath, Schema schema) {
        AvroReadSupport.setAvroReadSchema((Configuration)configuration, (Schema)schema);
        return this.readAvroRecords(configuration, filePath);
    }

    @Override
    public long getRowCount(Configuration conf, Path parquetFilePath) {
        long rowCount = 0L;
        ParquetMetadata footer = ParquetUtils.readMetadata(conf, parquetFilePath);
        for (BlockMetaData b : footer.getBlocks()) {
            rowCount += b.getRowCount();
        }
        return rowCount;
    }

    public List<HoodieColumnRangeMetadata<Comparable>> readRangeFromParquetMetadata(@Nonnull Configuration conf, @Nonnull Path parquetFilePath, @Nonnull List<String> cols) {
        ParquetMetadata metadata = ParquetUtils.readMetadata(conf, parquetFilePath);
        Collector<HoodieColumnRangeMetadata, ?, Map<String, List<HoodieColumnRangeMetadata>>> groupingByCollector = Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName);
        Map<String, List<HoodieColumnRangeMetadata>> columnToStatsListMap = ((Stream)metadata.getBlocks().stream().sequential()).flatMap(blockMetaData -> blockMetaData.getColumns().stream().filter(f -> cols.contains(f.getPath().toDotString())).map(columnChunkMetaData -> HoodieColumnRangeMetadata.create(parquetFilePath.getName(), columnChunkMetaData.getPath().toDotString(), ParquetUtils.convertToNativeJavaType(columnChunkMetaData.getPrimitiveType(), columnChunkMetaData.getStatistics().genericGetMin()), ParquetUtils.convertToNativeJavaType(columnChunkMetaData.getPrimitiveType(), columnChunkMetaData.getStatistics().genericGetMax()), columnChunkMetaData.getStatistics().getNumNulls(), columnChunkMetaData.getValueCount(), columnChunkMetaData.getTotalSize(), columnChunkMetaData.getTotalUncompressedSize()))).collect(groupingByCollector);
        Stream<HoodieColumnRangeMetadata> stream = columnToStatsListMap.values().stream().map(this::getColumnRangeInFile);
        return stream.collect(Collectors.toList());
    }

    private <T extends Comparable<T>> HoodieColumnRangeMetadata<T> getColumnRangeInFile(@Nonnull List<HoodieColumnRangeMetadata<T>> blockRanges) {
        if (blockRanges.size() == 1) {
            return blockRanges.get(0);
        }
        return (HoodieColumnRangeMetadata)((Stream)blockRanges.stream().sequential()).reduce(this::combineRanges).get();
    }

    private <T extends Comparable<T>> HoodieColumnRangeMetadata<T> combineRanges(HoodieColumnRangeMetadata<T> one, HoodieColumnRangeMetadata<T> another) {
        T minValue = one.getMinValue() != null && another.getMinValue() != null ? (one.getMinValue().compareTo(another.getMinValue()) < 0 ? one.getMinValue() : another.getMinValue()) : (one.getMinValue() == null ? another.getMinValue() : one.getMinValue());
        T maxValue = one.getMaxValue() != null && another.getMaxValue() != null ? (one.getMaxValue().compareTo(another.getMaxValue()) < 0 ? another.getMaxValue() : one.getMaxValue()) : (one.getMaxValue() == null ? another.getMaxValue() : one.getMaxValue());
        return HoodieColumnRangeMetadata.create(one.getFilePath(), one.getColumnName(), minValue, maxValue, one.getNullCount() + another.getNullCount(), one.getValueCount() + another.getValueCount(), one.getTotalSize() + another.getTotalSize(), one.getTotalUncompressedSize() + another.getTotalUncompressedSize());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static Comparable<?> convertToNativeJavaType(PrimitiveType primitiveType, Comparable<?> val) {
        if (val == null) {
            return null;
        }
        if (primitiveType.getOriginalType() == OriginalType.DECIMAL) {
            return ParquetUtils.extractDecimal(val, primitiveType.getDecimalMetadata());
        }
        if (primitiveType.getOriginalType() == OriginalType.DATE) {
            PrimitiveStringifier primitiveStringifier = primitiveType.stringifier();
            synchronized (primitiveStringifier) {
                return Date.valueOf(primitiveType.stringifier().stringify(((Integer)val).intValue()));
            }
        }
        if (primitiveType.getOriginalType() == OriginalType.UTF8) {
            return ((Binary)val).toStringUsingUTF8();
        }
        if (primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.BINARY) {
            return ((Binary)val).toByteBuffer();
        }
        return val;
    }

    @Nonnull
    private static BigDecimal extractDecimal(Object val, DecimalMetadata decimalMetadata) {
        int scale = decimalMetadata.getScale();
        if (val == null) {
            return null;
        }
        if (val instanceof Integer) {
            return BigDecimal.valueOf(((Integer)val).intValue(), scale);
        }
        if (val instanceof Long) {
            return BigDecimal.valueOf((Long)val, scale);
        }
        if (val instanceof Binary) {
            return new BigDecimal(new BigInteger(((Binary)val).getBytesUnsafe()), scale);
        }
        throw new UnsupportedOperationException(String.format("Unsupported value type (%s)", val.getClass().getName()));
    }

    private static class HoodieKeyIterator
    implements ClosableIterator<HoodieKey> {
        private final ClosableIterator<GenericRecord> nestedItr;
        private final Function<GenericRecord, HoodieKey> func;

        public static HoodieKeyIterator getInstance(ClosableIterator<GenericRecord> nestedItr, Option<BaseKeyGenerator> keyGenerator) {
            return new HoodieKeyIterator(nestedItr, keyGenerator);
        }

        private HoodieKeyIterator(ClosableIterator<GenericRecord> nestedItr, Option<BaseKeyGenerator> keyGenerator) {
            this.nestedItr = nestedItr;
            this.func = keyGenerator.isPresent() ? retVal -> {
                String recordKey = ((BaseKeyGenerator)keyGenerator.get()).getRecordKey((GenericRecord)retVal);
                String partitionPath = ((BaseKeyGenerator)keyGenerator.get()).getPartitionPath((GenericRecord)retVal);
                return new HoodieKey(recordKey, partitionPath);
            } : retVal -> {
                String recordKey = retVal.get("_hoodie_record_key").toString();
                String partitionPath = retVal.get("_hoodie_partition_path").toString();
                return new HoodieKey(recordKey, partitionPath);
            };
        }

        @Override
        public void close() {
            if (this.nestedItr != null) {
                this.nestedItr.close();
            }
        }

        @Override
        public boolean hasNext() {
            return this.nestedItr.hasNext();
        }

        @Override
        public HoodieKey next() {
            return this.func.apply((GenericRecord)this.nestedItr.next());
        }
    }

    static class RecordKeysFilterFunction
    implements Function<String, Boolean> {
        private final Set<String> candidateKeys;

        RecordKeysFilterFunction(Set<String> candidateKeys) {
            this.candidateKeys = candidateKeys;
        }

        @Override
        public Boolean apply(String recordKey) {
            return this.candidateKeys.contains(recordKey);
        }
    }
}

