diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
index cb06cc45a6d..ffeecf3f7e2 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
@@ -46,6 +46,7 @@
 import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReader;
 import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReaderContext;
 import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader;
+import org.apache.flink.cdc.connectors.mysql.source.reader.async.PartitionedDeserializationScheduler;
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitSerializer;
 import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords;
@@ -127,11 +128,35 @@ public static <T> MySqlSourceBuilder<T> builder() {
         this(
                 configFactory,
                 deserializationSchema,
-                (sourceReaderMetrics, sourceConfig) ->
-                        new MySqlRecordEmitter<>(
+                (sourceReaderMetrics, sourceConfig) -> {
+                    if (!sourceConfig.isParallelDeserializeEnabled()) {
+                        return new MySqlRecordEmitter<>(
                                 deserializationSchema,
                                 sourceReaderMetrics,
-                                sourceConfig.isIncludeSchemaChanges()));
+                                sourceConfig.isIncludeSchemaChanges());
+                    }
+                    final int cores = Math.max(1, Runtime.getRuntime().availableProcessors());
+                    int pkWorkers =
+                            sourceConfig.getParallelDeserializePkWorkers() > 0
+                                    ? sourceConfig.getParallelDeserializePkWorkers()
+                                    : Math.max(1, cores - 1);
+                    int deserThreads =
+                            sourceConfig.getParallelDeserializeThreads() > 0
+                                    ? sourceConfig.getParallelDeserializeThreads()
+                                    : Math.max(0, pkWorkers / 2);
+                    int queueCapacity =
+                            Math.max(1, sourceConfig.getParallelDeserializeQueueCapacity());
+                    return new MySqlRecordEmitter<>(
+                            deserializationSchema,
+                            sourceReaderMetrics,
+                            sourceConfig.isIncludeSchemaChanges(),
+                            new PartitionedDeserializationScheduler<>(
+                                    deserializationSchema,
+                                    deserThreads,
+                                    pkWorkers,
+                                    0,
+                                    queueCapacity));
+                });
     }
 
     MySqlSource(
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
index 260a7cd2b5d..dea0bbba29e 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
@@ -71,6 +71,12 @@ public class MySqlSourceConfig implements Serializable {
     public static boolean useLegacyJsonFormat = true;
     private final boolean assignUnboundedChunkFirst;
 
+    // ---------------- Parallel deserialize options ----------------
+    private final boolean parallelDeserializeEnabled;
+    private final int parallelDeserializePkWorkers;
+    private final int parallelDeserializeThreads;
+    private final int parallelDeserializeQueueCapacity;
+
     // --------------------------------------------------------------------------------------------
     // Debezium Configurations
     // --------------------------------------------------------------------------------------------
@@ -108,7 +114,11 @@ public class MySqlSourceConfig implements Serializable {
             boolean parseOnLineSchemaChanges,
             boolean treatTinyInt1AsBoolean,
             boolean useLegacyJsonFormat,
-            boolean assignUnboundedChunkFirst) {
+            boolean assignUnboundedChunkFirst,
+            boolean parallelDeserializeEnabled,
+            int parallelDeserializePkWorkers,
+            int parallelDeserializeThreads,
+            int parallelDeserializeQueueCapacity) {
         this.hostname = checkNotNull(hostname);
         this.port = port;
         this.username = checkNotNull(username);
@@ -152,6 +162,10 @@ public class MySqlSourceConfig implements Serializable {
         this.treatTinyInt1AsBoolean = treatTinyInt1AsBoolean;
         this.useLegacyJsonFormat = useLegacyJsonFormat;
         this.assignUnboundedChunkFirst = assignUnboundedChunkFirst;
+        this.parallelDeserializeEnabled = parallelDeserializeEnabled;
+        this.parallelDeserializePkWorkers = parallelDeserializePkWorkers;
+        this.parallelDeserializeThreads = parallelDeserializeThreads;
+        this.parallelDeserializeQueueCapacity = parallelDeserializeQueueCapacity;
     }
 
     public String getHostname() {
@@ -243,6 +257,23 @@ public boolean isAssignUnboundedChunkFirst() {
         return assignUnboundedChunkFirst;
     }
 
+    // ---------------- Parallel deserialize getters ----------------
+    public boolean isParallelDeserializeEnabled() {
+        return parallelDeserializeEnabled;
+    }
+
+    public int getParallelDeserializePkWorkers() {
+        return parallelDeserializePkWorkers;
+    }
+
+    public int getParallelDeserializeThreads() {
+        return parallelDeserializeThreads;
+    }
+
+    public int getParallelDeserializeQueueCapacity() {
+        return parallelDeserializeQueueCapacity;
+    }
+
     public Properties getDbzProperties() {
         return dbzProperties;
     }
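
The sizing logic in the builder lambda above resolves 0-valued (auto) settings from the host CPU count: on an 8-core machine with no explicit settings, pkWorkers becomes max(1, 8 - 1) = 7 and deserThreads becomes max(0, 7 / 2) = 3. A minimal standalone sketch of the same defaulting, for readers who want to reason about it in isolation (the helper name is hypothetical, not part of this patch):

    // Hypothetical helper mirroring the defaulting in MySqlSource's builder lambda.
    static int[] resolveWorkerCounts(int configuredPkWorkers, int configuredThreads) {
        final int cores = Math.max(1, Runtime.getRuntime().availableProcessors());
        int pkWorkers = configuredPkWorkers > 0 ? configuredPkWorkers : Math.max(1, cores - 1);
        int deserThreads = configuredThreads > 0 ? configuredThreads : Math.max(0, pkWorkers / 2);
        return new int[] {pkWorkers, deserThreads}; // {partition workers, shared deser threads}
    }
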
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
index 427115edea7..2f976c477fd 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
@@ -69,6 +69,11 @@ public class MySqlSourceConfigFactory implements Serializable {
     private Duration heartbeatInterval = MySqlSourceOptions.HEARTBEAT_INTERVAL.defaultValue();
     private Properties dbzProperties;
     private Map<ObjectPath, String> chunkKeyColumns = new HashMap<>();
+    // ---------------- Parallel deserialize options ----------------
+    private boolean parallelDeserializeEnabled = false;
+    private int parallelDeserializePkWorkers = 0;
+    private int parallelDeserializeThreads = 0;
+    private int parallelDeserializeQueueCapacity = 65536;
     private boolean skipSnapshotBackfill = false;
     private boolean parseOnLineSchemaChanges = false;
     private boolean treatTinyInt1AsBoolean = true;
@@ -324,6 +329,31 @@ public MySqlSourceConfigFactory assignUnboundedChunkFirst(boolean assignUnbounde
         return this;
     }
 
+    // ---------------- scan.parallel-deserialize.* options ----------------
+    /** Enables parallel deserialization with ordered delivery to the output. */
+    public MySqlSourceConfigFactory parallelDeserializeEnabled(boolean enabled) {
+        this.parallelDeserializeEnabled = enabled;
+        return this;
+    }
+
+    /** Number of per-key partition workers; 0 derives the count from available CPU cores. */
+    public MySqlSourceConfigFactory parallelDeserializePkWorkers(int pkWorkers) {
+        this.parallelDeserializePkWorkers = pkWorkers;
+        return this;
+    }
+
+    /** Number of shared deserialization threads; 0 derives the count from available CPU cores. */
+    public MySqlSourceConfigFactory parallelDeserializeThreads(int threads) {
+        this.parallelDeserializeThreads = threads;
+        return this;
+    }
+
+    /** Capacity of each bounded per-partition queue. */
+    public MySqlSourceConfigFactory parallelDeserializeQueueCapacity(int capacity) {
+        this.parallelDeserializeQueueCapacity = capacity;
+        return this;
+    }
+
     /** Creates a new {@link MySqlSourceConfig} for the given subtask {@code subtaskId}. */
     public MySqlSourceConfig createConfig(int subtaskId) {
         // hard code server name, because we don't need to distinguish it, docs:
@@ -421,6 +451,10 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) {
                 parseOnLineSchemaChanges,
                 treatTinyInt1AsBoolean,
                 useLegacyJsonFormat,
-                assignUnboundedChunkFirst);
+                assignUnboundedChunkFirst,
+                parallelDeserializeEnabled,
+                parallelDeserializePkWorkers,
+                parallelDeserializeThreads,
+                parallelDeserializeQueueCapacity);
     }
 }
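
The new setters compose with the factory's existing fluent API. A hedged usage sketch (connection values are placeholders; only the four parallel-deserialize setters are introduced by this patch, the rest are pre-existing factory methods):

    MySqlSourceConfig config =
            new MySqlSourceConfigFactory()
                    .hostname("localhost") // placeholder connection settings
                    .port(3306)
                    .username("flinkuser")
                    .password("flinkpw")
                    .databaseList("inventory")
                    .tableList("inventory.products")
                    .parallelDeserializeEnabled(true)
                    .parallelDeserializePkWorkers(0) // 0 = derive from CPU cores
                    .parallelDeserializeThreads(0) // 0 = derive from CPU cores
                    .parallelDeserializeQueueCapacity(65536)
                    .createConfig(0);
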
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java
index 449e7f608f2..5ebb1d852ef 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java
@@ -20,6 +20,7 @@
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
 import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
+import org.apache.flink.cdc.connectors.mysql.source.reader.async.AsyncScheduler;
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitState;
 import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords;
@@ -37,6 +38,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * The {@link RecordEmitter} implementation for {@link MySqlSourceReader}.
@@ -54,6 +56,8 @@ public class MySqlRecordEmitter<T>
     private final MySqlSourceReaderMetrics sourceReaderMetrics;
     private final boolean includeSchemaChanges;
     private final OutputCollector<T> outputCollector;
+    /** Optional parallel scheduler; {@code null} means parallel deserialization is disabled. */
+    private final AsyncScheduler<T> scheduler;
 
     public MySqlRecordEmitter(
             DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
@@ -63,6 +67,20 @@ public MySqlRecordEmitter(
         this.sourceReaderMetrics = sourceReaderMetrics;
         this.includeSchemaChanges = includeSchemaChanges;
         this.outputCollector = new OutputCollector<>();
+        this.scheduler = null;
+    }
+
+    /** Constructor that injects a parallel scheduler. */
+    public MySqlRecordEmitter(
+            DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
+            MySqlSourceReaderMetrics sourceReaderMetrics,
+            boolean includeSchemaChanges,
+            AsyncScheduler<T> scheduler) {
+        this.debeziumDeserializationSchema = debeziumDeserializationSchema;
+        this.sourceReaderMetrics = sourceReaderMetrics;
+        this.includeSchemaChanges = includeSchemaChanges;
+        this.outputCollector = new OutputCollector<>();
+        this.scheduler = scheduler;
     }
 
     @Override
@@ -70,9 +88,39 @@ public void emitRecord(
             SourceRecords sourceRecords, SourceOutput<T> output, MySqlSplitState splitState)
             throws Exception {
         final Iterator<SourceRecord> elementIterator = sourceRecords.iterator();
+        if (scheduler == null || !scheduler.isEnabled()) {
+            while (elementIterator.hasNext()) {
+                processElement(elementIterator.next(), output, splitState);
+            }
+            return;
+        }
+
+        // Parallel path: partitioned deserialization, source-thread replay, and offset
+        // advancement after replay.
+        final AtomicInteger pendingPartitionTasks = new AtomicInteger(0);
         while (elementIterator.hasNext()) {
-            processElement(elementIterator.next(), output, splitState);
+            SourceRecord next = elementIterator.next();
+            if (RecordUtils.isDataChangeRecord(next)) {
+                reportMetrics(next);
+                scheduler.schedulePartitioned(next, pendingPartitionTasks);
+            } else {
+                // Control / non-DML events: flush all enqueued DML first to preserve
+                // the original ordering, then emit the control event inline.
+                scheduler.waitAndDrainAll(
+                        output,
+                        pendingPartitionTasks,
+                        (offset) -> updateOffsetAfterEmit(splitState, offset));
+                processElement(next, output, splitState);
+            }
+            scheduler.drainRound(
+                    output,
+                    (offset) -> updateOffsetAfterEmit(splitState, offset));
         }
+        // Wait until all work scheduled for this batch has been emitted.
+        scheduler.waitAndDrainAll(
+                output,
+                pendingPartitionTasks,
+                (offset) -> updateOffsetAfterEmit(splitState, offset));
     }
 
     protected void processElement(
@@ -115,6 +163,13 @@ private void updateStartingOffsetForSplit(MySqlSplitState splitState, SourceReco
         }
     }
 
+    private static void updateOffsetAfterEmit(MySqlSplitState splitState, BinlogOffset offset) {
+        if (offset == null || !splitState.isBinlogSplitState()) {
+            return;
+        }
+        splitState.asBinlogSplitState().setStartingOffset(offset);
+    }
+
     private void emitElement(SourceRecord element, SourceOutput<T> output) throws Exception {
         outputCollector.output = output;
         outputCollector.currentMessageTimestamp = RecordUtils.getMessageTimestamp(element);
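
The parallel path above relies on two invariants: events with the same primary key replay in input order (the same key always routes to the same single-threaded worker), and any non-DML event flushes everything scheduled before it. A sketch of that contract in caller terms (record variables are hypothetical; assume an enclosing method that may throw InterruptedException):

    // insertK1 and updateK1 carry the same primary key, so they hash to the same
    // partition worker and are replayed in submission order.
    AtomicInteger pending = new AtomicInteger(0);
    scheduler.schedulePartitioned(insertK1, pending);
    scheduler.schedulePartitioned(updateK1, pending); // runs after insertK1 on the same worker
    // A schema-change record arriving now triggers waitAndDrainAll, so both DML events
    // are emitted before the schema change is processed inline on the source thread.
    scheduler.waitAndDrainAll(output, pending, offset -> {});
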
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/async/AsyncScheduler.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/async/AsyncScheduler.java
new file mode 100644
index 00000000000..dbf2d794baf
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/async/AsyncScheduler.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source.reader.async;
+
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+/**
+ * Scheduler for parallel deserialization: records are deserialized off the source thread, the
+ * results are replayed on the source thread, and the binlog offset is advanced after each replay.
+ */
+public interface AsyncScheduler<T> {
+
+    /** Whether parallelization is enabled; {@code false} falls back to the single-threaded path. */
+    boolean isEnabled();
+
+    /**
+     * Schedules a data-change event onto its primary-key partition; {@code pendingTasks} counts
+     * tasks that are enqueued but not yet replayed.
+     */
+    void schedulePartitioned(SourceRecord record, AtomicInteger pendingTasks);
+
+    /** Performs one replay round on the source thread, advancing offsets via {@code onAfterEmit}. */
+    void drainRound(SourceOutput<T> output, Consumer<BinlogOffset> onAfterEmit);
+
+    /** Blocks until all scheduled tasks have completed and been replayed. */
+    void waitAndDrainAll(
+            SourceOutput<T> output, AtomicInteger pendingTasks, Consumer<BinlogOffset> onAfterEmit)
+            throws InterruptedException;
+}
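
To make the contract concrete, here is a minimal synchronous implementation (illustrative only, not part of this patch) that deserializes inline on the calling thread and buffers results until the next drain; RecordUtils.getBinlogPosition is the same utility the partitioned scheduler below uses:

    /** Illustrative inline AsyncScheduler: no worker threads, same observable contract. */
    final class InlineScheduler<T> implements AsyncScheduler<T> {
        private final DebeziumDeserializationSchema<T> deserializer;
        private final java.util.ArrayDeque<T> buffered = new java.util.ArrayDeque<>();
        private BinlogOffset lastOffset;

        InlineScheduler(DebeziumDeserializationSchema<T> deserializer) {
            this.deserializer = deserializer;
        }

        @Override
        public boolean isEnabled() {
            return true;
        }

        @Override
        public void schedulePartitioned(SourceRecord record, AtomicInteger pendingTasks) {
            // Deserialize immediately; nothing is ever pending off-thread.
            try {
                deserializer.deserialize(
                        record,
                        new org.apache.flink.util.Collector<T>() {
                            @Override
                            public void collect(T r) {
                                buffered.add(r);
                            }

                            @Override
                            public void close() {}
                        });
                lastOffset = RecordUtils.getBinlogPosition(record);
            } catch (Exception e) {
                throw new RuntimeException("Inline deserialization failed", e);
            }
        }

        @Override
        public void drainRound(SourceOutput<T> output, Consumer<BinlogOffset> onAfterEmit) {
            for (T r; (r = buffered.poll()) != null; ) {
                output.collect(r);
            }
            if (onAfterEmit != null && lastOffset != null) {
                onAfterEmit.accept(lastOffset);
            }
        }

        @Override
        public void waitAndDrainAll(
                SourceOutput<T> output,
                AtomicInteger pendingTasks,
                Consumer<BinlogOffset> onAfterEmit) {
            drainRound(output, onAfterEmit); // everything was already deserialized inline
        }
    }
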
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/async/PartitionedDeserializationScheduler.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/async/PartitionedDeserializationScheduler.java
new file mode 100644
index 00000000000..b842b56f81a
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/async/PartitionedDeserializationScheduler.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source.reader.async;
+
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
+import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
+import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.util.Collector;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+/**
+ * Partitioned parallel deserialization scheduler: a single reader thread feeds multiple workers,
+ * ordering is guaranteed per primary key, results are replayed on the source thread, and offsets
+ * are advanced after replay.
+ */
+public class PartitionedDeserializationScheduler<T> implements AsyncScheduler<T> {
+
+    private static final int MAX_DRAIN_PER_PARTITION_PER_ROUND = 64;
+
+    private final DebeziumDeserializationSchema<T> deserializer;
+    private final int deserPoolSize;
+    private final int partitionWorkers;
+    private final int emitPoolSize; // reserved, currently unused
+    private final int queueCapacity;
+
+    private volatile ExecutorService deserExecutor; // reserved, currently unused
+    private volatile ExecutorService[] partitionExecutors;
+    private volatile BlockingQueue<Batch<T>>[] partitionQueues;
+
+    // Reserved machinery for strict global-order replay; nothing populates it yet.
+    private final AtomicInteger globalSequence = new AtomicInteger(0);
+    private volatile int nextGlobalToEmit = 0;
+    private final ConcurrentHashMap<Integer, Batch<T>> globalReadyBatches =
+            new ConcurrentHashMap<>();
+    private final Object globalEmissionLock = new Object();
+
+    public PartitionedDeserializationScheduler(
+            DebeziumDeserializationSchema<T> deserializer,
+            int deserPoolSize,
+            int partitionWorkers,
+            int emitPoolSize,
+            int queueCapacity) {
+        this.deserializer = Objects.requireNonNull(deserializer);
+        this.deserPoolSize = Math.max(0, deserPoolSize);
+        this.partitionWorkers = Math.max(0, partitionWorkers);
+        this.emitPoolSize = Math.max(0, emitPoolSize);
+        this.queueCapacity = Math.max(1, queueCapacity);
+    }
+
+    @Override
+    public boolean isEnabled() {
+        // Partition workers are the only execution path wired up so far; without them,
+        // schedulePartitioned would have no executor to run on.
+        return partitionWorkers > 0;
+    }
+
+    @Override
+    public void schedulePartitioned(SourceRecord record, AtomicInteger pendingTasks) {
+        ensurePartitionExecutors();
+        int pid = computePartitionIndex(record);
+        pendingTasks.incrementAndGet();
+        partitionExecutors[pid].execute(
+                () -> {
+                    try {
+                        Batch<T> batch = deserializeToBatch(record);
+                        handleEnqueue(partitionQueues[pid], batch);
+                    } finally {
+                        // Always decrement, even on failure; otherwise waitAndDrainAll
+                        // would spin forever on a task that will never be replayed.
+                        pendingTasks.decrementAndGet();
+                    }
+                });
+    }
+
+    @Override
+    public void drainRound(SourceOutput<T> output, Consumer<BinlogOffset> onAfterEmit) {
+        // Replay globally sequenced batches in strict submission order (reserved path).
+        synchronized (globalEmissionLock) {
+            while (true) {
+                Batch<T> batch = globalReadyBatches.remove(nextGlobalToEmit);
+                if (batch == null) {
+                    break;
+                }
+                emitBatch(batch, output);
+                if (onAfterEmit != null && batch.lastOffset != null) {
+                    onAfterEmit.accept(batch.lastOffset);
+                }
+                nextGlobalToEmit++;
+            }
+        }
+        // Round-robin drain of the per-partition queues, bounded per round for fairness.
+        if (partitionQueues != null) {
+            for (BlockingQueue<Batch<T>> q : partitionQueues) {
+                int polled = 0;
+                Batch<T> batch;
+                // Check the bound before polling so a batch is never popped and then dropped.
+                while (polled < MAX_DRAIN_PER_PARTITION_PER_ROUND && (batch = q.poll()) != null) {
+                    emitBatch(batch, output);
+                    if (onAfterEmit != null && batch.lastOffset != null) {
+                        onAfterEmit.accept(batch.lastOffset);
+                    }
+                    polled++;
+                }
+            }
+        }
+    }
+
+    @Override
+    public void waitAndDrainAll(
+            SourceOutput<T> output, AtomicInteger pendingTasks, Consumer<BinlogOffset> onAfterEmit)
+            throws InterruptedException {
+        while (pendingTasks.get() > 0) {
+            drainRound(output, onAfterEmit);
+            Thread.sleep(1L);
+        }
+        // One final round: tasks may have enqueued results between the last check and now.
+        drainRound(output, onAfterEmit);
+    }
+
+    // ---------------- helpers ----------------
+
+    private Batch<T> deserializeToBatch(SourceRecord element) {
+        try {
+            // One SourceRecord may expand into multiple records, so collect into a list.
+            final List<T> list = new ArrayList<>(1);
+            Collector<T> collector =
+                    new Collector<T>() {
+                        @Override
+                        public void collect(T record) {
+                            list.add(record);
+                        }
+
+                        @Override
+                        public void close() {}
+                    };
+            deserializer.deserialize(element, collector);
+            BinlogOffset offset = RecordUtils.getBinlogPosition(element);
+            if (list.isEmpty()) {
+                return Batch.empty(offset);
+            } else if (list.size() == 1) {
+                return Batch.single(list.get(0), offset);
+            } else {
+                return Batch.multiple(list, offset);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Async deserialization failed", e);
+        }
+    }
+
+    private void emitBatch(Batch<T> batch, SourceOutput<T> output) {
+        if (batch.singleRecord != null) {
+            output.collect(batch.singleRecord);
+            return;
+        }
+        if (batch.records == null || batch.records.isEmpty()) {
+            return;
+        }
+        for (T record : batch.records) {
+            output.collect(record);
+        }
+    }
+
+    private void ensureDeserExecutor() {
+        // Reserved: the shared deserialization pool is not wired into the current flow.
+        if (deserExecutor == null && deserPoolSize > 0) {
+            synchronized (this) {
+                if (deserExecutor == null) {
+                    deserExecutor =
+                            Executors.newFixedThreadPool(
+                                    deserPoolSize, new NamedThreadFactory("mysql-cdc-deser"));
+                }
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void ensurePartitionExecutors() {
+        if (partitionWorkers <= 0) {
+            return;
+        }
+        if (partitionExecutors == null || partitionQueues == null) {
+            synchronized (this) {
+                if (partitionExecutors == null) {
+                    partitionExecutors = new ExecutorService[partitionWorkers];
+                    for (int i = 0; i < partitionWorkers; i++) {
+                        partitionExecutors[i] =
+                                Executors.newSingleThreadExecutor(
+                                        new NamedThreadFactory("mysql-cdc-pkworker-" + i));
+                    }
+                }
+                if (partitionQueues == null) {
+                    BlockingQueue<Batch<T>>[] queues =
+                            (BlockingQueue<Batch<T>>[]) new BlockingQueue[partitionWorkers];
+                    for (int i = 0; i < partitionWorkers; i++) {
+                        queues[i] = new ArrayBlockingQueue<>(queueCapacity);
+                    }
+                    partitionQueues = queues;
+                }
+            }
+        }
+    }
+
+    private int computePartitionIndex(SourceRecord record) {
+        Object key = record.key();
+        int h = (key == null) ? 0 : key.hashCode();
+        h ^= (h >>> 16); // spread high bits into the low bits
+        int idx = h % partitionWorkers;
+        return idx < 0 ? -idx : idx;
+    }
+
+    private void handleEnqueue(BlockingQueue<Batch<T>> q, Batch<T> payload) {
+        try {
+            q.put(payload); // blocks when the partition queue is full (backpressure)
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Interrupted while enqueuing partition payload", ie);
+        }
+    }
+
+    private static final class Batch<X> {
+        final X singleRecord; // fast path for a single element
+        final List<X> records; // used for multi-element batches
+        final BinlogOffset lastOffset;
+
+        private Batch(X singleRecord, List<X> records, BinlogOffset lastOffset) {
+            this.singleRecord = singleRecord;
+            this.records = records;
+            this.lastOffset = lastOffset;
+        }
+
+        static <X> Batch<X> single(X one, BinlogOffset offset) {
+            return new Batch<>(one, null, offset);
+        }
+
+        static <X> Batch<X> multiple(List<X> many, BinlogOffset offset) {
+            return new Batch<>(null, many, offset);
+        }
+
+        static <X> Batch<X> empty(BinlogOffset offset) {
+            return new Batch<>(null, Collections.emptyList(), offset);
+        }
+    }
+
+    private static final class NamedThreadFactory implements ThreadFactory {
+        private final String prefix;
+        private final AtomicInteger idx = new AtomicInteger(1);
+
+        private NamedThreadFactory(String prefix) {
+            this.prefix = Objects.requireNonNull(prefix);
+        }
+
+        @Override
+        public Thread newThread(Runnable r) {
+            Thread t = new Thread(r, prefix + '-' + idx.getAndIncrement());
+            t.setDaemon(true);
+            return t;
+        }
+    }
+}
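
A quick worked example of the key-to-partition routing in computePartitionIndex (values hypothetical): because the index is a pure function of the record key, every event for a given key lands on the same single-threaded worker, which is exactly what yields the per-key ordering guarantee.

    int partitionWorkers = 4;
    int h = "order-42".hashCode(); // any stable key hash
    h ^= (h >>> 16); // spread high bits, as in computePartitionIndex
    int idx = h % partitionWorkers;
    idx = idx < 0 ? -idx : idx; // always lands in [0, partitionWorkers)
    // Every record with this key routes to partitionExecutors[idx], so its
    // deserialized batches are enqueued, and later drained, in submission order.
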