package kd.ai.aicc.core;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alipay.api.java_websocket.client.WebSocketClient;
import com.alipay.api.java_websocket.handshake.ServerHandshake;
import com.google.common.hash.Hashing;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Base64;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import kd.ai.aicc.core.Executor;
import kd.ai.aicc.core.domain.Instance;
import kd.ai.aicc.core.domain.Task;
import kd.bos.exception.ErrorCode;
import kd.bos.exception.KDBizException;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;

/* loaded from: input_file:kd/ai/aicc/core/XfWebSocketRunner.class */
public class XfWebSocketRunner {
    private static final Log log = LogFactory.getLog(XfWebSocketRunner.class);
    private static final String ERROR_FLAG = "ERROR:";
    private static final int ERROR = 9;
    private static final int CONTINUE = 0;
    private static final int END = 2;
    private static final int MAX_TIME_OUT = 300000;
    private Executor.TaskRunner taskRunner;
    BlockingQueue<String> messageQueue = new LinkedBlockingQueue();
    private int seqNo = 1;
    private final StringBuilder responseTextSb = new StringBuilder();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kd/ai/aicc/core/XfWebSocketRunner$XfWebSocketClient.class */
    public static class XfWebSocketClient extends WebSocketClient {
        private static final Log log = LogFactory.getLog(XfWebSocketClient.class);
        BlockingQueue<String> messageQueue;

        public XfWebSocketClient(BlockingQueue<String> blockingQueue, URI uri) {
            super(uri);
            this.messageQueue = blockingQueue;
        }

        public void onMessage(String str) {
            if (this.messageQueue.offer(str)) {
                return;
            }
            log.warn("offer message: {} error", str);
        }

        public void onError(Exception exc) {
            if (this.messageQueue.offer(XfWebSocketRunner.ERROR_FLAG + exc.getMessage())) {
                return;
            }
            log.warn("offer error to queue ");
        }

        public void onOpen(ServerHandshake serverHandshake) {
        }

        public void onClose(int i, String str, boolean z) {
        }
    }

    public void run(Executor.TaskRunner taskRunner) {
        this.taskRunner = taskRunner;
        Instance atInstance = taskRunner.getAtInstance();
        Task task = taskRunner.getTask();
        try {
            XfWebSocketClient xfWebSocketClient = new XfWebSocketClient(this.messageQueue, new URI(getAuthUrl(atInstance.getUrl(), atInstance.getSecretKey(), atInstance.getProxyUserSecretKey())));
            boolean connectBlocking = xfWebSocketClient.connectBlocking(10L, TimeUnit.SECONDS);
            log.info("xunfei task id : {} 连接服务器 : {} {}", new Object[]{Long.valueOf(task.getId()), atInstance.getUrl(), Boolean.valueOf(connectBlocking)});
            if (connectBlocking) {
                xfWebSocketClient.send(buildRequestBody(task, atInstance).toString());
                onMessage();
            } else {
                ErrorCode connectXfError = Constant.connectXfError(atInstance.getUrl());
                taskRunner.execTaskFailed(connectXfError.getCode(), connectXfError.getMessage());
            }
        } catch (Throwable th) {
            log.error(String.format("xunfei task id : %s, error : %s", Long.valueOf(task.getId()), th.getMessage()), th);
            ErrorCode internalError = Constant.internalError(th.getMessage());
            taskRunner.execTaskFailed(internalError.getCode(), internalError.getMessage());
        }
    }

    public void onMessage() throws InterruptedException {
        long currentTimeMillis = System.currentTimeMillis();
        while (System.currentTimeMillis() - currentTimeMillis < 300000) {
            String poll = this.messageQueue.poll(100L, TimeUnit.MILLISECONDS);
            while (true) {
                String str = poll;
                if (str != null) {
                    if (str.startsWith(ERROR_FLAG)) {
                        ErrorCode internalError = Constant.internalError(str.substring(ERROR_FLAG.length()));
                        this.taskRunner.execTaskFailed(internalError.getCode(), internalError.getMessage());
                        return;
                    } else {
                        int processMessage = processMessage(str);
                        if (processMessage == END || processMessage == ERROR) {
                            return;
                        } else {
                            poll = this.messageQueue.poll(100L, TimeUnit.MILLISECONDS);
                        }
                    }
                }
            }
        }
        if (System.currentTimeMillis() - currentTimeMillis > 300000) {
            ErrorCode taskTimeOutError = Constant.taskTimeOutError(String.valueOf(this.taskRunner.getTask().getId()));
            this.taskRunner.execTaskFailed(taskTimeOutError.getCode(), taskTimeOutError.getMessage());
        }
    }

