Skip to content

Commit 42f0271

Browse files
committed
trace changes
1 parent f12a33e commit 42f0271

File tree

14 files changed

+927
-937
lines changed

14 files changed

+927
-937
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Changed `trace.Table` and `trace.Query` traces
2+
13
## v3.79.0
24
* Added commit messages for topic listener
35
* EOF error in RecvMsg is no longer logged

internal/pool/defaults.go

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,3 @@
11
package pool
22

33
const DefaultLimit = 50
4-
5-
var defaultTrace = &Trace{
6-
OnNew: func(info *NewStartInfo) func(info *NewDoneInfo) {
7-
return func(info *NewDoneInfo) {
8-
}
9-
},
10-
OnClose: func(info *CloseStartInfo) func(info *CloseDoneInfo) {
11-
return func(info *CloseDoneInfo) {
12-
}
13-
},
14-
OnTry: func(info *TryStartInfo) func(info *TryDoneInfo) {
15-
return func(info *TryDoneInfo) {
16-
}
17-
},
18-
OnWith: func(info *WithStartInfo) func(info *WithDoneInfo) {
19-
return func(info *WithDoneInfo) {
20-
}
21-
},
22-
OnPut: func(info *PutStartInfo) func(info *PutDoneInfo) {
23-
return func(info *PutDoneInfo) {
24-
}
25-
},
26-
OnGet: func(info *GetStartInfo) func(info *GetDoneInfo) {
27-
return func(info *GetDoneInfo) {
28-
}
29-
},
30-
OnChange: func(info ChangeInfo) {},
31-
}

internal/pool/pool.go

