package com.kingdee.bos.qing.dpp.common.datasync.impl;

import com.kingdee.bos.qing.dpp.common.datasync.DataSyncHelper;
import com.kingdee.bos.qing.dpp.common.datasync.IDataSyncListener;
import com.kingdee.bos.qing.dpp.common.datasync.IDataSyncWriter;
import com.kingdee.bos.qing.dpp.common.datasync.model.SyncedFieldMeta;
import com.kingdee.bos.qing.dpp.common.gpfdist.GpfDistFileWriter;
import com.kingdee.bos.qing.dpp.common.gpfdist.GpfDistHelper;
import com.kingdee.bos.qing.dpp.common.gpfdist.GpfDistInfo;
import com.kingdee.bos.qing.dpp.common.gpfdist.GpfdistCsvFileOption;
import com.kingdee.bos.qing.dpp.common.gpfdist.IGpfdistSyncListener;
import com.kingdee.bos.qing.dpp.common.gpfdist.exception.GpfdistException;
import com.kingdee.bos.qing.dpp.common.log.DppLogger;
import com.kingdee.bos.qing.dpp.common.types.DBType;
import com.kingdee.bos.qing.dpp.common.types.DppDataType;
import com.kingdee.bos.qing.dpp.datasource.jdbcadpter.JDBCAdapter;
import com.kingdee.bos.qing.dpp.datasource.jdbcadpter.JdbcAdapterFactory;
import com.kingdee.bos.qing.dpp.exception.QDppException;
import com.kingdee.bos.qing.dpp.model.schema.DppField;
import com.kingdee.bos.qing.dpp.model.transform.settings.CsvFormat;
import com.kingdee.bos.qing.dpp.model.transform.settings.GPFDistSinkSettings;
import com.kingdee.bos.qing.dpp.model.transform.source.DppJdbcSource;
import com.kingdee.bos.qing.dpp.utils.CloseUtils;
import com.kingdee.bos.qing.dpp.utils.DBDataSourceUtil;
import com.kingdee.bos.qing.dpp.utils.DppErrorUtil;
import com.kingdee.bos.qing.dpp.utils.StringUtils;
import com.kingdee.bos.qing.filesystem.manager.AbstractQingFile;
import com.kingdee.bos.qing.filesystem.manager.FileFactory;
import com.kingdee.bos.qing.filesystem.manager.localimpl.FileSysUtil;
import com.kingdee.bos.qing.filesystem.manager.model.QingTempFileType;
import com.kingdee.bos.qing.util.ThreadPoolManage;
import java.io.File;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/kingdee/bos/qing/dpp/common/datasync/impl/GpfdistDataSyncWriter.class */
public class GpfdistDataSyncWriter implements IDataSyncWriter {
    private CsvFormat csvFormat;
    private SyncedFieldMeta tableMeta;
    private String extTableCreateScript;
    private DppDataType[] dppDataTypes;
    private AtomicReference<Exception> csvFileDataLoadException = new AtomicReference<>();
    private AtomicInteger notLoadedCsvFileCounter = new AtomicInteger(0);
    private LinkedBlockingQueue<LoadCsvEvent> LOAD_EVENT_QUEUE = new LinkedBlockingQueue<>(10000);
    private volatile boolean running = true;
    private String jobName;
    private DppJdbcSource greenplumDataSource;
    private GpfDistFileWriter writer;
    private IGpfdistSyncListener dataSyncListener;
    private String gpfdistGatewayAddress;
    private String targetTableName;
    private GPFDistSinkSettings gpfdistSinkSettings;
    private static final Logger log = new DppLogger(GpfDistHelper.LOG_PREFIX, LoggerFactory.getLogger(GpfdistDataSyncWriter.class));
    private static final Object FINISH_LOCK = new Object();

    /* loaded from: input_file:com/kingdee/bos/qing/dpp/common/datasync/impl/GpfdistDataSyncWriter$CsvDataLoader.class */
    private class CsvDataLoader implements Runnable {
        private CsvDataLoader() {
        }

