package kd.bos.algox.flink.core;

import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.util.ConfigurationUtil;

/* loaded from: input_file:kd/bos/algox/flink/core/InputSemaphoreFactory.class */
public class InputSemaphoreFactory {
    private static InputSemaphore inputSemaphore;
    private static boolean semaphoreEnable = ConfigurationUtil.getBoolean("algox.input.semaphore.enable", false).booleanValue();
    private static int inputCount = ConfigurationUtil.getInteger("algox.input.semaphore.count", 20).intValue();
    private static final Log logger = LogFactory.getLog(InputSemaphore.class);

    public static synchronized InputSemaphore getSemaphonre() {
        return inputSemaphore;
    }

    static {
        if (semaphoreEnable) {
            inputSemaphore = new InputJdkSemaphore(inputCount);
        } else {
            inputSemaphore = new InputEmptySemaphore(inputCount);
        }
        ConfigurationUtil.observeInteger("algox.input.semaphore.count", inputCount, num -> {
            synchronized (InputSemaphoreFactory.class) {
                if (semaphoreEnable) {
                    int availablePermits = inputCount - inputSemaphore.availablePermits();
                    InputJdkSemaphore inputJdkSemaphore = new InputJdkSemaphore(num.intValue());
                    if (availablePermits >= 0) {
                        try {
                            if (availablePermits < num.intValue()) {
                                inputJdkSemaphore.acquire(availablePermits);
                            }
                        } catch (InterruptedException e) {
                            logger.error("change semaphore count error", e);
                        }
                    }
                    inputSemaphore = inputJdkSemaphore;
                }
                inputCount = num.intValue();
            }
        });
        ConfigurationUtil.observeBoolean("algox.input.semaphore.enable", semaphoreEnable, bool -> {
            synchronized (InputSemaphoreFactory.class) {
                if (bool.booleanValue()) {
                    inputSemaphore = new InputJdkSemaphore(inputCount);
                } else {
                    inputSemaphore = new InputEmptySemaphore(1);
                }
                semaphoreEnable = bool.booleanValue();
            }
        });
    }
}
