Skip to content

Commit 9c98639

Browse files
committed
Set traceId for topic sessions
1 parent ccb2a17 commit 9c98639

File tree

6 files changed

+87
-78
lines changed

6 files changed

+87
-78
lines changed

topic/src/main/java/tech/ydb/topic/TopicRpc.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,13 @@ CompletableFuture<Result<YdbTopic.DescribeConsumerResult>> describeConsumer(
7575
CompletableFuture<Status> updateOffsetsInTransaction(YdbTopic.UpdateOffsetsInTransactionRequest request,
7676
GrpcRequestSettings settings);
7777

78-
GrpcReadWriteStream<YdbTopic.StreamWriteMessage.FromServer, YdbTopic.StreamWriteMessage.FromClient> writeSession();
78+
GrpcReadWriteStream<YdbTopic.StreamWriteMessage.FromServer, YdbTopic.StreamWriteMessage.FromClient> writeSession(
79+
String traceId
80+
);
7981

80-
GrpcReadWriteStream<YdbTopic.StreamReadMessage.FromServer, YdbTopic.StreamReadMessage.FromClient> readSession();
82+
GrpcReadWriteStream<YdbTopic.StreamReadMessage.FromServer, YdbTopic.StreamReadMessage.FromClient> readSession(
83+
String traceId
84+
);
8185

8286
ScheduledExecutorService getScheduler();
8387
}

topic/src/main/java/tech/ydb/topic/impl/GrpcTopicRpc.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -91,21 +91,21 @@ public CompletableFuture<Status> updateOffsetsInTransaction(YdbTopic.UpdateOffse
9191
}
9292

9393
@Override
94-
public GrpcReadWriteStream<
95-
YdbTopic.StreamWriteMessage.FromServer,
96-
YdbTopic.StreamWriteMessage.FromClient
97-
> writeSession() {
98-
return transport.readWriteStreamCall(TopicServiceGrpc.getStreamWriteMethod(),
99-
GrpcRequestSettings.newBuilder().build());
94+
public GrpcReadWriteStream<YdbTopic.StreamWriteMessage.FromServer, YdbTopic.StreamWriteMessage.FromClient>
95+
writeSession(String streamId) {
96+
return transport.readWriteStreamCall(
97+
TopicServiceGrpc.getStreamWriteMethod(),
98+
GrpcRequestSettings.newBuilder().withTraceId(streamId).build()
99+
);
100100
}
101101

102102
@Override
103-
public GrpcReadWriteStream<
104-
YdbTopic.StreamReadMessage.FromServer,
105-
YdbTopic.StreamReadMessage.FromClient
106-
> readSession() {
107-
return transport.readWriteStreamCall(TopicServiceGrpc.getStreamReadMethod(),
108-
GrpcRequestSettings.newBuilder().build());
103+
public GrpcReadWriteStream<YdbTopic.StreamReadMessage.FromServer, YdbTopic.StreamReadMessage.FromClient>
104+
readSession(String streamId) {
105+
return transport.readWriteStreamCall(
106+
TopicServiceGrpc.getStreamReadMethod(),
107+
GrpcRequestSettings.newBuilder().withTraceId(streamId).build()
108+
);
109109
}
110110

111111
@Override

topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,11 @@
1515
public abstract class ReadSession extends SessionBase<FromServer, FromClient> {
1616
private static final Logger logger = LoggerFactory.getLogger(ReadSession.class);
1717

18-
public ReadSession(TopicRpc rpc) {
19-
super(rpc.readSession());
18+
protected final String streamId;
19+
20+
public ReadSession(TopicRpc rpc, String streamId) {
21+
super(rpc.readSession(streamId));
22+
this.streamId = streamId;
2023
}
2124

2225
@Override

0 commit comments

Comments
 (0)