package kd.ebg.egf.common.framework.lock;

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import kd.bos.context.RequestContext;
import kd.bos.dataentity.entity.DynamicObject;
import kd.bos.dataentity.entity.DynamicObjectCollection;
import kd.bos.dataentity.resource.ResManager;
import kd.bos.dlock.DLock;
import kd.bos.orm.query.QFilter;
import kd.bos.servicehelper.BusinessDataServiceHelper;
import kd.bos.servicehelper.QueryServiceHelper;
import kd.bos.servicehelper.operation.DeleteServiceHelper;
import kd.bos.servicehelper.operation.SaveServiceHelper;
import kd.ebg.egf.common.cache.CosmicCache;
import kd.ebg.egf.common.context.EBContext;
import kd.ebg.egf.common.context.RequestContextUtils;
import kd.ebg.egf.common.entity.base.EBException;
import kd.ebg.egf.common.exception.EBConnection;
import kd.ebg.egf.common.exception.EBExceiptionUtil;
import kd.ebg.egf.common.exception.EBExceptionEnum;
import kd.ebg.egf.common.framework.service.BankLoginConfigService;
import kd.ebg.egf.common.log.BankNewLogProp;
import kd.ebg.egf.common.log.EBGLogger;
import kd.ebg.egf.common.model.bank.login.BankLoginConfigKey;
import kd.ebg.egf.common.model.monitor.MonitorRequestInfo;
import kd.ebg.egf.common.repository.monitor.BankReqRecordRepository;
import kd.ebg.egf.common.utils.JsonUtil;
import kd.ebg.egf.common.utils.string.StrUtil;
import kd.ebg.egf.common.utils.string.StringUtils;
import kd.ebg.egf.common.zookeeper.cluster.ClusterManager;
import kd.ebg.egf.common.zookeeper.node.SingleNodeIDGetter;
import org.apache.curator.CuratorConnectionLossException;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
import org.apache.curator.framework.recipes.locks.Lease;
import org.apache.zookeeper.KeeperException;

/* loaded from: input_file:kd/ebg/egf/common/framework/lock/DistributedFepAccess.class */
public class DistributedFepAccess implements FEPAccess {
    /** Entity name of the lock-monitor records written by saveLockInfo and read/deleted by freeExpiredLock and deleteLockInfo. */
    public static final String entityName = "aqap_login_lock_monitor";
    /** Comma-separated field list selected when freeExpiredLock queries the lock-monitor entity. */
    public static final String properties = "time_stamp,thread_name,biz_type,date_time,node,log_no,trace_no,bank_version,bank_login,lock_path";
    /** Business names for which lock() waits at most {@link #timeout} milliseconds; all other business names wait without a timeout. */
    private List<String> toType = Arrays.asList("balance", "batchBalance", "detail", "queryPay", "queryOverseaPay", "queryLinkpay");
    /** Semaphore key; also the middle segment of the lease paths built as "/semaphore/" + key + "/leases/" + nodeName. */
    private String key;
    /** Wait timeout in milliseconds; 60000 unless the "dubbo.consumer.timeout" system property holds a valid integer. */
    private int timeout;
    /** Number of leases requested from the semaphore; the constructor forces it to at least 1. */
    private int concurrencyCount;
    /** Curator semaphore obtained from ClusterManager; stays null when ClusterManager.getInstance() returned null. */
    private InterProcessSemaphoreV2 semaphore;
    /** Leases held through this instance, keyed by the name of the acquiring thread. */
    private ConcurrentMap<String, Lease> leaseConcurrentMap;
    private static final EBGLogger logger = EBGLogger.getInstance().getLogger(DistributedFepAccess.class);
    /** record() type flag: store the start timestamp. */
    private static final Integer TYPE_START = 0;
    /** record() type flag: turn the stored start timestamp into an elapsed duration. */
    private static final Integer TYPE_END = 1;
    /** Lease held by the current thread; release() falls back to it when the thread-name map has no entry. Static, so shared by all instances. */
    static ThreadLocal<Lease> leaseThreadLocal = new ThreadLocal<>();

