Skip to content

Commit 71df495

Browse files
authored
Subscription: intro poll and prefetch v2 for tsfile topic (#15790)
1 parent 0927009 commit 71df495

File tree

31 files changed

+785
-284
lines changed

31 files changed

+785
-284
lines changed

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -527,6 +527,23 @@ public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) {
527527
}
528528

529529
@Override
530+
public CommonConfig setSubscriptionPrefetchTsFileBatchMaxDelayInMs(
531+
int subscriptionPrefetchTsFileBatchMaxDelayInMs) {
532+
setProperty(
533+
"subscription_prefetch_ts_file_batch_max_delay_in_ms",
534+
String.valueOf(subscriptionPrefetchTsFileBatchMaxDelayInMs));
535+
return this;
536+
}
537+
538+
@Override
539+
public CommonConfig setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(
540+
int subscriptionPrefetchTsFileBatchMaxSizeInBytes) {
541+
setProperty(
542+
"subscription_prefetch_ts_file_batch_max_size_in_bytes",
543+
String.valueOf(subscriptionPrefetchTsFileBatchMaxSizeInBytes));
544+
return this;
545+
}
546+
530547
public CommonConfig setSubscriptionEnabled(boolean subscriptionEnabled) {
531548
setProperty("subscription_enabled", String.valueOf(subscriptionEnabled));
532549
return this;

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -542,6 +542,25 @@ public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) {
542542
}
543543

544544
@Override
545+
public CommonConfig setSubscriptionPrefetchTsFileBatchMaxDelayInMs(
546+
int subscriptionPrefetchTsFileBatchMaxDelayInMs) {
547+
dnConfig.setSubscriptionPrefetchTsFileBatchMaxDelayInMs(
548+
subscriptionPrefetchTsFileBatchMaxDelayInMs);
549+
cnConfig.setSubscriptionPrefetchTsFileBatchMaxDelayInMs(
550+
subscriptionPrefetchTsFileBatchMaxDelayInMs);
551+
return this;
552+
}
553+
554+
@Override
555+
public CommonConfig setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(
556+
int subscriptionPrefetchTsFileBatchMaxSizeInBytes) {
557+
dnConfig.setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(
558+
subscriptionPrefetchTsFileBatchMaxSizeInBytes);
559+
cnConfig.setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(
560+
subscriptionPrefetchTsFileBatchMaxSizeInBytes);
561+
return this;
562+
}
563+
545564
public CommonConfig setSubscriptionEnabled(boolean subscriptionEnabled) {
546565
dnConfig.setSubscriptionEnabled(subscriptionEnabled);
547566
cnConfig.setSubscriptionEnabled(subscriptionEnabled);

integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,17 @@ public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) {
382382
}
383383

384384
@Override
385+
public CommonConfig setSubscriptionPrefetchTsFileBatchMaxDelayInMs(
386+
int subscriptionPrefetchTsFileBatchMaxDelayInMs) {
387+
return this;
388+
}
389+
390+
@Override
391+
public CommonConfig setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(
392+
int subscriptionPrefetchTsFileBatchMaxSizeInBytes) {
393+
return this;
394+
}
395+
385396
public CommonConfig setSubscriptionEnabled(boolean subscriptionEnabled) {
386397
return this;
387398
}

integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,12 @@ CommonConfig setPipeConnectorRequestSliceThresholdBytes(
168168

169169
CommonConfig setQueryMemoryProportion(String queryMemoryProportion);
170170

171+
CommonConfig setSubscriptionPrefetchTsFileBatchMaxDelayInMs(
172+
int subscriptionPrefetchTsFileBatchMaxDelayInMs);
173+
174+
CommonConfig setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(
175+
int subscriptionPrefetchTsFileBatchMaxSizeInBytes);
176+
171177
CommonConfig setSubscriptionEnabled(boolean subscriptionEnabled);
172178

173179
default CommonConfig setDefaultStorageGroupLevel(int defaultStorageGroupLevel) {

integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,13 @@ protected void setUpConfig() {
6969
sender.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
7070
receiver1.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
7171
receiver2.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
72+
73+
// reduce tsfile batch memory usage
74+
sender.getConfig().getCommonConfig().setSubscriptionPrefetchTsFileBatchMaxDelayInMs(500);
75+
sender
76+
.getConfig()
77+
.getCommonConfig()
78+
.setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(32 * 1024);
7279
}
7380

7481
@Override

integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/AbstractSubscriptionTreeRegressionIT.java

Lines changed: 57 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@
5757
import java.util.Objects;
5858
import java.util.Properties;
5959
import java.util.concurrent.atomic.AtomicInteger;
60+
import java.util.stream.Collectors;
61+
import java.util.stream.Stream;
6062

6163
import static org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
6264
import static org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS;
@@ -305,11 +307,6 @@ public void consume_data(SubscriptionTreePullConsumer consumer, Session session)
305307
}
306308
}
307309

