Skip to content

Commit 8a8ec21

Browse files
authored
Merge pull request #163 from ydb-platform/stream-usages
v3.14.5: stream usages
2 parents 97e90c6 + bb71682 commit 8a8ec21

File tree

12 files changed

+463
-88
lines changed

12 files changed

+463
-88
lines changed

.github/workflows/tests.yml

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ jobs:
6767
- name: Checkout code
6868
uses: actions/checkout@v2
6969
- name: Test
70-
run: go test -race -coverpkg=./... -coverprofile unit.txt -covermode atomic ./test/table_test.go
70+
run: go test -race -coverpkg=./... -coverprofile table.txt -covermode atomic ./test/table_test.go
7171
- name: Upload coverage to Codecov
7272
uses: codecov/codecov-action@v2
7373
with:
@@ -106,7 +106,7 @@ jobs:
106106
- name: Checkout code
107107
uses: actions/checkout@v2
108108
- name: Test
109-
run: go test -race -coverpkg=./... -coverprofile unit.txt -covermode atomic ./test/ratelimiter_test.go
109+
run: go test -race -coverpkg=./... -coverprofile ratelimiter.txt -covermode atomic ./test/ratelimiter_test.go
110110
shell: bash
111111
- name: Upload coverage to Codecov
112112
uses: codecov/codecov-action@v2
@@ -146,13 +146,52 @@ jobs:
146146
- name: Checkout code
147147
uses: actions/checkout@v2
148148
- name: Test
149-
run: go test -race -coverpkg=./... -coverprofile unit.txt -covermode atomic ./test/scripting_test.go
149+
run: go test -race -coverpkg=./... -coverprofile scripting.txt -covermode atomic ./test/scripting_test.go
150150
- name: Upload coverage to Codecov
151151
uses: codecov/codecov-action@v2
152152
with:
153153
file: ./scripting.txt
154154
flags: scripting,e2e,integration,${{ matrix.os }},${{ matrix.go-version }}
155155
name: scripting
156+
discovery:
157+
strategy:
158+
matrix:
159+
go-version: [1.17.x]
160+
os: [ubuntu-latest]
161+
services:
162+
ydb:
163+
image: cr.yandex/yc/yandex-docker-local-ydb:latest
164+
ports:
165+
- 2135:2135
166+
- 8765:8765
167+
volumes:
168+
- /tmp/ydb_certs:/ydb_certs
169+
env:
170+
YDB_LOCAL_SURVIVE_RESTART: true
171+
YDB_USE_IN_MEMORY_PDISKS: true
172+
options: '-h localhost'
173+
env:
174+
OS: ${{ matrix.os }}
175+
GO: ${{ matrix.go-version }}
176+
YDB_CONNECTION_STRING: grpcs://localhost:2135/?database=/local
177+
YDB_SSL_ROOT_CERTIFICATES_FILE: /tmp/ydb_certs/ca.pem
178+
YDB_ANONYMOUS_CREDENTIALS: 1
179+
runs-on: ${{ matrix.os }}
180+
steps:
181+
- name: Install Go
182+
uses: actions/setup-go@v2
183+
with:
184+
go-version: ${{ matrix.go-version }}
185+
- name: Checkout code
186+
uses: actions/checkout@v2
187+
- name: Test
188+
run: go test -race -coverpkg=./... -coverprofile discovery.txt -covermode atomic ./test/discovery_test.go
189+
- name: Upload coverage to Codecov
190+
uses: codecov/codecov-action@v2
191+
with:
192+
file: ./discovery.txt
193+
flags: discovery,e2e,integration,${{ matrix.os }},${{ matrix.go-version }}
194+
name: discovery
156195
connection:
157196
strategy:
158197
matrix:
@@ -185,7 +224,7 @@ jobs:
185224
- name: Checkout code
186225
uses: actions/checkout@v2
187226
- name: Test
188-
run: go test -race -coverpkg=./... -coverprofile unit.txt -covermode atomic ./test/connection_test.go
227+
run: go test -race -coverpkg=./... -coverprofile connection.txt -covermode atomic ./test/connection_test.go
189228
- name: Upload coverage to Codecov
190229
uses: codecov/codecov-action@v2
191230
with:

CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@
99
* Refactored `table.CreateSession` as retry operation with options
1010
* Moved log level from root of repository to package `log`
1111
* Added details and address to transport error
12-
* Fixed `recursive` param in `ratelimiter.ListResource`
12+
* Fixed `recursive` param in `ratelimiter.ListResource`
13+
* Added counting stream usages for exclude park connection if it in use
14+
* Added `trace.Driver` events about change stream usage and `conn.Release()` call
15+
* Fixed bug with non-applying meta headers on direct call `db.Discovery.Discover`
1316

