@@ -20,14 +20,20 @@ public abstract class SessionBase<R, W> implements Session {
2020
2121 protected final GrpcReadWriteStream <R , W > streamConnection ;
2222 protected final AtomicBoolean isWorking = new AtomicBoolean (true );
23+ protected final String streamId ;
2324 private final ReentrantLock lock = new ReentrantLock ();
2425 private String token ;
2526
26- public SessionBase (GrpcReadWriteStream <R , W > streamConnection ) {
27+ public SessionBase (GrpcReadWriteStream <R , W > streamConnection , String streamId ) {
2728 this .streamConnection = streamConnection ;
29+ this .streamId = streamId ;
2830 this .token = streamConnection .authToken ();
2931 }
3032
33+ public String getStreamId () {
34+ return streamId ;
35+ }
36+
3137 protected abstract Logger getLogger ();
3238
3339 protected abstract void sendUpdateTokenRequest (String token );
@@ -38,12 +44,12 @@ protected CompletableFuture<Status> start(GrpcReadStream.Observer<R> streamObser
3844 lock .lock ();
3945
4046 try {
41- getLogger ().info ("Session start" );
47+ getLogger ().info ("[{}] Session start" , streamId );
4248 return streamConnection .start (message -> {
4349 if (getLogger ().isTraceEnabled ()) {
44- getLogger ().trace ("Message received:\n {}" , message );
50+ getLogger ().trace ("[{}] Message received:\n {}" , streamId , message );
4551 } else {
46- getLogger ().debug ("Message received" );
52+ getLogger ().debug ("[{}] Message received" , streamId );
4753 }
4854
4955 if (isWorking .get ()) {
@@ -61,21 +67,25 @@ public void send(W request) {
6167 try {
6268 if (!isWorking .get ()) {
6369 if (getLogger ().isTraceEnabled ()) {
64- getLogger ().trace ("Session is already closed. This message is NOT sent:\n {}" , request );
70+ getLogger ().trace (
71+ "[{}] Session is already closed. This message is NOT sent:\n {}" ,
72+ streamId ,
73+ request
74+ );
6575 }
6676 return ;
6777 }
6878 String currentToken = streamConnection .authToken ();
6979 if (!Objects .equals (token , currentToken )) {
7080 token = currentToken ;
71- getLogger ().info ("Sending new token" );
81+ getLogger ().info ("[{}] Sending new token" , streamId );
7282 sendUpdateTokenRequest (token );
7383 }
7484
7585 if (getLogger ().isTraceEnabled ()) {
76- getLogger ().trace ("Sending request:\n {}" , request );
86+ getLogger ().trace ("[{}] Sending request:\n {}" , streamId , request );
7787 } else {
78- getLogger ().debug ("Sending request" );
88+ getLogger ().debug ("[{}] Sending request" , streamId );
7989 }
8090 streamConnection .sendNext (request );
8191 } finally {
@@ -84,7 +94,7 @@ public void send(W request) {
8494 }
8595
8696 private boolean stop () {
87- getLogger ().info ("Session stop" );
97+ getLogger ().info ("[{}] Session stop" , streamId );
8898 return isWorking .compareAndSet (true , false );
8999 }
90100
@@ -93,7 +103,7 @@ public boolean shutdown() {
93103 lock .lock ();
94104
95105 try {
96- getLogger ().info ("Session shutdown" );
106+ getLogger ().info ("[{}] Session shutdown" , streamId );
97107 if (stop ()) {
98108 onStop ();
99109 streamConnection .close ();
0 commit comments