Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions topic/src/main/java/tech/ydb/topic/impl/SessionBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,20 @@ public abstract class SessionBase<R, W> implements Session {

protected final GrpcReadWriteStream<R, W> streamConnection;
protected final AtomicBoolean isWorking = new AtomicBoolean(true);
protected final String streamId;
private final ReentrantLock lock = new ReentrantLock();
private String token;

public SessionBase(GrpcReadWriteStream<R, W> streamConnection) {
public SessionBase(GrpcReadWriteStream<R, W> streamConnection, String streamId) {
this.streamConnection = streamConnection;
this.streamId = streamId;
this.token = streamConnection.authToken();
}

public String getStreamId() {
return streamId;
}

protected abstract Logger getLogger();

protected abstract void sendUpdateTokenRequest(String token);
Expand All @@ -38,12 +44,12 @@ protected CompletableFuture<Status> start(GrpcReadStream.Observer<R> streamObser
lock.lock();

try {
getLogger().info("Session start");
getLogger().info("[{}] Session start", streamId);
return streamConnection.start(message -> {
if (getLogger().isTraceEnabled()) {
getLogger().trace("Message received:\n{}", message);
getLogger().trace("[{}] Message received:\n{}", streamId, message);
} else {
getLogger().debug("Message received");
getLogger().debug("[{}] Message received", streamId);
}

if (isWorking.get()) {
Expand All @@ -61,21 +67,25 @@ public void send(W request) {
try {
if (!isWorking.get()) {
if (getLogger().isTraceEnabled()) {
getLogger().trace("Session is already closed. This message is NOT sent:\n{}", request);
getLogger().trace(
"[{}] Session is already closed. This message is NOT sent:\n{}",
streamId,
request
);
}
return;
}
String currentToken = streamConnection.authToken();
if (!Objects.equals(token, currentToken)) {
token = currentToken;
getLogger().info("Sending new token");
getLogger().info("[{}] Sending new token", streamId);
sendUpdateTokenRequest(token);
}

if (getLogger().isTraceEnabled()) {
getLogger().trace("Sending request:\n{}", request);
getLogger().trace("[{}] Sending request:\n{}", streamId, request);
} else {
getLogger().debug("Sending request");
getLogger().debug("[{}] Sending request", streamId);
}
streamConnection.sendNext(request);
} finally {
Expand All @@ -84,7 +94,7 @@ public void send(W request) {
}

private boolean stop() {
getLogger().info("Session stop");
getLogger().info("[{}] Session stop", streamId);
return isWorking.compareAndSet(true, false);
}

Expand All @@ -93,7 +103,7 @@ public boolean shutdown() {
lock.lock();

try {
getLogger().info("Session shutdown");
getLogger().info("[{}] Session shutdown", streamId);
if (stop()) {
onStop();
streamConnection.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@
public abstract class ReadSession extends SessionBase<FromServer, FromClient> {
private static final Logger logger = LoggerFactory.getLogger(ReadSession.class);

protected final String streamId;

public ReadSession(TopicRpc rpc, String streamId) {
super(rpc.readSession(streamId));
this.streamId = streamId;
super(rpc.readSession(streamId), streamId);
}

@Override
Expand Down
4 changes: 2 additions & 2 deletions topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,12 @@ protected CompletableFuture<Status> sendUpdateOffsetsInTransaction(YdbTransactio
if (error != null) {
currentSession.closeDueToError(null,
new RuntimeException("Restarting read session due to transaction " + transaction.getId() +
" with partition offsets from read session " + currentSession.streamId +
" with partition offsets from read session " + currentSession.getStreamId() +
" was not committed with reason: " + error));
} else if (!status.isSuccess()) {
currentSession.closeDueToError(null,
new RuntimeException("Restarting read session due to transaction " + transaction.getId() +
" with partition offsets from read session " + currentSession.streamId +
" with partition offsets from read session " + currentSession.getStreamId() +
" was not committed with status: " + status));
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,8 @@
public abstract class WriteSession extends SessionBase<FromServer, FromClient> {
private static final Logger logger = LoggerFactory.getLogger(WriteSession.class);

protected final String streamId;

public WriteSession(TopicRpc rpc, String streamId) {
super(rpc.writeSession(streamId));
this.streamId = streamId;
super(rpc.writeSession(streamId), streamId);
}

@Override
Expand Down