    /**
     * Creates an access guard backed by a distributed semaphore.
     *
     * @param str semaphore key; also used when building lease node paths
     * @param i   requested number of concurrent leases; values below 1 are replaced by 1
     */
    public DistributedFepAccess(String str, int i) {
        // Default wait of 60 s, optionally overridden by the dubbo consumer timeout.
        this.timeout = 60000;
        String property = System.getProperty("dubbo.consumer.timeout");
        if (StringUtils.isNotEmpty(property)) {
            try {
                this.timeout = Integer.parseInt(property);
            } catch (NumberFormatException e) {
                // Keep the default when the property is not a valid integer.
                logger.error("超时时间格式转换异常", e);
            }
        }
        this.key = str;
        this.concurrencyCount = i > 0 ? i : 1;
        ClusterManager clusterManager = ClusterManager.getInstance();
        if (clusterManager == null) {
            // The semaphore stays null in this case; the instance is still constructed.
            logger.error("clusterManager为空");
        } else {
            this.semaphore = clusterManager.getSemaphore(str, this.concurrencyCount);
        }
        // Diamond instead of the raw type so the assignment is checked by the compiler.
        this.leaseConcurrentMap = new ConcurrentHashMap<>(32);
    }

    /**
     * Reads the value stored on a cluster node.
     *
     * @param str node path
     * @return whatever ClusterManager reports as the node value
     */
    private String getValue(String str) {
        ClusterManager manager = ClusterManager.getInstance();
        return manager.getNodeValue(str);
    }

    /**
     * Delegates to the single-argument ClusterManager.setNodeValue for the given node.
     *
     * @param str node path
     */
    private void setNodeTime(String str) {
        ClusterManager manager = ClusterManager.getInstance();
        manager.setNodeValue(str);
    }

    /**
     * Writes a JSON description of the current lock holder onto the lease node and
     * persists the same information through saveLockInfo.
     *
     * @param str lease node path
     */
    private void setNodeInfo(String str) {
        ClusterManager manager = ClusterManager.getInstance();
        EBContext ctx = EBContext.getContext();
        String bizType = ctx.getBizName();
        String logNo = ctx.getLogger_batch_no();
        String acquiredAt = String.valueOf(System.currentTimeMillis());
        String threadName = Thread.currentThread().getName();
        String formattedNow = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

        // Insertion order is kept identical to the original so the serialized JSON does not change.
        HashMap lockInfo = new HashMap(16);
        lockInfo.put("timeStamp", acquiredAt);
        lockInfo.put("threadName", threadName);
        lockInfo.put("bizType", bizType);
        lockInfo.put("dateTime", formattedNow);
        lockInfo.put("node", SingleNodeIDGetter.getInstance().nodeID());
        lockInfo.put("logNo", logNo);
        lockInfo.put("traceNo", RequestContext.get().getTraceId());
        lockInfo.put("bankVersion", ctx.getBankVersionID());
        lockInfo.put("bankLogin", ctx.getBankLoginID());
        lockInfo.put("lockPath", str);

        String json = JsonUtil.toJson(lockInfo);
        manager.setNodeValue(str, json);
        saveLockInfo(lockInfo);
    }

    /**
     * Asks ClusterManager to delete a node.
     *
     * @param str node path
     * @return the result reported by ClusterManager.deleteNode
     */
    private boolean deleteNode(String str) {
        ClusterManager manager = ClusterManager.getInstance();
        return manager.deleteNode(str);
    }

    /**
     * Checks through ClusterManager whether a node exists.
     *
     * @param str node path
     * @return the result reported by ClusterManager.isExistedNode
     */
    private boolean isExisted(String str) {
        ClusterManager manager = ClusterManager.getInstance();
        return manager.isExistedNode(str);
    }

