Skip to content

Commit 897c775

Browse files
committed
support statistics reporting for topic writes
1 parent 6e8133b commit 897c775

File tree

2 files changed

+81
-18
lines changed

2 files changed

+81
-18
lines changed
Lines changed: 71 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,23 @@
11
package tech.ydb.topic.write;
22

3+
import java.time.Duration;
4+
5+
import tech.ydb.proto.topic.YdbTopic;
6+
37
/**
48
* @author Nikolay Perfilov
59
*/
610
public class WriteAck {
711
private final long seqNo;
812
private final State state;
913
private final Details details;
14+
private final Statistics statistics;
1015

11-
public WriteAck(long seqNo, State state, Details details) {
16+
public WriteAck(long seqNo, State state, Details details, Statistics statistics) {
1217
this.seqNo = seqNo;
1318
this.state = state;
1419
this.details = details;
20+
this.statistics = statistics;
1521
}
1622

1723
public enum State {
@@ -20,18 +26,6 @@ public enum State {
2026
WRITTEN_IN_TX
2127
}
2228

23-
public static class Details {
24-
private final long offset;
25-
26-
public Details(long offset) {
27-
this.offset = offset;
28-
}
29-
30-
public long getOffset() {
31-
return offset;
32-
}
33-
}
34-
3529
public long getSeqNo() {
3630
return seqNo;
3731
}
@@ -47,4 +41,68 @@ public State getState() {
4741
public Details getDetails() {
4842
return details;
4943
}
44+
45+
/**
46+
* Obtain message write statistics
47+
* @return {@link Statistics} with timings if statistics are available or null otherwise
48+
*/
49+
public Statistics getStatistics() {
50+
return statistics;
51+
}
52+
53+
private static Duration convert(com.google.protobuf.Duration d) {
54+
if (d == null) {
55+
return Duration.ZERO;
56+
}
57+
return Duration.ofSeconds(d.getSeconds(), d.getNanos());
58+
}
59+
60+
public static class Details {
61+
private final long offset;
62+
63+
public Details(long offset) {
64+
this.offset = offset;
65+
}
66+
67+
public long getOffset() {
68+
return offset;
69+
}
70+
}
71+
72+
public static class Statistics {
73+
private final Duration persistingTime;
74+
private final Duration partitionQuotaWaitTime;
75+
private final Duration topicQuotaWaitTime;
76+
private final Duration maxQueueWaitTime;
77+
private final Duration minQueueWaitTime;
78+
79+
public Statistics(YdbTopic.StreamWriteMessage.WriteResponse.WriteStatistics src) {
80+
this.persistingTime = convert(src.getPersistingTime());
81+
this.partitionQuotaWaitTime = convert(src.getPartitionQuotaWaitTime());
82+
this.topicQuotaWaitTime = convert(src.getTopicQuotaWaitTime());
83+
this.maxQueueWaitTime = convert(src.getMaxQueueWaitTime());
84+
this.minQueueWaitTime = convert(src.getMinQueueWaitTime());
85+
}
86+
87+
public Duration getPersistingTime() {
88+
return persistingTime;
89+
}
90+
91+
public Duration getPartitionQuotaWaitTime() {
92+
return partitionQuotaWaitTime;
93+
}
94+
95+
public Duration getTopicQuotaWaitTime() {
96+
return topicQuotaWaitTime;
97+
}
98+
99+
public Duration getMaxQueueWaitTime() {
100+
return maxQueueWaitTime;
101+
}
102+
103+
public Duration getMinQueueWaitTime() {
104+
return minQueueWaitTime;
105+
}
106+
}
107+
50108
}

topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,10 @@ private void onInitResponse(YdbTopic.StreamWriteMessage.InitResponse response) {
421421
private void onWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse response) {
422422
List<YdbTopic.StreamWriteMessage.WriteResponse.WriteAck> acks = response.getAcksList();
423423
logger.debug("[{}] Received WriteResponse with {} WriteAcks", streamId, acks.size());
424+
WriteAck.Statistics statistics = null;
425+
if (response.getWriteStatistics() != null) {
426+
statistics = new WriteAck.Statistics(response.getWriteStatistics());
427+
}
424428
int inFlightFreed = 0;
425429
long bytesFreed = 0;
426430
for (YdbTopic.StreamWriteMessage.WriteResponse.WriteAck ack : acks) {
@@ -433,7 +437,7 @@ private void onWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse response)
433437
inFlightFreed++;
434438
bytesFreed += sentMessage.getSize();
435439
sentMessages.remove();
436-
processWriteAck(sentMessage, ack);
440+
processWriteAck(sentMessage, statistics, ack);
437441
break;
438442
}
439443
if (sentMessage.getSeqNo() < ack.getSeqNo()) {
@@ -474,20 +478,20 @@ private void processMessage(YdbTopic.StreamWriteMessage.FromServer message) {
474478
}
475479
}
476480

477-
private void processWriteAck(EnqueuedMessage message,
481+
private void processWriteAck(EnqueuedMessage message, WriteAck.Statistics statistics,
478482
YdbTopic.StreamWriteMessage.WriteResponse.WriteAck ack) {
479483
logger.debug("[{}] Received WriteAck with seqNo {} and status {}", streamId, ack.getSeqNo(),
480484
ack.getMessageWriteStatusCase());
481485
WriteAck resultAck;
482486
switch (ack.getMessageWriteStatusCase()) {
483487
case WRITTEN:
484488
WriteAck.Details details = new WriteAck.Details(ack.getWritten().getOffset());
485-
resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.WRITTEN, details);
489+
resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.WRITTEN, details, statistics);
486490
break;
487491
case SKIPPED:
488492
switch (ack.getSkipped().getReason()) {
489493
case REASON_ALREADY_WRITTEN:
490-
resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.ALREADY_WRITTEN, null);
494+
resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.ALREADY_WRITTEN, null, statistics);
491495
break;
492496
case REASON_UNSPECIFIED:
493497
default:
@@ -497,7 +501,7 @@ private void processWriteAck(EnqueuedMessage message,
497501
}
498502
break;
499503
case WRITTEN_IN_TX:
500-
resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.WRITTEN_IN_TX, null);
504+
resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.WRITTEN_IN_TX, null, statistics);
501505
break;
502506
default:
503507
message.getFuture().completeExceptionally(
@@ -519,5 +523,6 @@ private void closeDueToError(Status status, Throwable th) {
519523
protected void onStop() {
520524
logger.debug("[{}] Session {} onStop called", streamId, sessionId);
521525
}
526+
522527
}
523528
}

0 commit comments

Comments
 (0)