From e3d7c5d99083cbbcca7e446bfe026cf201995647 Mon Sep 17 00:00:00 2001 From: saintkayluk Date: Fri, 18 Feb 2022 17:47:47 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BF=BD=E7=95=A5mysql=E6=BA=90=E5=BA=93?= =?UTF-8?q?=E7=9A=84=E8=99=9A=E6=8B=9F=E5=AD=97=E6=AE=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../MySqlSrcIgnoreVirtualColumn.java | 81 +++++++++++++++++++ 1 file changed, 81 insertions(+) create mode 100644 data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/MySqlSrcIgnoreVirtualColumn.java diff --git a/data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/MySqlSrcIgnoreVirtualColumn.java b/data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/MySqlSrcIgnoreVirtualColumn.java new file mode 100644 index 0000000..d13e6a0 --- /dev/null +++ b/data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/MySqlSrcIgnoreVirtualColumn.java @@ -0,0 +1,81 @@ +package com.clougence.cloudcanal.dataprocess.datatransform; + +import com.clougence.cloudcanal.sdk.api.CloudCanalProcessorV2; +import com.clougence.cloudcanal.sdk.api.JavaDsType; +import com.clougence.cloudcanal.sdk.api.ProcessorContext; +import com.clougence.cloudcanal.sdk.api.modelv2.CustomData; +import com.clougence.cloudcanal.sdk.api.modelv2.CustomRecordV2; +import com.clougence.cloudcanal.sdk.api.modelv2.SchemaInfo; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.*; + +/** + * ºöÂÔmysqlÔ´¶ËÐéÄâ×Ö¶Î + * + * @author Saint Kay + * @date 2022/2/16 + */ +public class MySqlSrcIgnoreVirtualColumn implements CloudCanalProcessorV2 { + + protected static final Logger customLogger = LoggerFactory.getLogger("custom_processor"); + + private Map> virtualColumnMap = new HashMap<>(); + + + @Override + public void start(ProcessorContext context) { + if (context.getSrcDsType().getContextDsType() != JavaDsType.JdbcDataSource) { + throw new IllegalArgumentException("src ds type is not JdbcDataSource"); + } + DataSource dataSource = (DataSource) context.getSrcRdbDs(); + String sql = "SELECT TABLE_SCHEMA,TABLE_NAME,COLUMN_NAME FROM information_schema.`COLUMNS` WHERE EXTRA = 'VIRTUAL GENERATED';"; + try { + Connection conn = dataSource.getConnection(); + PreparedStatement ps = conn.prepareStatement(sql); + ResultSet rs = ps.executeQuery(); + Set tmp = new HashSet(); + while (rs.next()) { + String tableSchema = rs.getString("TABLE_SCHEMA"); + String tableName = rs.getString("TABLE_NAME"); + String columnName = rs.getString("COLUMN_NAME"); + if (tmp.add(tableSchema + "." + tableName)) { + virtualColumnMap.put(new SchemaInfo(null, tableSchema, tableName), new ArrayList<>()); + } + virtualColumnMap.get(new SchemaInfo(null, tableSchema, tableName)).add(columnName); + } + } catch (SQLException e) { + customLogger.error(ExceptionUtils.getRootCauseMessage(e), e); + } + } + + + @Override + public List process(CustomData data) { + List re = new ArrayList<>(); + if (virtualColumnMap.containsKey(data.getSchemaInfo())) { + List virtualColumns = virtualColumnMap.get(data.getSchemaInfo()); + for (CustomRecordV2 recordV2 : data.getRecords()) { + for (String v : virtualColumns) { + recordV2.dropField(v); + } + } + } + re.add(data); + return re; + } + + @Override + public void stop() { + + } + + +} -- Gitee