Skip to content

Commit 43d8e33

Browse files
authored
Merge pull request #864 Introduce topicwriter Writer.Flush() method from zveznicht/issues/851/flush
@zveznicht thanks for the pr :)
2 parents d063e33 + e7f9e27 commit 43d8e33

File tree

8 files changed

+105
-1
lines changed

8 files changed

+105
-1
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Added Flush method for topic writer
2+
13
## v3.66.0
24
* Added experimental package `retry/budget` for limit second and subsequent retry attempts
35
* Refactored internals for enabling `containedctx` linter

internal/topic/topicwriterinternal/queue.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,16 @@ func (q *messageQueue) Wait(ctx context.Context, waiter MessageQueueAckWaiter) e
320320
}
321321
}
322322

323+
// WaitLastWritten waits for last written message gets ack.
324+
func (q *messageQueue) WaitLastWritten(ctx context.Context) error {
325+
var lastIndex int
326+
q.m.WithRLock(func() {
327+
lastIndex = q.lastWrittenIndex
328+
})
329+
330+
return q.Wait(ctx, MessageQueueAckWaiter{sequenseNumbers: []int{lastIndex}})
331+
}
332+
323333
type MessageQueueAckWaiter struct {
324334
sequenseNumbers []int
325335
}

internal/topic/topicwriterinternal/writer.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,3 +55,7 @@ func (w *Writer) WaitInit(ctx context.Context) (info InitialInfo, err error) {
5555
func (w *Writer) Close(ctx context.Context) error {
5656
return w.streamWriter.Close(ctx)
5757
}
58+
59+
func (w *Writer) Flush(ctx context.Context) error {
60+
return w.streamWriter.Flush(ctx)
61+
}

internal/topic/topicwriterinternal/writer_reconnector.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,10 @@ func (w *WriterReconnector) createMessagesWithContent(messages []PublicMessage)
325325
return res, nil
326326
}
327327

328+
func (w *WriterReconnector) Flush(ctx context.Context) error {
329+
return w.queue.WaitLastWritten(ctx)
330+
}
331+
328332
func (w *WriterReconnector) Close(ctx context.Context) error {
329333
return w.close(ctx, xerrors.WithStackTrace(errStopWriterReconnector))
330334
}

internal/topic/topicwriterinternal/writer_reconnector_test.go

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,70 @@ func TestWriterImpl_Reconnect(t *testing.T) {
530530
})
531531
}
532532

533+
func TestWriterImpl_CloseWithFlush(t *testing.T) {
534+
e := newTestEnv(t, nil)
535+
536+
messageTime := time.Date(2023, 9, 7, 11, 34, 0, 0, time.UTC)
537+
messageData := []byte("123")
538+
539+
const seqNo = 36
540+
541+
writeCompleted := make(empty.Chan)
542+
e.stream.EXPECT().Send(&rawtopicwriter.WriteRequest{
543+
Messages: []rawtopicwriter.MessageData{
544+
{
545+
SeqNo: seqNo,
546+
CreatedAt: messageTime,
547+
UncompressedSize: int64(len(messageData)),
548+
Partitioning: rawtopicwriter.Partitioning{},
549+
Data: messageData,
550+
},
551+
},
552+
Codec: rawtopiccommon.CodecRaw,
553+
}).Return(nil)
554+
555+
closeCompleted := make(empty.Chan)
556+
go func() {
557+
err := e.writer.Write(e.ctx, []PublicMessage{{
558+
SeqNo: seqNo,
559+
CreatedAt: messageTime,
560+
Data: bytes.NewReader(messageData),
561+
}})
562+
close(writeCompleted)
563+
require.NoError(t, err)
564+
}()
565+
566+
<-writeCompleted
567+
568+
go func() {
569+
require.NoError(t, e.writer.Flush(e.ctx))
570+
require.NoError(t, e.writer.Close(e.ctx))
571+
close(closeCompleted)
572+
}()
573+
574+
select {
575+
case <-closeCompleted:
576+
t.Fatal("flush and close must complete only after message is acked")
577+
case <-time.After(100 * time.Millisecond):
578+
// pass
579+
}
580+
581+
e.sendFromServer(&rawtopicwriter.WriteResult{
582+
Acks: []rawtopicwriter.WriteAck{
583+
{
584+
SeqNo: seqNo,
585+
MessageWriteStatus: rawtopicwriter.MessageWriteStatus{
586+
Type: rawtopicwriter.WriteStatusTypeWritten,
587+
WrittenOffset: 4,
588+
},
589+
},
590+
},
591+
PartitionID: e.partitionID,
592+
})
593+
594+
xtest.WaitChannelClosed(t, closeCompleted)
595+
}
596+
533597
func TestAllMessagesHasSameBufCodec(t *testing.T) {
534598
t.Run("Empty", func(t *testing.T) {
535599
require.True(t, allMessagesHasSameBufCodec(nil))
@@ -891,7 +955,7 @@ func newTestEnv(t testing.TB, options *testEnvOptions) *testEnv {
891955
require.NoError(t, res.writer.waitFirstInitResponse(res.ctx))
892956

893957
t.Cleanup(func() {
894-
res.writer.close(context.Background(), errors.New("stop writer test environment"))
958+
_ = res.writer.close(context.Background(), errors.New("stop writer test environment"))
895959
close(res.stopReadEvents)
896960
<-streamClosed
897961
})

internal/topic/topicwriterinternal/writer_stream_interface.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ type StreamWriter interface {
99
Write(ctx context.Context, messages []PublicMessage) error
1010
WaitInit(ctx context.Context) (info InitialInfo, err error)
1111
Close(ctx context.Context) error
12+
Flush(ctx context.Context) error
1213
}
1314

1415
type InitialInfo struct {

internal/topic/topicwriterinternal/writer_stream_interface_mock_test.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

topic/topicwriter/topicwriter.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,8 @@ func (w *Writer) WaitInitInfo(ctx context.Context) (info PublicInitialInfo, err
6969
func (w *Writer) Close(ctx context.Context) error {
7070
return w.inner.Close(ctx)
7171
}
72+
73+
// Flush waits till all in-flight messages are acknowledged.
74+
func (w *Writer) Flush(ctx context.Context) error {
75+
return w.inner.Flush(ctx)
76+
}

0 commit comments

Comments
 (0)