Skip to content

Commit 4e8254f

Browse files
committed
Returned topicwriter.ErrQueueLimitExceed, accidental removed at v3.81.0
1 parent dbaa3bf commit 4e8254f

File tree

3 files changed

+12
-4
lines changed

3 files changed

+12
-4
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Returned `topicwriter.ErrQueueLimitExceed`, accidental removed at `v3.81.0`
2+
13
## v3.81.3
24
* Fixed tracing details check for some metrics
35

internal/topic/topicwriterinternal/writer_reconnector.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ var (
3636
errNonZeroSeqNo = xerrors.Wrap(errors.New("ydb: non zero seqno for auto set seqno mode")) //nolint:lll
3737
errNonZeroCreatedAt = xerrors.Wrap(errors.New("ydb: non zero Message.CreatedAt and set auto fill created at option")) //nolint:lll
3838
errNoAllowedCodecs = xerrors.Wrap(errors.New("ydb: no allowed codecs for write to topic"))
39-
errLargeMessage = xerrors.Wrap(errors.New("ydb: message uncompressed size more, then limit")) //nolint:lll
39+
errLargeMessage = xerrors.Wrap(errors.New("ydb: message uncompressed size more, then limit")) //nolint:lll
40+
PublicErrQueueIsFull = xerrors.Wrap(errors.New("ydb: queue is full"))
4041
PublicErrMessagesPutToInternalQueueBeforeError = xerrors.Wrap(errors.New("ydb: the messages was put to internal buffer before the error happened. It mean about the messages can be delivered to the server")) //nolint:lll
4142
errDiffetentTransactions = xerrors.Wrap(errors.New("ydb: internal writer has messages from different trasactions. It is internal logic error, write issue please: https://github.com/ydb-platform/ydb-go-sdk/issues/new?assignees=&labels=bug&projects=&template=01_BUG_REPORT.md&title=bug%3A+")) //nolint:lll
4243

@@ -223,16 +224,18 @@ func (w *WriterReconnector) Write(ctx context.Context, messages []PublicMessage)
223224
semaphoreWeight := int64(len(messages))
224225
if semaphoreWeight > int64(w.cfg.MaxQueueLen) {
225226
return xerrors.WithStackTrace(fmt.Errorf(
226-
"ydb: add more messages, then max queue limit. max queue: %v, try to add: %v",
227+
"ydb: add more messages, then max queue limit. max queue: %v, try to add: %v: %w",
227228
w.cfg.MaxQueueLen,
228229
semaphoreWeight,
230+
PublicErrQueueIsFull,
229231
))
230232
}
231233
if err := w.semaphore.Acquire(ctx, semaphoreWeight); err != nil {
232234
return xerrors.WithStackTrace(
233-
fmt.Errorf("ydb: add new messages exceed max queue size limit. Add count: %v, max size: %v",
235+
fmt.Errorf("ydb: add new messages exceed max queue size limit. Add count: %v, max size: %v: %w",
234236
semaphoreWeight,
235237
w.cfg.MaxQueueLen,
238+
PublicErrQueueIsFull,
236239
))
237240
}
238241
defer func() {

topic/topicwriter/topicwriter.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ type (
1010
Message = topicwriterinternal.PublicMessage
1111
)
1212

13-
var ErrMessagesPutToInternalQueueBeforeError = topicwriterinternal.PublicErrMessagesPutToInternalQueueBeforeError
13+
var (
14+
ErrQueueLimitExceed = topicwriterinternal.PublicErrQueueIsFull
15+
ErrMessagesPutToInternalQueueBeforeError = topicwriterinternal.PublicErrMessagesPutToInternalQueueBeforeError
16+
)
1417

1518
// Writer represent write session to topic
1619
// It handles connection problems, reconnect to server when need and resend buffered messages

0 commit comments

Comments
 (0)