Skip to content

Commit 082a710

Browse files
author
Igor Melnichenko
committed
Stream ID was attached to SessionBase logs
1 parent 3306904 commit 082a710

File tree

4 files changed

+24
-20
lines changed

4 files changed

+24
-20
lines changed

topic/src/main/java/tech/ydb/topic/impl/SessionBase.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,20 @@ public abstract class SessionBase<R, W> implements Session {
2020

2121
protected final GrpcReadWriteStream<R, W> streamConnection;
2222
protected final AtomicBoolean isWorking = new AtomicBoolean(true);
23+
protected final String streamId;
2324
private final ReentrantLock lock = new ReentrantLock();
2425
private String token;
2526

26-
public SessionBase(GrpcReadWriteStream<R, W> streamConnection) {
27+
public SessionBase(GrpcReadWriteStream<R, W> streamConnection, String streamId) {
2728
this.streamConnection = streamConnection;
29+
this.streamId = streamId;
2830
this.token = streamConnection.authToken();
2931
}
3032

33+
public String getStreamId() {
34+
return streamId;
35+
}
36+
3137
protected abstract Logger getLogger();
3238

3339
protected abstract void sendUpdateTokenRequest(String token);
@@ -38,12 +44,12 @@ protected CompletableFuture<Status> start(GrpcReadStream.Observer<R> streamObser
3844
lock.lock();
3945

4046
try {
41-
getLogger().info("Session start");
47+
getLogger().info("[{}] Session start", streamId);
4248
return streamConnection.start(message -> {
4349
if (getLogger().isTraceEnabled()) {
44-
getLogger().trace("Message received:\n{}", message);
50+
getLogger().trace("[{}] Message received:\n{}", streamId, message);
4551
} else {
46-
getLogger().debug("Message received");
52+
getLogger().debug("[{}] Message received", streamId);
4753
}
4854

4955
if (isWorking.get()) {
@@ -61,21 +67,25 @@ public void send(W request) {
6167
try {
6268
if (!isWorking.get()) {
6369
if (getLogger().isTraceEnabled()) {
64-
getLogger().trace("Session is already closed. This message is NOT sent:\n{}", request);
70+
getLogger().trace(
71+
"[{}] Session is already closed. This message is NOT sent:\n{}",
72+
streamId,
73+
request
74+
);
6575
}
6676
return;
6777
}
6878
String currentToken = streamConnection.authToken();
6979
if (!Objects.equals(token, currentToken)) {
7080
token = currentToken;
71-
getLogger().info("Sending new token");
81+
getLogger().info("[{}] Sending new token", streamId);
7282
sendUpdateTokenRequest(token);
7383
}
7484

7585
if (getLogger().isTraceEnabled()) {
76-
getLogger().trace("Sending request:\n{}", request);
86+
getLogger().trace("[{}] Sending request:\n{}", streamId, request);
7787
} else {
78-
getLogger().debug("Sending request");
88+
getLogger().debug("[{}] Sending request", streamId);
7989
}
8090
streamConnection.sendNext(request);
8191
} finally {
@@ -84,7 +94,7 @@ public void send(W request) {
8494
}
8595

8696
private boolean stop() {
87-
getLogger().info("Session stop");
97+
getLogger().info("[{}] Session stop", streamId);
8898
return isWorking.compareAndSet(true, false);
8999
}
90100

@@ -93,7 +103,7 @@ public boolean shutdown() {
93103
lock.lock();
94104

95105
try {
96-
getLogger().info("Session shutdown");
106+
getLogger().info("[{}] Session shutdown", streamId);
97107
if (stop()) {
98108
onStop();
99109
streamConnection.close();

topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,8 @@
1515
public abstract class ReadSession extends SessionBase<FromServer, FromClient> {
1616
private static final Logger logger = LoggerFactory.getLogger(ReadSession.class);
1717

18-
protected final String streamId;
19-
2018
public ReadSession(TopicRpc rpc, String streamId) {
21-
super(rpc.readSession(streamId));
22-
this.streamId = streamId;
19+
super(rpc.readSession(streamId), streamId);
2320
}
2421

2522
@Override

topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,12 +171,12 @@ protected CompletableFuture<Status> sendUpdateOffsetsInTransaction(YdbTransactio
171171
if (error != null) {
172172
currentSession.closeDueToError(null,
173173
new RuntimeException("Restarting read session due to transaction " + transaction.getId() +
174-
" with partition offsets from read session " + currentSession.streamId +
174+
" with partition offsets from read session " + currentSession.getStreamId() +
175175
" was not committed with reason: " + error));
176176
} else if (!status.isSuccess()) {
177177
currentSession.closeDueToError(null,
178178
new RuntimeException("Restarting read session due to transaction " + transaction.getId() +
179-
" with partition offsets from read session " + currentSession.streamId +
179+
" with partition offsets from read session " + currentSession.getStreamId() +
180180
" was not committed with status: " + status));
181181
}
182182
});

topic/src/main/java/tech/ydb/topic/write/impl/WriteSession.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,8 @@
1414
public abstract class WriteSession extends SessionBase<FromServer, FromClient> {
1515
private static final Logger logger = LoggerFactory.getLogger(WriteSession.class);
1616

17-
protected final String streamId;
18-
1917
public WriteSession(TopicRpc rpc, String streamId) {
20-
super(rpc.writeSession(streamId));
21-
this.streamId = streamId;
18+
super(rpc.writeSession(streamId), streamId);
2219
}
2320

2421
@Override

0 commit comments

Comments
 (0)