    /**
     * Logs every lease node currently participating in the semaphore and deletes
     * nodes whose stored timestamp is older than one hour.
     *
     * <p>A node value that is not a number is replaced through setNodeTime.
     * NOTE(review): setNodeInfo stores JSON on lease nodes, so such values would take
     * the "not a number" branch and be overwritten — confirm this is intended before
     * wiring this method into a call path.
     *
     * <p>Never throws: a missing semaphore path is logged as "lock not occupied",
     * any other failure is logged as an error together with its stack trace.
     */
    private void checkParticipantNodes() {
        try {
            Collection<String> participantNodes = this.semaphore.getParticipantNodes();
            if (participantNodes.isEmpty()) {
                return;
            }
            StringBuilder sb = new StringBuilder();
            sb.append(ResManager.loadKDString("当前存在前置机锁ZK节点:", "DistributedFepAccess_0", "ebg-egf-common", new Object[0])).append(StrUtil.CRLF);
            for (String nodeName : participantNodes) {
                // Build the lease path once and reuse it for both the report and the node access.
                String leasePath = "/semaphore/" + this.key + "/leases/" + nodeName;
                sb.append(leasePath).append(StrUtil.CRLF);
                String value = getValue(leasePath);
                logger.info("前置机锁[ " + nodeName + " ]节点值为：" + value);
                if (StringUtils.isNotEmpty(value)) {
                    try {
                        if (System.currentTimeMillis() - Long.parseLong(value) > 3600000) {
                            deleteNode(leasePath);
                            logger.info("前置机锁[ " + nodeName + " ]节点过期，有效时间：3600s");
                        }
                    } catch (NumberFormatException e) {
                        logger.info("临时锁节点存储的值不为数字类型，重新设置为时间戳");
                        setNodeTime(leasePath);
                    }
                }
            }
            logger.info(sb.toString());
        } catch (Throwable th) {
            if (th instanceof KeeperException.NoNodeException) {
                logger.info("当前前置机锁未占用");
            } else {
                // Keep the throwable so the stack trace is not lost.
                logger.error(th.getMessage(), th);
            }
        }
    }

    /**
     * @return true when the bank version of the current EBContext is "EBG_SIM"
     */
    private boolean isEBG_SIM() {
        String bankVersionId = EBContext.getContext().getBankVersionID();
        return "EBG_SIM".equals(bankVersionId);
    }

    /**
     * Acquires one lease of the distributed semaphore for the current thread.
     *
     * <p>For the "EBG_SIM" bank version no lease is taken; only the processing timer is started.
     * Otherwise expired locks are cleaned up first, then the lease is acquired: with a timeout of
     * {@code timeout} milliseconds for business names listed in {@code toType}, without a timeout
     * for all others. The lease is registered under the thread name and in the thread-local,
     * holder information is written to the lease node, and wait/processing timers are recorded.
     *
     * <p>If anything fails after the lease has been obtained, the lease is returned before the
     * exception is propagated, so a failed call does not keep a semaphore slot occupied.
     *
     * @throws RuntimeException every failure is rethrown as the exception built by
     *         EBExceiptionUtil.loginAccessException, with the original throwable as cause
     */
    @Override // kd.ebg.egf.common.framework.lock.FEPAccess
    public void lock() {
        if (isEBG_SIM()) {
            record("processTime", TYPE_START.intValue());
            return;
        }
        Lease acquire = null;
        try {
            if (!ClusterManager.getInstance().isConnection()) {
                throw new EBConnection(ResManager.loadKDString("失去与zookeeper连接", "DistributedFepAccess_4", "ebg-egf-common", new Object[0]));
            }
            logger.info("准备获取分布式授权，线程：{};\r\n当前业务的前置机并发锁数量为{}", Thread.currentThread().getName(), Integer.valueOf(this.concurrencyCount));
            freeExpiredLock();
            String bizName = EBContext.getContext().getBizName();
            record("waitTime", TYPE_START.intValue());
            if (StringUtils.isNotEmpty(bizName) && this.toType.contains(bizName)) {
                acquire = this.semaphore.acquire(this.timeout, TimeUnit.MILLISECONDS);
                if (acquire == null) {
                    throw EBExceiptionUtil.serviceException(String.format(ResManager.loadKDString("等待前置机锁超时，超时时间：%s", "DistributedFepAccess_1", "ebg-egf-common", new Object[0]), Integer.valueOf(this.timeout)));
                }
            } else {
                acquire = this.semaphore.acquire();
            }
            if (acquire == null) {
                throw EBExceiptionUtil.serviceException(ResManager.loadKDString("Lease为空值", "DistributedFepAccess_2", "ebg-egf-common", new Object[0]));
            }
            this.leaseConcurrentMap.put(Thread.currentThread().getName(), acquire);
            leaseThreadLocal.set(acquire);
            record("waitTime", TYPE_END.intValue());
            String str = "/semaphore/" + this.key + "/leases/" + acquire.getNodeName();
            setNodeInfo(str);
            logger.info(String.format(ResManager.loadKDString("获取分布式授权成功，%s", "DistributedFepAccess_3", "ebg-egf-common", new Object[0]), str));
            record("processTime", TYPE_START.intValue());
        } catch (Throwable th) {
            if (acquire != null) {
                // The lease was obtained but a later step failed: give it back so the slot is not leaked.
                discardLeaseAfterFailedLock(acquire);
            }
            if (th instanceof EBConnection) {
                throw EBExceiptionUtil.loginAccessException(String.format(ResManager.loadKDString("获取分布式锁失败,连接异常：%s", "DistributedFepAccess_5", "ebg-egf-common", new Object[0]), th.getMessage()), th);
            }
            if (!(th instanceof CuratorConnectionLossException) && !(th instanceof KeeperException.ConnectionLossException)) {
                throw EBExceiptionUtil.loginAccessException(ResManager.loadKDString("获取分布式锁失败", "DistributedFepAccess_7", "ebg-egf-common", new Object[0]), th);
            }
            throw EBExceiptionUtil.loginAccessException(ResManager.loadKDString("获取分布式锁失败,失去与zookeeper连接", "DistributedFepAccess_6", "ebg-egf-common", new Object[0]), th);
        }
    }

