/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager.slotmanager;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeTaskManagerSlot;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotState;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotStatusUpdateListener;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotTracker;
import org.apache.flink.runtime.resourcemanager.slotmanager.TaskManagerSlotInformation;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultSlotTracker
implements SlotTracker {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultSlotTracker.class);
    private final Map<SlotID, DeclarativeTaskManagerSlot> slots = new HashMap<SlotID, DeclarativeTaskManagerSlot>();
    private final Map<SlotID, DeclarativeTaskManagerSlot> freeSlots = new LinkedHashMap<SlotID, DeclarativeTaskManagerSlot>();
    private final MultiSlotStatusUpdateListener slotStatusUpdateListeners = new MultiSlotStatusUpdateListener();
    private final SlotStatusStateReconciler slotStatusStateReconciler = new SlotStatusStateReconciler(this::transitionSlotToFree, this::transitionSlotToPending, this::transitionSlotToAllocated);

    @Override
    public void registerSlotStatusUpdateListener(SlotStatusUpdateListener slotStatusUpdateListener) {
        this.slotStatusUpdateListeners.registerSlotStatusUpdateListener(slotStatusUpdateListener);
    }

    @Override
    public void addSlot(SlotID slotId, ResourceProfile resourceProfile, TaskExecutorConnection taskManagerConnection, @Nullable JobID assignedJob) {
        Preconditions.checkNotNull(slotId);
        Preconditions.checkNotNull(resourceProfile);
        Preconditions.checkNotNull(taskManagerConnection);
        if (this.slots.containsKey(slotId)) {
            LOG.debug("A slot was added with an already tracked slot ID {}. Removing previous entry.", (Object)slotId);
            this.removeSlot(slotId);
        }
        DeclarativeTaskManagerSlot slot = new DeclarativeTaskManagerSlot(slotId, resourceProfile, taskManagerConnection);
        this.slots.put(slotId, slot);
        this.freeSlots.put(slotId, slot);
        this.slotStatusStateReconciler.executeStateTransition(slot, assignedJob);
    }

    @Override
    public void removeSlots(Iterable<SlotID> slotsToRemove) {
        Preconditions.checkNotNull(slotsToRemove);
        for (SlotID slotId : slotsToRemove) {
            this.removeSlot(slotId);
        }
    }

    private void removeSlot(SlotID slotId) {
        DeclarativeTaskManagerSlot slot = this.slots.remove(slotId);
        if (slot != null) {
            if (slot.getState() != SlotState.FREE) {
                this.transitionSlotToFree(slot);
            }
            this.freeSlots.remove(slotId);
        } else {
            LOG.debug("There was no slot registered with slot id {}.", (Object)slotId);
        }
    }

    @Override
    public void notifyFree(SlotID slotId) {
        Preconditions.checkNotNull(slotId);
        this.transitionSlotToFree(this.slots.get(slotId));
    }

    @Override
    public void notifyAllocationStart(SlotID slotId, JobID jobId) {
        Preconditions.checkNotNull(slotId);
        Preconditions.checkNotNull(jobId);
        this.transitionSlotToPending(this.slots.get(slotId), jobId);
    }

    @Override
    public void notifyAllocationComplete(SlotID slotId, JobID jobId) {
        Preconditions.checkNotNull(slotId);
        Preconditions.checkNotNull(jobId);
        this.transitionSlotToAllocated(this.slots.get(slotId), jobId);
    }

    @Override
    public boolean notifySlotStatus(Iterable<SlotStatus> slotStatuses) {
        Preconditions.checkNotNull(slotStatuses);
        boolean anyStatusChanged = false;
        for (SlotStatus slotStatus : slotStatuses) {
            anyStatusChanged |= this.slotStatusStateReconciler.executeStateTransition(this.slots.get(slotStatus.getSlotID()), slotStatus.getJobID());
        }
        return anyStatusChanged;
    }

    private void transitionSlotToFree(DeclarativeTaskManagerSlot slot) {
        Preconditions.checkNotNull(slot);
        Preconditions.checkState(slot.getState() != SlotState.FREE);
        JobID jobId = slot.getJobId();
        SlotState state = slot.getState();
        slot.freeSlot();
        this.freeSlots.put(slot.getSlotId(), slot);
        this.slotStatusUpdateListeners.notifySlotStatusChange(slot, state, SlotState.FREE, jobId);
    }

    private void transitionSlotToPending(DeclarativeTaskManagerSlot slot, JobID jobId) {
        Preconditions.checkNotNull(slot);
        Preconditions.checkState(slot.getState() == SlotState.FREE);
        slot.startAllocation(jobId);
        this.freeSlots.remove(slot.getSlotId());
        this.slotStatusUpdateListeners.notifySlotStatusChange(slot, SlotState.FREE, SlotState.PENDING, jobId);
    }

    private void transitionSlotToAllocated(DeclarativeTaskManagerSlot slot, JobID jobId) {
        Preconditions.checkNotNull(slot);
        Preconditions.checkState(jobId.equals(slot.getJobId()), "Job ID from slot status update (%s) does not match currently assigned job ID (%s) for slot %s.", jobId, slot.getJobId(), slot.getSlotId());
        Preconditions.checkState(slot.getState() == SlotState.PENDING, "State of slot %s must be %s, but was %s.", new Object[]{slot.getSlotId(), SlotState.PENDING, slot.getState()});
        slot.completeAllocation();
        this.slotStatusUpdateListeners.notifySlotStatusChange(slot, SlotState.PENDING, SlotState.ALLOCATED, jobId);
    }

    @Override
    public Collection<TaskManagerSlotInformation> getFreeSlots() {
        return Collections.unmodifiableCollection(this.freeSlots.values());
    }

    @Override
    public Collection<TaskExecutorConnection> getTaskExecutorsWithAllocatedSlotsForJob(JobID jobId) {
        HashMap<InstanceID, TaskExecutorConnection> taskExecutorConnections = new HashMap<InstanceID, TaskExecutorConnection>();
        for (DeclarativeTaskManagerSlot value : this.slots.values()) {
            if (!jobId.equals(value.getJobId())) continue;
            taskExecutorConnections.put(value.getInstanceId(), value.getTaskManagerConnection());
        }
        return taskExecutorConnections.values();
    }

    @VisibleForTesting
    boolean areMapsEmpty() {
        return this.slots.isEmpty() && this.freeSlots.isEmpty();
    }

    @Nullable
    @VisibleForTesting
    DeclarativeTaskManagerSlot getSlot(SlotID slotId) {
        return this.slots.get(slotId);
    }

    private static class MultiSlotStatusUpdateListener
    implements SlotStatusUpdateListener {
        private final Collection<SlotStatusUpdateListener> listeners = new ArrayList<SlotStatusUpdateListener>();

        private MultiSlotStatusUpdateListener() {
        }

        public void registerSlotStatusUpdateListener(SlotStatusUpdateListener slotStatusUpdateListener) {
            this.listeners.add(slotStatusUpdateListener);
        }

        @Override
        public void notifySlotStatusChange(TaskManagerSlotInformation slot, SlotState previous, SlotState current, JobID jobId) {
            LOG.trace("Slot {} transitioned from {} to {} for job {}.", new Object[]{slot.getSlotId(), previous, current, jobId});
            this.listeners.forEach(listeners -> listeners.notifySlotStatusChange(slot, previous, current, jobId));
        }
    }

    @VisibleForTesting
    static class SlotStatusStateReconciler {
        private final Consumer<DeclarativeTaskManagerSlot> toFreeSlot;
        private final BiConsumer<DeclarativeTaskManagerSlot, JobID> toPendingSlot;
        private final BiConsumer<DeclarativeTaskManagerSlot, JobID> toAllocatedSlot;

        @VisibleForTesting
        SlotStatusStateReconciler(Consumer<DeclarativeTaskManagerSlot> toFreeSlot, BiConsumer<DeclarativeTaskManagerSlot, JobID> toPendingSlot, BiConsumer<DeclarativeTaskManagerSlot, JobID> toAllocatedSlot) {
            this.toFreeSlot = toFreeSlot;
            this.toPendingSlot = toPendingSlot;
            this.toAllocatedSlot = toAllocatedSlot;
        }

        public boolean executeStateTransition(DeclarativeTaskManagerSlot slot, JobID jobId) {
            SlotState reportedSlotState = jobId == null ? SlotState.FREE : SlotState.ALLOCATED;
            SlotState trackedSlotState = slot.getState();
            if (reportedSlotState == SlotState.FREE) {
                switch (trackedSlotState) {
                    case FREE: {
                        return false;
                    }
                    case PENDING: {
                        return false;
                    }
                    case ALLOCATED: {
                        this.toFreeSlot.accept(slot);
                        return true;
                    }
                }
            } else {
                switch (trackedSlotState) {
                    case FREE: {
                        this.toPendingSlot.accept(slot, jobId);
                        this.toAllocatedSlot.accept(slot, jobId);
                        return true;
                    }
                    case PENDING: {
                        if (!jobId.equals(slot.getJobId())) {
                            this.toFreeSlot.accept(slot);
                            this.toPendingSlot.accept(slot, jobId);
                        }
                        this.toAllocatedSlot.accept(slot, jobId);
                        return true;
                    }
                    case ALLOCATED: {
                        if (!jobId.equals(slot.getJobId())) {
                            this.toFreeSlot.accept(slot);
                            this.toPendingSlot.accept(slot, jobId);
                            this.toAllocatedSlot.accept(slot, jobId);
                            return true;
                        }
                        return false;
                    }
                }
            }
            return false;
        }
    }
}

