Skip to content

Commit eceec46

Browse files
authored
Merge pull request #502 from ydb-platform/last-usage
Last usage
2 parents 2eb025e + 6e85ee6 commit eceec46

File tree

8 files changed

+123
-39
lines changed

8 files changed

+123
-39
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
* Added retry policy options for topics: `topic/topicoptions.WithReaderCheckRetryErrorFunction`, `topic/topicoptions.WithReaderStartTimeout`, `topic/topicoptions.WithWriterCheckRetryErrorFunction`, `topic/topicoptions.WithWriterStartTimeout`
2+
* Refactored `internal/conn` middlewares for `DRY`
3+
* Added `trace.tableSessionInfo.LastUsage()` method for get last usage timestamp
24

35
## v3.41.0
46
* Added option for set interval of auth token update in topic streams

internal/conn/middleware.go

Lines changed: 69 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,53 +6,95 @@ import (
66
"google.golang.org/grpc"
77
)
88

9-
type contextModifierMiddleware struct {
10-
cc grpc.ClientConnInterface
11-
modifyCtx func(ctx context.Context) context.Context
9+
var _ grpc.ClientConnInterface = (*middleware)(nil)
10+
11+
type (
12+
invoker func(context.Context, string, interface{}, interface{}, ...grpc.CallOption) error
13+
streamer func(context.Context, *grpc.StreamDesc, string, ...grpc.CallOption) (grpc.ClientStream, error)
14+
)
15+
16+
type middleware struct {
17+
invoke invoker
18+
newStream streamer
1219
}
1320

14-
func (c *contextModifierMiddleware) Invoke(
21+
func (m *middleware) Invoke(
1522
ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption,
1623
) error {
17-
return c.cc.Invoke(c.modifyCtx(ctx), method, args, reply, opts...)
24+
return m.invoke(ctx, method, args, reply, opts...)
1825
}
1926

20-
func (c *contextModifierMiddleware) NewStream(
27+
func (m *middleware) NewStream(
2128
ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption,
2229
) (grpc.ClientStream, error) {
23-
return c.cc.NewStream(c.modifyCtx(ctx), desc, method, opts...)
30+
return m.newStream(ctx, desc, method, opts...)
2431
}
2532

2633
func WithContextModifier(
2734
cc grpc.ClientConnInterface,
2835
modifyCtx func(ctx context.Context) context.Context,
2936
) grpc.ClientConnInterface {
30-
return &contextModifierMiddleware{
31-
cc: cc,
32-
modifyCtx: modifyCtx,
37+
return &middleware{
38+
invoke: func(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error {
39+
ctx = modifyCtx(ctx)
40+
return cc.Invoke(ctx, method, args, reply, opts...)
41+
},
42+
newStream: func(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (
43+
grpc.ClientStream, error,
44+
) {
45+
ctx = modifyCtx(ctx)
46+
return cc.NewStream(ctx, desc, method, opts...)
47+
},
3348
}
3449
}
3550

36-
type optionsAppenderMiddleware struct {
37-
cc grpc.ClientConnInterface
38-
opts []grpc.CallOption
39-
}
40-
41-
func (c *optionsAppenderMiddleware) Invoke(
42-
ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption,
43-
) error {
44-
return c.cc.Invoke(ctx, method, args, reply, append(opts, c.opts...)...)
51+
func WithAppendOptions(cc grpc.ClientConnInterface, appendOpts ...grpc.CallOption) grpc.ClientConnInterface {
52+
return &middleware{
53+
invoke: func(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error {
54+
opts = append(opts, appendOpts...)
55+
return cc.Invoke(ctx, method, args, reply, opts...)
56+
},
57+
newStream: func(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (
58+
grpc.ClientStream, error,
59+
) {
60+
opts = append(opts, appendOpts...)
61+
return cc.NewStream(ctx, desc, method, opts...)
62+
},
63+
}
4564
}
4665

47-
func (c *optionsAppenderMiddleware) NewStream(
48-
ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption,
49-
) (grpc.ClientStream, error) {
50-
return c.cc.NewStream(ctx, desc, method, append(opts, c.opts...)...)
66+
func WithBeforeFunc(
67+
cc grpc.ClientConnInterface,
68+
before func(),
69+
) grpc.ClientConnInterface {
70+
return &middleware{
71+
invoke: func(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error {
72+
before()
73+
return cc.Invoke(ctx, method, args, reply, opts...)
74+
},
75+
newStream: func(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (
76+
grpc.ClientStream, error,
77+
) {
78+
before()
79+
return cc.NewStream(ctx, desc, method, opts...)
80+
},
81+
}
5182
}
5283

53-
func WithAppendOptions(cc grpc.ClientConnInterface, opts ...grpc.CallOption) grpc.ClientConnInterface {
54-
return &optionsAppenderMiddleware{
55-
cc: cc,
56-
opts: opts,
84+
func WithAfterFunc(
85+
cc grpc.ClientConnInterface,
86+
after func(),
87+
) grpc.ClientConnInterface {
88+
return &middleware{
89+
invoke: func(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error {
90+
defer after()
91+
return cc.Invoke(ctx, method, args, reply, opts...)
92+
},
93+
newStream: func(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (
94+
grpc.ClientStream, error,
95+
) {
96+
defer after()
97+
return cc.NewStream(ctx, desc, method, opts...)
98+
},
5799
}
58100
}

internal/table/session.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,16 @@ type session struct {
5050
status table.SessionStatus
5151
statusMtx sync.RWMutex
5252
nodeID uint32
53+
lastUsage int64
5354

5455
onClose []func(s *session)
5556
closeOnce sync.Once
5657
}
5758

59+
func (s *session) LastUsage() time.Time {
60+
return time.Unix(atomic.LoadInt64(&s.lastUsage), 0)
61+
}
62+
5863
func nodeID(sessionID string) (uint32, error) {
5964
u, err := url.Parse(sessionID)
6065
if err != nil {
@@ -137,15 +142,21 @@ func newSession(ctx context.Context, cc grpc.ClientConnInterface, config config.
137142
}
138143

139144
s = &session{
140-
id: result.GetSessionId(),
141-
tableService: Ydb_Table_V1.NewTableServiceClient(
145+
id: result.GetSessionId(),
146+
config: config,
147+
status: table.SessionReady,
148+
}
149+
150+
s.tableService = Ydb_Table_V1.NewTableServiceClient(
151+
conn.WithBeforeFunc(
142152
conn.WithContextModifier(cc, func(ctx context.Context) context.Context {
143153
return meta.WithTrailerCallback(balancer.WithEndpoint(ctx, s), s.checkCloseHint)
144154
}),
155+
func() {
156+
atomic.StoreInt64(&s.lastUsage, time.Now().Unix())
157+
},
145158
),
146-
config: config,
147-
status: table.SessionReady,
148-
}
159+
)
149160

150161
for _, o := range opts {
151162
o(s)

internal/xsql/badconn/badconn_go1.18.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ func (e Error) Error() string {
2222
}
2323

2424
func (e Error) Is(err error) bool {
25-
if err == driver.ErrBadConn {
25+
//nolint:nolintlint
26+
if err == driver.ErrBadConn { //nolint:errorlint
2627
return true
2728
}
2829
return xerrors.Is(e.err, err)

internal/xsql/conn.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"io"
99
"sync/atomic"
10+
"time"
1011

1112
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
1213
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
@@ -56,6 +57,7 @@ type conn struct {
5657
session table.ClosableSession // Immutable and r/o usage.
5758

5859
closed uint32
60+
lastUsage int64
5961
defaultQueryMode QueryMode
6062

6163
defaultTxControl *table.TransactionControl
@@ -139,8 +141,16 @@ func (c *conn) PrepareContext(ctx context.Context, query string) (_ driver.Stmt,
139141

140142
func (c *conn) execContext(ctx context.Context, query string, args []driver.NamedValue) (_ driver.Result, err error) {
141143
m := queryModeFromContext(ctx, c.defaultQueryMode)
142-
onDone := trace.DatabaseSQLOnConnExec(c.trace, &ctx, query, m.String(), xcontext.IsIdempotent(ctx))
144+
onDone := trace.DatabaseSQLOnConnExec(
145+
c.trace,
146+
&ctx,
147+
query,
148+
m.String(),
149+
xcontext.IsIdempotent(ctx),
150+
time.Since(time.Unix(atomic.LoadInt64(&c.lastUsage), 0)),
151+
)
143152
defer func() {
153+
atomic.StoreInt64(&c.lastUsage, time.Now().Unix())
144154
onDone(err)
145155
}()
146156
switch m {
@@ -214,8 +224,16 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
214224

215225
func (c *conn) queryContext(ctx context.Context, query string, args []driver.NamedValue) (_ driver.Rows, err error) {
216226
m := queryModeFromContext(ctx, c.defaultQueryMode)
217-
onDone := trace.DatabaseSQLOnConnExec(c.trace, &ctx, query, m.String(), xcontext.IsIdempotent(ctx))
227+
onDone := trace.DatabaseSQLOnConnQuery(
228+
c.trace,
229+
&ctx,
230+
query,
231+
m.String(),
232+
xcontext.IsIdempotent(ctx),
233+
time.Since(time.Unix(atomic.LoadInt64(&c.lastUsage), 0)),
234+
)
218235
defer func() {
236+
atomic.StoreInt64(&c.lastUsage, time.Now().Unix())
219237
onDone(err)
220238
}()
221239
switch m {
@@ -349,7 +367,7 @@ func (c *conn) BeginTx(ctx context.Context, txOptions driver.TxOptions) (_ drive
349367
return c.currentTx, nil
350368
}
351369

352-
func (c *conn) ResetSession(ctx context.Context) error {
370+
func (c *conn) ResetSession(_ context.Context) error {
353371
if c.currentTx != nil {
354372
_ = c.currentTx.Rollback()
355373
}

trace/sql.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ package trace
22

33
//go:generate gtrace
44

5-
import "context"
5+
import (
6+
"context"
7+
"time"
8+
)
69

710
type (
811
// DatabaseSQL specified trace of `database/sql` call activity.
@@ -84,6 +87,7 @@ type (
8487
Query string
8588
Mode string
8689
Idempotent bool
90+
IdleTime time.Duration
8791
}
8892
DatabaseSQLConnQueryDoneInfo struct {
8993
Error error
@@ -97,6 +101,7 @@ type (
97101
Query string
98102
Mode string
99103
Idempotent bool
104+
IdleTime time.Duration
100105
}
101106
DatabaseSQLConnExecDoneInfo struct {
102107
Error error

trace/sql_gtrace.go

Lines changed: 5 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

trace/table.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package trace
22

33
import (
44
"context"
5+
"time"
56
)
67

78
// tool gtrace used from ./internal/cmd/gtrace
@@ -99,6 +100,7 @@ type (
99100
tableSessionInfo interface {
100101
ID() string
101102
Status() string
103+
LastUsage() time.Time
102104
}
103105
tableTransactionInfo interface {
104106
ID() string

0 commit comments

Comments
 (0)