Skip to content

Commit 859a140

Browse files
committed
Fix tests hungup
1 parent 6cc59c7 commit 859a140

File tree

2 files changed

+13
-6
lines changed

2 files changed

+13
-6
lines changed

internal/topic/topicwriterinternal/writer_reconnector.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,17 @@ func (w *WriterReconnector) Flush(ctx context.Context) error {
330330
}
331331

332332
func (w *WriterReconnector) Close(ctx context.Context) error {
333-
return w.close(ctx, xerrors.WithStackTrace(errStopWriterReconnector))
333+
reason := xerrors.WithStackTrace(errStopWriterReconnector)
334+
w.queue.StopAddNewMessages(reason)
335+
336+
flushErr := w.Flush(ctx)
337+
closeErr := w.close(ctx, reason)
338+
339+
if flushErr != nil {
340+
return flushErr
341+
}
342+
343+
return closeErr
334344
}
335345

336346
func (w *WriterReconnector) close(ctx context.Context, reason error) (resErr error) {
@@ -339,10 +349,6 @@ func (w *WriterReconnector) close(ctx context.Context, reason error) (resErr err
339349
onDone(resErr)
340350
}()
341351

342-
w.queue.StopAddNewMessages(reason)
343-
344-
resErr = w.Flush(ctx)
345-
346352
closeErr := w.queue.Close(reason)
347353
if resErr == nil && closeErr != nil {
348354
resErr = closeErr

internal/topic/topicwriterinternal/writer_reconnector_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,8 @@ func TestWriterImpl_WriteCodecs(t *testing.T) {
196196
Data: bytes.NewReader(messContent),
197197
}}))
198198

199-
require.Equal(t, rawtopiccommon.CodecRaw, <-messReceived)
199+
mess := <-messReceived
200+
require.Equal(t, rawtopiccommon.CodecRaw, mess)
200201
})
201202
t.Run("ForceGzip", func(t *testing.T) {
202203
var err error

0 commit comments

Comments
 (0)