Skip to content

Commit ec42229

Browse files
fixes
1 parent 43a758e commit ec42229

File tree

9 files changed

+165
-84
lines changed

9 files changed

+165
-84
lines changed

src/Ydb.Sdk/src/Ado/Internal/IssueMessageUtils.cs

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,10 @@ internal static class IssueMessageUtils
77
{
88
internal static string IssuesToString(this IReadOnlyList<IssueMessage> issues) => IssuesToString(issues, 0, 4);
99

10-
private static string IssuesToString(IEnumerable<IssueMessage> issueMessages, int currentIndent, int indent)
11-
{
12-
var sb = new StringBuilder();
13-
14-
foreach (var message in issueMessages)
10+
private static string IssuesToString(IEnumerable<IssueMessage> issueMessages, int currentIndent, int indent) =>
11+
string.Join(Environment.NewLine, issueMessages.Select(message =>
1512
{
13+
var sb = new StringBuilder();
1614
sb.Append(' ', currentIndent);
1715
sb.Append($"[{message.IssueCode}] ");
1816

@@ -23,35 +21,38 @@ private static string IssuesToString(IEnumerable<IssueMessage> issueMessages, in
2321

2422
sb.Append($"{message.Severity.SeverityToString()}: ");
2523
sb.Append(message.Message);
26-
sb.Append(IssuesToString(message.Issues, currentIndent + indent, indent));
27-
sb.Append(Environment.NewLine);
28-
}
2924

30-
return sb.ToString();
31-
}
25+
if (message.Issues.Count > 0)
26+
{
27+
sb.AppendLine();
28+
sb.Append(IssuesToString(message.Issues, currentIndent + indent, indent));
29+
}
30+
31+
return sb.ToString();
32+
}));
3233

3334
private static string SeverityToString(this uint severity) => severity switch
3435
{
3536
0 => "Fatal",
3637
1 => "Error",
3738
2 => "Warning",
3839
3 => "Info",
39-
_ => $"Unknown severity {severity}"
40+
_ => $"Unknown SeverityCode {severity}"
4041
};
4142

4243
private static string PositionToString(this IssueMessage.Types.Position position)
4344
{
4445
var sb = new StringBuilder();
4546
sb.Append('(');
4647

47-
if (position.File != null)
48+
if (!string.IsNullOrEmpty(position.File))
4849
{
4950
sb.Append(position.File);
5051
sb.Append(':');
5152
}
5253

5354
sb.Append($"{position.Row}:{position.Column}");
54-
sb.Append(')');
55+
sb.Append(") ");
5556
return sb.ToString();
5657
}
5758
}