    public int processMessage(String str) {
        try {
            JSONObject parseObject = JSON.parseObject(str);
            JSONObject jSONObject = parseObject.getJSONObject("header");
            int intValue = jSONObject.getIntValue("code");
            if (intValue != 0) {
                this.taskRunner.execTaskFailed(String.valueOf(intValue), jSONObject.getString(Constant.RESULT_MESSAGE));
                return ERROR;
            }
            int intValue2 = jSONObject.getIntValue("status");
            JSONArray jSONArray = parseObject.getJSONObject("payload").getJSONObject("choices").getJSONArray("text");
            StringBuilder sb = new StringBuilder();
            for (int i = CONTINUE; i < jSONArray.size(); i++) {
                sb.append(jSONArray.getJSONObject(i).getString("content"));
            }
            this.responseTextSb.append((CharSequence) sb);
            if (this.taskRunner.getTask().isStream()) {
                Executor.TaskRunner taskRunner = this.taskRunner;
                String sb2 = sb.toString();
                int i2 = this.seqNo;
                this.seqNo = i2 + 1;
                taskRunner.notifyStream(sb2, i2);
            }
            if (intValue2 != END) {
                return CONTINUE;
            }
            if (this.taskRunner.getTask().isStream()) {
                this.taskRunner.notifyStreamDone(this.seqNo);
            }
            this.taskRunner.execTaskSuccess(this.responseTextSb.toString(), !this.taskRunner.getTask().isStream());
            usage(parseObject.getJSONObject("payload").getJSONObject("usage"));
            return END;
        } catch (Exception e) {
            log.error(String.format("xunfei task id : %s ,error : %s", Long.valueOf(this.taskRunner.getTask().getId()), e.getMessage()), e);
            ErrorCode internalError = Constant.internalError(e.getMessage());
            this.taskRunner.execTaskFailed(internalError.getCode(), internalError.getMessage());
            return ERROR;
        }
    }

    private void usage(JSONObject jSONObject) {
    }

    private JSONObject buildRequestBody(Task task, Instance instance) {
        JSONObject parseObject = JSON.parseObject(task.getRequestBody());
        JSONObject jSONObject = new JSONObject();
        jSONObject.put("app_id", instance.getClientId());
        jSONObject.put("uid", String.valueOf(task.getId()));
        parseObject.put("header", jSONObject);
        return parseObject;
    }

    public static String getAuthUrl(String str, String str2, String str3) {
        try {
            URL url = new URL(str.replace("ws://", "http://").replace("wss://", "https://"));
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss z", Locale.US);
            simpleDateFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
            String format = simpleDateFormat.format(new Date());
            try {
                return String.format("%s?authorization=%s&date=%s&host=%s", url.toString(), Base64.getEncoder().encodeToString(String.format("api_key=\"%s\", algorithm=\"%s\", headers=\"%s\", signature=\"%s\"", str2, "hmac-sha256", "host date request-line", Base64.getEncoder().encodeToString(Hashing.hmacSha256(str3.getBytes(StandardCharsets.UTF_8)).hashString("host: " + url.getHost() + "\ndate: " + format + "\nGET " + url.getPath() + " HTTP/1.1", StandardCharsets.UTF_8).asBytes())).getBytes(StandardCharsets.UTF_8)), URLEncoder.encode(format, Constant.UTF8), url.getHost()).replace("http://", "ws://").replace("https://", "wss://");
            } catch (UnsupportedEncodingException e) {
                log.error(e.getMessage(), e);
                throw new KDBizException(Constant.internalError(e.getMessage()), new Object[CONTINUE]);
            }
        } catch (MalformedURLException e2) {
            log.error(e2.getMessage(), e2);
            throw new KDBizException(Constant.internalError(e2.getMessage()), new Object[CONTINUE]);
        }
    }
}
