Skip to content

Commit 6fa3dfb

Browse files
committed
Added new implementatin of grpc stream proxies
1 parent d4458b1 commit 6fa3dfb

File tree

3 files changed

+114
-42
lines changed

3 files changed

+114
-42
lines changed

query/src/main/java/tech/ydb/query/impl/SessionImpl.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import tech.ydb.core.StatusCode;
2121
import tech.ydb.core.grpc.GrpcReadStream;
2222
import tech.ydb.core.grpc.GrpcRequestSettings;
23-
import tech.ydb.core.impl.call.ProxyReadStream;
2423
import tech.ydb.core.operation.StatusExtractor;
2524
import tech.ydb.core.settings.BaseRequestSettings;
2625
import tech.ydb.core.utils.URITools;
@@ -137,12 +136,27 @@ GrpcReadStream<Status> attach(AttachSessionSettings settings) {
137136
.setSessionId(sessionId)
138137
.build();
139138
GrpcRequestSettings grpcSettings = makeOptions(settings).build();
140-
return new ProxyReadStream<>(rpc.attachSession(request, grpcSettings), (message, promise, observer) -> {
141-
logger.trace("session '{}' got attach stream message {}", sessionId, TextFormat.shortDebugString(message));
142-
Status status = Status.of(StatusCode.fromProto(message.getStatus()), Issue.fromPb(message.getIssuesList()));
143-
updateSessionState(status);
144-
observer.onNext(status);
145-
});
139+
GrpcReadStream<YdbQuery.SessionState> origin = rpc.attachSession(request, grpcSettings);
140+
return new GrpcReadStream<Status>() {
141+
@Override
142+
public CompletableFuture<Status> start(GrpcReadStream.Observer<Status> observer) {
143+
return origin.start(message -> {
144+
if (logger.isTraceEnabled()) {
145+
String msg = TextFormat.shortDebugString(message);
146+
logger.trace("session '{}' got attach stream message {}", sessionId, msg);
147+
}
148+
StatusCode code = StatusCode.fromProto(message.getStatus());
149+
Status status = Status.of(code, Issue.fromPb(message.getIssuesList()));
150+
updateSessionState(status);
151+
observer.onNext(status);
152+
});
153+
}
154+
155+
@Override
156+
public void cancel() {
157+
origin.cancel();
158+
}
159+
};
146160
}
147161