Lines changed: 79 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func New[PT Item[T], T any](
7777
) *Pool[PT, T] {
7878
p := &Pool[PT, T]{
7979
config: Config[PT, T]{
80-
trace: defaultTrace,
80+
trace: &Trace{},
8181
limit: DefaultLimit,
8282
createItem: defaultCreateItem[T, PT],
8383
},
@@ -90,16 +90,16 @@ func New[PT Item[T], T any](
9090
}
9191
}
9292

93-
onDone := p.config.trace.OnNew(&NewStartInfo{
94-
Context: &ctx,
95-
Call: stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.New"),
96-
})
97-
98-
defer func() {
99-
onDone(&NewDoneInfo{
100-
Limit: p.config.limit,
101-
})
102-
}()
93+
if onNew := p.config.trace.OnNew; onNew != nil {
94+
onDone := onNew(&ctx,
95+
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.New"),
96+
)
97+
if onDone != nil {
98+
defer func() {
99+
onDone(p.config.limit)
100+
}()
101+
}
102+
}
103103

104104
p.createItem = makeCreateItemFunc(p.config, p.done, func(item PT) error {
105105
return xsync.WithLock(&p.mu, func() error {
@@ -195,14 +195,21 @@ func makeCreateItemFunc[PT Item[T], T any]( //nolint:funlen
195195
}
196196
}
197197

198+
func (p *Pool[PT, T]) stats() Stats {
199+
return Stats{
200+
Limit: p.config.limit,
201+
Idle: len(p.idle),
202+
Wait: 0,
203+
CreateInProgress: 0,
204+
}
205+
}
206+
198207
func (p *Pool[PT, T]) onChangeStats() {
199-
p.mu.RLock()
200-
info := ChangeInfo{
201-
Limit: p.config.limit,
202-
Idle: len(p.idle),
208+
if onChange := p.config.trace.OnChange; onChange != nil {
209+
onChange(xsync.WithRLock(&p.mu, func() Stats {
210+
return p.stats()
211+
}))
203212
}
204-
p.mu.RUnlock()
205-
p.config.trace.OnChange(info)
206213
}
207214

208215
func (p *Pool[PT, T]) Stats() Stats {
@@ -229,18 +236,19 @@ func (p *Pool[PT, T]) getItemFromIdle() (item PT) {
229236
return item
230237
}
231238

232-
func (p *Pool[PT, T]) getItem(ctx context.Context) (_ PT, finalErr error) {
233-
onDone := p.config.trace.OnGet(&GetStartInfo{
234-
Context: &ctx,
235-
Call: stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).getItem"),
236-
})
237-
defer func() {
238-
onDone(&GetDoneInfo{
239-
Error: finalErr,
240-
})
241-
}()
239+
func (p *Pool[PT, T]) getItem(ctx context.Context) (item PT, finalErr error) {
240+
if onGet := p.config.trace.OnGet; onGet != nil {
241+
onDone := onGet(&ctx,
242+
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).getItem"),
243+
)
244+
if onDone != nil {
245+
defer func() {
246+
onDone(item, 0, finalErr)
247+
}()
248+
}
249+
}
242250

243-
item := p.getItemFromIdle()
251+
item = p.getItemFromIdle()
244252

245253
if item != nil {
246254
if item.IsAlive() {
@@ -267,16 +275,17 @@ func (p *Pool[PT, T]) appendItemToIdle(item PT) {
267275
}
268276

269277
func (p *Pool[PT, T]) putItem(ctx context.Context, item PT) (finalErr error) {
270-
onDone := p.config.trace.OnPut(&PutStartInfo{
271-
Context: &ctx,
272-
Call: stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).putItem"),
273-
})
274-
defer func() {
275-
onDone(&PutDoneInfo{
276-
Error: finalErr,
277-
})
278-
}()
279-
278+
if onPut := p.config.trace.OnPut; onPut != nil {
279+
onDone := onPut(&ctx,
280+
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).putItem"),
281+
item,
282+
)
283+
if onDone != nil {
284+
defer func() {
285+
onDone(finalErr)
286+
}()
287+
}
288+
}
280289
if !item.IsAlive() {
281290
p.closeItem(ctx, item)
282291

@@ -324,15 +333,16 @@ func makeAsyncCloseItemFunc[PT Item[T], T any](
324333
}
325334

326335
func (p *Pool[PT, T]) try(ctx context.Context, f func(ctx context.Context, item PT) error) (finalErr error) {
327-
onDone := p.config.trace.OnTry(&TryStartInfo{
328-
Context: &ctx,
329-
Call: stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).try"),
330-
})
331-
defer func() {
332-
onDone(&TryDoneInfo{
333-
Error: finalErr,
334-
})
335-
}()
336+
if onTry := p.config.trace.OnTry; onTry != nil {
337+
onDone := onTry(&ctx,
338+
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).try"),
339+
)
340+
if onDone != nil {
341+
defer func() {
342+
onDone(finalErr)
343+
}()
344+
}
345+
}
336346

337347
select {
338348
case <-p.done:
@@ -373,19 +383,18 @@ func (p *Pool[PT, T]) With(
373383
f func(ctx context.Context, item PT) error,
374384
opts ...retry.Option,
375385
) (finalErr error) {
376-
var (
377-
onDone = p.config.trace.OnWith(&WithStartInfo{
378-
Context: &ctx,
379-
Call: stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).With"),
380-
})
381-
attempts int
382-
)
383-
defer func() {
384-
onDone(&WithDoneInfo{
385-
Error: finalErr,
386-
Attempts: attempts,
387-
})
388-
}()
386+
var attempts int
387+
388+
if onWith := p.config.trace.OnWith; onWith != nil {
389+
onDone := onWith(&ctx,
390+
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).With"),
391+
)
392+
if onDone != nil {
393+
defer func() {
394+
onDone(attempts, finalErr)
395+
}()
396+
}
397+
}
389398

390399
err := retry.Retry(ctx, func(ctx context.Context) error {
391400
err := p.try(ctx, f)
@@ -409,15 +418,16 @@ func (p *Pool[PT, T]) With(
409418
}
410419

411420
func (p *Pool[PT, T]) Close(ctx context.Context) (finalErr error) {
412-
onDone := p.config.trace.OnClose(&CloseStartInfo{
413-
Context: &ctx,
414-
Call: stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).Close"),
415-
})
416-
defer func() {
417-
onDone(&CloseDoneInfo{
418-
Error: finalErr,
419-
})
420-
}()
421+
if onClose := p.config.trace.OnClose; onClose != nil {
422+
onDone := onClose(&ctx,
423+
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).Close"),
424+
)
425+
if onDone != nil {
426+
defer func() {
427+
onDone(finalErr)
428+
}()
429+
}
430+
}
421431

422432
close(p.done)
423433

internal/pool/pool_test.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -268,11 +268,12 @@ func TestPool(t *testing.T) {
268268
})
269269
t.Run("Stress", func(t *testing.T) {
270270
xtest.TestManyTimes(t, func(t testing.TB) {
271-
trace := *defaultTrace
272-
trace.OnChange = func(info ChangeInfo) {
273-
require.GreaterOrEqual(t, info.Limit, info.Idle)
271+
trace := &Trace{
272+
OnChange: func(info Stats) {
273+
require.GreaterOrEqual(t, info.Limit, info.Idle)
274+
},
274275
}
275-
p := New[*testItem, testItem](rootCtx, WithTrace[*testItem, testItem](&trace))
276+
p := New[*testItem, testItem](rootCtx, WithTrace[*testItem, testItem](trace))
276277
var wg sync.WaitGroup
277278
wg.Add(DefaultLimit*2 + 1)
278279
for range make([]struct{}, DefaultLimit*2) {
@@ -297,12 +298,13 @@ func TestPool(t *testing.T) {
297298
})
298299
t.Run("ParallelCreation", func(t *testing.T) {
299300
xtest.TestManyTimes(t, func(t testing.TB) {
300-
trace := *defaultTrace
301-
trace.OnChange = func(info ChangeInfo) {
302-
require.Equal(t, DefaultLimit, info.Limit)
303-
require.LessOrEqual(t, info.Idle, DefaultLimit)
301+
trace := &Trace{
302+
OnChange: func(info Stats) {
303+
require.Equal(t, DefaultLimit, info.Limit)
304+
require.LessOrEqual(t, info.Idle, DefaultLimit)
305+
},
304306
}
305-
p := New[*testItem, testItem](rootCtx, WithTrace[*testItem, testItem](&trace))
307+
p := New[*testItem, testItem](rootCtx, WithTrace[*testItem, testItem](trace))
306308
var wg sync.WaitGroup
307309
for range make([]struct{}, DefaultLimit*10) {
308310
wg.Add(1)

internal/pool/stats.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package pool
22

33
type Stats struct {
4-
Limit int
5-
Idle int
4+
Limit int
5+
Index int
6+
Idle int
7+
Wait int
8+
CreateInProgress int
69
}

internal/pool/trace.go

Lines changed: 8 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -8,81 +8,13 @@ import (
88

99
type (
1010
Trace struct {
11-
OnNew func(*NewStartInfo) func(*NewDoneInfo)
12-
OnClose func(*CloseStartInfo) func(*CloseDoneInfo)
13-
OnTry func(*TryStartInfo) func(*TryDoneInfo)
14-
OnWith func(*WithStartInfo) func(*WithDoneInfo)
15-
OnPut func(*PutStartInfo) func(*PutDoneInfo)
16-
OnGet func(*GetStartInfo) func(*GetDoneInfo)
17-
OnChange func(ChangeInfo)
11+
OnNew func(ctx *context.Context, call stack.Caller) func(limit int)
12+
OnClose func(ctx *context.Context, call stack.Caller) func(err error)
13+
OnTry func(ctx *context.Context, call stack.Caller) func(err error)
14+
OnWith func(ctx *context.Context, call stack.Caller) func(attempts int, err error)
15+
OnPut func(ctx *context.Context, call stack.Caller, item any) func(err error)
16+
OnGet func(ctx *context.Context, call stack.Caller) func(item any, attempts int, err error)
17+
onWait func() func(item any, err error) //nolint:unused
18+
OnChange func(Stats)
1819
}
19-
NewStartInfo struct {
20-
// Context make available context in trace stack.Callerback function.
21-
// Pointer to context provide replacement of context in trace stack.Callerback function.
22-
// Warning: concurrent access to pointer on client side must be excluded.
23-
// Safe replacement of context are provided only inside stack.Callerback function
24-
Context *context.Context
25-
Call stack.Caller
26-
}
27-
NewDoneInfo struct {
28-
Limit int
29-
}
30-
CloseStartInfo struct {
31-
// Context make available context in trace stack.Callerback function.
32-
// Pointer to context provide replacement of context in trace stack.Callerback function.
33-
// Warning: concurrent access to pointer on client side must be excluded.
34-
// Safe replacement of context are provided only inside stack.Callerback function
35-
Context *context.Context
36-
Call stack.Caller
37-
}
38-
CloseDoneInfo struct {
39-
Error error
40-
}
41-
TryStartInfo struct {
42-
// Context make available context in trace stack.Callerback function.
43-
// Pointer to context provide replacement of context in trace stack.Callerback function.
44-
// Warning: concurrent access to pointer on client side must be excluded.
45-
// Safe replacement of context are provided only inside stack.Callerback function
46-
Context *context.Context
47-
Call stack.Caller
48-
}
49-
TryDoneInfo struct {
50-
Error error
51-
}
52-
WithStartInfo struct {
53-
// Context make available context in trace stack.Callerback function.
54-
// Pointer to context provide replacement of context in trace stack.Callerback function.
55-
// Warning: concurrent access to pointer on client side must be excluded.
56-
// Safe replacement of context are provided only inside stack.Callerback function
57-
Context *context.Context
58-
Call stack.Caller
59-
}
60-
WithDoneInfo struct {
61-
Error error
62-
63-
Attempts int
64-
}
65-
PutStartInfo struct {
66-
// Context make available context in trace stack.Callerback function.
67-
// Pointer to context provide replacement of context in trace stack.Callerback function.
68-
// Warning: concurrent access to pointer on client side must be excluded.
69-
// Safe replacement of context are provided only inside stack.Callerback function
70-
Context *context.Context
71-
Call stack.Caller
72-
}
73-
PutDoneInfo struct {
74-
Error error
75-
}
76-
GetStartInfo struct {
77-
// Context make available context in trace stack.Callerback function.
78-
// Pointer to context provide replacement of context in trace stack.Callerback function.
79-
// Warning: concurrent access to pointer on client side must be excluded.
80-
// Safe replacement of context are provided only inside stack.Callerback function
81-
Context *context.Context
82-
Call stack.Caller
83-
}
84-
GetDoneInfo struct {
85-
Error error
86-
}
87-
ChangeInfo = Stats
8820
)

0 commit comments

Comments
 (0)