package kd.bos.algo.dataset.input;

import java.io.Closeable;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import kd.bos.algo.AlgoException;
import kd.bos.algo.InputExecutor;
import kd.bos.algo.Row;
import kd.bos.algo.config.AlgoConfiguration;
import kd.bos.algo.dataset.InnerRowIterator;
import kd.bos.algo.util.concurrent.AlgoExecutors;
import kd.bos.thread.ThreadLifeCycleManager;

/* loaded from: input_file:kd/bos/algo/dataset/input/ParallelInputExecutorsRowIterator.class */
/**
 * Row iterator that reads from several {@link InputExecutor}s concurrently.
 *
 * <p>On construction every executor is placed on a job queue and up to
 * {@code parallelism} {@link MyRunnable} tasks are submitted to a shared thread pool.
 * The consuming thread takes row batches from a bounded queue (capacity 4) and
 * serves them one row at a time through {@link #_hasNext()} / {@link #_next()}.
 *
 * <p>Threading: {@code error}, {@code eof} and {@code closed} are shared between the
 * pool threads and the consuming thread and are therefore {@code volatile}. All other
 * iteration state ({@code started}, {@code rowList}, {@code rowIterator}) is only
 * touched by the consuming thread in the code visible here.
 *
 * <p>NOTE(review): the body of {@link MyRunnable#run()} could not be decompiled, so the
 * producer side of this class is not visible in this source.
 */
public class ParallelInputExecutorsRowIterator extends InnerRowIterator implements Closeable {
    /** Inputs to read; each one is enqueued on {@link #jobs} by {@link #init()}. */
    private final InputExecutor[] executors;
    /** Failure to report to the consumer; checked by {@link #_hasNext()} and {@link #_next()}. */
    private volatile Throwable error;
    /**
     * End-of-input flag. Once set, {@link #_hasNext()} drains the queue without waiting.
     * NOTE(review): presumably set by the workers when the last input is exhausted; that
     * assignment is inside the non-decompiled run() body - confirm.
     */
    private volatile boolean eof;
    /** Set by {@link #close()}; makes {@link #_hasNext()} return {@code false}. */
    private volatile boolean closed;
    /** Initialised to the number of worker tasks; not otherwise used in the visible code. */
    private AtomicInteger threadCount;
    /** Awaited once by the first {@link #_hasNext()} call; sized to the number of worker tasks. */
    private CountDownLatch startLatch;
    /** Thread pool shared by all instances of this class. */
    private static ExecutorService es = AlgoExecutors.newCachedThreadPool(AlgoConfiguration.PARALLELINPUT_THREADPOOL_CORESIZE.getInt(), AlgoConfiguration.PARALLELINPUT_THREADPOOL_MAXSIZE.getInt(), "Input");
    /** Pending inputs, filled by {@link #init()}. */
    private final LinkedBlockingQueue<InputExecutor> jobs = new LinkedBlockingQueue<>();
    /** Bounded hand-off queue of row batches between producers and the consumer. */
    private final LinkedBlockingQueue<List<Row>> rowQueue = new LinkedBlockingQueue<>(4);
    /** Whether the consumer has already waited on {@link #startLatch}. */
    private boolean started = false;
    /** Upper bound on the number of worker tasks, read from configuration. */
    private int parallelism = AlgoConfiguration.PARALLELINPUT_PARRALLELISM.getInt();
    /** Configured buffer size; not used in the visible code (presumably the batch size in run() - confirm). */
    private int bufferSize = AlgoConfiguration.PARALLELINPUT_BUFFERSIZE.getInt();
    /** Batch currently being iterated, or {@code null}. */
    private List<Row> rowList = null;
    /** Iterator over {@link #rowList}, or {@code null} when no batch is active. */
    private Iterator<Row> rowIterator = null;

    /**
     * Worker task submitted to the pool by {@link #init()}.
     *
     * <p>The decompiler failed on {@link #run()}; only the fragments in the comments below
     * survived. They show the task testing a list's size, the outer {@code closed} and
     * {@code error} fields, the thread's interrupt status, and calling
     * {@code rowQueue.offer(list, 5, TimeUnit.SECONDS)}.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kd/bos/algo/dataset/input/ParallelInputExecutorsRowIterator$MyRunnable.class */
    public class MyRunnable implements Runnable {
        MyRunnable() {
        }