    /**
     * Best-effort rollback for a lease whose lock() call failed after acquisition:
     * unregisters it, returns it to the semaphore and removes its monitor data.
     * Never throws, so the original failure is the one that reaches the caller.
     */
    private void discardLeaseAfterFailedLock(Lease lease) {
        try {
            String leasePath = "/semaphore/" + this.key + "/leases/" + lease.getNodeName();
            this.leaseConcurrentMap.remove(Thread.currentThread().getName(), lease);
            // Only clear the thread-local when it actually refers to the lease being discarded.
            if (leaseThreadLocal.get() == lease) {
                leaseThreadLocal.remove();
            }
            this.semaphore.returnLease(lease);
            deleteLockInfo(leasePath);
        } catch (Throwable cleanupError) {
            logger.error("获取分布式锁失败后归还Lease出现异常：" + cleanupError.getMessage(), cleanupError);
        }
    }

    /**
     * Acquires one lease of the distributed semaphore, waiting at most {@code i} seconds.
     *
     * <p>On success the lease is registered under the current thread name and in the thread-local
     * so that release() can find it.
     *
     * @param i maximum time to wait for a lease, in seconds
     * @throws RuntimeException every failure (including a timeout) is rethrown as the exception
     *         built by EBExceiptionUtil.loginAccessException, with the original throwable as cause
     */
    @Override // kd.ebg.egf.common.framework.lock.FEPAccess
    public void lock(int i) {
        try {
            if (!ClusterManager.getInstance().isConnection()) {
                throw new EBConnection(ResManager.loadKDString("失去与zookeeper连接", "DistributedFepAccess_4", "ebg-egf-common", new Object[0]));
            }
            logger.info("准备获取分布式授权，线程：" + Thread.currentThread().getName());
            Lease acquire = this.semaphore.acquire(i, TimeUnit.SECONDS);
            if (acquire == null) {
                // Report the timeout that was actually applied (i seconds), not the unrelated
                // millisecond field used by the no-argument lock().
                throw EBExceiptionUtil.serviceException(String.format(ResManager.loadKDString("等待前置机锁超时，超时时间：%s", "DistributedFepAccess_1", "ebg-egf-common", new Object[0]), i + "s"));
            }
            this.leaseConcurrentMap.put(Thread.currentThread().getName(), acquire);
            leaseThreadLocal.set(acquire);
        } catch (Throwable th) {
            if (th instanceof EBConnection) {
                throw EBExceiptionUtil.loginAccessException(String.format(ResManager.loadKDString("获取分布式锁失败,连接异常：%s", "DistributedFepAccess_5", "ebg-egf-common", new Object[0]), th.getMessage()), th);
            }
            if (!(th instanceof CuratorConnectionLossException) && !(th instanceof KeeperException.ConnectionLossException)) {
                throw EBExceiptionUtil.loginAccessException(ResManager.loadKDString("获取分布式锁失败", "DistributedFepAccess_7", "ebg-egf-common", new Object[0]), th);
            }
            throw EBExceiptionUtil.loginAccessException(ResManager.loadKDString("获取分布式锁失败,失去与zookeeper连接", "DistributedFepAccess_6", "ebg-egf-common", new Object[0]), th);
        }
    }