1417
## 3.14.4
1518
* Implemented auto-removing `conn.Conn` from `conn.Pool` with counting usages of `conn.Conn`

internal/conn/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
"google.golang.org/grpc"
77

8+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/meta"
89
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
910
)
1011

@@ -13,4 +14,5 @@ type Config interface {
1314
Trace() trace.Driver
1415
ConnectionTTL() time.Duration
1516
GrpcDialOptions() []grpc.DialOption
17+
Meta() meta.Meta
1618
}

internal/conn/conn.go

Lines changed: 106 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -46,34 +46,57 @@ func (c *conn) Address() string {
4646
}
4747

4848
type conn struct {
49-
sync.RWMutex
50-
config Config // ro access
51-
cc *grpc.ClientConn
52-
done chan struct{}
53-
endpoint endpoint.Endpoint // ro access
54-
closed bool
55-
state State
56-
usages int32
57-
lastUsage time.Time
58-
onClose []func(*conn)
49+
mtx sync.RWMutex
50+
config Config // ro access
51+
cc *grpc.ClientConn
52+
done chan struct{}
53+
endpoint endpoint.Endpoint // ro access
54+
closed bool
55+
state State
56+
usages int32
57+
streamUsages int32
58+
lastUsage time.Time
59+
onClose []func(*conn)
5960
}
6061