        /* JADX WARN: Code restructure failed: missing block: B:26:0x0090, code lost:
        
            if (r0.size() <= 0) goto L82;
         */
        /* JADX WARN: Code restructure failed: missing block: B:28:0x009a, code lost:
        
            if (r6.this$0.closed != false) goto L89;
         */
        /* JADX WARN: Code restructure failed: missing block: B:30:0x00a4, code lost:
        
            if (r6.this$0.error != null) goto L88;
         */
        /* JADX WARN: Code restructure failed: missing block: B:32:0x00ad, code lost:
        
            if (java.lang.Thread.currentThread().isInterrupted() != false) goto L90;
         */
        /* JADX WARN: Code restructure failed: missing block: B:34:0x00c1, code lost:
        
            if (r6.this$0.rowQueue.offer(r0, 5, java.util.concurrent.TimeUnit.SECONDS) == false) goto L91;
         */
        @Override // java.lang.Runnable
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public void run() {
            /*
                Method dump skipped, instructions count: 290
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: kd.bos.algo.dataset.input.ParallelInputExecutorsRowIterator.MyRunnable.run():void");
        }
    }

    /**
     * Creates the iterator and immediately submits the worker tasks.
     *
     * @param inputExecutorArr the inputs to read in parallel
     */
    public ParallelInputExecutorsRowIterator(InputExecutor[] inputExecutorArr) {
        this.executors = inputExecutorArr;
        init();
    }

    /**
     * Enqueues all inputs and submits {@code min(executors.length, parallelism)} worker
     * tasks to the shared pool.
     *
     * <p>If the pool rejects a task, the failure is stored in {@code error} and reported
     * by the next {@link #_hasNext()} / {@link #_next()} call.
     */
    private void init() {
        for (InputExecutor inputExecutor : this.executors) {
            this.jobs.add(inputExecutor);
        }
        int workerCount = Math.min(this.executors.length, this.parallelism);
        this.threadCount = new AtomicInteger(workerCount);
        this.startLatch = new CountDownLatch(workerCount);
        if (this.executors.length == 0) {
            // No input means no worker is started, so nothing could ever signal end of
            // input; mark it here so _hasNext() returns false instead of polling forever.
            this.eof = true;
            return;
        }
        for (int i = 0; i < workerCount; i++) {
            // Typed as Runnable: the wrapped task is a Runnable, not a MyRunnable.
            Runnable task = new MyRunnable();
            if (AlgoConfiguration.INPUT_RUNNABLE_WRAP.getBoolean()) {
                task = ThreadLifeCycleManager.wrapRunnable(task);
            }
            try {
                es.execute(task);
            } catch (RuntimeException e) {
                this.error = new AlgoException(e, "Can't allocate enough threads in pool.", new Object[0]);
                return;
            }
        }
    }

    /**
     * Reports whether another row is available, waiting for the producers if necessary.
     *
     * <p>If the calling thread is interrupted while waiting, the wait continues and the
     * interrupt status is restored just before this method returns or throws. Restoring
     * it earlier would make every later timed poll fail immediately.
     *
     * @return {@code true} if {@link #_next()} can return a row; {@code false} once the
     *         iterator is closed, or end of input was signalled and the queue is empty
     * @throws AlgoException (via {@code AlgoException.wrap}) if {@code error} has been set
     */
    @Override // kd.bos.algo.dataset.InnerRowIterator
    protected boolean _hasNext() {
        if (this.error != null) {
            throw AlgoException.wrap(this.error);
        }
        boolean interrupted = false;
        try {
            if (!this.started) {
                try {
                    this.startLatch.await();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
                this.started = true;
            }
            if (this.rowIterator != null) {
                if (this.rowIterator.hasNext()) {
                    return true;
                }
                // Current batch is exhausted: drop it so it can never be served again.
                this.rowIterator = null;
                this.rowList = null;
            }
            while (true) {
                Throwable failure = this.error;
                if (failure != null) {
                    throw AlgoException.wrap(failure);
                }
                if (this.closed) {
                    return false;
                }
                if (this.eof) {
                    // End of input was signalled: drain what is queued without waiting.
                    List<Row> remaining = this.rowQueue.poll();
                    if (remaining == null) {
                        return false;
                    }
                    if (activate(remaining)) {
                        return true;
                    }
                    continue;
                }
                try {
                    // Short timeout so error/closed/eof are re-checked regularly.
                    if (activate(this.rowQueue.poll(1L, TimeUnit.MILLISECONDS))) {
                        return true;
                    }
                } catch (InterruptedException e) {
                    // Keep waiting; the flag is restored in the finally block.
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Makes {@code batch} the current batch if it contains at least one row.
     *
     * @param batch a batch taken from the queue, or {@code null} if none was available
     * @return {@code true} if the batch is now being iterated; {@code false} if it was
     *         {@code null} or empty, in which case the iteration state is left untouched
     */
    private boolean activate(List<Row> batch) {
        if (batch == null) {
            return false;
        }
        Iterator<Row> candidate = batch.iterator();
        if (!candidate.hasNext()) {
            return false;
        }
        this.rowList = batch;
        this.rowIterator = candidate;
        return true;
    }

    /**
     * Returns the next row of the current batch.
     *
     * <p>Must only be called after {@link #_hasNext()} returned {@code true}; otherwise
     * there is no current batch iterator.
     *
     * @return the next row
     * @throws AlgoException (via {@code AlgoException.wrap}) if {@code error} has been set
     */
    @Override // kd.bos.algo.dataset.InnerRowIterator
    protected Row _next() {
        if (this.error != null) {
            throw AlgoException.wrap(this.error);
        }
        return this.rowIterator.next();
    }

    /**
     * Marks this iterator as closed. Afterwards {@link #_hasNext()} returns {@code false}
     * once the current batch is used up. The surviving fragments of the worker's run()
     * show that it also reads this flag.
     */
    @Override // kd.bos.algo.dataset.InnerRowIterator
    public void close() {
        this.closed = true;
    }
}
