Skip to content

Commit f7e98f3

Browse files
committed
fix generation and linter
1 parent 88e3dbc commit f7e98f3

File tree

8 files changed

+46
-11
lines changed

8 files changed

+46
-11
lines changed

internal/topic/topicreadercommon/batch.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ func splitBytesByMessagesInBatches(batches []*PublicBatch, totalBytesCount int)
178178
return nil
179179
}
180180

181-
func BatchAppend(original *PublicBatch, appended *PublicBatch) (*PublicBatch, error) {
181+
func BatchAppend(original, appended *PublicBatch) (*PublicBatch, error) {
182182
var res *PublicBatch
183183
if original == nil {
184184
res = &PublicBatch{}

internal/topic/topicreadercommon/split_batches.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@ package topicreadercommon
22

33
import "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicreader"
44

5-
func ReadRawBatchesToPublicBatches(msg *rawtopicreader.ReadResponse, sessions *PartitionSessionStorage, decoders DecoderMap) ([]*PublicBatch, error) {
5+
func ReadRawBatchesToPublicBatches(
6+
msg *rawtopicreader.ReadResponse,
7+
sessions *PartitionSessionStorage,
8+
decoders DecoderMap,
9+
) ([]*PublicBatch, error) {
610
batchesCount := 0
711
for i := range msg.PartitionData {
812
batchesCount += len(msg.PartitionData[i].Batches)

internal/topic/topicreaderinternal/batcher.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,10 @@ func (b *batcher) PushBatches(batches ...*topicreadercommon.PublicBatch) error {
5959
}
6060

6161
for _, batch := range batches {
62-
if err := b.addNeedLock(topicreadercommon.GetCommitRange(batch).PartitionSession, newBatcherItemBatch(batch)); err != nil {
62+
if err := b.addNeedLock(
63+
topicreadercommon.GetCommitRange(batch).PartitionSession,
64+
newBatcherItemBatch(batch),
65+
); err != nil {
6366
return err
6467
}
6568
}
@@ -135,7 +138,10 @@ func (o batcherGetOptions) cutBatchItemsHead(items batcherMessageOrderItems) (
135138
return items[0], items[1:], true
136139
}
137140

138-
func (o batcherGetOptions) splitBatch(batch *topicreadercommon.PublicBatch) (head, rest *topicreadercommon.PublicBatch, ok bool) {
141+
func (o batcherGetOptions) splitBatch(batch *topicreadercommon.PublicBatch) (
142+
head, rest *topicreadercommon.PublicBatch,
143+
ok bool,
144+
) {
139145
notFound := func() (*topicreadercommon.PublicBatch, *topicreadercommon.PublicBatch, bool) {
140146
return nil, nil, false
141147
}

internal/topic/topicreaderinternal/batcher_test.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,10 @@ func TestBatcher_PushRawMessage(t *testing.T) {
103103
expectedMap := batcherMessagesMap{session: batcherMessageOrderItems{
104104
newBatcherItemBatch(batch1),
105105
newBatcherItemRawMessage(m),
106-
newBatcherItemBatch(mustNewBatch(session, []*topicreadercommon.PublicMessage{{WrittenAt: testTime(2)}, {WrittenAt: testTime(3)}})),
106+
newBatcherItemBatch(mustNewBatch(
107+
session,
108+
[]*topicreadercommon.PublicMessage{{WrittenAt: testTime(2)}, {WrittenAt: testTime(3)}},
109+
)),
107110
}}
108111
require.Equal(t, expectedMap, b.messages)
109112
})
@@ -219,7 +222,10 @@ func TestBatcher_Pop(t *testing.T) {
219222
b := newBatcher()
220223
m := &rawtopicreader.StopPartitionSessionRequest{PartitionSessionID: 1}
221224

222-
require.NoError(t, b.PushBatches(mustNewBatch(session1, []*topicreadercommon.PublicMessage{{WrittenAt: testTime(1)}})))
225+
require.NoError(t, b.PushBatches(mustNewBatch(
226+
session1,
227+
[]*topicreadercommon.PublicMessage{{WrittenAt: testTime(1)}},
228+
)))
223229
require.NoError(t, b.PushRawMessage(session2, m))
224230

225231
res, err := b.Pop(context.Background(), batcherGetOptions{})

internal/topic/topicreaderinternal/committer.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,10 @@ type commitWaiter struct {
263263
Committed empty.Chan
264264
}
265265

266-
func (w *commitWaiter) checkCondition(session *topicreadercommon.PartitionSession, offset rawtopicreader.Offset) (finished bool) {
266+
func (w *commitWaiter) checkCondition(
267+
session *topicreadercommon.PartitionSession,
268+
offset rawtopicreader.Offset,
269+
) (finished bool) {
267270
return session == w.Session && offset >= w.EndOffset
268271
}
269272

internal/topic/topicreaderinternal/reader.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,13 @@ func (r *Reader) ReadMessage(ctx context.Context) (*topicreadercommon.PublicMess
140140

141141
// ReadMessageBatch read batch of messages.
142142
// Batch is collection of messages, which can be atomically committed
143-
func (r *Reader) ReadMessageBatch(ctx context.Context, opts ...PublicReadBatchOption) (batch *topicreadercommon.PublicBatch, err error) {
143+
func (r *Reader) ReadMessageBatch(
144+
ctx context.Context,
145+
opts ...PublicReadBatchOption,
146+
) (
147+
batch *topicreadercommon.PublicBatch,
148+
err error,
149+
) {
144150
readOptions := r.defaultBatchConfig.clone()
145151

146152
for _, opt := range opts {

internal/topic/topicreaderinternal/stream_reader_impl.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -461,7 +461,8 @@ func (r *topicStreamReaderImpl) initSession() (err error) {
461461
}
462462

463463
if status := resp.StatusData(); !status.Status.IsSuccess() {
464-
// TODO: better handler status error
464+
// Need wrap status to common ydb operational error
465+
// https://github.com/ydb-platform/ydb-go-sdk/issues/1361
465466
return xerrors.WithStackTrace(fmt.Errorf("bad status on initial error: %v (%v)", status.Status, status.Issues))
466467
}
467468

internal/topic/topicreaderinternal/stream_reconnector.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,13 @@ func newReaderReconnector(
7777
return res
7878
}
7979

80-
func (r *readerReconnector) ReadMessageBatch(ctx context.Context, opts ReadMessageBatchOptions) (*topicreadercommon.PublicBatch, error) {
80+
func (r *readerReconnector) ReadMessageBatch(
81+
ctx context.Context,
82+
opts ReadMessageBatchOptions,
83+
) (
84+
*topicreadercommon.PublicBatch,
85+
error,
86+
) {
8187
if ctx.Err() != nil {
8288
return nil, ctx.Err()
8389
}
@@ -127,7 +133,10 @@ func (r *readerReconnector) ReadMessageBatch(ctx context.Context, opts ReadMessa
127133
}
128134
}
129135

130-
func (r *readerReconnector) Commit(ctx context.Context, commitRange topicreadercommon.CommitRange) error {
136+
func (r *readerReconnector) Commit(
137+
ctx context.Context,
138+
commitRange topicreadercommon.CommitRange,
139+
) error {
131140
stream, err := r.stream(ctx)
132141
if err != nil {
133142
return err

0 commit comments

Comments
 (0)