package org.broadinstitute.gatk.utils.nanoScheduler;

import com.google.java.contract.Ensures;
import com.google.java.contract.Requires;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.log4j.Logger;
import org.broadinstitute.gatk.utils.MultiThreadedErrorTracker;
import org.broadinstitute.gatk.utils.threading.NamedThreadFactory;

/* loaded from: input_file:org/broadinstitute/gatk/utils/nanoScheduler/NanoScheduler.class */
public class NanoScheduler<InputType, MapType, ReduceType> {
    /** Logger used by {@code debugPrint} for debug-mode output. */
    private static final Logger logger = Logger.getLogger(NanoScheduler.class);
    // NOTE(review): not referenced anywhere in the visible code; the single-thread
    // path in execute() is selected purely on the thread count.
    private static final boolean ALLOW_SINGLE_THREAD_FASTPATH = true;
    /** Progress-reporting interval: the progress callback is invoked for every 100th input id. */
    protected static final int UPDATE_PROGRESS_FREQ = 100;
    /** Buffer size supplied at construction (validated to be >= 1); exposed through getBufferSize(). */
    final int bufferSize;
    /** Number of threads (validated to be >= 1); also the number of map jobs submitted per run. */
    final int nThreads;
    /** Single-thread executor that runs the MasterJob; {@code null} when nThreads == 1. */
    final ExecutorService masterExecutor;
    /** Fixed pool of nThreads workers that run the ReadMapReduceJobs; {@code null} when nThreads == 1. */
    final ExecutorService mapExecutor;
    /** Collects errors raised on worker threads so handleErrors() can act on them from the calling thread. */
    final MultiThreadedErrorTracker errorTracker;
    /** Set to true by shutdown(); execute() refuses to run once it is set. */
    boolean shutdown;
    /** When true, debugPrint() emits its messages to the logger. */
    boolean debug;
    /** Optional progress callback; {@code null} means no progress reporting. */
    private NSProgressFunction<InputType> progressFunction;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/broadinstitute/gatk/utils/nanoScheduler/NanoScheduler$MasterJob.class */
    public class MasterJob implements Callable<ReduceType> {
        final Iterator<InputType> inputReader;
        final NSMapFunction<InputType, MapType> map;
        final ReduceType initialValue;
        final NSReduceFunction<MapType, ReduceType> reduce;

        private MasterJob(Iterator<InputType> it, NSMapFunction<InputType, MapType> nSMapFunction, ReduceType reducetype, NSReduceFunction<MapType, ReduceType> nSReduceFunction) {
            this.inputReader = it;
            this.map = nSMapFunction;
            this.initialValue = reducetype;
            this.reduce = nSReduceFunction;
        }

        @Override // java.util.concurrent.Callable
        public ReduceType call() {
            InputProducer inputProducer = new InputProducer(this.inputReader);
            MapResultsQueue<MapType> mapResultsQueue = new MapResultsQueue<>();
            Reducer<MapType, ReduceType> reducer = new Reducer<>(this.reduce, NanoScheduler.this.errorTracker, this.initialValue);
            CountDownLatch countDownLatch = new CountDownLatch(NanoScheduler.this.nThreads);
            for (int i = 0; i < NanoScheduler.this.nThreads; i++) {
                try {
                    NanoScheduler.this.mapExecutor.submit(new ReadMapReduceJob(inputProducer, mapResultsQueue, countDownLatch, this.map, reducer));
                } catch (Throwable th) {
                    NanoScheduler.this.errorTracker.notifyOfError(th);
                    return this.initialValue;
                }
            }
            return (ReduceType) waitForCompletion(mapResultsQueue, countDownLatch, reducer);
        }

        private ReduceType waitForCompletion(MapResultsQueue<MapType> mapResultsQueue, CountDownLatch countDownLatch, Reducer<MapType, ReduceType> reducer) throws InterruptedException {
            countDownLatch.await();
            reducer.reduceAsMuchAsPossible(mapResultsQueue, true);
            return reducer.getReduceResult();
        }
    }

