/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.ml.action;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.license.License;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTaskParams;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator;
import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.MlConfigIndex;
import org.elasticsearch.xpack.core.ml.MlStatsIndex;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.ExplainDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse;
import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.RequiredField;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias;
import org.elasticsearch.xpack.core.ml.utils.PhaseProgress;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsManager;
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
import org.elasticsearch.xpack.ml.dataframe.MappingsMerger;
import org.elasticsearch.xpack.ml.dataframe.SourceDestValidations;
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractor;
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory;
import org.elasticsearch.xpack.ml.dataframe.extractor.ExtractedFieldsDetector;
import org.elasticsearch.xpack.ml.dataframe.extractor.ExtractedFieldsDetectorFactory;
import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
import org.elasticsearch.xpack.ml.dataframe.stats.StatsHolder;
import org.elasticsearch.xpack.ml.extractor.ExtractedFields;
import org.elasticsearch.xpack.ml.job.JobNodeSelector;
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
import org.elasticsearch.xpack.ml.task.AbstractJobPersistentTasksExecutor;

public class TransportStartDataFrameAnalyticsAction
extends TransportMasterNodeAction<StartDataFrameAnalyticsAction.Request, NodeAcknowledgedResponse> {
    // Logger for this action.
    private static final Logger logger = LogManager.getLogger(TransportStartDataFrameAnalyticsAction.class);
    // Marker searched for inside a task's assignment explanation by AnalyticsPredicate.test();
    // when the explanation contains it the predicate keeps waiting instead of recording a failure.
    private static final String PRIMARY_SHARDS_INACTIVE = "not all primary shards are active";
    // License state checked against the ML API feature before a start request is processed.
    private final XPackLicenseState licenseState;
    // Client used for the explain, stats and search actions issued while validating a start request.
    private final Client client;
    // Sends start/remove requests for the persistent task and waits for it to satisfy a predicate.
    private final PersistentTasksService persistentTasksService;
    // Looks up the stored analytics config by id.
    private final DataFrameAnalyticsConfigProvider configProvider;
    // Is given the job's configured model memory limit before the persistent task is started.
    private final MlMemoryTracker memoryTracker;
    // Receives info/warning audit messages for the job; non-null (enforced in the constructor).
    private final DataFrameAnalyticsAuditor auditor;
    // Validates the configured source and destination indices against the current cluster state.
    private final SourceDestValidator sourceDestValidator;

    /**
     * Creates the transport action and stores its collaborators.
     *
     * @param transportService            passed to the superclass; also supplies the remote cluster service
     *                                    given to the {@link SourceDestValidator}
     * @param client                      client used for the actions issued during start validation
     * @param clusterService              passed to the superclass; also supplies the node name given to the
     *                                    {@link SourceDestValidator}
     * @param threadPool                  passed to the superclass
     * @param actionFilters               passed to the superclass
     * @param licenseState                license state consulted in {@code masterOperation}
     * @param indexNameExpressionResolver passed to the superclass and to the {@link SourceDestValidator}
     * @param persistentTasksService      service used to start, remove and wait on the persistent task
     * @param configProvider              source of stored analytics configs
     * @param memoryTracker               tracker updated with the job's model memory limit
     * @param auditor                     auditor for job messages; must not be {@code null}
     * @throws NullPointerException if {@code auditor} is {@code null}
     */
    @Inject
    public TransportStartDataFrameAnalyticsAction(TransportService transportService, Client client, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, XPackLicenseState licenseState, IndexNameExpressionResolver indexNameExpressionResolver, PersistentTasksService persistentTasksService, DataFrameAnalyticsConfigProvider configProvider, MlMemoryTracker memoryTracker, DataFrameAnalyticsAuditor auditor) {
        // First argument is the action name. NOTE(review): the trailing "same" is presumably the
        // executor name handed to the superclass - confirm against TransportMasterNodeAction.
        super("cluster:admin/xpack/ml/data_frame/analytics/start", transportService, clusterService, threadPool, actionFilters, StartDataFrameAnalyticsAction.Request::new, indexNameExpressionResolver, NodeAcknowledgedResponse::new, "same");
        this.licenseState = licenseState;
        this.client = client;
        this.persistentTasksService = persistentTasksService;
        this.configProvider = configProvider;
        this.memoryTracker = memoryTracker;
        // The auditor is the only collaborator that is null-checked here.
        this.auditor = Objects.requireNonNull(auditor);
        // NOTE(review): the two null arguments are optional collaborators of SourceDestValidator whose
        // meaning is not visible from this file - confirm against its constructor.
        this.sourceDestValidator = new SourceDestValidator(indexNameExpressionResolver, transportService.getRemoteClusterService(), null, null, clusterService.getNodeName(), License.OperationMode.PLATINUM.description());
    }

    /**
     * Looks up the global cluster block, if any, that applies to this action.
     *
     * @param request the start request (not consulted)
     * @param state   cluster state whose blocks are inspected
     * @return the result of {@code globalBlockedException} for the {@code METADATA_WRITE} level
     */
    protected ClusterBlockException checkBlock(StartDataFrameAnalyticsAction.Request request, ClusterState state) {
        final ClusterBlockLevel requiredLevel = ClusterBlockLevel.METADATA_WRITE;
        return state.blocks().globalBlockedException(requiredLevel);
    }

    /**
     * Handles a start request on the master node.
     * <p>
     * The work is an asynchronous chain whose listeners are declared below in reverse order of execution:
     * <ol>
     *   <li>build and validate the {@link StartContext} for the requested job;</li>
     *   <li>estimate memory usage and update the memory tracker;</li>
     *   <li>send the persistent-task start request;</li>
     *   <li>wait for the persistent task to satisfy the start predicate.</li>
     * </ol>
     * A failure at any step is forwarded to {@code listener}. If the ML API feature is not allowed by the
     * current license, the listener is failed immediately and nothing else happens.
     *
     * @param request  the start request; its id and timeout are used
     * @param state    the cluster state supplied by the framework (not consulted here)
     * @param listener receives the acknowledgement or the failure
     */
    protected void masterOperation(final StartDataFrameAnalyticsAction.Request request, ClusterState state, final ActionListener<NodeAcknowledgedResponse> listener) {
        logger.debug(() -> new ParameterizedMessage("[{}] received start request", (Object)request.getId()));
        if (!MachineLearningField.ML_API_FEATURE.check(this.licenseState)) {
            listener.onFailure(LicenseUtils.newComplianceException("ml"));
            return;
        }

        // Step 4: once the persistent task exists, wait until it reports as started.
        ActionListener<PersistentTasksCustomMetadata.PersistentTask<StartDataFrameAnalyticsAction.TaskParams>> waitForAnalyticsToStart =
            new ActionListener<PersistentTasksCustomMetadata.PersistentTask<StartDataFrameAnalyticsAction.TaskParams>>() {

                public void onResponse(PersistentTasksCustomMetadata.PersistentTask<StartDataFrameAnalyticsAction.TaskParams> task) {
                    waitForAnalyticsStarted(task, request.getTimeout(), listener);
                }

                public void onFailure(Exception e) {
                    Exception reported = e;
                    // A task that already exists means the job was started before: report it as a conflict,
                    // keeping the original exception as the cause.
                    if (ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException) {
                        reported = new ElasticsearchStatusException(
                            "Cannot start data frame analytics [{}] because it has already been started",
                            RestStatus.CONFLICT,
                            e,
                            new Object[]{request.getId()}
                        );
                    }
                    listener.onFailure(reported);
                }
            };

        // Step 3: memory usage handled, so send the persistent-task start request.
        ActionListener<StartContext> memoryUsageHandledListener = ActionListener.wrap(
            startContext -> this.lambda$masterOperation$1(request, waitForAnalyticsToStart, startContext),
            e -> listener.onFailure(e)
        );

        // Step 2: start context available, so estimate memory usage and update the tracker.
        ActionListener<StartContext> startContextListener = ActionListener.wrap(
            startContext -> this.estimateMemoryUsageAndUpdateMemoryTracker(startContext, memoryUsageHandledListener),
            e -> listener.onFailure(e)
        );

        // Step 1: kick off the chain.
        this.getStartContext(request.getId(), startContextListener);
    }

    /**
     * Runs the explain action for the job's config, audits the estimated memory usage, warns when the
     * configured model memory limit is below the estimate, and then registers the configured limit with
     * the memory tracker.
     *
     * @param startContext context holding the config to explain
     * @param listener     receives {@code startContext} once the memory tracker has been updated, or the
     *                     failure from the explain action or the tracker
     */
    private void estimateMemoryUsageAndUpdateMemoryTracker(StartContext startContext, ActionListener<StartContext> listener) {
        final DataFrameAnalyticsConfig config = startContext.config;
        final String jobId = config.getId();

        ActionListener<ExplainDataFrameAnalyticsAction.Response> explainListener = ActionListener.wrap(explainResponse -> {
            ByteSizeValue expectedMemoryWithoutDisk = explainResponse.getMemoryEstimation().getExpectedMemoryWithoutDisk();
            this.auditor.info(jobId, Messages.getMessage("Estimated memory usage [{0}]", new Object[]{expectedMemoryWithoutDisk}));

            // The job is still started when the limit is too low; the user is only warned.
            boolean limitBelowEstimate = config.getModelMemoryLimit().compareTo(expectedMemoryWithoutDisk) < 0;
            if (limitBelowEstimate) {
                String warning = Messages.getMessage(
                    "Configured model memory limit [{0}] is lower than the expected memory usage [{1}]. The analytics job may fail due to configured memory constraints.",
                    new Object[]{config.getModelMemoryLimit(), expectedMemoryWithoutDisk}
                );
                this.auditor.warning(jobId, warning);
                logger.warn("[{}] {}", (Object)jobId, (Object)warning);
            }

            // The tracker is given the configured limit, not the estimate.
            this.memoryTracker.addDataFrameAnalyticsJobMemoryAndRefreshAllOthers(
                jobId,
                config.getModelMemoryLimit().getBytes(),
                ActionListener.wrap(aVoid -> listener.onResponse(startContext), e -> listener.onFailure(e))
            );
        }, e -> listener.onFailure(e));

        PutDataFrameAnalyticsAction.Request explainRequest = new PutDataFrameAnalyticsAction.Request(config);
        ClientHelper.executeAsyncWithOrigin(this.client, "ml", ExplainDataFrameAnalyticsAction.INSTANCE, explainRequest, explainListener);
    }

    /**
     * Loads the job's config and progress, builds a {@link StartContext} and validates it.
     * <p>
     * The listeners are declared in reverse order of execution. At runtime the sequence is:
     * <ol>
     *   <li>fetch the config by id, then its progress, and build the {@link StartContext};</li>
     *   <li>call {@code getParsedQuery()} on the source and run the source/dest validations;</li>
     *   <li>detect the extracted fields and store them on the context;</li>
     *   <li>depending on the starting state: check that the destination index is empty (first run),
     *       skip that check (resuming), or fail (finished / unexpected state);</li>
     *   <li>merge the source mappings;</li>
     *   <li>validate that the source index has analyzable data.</li>
     * </ol>
     * Every failure is forwarded to {@code finalListener}.
     *
     * @param id            id of the analytics job
     * @param finalListener receives the validated context or the first failure
     */
    private void getStartContext(String id, ActionListener<StartContext> finalListener) {
        // Step 6: mappings merged successfully, so check the source actually has analyzable data.
        ActionListener<StartContext> validateMappingsMergeListener = ActionListener.wrap(
            startContext -> this.validateSourceIndexHasAnalyzableData(startContext, finalListener),
            e -> finalListener.onFailure(e)
        );

        // Step 5: merge the source mappings; only success/failure matters, the merged result is discarded.
        ActionListener<StartContext> toValidateMappingsListener = ActionListener.wrap(
            startContext -> MappingsMerger.mergeMappings(
                this.client,
                startContext.config.getHeaders(),
                startContext.config.getSource(),
                ActionListener.wrap(
                    mappings -> validateMappingsMergeListener.onResponse(startContext),
                    e -> finalListener.onFailure(e)
                )
            ),
            e -> finalListener.onFailure(e)
        );

        // Step 4: branch on the state the job would start from.
        ActionListener<StartContext> toValidateDestEmptyListener = ActionListener.wrap(startContext -> {
            switch (startContext.startingState) {
                case FIRST_TIME: {
                    this.checkDestIndexIsEmptyIfExists(this.client, startContext, toValidateMappingsListener);
                    break;
                }
                case RESUMING_REINDEXING:
                case RESUMING_ANALYZING:
                case RESUMING_INFERENCE: {
                    // Resuming: the destination-empty check is skipped.
                    toValidateMappingsListener.onResponse(startContext);
                    break;
                }
                case FINISHED: {
                    logger.info("[{}] Job has already finished", (Object)startContext.config.getId());
                    finalListener.onFailure(ExceptionsHelper.badRequestException("Cannot start because the job has already finished", new Object[0]));
                    break;
                }
                default: {
                    finalListener.onFailure(ExceptionsHelper.serverError("Unexpected starting state {}", new Object[]{startContext.startingState}));
                }
            }
        }, e -> finalListener.onFailure(e));

        // Step 3: detect which fields can be extracted and remember them on the context.
        ActionListener<StartContext> toValidateExtractionPossibleListener = ActionListener.wrap(
            startContext -> new ExtractedFieldsDetectorFactory(this.client).createFromSource(
                startContext.config,
                ActionListener.wrap(
                    extractedFieldsDetector -> {
                        startContext.extractedFields = (ExtractedFields)extractedFieldsDetector.detect().v1();
                        toValidateDestEmptyListener.onResponse(startContext);
                    },
                    e -> finalListener.onFailure(e)
                )
            ),
            e -> finalListener.onFailure(e)
        );

        // Step 2: validate the source query and the source/destination indices.
        ActionListener<StartContext> startContextListener = ActionListener.wrap(startContext -> {
            // Result is discarded; presumably called so that an unparsable query fails the chain here.
            startContext.config.getSource().getParsedQuery();
            this.sourceDestValidator.validate(
                this.clusterService.state(),
                startContext.config.getSource().getIndex(),
                startContext.config.getDest().getIndex(),
                null,
                SourceDestValidations.ALL_VALIDATIONS,
                ActionListener.wrap(
                    aBoolean -> toValidateExtractionPossibleListener.onResponse(startContext),
                    e -> finalListener.onFailure(e)
                )
            );
        }, e -> finalListener.onFailure(e));

        // Step 1: config -> progress -> StartContext.
        ActionListener<DataFrameAnalyticsConfig> getConfigListener = ActionListener.wrap(
            config -> this.getProgress(
                config,
                ActionListener.wrap(
                    progress -> startContextListener.onResponse(new StartContext(config, progress)),
                    e -> finalListener.onFailure(e)
                )
            ),
            e -> finalListener.onFailure(e)
        );

        this.configProvider.get(id, getConfigListener);
    }

    /**
     * Runs the two source-data checks in sequence: first that at least one non-required field is
     * available for analysis, then that the source contains a usable number of rows.
     *
     * @param startContext context holding the config and the extracted fields
     * @param listener     receives {@code startContext} when both checks pass, or the first failure
     */
    private void validateSourceIndexHasAnalyzableData(StartContext startContext, ActionListener<StartContext> listener) {
        ActionListener<Void> atLeastOneAnalyzedFieldListener = ActionListener.wrap(
            aVoid -> this.validateSourceIndexRowsCount(startContext, listener),
            e -> listener.onFailure(e)
        );
        this.validateSourceIndexHasAtLeastOneAnalyzedField(startContext, atLeastOneAnalyzedFieldListener);
    }

    /**
     * Checks that the extracted fields contain at least one field that is not among the analysis'
     * required fields.
     *
     * @param startContext context holding the config and the extracted fields
     * @param listener     notified with {@code null} on success, or with a bad-request exception when
     *                     every extracted field is a required field (or there are none)
     */
    private void validateSourceIndexHasAtLeastOneAnalyzedField(StartContext startContext, ActionListener<Void> listener) {
        Set<String> requiredFields = startContext.config.getAnalysis()
            .getRequiredFields()
            .stream()
            .map(RequiredField::getName)
            .collect(Collectors.toSet());

        boolean hasNonRequiredField = startContext.extractedFields.getAllFields()
            .stream()
            .anyMatch(extractedField -> !requiredFields.contains(extractedField.getName()));
        if (hasNonRequiredField) {
            listener.onResponse(null);
            return;
        }

        String message = "at least one field must be included in the analysis";
        if (!requiredFields.isEmpty()) {
            // Name the required fields so the user can see what was excluded from the count.
            message = message + " (excluding fields " + requiredFields + ")";
        }
        listener.onFailure(ExceptionsHelper.badRequestException(message, new Object[0]));
    }

    /**
     * Collects a data summary over the source indices and validates the number of rows it reports.
     * <p>
     * Fails with a bad-request exception when the summary reports zero rows, or when
     * {@code floor(trainingPercent * rows)} is at least 2^32; otherwise passes the context on.
     *
     * @param startContext context holding the config and the extracted fields
     * @param listener     receives {@code startContext} on success, or the failure
     */
    private void validateSourceIndexRowsCount(StartContext startContext, ActionListener<StartContext> listener) {
        String extractorId = "validate_source_index_has_rows-" + startContext.config.getId();
        DataFrameDataExtractorFactory factory = DataFrameDataExtractorFactory.createForSourceIndices(
            this.client,
            extractorId,
            startContext.config,
            startContext.extractedFields
        );
        DataFrameDataExtractor extractor = factory.newExtractor(false);

        extractor.collectDataSummaryAsync(ActionListener.wrap(dataSummary -> {
            if (dataSummary.rows == 0L) {
                listener.onFailure(ExceptionsHelper.badRequestException(
                    "Unable to start {} as no documents in the source indices [{}] contained all the fields selected for analysis. If you are relying on automatic field selection then there are currently mapped fields that do not exist in any indexed documents, and you will have to switch to explicit field selection and include only fields that exist in indexed documents.",
                    new Object[]{
                        startContext.config.getId(),
                        Strings.arrayToCommaDelimitedString(startContext.config.getSource().getIndex())
                    }
                ));
                return;
            }

            // NOTE(review): whether getTrainingPercent() is on a 0-100 or a 0-1 scale is not visible
            // here; on a 0-100 scale the effective row limit would be 2^32 / 100 - confirm intent.
            double scaledRows = Math.floor(startContext.config.getAnalysis().getTrainingPercent() * (double)dataSummary.rows);
            if (scaledRows >= Math.pow(2.0, 32.0)) {
                listener.onFailure(ExceptionsHelper.badRequestException(
                    "Unable to start because too many documents (more than 2^32) are included in the analysis. Consider downsampling.",
                    new Object[0]
                ));
                return;
            }

            listener.onResponse(startContext);
        }, e -> listener.onFailure(e)));
    }

    /**
     * Fetches the job's stats and hands the progress of the first stats entry to the listener.
     *
     * @param config   config whose id is used for the stats request
     * @param listener receives the progress list, a "missing data frame analytics" failure when the
     *                 stats response contains no entries, or the failure of the stats action itself
     */
    private void getProgress(DataFrameAnalyticsConfig config, ActionListener<List<PhaseProgress>> listener) {
        GetDataFrameAnalyticsStatsAction.Request getStatsRequest = new GetDataFrameAnalyticsStatsAction.Request(config.getId());

        ActionListener<GetDataFrameAnalyticsStatsAction.Response> statsListener = ActionListener.wrap(statsResponse -> {
            List<GetDataFrameAnalyticsStatsAction.Response.Stats> stats = statsResponse.getResponse().results();
            if (stats.isEmpty()) {
                listener.onFailure(ExceptionsHelper.missingDataFrameAnalytics(config.getId()));
                return;
            }
            // Only the first entry is consulted; the request was made for a single id.
            listener.onResponse(stats.get(0).getProgress());
        }, e -> listener.onFailure(e));

        ClientHelper.executeAsyncWithOrigin(this.client, "ml", GetDataFrameAnalyticsStatsAction.INSTANCE, getStatsRequest, statsListener);
    }

    /**
     * Verifies that the destination index, if it exists, holds no documents.
     * <p>
     * A size-0 search that disallows partial results is run against the destination index using the
     * headers stored on the config. A total hit count above zero fails the listener with a bad-request
     * exception. A search failure whose unwrapped cause is {@link IndexNotFoundException} counts as
     * success; any other failure is propagated.
     *
     * @param client       client used to run the search
     * @param startContext context holding the config
     * @param listener     receives {@code startContext} on success, or the failure
     */
    private void checkDestIndexIsEmptyIfExists(Client client, StartContext startContext, ActionListener<StartContext> listener) {
        final String destIndex = startContext.config.getDest().getIndex();

        SearchRequest destEmptySearch = new SearchRequest(destIndex);
        destEmptySearch.source().size(0);
        destEmptySearch.allowPartialSearchResults(false);

        ClientHelper.executeWithHeadersAsync(
            startContext.config.getHeaders(),
            "ml",
            client,
            SearchAction.INSTANCE,
            destEmptySearch,
            ActionListener.wrap(
                searchResponse -> {
                    boolean destHasDocuments = searchResponse.getHits().getTotalHits().value > 0L;
                    if (destHasDocuments) {
                        listener.onFailure(ExceptionsHelper.badRequestException("dest index [{}] must be empty", new Object[]{destIndex}));
                        return;
                    }
                    listener.onResponse(startContext);
                },
                e -> {
                    boolean destIndexMissing = ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException;
                    if (!destIndexMissing) {
                        listener.onFailure(e);
                        return;
                    }
                    // A destination index that does not exist yet is treated as empty.
                    listener.onResponse(startContext);
                }
            )
        );
    }

    /**
     * Waits, up to {@code timeout}, for the persistent task to satisfy an {@link AnalyticsPredicate}
     * and translates the outcome for the caller.
     * <ul>
     *   <li>Predicate satisfied with an exception recorded: the task start is cancelled and that
     *       exception is reported.</li>
     *   <li>Predicate satisfied without an exception: a "Started analytics" audit message is written
     *       and an acknowledged response carrying the predicate's node is returned.</li>
     *   <li>Timeout with an assignment explanation recorded: the task start is cancelled and a
     *       {@code TOO_MANY_REQUESTS} status exception is reported.</li>
     *   <li>Timeout without an explanation: a plain timeout exception is reported and the task is
     *       left in place.</li>
     * </ul>
     *
     * @param task     the persistent task that was just created
     * @param timeout  how long to wait
     * @param listener receives the acknowledgement or the failure
     */
    private void waitForAnalyticsStarted(final PersistentTasksCustomMetadata.PersistentTask<StartDataFrameAnalyticsAction.TaskParams> task, TimeValue timeout, final ActionListener<NodeAcknowledgedResponse> listener) {
        final AnalyticsPredicate predicate = new AnalyticsPredicate();

        PersistentTasksService.WaitForPersistentTaskListener<PersistentTaskParams> waitListener =
            new PersistentTasksService.WaitForPersistentTaskListener<PersistentTaskParams>() {

                public void onResponse(PersistentTasksCustomMetadata.PersistentTask<PersistentTaskParams> persistentTask) {
                    if (predicate.exception == null) {
                        auditor.info(task.getParams().getId(), "Started analytics");
                        listener.onResponse(new NodeAcknowledgedResponse(true, predicate.node));
                    } else {
                        cancelAnalyticsStart(task, predicate.exception, listener);
                    }
                }

                public void onFailure(Exception e) {
                    listener.onFailure(e);
                }

                public void onTimeout(TimeValue timeout) {
                    logger.error(new ParameterizedMessage(
                        "[{}] timed out when starting task after [{}]. Assignment explanation [{}]",
                        new Object[]{task.getParams().getId(), timeout, predicate.assignmentExplanation}
                    ));
                    if (predicate.assignmentExplanation == null) {
                        listener.onFailure(new ElasticsearchException(
                            "Starting data frame analytics [{}] timed out after [{}]",
                            new Object[]{task.getParams().getId(), timeout}
                        ));
                    } else {
                        // The task could not be assigned in time: remove it and report why.
                        cancelAnalyticsStart(
                            task,
                            new ElasticsearchStatusException(
                                "Could not start data frame analytics task, timed out after [{}] waiting for task assignment. Assignment explanation [{}]",
                                RestStatus.TOO_MANY_REQUESTS,
                                new Object[]{timeout, predicate.assignmentExplanation}
                            ),
                            listener
                        );
                    }
                }
            };

        this.persistentTasksService.waitForPersistentTaskCondition(task.getId(), predicate, timeout, waitListener);
    }

    /**
     * Sends a remove request for the persistent task and then fails the listener with the exception
     * that caused the cancellation.
     * <p>
     * The listener always receives {@code exception}, whether or not the removal succeeds; a failed
     * removal is only logged.
     *
     * @param persistentTask the task to remove
     * @param exception      the reason the start is being cancelled
     * @param listener       failed with {@code exception} once the remove request completes
     */
    private void cancelAnalyticsStart(final PersistentTasksCustomMetadata.PersistentTask<StartDataFrameAnalyticsAction.TaskParams> persistentTask, final Exception exception, final ActionListener<NodeAcknowledgedResponse> listener) {
        ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> removalListener =
            new ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>>() {

                public void onResponse(PersistentTasksCustomMetadata.PersistentTask<?> task) {
                    // Removal worked, but the overall outcome is still the original problem.
                    listener.onFailure(exception);
                }

                public void onFailure(Exception e) {
                    ParameterizedMessage message = new ParameterizedMessage(
                        "[{}] Failed to cancel persistent task that could not be assigned due to [{}]",
                        (Object)persistentTask.getParams().getId(),
                        (Object)exception.getMessage()
                    );
                    logger.error((Message)message, (Throwable)e);
                    listener.onFailure(exception);
                }
            };

        this.persistentTasksService.sendRemoveRequest(persistentTask.getId(), removalListener);
    }

    /**
     * Decompiler-emitted lambda body: builds the task params from the request id and the config held by
     * {@code startContext}, then sends the persistent-task start request.
     *
     * @param request                 the start request; supplies the job id
     * @param waitForAnalyticsToStart listener handed to the persistent tasks service
     * @param startContext            context whose config supplies the version and the lazy-start flag
     */
    private /* synthetic */ void lambda$masterOperation$1(StartDataFrameAnalyticsAction.Request request, ActionListener waitForAnalyticsToStart, StartContext startContext) throws Exception {
        final DataFrameAnalyticsConfig config = startContext.config;
        StartDataFrameAnalyticsAction.TaskParams taskParams = new StartDataFrameAnalyticsAction.TaskParams(
            request.getId(),
            config.getVersion(),
            config.isAllowLazyStart()
        );
        final String persistentTaskId = MlTasks.dataFrameAnalyticsTaskId(request.getId());
        this.persistentTasksService.sendStartRequest(
            persistentTaskId,
            "xpack/ml/data_frame/analytics",
            (PersistentTaskParams)taskParams,
            waitForAnalyticsToStart
        );
    }

    /**
     * Mutable holder passed along the start-validation chain: the job's config, the state the job
     * would start from, and (once detected) the fields extracted from the source.
     */
    private static class StartContext {
        /** Config of the job being started. */
        private final DataFrameAnalyticsConfig config;
        /** Starting state derived from the job's progress at construction time. */
        private final DataFrameAnalyticsTask.StartingState startingState;
        /** Set later in the chain, after field detection; {@code null} until then. */
        private volatile ExtractedFields extractedFields;

        /**
         * @param config          the job's config
         * @param progressOnStart the job's progress, used to determine the starting state
         */
        private StartContext(DataFrameAnalyticsConfig config, List<PhaseProgress> progressOnStart) {
            final String jobId = config.getId();
            this.config = config;
            this.startingState = DataFrameAnalyticsTask.determineStartingState(jobId, progressOnStart);
        }
    }

    /**
     * Predicate deciding when waiting for the persistent task may stop. Besides the boolean result it
     * records, in volatile fields, why waiting stopped: an exception, the executor node, and/or the
     * last unassigned-assignment explanation seen.
     */
    private static class AnalyticsPredicate implements Predicate<PersistentTasksCustomMetadata.PersistentTask<?>> {
        /** Set when waiting ended because the start failed. */
        private volatile Exception exception;
        /** Executor node of the task once it is seen in the STARTED state; empty string until then. */
        private volatile String node = "";
        /** Explanation of the most recent unassigned (non-initial) assignment seen, if any. */
        private volatile String assignmentExplanation;

        private AnalyticsPredicate() {
        }

        /**
         * @param persistentTask the task as currently seen, possibly {@code null}
         * @return {@code true} when waiting should stop (successfully, or with {@link #exception} set);
         *         {@code false} to keep waiting
         */
        @Override
        public boolean test(PersistentTasksCustomMetadata.PersistentTask<?> persistentTask) {
            if (persistentTask == null) {
                return false;
            }

            final PersistentTasksCustomMetadata.Assignment assignment = persistentTask.getAssignment();
            if (assignment != null) {
                // Awaiting lazy assignment ends the wait without recording an error.
                if (assignment.equals(JobNodeSelector.AWAITING_LAZY_ASSIGNMENT)) {
                    return true;
                }
                boolean assignmentFailed = !assignment.equals(PersistentTasksCustomMetadata.INITIAL_ASSIGNMENT)
                    && !assignment.isAssigned();
                if (assignmentFailed) {
                    this.assignmentExplanation = assignment.getExplanation();
                    // Inactive primary shards: keep waiting so another assignment attempt can happen.
                    if (this.assignmentExplanation.contains(TransportStartDataFrameAnalyticsAction.PRIMARY_SHARDS_INACTIVE)) {
                        return false;
                    }
                    this.exception = new ElasticsearchStatusException(
                        "Could not start data frame analytics task, allocation explanation [{}]",
                        RestStatus.TOO_MANY_REQUESTS,
                        new Object[]{assignment.getExplanation()}
                    );
                    return true;
                }
            }

            final DataFrameAnalyticsTaskState taskState = (DataFrameAnalyticsTaskState)persistentTask.getState();
            final String reason;
            final DataFrameAnalyticsState analyticsState;
            if (taskState == null) {
                // No task state yet is handled like STOPPED.
                reason = "__unknown__";
                analyticsState = DataFrameAnalyticsState.STOPPED;
            } else {
                reason = taskState.getReason();
                analyticsState = taskState.getState();
            }

            switch (analyticsState) {
                case STARTED: {
                    this.node = persistentTask.getExecutorNode();
                    return true;
                }
                case STOPPING: {
                    this.exception = ExceptionsHelper.conflictStatusException("the task has been stopped while waiting to be started", new Object[0]);
                    return true;
                }
                case STARTING:
                case STOPPED: {
                    return false;
                }
            }

            // Any other state ends the wait with a server error.
            String reasonClause = reason == null ? "" : "with reason [" + reason + "] ";
            this.exception = ExceptionsHelper.serverError(
                "Unexpected task state [{}] {}while waiting to be started",
                new Object[]{analyticsState, reasonClause}
            );
            return true;
        }
    }

    /**
     * Persistent-task executor for data frame analytics: creates the task object, picks a node for it
     * and starts the analytics on the node the task is assigned to.
     */
    public static class TaskExecutor extends AbstractJobPersistentTasksExecutor<StartDataFrameAnalyticsAction.TaskParams> {
        private final Client client;
        private final DataFrameAnalyticsManager manager;
        private final DataFrameAnalyticsAuditor auditor;
        private final XPackLicenseState licenseState;
        /** Latest cluster state seen by the listener registered in the constructor; {@code null} until the first event. */
        private volatile ClusterState clusterState;

        /**
         * @param settings       passed to the superclass
         * @param client         client for internal actions; must not be {@code null}
         * @param clusterService passed to the superclass; also used to register a cluster state listener
         * @param manager        runs the analytics; must not be {@code null}
         * @param auditor        auditor for job messages; must not be {@code null}
         * @param memoryTracker  passed to the superclass
         * @param resolver       passed to the superclass
         * @param licenseState   handed to every task created by this executor
         */
        public TaskExecutor(Settings settings, Client client, ClusterService clusterService, DataFrameAnalyticsManager manager, DataFrameAnalyticsAuditor auditor, MlMemoryTracker memoryTracker, IndexNameExpressionResolver resolver, XPackLicenseState licenseState) {
            super("xpack/ml/data_frame/analytics", "ml_utility", settings, clusterService, memoryTracker, resolver);
            this.client = Objects.requireNonNull(client);
            this.manager = Objects.requireNonNull(manager);
            this.auditor = Objects.requireNonNull(auditor);
            this.licenseState = licenseState;
            // Keep a reference to the most recent cluster state for use in nodeOperation/executeTask.
            clusterService.addListener(event -> this.clusterState = event.state());
        }

        /** Creates the {@link DataFrameAnalyticsTask} backing the given persistent task. */
        protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId, PersistentTasksCustomMetadata.PersistentTask<StartDataFrameAnalyticsAction.TaskParams> persistentTask, Map<String, String> headers) {
            return new DataFrameAnalyticsTask(
                id,
                type,
                action,
                parentTaskId,
                headers,
                this.client,
                this.manager,
                this.auditor,
                persistentTask.getParams(),
                this.licenseState
            );
        }

        /**
         * Chooses an assignment for the task.
         * <p>
         * If the superclass already has a potential assignment it is returned as is. Otherwise a
         * {@link JobNodeSelector} restricted by {@link #nodeFilter} selects a node, and the memory
         * requirement is audited if necessary.
         *
         * @param params         the task params
         * @param candidateNodes nodes that may be selected
         * @param clusterState   cluster state to select against
         * @return the chosen assignment
         */
        public PersistentTasksCustomMetadata.Assignment getAssignment(StartDataFrameAnalyticsAction.TaskParams params, Collection<DiscoveryNode> candidateNodes, ClusterState clusterState) {
            boolean isMemoryTrackerRecentlyRefreshed = this.memoryTracker.isRecentlyRefreshed();
            Optional<PersistentTasksCustomMetadata.Assignment> optionalAssignment =
                this.getPotentialAssignment(params, clusterState, isMemoryTrackerRecentlyRefreshed);
            if (optionalAssignment.isPresent()) {
                return optionalAssignment.get();
            }

            JobNodeSelector jobNodeSelector = new JobNodeSelector(
                clusterState,
                candidateNodes,
                params.getId(),
                "xpack/ml/data_frame/analytics",
                this.memoryTracker,
                // NOTE(review): presumably the max number of lazy ML nodes; unbounded when lazy start is allowed - confirm.
                params.isAllowLazyStart() ? Integer.MAX_VALUE : this.maxLazyMLNodes,
                node -> TaskExecutor.nodeFilter(node, params)
            );
            PersistentTasksCustomMetadata.Assignment assignment = jobNodeSelector.selectNode(
                this.maxOpenJobs,
                Integer.MAX_VALUE,
                this.maxMachineMemoryPercent,
                this.maxNodeMemory,
                this.useAutoMemoryPercentage
            );
            this.auditRequireMemoryIfNecessary(params.getId(), this.auditor, assignment, jobNodeSelector, isMemoryTrackerRecentlyRefreshed);
            return assignment;
        }

        /**
         * Starts the analytics on the assigned node.
         * <p>
         * A task found in the STOPPING state is marked as completed; a task found in the FAILED state
         * is left untouched. Otherwise the inference system index is created if necessary, the job's
         * stats are loaded into the task's stats holder, and the task is executed.
         *
         * @param task   the allocated task; expected to be a {@link DataFrameAnalyticsTask}
         * @param params the task params
         * @param state  the persisted task state, or {@code null} (treated as STOPPED)
         */
        protected void nodeOperation(AllocatedPersistentTask task, StartDataFrameAnalyticsAction.TaskParams params, PersistentTaskState state) {
            DataFrameAnalyticsTask dfaTask = (DataFrameAnalyticsTask)task;
            DataFrameAnalyticsTaskState analyticsTaskState = (DataFrameAnalyticsTaskState)state;
            DataFrameAnalyticsState analyticsState = analyticsTaskState == null ? DataFrameAnalyticsState.STOPPED : analyticsTaskState.getState();
            logger.info("[{}] Starting data frame analytics from state [{}]", (Object)params.getId(), (Object)analyticsState);

            if (analyticsState == DataFrameAnalyticsState.STOPPING) {
                logger.info("[{}] data frame analytics got reassigned while stopping. Marking as completed", (Object)params.getId());
                task.markAsCompleted();
                return;
            }
            if (analyticsState == DataFrameAnalyticsState.FAILED) {
                // A failed task is deliberately left as it is.
                return;
            }

            // Second step: load the stats into the task, then execute it.
            ActionListener<GetDataFrameAnalyticsStatsAction.Response> statsListener = ActionListener.wrap(statsResponse -> {
                GetDataFrameAnalyticsStatsAction.Response.Stats stats = statsResponse.getResponse().results().get(0);
                dfaTask.setStatsHolder(new StatsHolder(stats.getProgress(), stats.getMemoryUsage(), stats.getAnalysisStats(), stats.getDataCounts()));
                this.executeTask(dfaTask);
            }, dfaTask::setFailed);

            // First step: make sure the inference system index exists, then request the stats.
            MlIndexAndAlias.createSystemIndexIfNecessary(
                this.client,
                this.clusterState,
                MachineLearning.getInferenceIndexSystemIndexDescriptor(),
                MlTasks.PERSISTENT_TASK_MASTER_NODE_TIMEOUT,
                ActionListener.wrap(
                    ok -> ClientHelper.executeAsyncWithOrigin(
                        this.client,
                        "ml",
                        GetDataFrameAnalyticsStatsAction.INSTANCE,
                        new GetDataFrameAnalyticsStatsAction.Request(params.getId()),
                        statsListener
                    ),
                    error -> {
                        Throwable cause = ExceptionsHelper.unwrapCause(error);
                        logger.error(
                            (Message)new ParameterizedMessage("[{}] failed to create internal index [{}]", (Object)params.getId(), (Object)".ml-inference-000004"),
                            cause
                        );
                        dfaTask.setFailed(error);
                    }
                )
            );
        }

        /** Persists the STARTED task state and, once that succeeds, hands the task to the manager. */
        private void executeTask(DataFrameAnalyticsTask task) {
            DataFrameAnalyticsTaskState startedState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.STARTED, task.getAllocationId(), null);
            task.updatePersistentTaskState(
                startedState,
                ActionListener.wrap(
                    response -> this.manager.execute(task, this.clusterState, MlTasks.PERSISTENT_TASK_MASTER_NODE_TIMEOUT),
                    e -> task.markAsFailed(e)
                )
            );
        }

        /**
         * Decides whether a node may run the job, based on node and job versions.
         *
         * @param node   the candidate node
         * @param params the task params
         * @return {@code null} when the node is acceptable, otherwise a message explaining why not
         */
        public static String nodeFilter(DiscoveryNode node, StartDataFrameAnalyticsAction.TaskParams params) {
            String id = params.getId();

            if (node.getVersion().before(StartDataFrameAnalyticsAction.TaskParams.VERSION_INTRODUCED)) {
                return "Not opening job [" + id + "] on node [" + JobNodeSelector.nodeNameAndVersion(node)
                    + "], because the data frame analytics requires a node of version ["
                    + StartDataFrameAnalyticsAction.TaskParams.VERSION_INTRODUCED + "] or higher";
            }

            boolean nodePredatesMappingsChange =
                node.getVersion().before(StartDataFrameAnalyticsAction.TaskParams.VERSION_DESTINATION_INDEX_MAPPINGS_CHANGED);
            if (nodePredatesMappingsChange
                && params.getVersion().onOrAfter(StartDataFrameAnalyticsAction.TaskParams.VERSION_DESTINATION_INDEX_MAPPINGS_CHANGED)) {
                return "Not opening job [" + id + "] on node [" + JobNodeSelector.nodeNameAndVersion(node)
                    + "], because the data frame analytics created for version [" + params.getVersion()
                    + "] requires a node of version ["
                    + StartDataFrameAnalyticsAction.TaskParams.VERSION_DESTINATION_INDEX_MAPPINGS_CHANGED + "] or higher";
            }

            return null;
        }

        /** Returns the ML config index, the stats index pattern and the job state index pattern. */
        @Override
        protected String[] indicesOfInterest(StartDataFrameAnalyticsAction.TaskParams params) {
            return new String[]{MlConfigIndex.indexName(), MlStatsIndex.indexPattern(), AnomalyDetectorsIndex.jobStateIndexPattern()};
        }

        /** Returns the id carried by the task params. */
        @Override
        protected String getJobId(StartDataFrameAnalyticsAction.TaskParams params) {
            return params.getId();
        }
    }
}

