Skip to content

Commit 0cbe3e1

Browse files
authored
Pipe: Add metrics for TsFile parsing to tablets (apache#16668)
1 parent 37388a1 commit 0cbe3e1

File tree

6 files changed

+162
-17
lines changed

6 files changed

+162
-17
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java

Lines changed: 56 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,15 @@
2828
import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
2929
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
3030
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
31+
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
3132
import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
3233
import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
3334
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
3435

3536
import org.apache.tsfile.read.TsFileSequenceReader;
3637
import org.apache.tsfile.read.expression.impl.GlobalTimeExpression;
3738
import org.apache.tsfile.read.filter.factory.TimeFilterApi;
39+
import org.apache.tsfile.write.record.Tablet;
3840
import org.slf4j.Logger;
3941
import org.slf4j.LoggerFactory;
4042

@@ -60,8 +62,9 @@ public abstract class TsFileInsertionEventParser implements AutoCloseable {
6062
protected PipeMemoryBlock allocatedMemoryBlockForModifications;
6163
protected PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> currentModifications;
6264

63-
protected final long initialTimeNano = System.nanoTime();
64-
protected boolean timeUsageReported = false;
65+
protected long parseStartTimeNano = -1;
66+
protected boolean parseStartTimeRecorded = false;
67+
protected boolean parseEndTimeRecorded = false;
6568

6669
protected final PipeMemoryBlock allocatedMemoryBlockForTablet;
6770

@@ -104,21 +107,62 @@ protected TsFileInsertionEventParser(
104107
*/
105108
public abstract Iterable<TabletInsertionEvent> toTabletInsertionEvents();
106109

107-
@Override
108-
public void close() {
110+
/**
111+
* Record parse start time when hasNext() is called for the first time and returns true. Should be
112+
* called in Iterator.hasNext() when it's the first call.
113+
*/
114+
protected void recordParseStartTime() {
115+
if (pipeName == null || parseStartTimeRecorded) {
116+
return;
117+
}
118+
parseStartTimeNano = System.nanoTime();
119+
parseStartTimeRecorded = true;
120+
}
109121

110-
tabletInsertionIterable = null;
122+
/**
123+
* Record parse end time when hasNext() is called and returns false (last call). Should be called
124+
* in Iterator.hasNext() when it returns false.
125+
*/
126+
protected void recordParseEndTime() {
127+
if (pipeName == null || !parseStartTimeRecorded || parseEndTimeRecorded) {
128+
return;
129+
}
130+
try {
131+
final long parseEndTimeNano = System.nanoTime();
132+
final long totalTimeNanos = parseEndTimeNano - parseStartTimeNano;
133+
final String taskID = pipeName + "_" + creationTime;
134+
PipeTsFileToTabletsMetrics.getInstance().recordTsFileToTabletTime(taskID, totalTimeNanos);
135+
parseEndTimeRecorded = true;
136+
} catch (final Exception e) {
137+
LOGGER.warn("Failed to record parse end time for pipe {}", pipeName, e);
138+
}
139+
}
111140

141+
/**
142+
* Record metrics when a tablet is generated. Should be called by subclasses when generating
143+
* tablets.
144+
*
145+
* @param tablet the generated tablet
146+
*/
147+
protected void recordTabletMetrics(final Tablet tablet) {
148+
if (pipeName == null || tablet == null) {
149+
return;
150+
}
112151
try {
113-
if (pipeName != null && !timeUsageReported) {
114-
PipeTsFileToTabletsMetrics.getInstance()
115-
.recordTsFileToTabletTime(
116-
pipeName + "_" + creationTime, System.nanoTime() - initialTimeNano);
117-
timeUsageReported = true;
118-
}
152+
final String taskID = pipeName + "_" + creationTime;
153+
final long tabletMemorySize = PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet);
154+
PipeTsFileToTabletsMetrics.getInstance().recordTabletGenerated(taskID, tabletMemorySize);
119155
} catch (final Exception e) {
120-
LOGGER.warn("Failed to report time usage for parsing tsfile for pipe {}", pipeName, e);
156+
LOGGER.warn("Failed to record tablet metrics for pipe {}", pipeName, e);
121157
}
158+
}
159+
160+
@Override
161+
public void close() {
162+
163+
tabletInsertionIterable = null;
164+
165+
// Time recording is now handled in Iterator.hasNext(), no need to record here
122166

123167
try {
124168
if (tsFileSequenceReader != null) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,8 +354,13 @@ public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
354354

355355
@Override
356356
public boolean hasNext() {
357+
boolean hasNext = false;
357358
while (tabletIterator == null || !tabletIterator.hasNext()) {
358359
if (!deviceMeasurementsMapIterator.hasNext()) {
360+
// Record end time when no more data
361+
if (parseStartTimeRecorded && !parseEndTimeRecorded) {
362+
recordParseEndTime();
363+
}
359364
close();
360365
return false;
361366
}
@@ -380,7 +385,12 @@ public boolean hasNext() {
380385
}
381386
}
382387

383-
return true;
388+
hasNext = true;
389+
// Record start time on first hasNext() that returns true
390+
if (!parseStartTimeRecorded) {
391+
recordParseStartTime();
392+
}
393+
return hasNext;
384394
}
385395

386396
@Override
@@ -391,6 +401,8 @@ public TabletInsertionEvent next() {
391401
}
392402

393403
final Tablet tablet = tabletIterator.next();
404+
// Record tablet metrics
405+
recordTabletMetrics(tablet);
394406
final boolean isAligned =
395407
deviceIsAlignedMap.getOrDefault(
396408
IDeviceID.Factory.DEFAULT_FACTORY.create(tablet.getDeviceId()), false);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,15 @@ public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
164164

165165
@Override
166166
public boolean hasNext() {
167-
return Objects.nonNull(chunkReader);
167+
final boolean hasNext = Objects.nonNull(chunkReader);
168+
if (hasNext && !parseStartTimeRecorded) {
169+
// Record start time on first hasNext() that returns true
170+
recordParseStartTime();
171+
} else if (!hasNext && parseStartTimeRecorded && !parseEndTimeRecorded) {
172+
// Record end time on last hasNext() that returns false
173+
recordParseEndTime();
174+
}
175+
return hasNext;
168176
}
169177

170178
@Override
@@ -182,6 +190,8 @@ public TabletInsertionEvent next() {
182190
// information.
183191
final boolean isAligned = currentIsAligned;
184192
final Tablet tablet = getNextTablet();
193+
// Record tablet metrics
194+
recordTabletMetrics(tablet);
185195
final boolean hasNext = hasNext();
186196
try {
187197
return sourceEvent == null

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -163,11 +163,18 @@ && hasTablePrivilege(entry.getKey()),
163163
startTime,
164164
endTime);
165165
}
166-
if (!tabletIterator.hasNext()) {
166+
final boolean hasNext = tabletIterator.hasNext();
167+
if (hasNext && !parseStartTimeRecorded) {
168+
// Record start time on first hasNext() that returns true
169+
recordParseStartTime();
170+
} else if (!hasNext && parseStartTimeRecorded && !parseEndTimeRecorded) {
171+
// Record end time on last hasNext() that returns false
172+
recordParseEndTime();
173+
close();
174+
} else if (!hasNext) {
167175
close();
168-
return false;
169176
}
170-
return true;
177+
return hasNext;
171178
} catch (Exception e) {
172179
close();
173180
throw new PipeException("Error while parsing tsfile insertion event", e);
@@ -194,6 +201,8 @@ public TabletInsertionEvent next() {
194201
}
195202

196203
final Tablet tablet = tabletIterator.next();
204+
// Record tablet metrics
205+
recordTabletMetrics(tablet);
197206

198207
final TabletInsertionEvent next;
199208
if (!hasNext()) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.iotdb.db.pipe.source.dataregion.IoTDBDataRegionSource;
2525
import org.apache.iotdb.metrics.AbstractMetricService;
2626
import org.apache.iotdb.metrics.metricsets.IMetricSet;
27+
import org.apache.iotdb.metrics.type.Counter;
2728
import org.apache.iotdb.metrics.type.Rate;
2829
import org.apache.iotdb.metrics.type.Timer;
2930
import org.apache.iotdb.metrics.utils.MetricLevel;
@@ -48,6 +49,9 @@ public class PipeTsFileToTabletsMetrics implements IMetricSet {
4849
private final ConcurrentSkipListSet<String> pipe = new ConcurrentSkipListSet<>();
4950
private final Map<String, Timer> pipeTimerMap = new ConcurrentHashMap<>();
5051
private final Map<String, Rate> pipeRateMap = new ConcurrentHashMap<>();
52+
private final Map<String, Counter> pipeTabletCountMap = new ConcurrentHashMap<>();
53+
private final Map<String, Counter> pipeTabletMemoryMap = new ConcurrentHashMap<>();
54+
private final Map<String, Counter> pipeParseFileCountMap = new ConcurrentHashMap<>();
5155

5256
//////////////////////////// bindTo & unbindFrom (metric framework) ////////////////////////////
5357

@@ -72,6 +76,27 @@ private void createMetrics(final String pipeID) {
7276
MetricLevel.IMPORTANT,
7377
Tag.NAME.toString(),
7478
pipeID));
79+
pipeTabletCountMap.putIfAbsent(
80+
pipeID,
81+
metricService.getOrCreateCounter(
82+
Metric.PIPE_TSFILE_TO_TABLETS_COUNT.toString(),
83+
MetricLevel.IMPORTANT,
84+
Tag.NAME.toString(),
85+
pipeID));
86+
pipeTabletMemoryMap.putIfAbsent(
87+
pipeID,
88+
metricService.getOrCreateCounter(
89+
Metric.PIPE_TSFILE_TO_TABLETS_TOTAL_MEMORY.toString(),
90+
MetricLevel.IMPORTANT,
91+
Tag.NAME.toString(),
92+
pipeID));
93+
pipeParseFileCountMap.putIfAbsent(
94+
pipeID,
95+
metricService.getOrCreateCounter(
96+
Metric.PIPE_TSFILE_PARSE_FILE_COUNT.toString(),
97+
MetricLevel.IMPORTANT,
98+
Tag.NAME.toString(),
99+
pipeID));
75100
}
76101

77102
@Override
@@ -98,6 +123,27 @@ private void removeMetrics(final String pipeID) {
98123
Tag.NAME.toString(),
99124
pipeID);
100125
pipeRateMap.remove(pipeID);
126+
127+
metricService.remove(
128+
MetricType.COUNTER,
129+
Metric.PIPE_TSFILE_TO_TABLETS_COUNT.toString(),
130+
Tag.NAME.toString(),
131+
pipeID);
132+
pipeTabletCountMap.remove(pipeID);
133+
134+
metricService.remove(
135+
MetricType.COUNTER,
136+
Metric.PIPE_TSFILE_TO_TABLETS_TOTAL_MEMORY.toString(),
137+
Tag.NAME.toString(),
138+
pipeID);
139+
pipeTabletMemoryMap.remove(pipeID);
140+
141+
metricService.remove(
142+
MetricType.COUNTER,
143+
Metric.PIPE_TSFILE_PARSE_FILE_COUNT.toString(),
144+
Tag.NAME.toString(),
145+
pipeID);
146+
pipeParseFileCountMap.remove(pipeID);
101147
}
102148

103149
//////////////////////////// register & deregister ////////////////////////////
@@ -151,6 +197,27 @@ public void recordTsFileToTabletTime(final String taskID, long costTimeInNanos)
151197
return;
152198
}
153199
timer.updateNanos(costTimeInNanos);
200+
// Increment file count for this pipe when parsing ends
201+
final Counter fileCount = pipeParseFileCountMap.get(taskID);
202+
if (fileCount != null) {
203+
fileCount.inc();
204+
}
205+
}
206+
207+
public void recordTabletGenerated(final String taskID, long tabletMemorySize) {
208+
if (Objects.isNull(metricService)) {
209+
return;
210+
}
211+
final Counter tabletCount = pipeTabletCountMap.get(taskID);
212+
if (tabletCount == null) {
213+
LOGGER.info("Failed to record tablet generated, pipeID({}) does not exist", taskID);
214+
return;
215+
}
216+
tabletCount.inc();
217+
final Counter tabletMemory = pipeTabletMemoryMap.get(taskID);
218+
if (tabletMemory != null) {
219+
tabletMemory.inc(tabletMemorySize);
220+
}
154221
}
155222

156223
//////////////////////////// singleton ////////////////////////////

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,9 @@ public enum Metric {
198198
PIPE_GLOBAL_REMAINING_TIME("pipe_global_remaining_time"),
199199
PIPE_TSFILE_TO_TABLETS_TIME("pipe_tsfile_to_tablets_time"),
200200
PIPE_TSFILE_TO_TABLETS_RATE("pipe_tsfile_to_tablets_rate"),
201+
PIPE_TSFILE_TO_TABLETS_COUNT("pipe_tsfile_to_tablets_count"),
202+
PIPE_TSFILE_TO_TABLETS_TOTAL_MEMORY("pipe_tsfile_to_tablets_total_memory"),
203+
PIPE_TSFILE_PARSE_FILE_COUNT("pipe_tsfile_parse_file_count"),
201204
// subscription related
202205
SUBSCRIPTION_UNCOMMITTED_EVENT_COUNT("subscription_uncommitted_event_count"),
203206
SUBSCRIPTION_CURRENT_COMMIT_ID("subscription_current_commit_id"),

0 commit comments

Comments
 (0)