    /* loaded from: input_file:org/broadinstitute/gatk/utils/nanoScheduler/NanoScheduler$ReadMapReduceJob.class */
    /**
     * Worker job: repeatedly pulls the next input from the shared producer, maps it,
     * enqueues the map result, and opportunistically reduces, until the producer
     * hands out its EOF marker. Signals the shared latch exactly once when it exits.
     */
    private class ReadMapReduceJob implements Runnable {
        final InputProducer<InputType> inputProducer;
        final MapResultsQueue<MapType> mapResultQueue;
        final NSMapFunction<InputType, MapType> map;
        final Reducer<MapType, ReduceType> reducer;
        final CountDownLatch runningMapJobs;

        private ReadMapReduceJob(InputProducer<InputType> inputProducer, MapResultsQueue<MapType> mapResultsQueue, CountDownLatch countDownLatch, NSMapFunction<InputType, MapType> nSMapFunction, Reducer<MapType, ReduceType> reducer) {
            this.inputProducer = inputProducer;
            this.mapResultQueue = mapResultsQueue;
            this.runningMapJobs = countDownLatch;
            this.map = nSMapFunction;
            this.reducer = reducer;
        }

        /**
         * Processes inputs until EOF or the first error. Any throwable is reported to
         * the scheduler's error tracker instead of propagating out of the pool thread.
         */
        @Override // java.lang.Runnable
        public void run() {
            // The try/finally wraps the WHOLE loop: the latch is sized to one count per
            // job, so it must be decremented once per job, never once per input item
            // (and never twice on the error path), or the master stops waiting while
            // map jobs are still running.
            try {
                boolean done = false;
                while (!done) {
                    final InputProducer<InputType>.InputValue next = this.inputProducer.next();
                    if (next.isEOFMarker()) {
                        done = true;
                    } else {
                        final InputType value = next.getValue();
                        this.mapResultQueue.put(new MapResult<>(this.map.apply(value), next.getId()));
                        // Second argument false, whereas the master's final pass passes true.
                        this.reducer.reduceAsMuchAsPossible(this.mapResultQueue, false);
                        NanoScheduler.this.updateProgress(next.getId(), value);
                    }
                }
            } catch (Throwable th) {
                NanoScheduler.this.errorTracker.notifyOfError(th);
            } finally {
                this.runningMapJobs.countDown();
            }
        }
    }

    /**
     * Creates a scheduler with {@code i} threads and a buffer size of {@code 100 * i}.
     *
     * @param i number of threads
     * @throws IllegalArgumentException if the derived buffer size or the thread count
     *         is below 1 (checked by the delegated constructor, buffer size first)
     */
    public NanoScheduler(int i) {
        this(100 * i, i);
    }

    protected NanoScheduler(int i, int i2) {
        this.errorTracker = new MultiThreadedErrorTracker();
        this.shutdown = false;
        this.debug = false;
        this.progressFunction = null;
        if (i < 1) {
            throw new IllegalArgumentException("bufferSize must be >= 1, got " + i);
        }
        if (i2 < 1) {
            throw new IllegalArgumentException("nThreads must be >= 1, got " + i2);
        }
        this.bufferSize = i;
        this.nThreads = i2;
        if (i2 == 1) {
            this.masterExecutor = null;
            this.mapExecutor = null;
        } else {
            this.masterExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-master-thread-%d"));
            this.mapExecutor = Executors.newFixedThreadPool(i2, new NamedThreadFactory("NS-map-thread-%d"));
        }
    }

    /**
     * @return the number of threads this scheduler was constructed with (always >= 1)
     */
    @Ensures({"result > 0"})
    public int getnThreads() {
        return nThreads;
    }

    /**
     * @return the buffer size this scheduler was constructed with (always >= 1)
     */
    @Ensures({"result > 0"})
    public int getBufferSize() {
        return bufferSize;
    }