    /**
     * Reports whether the semaphore currently has fewer participant nodes than
     * {@code concurrencyCount}.
     *
     * @return true when a lease appears to be available; also true when the participant
     *         nodes cannot be read (the failure is logged instead of being dropped silently)
     */
    @Override // kd.ebg.egf.common.framework.lock.FEPAccess
    public boolean isFree() {
        try {
            Collection<String> participantNodes = this.semaphore.getParticipantNodes();
            int occupied = participantNodes.size();
            logger.info("ZK分布式锁当前临时节点的个数：" + occupied);
            logger.info("当前分布式锁个数为：" + this.concurrencyCount);
            return occupied < this.concurrencyCount;
        } catch (Throwable th) {
            if (th instanceof KeeperException.NoNodeException) {
                // Same interpretation as checkParticipantNodes: no lease path means nothing is held.
                logger.info("当前前置机锁未占用");
            } else {
                // Previous behaviour (answer "free") is kept, but the cause is now visible in the log.
                logger.error("查询前置机锁占用情况出现异常：" + th.getMessage(), th);
            }
            return true;
        }
    }

    /**
     * Returns the lease held by the current thread and finishes the processing timer.
     *
     * <p>The lease is looked up by thread name first; if none is registered, the thread-local
     * lease is used. For the "EBG_SIM" bank version only the timer is finished.
     */
    @Override // kd.ebg.egf.common.framework.lock.FEPAccess
    public void release() {
        if (isEBG_SIM()) {
            record("processTime", TYPE_END.intValue());
            return;
        }
        Lease registered = this.leaseConcurrentMap.get(Thread.currentThread().getName());
        if (registered != null) {
            if (ClusterManager.getInstance().isConnection()) {
                this.semaphore.returnLease(registered);
                this.leaseConcurrentMap.remove(Thread.currentThread().getName(), registered);
                leaseThreadLocal.remove();
                String leasePath = "/semaphore/" + this.key + "/leases/" + registered.getNodeName();
                deleteLockInfo(leasePath);
                logger.info(String.format(ResManager.loadKDString("释放分布式授权成功，%s", "DistributedFepAccess_8", "ebg-egf-common", new Object[0]), leasePath));
            } else {
                // Without a connection the lease is left registered; only the failure is logged.
                logger.error("zookeeper连接异常，释放分布式授权失败，" + registered.getNodeName());
            }
        } else {
            Lease threadLease = leaseThreadLocal.get();
            if (threadLease == null) {
                logger.info("当前线程没有获取授权");
            } else {
                this.semaphore.returnLease(threadLease);
                leaseThreadLocal.remove();
                String leasePath = "/semaphore/" + this.key + "/leases/" + threadLease.getNodeName();
                deleteLockInfo(leasePath);
                logger.info("释放分布式授权完成");
            }
        }
        record("processTime", TYPE_END.intValue());
    }

    /**
     * Maintains the timing running-params of the current request.
     *
     * <p>With TYPE_START the current epoch millis are stored under {@code str}; for "waitTime"
     * the current LocalDateTime is additionally stored as "waitLockTime". With TYPE_END the stored
     * start value (if any) is replaced by the elapsed millis and logged; "waitTime" additionally
     * stores "getLockTime", while "processTime" triggers store(), whose failures are only logged.
     *
     * @param str running-param key, "waitTime" or "processTime" (compared case-insensitively)
     * @param i   TYPE_START or TYPE_END; any other value is ignored
     */
    private void record(String str, int i) {
        LocalDateTime now = LocalDateTime.now();
        long nowMillis = now.toInstant(ZoneOffset.of("+8")).toEpochMilli();
        boolean waitTimer = "waitTime".equalsIgnoreCase(str);

        if (i == TYPE_START.intValue()) {
            RequestContextUtils.setRunningParam(str, String.valueOf(nowMillis));
            if (waitTimer) {
                RequestContextUtils.setRunningParam("waitLockTime", JsonUtil.toJson(now));
            }
            return;
        }
        if (i != TYPE_END.intValue()) {
            return;
        }

        String startedAt = RequestContextUtils.getRunningParam(str);
        if (startedAt != null) {
            long elapsed = nowMillis - Long.parseLong(startedAt);
            RequestContextUtils.setRunningParam(str, String.valueOf(elapsed));
            if (waitTimer) {
                logger.info("等待获取前置机锁耗时：{} ms", Long.valueOf(elapsed));
            } else {
                logger.info("银行请求处理耗时：{} ms", Long.valueOf(elapsed));
            }
        }

        if (waitTimer) {
            RequestContextUtils.setRunningParam("getLockTime", JsonUtil.toJson(now));
            return;
        }
        if ("processTime".equalsIgnoreCase(str)) {
            try {
                store();
            } catch (Throwable th) {
                logger.error("记录请求统计信息出现异常：{}", th.getMessage());
            }
        }
    }