6162
func (c *conn) Release(ctx context.Context) {
62-
if c.decUsages() == 0 {
63-
_ = c.Close(ctx)
63+
var (
64+
onDone = trace.DriverOnConnRelease(
65+
c.config.Trace(),
66+
&ctx,
67+
c.endpoint.Copy(),
68+
)
69+
err error
70+
)
71+
defer func() {
72+
onDone(err)
73+
}()
74+
var issues []error
75+
if c.changeUsages(-1) == 0 {
76+
if usages := atomic.LoadInt32(&c.streamUsages); usages > 0 {
77+
issues = append(issues, fmt.Errorf("conn in stream use: usages=%d", usages))
78+
}
79+
if closeErr := c.Close(ctx); closeErr != nil {
80+
issues = append(issues, closeErr)
81+
}
82+
}
83+
if len(issues) > 0 {
84+
err = errors.NewWithIssues("conn released with issues", issues...)
6485
}
6586
}
6687

6788
func (c *conn) LastUsage() time.Time {
68-
c.RLock()
69-
defer c.RUnlock()
70-
89+
if usages := atomic.LoadInt32(&c.streamUsages); usages > 0 {
90+
return time.Now()
91+
}
92+
c.mtx.RLock()
93+
defer c.mtx.RUnlock()
7194
return c.lastUsage
7295
}
7396

7497
func (c *conn) IsState(states ...State) bool {
75-
c.RLock()
76-
defer c.RUnlock()
98+
c.mtx.RLock()
99+
defer c.mtx.RUnlock()
77100

78101
for _, s := range states {
79102
if s == c.state {
@@ -85,8 +108,8 @@ func (c *conn) IsState(states ...State) bool {
85108
}
86109

87110
func (c *conn) park(ctx context.Context) (err error) {
88-
c.Lock()
89-
defer c.Unlock()
111+
c.mtx.Lock()
112+
defer c.mtx.Unlock()
90113

91114
if c.closed {
92115
return nil
@@ -129,8 +152,8 @@ func (c *conn) Endpoint() endpoint.Endpoint {
129152
}
130153

131154
func (c *conn) SetState(s State) State {
132-
c.Lock()
133-
defer c.Unlock()
155+
c.mtx.Lock()
156+
defer c.mtx.Unlock()
134157
return c.setState(s)
135158
}
136159

@@ -145,8 +168,8 @@ func (c *conn) setState(s State) State {
145168
}
146169

147170
func (c *conn) GetState() (s State) {
148-
c.RLock()
149-
defer c.RUnlock()
171+
c.mtx.RLock()
172+
defer c.mtx.RUnlock()
150173
return c.state
151174
}
152175

@@ -168,8 +191,8 @@ func (c *conn) take(ctx context.Context) (cc *grpc.ClientConn, err error) {
168191
)
169192
}
170193

171-
c.Lock()
172-
defer c.Unlock()
194+
c.mtx.Lock()
195+
defer c.mtx.Unlock()
173196

174197
if !isBroken(c.cc) {
175198
return c.cc, nil
@@ -194,30 +217,44 @@ func (c *conn) take(ctx context.Context) (cc *grpc.ClientConn, err error) {
194217
return c.cc, nil
195218
}
196219

220+
func (c *conn) touchLastUsage() {
221+
c.mtx.Lock()
222+
defer c.mtx.Unlock()
223+
c.lastUsage = time.Now()
224+
}
225+
197226
func (c *conn) changeUsages(delta int32) int32 {
198-
if usages := atomic.AddInt32(&c.usages, delta); usages < 0 {
227+
defer c.touchLastUsage()
228+
229+
usages := atomic.AddInt32(&c.usages, delta)
230+
231+
if usages < 0 {
199232
panic("negative usages" + strconv.Itoa(int(usages)))
200-
} else {
201-
trace.DriverOnConnUsagesChange(
202-
c.config.Trace(),
203-
c.endpoint.Copy(),
204-
int(usages),
205-
)
206-
return usages
207233
}
208-
}
209234

210-
func (c *conn) incUsages() {
211-
c.Lock()
212-
defer c.Unlock()
213-
c.lastUsage = time.Now()
214-
c.changeUsages(1)
235+
trace.DriverOnConnUsagesChange(
236+
c.config.Trace(),
237+
c.endpoint.Copy(),
238+
int(usages),
239+
)
240+
241+
return usages
215242
}
216243

217-
func (c *conn) decUsages() int32 {
218-
c.Lock()
219-
defer c.Unlock()
220-
return c.changeUsages(-1)
244+
func (c *conn) changeStreamUsages(delta int32) {
245+
defer c.touchLastUsage()
246+
247+
usages := atomic.AddInt32(&c.streamUsages, delta)
248+
249+
if usages < 0 {
250+
panic("negative stream usages" + strconv.Itoa(int(usages)))
251+
}
252+
253+
trace.DriverOnConnStreamUsagesChange(
254+
c.config.Trace(),
255+
c.endpoint.Copy(),
256+
int(usages),
257+
)
221258
}
222259

223260
func isBroken(raw *grpc.ClientConn) bool {
@@ -240,14 +277,14 @@ func (c *conn) close() (err error) {
240277
}
241278

242279
func (c *conn) isClosed() bool {
243-
c.RLock()
244-
defer c.RUnlock()
280+
c.mtx.RLock()
281+
defer c.mtx.RUnlock()
245282
return c.closed
246283
}
247284

248285
func (c *conn) Close(ctx context.Context) (err error) {
249-
c.Lock()
250-
defer c.Unlock()
286+
c.mtx.Lock()
287+
defer c.mtx.Unlock()
251288

252289
if c.closed {
253290
return nil
@@ -292,8 +329,17 @@ func (c *conn) invoke(
292329
)
293330
}
294331

295-
c.incUsages()
296-
defer c.decUsages()
332+
ctx, err = c.config.Meta().Meta(ctx)
333+
if err != nil {
334+
return errors.NewGrpcError(
335+
codes.Unavailable,
336+
errors.WithMsg("ydb driver conn apply meta failed"),
337+
errors.WithErr(err),
338+
)
339+
}
340+
341+
c.changeUsages(1)
342+
defer c.changeUsages(-1)
297343

298344
return cc.Invoke(ctx, method, req, res, opts...)
299345
}
@@ -370,8 +416,17 @@ func (c *conn) newStream(
370416
)
371417
}
372418

373-
c.incUsages()
374-
defer c.decUsages()
419+
ctx, err = c.config.Meta().Meta(ctx)
420+
if err != nil {
421+
return nil, errors.NewGrpcError(
422+
codes.Unavailable,
423+
errors.WithMsg("ydb driver conn apply meta failed"),
424+
errors.WithErr(err),
425+
)
426+
}
427+
428+
c.changeStreamUsages(1)
429+
defer c.changeStreamUsages(-1)
375430

376431
return cc.NewStream(ctx, desc, method, opts...)
377432
}

internal/conn/grpc_client_stream.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ func (s *grpcClientStream) Context() context.Context {
4747
}
4848

4949
func (s *grpcClientStream) SendMsg(m interface{}) (err error) {
50-
s.c.incUsages()
51-
defer s.c.decUsages()
50+
s.c.changeStreamUsages(1)
51+
defer s.c.changeStreamUsages(-1)
5252

5353
err = s.s.SendMsg(m)
5454
if err != nil && s.wrapping {
@@ -64,8 +64,8 @@ func (s *grpcClientStream) SendMsg(m interface{}) (err error) {
6464
}
6565

6666
func (s *grpcClientStream) RecvMsg(m interface{}) (err error) {
67-
s.c.incUsages()
68-
defer s.c.decUsages()
67+
s.c.changeStreamUsages(1)
68+
defer s.c.changeStreamUsages(-1)
6969

7070
defer func() {
7171
onDone := s.recv(errors.HideEOF(err))

internal/conn/pool.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func (p *pool) Get(ctx context.Context, endpoint endpoint.Endpoint) Conn {
6262
)
6363

6464
if cc, has = p.conns[address]; has {
65-
cc.incUsages()
65+
cc.changeUsages(1)
6666
return cc
6767
}
6868

0 commit comments

Comments
 (0)