Skip to content

Commit 7ddce6a

Browse files
authored
Refactor thread mode and batch memory control (#1142)
* Refactor thread mode and batch memory control * Update docs * update pulsar version * optimize list
1 parent 1a6a0cd commit 7ddce6a

14 files changed

+290
-374
lines changed

docs/aws-s3-sink.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,6 @@ Before using the AWS S3 sink connector, you need to configure it. This table out
146146
| `batchTimeMs` | long | False | false | 1000 | The interval for batch submission. |
147147
| `maxBatchBytes` | long | False | false | 10000000 | The maximum number of bytes in a batch. |
148148
| `batchModel` | Enum | False | false | BLEND | Determines how records are batched. Options: `BLEND`, `PARTITIONED`. `BLEND` combines records from all topics into a single batch, optimizing for throughput; `PARTITIONED` batches records separately for each topic, maintaining topic-level separation. Note: When set to `PARTITIONED`, the connector may cache data up to the number of subscribed topics multiplied by `maxBatchBytes`, so you need to anticipate the connector's memory requirements in advance. |
149-
| `pendingQueueSize` | int | False | false | 10 | The number of records buffered in queue. By default, it is equal to `batchSize`. You can set it manually. |
150149
| `skipFailedMessages` | Boolean | False | false | false | Configures whether to skip a message that fails to be processed. If set to `true`, the connector skips the failed message by `ack`ing it. Otherwise, the connector `fail`s the message. |
151150
| `withMetadata` | Boolean | False | false | false | Save message attributes to metadata. |
152151
| `useHumanReadableMessageId` | Boolean | False | false | false | Use a human-readable format string for messageId in message metadata. The messageId is in a format like `ledgerId:entryId:partitionIndex:batchIndex`. Otherwise, the messageId is a Hex-encoded string. |

docs/azure-blob-storage-sink.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,6 @@ Before using the Azure Blob Storage sink connector, you need to configure it. Th
127127
| `batchTimeMs` | long | False | false | 1000 | The interval for batch submission. |
128128
| `maxBatchBytes` | long | False | false | 10000000 | The maximum number of bytes in a batch. |
129129
| `batchModel` | Enum | False | false | BLEND | Determines how records are batched. Options: `BLEND`, `PARTITIONED`. `BLEND` combines records from all topics into a single batch, optimizing for throughput; `PARTITIONED` batches records separately for each topic, maintaining topic-level separation. Note: When set to `PARTITIONED`, the connector may cache data up to the number of subscribed topics multiplied by `maxBatchBytes`, so you need to anticipate the connector's memory requirements in advance. |
130-
| `pendingQueueSize` | int | False | false | 10 | The number of records buffered in queue. By default, it is equal to `batchSize`. You can set it manually. |
131130
| `skipFailedMessages` | Boolean | False | false | false | Configures whether to skip a message that fails to be processed. If set to `true`, the connector skips the failed message by `ack`ing it. Otherwise, the connector `fail`s the message. |
132131
| `withMetadata` | Boolean | False | false | false | Save message attributes to metadata. |
133132
| `useHumanReadableMessageId` | Boolean | False | false | false | Use a human-readable format string for messageId in message metadata. The messageId is in a format like `ledgerId:entryId:partitionIndex:batchIndex`. Otherwise, the messageId is a Hex-encoded string. |

docs/google-cloud-storage-sink.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,6 @@ Before using the Google Cloud Storage sink connector, you need to configure it.
132132
| `batchTimeMs` | long | False | false | 1000 | The interval for batch submission. |
133133
| `maxBatchBytes` | long | False | false | 10000000 | The maximum number of bytes in a batch. |
134134
| `batchModel` | Enum | False | false | BLEND | Determines how records are batched. Options: `BLEND`, `PARTITIONED`. `BLEND` combines records from all topics into a single batch, optimizing for throughput; `PARTITIONED` batches records separately for each topic, maintaining topic-level separation. Note: When set to `PARTITIONED`, the connector may cache data up to the number of subscribed topics multiplied by `maxBatchBytes`, so you need to anticipate the connector's memory requirements in advance. |
135-
| `pendingQueueSize` | int | False | false | 10 | The number of records buffered in queue. By default, it is equal to `batchSize`. You can set it manually. |
136135
| `skipFailedMessages` | Boolean | False | false | false | Configures whether to skip a message that fails to be processed. If set to `true`, the connector skips the failed message by `ack`ing it. Otherwise, the connector `fail`s the message. |
137136
| `withMetadata` | Boolean | False | false | false | Save message attributes to metadata. |
138137
| `useHumanReadableMessageId` | Boolean | False | false | false | Use a human-readable format string for messageId in message metadata. The messageId is in a format like `ledgerId:entryId:partitionIndex:batchIndex`. Otherwise, the messageId is a Hex-encoded string. |

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
<jackson.version>2.13.2</jackson.version>
4949
<jackson-databind.version>2.13.4.2</jackson-databind.version>
5050
<lombok.version>1.18.32</lombok.version>
51-
<pulsar.version>3.2.2.1</pulsar.version>
51+
<pulsar.version>4.0.0.8</pulsar.version>
5252
<avro.version>1.11.4</avro.version>
5353
<hadoop.version>3.3.6</hadoop.version>
5454
<parquet.version>1.13.1</parquet.version>

src/main/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,10 @@ public class BlobStoreAbstractConfig implements Serializable {
105105
private int batchSize = 10;
106106
private long batchTimeMs = 1000;
107107
private BatchModel batchModel = BatchModel.BLEND;
108+
@Deprecated // never to use
108109
private int pendingQueueSize = -1;
110+
@Deprecated // never to use
111+
private String partitioner;
109112

110113
// #### metadata configuration ####
111114
private boolean withMetadata;
@@ -176,13 +179,6 @@ public void validate() {
176179
+ "when formatType is 'json'.");
177180
}
178181

179-
if (pendingQueueSize <= 0) {
180-
pendingQueueSize = batchSize;
181-
}
182-
checkArgument(pendingQueueSize > 0, "pendingQueueSize must be a positive integer.");
183-
checkArgument(pendingQueueSize >= batchSize, "pendingQueueSize must be larger than or "
184-
+ "equal to batchSize");
185-
186182
if (avroCodec != null && (avroCodec.isEmpty() || avroCodec.equals("none"))) {
187183
avroCodec = null;
188184
}

src/main/java/org/apache/pulsar/io/jcloud/batch/BatchContainer.java

Lines changed: 49 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@
1818
*/
1919
package org.apache.pulsar.io.jcloud.batch;
2020

21-
import com.google.common.collect.Lists;
21+
import java.util.ArrayList;
22+
import java.util.Collections;
23+
import java.util.LinkedList;
2224
import java.util.List;
23-
import java.util.concurrent.ArrayBlockingQueue;
2425
import java.util.concurrent.atomic.AtomicLong;
26+
import java.util.concurrent.locks.Condition;
27+
import java.util.concurrent.locks.ReentrantLock;
2528
import org.apache.pulsar.client.api.schema.GenericRecord;
2629
import org.apache.pulsar.functions.api.Record;
2730

@@ -37,21 +40,35 @@ public class BatchContainer {
3740
private final long maxBatchTimeMs;
3841
private final AtomicLong currentBatchSize = new AtomicLong(0L);
3942
private final AtomicLong currentBatchBytes = new AtomicLong(0L);
40-
private final ArrayBlockingQueue<Record<GenericRecord>> pendingFlushQueue;
41-
private volatile long lastPoolRecordsTime;
43+
private volatile long lastPollRecordsTime;
44+
private final List<Record<GenericRecord>> pendingFlushList;
45+
private final ReentrantLock lock = new ReentrantLock();
46+
private final Condition notFull = lock.newCondition();
4247

43-
public BatchContainer(long maxBatchSize, long maxBatchBytes, long maxBatchTimeMs, int maxPendingQueueSize) {
48+
public BatchContainer(long maxBatchSize, long maxBatchBytes, long maxBatchTimeMs) {
4449
this.maxBatchSize = maxBatchSize;
4550
this.maxBatchBytes = maxBatchBytes;
4651
this.maxBatchTimeMs = maxBatchTimeMs;
47-
this.pendingFlushQueue = new ArrayBlockingQueue<>(maxPendingQueueSize);
48-
this.lastPoolRecordsTime = System.currentTimeMillis();
52+
this.lastPollRecordsTime = System.currentTimeMillis();
53+
this.pendingFlushList = new LinkedList<>();
4954
}
5055

5156
public void add(Record<GenericRecord> record) throws InterruptedException {
52-
pendingFlushQueue.put(record);
53-
updateCurrentBatchSize(1);
54-
updateCurrentBatchBytes(record.getMessage().get().size());
57+
lock.lock();
58+
try {
59+
// Allow exceeding the maximum value once
60+
long recordSize = record.getMessage().get().size();
61+
pendingFlushList.add(record);
62+
currentBatchSize.incrementAndGet();
63+
currentBatchBytes.addAndGet(recordSize);
64+
65+
// Wait if the batch needs to be flushed
66+
while (needFlush()) {
67+
notFull.await();
68+
}
69+
} finally {
70+
lock.unlock();
71+
}
5572
}
5673

5774
public long getCurrentBatchSize() {
@@ -62,42 +79,32 @@ public long getCurrentBatchBytes() {
6279
return currentBatchBytes.get();
6380
}
6481

65-
public void updateCurrentBatchSize(long delta) {
66-
currentBatchSize.addAndGet(delta);
67-
}
68-
69-
public void updateCurrentBatchBytes(long delta) {
70-
currentBatchBytes.addAndGet(delta);
71-
}
72-
73-
public boolean isEmpty() {
74-
return pendingFlushQueue.isEmpty();
82+
public List<Record<GenericRecord>> pollNeedFlushRecords() {
83+
if (currentBatchSize.get() == 0) {
84+
return Collections.emptyList();
85+
}
86+
lock.lock();
87+
try {
88+
if (!needFlush()) {
89+
return Collections.emptyList();
90+
}
91+
List<Record<GenericRecord>> needFlushRecords = new ArrayList<>(pendingFlushList);
92+
pendingFlushList.clear();
93+
// Clear the pending list
94+
currentBatchSize.set(0);
95+
currentBatchBytes.set(0);
96+
lastPollRecordsTime = System.currentTimeMillis();
97+
return needFlushRecords;
98+
} finally {
99+
notFull.signalAll();
100+
lock.unlock();
101+
}
75102
}
76103

77-
public boolean needFlush() {
104+
private boolean needFlush() {
78105
long currentTime = System.currentTimeMillis();
79106
return currentBatchSize.get() >= maxBatchSize
80107
|| currentBatchBytes.get() >= maxBatchBytes
81-
|| (currentTime - lastPoolRecordsTime) >= maxBatchTimeMs;
82-
}
83-
84-
public List<Record<GenericRecord>> pollNeedFlushRecords() {
85-
final List<Record<GenericRecord>> needFlushRecords = Lists.newArrayList();
86-
long recordsToInsertBytes = 0;
87-
while (!pendingFlushQueue.isEmpty() && needFlushRecords.size() < maxBatchSize
88-
&& recordsToInsertBytes < maxBatchBytes) {
89-
Record<GenericRecord> r = pendingFlushQueue.poll();
90-
if (r != null) {
91-
if (r.getMessage().isPresent()) {
92-
long recordBytes = r.getMessage().get().size();
93-
recordsToInsertBytes += recordBytes;
94-
}
95-
needFlushRecords.add(r);
96-
}
97-
}
98-
updateCurrentBatchBytes(-1 * recordsToInsertBytes);
99-
updateCurrentBatchSize(-1 * needFlushRecords.size());
100-
lastPoolRecordsTime = System.currentTimeMillis();
101-
return needFlushRecords;
108+
|| (currentTime - lastPollRecordsTime) >= maxBatchTimeMs;
102109
}
103110
}

0 commit comments

Comments
 (0)