package kd.bos.ksql.shell.trace.client;

import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import kd.bos.ksql.shell.trace.LogItem;
import kd.bos.util.DisCardUtil;

/* loaded from: input_file:kd/bos/ksql/shell/trace/client/NTraceClient.class */
public class NTraceClient {
    public static final int DEFAULT_PORT = 9791;
    private int port;
    private Socket socket;
    private String hostname;
    private Thread thread;

    /* loaded from: input_file:kd/bos/ksql/shell/trace/client/NTraceClient$LogItemReader.class */
    class LogItemReader implements Runnable {
        private LogItemReaderInputStream logItemInputStream;
        private ITraceListener listener;

        public LogItemReader(LogItemReaderInputStream logItemReaderInputStream, ITraceListener iTraceListener) {
            this.logItemInputStream = logItemReaderInputStream;
            this.listener = iTraceListener;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    this.listener.receive(LogItem.readLogItem(this.logItemInputStream));
                } catch (IOException e) {
                    DisCardUtil.discard();
                }
            }
        }
    }

    /* loaded from: input_file:kd/bos/ksql/shell/trace/client/NTraceClient$LogItemReaderInputStream.class */
    class LogItemReaderInputStream extends InputStream {
        private ByteBuffer buff;

        public LogItemReaderInputStream(ByteBuffer byteBuffer) {
            this.buff = byteBuffer;
        }

        @Override // java.io.InputStream
        public synchronized int read() throws IOException {
            if (this.buff.position() == this.buff.limit()) {
                this.buff.clear();
                try {
                    wait();
                } catch (InterruptedException e) {
                    DisCardUtil.discard();
                }
            }
            return this.buff.get();
        }
    }

    /* loaded from: input_file:kd/bos/ksql/shell/trace/client/NTraceClient$TraceRunner.class */
    class TraceRunner implements Runnable {
        private ITraceListener listener;
        private SocketChannel socketChannel;

        public TraceRunner(ITraceListener iTraceListener, SocketChannel socketChannel) {
            this.listener = iTraceListener;
            this.socketChannel = socketChannel;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                Selector open = Selector.open();
                this.socketChannel.register(open, 1);
                ByteBuffer allocateDirect = ByteBuffer.allocateDirect(1024);
                allocateDirect.clear();
                LogItemReaderInputStream logItemReaderInputStream = new LogItemReaderInputStream(allocateDirect);
                Thread thread = new Thread(new LogItemReader(logItemReaderInputStream, this.listener));
                thread.setName("KSQL NTraceClient LogItemReader");
                thread.start();
                while (open.select() > 0) {
                    Iterator<SelectionKey> it = open.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey next = it.next();
                        it.remove();
                        SocketChannel socketChannel = (SocketChannel) next.channel();
                        synchronized (logItemReaderInputStream) {
                            socketChannel.read(allocateDirect);
                            allocateDirect.flip();
                        }
                        logItemReaderInputStream.notify();
                    }
                }
            } catch (IOException e) {
                DisCardUtil.discard();
            }
        }
    }

    public NTraceClient(String str) {
        this(str, 9791);
    }

    public NTraceClient(String str, int i) {
        this.port = i;
        this.hostname = str;
    }

    public void connect(ITraceListener iTraceListener) throws IOException {
        InetSocketAddress inetSocketAddress = new InetSocketAddress(this.hostname, this.port);
        SocketChannel open = SocketChannel.open();
        open.configureBlocking(false);
        open.connect(inetSocketAddress);
        while (!open.finishConnect()) {
            Thread.yield();
        }
        this.thread = new Thread(new TraceRunner(iTraceListener, open));
        this.thread.start();
    }

    public void close() throws IOException {
        this.socket.close();
    }
}