    /**
     * Builds the monitoring record of the current request from the running-params
     * and saves it through BankReqRecordRepository.
     *
     * <p>A request counts as blocked when its recorded wait time exceeds 3000 ms.
     */
    private void store() {
        EBContext ctx = EBContext.getContext();
        String requestId = ctx.getRequestSeqID();
        MonitorRequestInfo info = getMonitorRequestInfo(requestId);
        Map<String, String> params = ctx.getRunningParams();

        // Durations default to 0 when the corresponding timer was never recorded.
        long processMillis = params.containsKey("processTime") ? Long.parseLong(params.get("processTime")) : 0L;
        long blockMillis = params.containsKey("waitTime") ? Long.parseLong(params.get("waitTime")) : 0L;

        if (params.containsKey("waitLockTime")) {
            LocalDateTime waitLockTime = (LocalDateTime) JsonUtil.fromJson(params.get("waitLockTime"), LocalDateTime.class);
            info.setWaitLockTime(waitLockTime);
        }
        if (params.containsKey("getLockTime")) {
            LocalDateTime getLockTime = (LocalDateTime) JsonUtil.fromJson(params.get("getLockTime"), LocalDateTime.class);
            info.setGetLockTime(getLockTime);
        }

        boolean connectionFailure = EBExceptionEnum.isConnectionException(ctx.getProcessFlag());
        if (connectionFailure && params.containsKey("Exception")) {
            info.setExtData(params.get("Exception"));
            info.setStatus(EBException.CONNECTION_EXCEPTION);
        }

        // Look up the configured lease count ("leaseNum") for this bank login.
        String bankVersionId = ctx.getBankVersionID();
        BankLoginConfigKey configKey = new BankLoginConfigKey();
        configKey.setBankLoginId(ctx.getBankLoginID());
        configKey.setBankVersionId(bankVersionId);
        configKey.setCustomID(ctx.getCustomID());
        configKey.setBankConfigId("leaseNum");
        int leaseNum = getBankLoginLeaseNum(configKey);

        info.setRequestID(requestId);
        info.setBlockFlag(Boolean.valueOf(blockMillis > 3000));
        info.setBlockMillis(Long.valueOf(blockMillis));
        info.setProcessMillis(Long.valueOf(processMillis));
        info.setCustomID(ctx.getCustomID());
        info.setBankVersionID(ctx.getBankVersionID());
        info.setBankLoginID(ctx.getBankLoginID());
        info.setType(ctx.getBizName());
        info.setLoginLockNum(Integer.valueOf(leaseNum));
        BankReqRecordRepository.getInstance().save(info);
    }

    /**
     * Loads the cached monitoring record for a request, or creates an empty one.
     *
     * @param str request sequence id used as the cache key
     * @return the record deserialized from CosmicCache, or a new MonitorRequestInfo when the
     *         cache has no entry for {@code str}
     */
    private MonitorRequestInfo getMonitorRequestInfo(String str) {
        Map<String, String> cached = CosmicCache.getAll(EBContext.getContext().getCustomID());
        if (cached != null && cached.containsKey(str)) {
            return (MonitorRequestInfo) JsonUtil.fromJson(cached.get(str), MonitorRequestInfo.class);
        }
        return new MonitorRequestInfo();
    }

    /**
     * Reads the configured lease count for a bank login.
     *
     * @param bankLoginConfigKey key identifying the configuration entry
     * @return the configured value when it is a non-empty numeric string; 1 otherwise,
     *         including when the lookup or parsing fails (the failure is logged)
     */
    private int getBankLoginLeaseNum(BankLoginConfigKey bankLoginConfigKey) {
        int leaseNum = 1;
        try {
            String configured = BankLoginConfigService.getInstance().getBankLoginConfigValue(bankLoginConfigKey);
            boolean usable = StringUtils.isNotEmpty(configured) && StringUtils.isNumeric(configured);
            if (usable) {
                leaseNum = Integer.parseInt(configured);
            }
        } catch (Throwable th) {
            logger.error("查询前置机并发锁出现异常：" + th.getMessage(), th);
        }
        return leaseNum;
    }

