Skip to content

Commit 3a94697

Browse files
committed
init zap logging
1 parent 775da3f commit 3a94697

File tree

7 files changed

+153
-189
lines changed

7 files changed

+153
-189
lines changed

internal/topic/topicreaderinternal/reader.go

Lines changed: 1 addition & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -122,22 +122,6 @@ func (r *Reader) ReadMessageBatch(ctx context.Context, opts ...PublicReadBatchOp
122122
readOptions = optFunc.Apply(readOptions)
123123
}
124124

125-
onDone := trace.TopicOnReaderReadMessages(r.tracer, ctx, readOptions.MinCount, readOptions.MaxCount)
126-
defer func() {
127-
if batch == nil {
128-
onDone(-1, "", -1, -1, -1, err)
129-
} else {
130-
onDone(
131-
len(batch.Messages),
132-
batch.Topic(),
133-
batch.PartitionID(),
134-
batch.commitRange.commitOffsetStart.ToInt64(),
135-
batch.commitRange.commitOffsetEnd.ToInt64(),
136-
err,
137-
)
138-
}
139-
}()
140-
141125
forReadBatch:
142126
for {
143127
if err = ctx.Err(); err != nil {
@@ -159,27 +143,7 @@ forReadBatch:
159143
}
160144

161145
func (r *Reader) Commit(ctx context.Context, offsets PublicCommitRangeGetter) (err error) {
162-
cr := offsets.getCommitRange()
163-
164-
var session partitionSession
165-
if cr.priv.partitionSession != nil {
166-
session = *cr.priv.partitionSession
167-
}
168-
169-
onDone := trace.TopicOnReaderCommit(
170-
r.tracer,
171-
ctx,
172-
session.Topic,
173-
session.PartitionID,
174-
session.partitionSessionID.ToInt64(),
175-
cr.priv.commitOffsetStart.ToInt64(),
176-
cr.priv.commitOffsetEnd.ToInt64(),
177-
)
178-
defer func() {
179-
onDone(err)
180-
}()
181-
182-
return r.reader.Commit(ctx, cr.priv)
146+
return r.reader.Commit(ctx, offsets.getCommitRange().priv)
183147
}
184148

185149
func (r *Reader) CommitRanges(ctx context.Context, ranges []PublicCommitRange) error {

internal/topic/topicreaderinternal/stream_reader_impl.go

Lines changed: 39 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ func (r *topicStreamReaderImpl) onStopPartitionSessionRequestFromBuffer(
279279
return err
280280
}
281281

282-
trace.TopicOnReaderStreamPartitionReadStop(
282+
trace.TopicOnReaderPartitionReadStop(
283283
r.cfg.Tracer,
284284
r.readConnectionID,
285285
session.Context(),
@@ -320,8 +320,26 @@ func (r *topicStreamReaderImpl) onPartitionSessionStatusResponseFromBuffer(
320320
panic("not implemented")
321321
}
322322

323-
func (r *topicStreamReaderImpl) Commit(ctx context.Context, commitRange commitRange) error {
324-
if err := r.checkCommitRange(commitRange); err != nil {
323+
func (r *topicStreamReaderImpl) Commit(ctx context.Context, commitRange commitRange) (err error) {
324+
session := partitionSession{}
325+
if commitRange.partitionSession != nil {
326+
session = *commitRange.partitionSession
327+
}
328+
329+
onDone := trace.TopicOnReaderStreamCommit(
330+
r.cfg.Tracer,
331+
ctx,
332+
session.Topic,
333+
session.PartitionID,
334+
session.partitionSessionID.ToInt64(),
335+
commitRange.commitOffsetStart.ToInt64(),
336+
commitRange.commitOffsetEnd.ToInt64(),
337+
)
338+
defer func() {
339+
onDone(err)
340+
}()
341+
342+
if err = r.checkCommitRange(commitRange); err != nil {
325343
return err
326344
}
327345
return r.committer.Commit(ctx, commitRange)
@@ -686,17 +704,32 @@ func (r *topicStreamReaderImpl) onStartPartitionSessionRequest(m *rawtopicreader
686704

687705
func (r *topicStreamReaderImpl) onStartPartitionSessionRequestFromBuffer(
688706
m *rawtopicreader.StartPartitionSessionRequest,
689-
) error {
707+
) (err error) {
690708
session, err := r.sessionController.Get(m.PartitionSession.PartitionSessionID)
691709
if err != nil {
692710
return err
693711
}
694712

713+
onDone := trace.TopicOnReaderPartitionReadStartResponse(
714+
r.cfg.Tracer,
715+
r.readConnectionID,
716+
session.Context(),
717+
session.Topic,
718+
session.PartitionID,
719+
session.partitionSessionID.ToInt64(),
720+
)
721+
695722
respMessage := &rawtopicreader.StartPartitionSessionResponse{
696723
PartitionSessionID: session.partitionSessionID,
697724
}
698725

699726
var forceOffset *int64
727+
var commitOffset *int64
728+
729+
defer func() {
730+
onDone(forceOffset, commitOffset, err)
731+
}()
732+
700733
if r.cfg.GetPartitionStartOffsetCallback != nil {
701734
req := PublicGetPartitionStartOffsetRequest{
702735
Topic: session.Topic,
@@ -714,23 +747,14 @@ func (r *topicStreamReaderImpl) onStartPartitionSessionRequestFromBuffer(
714747

715748
respMessage.ReadOffset.FromInt64Pointer(forceOffset)
716749
if r.cfg.CommitMode.commitsEnabled() {
717-
respMessage.CommitOffset.FromInt64Pointer(forceOffset)
750+
commitOffset = forceOffset
751+
respMessage.CommitOffset.FromInt64Pointer(commitOffset)
718752
}
719753

720754
if err = r.send(respMessage); err != nil {
721755
return err
722756
}
723757

724-
trace.TopicOnReaderStreamPartitionReadStart(
725-
r.cfg.Tracer,
726-
r.readConnectionID,
727-
session.Context(),
728-
session.Topic,
729-
session.PartitionID,
730-
forceOffset,
731-
forceOffset,
732-
)
733-
734758
return nil
735759
}
736760

internal/topic/topicreaderinternal/stream_reader_impl_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ func TestStreamReaderImpl_OnPartitionCloseHandle(t *testing.T) {
145145
readMessagesCtx, readMessagesCtxCancel := xcontext.WithErrCancel(context.Background())
146146
committedOffset := int64(222)
147147

148-
e.reader.cfg.Tracer.OnReaderStreamPartitionReadStop = func(info trace.OnReaderStreamPartitionReadStopInfo) {
148+
e.reader.cfg.Tracer.OnReaderPartitionReadStop = func(info trace.OnReaderStreamPartitionReadStopInfo) {
149149
expected := trace.OnReaderStreamPartitionReadStopInfo{
150150
ReaderConnectionID: e.reader.readConnectionID,
151151
PartitionContext: e.partitionSession.ctx,
@@ -184,7 +184,7 @@ func TestStreamReaderImpl_OnPartitionCloseHandle(t *testing.T) {
184184
readMessagesCtx, readMessagesCtxCancel := xcontext.WithErrCancel(context.Background())
185185
committedOffset := int64(222)
186186

187-
e.reader.cfg.Tracer.OnReaderStreamPartitionReadStop = func(info trace.OnReaderStreamPartitionReadStopInfo) {
187+
e.reader.cfg.Tracer.OnReaderPartitionReadStop = func(info trace.OnReaderStreamPartitionReadStopInfo) {
188188
expected := trace.OnReaderStreamPartitionReadStopInfo{
189189
ReaderConnectionID: e.reader.readConnectionID,
190190
PartitionContext: e.partitionSession.ctx,

topic/topicreader/reader_example_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -197,13 +197,13 @@ func Example_readWithExplicitPartitionStartStopHandler() {
197197
reader, _ := db.Topic().StartReader("consumer", topicoptions.ReadTopic("asd"),
198198
topicoptions.WithTracer(
199199
trace.Topic{
200-
OnReaderStreamPartitionReadStart: func(info trace.OnReaderStreamPartitionReadStartInfo) {
200+
OnReaderPartitionReadStartResponse: func(info trace.OnReaderStreamPartitionReadStartInfo) {
201201
err := externalSystemLock(info.PartitionContext, info.Topic, info.PartitionID)
202202
if err != nil {
203203
stopReader()
204204
}
205205
},
206-
OnReaderStreamPartitionReadStop: func(info trace.OnReaderStreamPartitionReadStopInfo) {
206+
OnReaderPartitionReadStop: func(info trace.OnReaderStreamPartitionReadStopInfo) {
207207
if info.Graceful {
208208
err := externalSystemUnlock(ctx, info.Topic, info.PartitionID)
209209
if err != nil {
@@ -272,8 +272,8 @@ func Example_readWithExplicitPartitionStartStopHandlerAndOwnReadProgressStorage(
272272
topicoptions.WithGetPartitionStartOffset(readStartPosition),
273273
topicoptions.WithTracer(
274274
trace.Topic{
275-
OnReaderStreamPartitionReadStart: onPartitionStart,
276-
OnReaderStreamPartitionReadStop: onPartitionStop,
275+
OnReaderPartitionReadStartResponse: onPartitionStart,
276+
OnReaderPartitionReadStop: onPartitionStop,
277277
},
278278
),
279279
)

trace/details.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,10 @@ const (
2323
TablePoolAPIEvents
2424

2525
TopicControlPlaneEvents
26+
27+
TopicReaderStreamLifeCycleEvents
2628
TopicReaderStreamEvents
2729
TopicReaderPartitionEvents
28-
TopicReaderMessageEvents
2930

3031
RetryEvents
3132

@@ -67,10 +68,11 @@ const (
6768
TableSessionQueryEvents |
6869
TableSessionTransactionEvents
6970

70-
TopicEvents = TopicControlPlaneEvents |
71-
TopicReaderStreamEvents |
71+
TopicReaderAllEvents = TopicReaderStreamEvents |
7272
TopicReaderPartitionEvents |
73-
TopicReaderMessageEvents
73+
TopicReaderStreamLifeCycleEvents
74+
75+
TopicAllEvents = TopicControlPlaneEvents | TopicReaderAllEvents
7476

7577
DetailsAll = ^Details(0) // All bits enabled
7678
)
@@ -106,11 +108,12 @@ var (
106108
TablePoolSessionLifeCycleEvents: "ydb.table.pool.session",
107109
TablePoolAPIEvents: "ydb.table.pool.api",
108110

109-
TopicEvents: "ydb.topic",
110-
TopicControlPlaneEvents: "ydb.topic.controlplane",
111-
TopicReaderStreamEvents: "ydb.topic.reader.state",
112-
TopicReaderPartitionEvents: "ydb.topic.reader.partition",
113-
TopicReaderMessageEvents: "ydb.topic.reader.message",
111+
TopicAllEvents: "ydb.topic",
112+
TopicControlPlaneEvents: "ydb.topic.controlplane",
113+
TopicReaderAllEvents: "ydb.topic.reader",
114+
TopicReaderStreamEvents: "ydb.topic.reader.stream",
115+
TopicReaderPartitionEvents: "ydb.topic.reader.partition",
116+
TopicReaderStreamLifeCycleEvents: "ydb.topic.reader.lifecycle",
114117
}
115118
defaultDetails = DetailsAll
116119
)

trace/topic.go

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,13 @@ type (
2020
OnReaderConnect func(TableReaderConnectStartInfo) func(TableReaderConnectDoneInfo)
2121
OnReaderReconnect func(TableReaderReconnectStartInfo) func(TableReaderReconnectDoneInfo)
2222
OnReaderReconnectRequest func(TableReaderReconnectRequestInfo)
23-
OnReaderReadMessages func(TableReaderReadMessagesStartInfo) func(TableReaderReadMessagesDoneInfo)
24-
OnReaderCommit func(TableReaderCommitStartInfo) func(TableReaderCommitDoneInfo)
2523

24+
OnReaderPartitionReadStartResponse func(TableReaderPartitionReadStartResponseStartInfo) func(TableReaderPartitionReadStartResponseDoneInfo)
25+
OnReaderPartitionReadStop func(TableReaderPartitionReadStopInfo)
26+
27+
OnReaderStreamCommit func(TableReaderStreamCommitStartInfo) func(TableReaderStreamCommitDoneInfo)
2628
OnReaderStreamSentCommitMessage func(TableReaderStreamSentCommitMessageStartInfo) func(TableReaderStreamSentCommitMessageDoneInfo)
2729
OnReaderStreamCommittedNotify func(TableReaderStreamCommittedInfo)
28-
OnReaderStreamPartitionReadStart func(TableReaderStreamPartitionReadStartInfo)
29-
OnReaderStreamPartitionReadStop func(TableReaderStreamPartitionReadStopInfo)
3030
OnReaderStreamClose func(TableReaderStreamCloseStartInfo) func(TableReaderStreamCloseDoneInfo)
3131
OnReaderStreamInit func(TableReaderStreamInitStartInfo) func(TableReaderStreamInitDoneInfo)
3232
OnReaderStreamError func(TableReaderStreamErrorInfo)
@@ -37,27 +37,37 @@ type (
3737
OnReaderStreamUpdateToken func(OnReadStreamUpdateTokenStartInfo) func(OnReadStreamUpdateTokenMiddleTokenReceivedInfo) func(OnReadStreamUpdateTokenDoneInfo)
3838
}
3939

40-
// TableReaderStreamPartitionReadStartInfo
40+
// TableReaderPartitionReadStartResponseStartInfo
4141
// Experimental
4242
//
4343
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
4444
// later release.
45-
TableReaderStreamPartitionReadStartInfo struct {
45+
TableReaderPartitionReadStartResponseStartInfo struct {
4646
ReaderConnectionID string
4747
PartitionContext context.Context
4848
Topic string
4949
PartitionID int64
50-
ReadOffset *int64
51-
CommitOffset *int64
50+
PartitionSessionID int64
51+
}
52+
53+
// TableReaderPartitionReadStartResponseDoneInfo
54+
// Experimental
55+
//
56+
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
57+
// later release.
58+
TableReaderPartitionReadStartResponseDoneInfo struct {
59+
ReadOffset *int64
60+
CommitOffset *int64
61+
Error error
5262
}
5363

54-
// TableReaderStreamPartitionReadStopInfo
64+
// TableReaderPartitionReadStopInfo
5565
//
5666
// Experimental
5767
//
5868
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
5969
// later release.
60-
TableReaderStreamPartitionReadStopInfo struct {
70+
TableReaderPartitionReadStopInfo struct {
6171
ReaderConnectionID string
6272
PartitionContext context.Context
6373
Topic string
@@ -247,13 +257,13 @@ type (
247257
MaxCount int
248258
}
249259

250-
// TableReaderCommitStartInfo
260+
// TableReaderStreamCommitStartInfo
251261
//
252262
// Experimental
253263
//
254264
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
255265
// later release.
256-
TableReaderCommitStartInfo struct {
266+
TableReaderStreamCommitStartInfo struct {
257267
RequestContext context.Context
258268
Topic string
259269
PartitionID int64
@@ -262,13 +272,13 @@ type (
262272
EndOffset int64
263273
}
264274

265-
// TableReaderCommitDoneInfo
275+
// TableReaderStreamCommitDoneInfo
266276
//
267277
// Experimental
268278
//
269279
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
270280
// later release.
271-
TableReaderCommitDoneInfo struct {
281+
TableReaderStreamCommitDoneInfo struct {
272282
Error error
273283
}
274284

0 commit comments

Comments
 (0)