Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void run() throws JobException {
}

private void sendWriteRequest() throws JobException {
Backend backend = StreamingJobUtils.selectBackend(jobId);
Backend backend = StreamingJobUtils.selectBackend();
WriteRecordRequest params = buildRequestParams();
InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
.setApi("/api/writeRecords")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public void updateOffset(Offset offset) {

@Override
public void fetchRemoteMeta(Map<String, String> properties) throws Exception {
Backend backend = StreamingJobUtils.selectBackend(jobId);
Backend backend = StreamingJobUtils.selectBackend();
JobBaseConfig requestParams = new JobBaseConfig(getJobId(), sourceType.name(), sourceProperties);
InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
.setApi("/api/fetchEndOffset")
Expand Down Expand Up @@ -258,7 +258,7 @@ public boolean hasMoreDataToConsume() {

private boolean compareOffset(Map<String, String> offsetFirst, Map<String, String> offsetSecond)
throws JobException {
Backend backend = StreamingJobUtils.selectBackend(jobId);
Backend backend = StreamingJobUtils.selectBackend();
CompareOffsetRequest requestParams =
new CompareOffsetRequest(getJobId(), sourceType.name(), sourceProperties, offsetFirst, offsetSecond);
InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
Expand Down Expand Up @@ -444,7 +444,7 @@ private void saveChunkMeta(Map<String, List<SnapshotSplit>> tableSplits) throws
}

private List<SnapshotSplit> requestTableSplits(String table) throws JobException {
Backend backend = StreamingJobUtils.selectBackend(jobId);
Backend backend = StreamingJobUtils.selectBackend();
FetchTableSplitsRequest requestParams =
new FetchTableSplitsRequest(getJobId(), sourceType.name(), sourceProperties, table);
InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
Expand Down Expand Up @@ -493,7 +493,7 @@ private boolean checkNeedSplitChunks(Map<String, String> sourceProperties) {
public void cleanMeta(Long jobId) throws JobException {
// clean meta table
StreamingJobUtils.deleteJobMeta(jobId);
Backend backend = StreamingJobUtils.selectBackend(jobId);
Backend backend = StreamingJobUtils.selectBackend();
JobBaseConfig requestParams = new JobBaseConfig(getJobId(), sourceType.name(), sourceProperties);
InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
.setApi("/api/close")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ public class StreamingJobUtils {

private static final ObjectMapper objectMapper = new ObjectMapper();

private static int lastSelectedBackendIndex = 0;

public static void createMetaTableIfNotExist() throws Exception {
Optional<Database> optionalDatabase =
Env.getCurrentEnv().getInternalCatalog()
Expand Down Expand Up @@ -213,27 +215,31 @@ private static JdbcClient getJdbcClient(DataSourceType sourceType, Map<String, S
return JdbcClient.createJdbcClient(config);
}

public static Backend selectBackend(Long jobId) throws JobException {
public static Backend selectBackend() throws JobException {
Backend backend = null;
BeSelectionPolicy policy = null;

policy = new BeSelectionPolicy.Builder()
.setEnableRoundRobin(true)
.needLoadAvailable().build();
policy = new BeSelectionPolicy.Builder().setEnableRoundRobin(true).needLoadAvailable().build();
policy.nextRoundRobinIndex = getLastSelectedBackendIndexAndUpdate();

List<Long> backendIds;
backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, -1);
backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
if (backendIds.isEmpty()) {
throw new JobException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
}
// jobid % backendSize
long index = backendIds.get(jobId.intValue() % backendIds.size());
backend = Env.getCurrentSystemInfo().getBackend(index);
backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
if (backend == null) {
throw new JobException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
}
return backend;
}

private static synchronized int getLastSelectedBackendIndexAndUpdate() {
int index = lastSelectedBackendIndex;
lastSelectedBackendIndex = (index >= Integer.MAX_VALUE - 1) ? 0 : index + 1;
return index;
}

public static List<CreateTableCommand> generateCreateTableCmds(String targetDb, DataSourceType sourceType,
Map<String, String> properties, Map<String, String> targetProperties)
throws JobException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.awaitility.Awaitility

import static java.util.concurrent.TimeUnit.SECONDS

suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_docker_mysql") {
suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
def jobName = "test_streaming_mysql_job_name"
def currentDb = (sql "select database()")[0][0]
def table1 = "user_info_normal1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.awaitility.Awaitility

import static java.util.concurrent.TimeUnit.SECONDS

suite("test_streaming_mysql_job_all_type", "p0,external,mysql,external_docker,external_docker_mysql") {
suite("test_streaming_mysql_job_all_type", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
def jobName = "test_streaming_mysql_job_all_type_name"
def currentDb = (sql "select database()")[0][0]
def table1 = "streaming_all_types_nullable_with_pk"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.awaitility.Awaitility

import static java.util.concurrent.TimeUnit.SECONDS

suite("test_streaming_mysql_job_create_alter", "p0,external,mysql,external_docker,external_docker_mysql") {
suite("test_streaming_mysql_job_create_alter", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
def jobName = "test_streaming_mysql_job_create_alter"
def currentDb = (sql "select database()")[0][0]
def table1 = "create_alter_user_info"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

suite("test_streaming_mysql_job_dup", "p0,external,mysql,external_docker,external_docker_mysql") {
suite("test_streaming_mysql_job_dup", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
def jobName = "test_streaming_mysql_job_name"
def currentDb = (sql "select database()")[0][0]
def table1 = "test_streaming_mysql_job_dup"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.awaitility.Awaitility

import static java.util.concurrent.TimeUnit.SECONDS

suite("test_streaming_mysql_job_exclude", "p0,external,mysql,external_docker,external_docker_mysql") {
suite("test_streaming_mysql_job_exclude", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
def jobName = "test_streaming_mysql_job_exclude_name"
def currentDb = (sql "select database()")[0][0]
def table1 = "user_info_exclude1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import org.awaitility.Awaitility

import static java.util.concurrent.TimeUnit.SECONDS

suite("test_streaming_mysql_job_priv", "p0,external,mysql,external_docker,external_docker_mysql") {
suite("test_streaming_mysql_job_priv", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
def tableName = "test_streaming_mysql_job_priv_tbl"
def jobName = "test_streaming_mysql_job_priv_name"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.awaitility.Awaitility

import static java.util.concurrent.TimeUnit.SECONDS

suite("test_streaming_mysql_job_restart_fe", "docker,mysql,external_docker,external_docker_mysql") {
suite("test_streaming_mysql_job_restart_fe", "docker,mysql,external_docker,external_docker_mysql,nondatalake") {
def jobName = "test_streaming_mysql_job_restart_fe"
def options = new ClusterOptions()
options.setFeNum(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.awaitility.Awaitility

import static java.util.concurrent.TimeUnit.SECONDS

suite("test_streaming_postgres_job", "p0,external,pg,external_docker,external_docker_pg") {
suite("test_streaming_postgres_job", "p0,external,pg,external_docker,external_docker_pg,nondatalake") {
def jobName = "test_streaming_postgres_job_name"
def currentDb = (sql "select database()")[0][0]
def table1 = "user_info_pg_normal1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.awaitility.Awaitility

import static java.util.concurrent.TimeUnit.SECONDS

suite("test_streaming_postgres_job_all_type", "p0,external,pg,external_docker,external_docker_pg") {
suite("test_streaming_postgres_job_all_type", "p0,external,pg,external_docker,external_docker_pg,nondatalake") {
def jobName = "test_streaming_postgres_job_all_type_name"
def currentDb = (sql "select database()")[0][0]
def table1 = "streaming_all_types_nullable_with_pk_pg"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

suite("test_streaming_postgres_job_dup", "p0,external,pg,external_docker,external_docker_pg") {
suite("test_streaming_postgres_job_dup", "p0,external,pg,external_docker,external_docker_pg,nondatalake") {
def jobName = "test_streaming_postgres_job_dup_name"
def currentDb = (sql "select database()")[0][0]
def table1 = "test_streaming_postgres_job_dup"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import org.awaitility.Awaitility

import static java.util.concurrent.TimeUnit.SECONDS

suite("test_streaming_postgres_job_priv", "p0,external,pg,external_docker,external_docker_pg") {
suite("test_streaming_postgres_job_priv", "p0,external,pg,external_docker,external_docker_pg,nondatalake") {
def tableName = "test_streaming_postgres_job_priv_tbl"
def jobName = "test_streaming_postgres_job_priv_name"
def currentDb = (sql "select database()")[0][0]
Expand Down
Loading