package kd.isc.kem.core.subscribe.definition;

import com.google.common.base.Stopwatch;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.isc.kem.common.constants.LogStatus;
import kd.isc.kem.common.constants.NodeLogStatus;
import kd.isc.kem.common.exception.ExceptionUtil;
import kd.isc.kem.common.exception.KemException;
import kd.isc.kem.common.model.JsonHashMap;
import kd.isc.kem.common.util.KemAssert;
import kd.isc.kem.common.util.LogUtil;
import kd.isc.kem.core.exception.KemCoreError;
import kd.isc.kem.core.subscribe.definition.ProcessorDefinition;
import kd.isc.kem.core.subscribe.handler.LogHandler;
import kd.isc.kem.core.subscribe.model.FilterModel;
import kd.isc.kem.core.subscribe.model.NodeLogModel;
import kd.isc.kem.core.subscribe.model.NodeOutput;
import kd.isc.kem.core.subscribe.model.SourceModel;
import kd.isc.kem.core.subscribe.model.SubscribeInfo;
import kd.isc.kem.core.subscribe.model.SubscriberContext;
import kd.isc.kem.core.subscribe.model.TargetModel;
import kd.isc.kem.core.subscribe.model.TargetRetryModel;

/* loaded from: input_file:kd/isc/kem/core/subscribe/definition/ProcessorDefinition.class */
public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type, ModelType>, ModelType> extends OptionalIdentifiedDefinition<Type> {
    protected static final Log LOG = LogFactory.getLog(ProcessorDefinition.class);
    protected static final JsonHashMap NULL = null;
    private final SubscriberContext subscriberContext;
    private final NodeType nodeType;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:kd/isc/kem/core/subscribe/definition/ProcessorDefinition$NodeInOur.class */
    public static class NodeInOur<Model> {
        private JsonHashMap input;
        private NodeOutput output;
        private boolean isSuccess;
        private Throwable error;

        NodeInOur(Supplier<JsonHashMap> supplier, Function<JsonHashMap, NodeOutput> function, Model model) {
            this.isSuccess = true;
            try {
                this.input = supplier.get();
            } catch (Exception e) {
                this.error = e;
                this.isSuccess = false;
            }
            try {
                if (this.isSuccess) {
                    this.output = function.apply(this.input);
                }
            } catch (Exception e2) {
                this.error = e2;
                this.isSuccess = false;
            }
        }

        JsonHashMap getIn() {
            return this.input;
        }

        NodeOutput getOut() {
            return this.output;
        }
    }

    public ProcessorDefinition(SubscriberContext subscriberContext, NodeType nodeType) {
        KemAssert.notNull(subscriberContext, "parentContext must not be null");
        KemAssert.notNull(nodeType, "nodeType must not be null");
        if (NodeType.LOG == nodeType && NodeType.LOG == subscriberContext.getNodeType()) {
            throw new KemException(KemCoreError.CoreError, new Object[]{"cannot have connected log nodes"});
        }
        this.subscriberContext = new SubscriberContext(subscriberContext, nodeType);
        this.nodeType = nodeType;
        LogUtil.info(LOG, super.getClass().getName() + ": " + nodeType + "：" + this.subscriberContext);
    }

    public SourceDefinition source(Function<SubscriberContext, SourceModel> function) {
        return (SourceDefinition) wrap(() -> {
            return (SourceDefinition) definitionWrap(SourceDefinition::new, function);
        }, NodeType.SOURCE);
    }

    public FilterDefinition filter(Function<SubscriberContext, FilterModel> function) {
        return (FilterDefinition) wrap(() -> {
            return (FilterDefinition) definitionWrap(FilterDefinition::new, function);
        }, NodeType.FILTER);
    }

    public Optional<TargetDefinition> targets(Function<SubscriberContext, TargetModel> function) {
        Function function2 = TargetDefinition::new;
        return Optional.ofNullable(wrap(() -> {
            return (TargetDefinition) definitionWrap(function2, function);
        }, NodeType.TARGET));
    }

    public Optional<TargetDefinition> retryTargets(Function<SubscriberContext, TargetModel> function, TargetRetryModel targetRetryModel, boolean z) {
        Function function2 = subscriberContext -> {
            return new TargetDefinition(subscriberContext, targetRetryModel, z);
        };
        return Optional.ofNullable(wrap(() -> {
            return (TargetDefinition) definitionWrap(function2, function);
        }, NodeType.TARGET));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <Def extends ProcessorDefinition<Def, Model>, Model> Def definitionWrap(Function<SubscriberContext, Def> function, BiFunction<SubscriberContext, Boolean, Model> biFunction, boolean z) {
        Def apply = function.apply(getSubscriberContext());
        KemAssert.notNull(apply, "definition must not be null");
        Model apply2 = biFunction.apply(apply.getSubscriberContext(), Boolean.valueOf(z));
        KemAssert.notNull(apply2, "model must not be null");
        return (Def) endDefinition(apply, apply2);
    }

    private <Def extends ProcessorDefinition<Def, Model>, Model> Def definitionWrap(Function<SubscriberContext, Def> function, Function<SubscriberContext, Model> function2) {
        Def apply = function.apply(getSubscriberContext());
        KemAssert.notNull(apply, "definition must not be null");
        Model apply2 = function2.apply(apply.getSubscriberContext());
        KemAssert.notNull(apply2, "model must not be null");
        return (Def) endDefinition(apply, apply2);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <Def extends ProcessorDefinition<Def, Model>, Model> Def wrap(Supplier<Def> supplier, NodeType nodeType) {
        Stopwatch createStarted = Stopwatch.createStarted();
        try {
            return supplier.get();
        } catch (Exception e) {
            LOG.error("KEM_LOG:：" + nodeType, e);
            if (!(e instanceof KemException) || !KemCoreError.NOT_RECORD_LOG.getCode().equals(e.getErrorCode().getCode())) {
                LogHandler.recordNodeLog(new NodeLogModel(getSubscribeInfo().getSubInstanceId(), 0L, nodeType, NodeLogStatus.Fail, null, null, e, true, 0, createStarted.stop().elapsed(TimeUnit.MILLISECONDS)), false);
            }
            throw new KemException(e, KemCoreError.CoreError, new Object[]{ExceptionUtil.getMessage(e)});
        }
    }

    public SubscriberContext getSubscriberContext() {
        return this.subscriberContext;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public SubscribeInfo getSubscribeInfo() {
        return getSubscriberContext().getInfo();
    }

    private <Def extends ProcessorDefinition<Def, Model>, Model> Def endDefinition(Def def, Model model) {
        SubscriberContext subscriberContext = def.getSubscriberContext();
        if (subscriberContext.getNodeType() != NodeType.LOG && subscriberContext.getNodeIn() != null && !subscriberContext.getNodeIn().isStatus()) {
            LogHandler.updateLog(subscriberContext.getInfo().getSubInstanceId(), LogStatus.Ignore);
            return null;
        }
        Stopwatch createStarted = Stopwatch.createStarted();
        try {
            NodeInOur<ModelType> execute = def.execute(model);
            subscriberContext.setInput(execute.getIn());
            subscriberContext.setOutput(execute.getOut());
            if (!((NodeInOur) execute).isSuccess) {
                throw ((NodeInOur) execute).error;
            }
            subscriberContext.setCost(createStarted.stop().elapsed(TimeUnit.MILLISECONDS));
            return def;
        } catch (Throwable th) {
            LOG.error("KEM_LOG:：" + subscriberContext.getNodeType(), th);
            LogHandler.recordNodeLog(new NodeLogModel(getSubscribeInfo().getSubInstanceId(), def.getNodeId(), subscriberContext.getNodeType(), NodeLogStatus.Fail, null, null, th, true, 0, createStarted.stop().elapsed(TimeUnit.MILLISECONDS)), false);
            throw new KemException(th, KemCoreError.NOT_RECORD_LOG, new Object[]{ExceptionUtil.getMessage(th)});
        }
    }

    NodeType getNodeType() {
        return this.nodeType;
    }

    @Override // kd.isc.kem.core.subscribe.definition.OptionalIdentifiedDefinition
    long getNodeId() {
        return getSubscriberContext().getNodeId();
    }

    abstract JsonHashMap getInput(ModelType modeltype);

    abstract NodeOutput getOutput(JsonHashMap jsonHashMap, ModelType modeltype);

    NodeInOur<ModelType> execute(ModelType modeltype) {
        return new NodeInOur<>(() -> {
            return getInput(modeltype);
        }, jsonHashMap -> {
            return getOutput(jsonHashMap, modeltype);
        }, modeltype);
    }
}
