Skip to content

Commit 23d00ab

Browse files
authored
Subscription: poll heartbeat event when there are no consumers (#15307) (#15325)
1 parent 9af414b commit 23d00ab

File tree

5 files changed

+54
-1
lines changed

5 files changed

+54
-1
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,15 @@ public Event waitedPoll() {
161161
return event;
162162
}
163163

164+
@Override
165+
public Event peek() {
166+
final Event event = pendingQueue.peek();
167+
if (Objects.nonNull(event)) {
168+
return event;
169+
}
170+
return tsfileInsertEventDeque.peek();
171+
}
172+
164173
@Override
165174
public void clear() {
166175
super.clear();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBlockingPendingQueue.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ public SubscriptionBlockingPendingQueue(
3333

3434
public abstract Event waitedPoll();
3535

36+
public abstract Event peek();
37+
38+
public abstract void directOffer(final Event event);
39+
3640
public int size() {
3741
return inputPendingQueue.size();
3842
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
2626
import org.apache.iotdb.db.pipe.agent.task.execution.PipeSubtaskExecutorManager;
2727
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
28+
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
2829
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
2930
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
3031
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
@@ -269,6 +270,7 @@ public boolean executePrefetch() {
269270
committedCleaner, pollableNacker, responsePrefetcher, responseSerializer);
270271
return true;
271272
} else {
273+
peekOnce();
272274
remapInFlightEventsSnapshot(committedCleaner, pollableNacker);
273275
return false;
274276
}
@@ -311,6 +313,30 @@ public void prefetchEvent(@NonNull final SubscriptionEvent thisEvent) {
311313
prefetchingQueue.add(thisEvent);
312314
}
313315

316+
private synchronized void peekOnce() {
317+
final Event peekedEvent = UserDefinedEnrichedEvent.maybeOf(inputPendingQueue.peek());
318+
if (Objects.isNull(peekedEvent)) {
319+
return;
320+
}
321+
322+
if (!(peekedEvent instanceof PipeHeartbeatEvent)) {
323+
return;
324+
}
325+
326+
final Event polledEvent = inputPendingQueue.waitedPoll();
327+
if (!Objects.equals(peekedEvent, polledEvent)) {
328+
LOGGER.warn(
329+
"Subscription: inconsistent heartbeat event when {} peeking (broken invariant), expected {}, actual {}, offer back",
330+
this,
331+
peekedEvent,
332+
polledEvent);
333+
inputPendingQueue.directOffer(polledEvent);
334+
} else {
335+
((PipeHeartbeatEvent) peekedEvent)
336+
.decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), false);
337+
}
338+
}
339+
314340
/**
315341
* Prefetch at most one {@link SubscriptionEvent} from {@link
316342
* SubscriptionPrefetchingQueue#inputPendingQueue} to {@link
@@ -319,7 +345,7 @@ public void prefetchEvent(@NonNull final SubscriptionEvent thisEvent) {
319345
* <p>It will continuously attempt to prefetch and generate a {@link SubscriptionEvent} until
320346
* {@link SubscriptionPrefetchingQueue#inputPendingQueue} is empty.
321347
*/
322-
private void tryPrefetch() {
348+
private synchronized void tryPrefetch() {
323349
while (!inputPendingQueue.isEmpty()) {
324350
final Event event = UserDefinedEnrichedEvent.maybeOf(inputPendingQueue.waitedPoll());
325351
if (Objects.isNull(event)) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,16 @@ public Event waitedPoll() {
5858
return filter(inputPendingQueue.waitedPoll());
5959
}
6060

61+
@Override
62+
public Event peek() {
63+
return inputPendingQueue.peek();
64+
}
65+
66+
@Override
67+
public void directOffer(final Event event) {
68+
inputPendingQueue.directOffer(event);
69+
}
70+
6171
private synchronized Event filter(final Event event) { // make it synchronized
6272
if (Objects.isNull(event)) {
6373
return null;

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,10 @@ public E waitedPoll() {
107107
return event;
108108
}
109109

110+
public E peek() {
111+
return pendingQueue.peek();
112+
}
113+
110114
public void clear() {
111115
isClosed.set(true);
112116
pendingQueue.clear();

0 commit comments

Comments
 (0)