/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.file.mergetree.compact;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.compact.CompactResult;
import org.apache.flink.table.store.file.compact.CompactTask;
import org.apache.flink.table.store.file.compact.CompactUnit;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.mergetree.SortedRun;
import org.apache.flink.table.store.file.mergetree.compact.CompactRewriter;
import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;

/**
 * A {@link CompactTask} that walks the sections produced by {@link IntervalPartition} over the
 * files of a {@link CompactUnit} and, for each file, either hands it to the {@link
 * CompactRewriter} or merely re-labels it with the unit's output level.
 *
 * <p>Sections holding more than one {@link SortedRun} are always queued for rewriting. In a
 * section with a single run, files smaller than {@code minFileSize} are queued as well, while any
 * other file first flushes the queue and is then upgraded in place.
 */
public class MergeTreeCompactTask extends CompactTask {
    /** Files strictly smaller than this size are queued for rewriting instead of being upgraded. */
    private final long minFileSize;

    /** Performs the actual rewrite of the queued sections. */
    private final CompactRewriter rewriter;

    /** Level taken from the compact unit; target of both rewrites and upgrades. */
    private final int outputLevel;

    /** Result of {@code IntervalPartition.partition()} over the unit's files. */
    private final List<List<SortedRun>> partitioned;

    /** Passed through unchanged to {@link CompactRewriter#rewrite}. */
    private final boolean dropDelete;

    /** Number of files that were upgraded rather than rewritten; reported by {@link #logMetric}. */
    private int upgradeFilesNum;

    /**
     * Creates the task and partitions the unit's files up front.
     *
     * @param keyComparator comparator handed to {@link IntervalPartition}
     * @param minFileSize size threshold below which a file in a single-run section is rewritten
     * @param rewriter rewriter invoked for the queued sections
     * @param unit supplies the input files and the output level
     * @param dropDelete flag forwarded to the rewriter
     */
    public MergeTreeCompactTask(
            Comparator<RowData> keyComparator,
            long minFileSize,
            CompactRewriter rewriter,
            CompactUnit unit,
            boolean dropDelete) {
        super(unit.files());
        this.minFileSize = minFileSize;
        this.rewriter = rewriter;
        this.outputLevel = unit.outputLevel();
        this.partitioned = new IntervalPartition(unit.files(), keyComparator).partition();
        this.dropDelete = dropDelete;
        this.upgradeFilesNum = 0;
    }

    /**
     * Runs the compaction over the pre-computed partition.
     *
     * @param inputs not read by this implementation; the sections computed in the constructor are
     *     used instead
     * @return a result exposing the accumulated "before" and "after" file lists
     * @throws Exception if the rewriter fails
     */
    @Override
    protected CompactResult doCompact(List<DataFileMeta> inputs) throws Exception {
        final List<List<SortedRun>> pending = new ArrayList<>();
        final List<DataFileMeta> before = new ArrayList<>();
        final List<DataFileMeta> after = new ArrayList<>();

        for (List<SortedRun> section : partitioned) {
            if (section.size() > 1) {
                // Multi-run section: always goes through the rewriter.
                pending.add(section);
            } else {
                for (DataFileMeta file : section.get(0).files()) {
                    if (file.fileSize() >= minFileSize) {
                        // Flush whatever has been queued so far before upgrading this file,
                        // so queued files are never rewritten across it.
                        rewrite(pending, before, after);
                        upgrade(file, before, after);
                    } else {
                        // Small file: queue it as its own single-run section.
                        pending.add(Collections.singletonList(SortedRun.fromSingle(file)));
                    }
                }
            }
        }
        // Flush anything still queued after the last section.
        rewrite(pending, before, after);

        return new CompactResult() {

            @Override
            public List<DataFileMeta> before() {
                return before;
            }

            @Override
            public List<DataFileMeta> after() {
                return after;
            }
        };
    }

    /**
     * Appends the number of upgraded files to the metric line produced by the superclass.
     *
     * @param startMillis forwarded to the superclass
     * @param compactBefore forwarded to the superclass
     * @param compactAfter forwarded to the superclass
     * @return the superclass metric followed by {@code ", upgrade file num = N"}
     */
    @Override
    protected String logMetric(
            long startMillis, List<DataFileMeta> compactBefore, List<DataFileMeta> compactAfter) {
        String baseMetric = super.logMetric(startMillis, compactBefore, compactAfter);
        return String.format("%s, upgrade file num = %d", baseMetric, upgradeFilesNum);
    }

    /**
     * Records a level change for {@code file} when it is not already at the output level; files
     * already at that level are left out of both lists.
     */
    private void upgrade(
            DataFileMeta file, List<DataFileMeta> compactBefore, List<DataFileMeta> compactAfter) {
        if (file.level() == outputLevel) {
            return;
        }
        compactBefore.add(file);
        compactAfter.add(file.upgrade(outputLevel));
        upgradeFilesNum++;
    }

    /**
     * Flushes the queued sections.
     *
     * <p>A queue holding exactly one section with exactly one run is not rewritten: its files are
     * upgraded instead and the queue is cleared. A queue holding exactly one empty section is left
     * untouched. In every other non-empty case all queued files are recorded as "before", the
     * rewriter output is recorded as "after", and the queue is cleared.
     *
     * @throws Exception if the rewriter fails
     */
    private void rewrite(
            List<List<SortedRun>> candidate,
            List<DataFileMeta> compactBefore,
            List<DataFileMeta> compactAfter)
            throws Exception {
        if (candidate.isEmpty()) {
            return;
        }

        if (candidate.size() == 1) {
            List<SortedRun> onlySection = candidate.get(0);
            switch (onlySection.size()) {
                case 0:
                    // Nothing to do; note the queue is intentionally not cleared here.
                    return;
                case 1:
                    // A lone run needs no merge: just move its files to the output level.
                    for (DataFileMeta file : onlySection.get(0).files()) {
                        upgrade(file, compactBefore, compactAfter);
                    }
                    candidate.clear();
                    return;
                default:
                    break;
            }
        }

        // Register every queued file as an input before invoking the rewriter.
        for (List<SortedRun> runs : candidate) {
            for (SortedRun run : runs) {
                compactBefore.addAll(run.files());
            }
        }
        List<DataFileMeta> rewritten = rewriter.rewrite(outputLevel, dropDelete, candidate);
        compactAfter.addAll(rewritten);
        candidate.clear();
    }
}

