Skip to content

Commit 4cd685d

Browse files
authored
Merge branch 'master' into master
2 parents 7503a07 + d6ceff7 commit 4cd685d

File tree

8 files changed

+169
-62
lines changed

8 files changed

+169
-62
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1+
12
* Added type assertion checks to enhance type safety and prevent unexpected panics in critical sections of the codebase
3+
4+
## v3.66.1
5+
* Added flush messages from buffer before close topic writer
26
* Added Flush method for topic writer
37

48
## v3.66.0

internal/topic/topicwriterinternal/queue.go

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,13 @@ type messageQueue struct {
3636
closedErr error
3737
acksReceivedEvent xsync.EventBroadcast
3838

39-
m xsync.RWMutex
40-
closed bool
41-
closedChan empty.Chan
42-
lastWrittenIndex int
43-
lastSentIndex int
44-
lastSeqNo int64
39+
m xsync.RWMutex
40+
stopReceiveMessagesReason error
41+
closed bool
42+
closedChan empty.Chan
43+
lastWrittenIndex int
44+
lastSentIndex int
45+
lastSeqNo int64
4546

4647
messagesByOrder map[int]messageWithDataContent
4748
seqNoToOrderID map[int64]int
@@ -77,8 +78,10 @@ func (q *messageQueue) addMessages(messages []messageWithDataContent, needWaiter
7778
q.m.Lock()
7879
defer q.m.Unlock()
7980

80-
if q.closed {
81-
return waiter, xerrors.WithStackTrace(fmt.Errorf("ydb: add message to closed message queue: %w", q.closedErr))
81+
if q.stopReceiveMessagesReason != nil {
82+
return waiter, xerrors.WithStackTrace(
83+
fmt.Errorf("ydb: add message to closed message queue: %w", q.stopReceiveMessagesReason),
84+
)
8285
}
8386

8487
if err := q.checkNewMessagesBeforeAddNeedLock(messages); err != nil {
@@ -181,6 +184,19 @@ func (q *messageQueue) ackReceivedNeedLock(seqNo int64) error {
181184
return nil
182185
}
183186

187+
func (q *messageQueue) StopAddNewMessages(reason error) {
188+
q.m.Lock()
189+
defer q.m.Unlock()
190+
191+
q.stopAddNewMessagesNeedLock(reason)
192+
}
193+
194+
func (q *messageQueue) stopAddNewMessagesNeedLock(reason error) {
195+
if q.stopReceiveMessagesReason == nil {
196+
q.stopReceiveMessagesReason = reason
197+
}
198+
}
199+
184200
func (q *messageQueue) Close(err error) error {
185201
isFirstTimeClosed := false
186202
q.m.Lock()
@@ -193,6 +209,8 @@ func (q *messageQueue) Close(err error) error {
193209
}
194210
}()
195211

212+
q.stopAddNewMessagesNeedLock(err)
213+
196214
if q.closed {
197215
return xerrors.WithStackTrace(errCloseClosedMessageQueue)
198216
}

internal/topic/topicwriterinternal/writer_reconnector.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,17 @@ func (w *WriterReconnector) Flush(ctx context.Context) error {
330330
}
331331

332332
func (w *WriterReconnector) Close(ctx context.Context) error {
333-
return w.close(ctx, xerrors.WithStackTrace(errStopWriterReconnector))
333+
reason := xerrors.WithStackTrace(errStopWriterReconnector)
334+
w.queue.StopAddNewMessages(reason)
335+
336+
flushErr := w.Flush(ctx)
337+
closeErr := w.close(ctx, reason)
338+
339+
if flushErr != nil {
340+
return flushErr
341+
}
342+
343+
return closeErr
334344
}
335345

336346
func (w *WriterReconnector) close(ctx context.Context, reason error) (resErr error) {
@@ -339,9 +349,13 @@ func (w *WriterReconnector) close(ctx context.Context, reason error) (resErr err
339349
onDone(resErr)
340350
}()
341351

342-
resErr = w.queue.Close(reason)
352+
closeErr := w.queue.Close(reason)
353+
if resErr == nil && closeErr != nil {
354+
resErr = closeErr
355+
}
356+
343357
bgErr := w.background.Close(ctx, reason)
344-
if resErr == nil {
358+
if resErr == nil && bgErr != nil {
345359
resErr = bgErr
346360
}
347361

internal/topic/topicwriterinternal/writer_reconnector_test.go

Lines changed: 90 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,8 @@ func TestWriterImpl_WriteCodecs(t *testing.T) {
199199
Data: bytes.NewReader(messContent),
200200
}}))
201201

202-
require.Equal(t, rawtopiccommon.CodecRaw, <-messReceived)
202+
mess := <-messReceived
203+
require.Equal(t, rawtopiccommon.CodecRaw, mess)
203204
})
204205
t.Run("ForceGzip", func(t *testing.T) {
205206
var err error
@@ -540,67 +541,107 @@ func TestWriterImpl_Reconnect(t *testing.T) {
540541
}
541542

542543
func TestWriterImpl_CloseWithFlush(t *testing.T) {
543-
e := newTestEnv(t, nil)
544+
type flushMethod func(ctx context.Context, writer *WriterReconnector) error
544545

545-
messageTime := time.Date(2023, 9, 7, 11, 34, 0, 0, time.UTC)
546-
messageData := []byte("123")
546+
f := func(t testing.TB, flush flushMethod) {
547+
e := newTestEnv(t, nil)
547548

548-
const seqNo = 36
549+
messageTime := time.Date(2023, 9, 7, 11, 34, 0, 0, time.UTC)
550+
messageData := []byte("123")
549551

550-
writeCompleted := make(empty.Chan)
551-
e.stream.EXPECT().Send(&rawtopicwriter.WriteRequest{
552-
Messages: []rawtopicwriter.MessageData{
553-
{
554-
SeqNo: seqNo,
555-
CreatedAt: messageTime,
556-
UncompressedSize: int64(len(messageData)),
557-
Partitioning: rawtopicwriter.Partitioning{},
558-
Data: messageData,
552+
const seqNo = 36
553+
554+
writeCompleted := make(empty.Chan)
555+
e.stream.EXPECT().Send(&rawtopicwriter.WriteRequest{
556+
Messages: []rawtopicwriter.MessageData{
557+
{
558+
SeqNo: seqNo,
559+
CreatedAt: messageTime,
560+
UncompressedSize: int64(len(messageData)),
561+
Partitioning: rawtopicwriter.Partitioning{},
562+
Data: messageData,
563+
},
559564
},
560-
},
561-
Codec: rawtopiccommon.CodecRaw,
562-
}).Return(nil)
565+
Codec: rawtopiccommon.CodecRaw,
566+
}).Return(nil)
563567

564-
closeCompleted := make(empty.Chan)
565-
go func() {
566-
err := e.writer.Write(e.ctx, []PublicMessage{{
567-
SeqNo: seqNo,
568-
CreatedAt: messageTime,
569-
Data: bytes.NewReader(messageData),
570-
}})
571-
close(writeCompleted)
572-
require.NoError(t, err)
573-
}()
568+
flushCompleted := make(empty.Chan)
569+
go func() {
570+
err := e.writer.Write(e.ctx, []PublicMessage{{
571+
SeqNo: seqNo,
572+
CreatedAt: messageTime,
573+
Data: bytes.NewReader(messageData),
574+
}})
575+
close(writeCompleted)
576+
require.NoError(t, err)
577+
}()
574578

575-
<-writeCompleted
579+
<-writeCompleted
576580

577-
go func() {
578-
require.NoError(t, e.writer.Flush(e.ctx))
579-
require.NoError(t, e.writer.Close(e.ctx))
580-
close(closeCompleted)
581-
}()
581+
go func() {
582+
require.NoError(t, flush(e.ctx, e.writer))
583+
close(flushCompleted)
584+
}()
582585

583-
select {
584-
case <-closeCompleted:
585-
t.Fatal("flush and close must complete only after message is acked")
586-
case <-time.After(100 * time.Millisecond):
587-
// pass
588-
}
586+
select {
587+
case <-flushCompleted:
588+
t.Fatal("flush and close must complete only after message is acked")
589+
case <-time.After(10 * time.Millisecond):
590+
// pass
591+
}
589592

590-
e.sendFromServer(&rawtopicwriter.WriteResult{
591-
Acks: []rawtopicwriter.WriteAck{
592-
{
593-
SeqNo: seqNo,
594-
MessageWriteStatus: rawtopicwriter.MessageWriteStatus{
595-
Type: rawtopicwriter.WriteStatusTypeWritten,
596-
WrittenOffset: 4,
593+
e.sendFromServer(&rawtopicwriter.WriteResult{
594+
Acks: []rawtopicwriter.WriteAck{
595+
{
596+
SeqNo: seqNo,
597+
MessageWriteStatus: rawtopicwriter.MessageWriteStatus{
598+
Type: rawtopicwriter.WriteStatusTypeWritten,
599+
WrittenOffset: 4,
600+
},
597601
},
598602
},
603+
PartitionID: e.partitionID,
604+
})
605+
606+
xtest.WaitChannelClosed(t, flushCompleted)
607+
}
608+
609+
tests := []struct {
610+
name string
611+
flush flushMethod
612+
}{
613+
{
614+
name: "close",
615+
flush: func(ctx context.Context, writer *WriterReconnector) error {
616+
return writer.Close(ctx)
617+
},
599618
},
600-
PartitionID: e.partitionID,
601-
})
619+
{
620+
name: "flush",
621+
flush: func(ctx context.Context, writer *WriterReconnector) error {
622+
return writer.Close(ctx)
623+
},
624+
},
625+
{
626+
name: "flush and close",
627+
flush: func(ctx context.Context, writer *WriterReconnector) error {
628+
err := writer.Flush(ctx)
629+
if err != nil {
630+
return err
631+
}
632+
633+
return writer.Close(ctx)
634+
},
635+
},
636+
}
602637

603-
xtest.WaitChannelClosed(t, closeCompleted)
638+
for _, test := range tests {
639+
t.Run(test.name, func(t *testing.T) {
640+
xtest.TestManyTimes(t, func(t testing.TB) {
641+
f(t, test.flush)
642+
})
643+
})
644+
}
604645
}
605646

606647
func TestAllMessagesHasSameBufCodec(t *testing.T) {

internal/version/version.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package version
33
const (
44
Major = "3"
55
Minor = "66"
6-
Patch = "0"
6+
Patch = "1"
77

88
Prefix = "ydb-go-sdk"
99
)

tests/integration/helpers_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,9 @@ func (scope *scopeT) Driver(opts ...ydb.Option) *ydb.Driver {
103103
)...,
104104
)
105105
clean := func() {
106-
scope.Require.NoError(driver.Close(scope.Ctx))
106+
if driver != nil {
107+
scope.Require.NoError(driver.Close(scope.Ctx))
108+
}
107109
}
108110

109111
return fixenv.NewGenericResultWithCleanup(driver, clean), err

tests/integration/topic_read_writer_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,32 @@ func TestTopicWriterWithManualPartitionSelect(t *testing.T) {
438438
require.NoError(t, err)
439439
}
440440

441+
func TestWriterFlushMessagesBeforeClose(t *testing.T) {
442+
s := newScope(t)
443+
ctx := s.Ctx
444+
writer, err := s.Driver().Topic().StartWriter(s.TopicPath(), topicoptions.WithWriterWaitServerAck(false))
445+
require.NoError(t, err)
446+
447+
count := 1000
448+
for i := 0; i < count; i++ {
449+
require.NoError(t, writer.Write(ctx, topicwriter.Message{Data: strings.NewReader(strconv.Itoa(i))}))
450+
}
451+
require.NoError(t, writer.Close(ctx))
452+
453+
for i := 0; i < count; i++ {
454+
readCtx, cancel := context.WithTimeout(ctx, time.Second)
455+
mess, err := s.TopicReader().ReadMessage(readCtx)
456+
cancel()
457+
require.NoError(t, err)
458+
459+
messBody, err := io.ReadAll(mess)
460+
require.NoError(t, err)
461+
messBodyString := string(messBody)
462+
require.Equal(t, strconv.Itoa(i), messBodyString)
463+
cancel()
464+
}
465+
}
466+
441467
var topicCounter int
442468

443469
func createTopic(ctx context.Context, t testing.TB, db *ydb.Driver) (topicPath string) {

topic/topicwriter/topicwriter.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ func (w *Writer) WaitInitInfo(ctx context.Context) (info PublicInitialInfo, err
6666
return publicInfo, nil
6767
}
6868

69+
// Close will flush rested messages from buffer and close the writer.
70+
// You can't write new messages after call Close
6971
func (w *Writer) Close(ctx context.Context) error {
7072
return w.inner.Close(ctx)
7173
}

0 commit comments

Comments
 (0)