148162
private GrpcRequestSettings.Builder makeOptions(BaseRequestSettings settings) {

table/src/main/java/tech/ydb/table/Session.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import tech.ydb.core.Result;
1010
import tech.ydb.core.Status;
1111
import tech.ydb.core.grpc.GrpcReadStream;
12-
import tech.ydb.core.impl.call.ProxyReadStream;
1312
import tech.ydb.table.description.TableDescription;
1413
import tech.ydb.table.description.TableOptionDescription;
1514
import tech.ydb.table.query.DataQuery;
@@ -151,9 +150,18 @@ default CompletableFuture<Result<TableTransaction>> beginTransaction(TxMode txMo
151150

152151
@Deprecated
153152
default GrpcReadStream<ResultSetReader> readTable(String tablePath, ReadTableSettings settings) {
154-
return new ProxyReadStream<>(executeReadTable(tablePath, settings), (part, promise, observer) -> {
155-
observer.onNext(part.getResultSetReader());
156-
});
153+
GrpcReadStream<ReadTablePart> stream = executeReadTable(tablePath, settings);
154+
return new GrpcReadStream<ResultSetReader>() {
155+
@Override
156+
public CompletableFuture<Status> start(GrpcReadStream.Observer<ResultSetReader> observer) {
157+
return stream.start(part -> observer.onNext(part.getResultSetReader()));
158+
}
159+
160+
@Override
161+
public void cancel() {
162+
stream.cancel();
163+
}
164+
};
157165
}
158166

159167
@Deprecated

table/src/main/java/tech/ydb/table/impl/BaseSession.java

Lines changed: 81 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,14 @@
3131
import tech.ydb.core.grpc.GrpcReadStream;
3232
import tech.ydb.core.grpc.GrpcRequestSettings;
3333
import tech.ydb.core.grpc.YdbHeaders;
34-
import tech.ydb.core.impl.call.ProxyReadStream;
3534
import tech.ydb.core.operation.Operation;
3635
import tech.ydb.core.settings.BaseRequestSettings;
3736
import tech.ydb.core.utils.ProtobufUtils;
3837
import tech.ydb.core.utils.URITools;
3938
import tech.ydb.proto.StatusCodesProtos.StatusIds;
4039
import tech.ydb.proto.ValueProtos;
4140
import tech.ydb.proto.ValueProtos.TypedValue;
41+
import tech.ydb.proto.YdbIssueMessage;
4242
import tech.ydb.proto.common.CommonProtos;
4343
import tech.ydb.proto.scheme.SchemeOperationProtos;
4444
import tech.ydb.proto.table.YdbTable;
@@ -1205,22 +1205,22 @@ public GrpcReadStream<ReadTablePart> executeReadTable(String tablePath, ReadTabl
12051205
}
12061206

12071207
GrpcReadStream<YdbTable.ReadTableResponse> origin = rpc.streamReadTable(request.build(), options.build());
1208-
return new ProxyReadStream<>(origin, (response, future, observer) -> {
1209-
StatusIds.StatusCode statusCode = response.getStatus();
1210-
if (statusCode == StatusIds.StatusCode.SUCCESS) {
1211-
try {
1212-
observer.onNext(new ReadTablePart(response.getResult(), response.getSnapshot()));
1213-
} catch (Throwable t) {
1214-
future.completeExceptionally(t);
1215-
origin.cancel();
1216-
}
1217-
} else {
1218-
Issue[] issues = Issue.fromPb(response.getIssuesList());
1219-
StatusCode code = StatusCode.fromProto(statusCode);
1220-
future.complete(Status.of(code, issues));
1221-
origin.cancel();
1208+
return new ProxyStream<YdbTable.ReadTableResponse, ReadTablePart>(origin) {
1209+
@Override
1210+
StatusIds.StatusCode readStatusCode(YdbTable.ReadTableResponse message) {
1211+
return message.getStatus();
12221212
}
1223-
});
1213+
1214+
@Override
1215+
List<YdbIssueMessage.IssueMessage> readIssues(YdbTable.ReadTableResponse message) {
1216+
return message.getIssuesList();
1217+
}
1218+
1219+
@Override
1220+
ReadTablePart readValue(YdbTable.ReadTableResponse message) {
1221+
return new ReadTablePart(message.getResult(), message.getSnapshot());
1222+
}
1223+
};
12241224
}
12251225

12261226
@Override
@@ -1239,22 +1239,22 @@ public GrpcReadStream<ResultSetReader> executeScanQuery(String query, Params par
12391239
}
12401240

12411241
GrpcReadStream<YdbTable.ExecuteScanQueryPartialResponse> origin = rpc.streamExecuteScanQuery(req, opts.build());
1242-
return new ProxyReadStream<>(origin, (response, future, observer) -> {
1243-
StatusIds.StatusCode statusCode = response.getStatus();
1244-
if (statusCode == StatusIds.StatusCode.SUCCESS) {
1245-
try {
1246-
observer.onNext(ProtoValueReaders.forResultSet(response.getResult().getResultSet()));
1247-
} catch (Throwable t) {
1248-
future.completeExceptionally(t);
1249-
origin.cancel();
1250-
}
1251-
} else {
1252-
Issue[] issues = Issue.fromPb(response.getIssuesList());
1253-
StatusCode code = StatusCode.fromProto(statusCode);
1254-
future.complete(Status.of(code, issues));
1255-
origin.cancel();
1242+
return new ProxyStream<YdbTable.ExecuteScanQueryPartialResponse, ResultSetReader>(origin) {
1243+
@Override
1244+
StatusIds.StatusCode readStatusCode(YdbTable.ExecuteScanQueryPartialResponse message) {
1245+
return message.getStatus();
12561246
}
1257-
});
1247+
1248+
@Override
1249+
List<YdbIssueMessage.IssueMessage> readIssues(YdbTable.ExecuteScanQueryPartialResponse message) {
1250+
return message.getIssuesList();
1251+
}
1252+
1253+
@Override
1254+
ResultSetReader readValue(YdbTable.ExecuteScanQueryPartialResponse message) {
1255+
return ProtoValueReaders.forResultSet(message.getResult().getResultSet());
1256+
}
1257+
};
12581258
}
12591259

12601260
private CompletableFuture<Status> commitTransactionInternal(String txId, CommitTxSettings settings) {
@@ -1378,6 +1378,56 @@ public String toString() {
13781378
return "Session{" + id + "}";
13791379
}
13801380

1381+
private abstract class ProxyStream<R, T> implements GrpcReadStream<T> {
1382+
private final GrpcReadStream<R> origin;
1383+
private final CompletableFuture<Status> result = new CompletableFuture<>();
1384+
1385+
ProxyStream(GrpcReadStream<R> origin) {
1386+
this.origin = origin;
1387+
}
1388+
1389+
abstract StatusIds.StatusCode readStatusCode(R message);
1390+
abstract List<YdbIssueMessage.IssueMessage> readIssues(R message);
1391+
abstract T readValue(R message);
1392+
1393+
private void onClose(Status status, Throwable th) {
1394+
if (th != null) {
1395+
updateSessionState(th, null, false);
1396+
result.completeExceptionally(th);
1397+
}
1398+
if (status != null) {
1399+
updateSessionState(null, status.getCode(), false);
1400+
result.complete(status);
1401+
}
1402+
}
1403+
1404+
@Override
1405+
public CompletableFuture<Status> start(Observer<T> observer) {
1406+
origin.start(message -> {
1407+
StatusIds.StatusCode statusCode = readStatusCode(message);
1408+
if (statusCode == StatusIds.StatusCode.SUCCESS) {
1409+
try {
1410+
observer.onNext(readValue(message));
1411+
} catch (Throwable t) {
1412+
result.completeExceptionally(t);
1413+
origin.cancel();
1414+
}
1415+
} else {
1416+
Issue[] issues = Issue.fromPb(readIssues(message));
1417+
StatusCode code = StatusCode.fromProto(statusCode);
1418+
result.complete(Status.of(code, issues));
1419+
origin.cancel();
1420+
}
1421+
}).whenComplete(this::onClose);
1422+
return result;
1423+
}
1424+
1425+
@Override
1426+
public void cancel() {
1427+
origin.cancel();
1428+
}
1429+
}
1430+
13811431
class TableTransactionImpl extends YdbTransactionImpl implements TableTransaction {
13821432

13831433
TableTransactionImpl(TxMode txMode, String txId) {

0 commit comments

Comments
 (0)