/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.connector.sink;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.store.connector.sink.Committable;
import org.apache.flink.table.store.connector.sink.CommittableStateManager;
import org.apache.flink.table.store.connector.sink.Committer;
import org.apache.flink.table.store.connector.sink.StateUtils;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SerializableFunction;

public class CommitterOperator
extends AbstractStreamOperator<Committable>
implements OneInputStreamOperator<Committable, Committable>,
BoundedOneInput {
    private static final long serialVersionUID = 1L;
    private final Deque<Committable> inputs = new ArrayDeque<Committable>();
    private final boolean streamingCheckpointEnabled;
    private final String initialCommitUser;
    protected final NavigableMap<Long, ManifestCommittable> committablesPerCheckpoint;
    private final SerializableFunction<String, Committer> committerFactory;
    private final CommittableStateManager committableStateManager;
    protected Committer committer;
    private boolean endInput = false;

    public CommitterOperator(boolean streamingCheckpointEnabled, String initialCommitUser, SerializableFunction<String, Committer> committerFactory, CommittableStateManager committableStateManager) {
        this.streamingCheckpointEnabled = streamingCheckpointEnabled;
        this.initialCommitUser = initialCommitUser;
        this.committablesPerCheckpoint = new TreeMap<Long, ManifestCommittable>();
        this.committerFactory = (SerializableFunction)Preconditions.checkNotNull(committerFactory);
        this.committableStateManager = committableStateManager;
        this.setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        String commitUser = StateUtils.getSingleValueFromState(context, "commit_user_state", String.class, this.initialCommitUser);
        this.committer = (Committer)this.committerFactory.apply((Object)commitUser);
        this.committableStateManager.initializeState(context, this.committer);
    }

    private ManifestCommittable toCommittables(long checkpoint, List<Committable> inputs) throws Exception {
        return this.committer.combine(checkpoint, inputs);
    }

    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        this.pollInputs();
        this.committableStateManager.snapshotState(context, this.committables(this.committablesPerCheckpoint));
    }

    private List<ManifestCommittable> committables(NavigableMap<Long, ManifestCommittable> map) {
        return new ArrayList<ManifestCommittable>(map.values());
    }

    public void endInput() throws Exception {
        this.endInput = true;
        if (this.streamingCheckpointEnabled) {
            return;
        }
        this.pollInputs();
        this.commitUpToCheckpoint(Long.MAX_VALUE);
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        this.commitUpToCheckpoint(this.endInput ? Long.MAX_VALUE : checkpointId);
    }

    private void commitUpToCheckpoint(long checkpointId) throws Exception {
        NavigableMap<Long, ManifestCommittable> headMap = this.committablesPerCheckpoint.headMap(checkpointId, true);
        this.committer.commit(this.committables(headMap));
        headMap.clear();
    }

    public void processElement(StreamRecord<Committable> element) {
        this.output.collect(element);
        this.inputs.add((Committable)element.getValue());
    }

    public void close() throws Exception {
        this.committablesPerCheckpoint.clear();
        this.inputs.clear();
        super.close();
    }

    private void pollInputs() throws Exception {
        HashMap<Long, List> grouped = new HashMap<Long, List>();
        for (Committable committable : this.inputs) {
            grouped.computeIfAbsent(committable.checkpointId(), k -> new ArrayList()).add(committable);
        }
        for (Map.Entry entry : grouped.entrySet()) {
            Long cp = (Long)entry.getKey();
            List committables = (List)entry.getValue();
            if (this.committablesPerCheckpoint.containsKey(cp)) {
                throw new RuntimeException(String.format("Repeatedly commit the same checkpoint files. \nThe previous files is %s, \nand the subsequent files is %s", this.committablesPerCheckpoint.get(cp), committables));
            }
            this.committablesPerCheckpoint.put(cp, this.toCommittables(cp, committables));
        }
        this.inputs.clear();
    }
}

