Skip to content

Commit 67a694c

Browse files
feat: create bidirectional stream & renaming server stream (#197)
* feat: create bidirectional stream & renaming server stream * fix linter
1 parent dddea79 commit 67a694c

File tree

5 files changed

+91
-27
lines changed

5 files changed

+91
-27
lines changed

src/Ydb.Sdk/src/Client/Response.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,11 @@ public abstract class StreamResponse<TProtoResponse, TResponse>
6363
where TProtoResponse : class
6464
where TResponse : class
6565
{
66-
private readonly Driver.StreamIterator<TProtoResponse> _iterator;
66+
private readonly Driver.ServerStream<TProtoResponse> _iterator;
6767
private TResponse? _response;
6868
private bool _transportError;
6969

70-
internal StreamResponse(Driver.StreamIterator<TProtoResponse> iterator)
70+
internal StreamResponse(Driver.ServerStream<TProtoResponse> iterator)
7171
{
7272
_iterator = iterator;
7373
}

src/Ydb.Sdk/src/Driver.cs

Lines changed: 81 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ internal async Task<TResponse> UnaryCall<TRequest, TResponse>(
150150
}
151151
}
152152

153-
internal StreamIterator<TResponse> StreamCall<TRequest, TResponse>(
153+
internal ServerStream<TResponse> ServerStreamCall<TRequest, TResponse>(
154154
Method<TRequest, TResponse> method,
155155
TRequest request,
156156
GrpcRequestSettings settings)
@@ -160,22 +160,30 @@ internal StreamIterator<TResponse> StreamCall<TRequest, TResponse>(
160160
var (endpoint, channel) = GetChannel(settings.NodeId);
161161
var callInvoker = channel.CreateCallInvoker();
162162

163-
try
164-
{
165-
var call = callInvoker.AsyncServerStreamingCall(
166-
method: method,
167-
host: null,
168-
options: GetCallOptions(settings, true),
169-
request: request);
163+
var call = callInvoker.AsyncServerStreamingCall(
164+
method: method,
165+
host: null,
166+
options: GetCallOptions(settings, true),
167+
request: request);
170168

171-
return new StreamIterator<TResponse>(call, () => { PessimizeEndpoint(endpoint); });
172-
}
173-
catch (RpcException e)
174-
{
175-
PessimizeEndpoint(endpoint);
169+
return new ServerStream<TResponse>(call, () => { PessimizeEndpoint(endpoint); });
170+
}
176171

177-
throw new TransportException(e);
178-
}
172+
internal BidirectionalStream<TRequest, TResponse> BidirectionalStreamCall<TRequest, TResponse>(
173+
Method<TRequest, TResponse> method,
174+
GrpcRequestSettings settings)
175+
where TRequest : class
176+
where TResponse : class
177+
{
178+
var (endpoint, channel) = GetChannel(settings.NodeId);
179+
var callInvoker = channel.CreateCallInvoker();
180+
181+
var call = callInvoker.AsyncDuplexStreamingCall(
182+
method: method,
183+
host: null,
184+
options: GetCallOptions(settings, true));
185+
186+
return new BidirectionalStream<TRequest, TResponse>(call, () => { PessimizeEndpoint(endpoint); });
179187
}
180188

181189
private (string, GrpcChannel) GetChannel(long nodeId)
@@ -319,12 +327,12 @@ private CallOptions GetCallOptions(GrpcRequestSettings settings, bool streaming)
319327
return options;
320328
}
321329

322-
internal sealed class StreamIterator<TResponse> : IAsyncEnumerator<TResponse>, IAsyncEnumerable<TResponse>
330+
internal sealed class ServerStream<TResponse> : IAsyncEnumerator<TResponse>, IAsyncEnumerable<TResponse>
323331
{
324332
private readonly AsyncServerStreamingCall<TResponse> _responseStream;
325333
private readonly Action _rpcErrorAction;
326334

327-
internal StreamIterator(AsyncServerStreamingCall<TResponse> responseStream, Action rpcErrorAction)
335+
internal ServerStream(AsyncServerStreamingCall<TResponse> responseStream, Action rpcErrorAction)
328336
{
329337
_responseStream = responseStream;
330338
_rpcErrorAction = rpcErrorAction;
@@ -359,6 +367,62 @@ public async ValueTask<bool> MoveNextAsync()
359367
}
360368
}
361369

370+
internal sealed class BidirectionalStream<TRequest, TResponse> : IAsyncEnumerator<TResponse>,
371+
IAsyncEnumerable<TResponse>
372+
{
373+
private readonly AsyncDuplexStreamingCall<TRequest, TResponse> _bidirectionalStream;
374+
private readonly Action _rpcErrorAction;
375+
376+
public BidirectionalStream(AsyncDuplexStreamingCall<TRequest, TResponse> bidirectionalStream,
377+
Action rpcErrorAction)
378+
{
379+
_bidirectionalStream = bidirectionalStream;
380+
_rpcErrorAction = rpcErrorAction;
381+
}
382+
383+
public async Task Write(TRequest request)
384+
{
385+
try
386+
{
387+
await _bidirectionalStream.RequestStream.WriteAsync(request);
388+
}
389+
catch (RpcException e)
390+
{
391+
_rpcErrorAction();
392+
393+
throw new TransportException(e);
394+
}
395+
}
396+
397+
public ValueTask DisposeAsync()
398+
{
399+
_bidirectionalStream.Dispose();
400+
401+
return default;
402+
}
403+
404+
public async ValueTask<bool> MoveNextAsync()
405+
{
406+
try
407+
{
408+
return await _bidirectionalStream.ResponseStream.MoveNext(CancellationToken.None);
409+
}
410+
catch (RpcException e)
411+
{
412+
_rpcErrorAction();
413+
414+
throw new TransportException(e);
415+
}
416+
}
417+
418+
public TResponse Current => _bidirectionalStream.ResponseStream.Current;
419+
420+
public IAsyncEnumerator<TResponse> GetAsyncEnumerator(CancellationToken cancellationToken = new())
421+
{
422+
return this;
423+
}
424+
}
425+
362426
public class InitializationFailureException : Exception
363427
{
364428
internal InitializationFailureException(string message) : base(message)

src/Ydb.Sdk/src/Services/Query/SessionPool.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ protected override async Task<Session> CreateSession()
4747
{
4848
try
4949
{
50-
await using var stream = _driver.StreamCall(QueryService.AttachSessionMethod, new AttachSessionRequest
51-
{ SessionId = session.SessionId }, AttachSessionSettings);
50+
await using var stream = _driver.ServerStreamCall(QueryService.AttachSessionMethod,
51+
new AttachSessionRequest { SessionId = session.SessionId }, AttachSessionSettings);
5252

5353
if (!await stream.MoveNextAsync())
5454
{
@@ -140,7 +140,7 @@ internal Session(Driver driver, SessionPool<Session> sessionPool, string session
140140
_driver = driver;
141141
}
142142

143-
internal Driver.StreamIterator<ExecuteQueryResponsePart> ExecuteQuery(
143+
internal Driver.ServerStream<ExecuteQueryResponsePart> ExecuteQuery(
144144
string query,
145145
Dictionary<string, YdbValue>? parameters,
146146
ExecuteQuerySettings? settings,
@@ -161,7 +161,7 @@ internal Driver.StreamIterator<ExecuteQueryResponsePart> ExecuteQuery(
161161

162162
request.Parameters.Add(parameters.ToDictionary(p => p.Key, p => p.Value.GetProto()));
163163

164-
return _driver.StreamCall(QueryService.ExecuteQueryMethod, request, settings);
164+
return _driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings);
165165
}
166166

167167
internal async Task<Status> CommitTransaction(string txId, GrpcRequestSettings? settings = null)

src/Ydb.Sdk/src/Services/Table/ExecuteScanQuery.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ internal static ResultData FromProto(ExecuteScanQueryPartialResult resultProto)
3434

3535
public class ExecuteScanQueryStream : StreamResponse<ExecuteScanQueryPartialResponse, ExecuteScanQueryPart>
3636
{
37-
internal ExecuteScanQueryStream(Driver.StreamIterator<ExecuteScanQueryPartialResponse> iterator)
37+
internal ExecuteScanQueryStream(Driver.ServerStream<ExecuteScanQueryPartialResponse> iterator)
3838
: base(iterator)
3939
{
4040
}
@@ -75,7 +75,7 @@ public ExecuteScanQueryStream ExecuteScanQuery(
7575

7676
request.Parameters.Add(parameters.ToDictionary(p => p.Key, p => p.Value.GetProto()));
7777

78-
var streamIterator = _driver.StreamCall(
78+
var streamIterator = _driver.ServerStreamCall(
7979
method: TableService.StreamExecuteScanQueryMethod,
8080
request: request,
8181
settings: settings

src/Ydb.Sdk/src/Services/Table/ReadTable.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ internal static ResultData FromProto(ReadTableResult resultProto)
3939

4040
public class ReadTableStream : StreamResponse<ReadTableResponse, ReadTablePart>
4141
{
42-
internal ReadTableStream(Driver.StreamIterator<ReadTableResponse> iterator)
42+
internal ReadTableStream(Driver.ServerStream<ReadTableResponse> iterator)
4343
: base(iterator)
4444
{
4545
}
@@ -74,7 +74,7 @@ public ReadTableStream ReadTable(string tablePath, ReadTableSettings? settings =
7474
Ordered = settings.Ordered
7575
};
7676

77-
var streamIterator = _driver.StreamCall(
77+
var streamIterator = _driver.ServerStreamCall(
7878
method: TableService.StreamReadTableMethod,
7979
request: request,
8080
settings: settings

0 commit comments

Comments
 (0)