diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java index 50bb0fd2acd6f1..07d9acf9d3f99c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java @@ -112,7 +112,7 @@ public void run() throws JobException { } private void sendWriteRequest() throws JobException { - Backend backend = StreamingJobUtils.selectBackend(jobId); + Backend backend = StreamingJobUtils.selectBackend(); WriteRecordRequest params = buildRequestParams(); InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder() .setApi("/api/writeRecords") diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java index 2c898b04a07c37..560887d61ad1c0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java @@ -183,7 +183,7 @@ public void updateOffset(Offset offset) { @Override public void fetchRemoteMeta(Map properties) throws Exception { - Backend backend = StreamingJobUtils.selectBackend(jobId); + Backend backend = StreamingJobUtils.selectBackend(); JobBaseConfig requestParams = new JobBaseConfig(getJobId(), sourceType.name(), sourceProperties); InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder() .setApi("/api/fetchEndOffset") @@ -258,7 +258,7 @@ public boolean hasMoreDataToConsume() { private boolean compareOffset(Map offsetFirst, Map offsetSecond) throws JobException { - Backend backend = StreamingJobUtils.selectBackend(jobId); + Backend backend = StreamingJobUtils.selectBackend(); CompareOffsetRequest requestParams = new CompareOffsetRequest(getJobId(), sourceType.name(), sourceProperties, offsetFirst, offsetSecond); InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder() @@ -444,7 +444,7 @@ private void saveChunkMeta(Map> tableSplits) throws } private List requestTableSplits(String table) throws JobException { - Backend backend = StreamingJobUtils.selectBackend(jobId); + Backend backend = StreamingJobUtils.selectBackend(); FetchTableSplitsRequest requestParams = new FetchTableSplitsRequest(getJobId(), sourceType.name(), sourceProperties, table); InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder() @@ -493,7 +493,7 @@ private boolean checkNeedSplitChunks(Map sourceProperties) { public void cleanMeta(Long jobId) throws JobException { // clean meta table StreamingJobUtils.deleteJobMeta(jobId); - Backend backend = StreamingJobUtils.selectBackend(jobId); + Backend backend = StreamingJobUtils.selectBackend(); JobBaseConfig requestParams = new JobBaseConfig(getJobId(), sourceType.name(), sourceProperties); InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder() .setApi("/api/close") diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java index 0281503448cd7d..4625417b67de36 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java @@ -97,6 +97,8 @@ public class StreamingJobUtils { private static final ObjectMapper objectMapper = new ObjectMapper(); + private static int lastSelectedBackendIndex = 0; + public static void createMetaTableIfNotExist() throws Exception { Optional optionalDatabase = Env.getCurrentEnv().getInternalCatalog() @@ -213,27 +215,31 @@ private static JdbcClient getJdbcClient(DataSourceType sourceType, Map backendIds; - backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, -1); + backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1); if (backendIds.isEmpty()) { throw new JobException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy); } - // jobid % backendSize - long index = backendIds.get(jobId.intValue() % backendIds.size()); - backend = Env.getCurrentSystemInfo().getBackend(index); + backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0)); if (backend == null) { throw new JobException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy); } return backend; } + private static synchronized int getLastSelectedBackendIndexAndUpdate() { + int index = lastSelectedBackendIndex; + lastSelectedBackendIndex = (index >= Integer.MAX_VALUE - 1) ? 0 : index + 1; + return index; + } + public static List generateCreateTableCmds(String targetDb, DataSourceType sourceType, Map properties, Map targetProperties) throws JobException { diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy index fa3065849fc000..d77e2b769bb82a 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy @@ -20,7 +20,7 @@ import org.awaitility.Awaitility import static java.util.concurrent.TimeUnit.SECONDS -suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_docker_mysql") { +suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") { def jobName = "test_streaming_mysql_job_name" def currentDb = (sql "select database()")[0][0] def table1 = "user_info_normal1" diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_all_type.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_all_type.groovy index e24e93b61770db..564f4d6c4abde8 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_all_type.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_all_type.groovy @@ -20,7 +20,7 @@ import org.awaitility.Awaitility import static java.util.concurrent.TimeUnit.SECONDS -suite("test_streaming_mysql_job_all_type", "p0,external,mysql,external_docker,external_docker_mysql") { +suite("test_streaming_mysql_job_all_type", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") { def jobName = "test_streaming_mysql_job_all_type_name" def currentDb = (sql "select database()")[0][0] def table1 = "streaming_all_types_nullable_with_pk" diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_create_alter.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_create_alter.groovy index 5b210a2fd74295..2a40d2f48bc527 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_create_alter.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_create_alter.groovy @@ -20,7 +20,7 @@ import org.awaitility.Awaitility import static java.util.concurrent.TimeUnit.SECONDS -suite("test_streaming_mysql_job_create_alter", "p0,external,mysql,external_docker,external_docker_mysql") { +suite("test_streaming_mysql_job_create_alter", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") { def jobName = "test_streaming_mysql_job_create_alter" def currentDb = (sql "select database()")[0][0] def table1 = "create_alter_user_info" diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_dup.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_dup.groovy index dba24d884c52d2..ecfd4a36cf3be4 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_dup.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_dup.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("test_streaming_mysql_job_dup", "p0,external,mysql,external_docker,external_docker_mysql") { +suite("test_streaming_mysql_job_dup", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") { def jobName = "test_streaming_mysql_job_name" def currentDb = (sql "select database()")[0][0] def table1 = "test_streaming_mysql_job_dup" diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_exclude.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_exclude.groovy index 27553e0c0a525e..b4b73536ccec4a 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_exclude.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_exclude.groovy @@ -20,7 +20,7 @@ import org.awaitility.Awaitility import static java.util.concurrent.TimeUnit.SECONDS -suite("test_streaming_mysql_job_exclude", "p0,external,mysql,external_docker,external_docker_mysql") { +suite("test_streaming_mysql_job_exclude", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") { def jobName = "test_streaming_mysql_job_exclude_name" def currentDb = (sql "select database()")[0][0] def table1 = "user_info_exclude1" diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy index aae1bc581341d7..b6a0926265a70e 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy @@ -19,7 +19,7 @@ import org.awaitility.Awaitility import static java.util.concurrent.TimeUnit.SECONDS -suite("test_streaming_mysql_job_priv", "p0,external,mysql,external_docker,external_docker_mysql") { +suite("test_streaming_mysql_job_priv", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") { def tableName = "test_streaming_mysql_job_priv_tbl" def jobName = "test_streaming_mysql_job_priv_name" diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_restart_fe.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_restart_fe.groovy index eb0e4866143ed4..8ece9f4ba74d34 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_restart_fe.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_restart_fe.groovy @@ -20,7 +20,7 @@ import org.awaitility.Awaitility import static java.util.concurrent.TimeUnit.SECONDS -suite("test_streaming_mysql_job_restart_fe", "docker,mysql,external_docker,external_docker_mysql") { +suite("test_streaming_mysql_job_restart_fe", "docker,mysql,external_docker,external_docker_mysql,nondatalake") { def jobName = "test_streaming_mysql_job_restart_fe" def options = new ClusterOptions() options.setFeNum(1) diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy index 2a82b3a5777d09..7fe8cb73daaec2 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy @@ -20,7 +20,7 @@ import org.awaitility.Awaitility import static java.util.concurrent.TimeUnit.SECONDS -suite("test_streaming_postgres_job", "p0,external,pg,external_docker,external_docker_pg") { +suite("test_streaming_postgres_job", "p0,external,pg,external_docker,external_docker_pg,nondatalake") { def jobName = "test_streaming_postgres_job_name" def currentDb = (sql "select database()")[0][0] def table1 = "user_info_pg_normal1" diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy index dcd688bb94fc85..541941b816cf74 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy @@ -20,7 +20,7 @@ import org.awaitility.Awaitility import static java.util.concurrent.TimeUnit.SECONDS -suite("test_streaming_postgres_job_all_type", "p0,external,pg,external_docker,external_docker_pg") { +suite("test_streaming_postgres_job_all_type", "p0,external,pg,external_docker,external_docker_pg,nondatalake") { def jobName = "test_streaming_postgres_job_all_type_name" def currentDb = (sql "select database()")[0][0] def table1 = "streaming_all_types_nullable_with_pk_pg" diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_dup.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_dup.groovy index cae745d06e6835..1bec6cd3a2591f 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_dup.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_dup.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("test_streaming_postgres_job_dup", "p0,external,pg,external_docker,external_docker_pg") { +suite("test_streaming_postgres_job_dup", "p0,external,pg,external_docker,external_docker_pg,nondatalake") { def jobName = "test_streaming_postgres_job_dup_name" def currentDb = (sql "select database()")[0][0] def table1 = "test_streaming_postgres_job_dup" diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy index 6b70301e43dcd9..7b114b2ca97e7b 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy @@ -19,7 +19,7 @@ import org.awaitility.Awaitility import static java.util.concurrent.TimeUnit.SECONDS -suite("test_streaming_postgres_job_priv", "p0,external,pg,external_docker,external_docker_pg") { +suite("test_streaming_postgres_job_priv", "p0,external,pg,external_docker,external_docker_pg,nondatalake") { def tableName = "test_streaming_postgres_job_priv_tbl" def jobName = "test_streaming_postgres_job_priv_name" def currentDb = (sql "select database()")[0][0]