/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.blob;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.net.ServerSocketFactory;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobServerConnection;
import org.apache.flink.runtime.blob.BlobService;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.blob.TransientBlobCleanupTask;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.blob.TransientBlobService;
import org.apache.flink.runtime.dispatcher.cleanup.GloballyCleanableResource;
import org.apache.flink.runtime.dispatcher.cleanup.LocallyCleanableResource;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.Reference;
import org.apache.flink.util.ShutdownHookUtil;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.FunctionUtils;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BlobServer
extends Thread
implements BlobService,
BlobWriter,
PermanentBlobService,
TransientBlobService,
LocallyCleanableResource,
GloballyCleanableResource {
    private static final Logger LOG = LoggerFactory.getLogger(BlobServer.class);
    private final AtomicLong tempFileCounter = new AtomicLong(0L);
    private final ServerSocket serverSocket;
    private final Configuration blobServiceConfiguration;
    private final AtomicBoolean shutdownRequested = new AtomicBoolean();
    private final Reference<File> storageDir;
    private final BlobStore blobStore;
    private final Set<BlobServerConnection> activeConnections = new HashSet<BlobServerConnection>();
    private final int maxConnections;
    private final ReadWriteLock readWriteLock;
    private final Thread shutdownHook;
    private final ConcurrentHashMap<Tuple2<JobID, TransientBlobKey>, Long> blobExpiryTimes = new ConcurrentHashMap();
    private final long cleanupInterval;
    private final Timer cleanupTimer;

    @VisibleForTesting
    public BlobServer(Configuration config, File storageDir, BlobStore blobStore) throws IOException {
        this(config, Reference.owned(storageDir), blobStore);
    }

    public BlobServer(Configuration config, Reference<File> storageDir, BlobStore blobStore) throws IOException {
        ServerSocketFactory socketFactory;
        this.blobServiceConfiguration = Preconditions.checkNotNull(config);
        this.blobStore = Preconditions.checkNotNull(blobStore);
        this.readWriteLock = new ReentrantReadWriteLock();
        this.storageDir = storageDir;
        LOG.info("Created BLOB server storage directory {}", storageDir);
        int maxConnections = config.getInteger(BlobServerOptions.FETCH_CONCURRENT);
        if (maxConnections >= 1) {
            this.maxConnections = maxConnections;
        } else {
            LOG.warn("Invalid value for maximum connections in BLOB server: {}. Using default value of {}", (Object)maxConnections, (Object)BlobServerOptions.FETCH_CONCURRENT.defaultValue());
            this.maxConnections = BlobServerOptions.FETCH_CONCURRENT.defaultValue();
        }
        int backlog = config.getInteger(BlobServerOptions.FETCH_BACKLOG);
        if (backlog < 1) {
            LOG.warn("Invalid value for BLOB connection backlog: {}. Using default value of {}", (Object)backlog, (Object)BlobServerOptions.FETCH_BACKLOG.defaultValue());
            backlog = BlobServerOptions.FETCH_BACKLOG.defaultValue();
        }
        this.cleanupTimer = new Timer(true);
        this.cleanupInterval = config.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000L;
        this.cleanupTimer.schedule((TimerTask)new TransientBlobCleanupTask(this.blobExpiryTimes, this::deleteInternal, LOG), this.cleanupInterval, this.cleanupInterval);
        this.shutdownHook = ShutdownHookUtil.addShutdownHook(this, this.getClass().getSimpleName(), LOG);
        String serverPortRange = config.getString(BlobServerOptions.PORT);
        Iterator<Integer> ports = NetUtils.getPortRangeFromString(serverPortRange);
        if (SecurityOptions.isInternalSSLEnabled(config) && config.getBoolean(BlobServerOptions.SSL_ENABLED)) {
            try {
                socketFactory = SSLUtils.createSSLServerSocketFactory(config);
            }
            catch (Exception e) {
                throw new IOException("Failed to initialize SSL for the blob server", e);
            }
        } else {
            socketFactory = ServerSocketFactory.getDefault();
        }
        int finalBacklog = backlog;
        String bindHost = config.getOptional(JobManagerOptions.BIND_HOST).orElseGet(NetUtils::getWildcardIPAddress);
        this.serverSocket = NetUtils.createSocketFromPorts(ports, port -> socketFactory.createServerSocket(port, finalBacklog, InetAddress.getByName(bindHost)));
        if (this.serverSocket == null) {
            throw new IOException("Unable to open BLOB Server in specified port range: " + serverPortRange);
        }
        this.setName("BLOB Server listener at " + this.getPort());
        this.setDaemon(true);
        if (LOG.isInfoEnabled()) {
            LOG.info("Started BLOB server at {}:{} - max concurrent requests: {} - max backlog: {}", new Object[]{this.serverSocket.getInetAddress().getHostAddress(), this.getPort(), maxConnections, backlog});
        }
        this.checkStoredBlobsForCorruption();
        this.registerBlobExpiryTimes();
    }

    private void registerBlobExpiryTimes() throws IOException {
        if (this.storageDir.deref().exists()) {
            Collection<BlobUtils.TransientBlob> transientBlobs = BlobUtils.listTransientBlobsInDirectory(this.storageDir.deref().toPath());
            long expiryTime = System.currentTimeMillis() + this.cleanupInterval;
            for (BlobUtils.TransientBlob transientBlob : transientBlobs) {
                this.blobExpiryTimes.put(Tuple2.of(transientBlob.getJobId(), transientBlob.getBlobKey()), expiryTime);
            }
        }
    }

    private void checkStoredBlobsForCorruption() throws IOException {
        if (this.storageDir.deref().exists()) {
            BlobUtils.checkAndDeleteCorruptedBlobs(this.storageDir.deref().toPath(), LOG);
        }
    }

    public File getStorageDir() {
        return this.storageDir.deref();
    }

    @VisibleForTesting
    public File getStorageLocation(@Nullable JobID jobId, BlobKey key) throws IOException {
        return BlobUtils.getStorageLocation(this.storageDir.deref(), jobId, key);
    }

    File createTemporaryFilename() throws IOException {
        return new File(BlobUtils.getIncomingDirectory(this.storageDir.deref()), String.format("temp-%08d", this.tempFileCounter.getAndIncrement()));
    }

    ReadWriteLock getReadWriteLock() {
        return this.readWriteLock;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public void run() {
        try {
            while (!this.shutdownRequested.get()) {
                Set<BlobServerConnection> set;
                BlobServerConnection conn = new BlobServerConnection(NetUtils.acceptWithoutTimeout(this.serverSocket), this);
                try {
                    set = this.activeConnections;
                    synchronized (set) {
                        while (this.activeConnections.size() >= this.maxConnections) {
                            this.activeConnections.wait(2000L);
                        }
                        this.activeConnections.add(conn);
                    }
                    conn.start();
                    conn = null;
                }
                finally {
                    if (conn == null) continue;
                    conn.close();
                    set = this.activeConnections;
                    synchronized (set) {
                        this.activeConnections.remove(conn);
                    }
                }
            }
            return;
        }
        catch (Throwable t) {
            if (this.shutdownRequested.get()) return;
            LOG.error("BLOB server stopped working. Shutting down", t);
            try {
                this.close();
                return;
            }
            catch (Throwable closeThrowable) {
                LOG.error("Could not properly close the BlobServer.", closeThrowable);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() throws IOException {
        this.cleanupTimer.cancel();
        if (this.shutdownRequested.compareAndSet(false, true)) {
            Exception exception = null;
            try {
                this.serverSocket.close();
            }
            catch (IOException ioe) {
                exception = ioe;
            }
            this.interrupt();
            try {
                this.join();
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                LOG.debug("Error while waiting for this thread to die.", (Throwable)ie);
            }
            Set<BlobServerConnection> ie = this.activeConnections;
            synchronized (ie) {
                if (!this.activeConnections.isEmpty()) {
                    for (BlobServerConnection conn : this.activeConnections) {
                        LOG.debug("Shutting down connection {}.", (Object)conn.getName());
                        conn.close();
                    }
                    this.activeConnections.clear();
                }
            }
            try {
                this.storageDir.owned().ifPresent(FunctionUtils.uncheckedConsumer(FileUtils::deleteDirectory));
            }
            catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }
            ShutdownHookUtil.removeShutdownHook(this.shutdownHook, this.getClass().getSimpleName(), LOG);
            if (LOG.isInfoEnabled()) {
                LOG.info("Stopped BLOB server at {}:{}", (Object)this.serverSocket.getInetAddress().getHostAddress(), (Object)this.getPort());
            }
            ExceptionUtils.tryRethrowIOException(exception);
        }
    }

    protected BlobClient createClient() throws IOException {
        return new BlobClient(new InetSocketAddress(this.serverSocket.getInetAddress(), this.getPort()), this.blobServiceConfiguration);
    }

    @Override
    public File getFile(TransientBlobKey key) throws IOException {
        return this.getFileInternalWithReadLock(null, key);
    }

    @Override
    public File getFile(JobID jobId, TransientBlobKey key) throws IOException {
        Preconditions.checkNotNull(jobId);
        return this.getFileInternalWithReadLock(jobId, key);
    }

    @Override
    public File getFile(JobID jobId, PermanentBlobKey key) throws IOException {
        Preconditions.checkNotNull(jobId);
        return this.getFileInternalWithReadLock(jobId, key);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private File getFileInternalWithReadLock(@Nullable JobID jobId, BlobKey blobKey) throws IOException {
        Preconditions.checkArgument(blobKey != null, "BLOB key cannot be null.");
        this.readWriteLock.readLock().lock();
        try {
            File file = this.getFileInternal(jobId, blobKey);
            return file;
        }
        finally {
            this.readWriteLock.readLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    File getFileInternal(@Nullable JobID jobId, BlobKey blobKey) throws IOException {
        File localFile = BlobUtils.getStorageLocation(this.storageDir.deref(), jobId, blobKey);
        if (localFile.exists()) {
            if (blobKey instanceof TransientBlobKey) {
                this.blobExpiryTimes.put(Tuple2.of(jobId, (TransientBlobKey)blobKey), System.currentTimeMillis() + this.cleanupInterval);
            }
            return localFile;
        }
        if (blobKey instanceof PermanentBlobKey) {
            File file;
            block10: {
                this.readWriteLock.readLock().unlock();
                File incomingFile = null;
                try {
                    incomingFile = this.createTemporaryFilename();
                    this.blobStore.get(jobId, blobKey, incomingFile);
                    this.readWriteLock.writeLock().lock();
                    try {
                        BlobUtils.moveTempFileToStore(incomingFile, jobId, blobKey, localFile, LOG, null);
                    }
                    finally {
                        this.readWriteLock.writeLock().unlock();
                    }
                    file = localFile;
                    if (incomingFile == null || incomingFile.delete() || !incomingFile.exists()) break block10;
                }
                catch (Throwable throwable) {
                    if (incomingFile != null && !incomingFile.delete() && incomingFile.exists()) {
                        LOG.warn("Could not delete the staging file {} for blob key {} and job {}.", new Object[]{incomingFile, blobKey, jobId});
                    }
                    this.readWriteLock.readLock().lock();
                    throw throwable;
                }
                LOG.warn("Could not delete the staging file {} for blob key {} and job {}.", new Object[]{incomingFile, blobKey, jobId});
            }
            this.readWriteLock.readLock().lock();
            return file;
        }
        throw new FileNotFoundException("Local file " + localFile + " does not exist and failed to copy from blob store.");
    }

    @Override
    public TransientBlobKey putTransient(byte[] value) throws IOException {
        return (TransientBlobKey)this.putBuffer(null, value, BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Override
    public TransientBlobKey putTransient(JobID jobId, byte[] value) throws IOException {
        Preconditions.checkNotNull(jobId);
        return (TransientBlobKey)this.putBuffer(jobId, value, BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Override
    public TransientBlobKey putTransient(InputStream inputStream) throws IOException {
        return (TransientBlobKey)this.putInputStream(null, inputStream, BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Override
    public TransientBlobKey putTransient(JobID jobId, InputStream inputStream) throws IOException {
        Preconditions.checkNotNull(jobId);
        return (TransientBlobKey)this.putInputStream(jobId, inputStream, BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Override
    public PermanentBlobKey putPermanent(JobID jobId, byte[] value) throws IOException {
        Preconditions.checkNotNull(jobId);
        return (PermanentBlobKey)this.putBuffer(jobId, value, BlobKey.BlobType.PERMANENT_BLOB);
    }

    @Override
    public PermanentBlobKey putPermanent(JobID jobId, InputStream inputStream) throws IOException {
        Preconditions.checkNotNull(jobId);
        return (PermanentBlobKey)this.putInputStream(jobId, inputStream, BlobKey.BlobType.PERMANENT_BLOB);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private BlobKey putBuffer(@Nullable JobID jobId, byte[] value, BlobKey.BlobType blobType) throws IOException {
        BlobKey blobKey;
        block20: {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Received PUT call for BLOB of job {}.", (Object)jobId);
            }
            File incomingFile = this.createTemporaryFilename();
            MessageDigest md = BlobUtils.createMessageDigest();
            BlobKey blobKey2 = null;
            try (FileOutputStream fos = new FileOutputStream(incomingFile);){
                md.update(value);
                fos.write(value);
            }
            catch (IOException ioe) {
                if (!incomingFile.delete() && incomingFile.exists()) {
                    LOG.warn("Could not delete the staging file {} for job {}.", (Object)incomingFile, (Object)jobId);
                }
                throw ioe;
            }
            try {
                blobKey = blobKey2 = this.moveTempFileToStore(incomingFile, jobId, md.digest(), blobType);
                if (incomingFile.delete() || !incomingFile.exists()) break block20;
            }
            catch (Throwable throwable) {
                if (!incomingFile.delete() && incomingFile.exists()) {
                    LOG.warn("Could not delete the staging file {} for blob key {} and job {}.", new Object[]{incomingFile, blobKey2, jobId});
                }
                throw throwable;
            }
            LOG.warn("Could not delete the staging file {} for blob key {} and job {}.", new Object[]{incomingFile, blobKey2, jobId});
        }
        return blobKey;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private BlobKey putInputStream(@Nullable JobID jobId, InputStream inputStream, BlobKey.BlobType blobType) throws IOException {
        BlobKey blobKey;
        block4: {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Received PUT call for BLOB of job {}.", (Object)jobId);
            }
            File incomingFile = this.createTemporaryFilename();
            BlobKey blobKey2 = null;
            try {
                MessageDigest md = BlobServer.writeStreamToFileAndCreateDigest(inputStream, incomingFile);
                blobKey = blobKey2 = this.moveTempFileToStore(incomingFile, jobId, md.digest(), blobType);
                if (incomingFile.delete() || !incomingFile.exists()) break block4;
            }
            catch (Throwable throwable) {
                if (!incomingFile.delete() && incomingFile.exists()) {
                    LOG.warn("Could not delete the staging file {} for blob key {} and job {}.", new Object[]{incomingFile, blobKey2, jobId});
                }
                throw throwable;
            }
            LOG.warn("Could not delete the staging file {} for blob key {} and job {}.", new Object[]{incomingFile, blobKey2, jobId});
        }
        return blobKey;
    }

    private static MessageDigest writeStreamToFileAndCreateDigest(InputStream inputStream, File file) throws IOException {
        try (FileOutputStream fos = new FileOutputStream(file);){
            int bytesRead;
            MessageDigest md = BlobUtils.createMessageDigest();
            byte[] buf = new byte[65536];
            while ((bytesRead = inputStream.read(buf)) != -1) {
                fos.write(buf, 0, bytesRead);
                md.update(buf, 0, bytesRead);
            }
            MessageDigest messageDigest = md;
            return messageDigest;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    BlobKey moveTempFileToStore(File incomingFile, @Nullable JobID jobId, byte[] digest, BlobKey.BlobType blobType) throws IOException {
        int retries = 10;
        int attempt = 0;
        while (true) {
            BlobKey blobKey = BlobKey.createKey(blobType, digest);
            File storageFile = BlobUtils.getStorageLocation(this.storageDir.deref(), jobId, blobKey);
            this.readWriteLock.writeLock().lock();
            try {
                if (!storageFile.exists()) {
                    BlobUtils.moveTempFileToStore(incomingFile, jobId, blobKey, storageFile, LOG, blobKey instanceof PermanentBlobKey ? this.blobStore : null);
                    if (blobKey instanceof TransientBlobKey) {
                        this.blobExpiryTimes.put(Tuple2.of(jobId, (TransientBlobKey)blobKey), System.currentTimeMillis() + this.cleanupInterval);
                    }
                    BlobKey blobKey2 = blobKey;
                    return blobKey2;
                }
            }
            finally {
                this.readWriteLock.writeLock().unlock();
            }
            if (++attempt >= retries) {
                String message = "Failed to find a unique key for BLOB of job " + jobId + " (last tried " + storageFile.getAbsolutePath() + ".";
                LOG.error(message + " No retries left.");
                throw new IOException(message);
            }
            if (!LOG.isDebugEnabled()) continue;
            LOG.debug("Trying to find a unique key for BLOB of job {} (retry {}, last tried {})", new Object[]{jobId, attempt, storageFile.getAbsolutePath()});
        }
    }

    @Override
    public boolean deleteFromCache(TransientBlobKey key) {
        return this.deleteInternal(null, key);
    }

    @Override
    public boolean deleteFromCache(JobID jobId, TransientBlobKey key) {
        Preconditions.checkNotNull(jobId);
        return this.deleteInternal(jobId, key);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    boolean deleteInternal(@Nullable JobID jobId, TransientBlobKey key) {
        File localFile = new File(BlobUtils.getStorageLocationPath(this.storageDir.deref().getAbsolutePath(), jobId, key));
        this.readWriteLock.writeLock().lock();
        try {
            if (!localFile.delete() && localFile.exists()) {
                LOG.warn("Failed to locally delete BLOB " + key + " at " + localFile.getAbsolutePath());
                boolean bl = false;
                return bl;
            }
            this.blobExpiryTimes.remove(Tuple2.of(jobId, key));
            boolean bl = true;
            return bl;
        }
        finally {
            this.readWriteLock.writeLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean deleteInternal(JobID jobId, PermanentBlobKey key) {
        File localFile = new File(BlobUtils.getStorageLocationPath(this.storageDir.deref().getAbsolutePath(), jobId, key));
        this.readWriteLock.writeLock().lock();
        try {
            boolean deleteLocally = true;
            if (!localFile.delete() && localFile.exists()) {
                LOG.warn("Failed to locally delete BLOB " + key + " at " + localFile.getAbsolutePath());
                deleteLocally = false;
            }
            boolean deleteHA = this.blobStore.delete(jobId, key);
            boolean bl = deleteLocally && deleteHA;
            return bl;
        }
        finally {
            this.readWriteLock.writeLock().unlock();
        }
    }

    @Override
    public boolean deletePermanent(JobID jobId, PermanentBlobKey key) {
        return this.deleteInternal(jobId, key);
    }

    @Override
    public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor cleanupExecutor) {
        Preconditions.checkNotNull(jobId);
        return this.runAsyncWithWriteLock(() -> this.internalLocalCleanup(jobId), cleanupExecutor);
    }

    @GuardedBy(value="readWriteLock")
    private void internalLocalCleanup(JobID jobId) throws IOException {
        File jobDir = new File(BlobUtils.getStorageLocationPath(this.storageDir.deref().getAbsolutePath(), jobId));
        FileUtils.deleteDirectory(jobDir);
    }

    @Override
    public CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor executor) {
        Preconditions.checkNotNull(jobId);
        return this.runAsyncWithWriteLock(() -> {
            IOException exception = null;
            try {
                this.internalLocalCleanup(jobId);
            }
            catch (IOException e) {
                exception = e;
            }
            if (!this.blobStore.deleteAll(jobId)) {
                exception = ExceptionUtils.firstOrSuppressed(new IOException("Error while cleaning up the BlobStore for job " + jobId), exception);
            }
            if (exception != null) {
                throw new IOException(exception);
            }
        }, executor);
    }

    private CompletableFuture<Void> runAsyncWithWriteLock(ThrowingRunnable<IOException> runnable, Executor executor) {
        return CompletableFuture.runAsync(() -> {
            this.readWriteLock.writeLock().lock();
            try {
                runnable.run();
            }
            catch (IOException e) {
                throw new CompletionException(e);
            }
            finally {
                this.readWriteLock.writeLock().unlock();
            }
        }, executor);
    }

    public void retainJobs(Collection<JobID> jobsToRetain, Executor ioExecutor) throws IOException {
        if (this.storageDir.deref().exists()) {
            Set<JobID> jobsToRemove = BlobUtils.listExistingJobs(this.storageDir.deref().toPath());
            jobsToRemove.removeAll(jobsToRetain);
            ArrayList<CompletableFuture<Void>> cleanupResultFutures = new ArrayList<CompletableFuture<Void>>(jobsToRemove.size());
            for (JobID jobToRemove : jobsToRemove) {
                cleanupResultFutures.add(this.globalCleanupAsync(jobToRemove, ioExecutor));
            }
            try {
                FutureUtils.completeAll(cleanupResultFutures).get();
            }
            catch (InterruptedException | ExecutionException e) {
                ExceptionUtils.rethrowIOException(e);
            }
        }
    }

    @Override
    public PermanentBlobService getPermanentBlobService() {
        return this;
    }

    @Override
    public TransientBlobService getTransientBlobService() {
        return this;
    }

    @Override
    public final int getMinOffloadingSize() {
        return this.blobServiceConfiguration.getInteger(BlobServerOptions.OFFLOAD_MINSIZE);
    }

    @Override
    public int getPort() {
        return this.serverSocket.getLocalPort();
    }

    @VisibleForTesting
    ConcurrentMap<Tuple2<JobID, TransientBlobKey>, Long> getBlobExpiryTimes() {
        return this.blobExpiryTimes;
    }

    public boolean isShutdown() {
        return this.shutdownRequested.get();
    }

    ServerSocket getServerSocket() {
        return this.serverSocket;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void unregisterConnection(BlobServerConnection conn) {
        Set<BlobServerConnection> set = this.activeConnections;
        synchronized (set) {
            this.activeConnections.remove(conn);
            this.activeConnections.notifyAll();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    List<BlobServerConnection> getCurrentActiveConnections() {
        Set<BlobServerConnection> set = this.activeConnections;
        synchronized (set) {
            return new ArrayList<BlobServerConnection>(this.activeConnections);
        }
    }
}