    /**
     * Shuts this scheduler down. In multi-threaded mode both executors are stopped
     * (map pool first, then master); afterwards {@link #isShutdown()} returns true.
     *
     * @throws IllegalStateException if an executor was already shut down or still
     *         had queued tasks (see {@code shutdownExecutor})
     */
    public void shutdown() {
        // Executors exist only when more than one thread was requested.
        final boolean hasExecutors = nThreads > 1;
        if (hasExecutors) {
            shutdownExecutor("mapExecutor", mapExecutor);
            shutdownExecutor("masterExecutor", masterExecutor);
        }
        shutdown = true;
    }

    /**
     * Stops one executor via {@code shutdownNow()} and verifies that it was idle.
     *
     * @param name            label used in error messages
     * @param executorService executor to stop
     * @throws IllegalStateException if the executor was already shut down, or if
     *         {@code shutdownNow()} returned tasks that never started
     */
    @Ensures({"executorService.isShutdown()"})
    @Requires({"name != null", "executorService != null"})
    private void shutdownExecutor(String name, ExecutorService executorService) {
        // The parameter is called `name` so that the @Requires expression above
        // refers to an identifier that actually exists.
        if (executorService.isShutdown() || executorService.isTerminated()) {
            throw new IllegalStateException("Executor service " + name + " is already shut down!");
        }
        final List<Runnable> remaining = executorService.shutdownNow();
        if (!remaining.isEmpty()) {
            throw new IllegalStateException(remaining.size() + " remaining tasks found in an executor " + name + ", unexpected behavior!");
        }
    }

    /**
     * @return true once {@link #shutdown()} has completed on this scheduler
     */
    public boolean isShutdown() {
        return shutdown;
    }

    /**
     * @return true if debug output is currently enabled
     */
    public boolean isDebug() {
        return debug;
    }

    /**
     * Logs a formatted message, prefixed with the current thread id, when debug mode
     * is on. The message is formatted only if it is actually going to be logged.
     *
     * @param format {@link String#format} pattern, must not be null
     * @param args   arguments for the pattern
     */
    @Requires({"format != null"})
    protected void debugPrint(String format, Object... args) {
        // The parameter is called `format` so that the @Requires expression above
        // refers to an identifier that actually exists.
        if (isDebug()) {
            logger.warn("Thread " + Thread.currentThread().getId() + ":" + String.format(format, args));
        }
    }

    /**
     * Turns debug output on or off.
     *
     * @param z true to enable the messages emitted by {@code debugPrint}
     */
    public void setDebug(boolean z) {
        debug = z;
    }

    /**
     * Installs the callback used for progress reporting.
     *
     * @param nSProgressFunction callback to invoke, or {@code null} to disable reporting
     */
    public void setProgressFunction(NSProgressFunction<InputType> nSProgressFunction) {
        progressFunction = nSProgressFunction;
    }

    /**
     * Maps every element of the input iterator and folds the map results into a
     * single value, starting from {@code reducetype}. Runs inline when the scheduler
     * has one thread, otherwise on the scheduler's executors.
     *
     * @param it                input elements, must not be null
     * @param nSMapFunction     map function, must not be null
     * @param reducetype        initial value of the reduction
     * @param nSReduceFunction  reduce function, must not be null
     * @return the final reduce value
     * @throws IllegalStateException    if this scheduler has been shut down
     * @throws IllegalArgumentException if the iterator or either function is null
     */
    public ReduceType execute(Iterator<InputType> it, NSMapFunction<InputType, MapType> nSMapFunction, ReduceType reducetype, NSReduceFunction<MapType, ReduceType> nSReduceFunction) {
        if (isShutdown()) {
            throw new IllegalStateException("execute called on already shutdown NanoScheduler");
        }
        if (null == it) {
            throw new IllegalArgumentException("inputReader cannot be null");
        }
        if (null == nSMapFunction) {
            throw new IllegalArgumentException("map function cannot be null");
        }
        if (null == nSReduceFunction) {
            throw new IllegalArgumentException("reduce function cannot be null");
        }

        if (getnThreads() == 1) {
            return executeSingleThreaded(it, nSMapFunction, reducetype, nSReduceFunction);
        }
        return executeMultiThreaded(it, nSMapFunction, reducetype, nSReduceFunction);
    }