308-
public List<Integer> consume_tsfile_withFileCount(
309-
SubscriptionTreePullConsumer consumer, String device) throws InterruptedException {
310-
return consume_tsfile(consumer, Collections.singletonList(device));
311-
}
312-
313310
public int consume_tsfile(SubscriptionTreePullConsumer consumer, String device)
314311
throws InterruptedException {
315312
return consume_tsfile(consumer, Collections.singletonList(device)).get(0);
@@ -361,15 +358,6 @@ public List<Integer> consume_tsfile(SubscriptionTreePullConsumer consumer, List<
361358
return results;
362359
}
363360

364-
public void consume_data(SubscriptionTreePullConsumer consumer)
365-
throws TException,
366-
IOException,
367-
StatementExecutionException,
368-
InterruptedException,
369-
IoTDBConnectionException {
370-
consume_data(consumer, session_dest);
371-
}
372-
373361
public void consume_data_await(
374362
SubscriptionTreePullConsumer consumer,
375363
Session session,
@@ -395,18 +383,72 @@ public void consume_data_await(
395383
}
396384

397385
public void consume_tsfile_await(
386+
SubscriptionTreePullConsumer consumer,
387+
List<String> devices,
388+
List<Integer> expected,
389+
List<Boolean> allowGte) {
390+
final List<AtomicInteger> counters = new ArrayList<>(devices.size());
391+
for (int i = 0; i < devices.size(); i++) {
392+
counters.add(new AtomicInteger(0));
393+
}
394+
AWAIT.untilAsserted(
395+
() -> {
396+
List<SubscriptionMessage> messages = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
397+
if (messages.isEmpty()) {
398+
session_src.executeNonQueryStatement("flush");
399+
}
400+
for (final SubscriptionMessage message : messages) {
401+
final SubscriptionTsFileHandler tsFileHandler = message.getTsFileHandler();
402+
try (final TsFileReader tsFileReader = tsFileHandler.openReader()) {
403+
for (int i = 0; i < devices.size(); i++) {
404+
final Path path = new Path(devices.get(i), "s_0", true);
405+
final QueryDataSet dataSet =
406+
tsFileReader.query(
407+
QueryExpression.create(Collections.singletonList(path), null));
408+
while (dataSet.hasNext()) {
409+
dataSet.next();
410+
counters.get(i).addAndGet(1);
411+
}
412+
}
413+
} catch (IOException e) {
414+
throw new RuntimeException(e);
415+
}
416+
}
417+
consumer.commitSync(messages);
418+
for (int i = 0; i < devices.size(); i++) {
419+
if (allowGte.get(i)) {
420+
assertGte(counters.get(i).get(), expected.get(i));
421+
} else {
422+
assertEquals(counters.get(i).get(), expected.get(i));
423+
}
424+
}
425+
});
426+
}
427+
428+
public void consume_tsfile_await(
429+
SubscriptionTreePullConsumer consumer, List<String> devices, List<Integer> expected) {
430+
consume_tsfile_await(
431+
consumer,
432+
devices,
433+
expected,
434+
Stream.generate(() -> false).limit(devices.size()).collect(Collectors.toList()));
435+
}
436+
437+
public void consume_tsfile_with_file_count_await(
398438
SubscriptionTreePullConsumer consumer, List<String> devices, List<Integer> expected) {
399439
final List<AtomicInteger> counters = new ArrayList<>(devices.size());
400440
for (int i = 0; i < devices.size(); i++) {
401441
counters.add(new AtomicInteger(0));
402442
}
443+
AtomicInteger onReceived = new AtomicInteger(0);
403444
AWAIT.untilAsserted(
404445
() -> {
405446
List<SubscriptionMessage> messages = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
406447
if (messages.isEmpty()) {
407448
session_src.executeNonQueryStatement("flush");
408449
}
409450
for (final SubscriptionMessage message : messages) {
451+
onReceived.incrementAndGet();
410452
final SubscriptionTsFileHandler tsFileHandler = message.getTsFileHandler();
411453
try (final TsFileReader tsFileReader = tsFileHandler.openReader()) {
412454
for (int i = 0; i < devices.size(); i++) {
@@ -427,6 +469,7 @@ public void consume_tsfile_await(
427469
for (int i = 0; i < devices.size(); i++) {
428470
assertEquals(counters.get(i).get(), expected.get(i));
429471
}
472+
assertEquals(onReceived.get(), expected.get(devices.size()));
430473
});
431474
}
432475

integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040

4141
import java.io.IOException;
4242
import java.util.ArrayList;
43+
import java.util.Arrays;
4344
import java.util.List;
4445

4546
/***
@@ -135,23 +136,13 @@ public void do_test()
135136
List<String> devices = new ArrayList<>(2);
136137
devices.add(device);
137138
devices.add(device2);
138-
List<Integer> results = consume_tsfile(consumer, devices);
139-
assertEquals(results.get(0), 10);
140-
assertEquals(results.get(1), 10);
139+
consume_tsfile_await(consumer, devices, Arrays.asList(10, 10));
141140
consumer.unsubscribe(topicName);
142141
assertEquals(subs.getSubscriptions(topicName).size(), 0, "unsubscribe:show subscriptions");
143142
consumer.subscribe(topicName);
144143
assertEquals(subs.getSubscriptions().size(), 1, "subscribe again:show subscriptions");
145144
insert_data(1707782400000L, device); // 2024-02-13 08:00:00+08:00
146145
insert_data(1707782400000L, device2); // 2024-02-13 08:00:00+08:00
147-
results = consume_tsfile(consumer, devices);
148-
assertEquals(
149-
results.get(0),
150-
15,
151-
"Unsubscribing and then re-subscribing will not retain progress. Full synchronization.");
152-
assertEquals(
153-
results.get(1),
154-
15,
155-
"Unsubscribing and then re-subscribing will not retain progress. Full synchronization.");
146+
consume_tsfile_await(consumer, devices, Arrays.asList(15, 15));
156147
}
157148
}

integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/format/IoTDBDBTsfilePullConsumerIT.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141

4242
import java.io.IOException;
4343
import java.util.ArrayList;
44+
import java.util.Arrays;
45+
import java.util.Collections;
4446
import java.util.List;
4547

4648
/***
@@ -130,9 +132,8 @@ public void do_test()
130132
// insert_data(1706659200000L); //2024-01-31 08:00:00+08:00
131133
insert_data(System.currentTimeMillis());
132134
// Consumption data
133-
List<Integer> results = consume_tsfile_withFileCount(consumer, device);
134-
assertEquals(results.get(0), 10);
135-
assertEquals(results.get(1), 2, "number of received files");
135+
consume_tsfile_with_file_count_await(
136+
consumer, Collections.singletonList(device), Arrays.asList(10, 2));
136137
// Unsubscribe
137138
consumer.unsubscribe(topicName);
138139
assertEquals(subs.getSubscriptions().size(), 0, "Show subscriptions after unsubscription");
@@ -142,14 +143,7 @@ public void do_test()
142143
insert_data(1707782400000L); // 2024-02-13 08:00:00+08:00
143144
// Consumption data: Progress is not retained when re-subscribing after cancellation. Full
144145
// synchronization.
145-
results = consume_tsfile_withFileCount(consumer, device);
146-
assertEquals(
147-
results.get(0),
148-
15,
149-
"Unsubscribe and resubscribe, progress is not retained. Full synchronization.");
150-
assertEquals(
151-
results.get(1),
152-
3,
153-
"Number of received files: After unsubscribing and resubscribing, progress is not retained. Full synchronization.");
146+
consume_tsfile_with_file_count_await(
147+
consumer, Collections.singletonList(device), Arrays.asList(15, 3));
154148
}
155149
}

integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/loose_range/IoTDBAllTsTsfilePullConsumerIT.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242

4343
import java.io.IOException;
4444
import java.util.ArrayList;
45+
import java.util.Arrays;
4546
import java.util.Date;
4647
import java.util.List;
4748

@@ -171,10 +172,8 @@ public void do_test()
171172
paths.add(device);
172173
paths.add(device2);
173174
paths.add(database2 + ".d_2");
174-
List<Integer> rowCountList = consume_tsfile(consumer, paths);
175-
assertGte(rowCountList.get(0), 8);
176-
assertEquals(rowCountList.get(1), 0);
177-
assertEquals(rowCountList.get(2), 0);
175+
consume_tsfile_await(
176+
consumer, paths, Arrays.asList(8, 0, 0), Arrays.asList(true, false, false));
178177

179178
// Unsubscribe
180179
consumer.unsubscribe(topicName);
@@ -188,12 +187,7 @@ public void do_test()
188187

189188
// Consumption data: Progress is not retained after unsubscribing and resubscribing. Full
190189
// synchronization.
191-
rowCountList = consume_tsfile(consumer, paths);
192-
assertGte(
193-
rowCountList.get(0),
194-
13,
195-
"Unsubscribe and then resubscribe, progress is not retained. Full synchronization.");
196-
assertEquals(rowCountList.get(1), 0, "Unsubscribe and then resubscribe," + device2);
197-
assertEquals(rowCountList.get(2), 0, "Unsubscribe and then resubscribe," + database2 + ".d_2");
190+
consume_tsfile_await(
191+
consumer, paths, Arrays.asList(13, 0, 0), Arrays.asList(true, false, false));
198192
}
199193
}

integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/loose_range/IoTDBAllTsfilePullConsumerSnapshotIT.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242

4343
import java.io.IOException;
4444
import java.util.ArrayList;
45+
import java.util.Arrays;
4546
import java.util.Date;
4647
import java.util.List;
4748

@@ -172,11 +173,8 @@ public void do_test()
172173
paths.add(device);
173174
paths.add(device2);
174175
paths.add(database2 + ".d_2");
175-
List<Integer> rowCountList = consume_tsfile(consumer, paths);
176176
// Subscribe and write without consuming
177-
assertEquals(rowCountList.get(0), 5, "Write without consume after subscription");
178-
assertEquals(rowCountList.get(1), 0);
179-
assertEquals(rowCountList.get(2), 0);
177+
consume_tsfile_await(consumer, paths, Arrays.asList(5, 0, 0));
180178

181179
// Unsubscribe
182180
consumer.unsubscribe(topicName);
@@ -190,12 +188,6 @@ public void do_test()
190188

191189
// Consumption data: Progress is not retained after unsubscribing and re-subscribing. Full
192190
// synchronization.
193-
rowCountList = consume_tsfile(consumer, paths);
194-
assertEquals(
195-
rowCountList.get(0),
196-
10,
197-
"Unsubscribe and then resubscribe, progress is not retained. Full synchronization.");
198-
assertEquals(rowCountList.get(1), 0, "Unsubscribe and then resubscribe," + device2);
199-
assertEquals(rowCountList.get(2), 0, "Unsubscribe and then resubscribe," + database2 + ".d_2");
191+
consume_tsfile_await(consumer, paths, Arrays.asList(10, 0, 0));
200192
}
201193
}

0 commit comments

Comments
 (0)