/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.ml.job.process.autodetect;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.CategorizationAnalyzerConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.CountingInputStream;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.autodetect.UpdateProcessMessage;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.DataToProcessWriter;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.DataToProcessWriterFactory;

public class AutodetectCommunicator
implements Closeable {
    private static final Logger logger = LogManager.getLogger(AutodetectCommunicator.class);
    private static final Duration FLUSH_PROCESS_CHECK_FREQUENCY = Duration.ofSeconds(1L);
    private final Job job;
    private final AutodetectProcess autodetectProcess;
    private final StateStreamer stateStreamer;
    private final DataCountsReporter dataCountsReporter;
    private final AutodetectResultProcessor autodetectResultProcessor;
    private final BiConsumer<Exception, Boolean> onFinishHandler;
    private final ExecutorService autodetectWorkerExecutor;
    private final NamedXContentRegistry xContentRegistry;
    private final boolean includeTokensField;
    private volatile CategorizationAnalyzer categorizationAnalyzer;
    private volatile boolean processKilled;

    AutodetectCommunicator(Job job, AutodetectProcess process, StateStreamer stateStreamer, DataCountsReporter dataCountsReporter, AutodetectResultProcessor autodetectResultProcessor, BiConsumer<Exception, Boolean> onFinishHandler, NamedXContentRegistry xContentRegistry, ExecutorService autodetectWorkerExecutor) {
        this.job = job;
        this.autodetectProcess = process;
        this.stateStreamer = stateStreamer;
        this.dataCountsReporter = dataCountsReporter;
        this.autodetectResultProcessor = autodetectResultProcessor;
        this.onFinishHandler = onFinishHandler;
        this.xContentRegistry = xContentRegistry;
        this.autodetectWorkerExecutor = autodetectWorkerExecutor;
        this.includeTokensField = job.getAnalysisConfig().getCategorizationFieldName() != null;
    }

    public void restoreState(ModelSnapshot modelSnapshot) {
        this.autodetectProcess.restoreState(this.stateStreamer, modelSnapshot);
    }

    private DataToProcessWriter createProcessWriter(DataDescription dataDescription) {
        return DataToProcessWriterFactory.create(true, this.includeTokensField, this.autodetectProcess, dataDescription, this.job.getAnalysisConfig(), this.dataCountsReporter, this.xContentRegistry);
    }

    public void writeHeader() throws IOException {
        this.createProcessWriter(this.job.getDataDescription()).writeHeader();
    }

    public void writeToJob(InputStream inputStream, AnalysisRegistry analysisRegistry, XContentType xContentType, DataLoadParams params, BiConsumer<DataCounts, Exception> handler) {
        this.submitOperation(() -> {
            if (params.isResettingBuckets()) {
                this.autodetectProcess.writeResetBucketsControlMessage(params);
            }
            CountingInputStream countingStream = new CountingInputStream(inputStream, this.dataCountsReporter);
            DataToProcessWriter autodetectWriter = this.createProcessWriter(params.getDataDescription().orElse(this.job.getDataDescription()));
            if (this.includeTokensField && this.categorizationAnalyzer == null) {
                this.createCategorizationAnalyzer(analysisRegistry);
            }
            CountDownLatch latch = new CountDownLatch(1);
            AtomicReference dataCountsAtomicReference = new AtomicReference();
            AtomicReference exceptionAtomicReference = new AtomicReference();
            autodetectWriter.write(countingStream, this.categorizationAnalyzer, xContentType, (dataCounts, e) -> {
                dataCountsAtomicReference.set(dataCounts);
                exceptionAtomicReference.set(e);
                latch.countDown();
            });
            latch.await();
            autodetectWriter.flushStream();
            if (exceptionAtomicReference.get() != null) {
                throw (Exception)exceptionAtomicReference.get();
            }
            return (DataCounts)dataCountsAtomicReference.get();
        }, handler);
    }

    @Override
    public void close() {
        Future<Object> future = this.autodetectWorkerExecutor.submit(() -> {
            this.checkProcessIsAlive();
            try {
                if (this.autodetectProcess.isReady()) {
                    this.autodetectProcess.close();
                } else {
                    this.killProcess(false, false);
                    this.stateStreamer.cancel();
                }
                this.autodetectResultProcessor.awaitCompletion();
            }
            finally {
                this.onFinishHandler.accept(null, true);
            }
            logger.info("[{}] autodetect connection for job closed", (Object)this.job.getId());
            return null;
        });
        try {
            future.get();
            this.autodetectWorkerExecutor.shutdown();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        catch (ExecutionException e) {
            if (this.processKilled) {
                throw ExceptionsHelper.conflictStatusException((String)"Close job interrupted by kill request", (Object[])new Object[0]);
            }
            throw FutureUtils.rethrowExecutionException((ExecutionException)e);
        }
        finally {
            this.destroyCategorizationAnalyzer();
        }
    }

    public void killProcess(boolean awaitCompletion, boolean finish) throws IOException {
        this.killProcess(awaitCompletion, finish, true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void killProcess(boolean awaitCompletion, boolean finish, boolean finalizeJob) throws IOException {
        try {
            this.processKilled = true;
            this.autodetectResultProcessor.setProcessKilled();
            this.autodetectWorkerExecutor.shutdown();
            this.autodetectProcess.kill(awaitCompletion);
            if (awaitCompletion) {
                try {
                    this.autodetectResultProcessor.awaitCompletion();
                }
                catch (TimeoutException e) {
                    logger.warn((Message)new ParameterizedMessage("[{}] Timed out waiting for killed job", (Object)this.job.getId()), (Throwable)e);
                }
            }
        }
        finally {
            if (finish) {
                this.onFinishHandler.accept(null, finalizeJob);
            }
            this.destroyCategorizationAnalyzer();
        }
    }

    public void writeUpdateProcessMessage(UpdateProcessMessage update, BiConsumer<Void, Exception> handler) {
        this.submitOperation(() -> {
            if (update.getModelPlotConfig() != null) {
                this.autodetectProcess.writeUpdateModelPlotMessage(update.getModelPlotConfig());
            }
            if (update.getPerPartitionCategorizationConfig() != null) {
                this.autodetectProcess.writeUpdatePerPartitionCategorizationMessage(update.getPerPartitionCategorizationConfig());
            }
            if (update.getFilters() != null) {
                this.autodetectProcess.writeUpdateFiltersMessage(update.getFilters());
            }
            if (update.getDetectorUpdates() != null) {
                for (JobUpdate.DetectorUpdate detectorUpdate : update.getDetectorUpdates()) {
                    if (detectorUpdate.getRules() == null) continue;
                    this.autodetectProcess.writeUpdateDetectorRulesMessage(detectorUpdate.getDetectorIndex(), detectorUpdate.getRules());
                }
            }
            if (update.getScheduledEvents() != null) {
                this.autodetectProcess.writeUpdateScheduledEventsMessage(update.getScheduledEvents(), this.job.getAnalysisConfig().getBucketSpan());
            }
            return null;
        }, handler);
    }

    public void flushJob(FlushJobParams params, BiConsumer<FlushAcknowledgement, Exception> handler) {
        this.submitOperation(() -> {
            String flushId = this.autodetectProcess.flushJob(params);
            return this.waitFlushToCompletion(flushId, params.isWaitForNormalization());
        }, handler);
    }

    public void forecastJob(ForecastParams params, BiConsumer<Void, Exception> handler) {
        BiConsumer<Void, Exception> forecastConsumer = (aVoid, e) -> {
            if (e == null) {
                FlushJobParams flushParams = FlushJobParams.builder().waitForNormalization(false).build();
                this.flushJob(flushParams, (flushAcknowledgement, flushException) -> {
                    if (flushException != null) {
                        String msg = String.format(Locale.ROOT, "[%s] exception while flushing job", this.job.getId());
                        handler.accept((Void)null, (Exception)ExceptionsHelper.serverError((String)msg, (Throwable)e));
                    } else {
                        handler.accept(null, null);
                    }
                });
            } else {
                handler.accept((Void)null, (Exception)e);
            }
        };
        this.submitOperation(() -> {
            this.autodetectProcess.forecastJob(params);
            return null;
        }, forecastConsumer);
    }

    public void persistJob(BiConsumer<Void, Exception> handler) {
        this.submitOperation(() -> {
            this.autodetectProcess.persistState();
            return null;
        }, handler);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Nullable
    FlushAcknowledgement waitFlushToCompletion(String flushId, boolean waitForNormalization) throws Exception {
        FlushAcknowledgement flushAcknowledgement;
        logger.debug("[{}] waiting for flush", (Object)this.job.getId());
        try {
            flushAcknowledgement = this.autodetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY);
            while (flushAcknowledgement == null) {
                this.checkProcessIsAlive();
                this.checkResultsProcessorIsAlive();
                flushAcknowledgement = this.autodetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY);
            }
        }
        finally {
            this.autodetectResultProcessor.clearAwaitingFlush(flushId);
        }
        if (!this.processKilled) {
            if (waitForNormalization) {
                logger.debug("[{}] Initial flush completed, waiting until renormalizer is idle.", (Object)this.job.getId());
                this.autodetectResultProcessor.waitUntilRenormalizerIsIdle();
            }
            logger.debug("[{}] Flush completed", (Object)this.job.getId());
        }
        return flushAcknowledgement;
    }

    private void checkProcessIsAlive() {
        if (!this.autodetectProcess.isProcessAlive()) {
            throw new ElasticsearchException("[{}] Unexpected death of autodetect: {}", new Object[]{this.job.getId(), this.autodetectProcess.readError()});
        }
    }

    private void checkResultsProcessorIsAlive() {
        if (this.autodetectResultProcessor.isFailed()) {
            throw new ElasticsearchException("[{}] Unexpected death of the result processor", new Object[]{this.job.getId()});
        }
    }

    public ZonedDateTime getProcessStartTime() {
        return this.autodetectProcess.getProcessStartTime();
    }

    public ModelSizeStats getModelSizeStats() {
        return this.autodetectResultProcessor.modelSizeStats();
    }

    public TimingStats getTimingStats() {
        return this.autodetectResultProcessor.timingStats();
    }

    public DataCounts getDataCounts() {
        return this.dataCountsReporter.runningTotalStats();
    }

    void destroyCategorizationAnalyzer() {
        if (this.categorizationAnalyzer != null) {
            this.categorizationAnalyzer.close();
            this.categorizationAnalyzer = null;
        }
    }

    private <T> void submitOperation(final CheckedSupplier<T, Exception> operation, final BiConsumer<T, Exception> handler) {
        this.autodetectWorkerExecutor.execute((Runnable)new AbstractRunnable(){

            public void onFailure(Exception e) {
                if (AutodetectCommunicator.this.processKilled) {
                    handler.accept(null, ExceptionsHelper.conflictStatusException((String)"[{}] Could not submit operation to process as it has been killed", (Object[])new Object[]{AutodetectCommunicator.this.job.getId()}));
                } else {
                    logger.error((Message)new ParameterizedMessage("[{}] Unexpected exception writing to process", (Object)AutodetectCommunicator.this.job.getId()), (Throwable)e);
                    handler.accept(null, e);
                }
            }

            protected void doRun() throws Exception {
                if (AutodetectCommunicator.this.processKilled) {
                    handler.accept(null, ExceptionsHelper.conflictStatusException((String)"[{}] Could not submit operation to process as it has been killed", (Object[])new Object[]{AutodetectCommunicator.this.job.getId()}));
                } else {
                    AutodetectCommunicator.this.checkProcessIsAlive();
                    handler.accept(operation.get(), null);
                }
            }
        });
    }

    private void createCategorizationAnalyzer(AnalysisRegistry analysisRegistry) throws IOException {
        AnalysisConfig analysisConfig = this.job.getAnalysisConfig();
        CategorizationAnalyzerConfig categorizationAnalyzerConfig = analysisConfig.getCategorizationAnalyzerConfig();
        if (categorizationAnalyzerConfig == null) {
            categorizationAnalyzerConfig = CategorizationAnalyzerConfig.buildDefaultCategorizationAnalyzer((List)analysisConfig.getCategorizationFilters());
        }
        this.categorizationAnalyzer = new CategorizationAnalyzer(analysisRegistry, categorizationAnalyzerConfig);
    }
}

