Skip to content

Commit 0126bbc

Browse files
authored
Merge pull request #1724 from ydb-platform/allow-writer-queue-overflow
Allow writer queue overflow
2 parents 80db1c5 + db24b7a commit 0126bbc

File tree

7 files changed

+454
-42
lines changed

7 files changed

+454
-42
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Allowed overflow queue limit for one goroutine at time for topic writer
12
* Removed delay before send commit in sync mode of a topic reader
23

34
## v3.105.2

internal/topic/topicwriterinternal/writer_reconnector.go

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313

1414
"github.com/google/uuid"
1515
"github.com/jonboulle/clockwork"
16-
"golang.org/x/sync/semaphore"
1716

1817
"github.com/ydb-platform/ydb-go-sdk/v3/credentials"
1918
"github.com/ydb-platform/ydb-go-sdk/v3/internal/background"
@@ -36,8 +35,8 @@ var (
3635
errNonZeroSeqNo = xerrors.Wrap(errors.New("ydb: non zero seqno for auto set seqno mode")) //nolint:lll
3736
errNonZeroCreatedAt = xerrors.Wrap(errors.New("ydb: non zero Message.CreatedAt and set auto fill created at option")) //nolint:lll
3837
errNoAllowedCodecs = xerrors.Wrap(errors.New("ydb: no allowed codecs for write to topic"))
39-
errLargeMessage = xerrors.Wrap(errors.New("ydb: message uncompressed size more, then limit")) //nolint:lll
40-
PublicErrQueueIsFull = xerrors.Wrap(errors.New("ydb: queue is full"))
38+
errLargeMessage = xerrors.Wrap(errors.New("ydb: message uncompressed size more, then limit")) //nolint:lll
39+
PublicErrQueueIsFull = xerrors.Wrap(errors.New("ydb: queue is full")) // Deprecated.
4140
PublicErrMessagesPutToInternalQueueBeforeError = xerrors.Wrap(errors.New("ydb: the messages was put to internal buffer before the error happened. It mean about the messages can be delivered to the server")) //nolint:lll
4241
errDiffetentTransactions = xerrors.Wrap(errors.New("ydb: internal writer has messages from different trasactions. It is internal logic error, write issue please: https://github.com/ydb-platform/ydb-go-sdk/issues/new?assignees=&labels=bug&projects=&template=01_BUG_REPORT.md&title=bug%3A+")) //nolint:lll
4342

@@ -122,7 +121,7 @@ type WriterReconnector struct {
122121
background background.Worker
123122
retrySettings topic.RetrySettings
124123
writerInstanceID string
125-
semaphore *semaphore.Weighted
124+
semaphore *xsync.SoftWeightedSemaphore
126125
firstInitResponseProcessedChan empty.Chan
127126
lastSeqNo int64
128127
encodersMap *MultiEncoder
@@ -153,7 +152,7 @@ func newWriterReconnectorStopped(
153152
writerInstanceID, _ := rand.Int(rand.Reader, big.NewInt(math.MaxInt64))
154153
res := &WriterReconnector{
155154
cfg: cfg,
156-
semaphore: semaphore.NewWeighted(int64(cfg.MaxQueueLen)),
155+
semaphore: xsync.NewSoftWeightedSemaphore(int64(cfg.MaxQueueLen)),
157156
queue: newMessageQueue(),
158157
lastSeqNo: -1,
159158
firstInitResponseProcessedChan: make(empty.Chan),
@@ -223,21 +222,10 @@ func (w *WriterReconnector) Write(ctx context.Context, messages []PublicMessage)
223222
}
224223

225224
semaphoreWeight := int64(len(messages))
226-
if semaphoreWeight > int64(w.cfg.MaxQueueLen) {
227-
return xerrors.WithStackTrace(fmt.Errorf(
228-
"ydb: add more messages, then max queue limit. max queue: %v, try to add: %v: %w",
229-
w.cfg.MaxQueueLen,
230-
semaphoreWeight,
231-
PublicErrQueueIsFull,
232-
))
233-
}
234225
if err := w.semaphore.Acquire(ctx, semaphoreWeight); err != nil {
235226
return xerrors.WithStackTrace(
236-
fmt.Errorf("ydb: add new messages exceed max queue size limit. Add count: %v, max size: %v: %w",
237-
semaphoreWeight,
238-
w.cfg.MaxQueueLen,
239-
PublicErrQueueIsFull,
240-
))
227+
fmt.Errorf("ydb: timeout waiting for queue space to become available: %w", err),
228+
)
241229
}
242230
defer func() {
243231
w.semaphore.Release(semaphoreWeight)

internal/topic/topicwriterinternal/writer_reconnector_test.go

Lines changed: 108 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -283,48 +283,147 @@ func TestWriterImpl_WriteCodecs(t *testing.T) {
283283
func TestWriterReconnector_Write_QueueLimit(t *testing.T) {
284284
xtest.TestManyTimes(t, func(t testing.TB) {
285285
ctx := xtest.Context(t)
286+
maxQueueLen := int64(2)
286287
w := newWriterReconnectorStopped(NewWriterReconnectorConfig(
287288
WithAutoSetSeqNo(false),
288-
WithMaxQueueLen(2),
289+
WithMaxQueueLen(int(maxQueueLen)),
289290
))
290291
w.firstConnectionHandled.Store(true)
291292

292-
waitStartQueueWait := func(targetWaiters int) {
293+
waitSemaphoreFull := func() {
293294
xtest.SpinWaitCondition(t, nil, func() bool {
294-
res := getWaitersCount(w.semaphore) == targetWaiters
295-
296-
return res
295+
// Semaphore is fully acquired when no tokens are available
296+
return w.semaphore.TryAcquire(1) == false
297297
})
298298
}
299299

300+
// Test normal case within queue limit
300301
err := w.Write(ctx, newTestMessages(1, 2))
301302
require.NoError(t, err)
303+
waitSemaphoreFull()
302304

305+
// Test queue overflow with context cancellation
303306
ctxNoQueueSpace, ctxNoQueueSpaceCancel := xcontext.WithCancel(ctx)
304-
305307
go func() {
306-
waitStartQueueWait(1)
308+
waitSemaphoreFull()
307309
ctxNoQueueSpaceCancel()
308310
}()
309311
err = w.Write(ctxNoQueueSpace, newTestMessages(3))
310312
require.Error(t, err)
311313
require.NotErrorIs(t, err, PublicErrMessagesPutToInternalQueueBeforeError)
312314

315+
// Test queue space becomes available after ack
313316
go func() {
314-
waitStartQueueWait(1)
317+
waitSemaphoreFull()
315318
ackErr := w.queue.AcksReceived([]rawtopicwriter.WriteAck{
316319
{
317320
SeqNo: 1,
318321
},
319322
})
320323
require.NoError(t, ackErr)
321324
}()
322-
323325
err = w.Write(ctx, newTestMessages(3))
324326
require.NoError(t, err)
325327
})
326328
}
327329

330+
// TestWriterReconnector_Write_SoftQueueLimit tests the soft queue limit functionality
331+
func TestWriterReconnector_Write_SoftQueueLimit(t *testing.T) {
332+
xtest.TestManyTimes(t, func(t testing.TB) {
333+
ctx := xtest.Context(t)
334+
maxQueueLen := int64(2)
335+
w := newWriterReconnectorStopped(NewWriterReconnectorConfig(
336+
WithAutoSetSeqNo(false),
337+
WithMaxQueueLen(int(maxQueueLen)),
338+
))
339+
w.firstConnectionHandled.Store(true)
340+
341+
// First write should succeed even if it exceeds queue limit
342+
// because semaphore is completely free
343+
err := w.Write(ctx, newTestMessages(1, 2, 3)) // 3 messages > maxQueueLen (2)
344+
require.NoError(t, err, "first write with overflow should succeed")
345+
346+
// Second write should fail with timeout because semaphore is already acquired
347+
ctxTimeout, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
348+
defer cancel()
349+
err = w.Write(ctxTimeout, newTestMessages(4))
350+
require.Error(t, err, "second write should fail when queue is full")
351+
require.ErrorIs(t, err, context.DeadlineExceeded)
352+
353+
// Release messages and wait for semaphore to be released
354+
ackErr := w.queue.AcksReceived([]rawtopicwriter.WriteAck{
355+
{SeqNo: 1},
356+
{SeqNo: 2},
357+
{SeqNo: 3},
358+
})
359+
require.NoError(t, ackErr)
360+
361+
// Now we should be able to write up to maxQueueLen messages
362+
err = w.Write(ctx, newTestMessages(4, 5))
363+
require.NoError(t, err, "write should succeed after release")
364+
365+
// But we still can't write more than maxQueueLen when semaphore is not empty
366+
ctxTimeout2, cancel2 := context.WithTimeout(ctx, 100*time.Millisecond)
367+
defer cancel2()
368+
err = w.Write(ctxTimeout2, newTestMessages(6, 7, 8))
369+
require.Error(t, err, "write should fail when exceeding queue limit")
370+
require.ErrorIs(t, err, context.DeadlineExceeded)
371+
372+
// Cleanup: release remaining messages
373+
ackErr = w.queue.AcksReceived([]rawtopicwriter.WriteAck{
374+
{SeqNo: 4},
375+
{SeqNo: 5},
376+
})
377+
require.NoError(t, ackErr)
378+
})
379+
}
380+
381+
// TestWriterReconnector_Write_SoftQueueLimitPartialRelease tests partial release behavior
382+
func TestWriterReconnector_Write_SoftQueueLimitPartialRelease(t *testing.T) {
383+
xtest.TestManyTimes(t, func(t testing.TB) {
384+
ctx := xtest.Context(t)
385+
maxQueueLen := int64(2)
386+
w := newWriterReconnectorStopped(NewWriterReconnectorConfig(
387+
WithAutoSetSeqNo(false),
388+
WithMaxQueueLen(int(maxQueueLen)),
389+
))
390+
w.firstConnectionHandled.Store(true)
391+
392+
// Write more than queue limit when semaphore is free
393+
err := w.Write(ctx, newTestMessages(1, 2, 3)) // 3 messages > maxQueueLen (2)
394+
require.NoError(t, err, "first write with overflow should succeed")
395+
396+
// Release two messages, one place is enough to write one message
397+
ackErr := w.queue.AcksReceived([]rawtopicwriter.WriteAck{
398+
{SeqNo: 1},
399+
{SeqNo: 2},
400+
})
401+
require.NoError(t, ackErr)
402+
403+
// We should be able to write one message now
404+
err = w.Write(ctx, newTestMessages(4))
405+
require.NoError(t, err, "write should succeed after partial release")
406+
407+
// But we can't write two messages due to remaining tokens
408+
ctxTimeout, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
409+
defer cancel()
410+
err = w.Write(ctxTimeout, newTestMessages(5, 6))
411+
require.Error(t, err, "write should fail when exceeding available space")
412+
require.ErrorIs(t, err, context.DeadlineExceeded)
413+
414+
// Release all remaining messages
415+
ackErr = w.queue.AcksReceived([]rawtopicwriter.WriteAck{
416+
{SeqNo: 3},
417+
{SeqNo: 4},
418+
})
419+
require.NoError(t, ackErr)
420+
421+
// Now we can write up to maxQueueLen again
422+
err = w.Write(ctx, newTestMessages(5, 6))
423+
require.NoError(t, err, "write should succeed after full release")
424+
})
425+
}
426+
328427
func TestMessagesPutToInternalQueueBeforeError(t *testing.T) {
329428
ctx := xtest.Context(t)
330429
w := newWriterReconnectorStopped(NewWriterReconnectorConfig(
Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,25 @@
11
package topicwriterinternal
22

33
import (
4-
"container/list"
54
"reflect"
65
"runtime"
7-
"sync"
86
"unsafe"
97

10-
"golang.org/x/sync/semaphore"
8+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
119
)
1210

13-
func getWaitersCount(sem *semaphore.Weighted) int {
11+
// getWaitersCount returns number of goroutines waiting for semaphore acquisition
12+
// by checking overflow state. This is unsafe function which uses reflection
13+
// and unsafe pointers to access internal fields. It should be used only in tests.
14+
func getWaitersCount(sem *xsync.SoftWeightedSemaphore) int64 {
15+
// Prevent garbage collection of the semaphore while we work with its fields
1416
defer runtime.KeepAlive(sem)
1517

18+
// Get access to overflow field through reflection
1619
semVal := reflect.ValueOf(sem).Elem()
17-
mutexField := semVal.FieldByName("mu")
20+
overflowField := semVal.FieldByName("overflow")
21+
overflowAddr := unsafe.Pointer(overflowField.UnsafeAddr())
22+
overflow := (*int64)(overflowAddr)
1823

19-
mutexAddr := unsafe.Pointer(mutexField.UnsafeAddr())
20-
mutex := (*sync.Mutex)(mutexAddr)
21-
mutex.Lock()
22-
defer mutex.Unlock()
23-
24-
waitersField := semVal.FieldByName("waiters")
25-
waitersPointer := unsafe.Pointer(waitersField.UnsafeAddr())
26-
waiters := (*list.List)(waitersPointer)
27-
28-
return waiters.Len()
24+
return *overflow
2925
}

internal/xsync/soft_semaphore.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package xsync
2+
3+
import (
4+
"context"
5+
6+
"golang.org/x/sync/semaphore"
7+
)
8+
9+
// SoftWeightedSemaphore extends semaphore.Weighted with ability to acquire
10+
// one request over capacity if semaphore is completely free
11+
type SoftWeightedSemaphore struct {
12+
sem *semaphore.Weighted
13+
capacity int64
14+
mu Mutex
15+
overflow int64 // Number of tokens in overflow state
16+
}
17+
18+
// NewSoftWeightedSemaphore creates new SoftWeightedSemaphore with given capacity
19+
func NewSoftWeightedSemaphore(n int64) *SoftWeightedSemaphore {
20+
return &SoftWeightedSemaphore{
21+
sem: semaphore.NewWeighted(n),
22+
capacity: n,
23+
}
24+
}
25+
26+
// Acquire acquires the semaphore with a weight of n.
27+
// If the semaphore is completely free, the acquisition will succeed regardless of weight.
28+
func (s *SoftWeightedSemaphore) Acquire(ctx context.Context, n int64) error {
29+
// If request doesn't exceed capacity, use normal path
30+
if n <= s.capacity {
31+
return s.sem.Acquire(ctx, n)
32+
}
33+
34+
// For large requests, try to acquire entire semaphore
35+
if err := s.sem.Acquire(ctx, s.capacity); err != nil {
36+
return err
37+
}
38+
39+
s.setOverflow(n)
40+
41+
return nil
42+
}
43+
44+
// Release releases n tokens back to the semaphore.
45+
func (s *SoftWeightedSemaphore) Release(n int64) {
46+
s.mu.WithLock(func() {
47+
if n >= s.overflow {
48+
n -= s.overflow
49+
s.overflow = 0
50+
if n > 0 {
51+
s.sem.Release(n)
52+
}
53+
} else {
54+
s.overflow -= n
55+
}
56+
})
57+
}
58+
59+
// TryAcquire tries to acquire the semaphore with a weight of n without blocking.
60+
// If the semaphore is completely free, the acquisition will succeed regardless of weight.
61+
func (s *SoftWeightedSemaphore) TryAcquire(n int64) bool {
62+
// If request doesn't exceed capacity, use normal path
63+
if n <= s.capacity {
64+
return s.sem.TryAcquire(n)
65+
}
66+
67+
// For large requests, try to acquire entire semaphore
68+
if !s.sem.TryAcquire(s.capacity) {
69+
return false
70+
}
71+
72+
s.setOverflow(n)
73+
74+
return true
75+
}
76+
77+
// setOverflow sets the overflow value when entire semaphore is successfully acquired
78+
func (s *SoftWeightedSemaphore) setOverflow(n int64) {
79+
s.mu.WithLock(func() {
80+
s.overflow = n - s.capacity
81+
})
82+
}

0 commit comments

Comments
 (0)