diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/postgres/PostgresCDCBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/postgres/PostgresCDCBuilder.java index a70647055d..9a671f53ed 100644 --- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/postgres/PostgresCDCBuilder.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/postgres/PostgresCDCBuilder.java @@ -118,4 +118,29 @@ public String generateUrl(String schema) { protected String getMetadataType() { return METADATA_TYPE; } + + @Override + public Map parseMetaDataConfig() { + String url = String.format( + "jdbc:postgres://%s:%d/%s", + config.getHostname(), config.getPort(), composeJdbcProperties(config.getJdbc())); + return parseMetaDataSingleConfig(url); + } + + private String composeJdbcProperties(Map jdbcProperties) { + if (jdbcProperties == null || jdbcProperties.isEmpty()) { + return ""; + } + + StringBuilder sb = new StringBuilder(); + sb.append('?'); + jdbcProperties.forEach((k, v) -> { + sb.append(k); + sb.append("="); + sb.append(v); + sb.append("&"); + }); + sb.deleteCharAt(sb.length() - 1); + return sb.toString(); + } } diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java index 1248f131b7..1bbb7d65f1 100644 --- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java @@ -100,12 +100,17 @@ private List addSinkInsert( FlinkTableObjectIdentifier targetTable, String sinkSchemaName, FlinkTableObjectIdentifier sinkTable) { - String pkList = StringUtils.join(getPKList(table), "."); - String flinkDDL = FlinkStatementUtil.getFlinkDDL(table, targetTable, config, sinkSchemaName, sinkTable, pkList); + Table sinkTableObject = table.getSinkTable(); + if (sinkTableObject == null) { + sinkTableObject = table; + } + String pkList = StringUtils.join(getPKList(sinkTableObject), "."); + String flinkDDL = + FlinkStatementUtil.getFlinkDDL(sinkTableObject, targetTable, config, sinkSchemaName, sinkTable, pkList); logger.info(flinkDDL); customTableEnvironment.executeSql(flinkDDL); logger.info("Create {} FlinkSQL DDL successful...", targetTable); - return createInsertOperations(table, sourceTable, targetTable); + return createInsertOperations(sinkTableObject, sourceTable, targetTable); } @Override diff --git a/dinky-cdc/dinky-cdc-core/src/test/java/org/dinky/cdc/postgres/PostgresCDCBuilderTest.java b/dinky-cdc/dinky-cdc-core/src/test/java/org/dinky/cdc/postgres/PostgresCDCBuilderTest.java new file mode 100644 index 0000000000..bed24c94db --- /dev/null +++ b/dinky-cdc/dinky-cdc-core/src/test/java/org/dinky/cdc/postgres/PostgresCDCBuilderTest.java @@ -0,0 +1,128 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.cdc.postgres; + +import static org.mockito.Mockito.when; + +import org.dinky.data.model.FlinkCDCConfig; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** + * Tests for {@link PostgresCDCBuilder#parseMetaDataConfig()} and the private + * {@code composeJdbcProperties} helper introduced in this commit. + */ +public class PostgresCDCBuilderTest { + + @Mock + private FlinkCDCConfig config; + + private PostgresCDCBuilder builder; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + builder = new PostgresCDCBuilder(config); + when(config.getHostname()).thenReturn("localhost"); + when(config.getPort()).thenReturn(5432); + when(config.getDatabase()).thenReturn("testdb"); + when(config.getUsername()).thenReturn("user"); + when(config.getPassword()).thenReturn("pass"); + } + + /** No jdbc properties -> URL should not contain '?' */ + @Test + public void testParseMetaDataConfig_noJdbcProperties() { + when(config.getJdbc()).thenReturn(null); + + Map result = builder.parseMetaDataConfig(); + + String url = result.get("url"); + Assert.assertNotNull(url); + Assert.assertTrue("URL should start with jdbc:postgres://", url.startsWith("jdbc:postgres://")); + Assert.assertFalse("URL should not contain '?' when jdbc props are empty", url.contains("?")); + } + + /** Empty jdbc map -> same as null, no query string */ + @Test + public void testParseMetaDataConfig_emptyJdbcProperties() { + when(config.getJdbc()).thenReturn(Collections.emptyMap()); + + Map result = builder.parseMetaDataConfig(); + + String url = result.get("url"); + Assert.assertFalse("URL should not contain '?' for empty jdbc map", url.contains("?")); + } + + /** Single jdbc property -> ?key=value */ + @Test + public void testParseMetaDataConfig_singleJdbcProperty() { + Map jdbc = Collections.singletonMap("ssl", "true"); + when(config.getJdbc()).thenReturn(jdbc); + + Map result = builder.parseMetaDataConfig(); + + String url = result.get("url"); + Assert.assertTrue("URL should contain '?ssl=true'", url.contains("?ssl=true")); + // Must not end with trailing '&' + Assert.assertFalse("URL must not end with '&'", url.endsWith("&")); + } + + /** Multiple jdbc properties -> all encoded, no trailing '&' */ + @Test + public void testParseMetaDataConfig_multipleJdbcProperties() { + Map jdbc = new LinkedHashMap<>(); + jdbc.put("ssl", "true"); + jdbc.put("sslmode", "require"); + when(config.getJdbc()).thenReturn(jdbc); + + Map result = builder.parseMetaDataConfig(); + + String url = result.get("url"); + Assert.assertTrue("URL should contain ssl param", url.contains("ssl=true")); + Assert.assertTrue("URL should contain sslmode param", url.contains("sslmode=require")); + Assert.assertFalse("URL must not end with '&'", url.endsWith("&")); + // Exactly one '?' in the query string part + Assert.assertEquals( + "URL should have exactly one '?'", + 1, + url.chars().filter(c -> c == '?').count()); + } + + /** Host / port embedded correctly in URL */ + @Test + public void testParseMetaDataConfig_urlContainsHostAndPort() { + when(config.getJdbc()).thenReturn(null); + + Map result = builder.parseMetaDataConfig(); + + String url = result.get("url"); + Assert.assertTrue("URL should contain host", url.contains("localhost")); + Assert.assertTrue("URL should contain port", url.contains("5432")); + } +} diff --git a/dinky-common/src/main/java/org/dinky/data/model/Table.java b/dinky-common/src/main/java/org/dinky/data/model/Table.java index 2b3555937f..8415684601 100644 --- a/dinky-common/src/main/java/org/dinky/data/model/Table.java +++ b/dinky-common/src/main/java/org/dinky/data/model/Table.java @@ -67,6 +67,9 @@ public class Table implements Serializable, Comparable, Cloneable { private List columns; + /** The sink table for the source table */ + private Table sinkTable; + /** 驱动类型, @see org.dinky.metadata.enums.DriverType */ private String driverType; diff --git a/dinky-common/src/test/java/org/dinky/data/model/TableTest.java b/dinky-common/src/test/java/org/dinky/data/model/TableTest.java index 33e17ed5bb..aac24f5f7b 100644 --- a/dinky-common/src/test/java/org/dinky/data/model/TableTest.java +++ b/dinky-common/src/test/java/org/dinky/data/model/TableTest.java @@ -87,6 +87,38 @@ void setUp() { flinkConfig = "#{schemaName}=schemaName, #{tableName}=tableName, #{abc}=abc, #{}=null, bcd=bcd"; } + // ---- sinkTable field (added in this commit) ---- + + @Test + void sinkTable_defaultIsNull() { + assertThat(table.getSinkTable(), equalTo(null)); + } + + @Test + void sinkTable_setAndGet() { + Table sink = new Table("sink_orders", "target_schema", null); + table.setSinkTable(sink); + assertThat(table.getSinkTable(), equalTo(sink)); + assertThat(table.getSinkTable().getName(), equalTo("sink_orders")); + } + + @Test + void sinkTable_doesNotAffectSourceTableFields() { + Table sink = new Table("sink_orders", "target_schema", null); + table.setSinkTable(sink); + // Source table fields must remain unchanged + assertThat(table.getName(), equalTo("TableNameOrigin")); + assertThat(table.getSchema(), equalTo("SchemaOrigin")); + } + + @Test + void sinkTable_canBeResetToNull() { + Table sink = new Table("sink_orders", "target_schema", null); + table.setSinkTable(sink); + table.setSinkTable(null); + assertThat(table.getSinkTable(), equalTo(null)); + } + @Test void getFlinkDDL() { String result = table.getFlinkDDL(flinkConfig, "NewTableName"); diff --git a/dinky-core/src/main/java/org/dinky/trans/ddl/CreateCDCSourceOperation.java b/dinky-core/src/main/java/org/dinky/trans/ddl/CreateCDCSourceOperation.java index 6bac67085c..ea1a645890 100644 --- a/dinky-core/src/main/java/org/dinky/trans/ddl/CreateCDCSourceOperation.java +++ b/dinky-core/src/main/java/org/dinky/trans/ddl/CreateCDCSourceOperation.java @@ -129,7 +129,9 @@ public TableResult execute(Executor executor) { String tableName = schemaTableName.split("\\.")[1]; table.setColumns(driver.listColumnsSortByPK(realSchemaName, tableName)); schemaList.add(schema); - + Driver sinkRealDriver = getDriver(config, schemaName); + final List
sinkTables = getSinkTables(config, schemaName); + setSinkTable(table, sinkTables, sinkBuilder, sinkRealDriver); if (null != sinkDriver) { final String createTableOptions = config.getSink().get(FlinkCDCConfig.AUTO_CREATE_OPTIONS); Table sinkTable = (Table) table.clone(); @@ -151,6 +153,8 @@ public TableResult execute(Executor executor) { Driver driver = Driver.build(confMap.get("name"), confMap.get("type"), JsonUtils.toMap(confMap)); final List
tables = driver.listTables(schemaName); + Driver sinkRealDriver = getDriver(config, schemaName); + final List
sinkTables = getSinkTables(config, schemaName); for (Table table : tables) { if (!Asserts.isEquals(table.getType(), "VIEW")) { if (Asserts.isNotNullCollection(tableRegList)) { @@ -160,6 +164,7 @@ public TableResult execute(Executor executor) { table.setColumns(driver.listColumnsSortByPK(schemaName, table.getName())); schema.getTables().add(table); schemaTableNameList.add(table.getSchemaTableName()); + setSinkTable(table, sinkTables, sinkBuilder, sinkRealDriver); break; } } @@ -167,6 +172,7 @@ public TableResult execute(Executor executor) { table.setColumns(driver.listColumnsSortByPK(schemaName, table.getName())); schemaTableNameList.add(table.getSchemaTableName()); schema.getTables().add(table); + setSinkTable(table, sinkTables, sinkBuilder, sinkRealDriver); } } } @@ -223,19 +229,71 @@ public TableResult execute(Executor executor) { return tableResultBuilder.build(); } + private static void setSinkTable( + Table table, List
sinkTables, SinkBuilder sinkBuilder, Driver sinkRealDriver) { + String sinkTableName = sinkBuilder.getSinkTableName(table); + for (Table sinkTable : sinkTables) { + String sinkTableSchema = sinkTable.getSchema(); + String sinkTableSchemaTableName = sinkTable.getSchemaTableName(); + String currentSinkTableName = sinkTableSchemaTableName; + if (Asserts.isContainsString(sinkTableSchemaTableName, ".")) { + currentSinkTableName = sinkTableSchemaTableName.split("\\.")[1]; + } + if (sinkTableName.equals(currentSinkTableName)) { + if (null != sinkRealDriver) { + sinkTable.setColumns(sinkRealDriver.listColumnsSortByPK(sinkTableSchema, currentSinkTableName)); + } + table.setSinkTable(sinkTable); + break; + } + } + } + + private List
getSinkTables(FlinkCDCConfig config, String schemaName) throws Exception { + List
sinkTables; + Driver sinkDriver = getDriver(config, schemaName); + if (null == sinkDriver) { + return new ArrayList<>(); + } + Map sink = config.getSink(); + String schema = schemaName; + String sinkDb = sink.get(FlinkCDCConfig.SINK_DB); + if (Asserts.isNotNullString(sinkDb)) { + schema = SqlUtil.replaceAllParam(sinkDb, "schemaName", schemaName); + } + sinkTables = sinkDriver.listTables(schema); + return sinkTables; + } + private Driver checkAndCreateSinkSchema(FlinkCDCConfig config, String schemaName) throws Exception { Map sink = config.getSink(); String autoCreate = sink.get(FlinkCDCConfig.AUTO_CREATE); if (!Asserts.isEqualsIgnoreCase(autoCreate, "true") || Asserts.isNullString(schemaName)) { return null; } - String url = sink.get("url"); + return getDriver(config, schemaName); + } + + private Driver getDriver(FlinkCDCConfig config, String schemaName) throws Exception { + Map sink = config.getSink(); + String connector = sink.get("connector"); + String url; + if (Asserts.isEquals(connector, "starrocks")) { + url = sink.get("jdbc-url"); + } else if (Asserts.isEqualsIgnoreCase(connector, "doris")) { + url = "jdbc:mysql://" + sink.get("fenodes"); + } else if (Asserts.isEqualsIgnoreCase(connector, "jdbc")) { + url = sink.get("url"); + } else { + return null; + } String schema = schemaName; String sinkDb = sink.get(FlinkCDCConfig.SINK_DB); if (Asserts.isNotNullString(sinkDb)) { schema = SqlUtil.replaceAllParam(sinkDb, "schemaName", schemaName); } - Driver driver = Driver.build(sink.get("connector"), url, sink.get("username"), sink.get("password")); + + Driver driver = Driver.build(connector, url, sink.get("username"), sink.get("password")); if (null != driver && !driver.existSchema(schema)) { driver.createSchema(schema); } diff --git a/dinky-core/src/test/java/org/dinky/trans/ddl/CreateCDCSourceOperationTest.java b/dinky-core/src/test/java/org/dinky/trans/ddl/CreateCDCSourceOperationTest.java new file mode 100644 index 0000000000..6446836a21 --- /dev/null +++ b/dinky-core/src/test/java/org/dinky/trans/ddl/CreateCDCSourceOperationTest.java @@ -0,0 +1,159 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.trans.ddl; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +import org.dinky.cdc.SinkBuilder; +import org.dinky.data.model.Table; + +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Unit tests for the private static method + * {@code CreateCDCSourceOperation#setSinkTable} introduced in this commit. + * + *

The method is exercised via reflection to avoid exposing it as package-visible. + */ +class CreateCDCSourceOperationTest { + + private Method setSinkTableMethod; + private SinkBuilder sinkBuilder; + + @BeforeEach + void setUp() throws Exception { + setSinkTableMethod = CreateCDCSourceOperation.class.getDeclaredMethod( + "setSinkTable", Table.class, List.class, SinkBuilder.class, org.dinky.metadata.driver.Driver.class); + setSinkTableMethod.setAccessible(true); + + sinkBuilder = mock(SinkBuilder.class); + } + + private void invoke(Table table, List

sinkTables, SinkBuilder sb, org.dinky.metadata.driver.Driver driver) + throws Exception { + setSinkTableMethod.invoke(null, table, sinkTables, sb, driver); + } + + /** + * When the sinkBuilder returns a table name that matches one of the sink tables + * (plain name, no schema prefix), the source table's sinkTable is set. + */ + @Test + void testMatchBySinkTableName_plain() throws Exception { + Table sourceTable = new Table("orders", "public", null); + Table sinkTable = new Table("orders", "public", null); + + when(sinkBuilder.getSinkTableName(sourceTable)).thenReturn("orders"); + + invoke(sourceTable, Collections.singletonList(sinkTable), sinkBuilder, null); + + assertNotNull(sourceTable.getSinkTable()); + assertEquals("orders", sourceTable.getSinkTable().getName()); + } + + /** + * When the sink table carries a schema prefix (e.g. "public.orders"), the method + * must split on '.' and match using only the table-name part. + */ + @Test + void testMatchBySinkTableName_withSchemaPrefix() throws Exception { + Table sourceTable = new Table("orders", "public", null); + Table sinkTable = new Table("orders", "public", null); + // getSchemaTableName() returns "public.orders" + + when(sinkBuilder.getSinkTableName(sourceTable)).thenReturn("orders"); + + invoke(sourceTable, Collections.singletonList(sinkTable), sinkBuilder, null); + + assertNotNull(sourceTable.getSinkTable()); + } + + /** + * No matching sink table -> sinkTable stays null. + */ + @Test + void testNoMatch_sinkTableRemainsNull() throws Exception { + Table sourceTable = new Table("orders", "public", null); + Table sinkTable = new Table("customers", "public", null); + + when(sinkBuilder.getSinkTableName(sourceTable)).thenReturn("orders"); + + invoke(sourceTable, Collections.singletonList(sinkTable), sinkBuilder, null); + + assertNull(sourceTable.getSinkTable()); + } + + /** + * When sinkRealDriver is provided, columns should be loaded from it. + */ + @Test + void testMatchWithDriver_columnsAreSet() throws Exception { + Table sourceTable = new Table("orders", "public", null); + Table sinkTable = new Table("orders", "public", null); + + when(sinkBuilder.getSinkTableName(sourceTable)).thenReturn("orders"); + + org.dinky.metadata.driver.Driver driver = mock(org.dinky.metadata.driver.Driver.class); + when(driver.listColumnsSortByPK("public", "orders")).thenReturn(Collections.emptyList()); + + invoke(sourceTable, Collections.singletonList(sinkTable), sinkBuilder, driver); + + assertNotNull(sourceTable.getSinkTable()); + verify(driver).listColumnsSortByPK("public", "orders"); + } + + /** + * Empty sink table list -> sinkTable stays null, no exception. + */ + @Test + void testEmptySinkTableList() throws Exception { + Table sourceTable = new Table("orders", "public", null); + + when(sinkBuilder.getSinkTableName(sourceTable)).thenReturn("orders"); + + invoke(sourceTable, Collections.emptyList(), sinkBuilder, null); + + assertNull(sourceTable.getSinkTable()); + } + + /** + * Multiple candidates; only the first match should be used (break after first match). + */ + @Test + void testFirstMatchWins() throws Exception { + Table sourceTable = new Table("orders", "public", null); + Table sinkTable1 = new Table("orders", "schema1", null); + Table sinkTable2 = new Table("orders", "schema2", null); + + when(sinkBuilder.getSinkTableName(sourceTable)).thenReturn("orders"); + + invoke(sourceTable, Arrays.asList(sinkTable1, sinkTable2), sinkBuilder, null); + + assertNotNull(sourceTable.getSinkTable()); + assertEquals("schema1", sourceTable.getSinkTable().getSchema()); + } +}