Skip to content

Commit 5c3cb56

Browse files
chore(19408): track PcesWriter metrics more granularly (hiero-ledger#19458)
Signed-off-by: mxtartaglia <maxi@swirldslabs.com>
1 parent a3b7bb6 commit 5c3cb56

File tree

10 files changed

+164
-214
lines changed

10 files changed

+164
-214
lines changed

platform-sdk/base-utility/src/main/java/org/hiero/base/io/streams/SerializableDataOutputStream.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -254,9 +254,12 @@ protected void writeClassIdVersion(@NonNull final SerializableDet serializable,
254254
* @param codec the codec to use to write the record
255255
* @param <T> the type of the record
256256
* @throws IOException thrown if any IO problems occur
257+
* @return the length in bytes that were written
257258
*/
258-
public <T> void writePbjRecord(@NonNull final T record, @NonNull final Codec<T> codec) throws IOException {
259-
writeInt(codec.measureRecord(record));
259+
public <T> long writePbjRecord(@NonNull final T record, @NonNull final Codec<T> codec) throws IOException {
260+
final int recordSize = codec.measureRecord(record);
261+
writeInt(recordSize);
260262
codec.write(record, writableSequentialData);
263+
return recordSize + Integer.BYTES;
261264
}
262265
}

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultInlinePcesWriter.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@
1111
import org.hiero.consensus.model.node.NodeId;
1212

1313
public class DefaultInlinePcesWriter implements InlinePcesWriter {
14+
1415
private final CommonPcesWriter commonPcesWriter;
1516
private final NodeId selfId;
1617
private final FileSyncOption fileSyncOption;
18+
private final PcesWriterPerEventMetrics pcesWriterPerEventMetrics;
1719

1820
/**
1921
* Constructor
@@ -33,6 +35,9 @@ public DefaultInlinePcesWriter(
3335
.getConfiguration()
3436
.getConfigData(PcesConfig.class)
3537
.inlinePcesSyncOption();
38+
39+
this.pcesWriterPerEventMetrics =
40+
new PcesWriterPerEventMetrics(platformContext.getMetrics(), platformContext.getTime());
3641
}
3742

3843
@Override
@@ -45,7 +50,9 @@ public void beginStreamingNewEvents() {
4550
*/
4651
@NonNull
4752
@Override
48-
public PlatformEvent writeEvent(@NonNull PlatformEvent event) {
53+
public PlatformEvent writeEvent(@NonNull final PlatformEvent event) {
54+
pcesWriterPerEventMetrics.startWriteEvent();
55+
4956
// if we aren't streaming new events yet, assume that the given event is already durable
5057
if (!commonPcesWriter.isStreamingNewEvents()) {
5158
return event;
@@ -58,17 +65,24 @@ public PlatformEvent writeEvent(@NonNull PlatformEvent event) {
5865

5966
try {
6067
commonPcesWriter.prepareOutputStream(event);
61-
commonPcesWriter.getCurrentMutableFile().writeEvent(event);
68+
pcesWriterPerEventMetrics.startFileWrite();
69+
final long size = commonPcesWriter.getCurrentMutableFile().writeEvent(event);
70+
pcesWriterPerEventMetrics.endFileWrite(size);
6271

6372
if (fileSyncOption == FileSyncOption.EVERY_EVENT
6473
|| (fileSyncOption == FileSyncOption.EVERY_SELF_EVENT
6574
&& event.getCreatorId().equals(selfId))) {
75+
76+
pcesWriterPerEventMetrics.startFileSync();
6677
commonPcesWriter.getCurrentMutableFile().sync();
78+
pcesWriterPerEventMetrics.endFileSync();
6779
}
68-
6980
return event;
7081
} catch (final IOException e) {
7182
throw new UncheckedIOException(e);
83+
} finally {
84+
pcesWriterPerEventMetrics.endWriteEvent();
85+
pcesWriterPerEventMetrics.clear();
7286
}
7387
}
7488

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/PcesFileChannelWriter.java

Lines changed: 16 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,6 @@ public class PcesFileChannelWriter implements PcesFileWriter {
2929
private WritableSequentialData writableSequentialData;
3030
/** Tracks the size of the file in bytes */
3131
private int fileSize;
32-
/** Keeps stats of the writing process */
33-
private final PcesFileWriterStats stats;
34-
3532
/**
3633
* Create a new writer that writes events to a file using a {@link FileChannel}.
3734
*
@@ -57,7 +54,6 @@ public PcesFileChannelWriter(@NonNull final Path filePath, @NonNull final List<O
5754
this.channel = FileChannel.open(filePath, allOpenOptions.toArray(OpenOption[]::new));
5855
this.buffer = ByteBuffer.allocateDirect(BUFFER_CAPACITY);
5956
this.writableSequentialData = BufferedData.wrap(buffer);
60-
this.stats = new PcesFileWriterStats();
6157
}
6258

6359
@Override
@@ -67,23 +63,18 @@ public void writeVersion(final int version) throws IOException {
6763
}
6864

6965
@Override
70-
public void writeEvent(@NonNull final GossipEvent event) throws IOException {
71-
long startTime = System.nanoTime();
66+
public long writeEvent(@NonNull final GossipEvent event) throws IOException {
7267
final int size = GossipEvent.PROTOBUF.measureRecord(event);
73-
boolean bufferExpanded = false;
74-
try {
75-
if (size > buffer.capacity()) {
76-
MemoryUtils.closeDirectByteBuffer(buffer);
77-
buffer = ByteBuffer.allocateDirect(size);
78-
writableSequentialData = BufferedData.wrap(buffer);
79-
bufferExpanded = true;
80-
}
81-
buffer.putInt(size);
82-
GossipEvent.PROTOBUF.write(event, writableSequentialData);
83-
flipWriteClear();
84-
} finally {
85-
stats.updateWriteStats(startTime, System.nanoTime(), size, bufferExpanded);
68+
final boolean expandBuffer = size > buffer.capacity();
69+
if (expandBuffer) {
70+
MemoryUtils.closeDirectByteBuffer(buffer);
71+
buffer = ByteBuffer.allocateDirect(size);
72+
writableSequentialData = BufferedData.wrap(buffer);
8673
}
74+
buffer.putInt(size);
75+
GossipEvent.PROTOBUF.write(event, writableSequentialData);
76+
flipWriteClear();
77+
return size;
8778
}
8879

8980
/**
@@ -94,18 +85,13 @@ public void writeEvent(@NonNull final GossipEvent event) throws IOException {
9485
private void flipWriteClear() throws IOException {
9586

9687
buffer.flip();
97-
long writeStart = System.nanoTime();
98-
try {
99-
final int bytesWritten = channel.write(buffer);
100-
fileSize += bytesWritten;
101-
if (bytesWritten != buffer.limit()) {
102-
throw new IOException(
103-
"Failed to write data to file. Wrote " + bytesWritten + " bytes out of " + buffer.limit());
104-
}
105-
buffer.clear();
106-
} finally {
107-
stats.updatePartialWriteStats(writeStart, System.nanoTime());
88+
final int bytesWritten = channel.write(buffer);
89+
fileSize += bytesWritten;
90+
if (bytesWritten != buffer.limit()) {
91+
throw new IOException(
92+
"Failed to write data to file. Wrote " + bytesWritten + " bytes out of " + buffer.limit());
10893
}
94+
buffer.clear();
10995
}
11096

11197
@Override
@@ -115,11 +101,8 @@ public void flush() throws IOException {
115101

116102
@Override
117103
public void sync() throws IOException {
118-
long startTime = System.nanoTime();
119104
// benchmarks show that this has horrible performance for the channel writer (in mac-os)
120105
channel.force(false);
121-
122-
stats.updateSyncStats(startTime, System.nanoTime());
123106
}
124107

125108
@Override
@@ -131,8 +114,4 @@ public void close() throws IOException {
131114
public long fileSize() {
132115
return fileSize;
133116
}
134-
135-
public PcesFileWriterStats getStats() {
136-
return stats;
137-
}
138117
}

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/PcesFileManager.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,6 @@ public void finishedWritingFile(@NonNull final PcesMutableFile file) {
237237
metrics.getPreconsensusEventAverageFileSpan().update(file.getSpan());
238238
metrics.getPreconsensusEventAverageUnUtilizedFileSpan().update(file.getUnUtilizedSpan());
239239
updateFileSizeMetrics();
240-
metrics.updateMetricsWithPcesFileWritingStats(file.writerStats());
241240
}
242241

243242
/**

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/PcesFileWriter.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ public interface PcesFileWriter {
2121
* Write an event to the file.
2222
*
2323
* @param event the event to write
24+
* @return the length of the written data
2425
*/
25-
void writeEvent(@NonNull final GossipEvent event) throws IOException;
26+
long writeEvent(@NonNull final GossipEvent event) throws IOException;
2627

2728
/**
2829
* Flush the file.
@@ -43,7 +44,4 @@ public interface PcesFileWriter {
4344
* @return the size of the file in bytes
4445
*/
4546
long fileSize();
46-
47-
/** Keeps stats of the writing process */
48-
PcesFileWriterStats getStats();
4947
}

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/PcesFileWriterStats.java

Lines changed: 0 additions & 99 deletions
This file was deleted.

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/PcesMetrics.java

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,17 @@
33

44
import com.swirlds.common.metrics.RunningAverageMetric;
55
import com.swirlds.common.metrics.SpeedometerMetric;
6-
import com.swirlds.metrics.api.Counter;
76
import com.swirlds.metrics.api.DoubleGauge;
87
import com.swirlds.metrics.api.LongGauge;
98
import com.swirlds.metrics.api.Metrics;
109
import edu.umd.cs.findbugs.annotations.NonNull;
11-
import java.util.Objects;
1210

1311
/**
1412
* Metrics for preconsensus events.
1513
*/
1614
public class PcesMetrics {
1715

1816
private static final String CATEGORY = "platform";
19-
private final Metrics metrics;
2017

2118
private static final LongGauge.Config PRECONSENSUS_EVENT_FILE_COUNT_CONFIG = new LongGauge.Config(
2219
CATEGORY, "preconsensusEventFileCount")
@@ -72,29 +69,12 @@ public class PcesMetrics {
7269
.withDescription("The age of the oldest preconsensus event file, in seconds.");
7370
private final LongGauge preconsensusEventFileOldestSeconds;
7471

75-
private static final RunningAverageMetric.Config AVG_EVENT_SIZE = new RunningAverageMetric.Config(
76-
CATEGORY, "pcesAvgEventSize")
77-
.withDescription("The average size of an event");
78-
private static final RunningAverageMetric.Config PCES_AVG_SYNC_DURATION = new RunningAverageMetric.Config(
79-
CATEGORY, "pcesAvgSyncDuration")
80-
.withDescription("The average duration of the sync method");
81-
private static final RunningAverageMetric.Config PCES_AVG_WRITE_DURATION = new RunningAverageMetric.Config(
82-
CATEGORY, "pcesAvgWriteDuration")
83-
.withDescription("The average duration of the write fs call");
84-
private static final RunningAverageMetric.Config PCES_AVG_TOTAL_WRITE_DURATION = new RunningAverageMetric.Config(
85-
CATEGORY, "pcesAvgTotalWriteDuration")
86-
.withDescription("The average of the total duration of the write method");
87-
private static final Counter.Config PCES_BUFFER_EXPANSIONS_COUNTER = new Counter.Config(
88-
CATEGORY, "pcesBufferExpansionCounter")
89-
.withDescription("How many times the write buffer needed to be expanded");
90-
9172
/**
9273
* Construct preconsensus event metrics.
9374
*
9475
* @param metrics the metrics manager for the platform
9576
*/
9677
public PcesMetrics(final @NonNull Metrics metrics) {
97-
this.metrics = Objects.requireNonNull(metrics);
9878
preconsensusEventFileCount = metrics.getOrCreate(PRECONSENSUS_EVENT_FILE_COUNT_CONFIG);
9979
preconsensusEventFileAverageSizeMB = metrics.getOrCreate(PRECONSENSUS_EVENT_FILE_AVERAGE_SIZE_MB_CONFIG);
10080
preconsensusEventFileTotalSizeGB = metrics.getOrCreate(PRECONSENSUS_EVENT_FILE_TOTAL_SIZE_GB_CONFIG);
@@ -106,13 +86,6 @@ public PcesMetrics(final @NonNull Metrics metrics) {
10686
preconsensusEventFileYoungestIdentifier =
10787
metrics.getOrCreate(PRECONSENSUS_EVENT_FILE_YOUNGEST_IDENTIFIER_CONFIG);
10888
preconsensusEventFileOldestSeconds = metrics.getOrCreate(PRECONSENSUS_EVENT_FILE_OLDEST_SECONDS_CONFIG);
109-
110-
// crating the metrics
111-
metrics.getOrCreate(PcesMetrics.AVG_EVENT_SIZE);
112-
metrics.getOrCreate(PcesMetrics.PCES_AVG_SYNC_DURATION);
113-
metrics.getOrCreate(PcesMetrics.PCES_AVG_WRITE_DURATION);
114-
metrics.getOrCreate(PcesMetrics.PCES_AVG_TOTAL_WRITE_DURATION);
115-
metrics.getOrCreate(PcesMetrics.PCES_BUFFER_EXPANSIONS_COUNTER);
11689
}
11790

11891
/**
@@ -177,16 +150,4 @@ public LongGauge getPreconsensusEventFileYoungestIdentifier() {
177150
public LongGauge getPreconsensusEventFileOldestSeconds() {
178151
return preconsensusEventFileOldestSeconds;
179152
}
180-
181-
/**
182-
* Updates the metrics with the stats reported by the writer
183-
*/
184-
public void updateMetricsWithPcesFileWritingStats(@NonNull final PcesFileWriterStats stats) {
185-
metrics.getOrCreate(PcesMetrics.AVG_EVENT_SIZE).update(stats.averageEventSize());
186-
metrics.getOrCreate(PcesMetrics.PCES_AVG_SYNC_DURATION).update(stats.averageSyncDuration());
187-
metrics.getOrCreate(PcesMetrics.PCES_AVG_WRITE_DURATION).update(stats.averageWriteDuration());
188-
metrics.getOrCreate(PcesMetrics.PCES_AVG_TOTAL_WRITE_DURATION).update(stats.averageTotalWriteDuration());
189-
if (stats.totalExpansions() > 0)
190-
metrics.getOrCreate(PcesMetrics.PCES_BUFFER_EXPANSIONS_COUNTER).add(stats.totalExpansions());
191-
}
192153
}

0 commit comments

Comments
 (0)