Skip to content

Commit a768b88

Browse files
committed
Added support of topic messages batching
1 parent 9268e24 commit a768b88

File tree

4 files changed

+60
-9
lines changed

4 files changed

+60
-9
lines changed

topic/src/main/java/tech/ydb/topic/read/impl/Batch.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,12 @@ public boolean isDecompressed() {
4747
public void setDecompressed(boolean decompressed) {
4848
this.decompressed = decompressed;
4949
}
50+
51+
long getFirstCommitOffsetFrom() {
52+
return messages.get(0).getCommitOffsetFrom();
53+
}
54+
55+
long getLastOffset() {
56+
return messages.get(messages.size() - 1).getOffset();
57+
}
5058
}

topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public class PartitionSessionImpl {
4545
private final PartitionSession sessionInfo;
4646
private final Executor decompressionExecutor;
4747
private final AtomicBoolean isWorking = new AtomicBoolean(true);
48+
private final int maxBatchSize;
4849

4950
private final Queue<Batch> decodingBatches = new LinkedList<>();
5051
private final ReentrantLock decodingBatchesLock = new ReentrantLock();
@@ -60,6 +61,7 @@ public class PartitionSessionImpl {
6061

6162
private PartitionSessionImpl(Builder builder) {
6263
this.id = builder.id;
64+
this.maxBatchSize = builder.maxBatchSize;
6365
this.fullId = builder.fullId;
6466
this.topicPath = builder.topicPath;
6567
this.consumerName = builder.consumerName;
@@ -308,21 +310,43 @@ private void sendDataToReadersIfNeeded() {
308310
return;
309311
}
310312
if (isReadingNow.compareAndSet(false, true)) {
311-
Batch batchToRead = readingQueue.poll();
312-
if (batchToRead == null) {
313+
List<Batch> batchesToRead = new ArrayList<>();
314+
315+
Batch next = readingQueue.poll();
316+
if (next == null) {
313317
isReadingNow.set(false);
314318
return;
315319
}
320+
321+
batchesToRead.add(next);
322+
List<Message> messagesToRead = new ArrayList<>(next.getMessages());
323+
long commitFrom = next.getFirstCommitOffsetFrom();
324+
long commitTo = next.getLastOffset() + 1;
325+
326+
int batchSize = messagesToRead.size();
327+
while (maxBatchSize <= 0 || batchSize < maxBatchSize) {
328+
next = readingQueue.peek();
329+
if (next == null) {
330+
break;
331+
}
332+
if (maxBatchSize > 0 && next.getMessages().size() + batchSize > maxBatchSize) {
333+
break;
334+
}
335+
336+
next = readingQueue.poll();
337+
338+
batchesToRead.add(next);
339+
messagesToRead.addAll(next.getMessages());
340+
batchSize += next.getMessages().size();
341+
commitTo = next.getLastOffset() + 1;
342+
}
343+
316344
// Should be called maximum in 1 thread at a time
317-
List<MessageImpl> messageImplList = batchToRead.getMessages();
318-
List<Message> messagesToRead = new ArrayList<>(messageImplList);
319-
OffsetsRange offsetsToCommit = new OffsetsRangeImpl(messageImplList.get(0).getCommitOffsetFrom(),
320-
messageImplList.get(messageImplList.size() - 1).getOffset() + 1);
345+
OffsetsRange offsetsToCommit = new OffsetsRangeImpl(commitFrom, commitTo);
321346
DataReceivedEvent event = new DataReceivedEventImpl(this, messagesToRead, offsetsToCommit);
322347
if (logger.isDebugEnabled()) {
323348
logger.debug("[{}] DataReceivedEvent callback with {} message(s) (offsets {}-{}) is about " +
324-
"to be called...", fullId, messagesToRead.size(), messagesToRead.get(0).getOffset(),
325-
messagesToRead.get(messagesToRead.size() - 1).getOffset());
349+
"to be called...", fullId, messagesToRead.size(), commitFrom, commitTo);
326350
}
327351
dataEventCallback.apply(event)
328352
.whenComplete((res, th) -> {
@@ -338,7 +362,7 @@ private void sendDataToReadersIfNeeded() {
338362
messagesToRead.get(messagesToRead.size() - 1).getOffset());
339363
}
340364
isReadingNow.set(false);
341-
batchToRead.complete();
365+
batchesToRead.forEach(Batch::complete);
342366
sendDataToReadersIfNeeded();
343367
});
344368
} else {
@@ -376,6 +400,7 @@ public void shutdown() {
376400
*/
377401
public static class Builder {
378402
private long id;
403+
private int maxBatchSize;
379404
private String fullId;
380405
private String topicPath;
381406
private String consumerName;
@@ -391,6 +416,11 @@ public Builder setId(long id) {
391416
return this;
392417
}
393418

419+
public Builder setMaxBatchSize(int maxBatchSize) {
420+
this.maxBatchSize = maxBatchSize;
421+
return this;
422+
}
423+
394424
public Builder setFullId(String fullId) {
395425
this.fullId = fullId;
396426
return this;

topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,7 @@ private void onStartPartitionSessionRequest(YdbTopic.StreamReadMessage.StartPart
402402

403403
PartitionSessionImpl partitionSession = PartitionSessionImpl.newBuilder()
404404
.setId(partitionSessionId)
405+
.setMaxBatchSize(settings.getMaxBatchSize())
405406
.setFullId(partitionSessionFullId)
406407
.setTopicPath(request.getPartitionSession().getPath())
407408
.setConsumerName(consumerName)

topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ public class ReaderSettings {
2222
private final String readerName;
2323
private final List<TopicReadSettings> topics;
2424
private final long maxMemoryUsageBytes;
25+
private final int maxBatchSize;
2526
private final Executor decompressionExecutor;
2627
private final BiConsumer<Status, Throwable> errorsHandler;
2728

@@ -31,6 +32,7 @@ private ReaderSettings(Builder builder) {
3132
this.readerName = builder.readerName;
3233
this.topics = ImmutableList.copyOf(builder.topics);
3334
this.maxMemoryUsageBytes = builder.maxMemoryUsageBytes;
35+
this.maxBatchSize = builder.maxBatchSize;
3436
this.decompressionExecutor = builder.decompressionExecutor;
3537
this.errorsHandler = builder.errorsHandler;
3638
}
@@ -60,6 +62,10 @@ public long getMaxMemoryUsageBytes() {
6062
return maxMemoryUsageBytes;
6163
}
6264

65+
public int getMaxBatchSize() {
66+
return maxBatchSize;
67+
}
68+
6369
public Executor getDecompressionExecutor() {
6470
return decompressionExecutor;
6571
}
@@ -78,6 +84,7 @@ public static class Builder {
7884
private String readerName = null;
7985
private List<TopicReadSettings> topics = new ArrayList<>();
8086
private long maxMemoryUsageBytes = MAX_MEMORY_USAGE_BYTES_DEFAULT;
87+
private int maxBatchSize = 0;
8188
private Executor decompressionExecutor = null;
8289
private BiConsumer<Status, Throwable> errorsHandler = null;
8390

@@ -133,6 +140,11 @@ public Builder setMaxMemoryUsageBytes(long maxMemoryUsageBytes) {
133140
return this;
134141
}
135142

143+
public Builder setMaxBatchSize(int maxBatchSize) {
144+
this.maxBatchSize = maxBatchSize;
145+
return this;
146+
}
147+
136148
public Builder setErrorsHandler(BiConsumer<Status, Throwable> handler) {
137149
this.errorsHandler = handler;
138150
return this;

0 commit comments

Comments
 (0)