Skip to content

Commit d7383ea

Browse files
fix linter
1 parent 77e34ca commit d7383ea

File tree

3 files changed

+62
-21
lines changed

3 files changed

+62
-21
lines changed

src/Ydb.Sdk/src/Services/Topic/Producer.cs

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,12 @@ internal Producer(ProducerConfig producerConfig, ISerializer<TValue> serializer)
3737

3838
internal async Task Initialize()
3939
{
40-
var stream = _config.Driver.BidirectionalStreamCall(TopicService.StreamWriteMethod,
41-
GrpcRequestSettings.DefaultInstance);
40+
_logger.LogInformation("Producer session initialization started. ProducerConfig: {ProducerConfig}", _config);
41+
42+
var stream = _config.Driver.BidirectionalStreamCall(
43+
TopicService.StreamWriteMethod,
44+
GrpcRequestSettings.DefaultInstance
45+
);
4246

4347
var initRequest = new StreamWriteMessage.Types.InitRequest { Path = _config.TopicPath };
4448
if (_config.ProducerId != null)
@@ -54,7 +58,8 @@ internal async Task Initialize()
5458
await stream.Write(new MessageFromClient { InitRequest = initRequest });
5559
if (!await stream.MoveNextAsync())
5660
{
57-
throw new YdbProducerException("Write stream is closed by YDB server");
61+
throw new YdbProducerException(
62+
$"Stream unexpectedly closed by YDB server. Current InitRequest: {initRequest}");
5863
}
5964

6065
var receivedInitMessage = stream.Current;
@@ -139,26 +144,41 @@ public ProducerSession(
139144

140145
internal async Task RunProcessingWriteAck(ConcurrentQueue<MessageSending> inFlightMessages)
141146
{
142-
await foreach (var messageFromServer in _stream)
147+
try
143148
{
144-
var status = Status.FromProto(messageFromServer.Status, messageFromServer.Issues);
145-
146-
if (status.IsNotSuccess)
149+
Logger.LogInformation("ProducerSession[{SessionId}] is running processing writeAck", SessionId);
150+
151+
await foreach (var messageFromServer in _stream)
147152
{
148-
Logger.LogWarning("");
149-
return;
150-
}
153+
var status = Status.FromProto(messageFromServer.Status, messageFromServer.Issues);
151154

152-
foreach (var ack in messageFromServer.WriteResponse.Acks)
153-
{
154-
if (!inFlightMessages.TryDequeue(out var messageFromClient))
155+
if (status.IsNotSuccess)
155156
{
156-
break;
157+
Logger.LogWarning(
158+
"ProducerSession[{SessionId}] received unsuccessful status while processing writeAck: {Status}",
159+
SessionId, status);
160+
return;
157161
}
158162

159-
messageFromClient.TaskCompletionSource.SetResult(new SendResult(ack));
163+
foreach (var ack in messageFromServer.WriteResponse.Acks)
164+
{
165+
if (!inFlightMessages.TryDequeue(out var messageFromClient))
166+
{
167+
break;
168+
}
169+
170+
messageFromClient.TaskCompletionSource.SetResult(new SendResult(ack));
171+
}
160172
}
161173
}
174+
catch (Exception e)
175+
{
176+
Logger.LogError(e, "ProducerSession[{SessionId}] have error on processing writeAck", SessionId);
177+
}
178+
finally
179+
{
180+
ReconnectSession();
181+
}
162182
}
163183

164184
internal async Task Write(ConcurrentQueue<MessageSending> toSendBuffer,

src/Ydb.Sdk/src/Services/Topic/ProducerConfig.cs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
using System.Text;
2+
13
namespace Ydb.Sdk.Services.Topic;
24

35
public class ProducerConfig
@@ -13,4 +15,21 @@ public ProducerConfig(Driver driver, string topicPath)
1315
public string? ProducerId { get; set; }
1416
public string? MessageGroupId { get; set; }
1517
public Codec Codec { get; set; } = Codec.Raw; // TODO Supported only Raw
18+
19+
public override string ToString()
20+
{
21+
var toString = new StringBuilder().Append("[TopicPath: ").Append(TopicPath);
22+
23+
if (ProducerId != null)
24+
{
25+
toString.Append(", ProducerId: ").Append(ProducerId);
26+
}
27+
28+
if (MessageGroupId != null)
29+
{
30+
toString.Append(", MessageGroupId: ").Append(MessageGroupId);
31+
}
32+
33+
return toString.Append(", Codec: ").Append(Codec).Append(']').ToString();
34+
}
1635
}

src/Ydb.Sdk/src/Services/Topic/TopicSession.cs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,26 +5,29 @@ namespace Ydb.Sdk.Services.Topic;
55
internal abstract class TopicSession : IDisposable
66
{
77
private readonly Func<Task> _initialize;
8-
8+
99
protected readonly ILogger Logger;
1010
protected readonly string SessionId;
11-
11+
1212
private int _isActive = 1;
1313

1414
protected TopicSession(ILogger logger, string sessionId, Func<Task> initialize)
1515
{
1616
Logger = logger;
17-
_initialize = initialize;
1817
SessionId = sessionId;
18+
_initialize = initialize;
1919
}
2020

2121
protected async void ReconnectSession()
2222
{
2323
if (Interlocked.CompareExchange(ref _isActive, 0, 1) == 0)
2424
{
25-
return;
25+
Logger.LogWarning("The reconnect has already been launched");
26+
27+
return;
2628
}
2729

30+
2831
while (true)
2932
{
3033
try
@@ -38,9 +41,8 @@ protected async void ReconnectSession()
3841
}
3942
}
4043
}
41-
44+
4245
public void Dispose()
4346
{
44-
4547
}
4648
}

0 commit comments

Comments
 (0)