        @Override // java.lang.Runnable
        public void run() {
            long currentTimeMillis = System.currentTimeMillis();
            while (true) {
                if (!GpfdistDataSyncWriter.this.running) {
                    break;
                }
                if (GpfdistDataSyncWriter.this.csvFileDataLoadException.get() != null) {
                    synchronized (GpfdistDataSyncWriter.FINISH_LOCK) {
                        GpfdistDataSyncWriter.this.notLoadedCsvFileCounter.set(0);
                        GpfdistDataSyncWriter.FINISH_LOCK.notifyAll();
                    }
                    break;
                }
                try {
                    LoadCsvEvent loadCsvEvent = (LoadCsvEvent) GpfdistDataSyncWriter.this.LOAD_EVENT_QUEUE.poll(5000L, TimeUnit.MILLISECONDS);
                    if (null == loadCsvEvent) {
                        long currentTimeMillis2 = System.currentTimeMillis();
                        if (currentTimeMillis2 - currentTimeMillis > 60000) {
                            GpfdistDataSyncWriter.this.dataSyncListener.onHeartBeat();
                            currentTimeMillis = currentTimeMillis2;
                        }
                    } else {
                        String str = null;
                        try {
                            try {
                                str = GpfdistDataSyncWriter.this.createExternalTable(loadCsvEvent.getCsvFileName(), loadCsvEvent.getGpfDistInfo(), loadCsvEvent.getCsvFileFullPath());
                                GpfdistDataSyncWriter.this.notifyExternalTableCreated(str);
                                GpfdistDataSyncWriter.this.insertDataFromExtTable(str);
                                GpfdistDataSyncWriter.this.notifyExternalTableLoadFinished(str, loadCsvEvent.getDataSize());
                                GpfdistDataSyncWriter.this.dropExternalTable(str);
                                GpfdistDataSyncWriter.this.deleteFile(loadCsvEvent.getCsvFileName());
                                GpfdistDataSyncWriter.this.notLoadedCsvFileCounter.decrementAndGet();
                                synchronized (GpfdistDataSyncWriter.FINISH_LOCK) {
                                    GpfdistDataSyncWriter.FINISH_LOCK.notifyAll();
                                }
                            } catch (Throwable th) {
                                GpfdistDataSyncWriter.this.dropExternalTable(str);
                                GpfdistDataSyncWriter.this.deleteFile(loadCsvEvent.getCsvFileName());
                                GpfdistDataSyncWriter.this.notLoadedCsvFileCounter.decrementAndGet();
                                synchronized (GpfdistDataSyncWriter.FINISH_LOCK) {
                                    GpfdistDataSyncWriter.FINISH_LOCK.notifyAll();
                                    throw th;
                                }
                            }
                        } catch (Exception e) {
                            GpfdistDataSyncWriter.this.csvFileDataLoadException.compareAndSet(null, DppErrorUtil.getCauseError(e));
                            GpfdistDataSyncWriter.this.dropExternalTable(str);
                            GpfdistDataSyncWriter.this.deleteFile(loadCsvEvent.getCsvFileName());
                            GpfdistDataSyncWriter.this.notLoadedCsvFileCounter.decrementAndGet();
                            synchronized (GpfdistDataSyncWriter.FINISH_LOCK) {
                                GpfdistDataSyncWriter.FINISH_LOCK.notifyAll();
                            }
                        }
                        currentTimeMillis = System.currentTimeMillis();
                    }
                } catch (InterruptedException e2) {
                    GpfdistDataSyncWriter.this.csvFileDataLoadException.compareAndSet(null, e2);
                }
            }
            GpfdistDataSyncWriter.this.notLoadedCsvFileCounter.set(0);
            if (GpfdistDataSyncWriter.this.LOAD_EVENT_QUEUE.size() > 0) {
                Iterator it = GpfdistDataSyncWriter.this.LOAD_EVENT_QUEUE.iterator();
                while (it.hasNext()) {
                    GpfdistDataSyncWriter.this.deleteFile(((LoadCsvEvent) it.next()).getCsvFileName());
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/kingdee/bos/qing/dpp/common/datasync/impl/GpfdistDataSyncWriter$LoadCsvEvent.class */
    public static class LoadCsvEvent {
        private GpfDistInfo gpfDistInfo;
        private String csvFileName;
        private long dataSize;
        private String csvFileFullPath;

        public LoadCsvEvent(GpfDistInfo gpfDistInfo, String str, long j, String str2) {
            this.gpfDistInfo = gpfDistInfo;
            this.csvFileName = str;
            this.dataSize = j;
            this.csvFileFullPath = str2;
        }

        public long getDataSize() {
            return this.dataSize;
        }

        public GpfDistInfo getGpfDistInfo() {
            return this.gpfDistInfo;
        }

        public String getCsvFileName() {
            return this.csvFileName;
        }

        public String getCsvFileFullPath() {
            return this.csvFileFullPath;
        }
    }

    public GpfdistDataSyncWriter(String str, SyncedFieldMeta syncedFieldMeta, GPFDistSinkSettings gPFDistSinkSettings) {
        this.jobName = str;
        this.csvFormat = gPFDistSinkSettings.getCsvFormat();
        this.gpfdistGatewayAddress = gPFDistSinkSettings.getGpfdistGatewayAddress();
        this.tableMeta = syncedFieldMeta;
        this.greenplumDataSource = gPFDistSinkSettings.buildSinkSource();
        this.gpfdistSinkSettings = gPFDistSinkSettings;
    }

    @Override // com.kingdee.bos.qing.dpp.common.datasync.IDataSyncWriter
    public void setDataSyncListener(IDataSyncListener iDataSyncListener) {
        this.dataSyncListener = (IGpfdistSyncListener) iDataSyncListener;
    }

    @Override // com.kingdee.bos.qing.dpp.common.datasync.IDataSyncWriter
    public void begin() throws Exception {
        log.info("begin gpf data loader,jobName=" + this.jobName);
        if (this.gpfdistSinkSettings.getInsertTable() == null) {
            this.targetTableName = GpfDistHelper.preparePhysicalTable(this.tableMeta.getFieldList(), this.greenplumDataSource, false);
            this.gpfdistSinkSettings.setInsertTable(this.targetTableName);
            if (this.gpfdistSinkSettings.isAutoAddReservedField()) {
                this.gpfdistSinkSettings.setNewBatchSeq(1L);
            }
            this.gpfdistSinkSettings.setTableNewCreated(true);
        } else {
            this.targetTableName = this.gpfdistSinkSettings.getInsertTable();
            if (this.gpfdistSinkSettings.isAutoAddReservedField()) {
                this.gpfdistSinkSettings.setNewBatchSeq(DataSyncHelper.getCurrentMaxBatchSeq(this.gpfdistSinkSettings.buildSinkSource(), this.targetTableName) + 1);
            }
            this.gpfdistSinkSettings.setTableNewCreated(false);
        }
        this.extTableCreateScript = buildExtTableCreateScript();
        this.dppDataTypes = new DppDataType[this.tableMeta.getSize()];
        for (int i = 0; i < this.tableMeta.getSize(); i++) {
            this.dppDataTypes[i] = this.tableMeta.getFieldOutputType(i);
        }
        ThreadPoolManage.submit(ThreadPoolManage.QingThreadPoolName.QING_MODELER_LONG_TIME_TASK_HANDLER, new CsvDataLoader());
        this.dataSyncListener.onPhysicalTableCreated(this.targetTableName);
    }

    @Override // com.kingdee.bos.qing.dpp.common.datasync.IDataSyncWriter
    public void finishOnErr() {
        try {
            if (null != this.writer && !this.writer.isFinished()) {
                this.writer.finish();
            }
            this.running = false;
            waitCsvDataLoadFinish();
        } finally {
            this.dataSyncListener.onError(this.csvFileDataLoadException.get());
        }
    }

    @Override // com.kingdee.bos.qing.dpp.common.datasync.IDataSyncWriter
    public void finish() throws Exception {
        try {
            try {
                if (null != this.writer && !this.writer.isFinished()) {
                    this.writer.finish();
                    offerNewCsvFileToLoad(new LoadCsvEvent(this.writer.getGpfdistInfo(), this.writer.getCsvFileName(), this.writer.getWriteRowSize(), this.writer.getCsvFileFullPath()));
                }
                waitCsvDataLoadFinish();
                this.running = false;
                if (this.csvFileDataLoadException.get() != null) {
                    throw this.csvFileDataLoadException.get();
                }
            } catch (Exception e) {
                throw e;
            }
        } finally {
            if (!false) {
                this.dataSyncListener.onFinish();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void dropExternalTable(String str) {
        try {
            DataSyncHelper.dropTable(this.greenplumDataSource, str, true);
            log.info("drop external table succeed,ext tableName :" + str);
        } catch (Exception e) {
            log.error("drop table failed,tableName:" + str, e);
        }
    }

    private void waitCsvDataLoadFinish() {
        while (this.notLoadedCsvFileCounter.get() > 0) {
            try {
                synchronized (FINISH_LOCK) {
                    FINISH_LOCK.wait(500L);
                }
            } catch (InterruptedException e) {
                log.warn("wait csv data load finish is interrupted");
                return;
            }
        }
    }

    @Override // com.kingdee.bos.qing.dpp.common.datasync.IDataSyncWriter
    public void setError(Exception exc) {
        this.csvFileDataLoadException.set(exc);
    }

    @Override // com.kingdee.bos.qing.dpp.common.datasync.IDataSyncWriter
    public void flush() throws Exception {
    }

    @Override // com.kingdee.bos.qing.dpp.common.datasync.IDataSyncWriter
    public void writeRowData(Object[] objArr) throws Exception {
        Exception exc = this.csvFileDataLoadException.get();
        if (exc != null) {
            throw exc;
        }
        if (null == this.writer) {
            this.writer = new GpfDistFileWriter(createCsvFile(), GpfDistHelper.requestGpfdist(this.gpfdistSinkSettings));
            this.writer.start(getFileOption());
        }
        this.writer.writeRowContent(objArr);
        if (this.writer.isFinished()) {
            offerNewCsvFileToLoad(new LoadCsvEvent(this.writer.getGpfdistInfo(), this.writer.getCsvFileName(), this.writer.getWriteRowSize(), this.writer.getCsvFileFullPath()));
            this.writer = new GpfDistFileWriter(createCsvFile(), GpfDistHelper.requestGpfdist(this.gpfdistSinkSettings));
            this.writer.start(getFileOption());
        }
    }

    private GpfdistCsvFileOption getFileOption() {
        GpfdistCsvFileOption gpfdistCsvFileOption = new GpfdistCsvFileOption();
        gpfdistCsvFileOption.setCsvFormat(this.csvFormat);
        gpfdistCsvFileOption.setCsvMaxRowSize(this.gpfdistSinkSettings.getCsvFileMaxRowSize());
        gpfdistCsvFileOption.setDataTypes(this.dppDataTypes);
        return gpfdistCsvFileOption;
    }

    private void offerNewCsvFileToLoad(LoadCsvEvent loadCsvEvent) throws InterruptedException {
        this.notLoadedCsvFileCounter.incrementAndGet();
        this.LOAD_EVENT_QUEUE.put(loadCsvEvent);
    }

    private AbstractQingFile createCsvFile() throws GpfdistException {
        AbstractQingFile newTempFile = FileFactory.newTempFile(QingTempFileType.DS_CACHE);
        try {
            if (newTempFile.createNewFile()) {
                return newTempFile;
            }
            throw new GpfdistException("create new csv file failed,maybe no access to create on target dir");
        } catch (IOException e) {
            throw new GpfdistException("create new csv file failed", e);
        }
    }

    private String buildExtTableCreateScript() {
        StringBuilder sb = new StringBuilder();
        sb.append(" CREATE EXTERNAL TABLE #tableName ");
        sb.append(" ( ");
        List<DppField> fieldList = this.tableMeta.getFieldList();
        int size = fieldList.size();
        for (int i = 0; i < size; i++) {
            sb.append(fieldList.get(i).getTableUniqueFieldName()).append(" ").append(this.tableMeta.getFieldTypeInDB(i));
            if (i < size - 1) {
                sb.append(",");
            }
        }
        sb.append(" ) ");
        sb.append(" LOCATION ( #gpfLocation )  ON ALL ");
        sb.append(" FORMAT 'CSV' ");
        sb.append(" ( delimiter '").append(this.csvFormat.getDelimiter()).append("' ");
        sb.append(" null '").append(this.csvFormat.getNullValue()).append("' ");
        sb.append(" escape '").append(this.csvFormat.getEscapeValue()).append("' ");
        sb.append(" quote '").append(this.csvFormat.getQuote()).append("' )");
        sb.append(" ENCODING 'UTF8';");
        return sb.toString();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String createExternalTable(String str, GpfDistInfo gpfDistInfo, String str2) throws Exception {
        Connection connection = null;
        Statement statement = null;
        try {
            Class.forName(JdbcAdapterFactory.getJdbcAdapter(DBType.GREENPLUM).getDbDriver());
            connection = DBDataSourceUtil.getDataSource(this.greenplumDataSource).getConnection();
            statement = connection.createStatement();
            String str3 = "ext_r_" + str.replace("-", "");
            String replace = this.extTableCreateScript.replace("#tableName", DataSyncHelper.appendSchemaToTableName(this.greenplumDataSource, str3)).replace("#gpfLocation", "'gpfdist://" + ((StringUtils.isNullOrEmpty(this.gpfdistGatewayAddress) ? gpfDistInfo.getIp() : this.gpfdistGatewayAddress) + ":" + gpfDistInfo.getPort()) + "/" + getCsvFileGpfdistPath(str2) + "'");
            statement.execute(replace);
            log.info("jobName:" + this.jobName + ",create external table script:" + replace);
            CloseUtils.close(connection);
            CloseUtils.close(statement);
            return str3;
        } catch (Throwable th) {
            CloseUtils.close(connection);
            CloseUtils.close(statement);
            throw th;
        }
    }

    private String getCsvFileGpfdistPath(String str) throws GpfdistException {
        String substring;
        String str2 = System.getenv("QING_DFS_MODE");
        if (str2 == null || !Boolean.parseBoolean(str2)) {
            int indexOf = str.indexOf(FileSysUtil.getBaseDirectory());
            if (indexOf < 0) {
                throw new GpfdistException("illegal csv file full path on NFS storage mode (" + str + ")");
            }
            substring = str.substring(indexOf + FileSysUtil.getBaseDirectory().length());
        } else {
            int indexOf2 = str.indexOf("level1");
            if (indexOf2 == -1) {
                indexOf2 = str.indexOf("level2");
            }
            if (indexOf2 < 0) {
                throw new GpfdistException("illegal csv file full path on DFS storage mode (" + str + ")");
            }
            substring = str.substring(indexOf2);
        }
        if (File.separatorChar == '\\') {
            substring = substring.replace(File.separatorChar, '/');
        }
        return substring;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void insertDataFromExtTable(String str) throws ClassNotFoundException, SQLException {
        JDBCAdapter jdbcAdapter = JdbcAdapterFactory.getJdbcAdapter(DBType.GREENPLUM);
        Connection connection = null;
        Statement statement = null;
        try {
            Class.forName(jdbcAdapter.getDbDriver());
            connection = DBDataSourceUtil.getDataSource(this.greenplumDataSource).getConnection();
            statement = connection.createStatement();
            statement.execute("insert into " + DataSyncHelper.appendSchemaToTableName(this.greenplumDataSource, jdbcAdapter.getLeftDelimiter() + this.targetTableName + jdbcAdapter.getRightDelimiter()) + " select * from " + DataSyncHelper.appendSchemaToTableName(this.greenplumDataSource, str));
            log.info("load data ext table succeed,external table name:" + str);
            CloseUtils.close(connection);
            CloseUtils.close(statement);
        } catch (Throwable th) {
            CloseUtils.close(connection);
            CloseUtils.close(statement);
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void deleteFile(String str) {
        FileFactory.clearTempFile(QingTempFileType.DS_CACHE, str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void notifyExternalTableCreated(String str) throws QDppException {
        this.dataSyncListener.onExtTableCreated(str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void notifyExternalTableLoadFinished(String str, long j) throws QDppException {
        this.dataSyncListener.onExtTableLoadFinish(str, j);
    }
}