src/Ydb.Sdk/src/Pool/SessionPool.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ internal abstract class SessionPool<TSession> where TSession : SessionBase<TSess
99
{
1010
private readonly SemaphoreSlim _semaphore;
1111
private readonly ConcurrentQueue<TSession> _idleSessions = new();
12-
private readonly int _createSessionTimeoutMs;
12+
private readonly int _createSessionTimeout;
1313
private readonly int _size;
1414

1515
protected readonly SessionPoolConfig Config;
@@ -23,7 +23,7 @@ protected SessionPool(ILogger<SessionPool<TSession>> logger, SessionPoolConfig c
2323
Logger = logger;
2424
Config = config;
2525
_size = config.MaxSessionPool;
26-
_createSessionTimeoutMs = config.CreateSessionTimeout * 1000;
26+
_createSessionTimeout = config.CreateSessionTimeout;
2727
_semaphore = new SemaphoreSlim(_size);
2828
}
2929

@@ -32,9 +32,9 @@ internal async Task<TSession> GetSession(CancellationToken cancellationToken = d
3232
try
3333
{
3434
using var ctsGetSession = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
35-
if (_createSessionTimeoutMs > 0)
35+
if (_createSessionTimeout > 0)
3636
{
37-
ctsGetSession.CancelAfter(_createSessionTimeoutMs);
37+
ctsGetSession.CancelAfter(TimeSpan.FromSeconds(_createSessionTimeout));
3838
}
3939

4040
var finalCancellationToken = ctsGetSession.Token;
@@ -76,7 +76,7 @@ internal async Task<TSession> GetSession(CancellationToken cancellationToken = d
7676
throw new YdbException(StatusCode.Cancelled,
7777
$"The connection pool has been exhausted, either raise 'MaxSessionPool' " +
7878
$"(currently {_size}) or 'CreateSessionTimeout' " +
79-
$"(currently {_createSessionTimeoutMs} seconds) in your connection string.", e
79+
$"(currently {_createSessionTimeout} seconds) in your connection string.", e
8080
);
8181
}
8282
}

src/Ydb.Sdk/src/Services/Topic/Exceptions.cs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,6 @@ public WriterException(string message) : base(message)
66
{
77
}
88

9-
public WriterException(string message, Status status) : base(message + ": " + status)
10-
{
11-
}
12-
139
public WriterException(string message, Exception inner) : base(message, inner)
1410
{
1511
}
@@ -21,10 +17,6 @@ public ReaderException(string message) : base(message)
2117
{
2218
}
2319

24-
public ReaderException(string message, Status status) : base(message + ": " + status)
25-
{
26-
}
27-
2820
public ReaderException(string message, Exception inner) : base(message, inner)
2921
{
3022
}

src/Ydb.Sdk/src/Services/Topic/Reader/InternalBatchMessages.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,14 @@ internal bool TryDequeueMessage([MaybeNullWhen(false)] out Message<TValue> messa
3838
{
3939
if (!IsActive)
4040
{
41-
message = default;
41+
message = null;
4242
return false;
4343
}
4444

4545
var index = _startMessageDataIndex++;
4646
var messageData = _batch.MessageData[index];
47-
_readerSession.TryReadRequestBytes(Utils
48-
.CalculateApproximatelyBytesSize(_approximatelyBatchSize, OriginalMessageCount, index));
47+
_ = _readerSession.TryReadRequestBytes(
48+
Utils.CalculateApproximatelyBytesSize(_approximatelyBatchSize, OriginalMessageCount, index));
4949

5050
TValue value;
5151
try
@@ -82,7 +82,7 @@ internal bool TryPublicBatch([MaybeNullWhen(false)] out BatchMessages<TValue> ba
8282
{
8383
if (!IsActive)
8484
{
85-
batchMessages = default;
85+
batchMessages = null;
8686
return false;
8787
}
8888

src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
using System.Threading.Channels;
33
using Google.Protobuf.WellKnownTypes;
44
using Microsoft.Extensions.Logging;
5+
using Ydb.Sdk.Ado;
6+
using Ydb.Sdk.Ado.Internal;
57
using Ydb.Topic;
68
using Ydb.Topic.V1;
79
using static Ydb.Topic.StreamReadMessage.Types.FromServer;
@@ -157,21 +159,23 @@ private async Task Initialize()
157159

158160
var receivedInitMessage = stream.Current;
159161

160-
var status = Status.FromProto(receivedInitMessage.Status, receivedInitMessage.Issues);
161-
162-
if (status.IsNotSuccess)
162+
if (receivedInitMessage.Status.IsNotSuccess())
163163
{
164-
if (RetrySettings.DefaultInstance.GetRetryRule(status.StatusCode).Policy != RetryPolicy.None)
164+
var statusCode = receivedInitMessage.Status.Code();
165+
var statusMessage = statusCode.ToMessage(receivedInitMessage.Issues);
166+
167+
if (RetrySettings.DefaultInstance.GetRetryRule(statusCode).Policy != RetryPolicy.None)
165168
{
166-
_logger.LogError("Reader initialization failed to start. Reason: {Status}", status);
169+
_logger.LogError("Reader initialization failed to start. Reason: {Status}", statusMessage);
167170

168171
_ = Task.Run(Initialize, _disposeCts.Token);
169172
}
170173
else
171174
{
172-
_logger.LogCritical("Reader initialization failed to start. Reason: {Status}", status);
175+
_logger.LogCritical("Reader initialization failed to start. Reason: {Status}", statusMessage);
173176

174-
_receivedMessagesChannel.Writer.Complete(new ReaderException("Initialization failed", status));
177+
_receivedMessagesChannel.Writer.Complete(
178+
new ReaderException($"Initialization failed! Reason: {statusMessage}"));
175179
}
176180

177181
return;
@@ -300,13 +304,11 @@ private async Task RunProcessingStreamResponse()
300304
{
301305
var messageFromServer = Stream.Current;
302306

303-
var status = Status.FromProto(messageFromServer.Status, messageFromServer.Issues);
304-
305-
if (status.IsNotSuccess)
307+
if (messageFromServer.Status.IsNotSuccess())
306308
{
307309
Logger.LogError(
308310
"ReaderSession[{SessionId}] received unsuccessful status while processing readAck: {Status}",
309-
SessionId, status);
311+
SessionId, messageFromServer.Status.Code().ToMessage(messageFromServer.Issues));
310312
return;
311313
}
312314

@@ -370,7 +372,9 @@ private async Task RunProcessingStreamRequest()
370372
}
371373
}
372374

373-
internal async void TryReadRequestBytes(long bytes)
375+
// Avoid using 'async' for method with the 'void' return type or catch all exceptions in it:
376+
// any exceptions unhandled by the method might lead to the process crash
377+
internal async Task TryReadRequestBytes(long bytes)
374378
{
375379
var readRequestBytes = Interlocked.Add(ref _readRequestBytes, bytes);
376380

src/Ydb.Sdk/src/Services/Topic/TopicSession.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ protected TopicSession(
2929

3030
public bool IsActive => Volatile.Read(ref _isActive) == 1;
3131

32-
protected async void ReconnectSession()
32+
protected void ReconnectSession()
3333
{
3434
if (Interlocked.CompareExchange(ref _isActive, 0, 1) == 0)
3535
{
@@ -40,7 +40,7 @@ protected async void ReconnectSession()
4040

4141
Logger.LogDebug("TopicSession[{SessionId}] has been deactivated, starting to reconnect", SessionId);
4242

43-
await _initialize();
43+
_ = Task.Run(() => _initialize());
4444
}
4545

4646
protected async Task SendMessage(TFromClient fromClient)

src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs

Lines changed: 39 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using Google.Protobuf;
33
using Google.Protobuf.WellKnownTypes;
44
using Microsoft.Extensions.Logging;
5+
using Ydb.Sdk.Ado.Internal;
56
using Ydb.Topic;
67
using Ydb.Topic.V1;
78

@@ -146,37 +147,44 @@ private async Task WaitBufferAvailable(CancellationToken cancellationToken)
146147

147148
private async void StartWriteWorker()
148149
{
149-
await Initialize();
150-
151150
try
152151
{
153-
while (!_disposeCts.Token.IsCancellationRequested)
154-
{
155-
await _tcsWakeUp.Task.WaitAsync(_disposeCts.Token);
156-
_tcsWakeUp = new TaskCompletionSource();
152+
await Initialize();
157153

158-
if (_toSendBuffer.IsEmpty)
154+
try
155+
{
156+
while (!_disposeCts.Token.IsCancellationRequested)
159157
{
160-
continue;
161-
}
158+
await _tcsWakeUp.Task.WaitAsync(_disposeCts.Token);
159+
_tcsWakeUp = new TaskCompletionSource();
162160

163-
await _sendInFlightMessagesSemaphoreSlim.WaitAsync(_disposeCts.Token);
164-
try
165-
{
166-
if (_session.IsActive)
161+
if (_toSendBuffer.IsEmpty)
167162
{
168-
await _session.Write(_toSendBuffer);
163+
continue;
164+
}
165+
166+
await _sendInFlightMessagesSemaphoreSlim.WaitAsync(_disposeCts.Token);
167+
try
168+
{
169+
if (_session.IsActive)
170+
{
171+
await _session.Write(_toSendBuffer);
172+
}
173+
}
174+
finally
175+
{
176+
_sendInFlightMessagesSemaphoreSlim.Release();
169177
}
170178
}
171-
finally
172-
{
173-
_sendInFlightMessagesSemaphoreSlim.Release();
174-
}
179+
}
180+
catch (OperationCanceledException)
181+
{
182+
_logger.LogInformation("WriteWorker[{WriterConfig}] is disposed", _config);
175183
}
176184
}
177-
catch (OperationCanceledException)
185+
catch (Exception e)
178186
{
179-
_logger.LogInformation("WriteWorker[{WriterConfig}] is disposed", _config);
187+
_logger.LogCritical(e, "WriteWorker[{WriterConfig}] has unhandled exception! Bug report!", _config);
180188
}
181189
}
182190

@@ -226,21 +234,22 @@ private async Task Initialize()
226234

227235
var receivedInitMessage = stream.Current;
228236

229-
var status = Status.FromProto(receivedInitMessage.Status, receivedInitMessage.Issues);
230-
231-
if (status.IsNotSuccess)
237+
if (receivedInitMessage.Status.IsNotSuccess())
232238
{
233-
if (RetrySettings.DefaultInstance.GetRetryRule(status.StatusCode).Policy != RetryPolicy.None)
239+
var statusCode = receivedInitMessage.Status.Code();
240+
var statusMessage = statusCode.ToMessage(receivedInitMessage.Issues);
241+
242+
if (RetrySettings.DefaultInstance.GetRetryRule(statusCode).Policy != RetryPolicy.None)
234243
{
235-
_logger.LogError("Writer initialization failed to start. Reason: {Status}", status);
244+
_logger.LogError("Writer initialization failed to start. Reason: {Status}", statusMessage);
236245

237246
_ = Task.Run(Initialize);
238247
}
239248
else
240249
{
241-
_logger.LogCritical("Writer initialization failed to start. Reason: {Status}", status);
250+
_logger.LogCritical("Writer initialization failed to start. Reason: {Status}", statusMessage);
242251

243-
_session = new NotStartedWriterSession("Initialization failed", status);
252+
_session = new NotStartedWriterSession($"Initialization failed! Reason: {statusMessage}");
244253
}
245254

246255
return;
@@ -390,11 +399,6 @@ public NotStartedWriterSession(string reasonExceptionMessage)
390399
_reasonException = new WriterException(reasonExceptionMessage);
391400
}
392401

393-
public NotStartedWriterSession(string reasonExceptionMessage, Status status)
394-
{
395-
_reasonException = new WriterException(reasonExceptionMessage, status);
396-
}
397-
398402
public Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
399403
{
400404
while (toSendBuffer.TryDequeue(out var messageSending))
@@ -481,7 +485,7 @@ public async Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
481485

482486
var messageData = sendData.MessageData;
483487

484-
if (messageData.SeqNo == default)
488+
if (messageData.SeqNo == 0)
485489
{
486490
messageData.SeqNo = ++currentSeqNum;
487491
}
@@ -511,13 +515,12 @@ private async Task RunProcessingWriteAck()
511515
while (await Stream.MoveNextAsync())
512516
{
513517
var messageFromServer = Stream.Current;
514-
var status = Status.FromProto(messageFromServer.Status, messageFromServer.Issues);
515518

516-
if (status.IsNotSuccess)
519+
if (messageFromServer.Status.IsNotSuccess())
517520
{
518521
Logger.LogError(
519522
"WriterSession[{SessionId}] received unsuccessful status while processing writeAck: {Status}",
520-
SessionId, status);
523+
SessionId, messageFromServer.Status.Code().ToMessage(messageFromServer.Issues));
521524
return;
522525
}
523526

src/Ydb.Sdk/src/Status.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ private static string IssuesToString(IReadOnlyList<Issue> issues, int currentInd
149149
foreach (var issue in issues)
150150
{
151151
sb.Append(issue.ToString(currentIndent, indent));
152-
sb.Append(Environment.NewLine);
152+
sb.AppendLine();
153153
}
154154

155155
return sb.ToString();
@@ -199,7 +199,7 @@ public override string ToString()
199199
}
200200

201201
sb.Append(", Issues:");
202-
sb.Append(Environment.NewLine);
202+
sb.AppendLine();
203203
sb.Append(Issue.IssuesToString(Issues));
204204

205205
return sb.ToString();

0 commit comments

Comments
 (0)