Skip to content

Commit 911ccc2

Browse files
committed
fix linter
1 parent f7e98f3 commit 911ccc2

File tree

8 files changed

+88
-18
lines changed

8 files changed

+88
-18
lines changed

internal/topic/topicreadercommon/batch.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,5 +226,6 @@ func BatchGetPartitionSessionID(item *PublicBatch) rawtopicreader.PartitionSessi
226226

227227
func BatchSetCommitRangeForTest(b *PublicBatch, commitRange CommitRange) *PublicBatch {
228228
b.commitRange = commitRange
229+
229230
return b
230231
}

internal/topic/topicreadercommon/message.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ func (pmb *PublicMessageBuilder) DataAndUncompressedSize(data []byte) *PublicMes
196196

197197
func (pmb *PublicMessageBuilder) CommitRange(cr CommitRange) *PublicMessageBuilder {
198198
pmb.mess.commitRange = cr
199+
199200
return pmb
200201
}
201202

@@ -223,6 +224,7 @@ func (pmb *PublicMessageBuilder) PartitionID(partitionID int64) {
223224

224225
func (pmb *PublicMessageBuilder) RawDataLen(val int) *PublicMessageBuilder {
225226
pmb.mess.rawDataLen = val
227+
226228
return pmb
227229
}
228230

@@ -240,6 +242,7 @@ func MessageGetBufferBytesAccount(m *PublicMessage) int {
240242

241243
func MessageWithSetCommitRangeForTest(m *PublicMessage, commitRange CommitRange) *PublicMessage {
242244
m.commitRange = commitRange
245+
243246
return m
244247
}
245248

internal/topic/topicreadercommon/partition_session_storage.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ func (c *PartitionSessionStorage) GetAll() []*PartitionSession {
6767
for _, s := range c.sessions {
6868
res = append(res, s.Session)
6969
}
70+
7071
return res
7172
}
7273

internal/topic/topicreaderinternal/batcher_test.go

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -387,16 +387,25 @@ func TestBatcher_Find(t *testing.T) {
387387

388388
t.Run("FoundPartialBatchFilter", func(t *testing.T) {
389389
session := &topicreadercommon.PartitionSession{}
390-
batch := mustNewBatch(session, []*topicreadercommon.PublicMessage{{WrittenAt: testTime(1)}, {WrittenAt: testTime(2)}})
390+
batch := mustNewBatch(
391+
session,
392+
[]*topicreadercommon.PublicMessage{{WrittenAt: testTime(1)}, {WrittenAt: testTime(2)}},
393+
)
391394

392395
b := newBatcher()
393396

394397
require.NoError(t, b.PushBatches(batch))
395398

396399
findRes := b.findNeedLock(batcherGetOptions{MaxCount: 1})
397400

398-
expectedResult := newBatcherItemBatch(mustNewBatch(session, []*topicreadercommon.PublicMessage{{WrittenAt: testTime(1)}}))
399-
expectedRestBatch := newBatcherItemBatch(mustNewBatch(session, []*topicreadercommon.PublicMessage{{WrittenAt: testTime(2)}}))
401+
expectedResult := newBatcherItemBatch(mustNewBatch(
402+
session,
403+
[]*topicreadercommon.PublicMessage{{WrittenAt: testTime(1)}},
404+
))
405+
expectedRestBatch := newBatcherItemBatch(mustNewBatch(
406+
session,
407+
[]*topicreadercommon.PublicMessage{{WrittenAt: testTime(2)}},
408+
))
400409

401410
expectedCandidate := batcherResultCandidate{
402411
Key: session,
@@ -447,7 +456,11 @@ func TestBatcher_Apply(t *testing.T) {
447456
func TestBatcherGetOptions_Split(t *testing.T) {
448457
t.Run("Empty", func(t *testing.T) {
449458
opts := batcherGetOptions{}
450-
batch := mustNewBatch(nil, []*topicreadercommon.PublicMessage{{WrittenAt: testTime(1)}, {WrittenAt: testTime(2)}})
459+
batch := mustNewBatch(nil, []*topicreadercommon.PublicMessage{
460+
{WrittenAt: testTime(1)},
461+
{WrittenAt: testTime(2)},
462+
},
463+
)
451464
head, rest, ok := opts.splitBatch(batch)
452465

453466
require.Equal(t, batch, head)
@@ -457,7 +470,11 @@ func TestBatcherGetOptions_Split(t *testing.T) {
457470
t.Run("MinCount", func(t *testing.T) {
458471
opts := batcherGetOptions{MinCount: 2}
459472
batch1 := mustNewBatch(nil, []*topicreadercommon.PublicMessage{{WrittenAt: testTime(1)}})
460-
batch2 := mustNewBatch(nil, []*topicreadercommon.PublicMessage{{WrittenAt: testTime(1)}, {WrittenAt: testTime(2)}})
473+
batch2 := mustNewBatch(nil, []*topicreadercommon.PublicMessage{
474+
{WrittenAt: testTime(1)},
475+
{WrittenAt: testTime(2)},
476+
},
477+
)
461478

462479
head, rest, ok := opts.splitBatch(batch1)
463480
require.True(t, topicreadercommon.BatchIsEmpty(head))
@@ -472,7 +489,11 @@ func TestBatcherGetOptions_Split(t *testing.T) {
472489
t.Run("MaxCount", func(t *testing.T) {
473490
opts := batcherGetOptions{MaxCount: 2}
474491
batch1 := mustNewBatch(nil, []*topicreadercommon.PublicMessage{{WrittenAt: testTime(1)}})
475-
batch2 := mustNewBatch(nil, []*topicreadercommon.PublicMessage{{WrittenAt: testTime(1)}, {WrittenAt: testTime(2)}})
492+
batch2 := mustNewBatch(nil, []*topicreadercommon.PublicMessage{
493+
{WrittenAt: testTime(1)},
494+
{WrittenAt: testTime(2)},
495+
},
496+
)
476497
batch3 := mustNewBatch(
477498
nil,
478499
[]*topicreadercommon.PublicMessage{
@@ -494,8 +515,20 @@ func TestBatcherGetOptions_Split(t *testing.T) {
494515
require.True(t, ok)
495516

496517
head, rest, ok = opts.splitBatch(batch3)
497-
expectedHead := mustNewBatch(nil, []*topicreadercommon.PublicMessage{{WrittenAt: testTime(11)}, {WrittenAt: testTime(12)}})
498-
expectedRest := mustNewBatch(nil, []*topicreadercommon.PublicMessage{{WrittenAt: testTime(13)}, {WrittenAt: testTime(14)}})
518+
expectedHead := mustNewBatch(
519+
nil,
520+
[]*topicreadercommon.PublicMessage{
521+
{WrittenAt: testTime(11)},
522+
{WrittenAt: testTime(12)},
523+
},
524+
)
525+
expectedRest := mustNewBatch(
526+
nil,
527+
[]*topicreadercommon.PublicMessage{
528+
{WrittenAt: testTime(13)},
529+
{WrittenAt: testTime(14)},
530+
},
531+
)
499532
require.Equal(t, expectedHead, head)
500533
require.Equal(t, expectedRest, rest)
501534
require.True(t, ok)
@@ -509,7 +542,10 @@ func TestBatcher_Fire(t *testing.T) {
509542
})
510543
}
511544

512-
func mustNewBatch(session *topicreadercommon.PartitionSession, messages []*topicreadercommon.PublicMessage) *topicreadercommon.PublicBatch {
545+
func mustNewBatch(
546+
session *topicreadercommon.PartitionSession,
547+
messages []*topicreadercommon.PublicMessage,
548+
) *topicreadercommon.PublicBatch {
513549
batch, err := topicreadercommon.NewBatch(session, messages)
514550
if err != nil {
515551
panic(err)

internal/topic/topicreaderinternal/committer_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,10 @@ func newTestCommitter(ctx context.Context, t testing.TB) *committer {
396396
return res
397397
}
398398

399-
func newTestPartitionSession(ctx context.Context, partitionSessionID rawtopicreader.PartitionSessionID) *topicreadercommon.PartitionSession {
399+
func newTestPartitionSession(
400+
ctx context.Context,
401+
partitionSessionID rawtopicreader.PartitionSessionID,
402+
) *topicreadercommon.PartitionSession {
400403
return topicreadercommon.NewPartitionSession(
401404
ctx,
402405
"",

internal/topic/topicreaderinternal/reader_test.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,15 @@ func TestReader_Close(t *testing.T) {
7373
readerReadMessageBatchState := newCallState()
7474

7575
go func() {
76-
readerCommitState.err = reader.Commit(context.Background(), topicreadercommon.MessageWithSetCommitRangeForTest(&topicreadercommon.PublicMessage{}, topicreadercommon.CommitRange{
77-
PartitionSession: &topicreadercommon.PartitionSession{},
78-
}))
76+
readerCommitState.err = reader.Commit(
77+
context.Background(),
78+
topicreadercommon.MessageWithSetCommitRangeForTest(
79+
&topicreadercommon.PublicMessage{},
80+
topicreadercommon.CommitRange{
81+
PartitionSession: &topicreadercommon.PartitionSession{},
82+
},
83+
),
84+
)
7985
close(readerCommitState.callCompleted)
8086
}()
8187

@@ -136,7 +142,13 @@ func TestReader_Commit(t *testing.T) {
136142

137143
testErr := errors.New("test err")
138144
baseReader.EXPECT().Commit(gomock.Any(), expectedRangeErr).Return(testErr)
139-
require.ErrorIs(t, reader.Commit(context.Background(), topicreadercommon.MessageWithSetCommitRangeForTest(&topicreadercommon.PublicMessage{}, expectedRangeErr)), testErr)
145+
require.ErrorIs(t, reader.Commit(
146+
context.Background(),
147+
topicreadercommon.MessageWithSetCommitRangeForTest(
148+
&topicreadercommon.PublicMessage{},
149+
expectedRangeErr,
150+
),
151+
), testErr)
140152
})
141153

142154
t.Run("CommitFromOtherReader", func(t *testing.T) {
@@ -168,7 +180,10 @@ func TestReader_WaitInit(t *testing.T) {
168180
require.NoError(t, err)
169181
}
170182

171-
func newTestPartitionSessionReaderID(readerID int64, partitionSessionID rawtopicreader.PartitionSessionID) *topicreadercommon.PartitionSession {
183+
func newTestPartitionSessionReaderID(
184+
readerID int64,
185+
partitionSessionID rawtopicreader.PartitionSessionID,
186+
) *topicreadercommon.PartitionSession {
172187
return topicreadercommon.NewPartitionSession(
173188
context.Background(),
174189
"",

internal/topic/topicreaderinternal/stream_reader_impl_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,10 @@ func TestTopicStreamReaderImpl_CommitStolen(t *testing.T) {
244244
opts.MinCount = 2
245245
batch, err := e.reader.ReadMessageBatch(e.ctx, opts)
246246
require.NoError(t, err)
247-
require.ErrorIs(t, e.reader.Commit(e.ctx, topicreadercommon.GetCommitRange(batch.Messages[1])), ErrWrongCommitOrderInSyncMode)
247+
require.ErrorIs(t, e.reader.Commit(
248+
e.ctx,
249+
topicreadercommon.GetCommitRange(batch.Messages[1]),
250+
), ErrWrongCommitOrderInSyncMode)
248251
xtest.WaitChannelClosed(t, readRequestReceived)
249252
})
250253

internal/topic/topicreaderinternal/stream_reconnector_test.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,10 +161,14 @@ func TestTopicReaderReconnectorCommit(t *testing.T) {
161161
defer mc.Finish()
162162

163163
stream := NewMockbatchedStreamReader(mc)
164-
stream.EXPECT().Commit(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, offset topicreadercommon.CommitRange) {
164+
stream.EXPECT().Commit(
165+
gomock.Any(),
166+
gomock.Any(),
167+
).Do(func(ctx context.Context, offset topicreadercommon.CommitRange) {
165168
require.Equal(t, "v", ctx.Value(k{}))
166169
require.Equal(t, expectedCommitRange, offset)
167170
})
171+
168172
reconnector := &readerReconnector{
169173
streamVal: stream,
170174
streamContextCancel: func(cause error) {},
@@ -176,10 +180,14 @@ func TestTopicReaderReconnectorCommit(t *testing.T) {
176180
t.Run("StreamOkCommitErr", func(t *testing.T) {
177181
mc := gomock.NewController(t)
178182
stream := NewMockbatchedStreamReader(mc)
179-
stream.EXPECT().Commit(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, offset topicreadercommon.CommitRange) {
183+
stream.EXPECT().Commit(
184+
gomock.Any(),
185+
gomock.Any(),
186+
).Do(func(ctx context.Context, offset topicreadercommon.CommitRange) {
180187
require.Equal(t, "v", ctx.Value(k{}))
181188
require.Equal(t, expectedCommitRange, offset)
182189
}).Return(testErr)
190+
183191
reconnector := &readerReconnector{
184192
streamVal: stream,
185193
streamContextCancel: func(cause error) {},

0 commit comments

Comments
 (0)