Skip to content

Commit bc4a185

Browse files
Merge branch 'main' into contractServiceQueriesSimpleFeesTestRefactor
2 parents 80b8c0c + 3d9574f commit bc4a185

File tree

42 files changed

+556
-2037
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+556
-2037
lines changed

.snyk

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,6 @@ exclude:
1818
- >-
1919
platform-sdk/consensus-sloth/src/testFixtures/java/org/hiero/sloth/fixtures/container/docker/DockerManager.java
2020
- >-
21-
platform-sdk/consensus-sloth/src/testFixtures/java/org/hiero/sloth/fixtures/container/docker/platform/NodeCommunicationService.java
21+
platform-sdk/consensus-sloth/src/testFixtures/java/org/hiero/sloth/fixtures/container/docker/platform/NodeCommunicationService.java
22+
- >-
23+
platform-sdk/consensus-gossip-impl/src/main/java/org/hiero/consensus/gossip/impl/network/PeerConnectionServer.java
Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
// SPDX-License-Identifier: Apache-2.0
22
package org.hiero.consensus.gossip.impl.gossip.sync;
33

4-
import static org.hiero.consensus.io.extendable.ExtendableInputStream.extendInputStream;
5-
64
import com.swirlds.config.api.Configuration;
75
import edu.umd.cs.findbugs.annotations.NonNull;
86
import java.io.BufferedInputStream;
@@ -11,25 +9,38 @@
119
import java.util.zip.InflaterInputStream;
1210
import org.hiero.base.io.streams.SerializableDataInputStream;
1311
import org.hiero.consensus.gossip.config.SocketConfig;
14-
import org.hiero.consensus.io.extendable.extensions.CountingStreamExtension;
15-
12+
import org.hiero.consensus.io.counting.ByteCounter;
13+
import org.hiero.consensus.io.counting.CounterType;
14+
import org.hiero.consensus.io.counting.CountingInputStream;
15+
16+
/**
17+
* A {@link SerializableDataInputStream} that counts the number of bytes read from it and optionally decompresses
18+
* the data using gzip compression.
19+
*/
1620
public class SyncInputStream extends SerializableDataInputStream {
1721

18-
private final CountingStreamExtension syncByteCounter;
22+
private final ByteCounter byteCounter;
1923

20-
private SyncInputStream(@NonNull final InputStream in, @NonNull final CountingStreamExtension syncByteCounter) {
24+
private SyncInputStream(@NonNull final InputStream in, @NonNull final ByteCounter byteCounter) {
2125
super(in);
22-
this.syncByteCounter = syncByteCounter;
26+
this.byteCounter = byteCounter;
2327
}
2428

29+
/**
30+
* Create a new {@link SyncInputStream} that optionally decompresses the data using gzip compression and
31+
* counts the number of bytes read from it.
32+
*
33+
* @param configuration the configuration to use to determine whether to use gzip compression
34+
* @param in the input stream to read from
35+
* @param bufferSize the buffer size to use when reading from the input stream
36+
* @return a new {@link SyncInputStream}
37+
*/
2538
public static SyncInputStream createSyncInputStream(
2639
@NonNull final Configuration configuration, @NonNull final InputStream in, final int bufferSize) {
2740

28-
final CountingStreamExtension syncCounter = new CountingStreamExtension();
29-
3041
final boolean compress = configuration.getConfigData(SocketConfig.class).gzipCompression();
3142

32-
final InputStream meteredStream = extendInputStream(in, syncCounter);
43+
final CountingInputStream meteredStream = new CountingInputStream(in, CounterType.THREAD_SAFE);
3344

3445
final InputStream wrappedStream;
3546
if (compress) {
@@ -38,10 +49,15 @@ public static SyncInputStream createSyncInputStream(
3849
wrappedStream = new BufferedInputStream(meteredStream, bufferSize);
3950
}
4051

41-
return new SyncInputStream(wrappedStream, syncCounter);
52+
return new SyncInputStream(wrappedStream, meteredStream.byteCounter());
4253
}
4354

44-
public CountingStreamExtension getSyncByteCounter() {
45-
return syncByteCounter;
55+
/**
56+
* Get the byte counter that counts the number of bytes read from this stream.
57+
*
58+
* @return the {@link ByteCounter}
59+
*/
60+
public ByteCounter byteCounter() {
61+
return byteCounter;
4662
}
4763
}

platform-sdk/consensus-gossip-impl/src/main/java/org/hiero/consensus/gossip/impl/gossip/sync/SyncMetrics.java

Lines changed: 0 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import static com.swirlds.metrics.api.Metrics.PLATFORM_CATEGORY;
1010

1111
import com.swirlds.base.time.Time;
12-
import com.swirlds.base.units.UnitConstants;
1312
import com.swirlds.metrics.api.IntegerGauge;
1413
import com.swirlds.metrics.api.Metrics;
1514
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -25,17 +24,13 @@
2524
import org.hiero.consensus.gossip.impl.gossip.shadowgraph.ShadowgraphSynchronizer;
2625
import org.hiero.consensus.gossip.impl.gossip.shadowgraph.SyncPhase;
2726
import org.hiero.consensus.gossip.impl.gossip.shadowgraph.SyncResult;
28-
import org.hiero.consensus.gossip.impl.gossip.shadowgraph.SyncTiming;
29-
import org.hiero.consensus.gossip.impl.network.Connection;
3027
import org.hiero.consensus.gossip.impl.network.PeerInfo;
3128
import org.hiero.consensus.metrics.RunningAverageMetric;
3229
import org.hiero.consensus.metrics.extensions.CountPerSecond;
3330
import org.hiero.consensus.metrics.extensions.PhaseTimer;
3431
import org.hiero.consensus.metrics.extensions.PhaseTimerBuilder;
3532
import org.hiero.consensus.metrics.statistics.AverageAndMax;
36-
import org.hiero.consensus.metrics.statistics.AverageAndMaxTimeStat;
3733
import org.hiero.consensus.metrics.statistics.AverageStat;
38-
import org.hiero.consensus.metrics.statistics.AverageTimeStat;
3934
import org.hiero.consensus.metrics.statistics.MaxStat;
4035
import org.hiero.consensus.model.hashgraph.EventWindow;
4136
import org.hiero.consensus.model.node.NodeId;
@@ -45,11 +40,6 @@
4540
*/
4641
public class SyncMetrics {
4742

48-
private static final RunningAverageMetric.Config AVG_BYTES_PER_SEC_SYNC_CONFIG = new RunningAverageMetric.Config(
49-
PLATFORM_CATEGORY, "bytes_per_sec_sync")
50-
.withDescription("average number of bytes per second transferred during a sync");
51-
private final RunningAverageMetric avgBytesPerSecSync;
52-
5343
private static final RunningAverageMetric.Config TIPS_PER_SYNC_CONFIG = new RunningAverageMetric.Config(
5444
INTERNAL_CATEGORY, "tips_per_sync")
5545
.withDescription("the average number of tips per sync at the start of each sync")
@@ -196,12 +186,6 @@ public class SyncMetrics {
196186

197187
private final AverageStat syncIndicatorDiff;
198188
private final AverageStat eventRecRate;
199-
private final AverageTimeStat avgSyncDuration1;
200-
private final AverageTimeStat avgSyncDuration2;
201-
private final AverageTimeStat avgSyncDuration3;
202-
private final AverageTimeStat avgSyncDuration4;
203-
private final AverageTimeStat avgSyncDuration5;
204-
private final AverageAndMaxTimeStat avgSyncDuration;
205189
private final AverageStat knownSetSize;
206190
private final AverageAndMax avgEventsPerSyncSent;
207191
private final AverageAndMax avgEventsPerSyncRec;
@@ -227,7 +211,6 @@ public class SyncMetrics {
227211
public SyncMetrics(final Metrics metrics, final Time time, final List<PeerInfo> peers) {
228212
this.metrics = Objects.requireNonNull(metrics);
229213
this.time = Objects.requireNonNull(time);
230-
avgBytesPerSecSync = metrics.getOrCreate(AVG_BYTES_PER_SEC_SYNC_CONFIG);
231214
callSyncsPerSecond = new CountPerSecond(metrics, CALL_SYNCS_PER_SECOND_CONFIG);
232215
recSyncsPerSecond = new CountPerSecond(metrics, REC_SYNCS_PER_SECOND_CONFIG);
233216
tipsPerSync = metrics.getOrCreate(TIPS_PER_SYNC_CONFIG);
@@ -258,13 +241,6 @@ public SyncMetrics(final Metrics metrics, final Time time, final List<PeerInfo>
258241
broadcastDisabledDueToLag = metrics.getOrCreate(BROADCAST_DISABLED_DUE_TO_LAG_CONFIG);
259242
broadcastDisabledDueToOverload = metrics.getOrCreate(BROADCAST_DISABLED_DUE_TO_OVERLOAD_CONFIG);
260243

261-
avgSyncDuration = new AverageAndMaxTimeStat(
262-
metrics,
263-
ChronoUnit.SECONDS,
264-
INTERNAL_CATEGORY,
265-
"sec_per_sync",
266-
"duration of average successful sync (in seconds)");
267-
268244
avgEventsPerSyncSent = new AverageAndMax(
269245
metrics, PLATFORM_CATEGORY, "ev_per_syncS", "number of events sent per successful sync", FORMAT_8_1);
270246
avgEventsPerSyncRec = new AverageAndMax(
@@ -289,37 +265,6 @@ public SyncMetrics(final Metrics metrics, final Time time, final List<PeerInfo>
289265
FORMAT_8_1,
290266
AverageStat.WEIGHT_VOLATILE);
291267

292-
avgSyncDuration1 = new AverageTimeStat(
293-
metrics,
294-
ChronoUnit.SECONDS,
295-
INTERNAL_CATEGORY,
296-
"sec_per_sync1",
297-
"duration of step 1 of average successful sync (in seconds)");
298-
avgSyncDuration2 = new AverageTimeStat(
299-
metrics,
300-
ChronoUnit.SECONDS,
301-
INTERNAL_CATEGORY,
302-
"sec_per_sync2",
303-
"duration of step 2 of average successful sync (in seconds)");
304-
avgSyncDuration3 = new AverageTimeStat(
305-
metrics,
306-
ChronoUnit.SECONDS,
307-
INTERNAL_CATEGORY,
308-
"sec_per_sync3",
309-
"duration of step 3 of average successful sync (in seconds)");
310-
avgSyncDuration4 = new AverageTimeStat(
311-
metrics,
312-
ChronoUnit.SECONDS,
313-
INTERNAL_CATEGORY,
314-
"sec_per_sync4",
315-
"duration of step 4 of average successful sync (in seconds)");
316-
avgSyncDuration5 = new AverageTimeStat(
317-
metrics,
318-
ChronoUnit.SECONDS,
319-
INTERNAL_CATEGORY,
320-
"sec_per_sync5",
321-
"duration of step 5 of average successful sync (in seconds)");
322-
323268
knownSetSize = new AverageStat(
324269
metrics,
325270
PLATFORM_CATEGORY,
@@ -382,30 +327,6 @@ public void eventsReceived(final long nanosStart, final int numberReceived) {
382327
eventRecRate.update(Math.round(numberReceived / seconds));
383328
}
384329

385-
/**
386-
* Record all stats related to sync timing
387-
*
388-
* @param timing object that holds the timing data
389-
* @param conn the sync connections
390-
*/
391-
public void recordSyncTiming(final SyncTiming timing, final Connection conn) {
392-
avgSyncDuration1.update(timing.getTimePoint(0), timing.getTimePoint(1));
393-
avgSyncDuration2.update(timing.getTimePoint(1), timing.getTimePoint(2));
394-
avgSyncDuration3.update(timing.getTimePoint(2), timing.getTimePoint(3));
395-
avgSyncDuration4.update(timing.getTimePoint(3), timing.getTimePoint(4));
396-
avgSyncDuration5.update(timing.getTimePoint(4), timing.getTimePoint(5));
397-
398-
avgSyncDuration.update(timing.getTimePoint(0), timing.getTimePoint(5));
399-
final double syncDurationSec = timing.getPointDiff(5, 0) * UnitConstants.NANOSECONDS_TO_SECONDS;
400-
final double speed = Math.max(
401-
conn.getDis().getSyncByteCounter().getCount(),
402-
conn.getDos().getSyncByteCounter().getCount())
403-
/ syncDurationSec;
404-
405-
// set the bytes/sec speed of the sync currently measured
406-
avgBytesPerSecSync.update(speed);
407-
}
408-
409330
/**
410331
* Records the size of the known set during a sync. This is the most compute intensive part of the sync, so this is
411332
* useful information to validate sync performance.
Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
// SPDX-License-Identifier: Apache-2.0
22
package org.hiero.consensus.gossip.impl.gossip.sync;
33

4-
import static org.hiero.consensus.io.extendable.ExtendableOutputStream.extendOutputStream;
5-
64
import com.swirlds.config.api.Configuration;
75
import edu.umd.cs.findbugs.annotations.NonNull;
86
import java.io.BufferedOutputStream;
@@ -11,29 +9,38 @@
119
import java.util.zip.DeflaterOutputStream;
1210
import org.hiero.base.io.streams.SerializableDataOutputStream;
1311
import org.hiero.consensus.gossip.config.SocketConfig;
14-
import org.hiero.consensus.io.extendable.extensions.CountingStreamExtension;
12+
import org.hiero.consensus.io.counting.ByteCounter;
13+
import org.hiero.consensus.io.counting.CounterType;
14+
import org.hiero.consensus.io.counting.CountingOutputStream;
1515

16+
/**
17+
* A {@link SerializableDataOutputStream} that counts the number of bytes written to it and optionally compresses
18+
* the data using gzip compression.
19+
*/
1620
public class SyncOutputStream extends SerializableDataOutputStream {
17-
private final CountingStreamExtension syncByteCounter;
18-
private final CountingStreamExtension connectionByteCounter;
1921

20-
protected SyncOutputStream(
21-
@NonNull final OutputStream out,
22-
@NonNull final CountingStreamExtension syncByteCounter,
23-
@NonNull final CountingStreamExtension connectionByteCounter) {
22+
private final ByteCounter connectionByteCounter;
23+
24+
protected SyncOutputStream(@NonNull final OutputStream out, @NonNull final ByteCounter connectionByteCounter) {
2425
super(out);
25-
this.syncByteCounter = syncByteCounter;
2626
this.connectionByteCounter = connectionByteCounter;
2727
}
2828

29+
/**
30+
* Create a new {@link SyncOutputStream} that optionally compresses the data using gzip compression and
31+
* counts the number of bytes written to it.
32+
*
33+
* @param configuration the configuration to use to determine whether to use gzip compression
34+
* @param out the output stream to write to
35+
* @param bufferSize the buffer size to use when writing to the output stream
36+
* @return a new {@link SyncOutputStream}
37+
*/
2938
public static SyncOutputStream createSyncOutputStream(
3039
@NonNull final Configuration configuration, @NonNull final OutputStream out, final int bufferSize) {
31-
final CountingStreamExtension syncByteCounter = new CountingStreamExtension();
32-
final CountingStreamExtension connectionByteCounter = new CountingStreamExtension();
3340

3441
final boolean compress = configuration.getConfigData(SocketConfig.class).gzipCompression();
3542

36-
final OutputStream meteredStream = extendOutputStream(out, connectionByteCounter);
43+
final CountingOutputStream meteredStream = new CountingOutputStream(out, CounterType.THREAD_SAFE);
3744

3845
final OutputStream wrappedStream;
3946
if (compress) {
@@ -44,14 +51,15 @@ public static SyncOutputStream createSyncOutputStream(
4451
}
4552

4653
// we write the data to the buffer first, for efficiency
47-
return new SyncOutputStream(wrappedStream, syncByteCounter, connectionByteCounter);
48-
}
49-
50-
public CountingStreamExtension getSyncByteCounter() {
51-
return syncByteCounter;
54+
return new SyncOutputStream(wrappedStream, meteredStream.byteCounter());
5255
}
5356

54-
public CountingStreamExtension getConnectionByteCounter() {
57+
/**
58+
* Get the connection byte counter that counts the number of bytes written to this stream.
59+
*
60+
* @return the {@link ByteCounter}
61+
*/
62+
public ByteCounter connectionByteCounter() {
5563
return connectionByteCounter;
5664
}
5765
}

platform-sdk/consensus-gossip-impl/src/main/java/org/hiero/consensus/gossip/impl/network/NetworkMetrics.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ public void update() {
194194
for (final Iterator<Connection> iterator = connections.iterator(); iterator.hasNext(); ) {
195195
final Connection conn = iterator.next();
196196
if (conn != null) {
197-
final long bytesSent = conn.getDos().getConnectionByteCounter().getAndResetCount();
197+
final long bytesSent = conn.getDos().connectionByteCounter().getAndReset();
198198
totalBytesSent += bytesSent;
199199
final NodeId otherId = conn.getOtherId();
200200

platform-sdk/consensus-gossip-impl/src/main/java/org/hiero/consensus/gossip/impl/network/SocketConnection.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,8 +182,7 @@ public void initForSync() throws IOException {
182182
}
183183

184184
/* track the number of bytes written and read during a sync */
185-
getDis().getSyncByteCounter().resetCount();
186-
getDos().getSyncByteCounter().resetCount();
185+
getDis().byteCounter().getAndReset();
187186
final SocketConfig socketConfig = configuration.getConfigData(SocketConfig.class);
188187
this.setTimeout(socketConfig.timeoutSyncClientSocket());
189188
}

platform-sdk/consensus-gossip-impl/src/test/java/org/hiero/consensus/gossip/impl/network/SocketConnectionTests.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import org.hiero.base.io.exceptions.BadIOException;
2424
import org.hiero.consensus.gossip.impl.gossip.sync.SyncInputStream;
2525
import org.hiero.consensus.gossip.impl.gossip.sync.SyncOutputStream;
26-
import org.hiero.consensus.io.extendable.extensions.CountingStreamExtension;
26+
import org.hiero.consensus.io.counting.ByteCounter;
2727
import org.hiero.consensus.model.node.NodeId;
2828
import org.junit.jupiter.api.BeforeEach;
2929
import org.junit.jupiter.params.ParameterizedTest;
@@ -174,7 +174,8 @@ void testConnected_SocketThrows(final boolean outbound) {
174174

175175
initConnection(outbound);
176176

177-
boolean isConnected = assertDoesNotThrow(conn::connected, "connected() should not thrown any exceptions.");
177+
final boolean isConnected =
178+
assertDoesNotThrow(conn::connected, "connected() should not thrown any exceptions.");
178179
assertFalse(isConnected, "connected() should return false when an exception occurs.");
179180
}
180181

@@ -219,10 +220,8 @@ selfId, otherId, mock(ConnectionTracker.class), outbound, socket, dis, null, con
219220
@ParameterizedTest
220221
@MethodSource("booleans")
221222
void testInitForSync(final boolean outbound) throws SocketException {
222-
CountingStreamExtension disCounter = mock(CountingStreamExtension.class);
223-
CountingStreamExtension dosCounter = mock(CountingStreamExtension.class);
224-
when(dis.getSyncByteCounter()).thenReturn(disCounter);
225-
when(dos.getSyncByteCounter()).thenReturn(dosCounter);
223+
final ByteCounter disCounter = mock(ByteCounter.class);
224+
when(dis.byteCounter()).thenReturn(disCounter);
226225

227226
// mock Connection.connected() = true
228227
when(socket.isClosed()).thenReturn(false);
@@ -232,10 +231,8 @@ void testInitForSync(final boolean outbound) throws SocketException {
232231
initConnection(outbound);
233232
assertDoesNotThrow(() -> conn.initForSync(), "An exception should not be thrown when the connection is valid.");
234233

235-
verify(dis).getSyncByteCounter();
236-
verify(dos).getSyncByteCounter();
237-
verify(disCounter).resetCount();
238-
verify(dosCounter).resetCount();
234+
verify(dis).byteCounter();
235+
verify(disCounter).getAndReset();
239236
verify(socket).setSoTimeout(anyInt());
240237
}
241238

0 commit comments

Comments
 (0)