Skip to content

Commit f266018

Browse files
committed
last
1 parent 85092b2 commit f266018

File tree

3 files changed

+11
-8
lines changed

3 files changed

+11
-8
lines changed

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,8 @@ public void testSourcePermission() {
393393
receiverEnv,
394394
"select count(*) from root.vehicle.**",
395395
"count(root.vehicle.car.temperature),",
396-
Collections.singleton("1,"));
396+
Collections.singleton("1,"),
397+
20);
397398

398399
// test showing pipe
399400
// Create another pipe, user is root

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,9 +131,7 @@ private void doTransfer(final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWri
131131
true);
132132
}
133133

134-
if (LOGGER.isDebugEnabled()) {
135-
LOGGER.debug("Successfully transferred schema event {}.", pipeSchemaRegionWritePlanEvent);
136-
}
134+
LOGGER.info("Successfully transferred schema event {}.", pipeSchemaRegionWritePlanEvent);
137135
}
138136

139137
private void doTransferWrapper(final PipeSchemaRegionSnapshotEvent pipeSchemaRegionSnapshotEvent)

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ public abstract class IoTDBNonDataRegionSource extends IoTDBSource {
6868
// the extractor is closed and then be reused by processor.
6969
protected final AtomicBoolean hasBeenClosed = new AtomicBoolean(false);
7070

71+
protected PipeWritePlanEvent lastEvent = null;
72+
7173
protected abstract AbstractPipeListeningQueue getListeningQueue();
7274

7375
@Override
@@ -163,7 +165,7 @@ public EnrichedEvent supply() throws Exception {
163165
}
164166

165167
// Check whether snapshot being parsed exists
166-
PipeWritePlanEvent realtimeEvent = null;
168+
PipeWritePlanEvent realtimeEvent = lastEvent;
167169
if (hasNextEventInCurrentSnapshot()) {
168170
realtimeEvent = getNextEventInCurrentSnapshot();
169171
}
@@ -178,7 +180,7 @@ public EnrichedEvent supply() throws Exception {
178180
pipeName,
179181
creationTime,
180182
pipeTaskMeta,
181-
(TreePattern) treePattern,
183+
treePattern,
182184
tablePattern,
183185
userId,
184186
userName,
@@ -211,17 +213,17 @@ public EnrichedEvent supply() throws Exception {
211213

212214
// Realtime
213215
if (Objects.isNull(realtimeEvent)) {
214-
realtimeEvent = (PipeWritePlanEvent) iterator.peek(getMaxBlockingTimeMs());
216+
realtimeEvent = (PipeWritePlanEvent) iterator.next(getMaxBlockingTimeMs());
215217
}
216218
if (Objects.isNull(realtimeEvent)) {
217219
return null;
218220
}
221+
lastEvent = realtimeEvent;
219222

220223
realtimeEvent =
221224
trimRealtimeEventByPipePattern(realtimeEvent)
222225
.flatMap(this::trimRealtimeEventByPrivilege)
223226
.orElse(null);
224-
iterator.next(0);
225227

226228
if (Objects.isNull(realtimeEvent)
227229
|| !isTypeListened(realtimeEvent)
@@ -232,6 +234,7 @@ public EnrichedEvent supply() throws Exception {
232234
event.bindProgressIndex(new MetaProgressIndex(iterator.getNextIndex() - 1));
233235
}
234236
event.increaseReferenceCount(IoTDBNonDataRegionSource.class.getName());
237+
lastEvent = null;
235238
return event;
236239
}
237240

@@ -253,6 +256,7 @@ public EnrichedEvent supply() throws Exception {
253256
realtimeEvent.bindProgressIndex(new MetaProgressIndex(iterator.getNextIndex() - 1));
254257
}
255258
realtimeEvent.increaseReferenceCount(IoTDBNonDataRegionSource.class.getName());
259+
lastEvent = null;
256260
return realtimeEvent;
257261
}
258262

0 commit comments

Comments
 (0)