    /**
     * Persists one lock-monitor record with status "A".
     * Failures are logged and never propagated.
     *
     * @param map lock holder information keyed by the camelCase names written by setNodeInfo
     */
    private void saveLockInfo(Map<String, String> map) {
        // Entity field name -> key in the info map, in the order the fields are populated.
        String[][] fieldToKey = {
            {"time_stamp", "timeStamp"},
            {"thread_name", "threadName"},
            {"biz_type", "bizType"},
            {"date_time", "dateTime"},
            {"node", "node"},
            {"log_no", "logNo"},
            {"trace_no", "traceNo"},
            {BankNewLogProp.BANK_VERSION, "bankVersion"},
            {"bank_login", "bankLogin"},
            {"lock_path", "lockPath"},
        };
        try {
            DynamicObject lockRecord = BusinessDataServiceHelper.newDynamicObject(entityName);
            for (String[] pair : fieldToKey) {
                lockRecord.set(pair[0], map.get(pair[1]));
            }
            lockRecord.set("status", "A");
            SaveServiceHelper.save(new DynamicObject[]{lockRecord});
        } catch (Throwable th) {
            logger.error(th.getMessage(), th);
        }
    }

    /**
     * Removes the monitor record of a lease node, force-deleting the node first when it
     * still exists.
     *
     * <p>The record is removed once the node is gone. If the node cannot be deleted, the
     * record is kept so that freeExpiredLock can retry after the expiry period. Failures are
     * logged and never propagated.
     *
     * @param str lease node path, matched against the "lock_path" field
     */
    private void deleteLockInfo(String str) {
        try {
            if (isExisted(str)) {
                logger.warn("前置机锁节点未释放，尝试重新释放：" + str);
                if (!deleteNode(str)) {
                    // Node is still there: leave the record as the handle for the later expiry cleanup.
                    logger.warn("前置机锁节点释放失败，保留监控记录等待超时清理：" + str);
                    return;
                }
            }
            // The node no longer exists, so the record would only be stale data.
            DeleteServiceHelper.delete(entityName, QFilter.of("lock_path=?", new Object[]{str}).toArray());
        } catch (Throwable th) {
            logger.error(th.getMessage(), th);
        }
    }

    /**
     * Cleans up lock records of the current bank login that are older than 30 minutes.
     *
     * <p>Runs only when the DLock for this key can be taken without waiting; otherwise it
     * returns immediately. For each expired record the lease node is deleted if it still
     * exists, and the record is removed through deleteLockInfo. Records whose "time_stamp"
     * is missing or not numeric are skipped with a warning instead of aborting the cleanup
     * (this method is called from lock(), so an exception here would fail the lock attempt).
     */
    private void freeExpiredLock() {
        final long expireMinutes = 30L;
        final long expireMillis = expireMinutes * 60 * 1000;
        DLock create = DLock.create(this.key, "login_lock_monitor");
        if (!create.tryLock()) {
            return;
        }
        try {
            DynamicObjectCollection query = QueryServiceHelper.query(entityName, properties, QFilter.of("bank_login=?", new Object[]{EBContext.getContext().getBankLoginID()}).toArray());
            for (int i = 0; i < query.size(); i++) {
                DynamicObject row = (DynamicObject) query.get(i);
                String timeStamp = row.getString("time_stamp");
                String lockPath = row.getString("lock_path");
                long lockedAt;
                try {
                    lockedAt = Long.parseLong(timeStamp);
                } catch (NumberFormatException e) {
                    // A single malformed record must not block lock acquisition for everyone.
                    logger.warn("前置机锁监控记录时间戳无效，跳过清理：" + lockPath + "，time_stamp=" + timeStamp);
                    continue;
                }
                if (System.currentTimeMillis() - lockedAt <= expireMillis) {
                    continue;
                }
                if (!isExisted(lockPath)) {
                    // Node already gone: only the record is left to remove.
                    deleteLockInfo(lockPath);
                    continue;
                }
                logger.info(String.format("前置机锁节点超时%s分钟，准备清理zk节点", expireMinutes));
                if (deleteNode(lockPath)) {
                    deleteLockInfo(lockPath);
                    logger.info(String.format("超时zk节点清理完成：%s", lockPath));
                } else {
                    logger.info(String.format("超时zk节点清理失败：%s", lockPath));
                }
            }
        } finally {
            create.unlock();
        }
    }
}