    /**
     * Inline map/reduce on the calling thread: each input is mapped, progress is
     * reported with the input's zero-based index, and the map result is folded into
     * the running value.
     *
     * @param inputReader  input elements
     * @param map          map function
     * @param initialValue starting value of the reduction
     * @param reduce       reduce function
     * @return the final reduce value ({@code initialValue} if the iterator is empty)
     */
    @Requires({"inputReader != null", "map != null", "reduce != null"})
    private ReduceType executeSingleThreaded(Iterator<InputType> inputReader, NSMapFunction<InputType, MapType> map, ReduceType initialValue, NSReduceFunction<MapType, ReduceType> reduce) {
        // Parameter names match the identifiers used in the @Requires expressions above.
        ReduceType sum = initialValue;
        int nextId = 0;
        while (inputReader.hasNext()) {
            final InputType input = inputReader.next();
            final MapType mapValue = map.apply(input);
            // Progress is reported after the map step and before the reduce step.
            updateProgress(nextId++, input);
            sum = reduce.apply(mapValue, sum);
        }
        return sum;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Invokes the progress callback for every {@code UPDATE_PROGRESS_FREQ}-th input
     * id (including id 0); does nothing when no callback is installed.
     *
     * @param i         id of the input that was just processed
     * @param inputtype the input itself, passed through to the callback
     */
    public void updateProgress(int i, InputType inputtype) {
        // Read the field once: this method is called from map threads, and a
        // concurrent setProgressFunction(null) between a null check and the call
        // would otherwise cause a NullPointerException.
        final NSProgressFunction<InputType> callback = this.progressFunction;
        if (callback != null && i % UPDATE_PROGRESS_FREQ == 0) {
            callback.progress(inputtype);
        }
    }

    /**
     * Runs the map/reduce on the executors: submits a {@link MasterJob} and polls its
     * future every 100 ms, checking the error tracker between polls so that a failure
     * on any worker thread is surfaced on the calling thread promptly.
     *
     * @param inputReader  input elements
     * @param map          map function
     * @param initialValue starting value of the reduction
     * @param reduce       reduce function
     * @return the final reduce value
     */
    @Requires({"inputReader != null", "map != null", "reduce != null"})
    private ReduceType executeMultiThreaded(Iterator<InputType> inputReader, NSMapFunction<InputType, MapType> map, ReduceType initialValue, NSReduceFunction<MapType, ReduceType> reduce) {
        // Parameter names match the identifiers used in the @Requires expressions above.
        debugPrint("Executing nanoScheduler");
        final Future<ReduceType> reduceResult = this.masterExecutor.submit(new MasterJob(inputReader, map, initialValue, reduce));
        while (true) {
            // Surface any error recorded while we were waiting.
            handleErrors();
            try {
                final ReduceType result = reduceResult.get(100L, TimeUnit.MILLISECONDS);
                // The job may have finished by recording an error and returning a fallback value.
                handleErrors();
                return result;
            } catch (TimeoutException ignored) {
                // Normal case: the master job is simply not done yet; poll again.
            } catch (InterruptedException e) {
                // Keep the interrupt visible to code further up the stack. The recorded
                // error is acted on by handleErrors() at the top of the next iteration
                // (presumably by throwing — depends on MultiThreadedErrorTracker).
                Thread.currentThread().interrupt();
                this.errorTracker.notifyOfError(e);
            } catch (ExecutionException e) {
                // Handled by handleErrors() at the top of the next iteration.
                this.errorTracker.notifyOfError(e);
            }
        }
    }

    /**
     * If the error tracker has recorded an error, stops both executors immediately
     * and asks the tracker to throw the pending error; otherwise does nothing.
     * Called only from the multi-threaded path, where both executors exist.
     */
    private void handleErrors() {
        if (!errorTracker.hasAnErrorOccurred()) {
            return;
        }
        masterExecutor.shutdownNow();
        mapExecutor.shutdownNow();
        errorTracker.throwErrorIfPending();
    }
}
