Skip to content

Commit 2c5c7bd

Browse files
fix: deduplication declare param in yql query (#233)
* Fixed bug in Topic Writer: worker is stopped by disposeCts * Fixed bug in sql parser ADO.NET: deduplication declare param in YQL query * Deleted property BufferOverflowRetryTimeoutMs * Discovery log level in info to debug
1 parent 6e10d81 commit 2c5c7bd

File tree

10 files changed

+70
-130
lines changed

10 files changed

+70
-130
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
- Fixed bug in Topic Writer: worker is stopped by disposeCts
2+
- Fixed bug in sql parser ADO.NET: deduplication declare param in YQL query
3+
- Deleted property BufferOverflowRetryTimeoutMs
4+
15
## v0.9.2
26
- Fixed bug: delete deadline grpc timeout on AttachStream
37

slo/src/Internal/SloContext.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,13 @@ public abstract class SloContext<T> where T : IDisposable
1111
{
1212
// ReSharper disable once StaticMemberInGenericType
1313
protected static readonly ILoggerFactory Factory =
14-
LoggerFactory.Create(builder => builder.AddConsole().SetMinimumLevel(LogLevel.Information));
14+
LoggerFactory.Create(builder =>
15+
{
16+
builder.AddConsole().SetMinimumLevel(LogLevel.Information);
17+
builder.AddFilter("Ydb.Sdk.Ado", LogLevel.Debug);
18+
builder.AddFilter("Ydb.Sdk.Services.Query", LogLevel.Debug);
19+
builder.AddFilter("Ydb.Sdk.Services.Topic", LogLevel.Debug);
20+
});
1521

1622
protected static readonly ILogger Logger = Factory.CreateLogger<SloContext<T>>();
1723

src/Ydb.Sdk/src/Ado/Internal/SqlParser.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ internal static ParsedResult Parse(string sql)
1717

1818
var newYql = new StringBuilder();
1919
var paramNames = new List<string>();
20+
var foundParamNames = new HashSet<string>();
2021

2122
var prevToken = 0;
2223

@@ -114,7 +115,12 @@ internal static ParsedResult Parse(string sql)
114115

115116
var originalParamName = $"${sql[prevToken .. curToken]}";
116117

117-
paramNames.Add(originalParamName);
118+
if (!foundParamNames.Contains(originalParamName))
119+
{
120+
paramNames.Add(originalParamName);
121+
}
122+
123+
foundParamNames.Add(originalParamName);
118124
newYql.Append(originalParamName);
119125
prevToken = curToken;
120126

src/Ydb.Sdk/src/Driver.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ private async Task<Status> DiscoverEndpoints()
155155

156156
var resultProto = response.Operation.Result.Unpack<ListEndpointsResult>();
157157

158-
Logger.LogInformation(
158+
Logger.LogDebug(
159159
"Successfully discovered endpoints: {EndpointsCount}, self location: {SelfLocation}, sdk info: {SdkInfo}",
160160
resultProto.Endpoints.Count, resultProto.SelfLocation, _sdkInfo);
161161

src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ internal class Writer<TValue> : IWriter<TValue>
2626
private readonly CancellationTokenSource _disposeCts = new();
2727

2828
private volatile TaskCompletionSource _tcsWakeUp = new();
29+
private volatile TaskCompletionSource _tcsBufferAvailableEvent = new();
2930
private volatile IWriteSession _session = null!;
30-
31-
private int _limitBufferMaxSize;
31+
private volatile int _limitBufferMaxSize;
3232

3333
internal Writer(IDriver driver, WriterConfig config, ISerializer<TValue> serializer)
3434
{
@@ -79,7 +79,7 @@ public async Task<WriteResult> WriteAsync(Message<TValue> message, CancellationT
7979

8080
while (true)
8181
{
82-
var curLimitBufferSize = Volatile.Read(ref _limitBufferMaxSize);
82+
var curLimitBufferSize = _limitBufferMaxSize;
8383

8484
if ( // sending one biggest message anyway
8585
(curLimitBufferSize == _config.BufferMaxSize && data.Length > curLimitBufferSize)
@@ -104,9 +104,9 @@ public async Task<WriteResult> WriteAsync(Message<TValue> message, CancellationT
104104

105105
try
106106
{
107-
await Task.Delay(_config.BufferOverflowRetryTimeoutMs, cancellationToken);
107+
await WaitBufferAvailable(cancellationToken);
108108
}
109-
catch (TaskCanceledException)
109+
catch (OperationCanceledException)
110110
{
111111
throw new WriterException("Buffer overflow");
112112
}
@@ -121,24 +121,46 @@ public async Task<WriteResult> WriteAsync(Message<TValue> message, CancellationT
121121
finally
122122
{
123123
Interlocked.Add(ref _limitBufferMaxSize, data.Length);
124+
125+
_tcsBufferAvailableEvent.TrySetResult();
124126
}
125127
}
126128

129+
private async Task WaitBufferAvailable(CancellationToken cancellationToken)
130+
{
131+
var tcsBufferAvailableEvent = _tcsBufferAvailableEvent;
132+
133+
await tcsBufferAvailableEvent.Task.WaitAsync(cancellationToken);
134+
135+
Interlocked.CompareExchange(
136+
ref _tcsBufferAvailableEvent,
137+
new TaskCompletionSource(),
138+
tcsBufferAvailableEvent
139+
);
140+
}
141+
127142
private async void StartWriteWorker()
128143
{
129144
await Initialize();
130145

131-
while (!_disposeCts.Token.IsCancellationRequested)
146+
try
132147
{
133-
await _tcsWakeUp.Task;
134-
_tcsWakeUp = new TaskCompletionSource();
135-
136-
if (_toSendBuffer.IsEmpty)
148+
while (!_disposeCts.Token.IsCancellationRequested)
137149
{
138-
continue;
139-
}
150+
await _tcsWakeUp.Task.WaitAsync(_disposeCts.Token);
151+
_tcsWakeUp = new TaskCompletionSource();
140152

141-
await _session.Write(_toSendBuffer);
153+
if (_toSendBuffer.IsEmpty)
154+
{
155+
continue;
156+
}
157+
158+
await _session.Write(_toSendBuffer);
159+
}
160+
}
161+
catch (OperationCanceledException)
162+
{
163+
_logger.LogInformation("WriteWorker[{WriterConfig}] is disposed", _config);
142164
}
143165
}
144166

src/Ydb.Sdk/src/Services/Topic/Writer/WriterBuilder.cs

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -48,27 +48,14 @@ public WriterBuilder(IDriver driver, string topicPath)
4848
/// </remarks>
4949
public ISerializer<TValue>? Serializer { get; set; }
5050

51-
52-
/// <summary>
53-
/// Represents the timeout duration, in milliseconds, used when a buffer overflow is detected.
54-
/// This timeout specifies how long the system should wait before attempting to retry the operation.
55-
/// </summary>
56-
/// <remarks>
57-
/// This timeout is important for managing system performance and stability.
58-
/// Too short a timeout could lead to rapid retry attempts, potentially causing further resource contention
59-
/// and degrading system performance. Conversely, too long a timeout might delay processing significantly.
60-
/// </remarks>
61-
public int BufferOverflowRetryTimeoutMs { get; set; } = 10;
62-
6351
public IWriter<TValue> Build()
6452
{
6553
var config = new WriterConfig(
6654
topicPath: TopicPath,
6755
producerId: ProducerId,
6856
codec: Codec,
6957
bufferMaxSize: BufferMaxSize,
70-
partitionId: PartitionId,
71-
bufferOverflowRetryTimeoutMs: BufferOverflowRetryTimeoutMs
58+
partitionId: PartitionId
7259
);
7360

7461
return new Writer<TValue>(

src/Ydb.Sdk/src/Services/Topic/Writer/WriterConfig.cs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,13 @@ internal WriterConfig(
99
string? producerId,
1010
Codec codec,
1111
int bufferMaxSize,
12-
long? partitionId,
13-
int bufferOverflowRetryTimeoutMs)
12+
long? partitionId)
1413
{
1514
TopicPath = topicPath;
1615
ProducerId = producerId;
1716
Codec = codec;
1817
BufferMaxSize = bufferMaxSize;
1918
PartitionId = partitionId;
20-
BufferOverflowRetryTimeoutMs = bufferOverflowRetryTimeoutMs;
2119
}
2220

2321
public string TopicPath { get; }
@@ -30,17 +28,15 @@ internal WriterConfig(
3028

3129
public long? PartitionId { get; }
3230

33-
public int BufferOverflowRetryTimeoutMs { get; }
34-
3531
public override string ToString()
3632
{
37-
var toString = new StringBuilder().Append("[TopicPath: ").Append(TopicPath);
33+
var toString = new StringBuilder().Append("TopicPath: ").Append(TopicPath);
3834

3935
if (ProducerId != null)
4036
{
4137
toString.Append(", ProducerId: ").Append(ProducerId);
4238
}
4339

44-
return toString.Append(", Codec: ").Append(Codec).Append(']').ToString();
40+
return toString.Append(", Codec: ").Append(Codec).ToString();
4541
}
4642
}

src/Ydb.Sdk/tests/Ado/Internal/SqlParserTests.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,4 +241,13 @@ public void Parse_WhenMultilineStringLiterals_ReturnSql()
241241
SELECT $param; SELECT $p2; SELECT $p_3;", sql);
242242
Assert.Equal(new[] { "$param", "$p2", "$p_3" }, paramNames);
243243
}
244+
245+
[Fact]
246+
public void Parse_WhenRepeatedOneParam_ReturnThisParamInParamNames()
247+
{
248+
var (sql, paramNames) = SqlParser.Parse("SELECT @a, @a, @a;");
249+
250+
Assert.Equal("SELECT $a, $a, $a;", sql);
251+
Assert.Equal(new[] { "$a" }, paramNames);
252+
}
244253
}

src/Ydb.Sdk/tests/Topic/WriterIntegrationTests.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ public async Task WriteAsync_When1000Messages_ReturnWriteResultIsPersisted()
6161
topicSettings.Consumers.Add(new Consumer("Consumer"));
6262
await topicClient.CreateTopic(topicSettings);
6363

64-
using var writer = new WriterBuilder<int>(_driver, topicName) { ProducerId = "producerId" }.Build();
64+
using var writer = new WriterBuilder<int>(_driver, topicName)
65+
{ ProducerId = "producerId" }.Build();
6566

6667
var tasks = new List<Task>();
6768
for (var i = 0; i < messageCount; i++)

src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs

Lines changed: 1 addition & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
using Grpc.Core;
22
using Moq;
33
using Xunit;
4-
using Xunit.Abstractions;
54
using Ydb.Issue;
65
using Ydb.Sdk.Services.Topic;
76
using Ydb.Sdk.Services.Topic.Writer;
@@ -15,13 +14,11 @@ namespace Ydb.Sdk.Tests.Topic;
1514

1615
public class WriterUnitTests
1716
{
18-
private readonly ITestOutputHelper _testOutputHelper;
1917
private readonly Mock<IDriver> _mockIDriver = new();
2018
private readonly Mock<WriterStream> _mockStream = new();
2119

22-
public WriterUnitTests(ITestOutputHelper testOutputHelper)
20+
public WriterUnitTests()
2321
{
24-
_testOutputHelper = testOutputHelper;
2522
_mockIDriver.Setup(driver => driver.BidirectionalStreamCall(
2623
It.IsAny<Method<FromClient, StreamWriteMessage.Types.FromServer>>(),
2724
It.IsAny<GrpcRequestSettings>())
@@ -317,94 +314,6 @@ public async Task Initialize_WhenNotSupportedCodec_ThrowWriterExceptionOnWriteAs
317314
_mockStream.Verify(stream => stream.MoveNextAsync(), Times.Once);
318315
}
319316

320-
[Fact]
321-
public async Task WriteAsyncStress_WhenBufferIsOverflow_ThrowWriterExceptionOnBufferOverflow()
322-
{
323-
const int countBatchSendingSize = 1000;
324-
const int batchTasksSize = 100;
325-
const int bufferSize = 100;
326-
const int messageSize = sizeof(int);
327-
328-
Assert.True(batchTasksSize > bufferSize / 4);
329-
Assert.True(bufferSize % 4 == 0);
330-
331-
var taskSource = new TaskCompletionSource<bool>();
332-
_mockStream.Setup(stream => stream.Write(It.IsAny<FromClient>()))
333-
.Returns(Task.CompletedTask);
334-
var mockNextAsync = _mockStream.SetupSequence(stream => stream.MoveNextAsync())
335-
.Returns(new ValueTask<bool>(true))
336-
.Returns(new ValueTask<bool>(taskSource.Task));
337-
var sequentialResult = _mockStream.SetupSequence(stream => stream.Current)
338-
.Returns(new StreamWriteMessage.Types.FromServer
339-
{
340-
InitResponse = new StreamWriteMessage.Types.InitResponse
341-
{ LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" },
342-
Status = StatusIds.Types.StatusCode.Success
343-
});
344-
using var writer = new WriterBuilder<int>(_mockIDriver.Object, "/topic")
345-
{
346-
ProducerId = "producerId",
347-
BufferMaxSize = bufferSize /* bytes */,
348-
BufferOverflowRetryTimeoutMs = 1_000
349-
}.Build();
350-
351-
for (var attempt = 0; attempt < countBatchSendingSize; attempt++)
352-
{
353-
_testOutputHelper.WriteLine($"Processing attempt {attempt}");
354-
var cts = new CancellationTokenSource();
355-
356-
var tasks = new List<Task<WriteResult>>();
357-
var serverAck = new StreamWriteMessage.Types.FromServer
358-
{
359-
WriteResponse = new StreamWriteMessage.Types.WriteResponse { PartitionId = 1 },
360-
Status = StatusIds.Types.StatusCode.Success
361-
};
362-
for (var i = 0; i < batchTasksSize; i++)
363-
{
364-
tasks.Add(writer.WriteAsync(100, cts.Token));
365-
serverAck.WriteResponse.Acks.Add(new StreamWriteMessage.Types.WriteResponse.Types.WriteAck
366-
{
367-
SeqNo = bufferSize / messageSize * attempt + i + 1,
368-
Written = new StreamWriteMessage.Types.WriteResponse.Types.WriteAck.Types.Written
369-
{ Offset = i * messageSize + bufferSize * attempt }
370-
});
371-
}
372-
373-
sequentialResult.Returns(() =>
374-
{
375-
// ReSharper disable once AccessToModifiedClosure
376-
Volatile.Write(ref taskSource, new TaskCompletionSource<bool>());
377-
mockNextAsync.Returns(() =>
378-
{
379-
cts.Cancel();
380-
return new ValueTask<bool>(Volatile.Read(ref taskSource).Task);
381-
});
382-
return serverAck;
383-
});
384-
taskSource.SetResult(true);
385-
386-
var countSuccess = 0;
387-
var countErrors = 0;
388-
foreach (var task in tasks)
389-
{
390-
try
391-
{
392-
var res = await task;
393-
countSuccess++;
394-
Assert.Equal(PersistenceStatus.Written, res.Status);
395-
}
396-
catch (WriterException e)
397-
{
398-
countErrors++;
399-
Assert.Equal("Buffer overflow", e.Message);
400-
}
401-
}
402-
403-
Assert.Equal(bufferSize / messageSize, countSuccess);
404-
Assert.Equal(batchTasksSize - bufferSize / messageSize, countErrors);
405-
}
406-
}
407-
408317
/*
409318
* Performed invocations:
410319

0 commit comments

Comments
 (0)