Skip to content

Commit 84c05d7

Browse files
committed
refactor: refine coordination service client
- rename OpenSession to CreateSession - make CreateSession ctx argument define the context of the method not the whole session - make coordination_test tests fail on errors See also #53
1 parent a900145 commit 84c05d7

File tree

8 files changed

+86
-89
lines changed

8 files changed

+86
-89
lines changed

coordination/coordination.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,20 @@ type Client interface {
1616
DropNode(ctx context.Context, path string) (err error)
1717
DescribeNode(ctx context.Context, path string) (_ *scheme.Entry, _ *NodeConfig, err error)
1818

19-
// OpenSession starts a new session. This method blocks until the server session is created. The context provided is
20-
// used for the lifetime of the session.
19+
// CreateSession starts a new session. This method blocks until the server session is created. The context provided
20+
// may be used to cancel the invocation. If the method completes successfully, the session remains alive even if
21+
// the context is canceled.
2122
//
2223
// To ensure resources are not leaked, one of the following actions must be performed:
2324
//
24-
// - cancel the provided context,
2525
// - call Close on the Session,
2626
// - close the Client which the session was created with,
2727
// - call any method of the Session until the ErrSessionClosed is returned.
2828
//
2929
// # Experimental
3030
//
3131
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
32-
OpenSession(ctx context.Context, path string, opts ...options.OpenSessionOption) (Session, error)
32+
CreateSession(ctx context.Context, path string, opts ...options.CreateSessionOption) (Session, error)
3333
}
3434

3535
const (
@@ -81,7 +81,8 @@ type Session interface {
8181
// UpdateSemaphore changes semaphore data. This method waits until the server successfully updates the semaphore or
8282
// returns an error.
8383
//
84-
// This method is idempotent. The client will automatically retry in the case of network or server failure.
84+
// This method is not idempotent. The client will automatically retry in the case of network or server failure
85+
// unless it leaves the client state inconsistent.
8586
UpdateSemaphore(ctx context.Context, name string, opts ...options.UpdateSemaphoreOption) error
8687

8788
// DeleteSemaphore deletes an existing semaphore. This method waits until the server successfully deletes the

coordination/example_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,14 +81,14 @@ func Example_semaphore() {
8181
}
8282
fmt.Printf("node description: %+v\nnode config: %+v\n", e, c)
8383

84-
s, err := db.Coordination().OpenSession(ctx, "/local/test")
84+
s, err := db.Coordination().CreateSession(ctx, "/local/test")
8585
if err != nil {
86-
fmt.Printf("failed to open session: %v\n", err)
86+
fmt.Printf("failed to create session: %v\n", err)
8787

8888
return
8989
}
9090
defer s.Close(ctx)
91-
fmt.Printf("session 1 opened, id: %d\n", s.SessionID())
91+
fmt.Printf("session 1 created, id: %d\n", s.SessionID())
9292

9393
err = s.CreateSemaphore(ctx, "my-semaphore", 20, options.WithCreateData([]byte{1, 2, 3}))
9494
if err != nil {

coordination/options/options.go

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -7,71 +7,71 @@ import (
77
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Coordination"
88
)
99

10-
// WithDescription returns an OpenSessionOption that specifies a user-defined description that may be used to describe
10+
// WithDescription returns an CreateSessionOption that specifies a user-defined description that may be used to describe
1111
// the client.
12-
func WithDescription(description string) OpenSessionOption {
13-
return func(c *OpenSessionOptions) {
12+
func WithDescription(description string) CreateSessionOption {
13+
return func(c *CreateSessionOptions) {
1414
c.Description = description
1515
}
1616
}
1717

18-
// WithSessionTimeout returns an OpenSessionOption that specifies the timeout during which client may restore a
18+
// WithSessionTimeout returns an CreateSessionOption that specifies the timeout during which client may restore a
1919
// detached session. The client is forced to terminate the session if the last successful session request occurred
2020
// earlier than this time.
2121
//
2222
// If this is not set, the client uses the default 5 seconds.
23-
func WithSessionTimeout(timeout time.Duration) OpenSessionOption {
24-
return func(c *OpenSessionOptions) {
23+
func WithSessionTimeout(timeout time.Duration) CreateSessionOption {
24+
return func(c *CreateSessionOptions) {
2525
c.SessionTimeout = timeout
2626
}
2727
}
2828

29-
// WithSessionStartTimeout returns an OpenSessionOption that specifies the time that the client should wait for a
29+
// WithSessionStartTimeout returns an CreateSessionOption that specifies the time that the client should wait for a
3030
// response to the StartSession request from the server before it terminates the gRPC stream and tries to reconnect.
3131
//
3232
// If this is not set, the client uses the default time 1 second.
33-
func WithSessionStartTimeout(timeout time.Duration) OpenSessionOption {
34-
return func(c *OpenSessionOptions) {
33+
func WithSessionStartTimeout(timeout time.Duration) CreateSessionOption {
34+
return func(c *CreateSessionOptions) {
3535
c.SessionStartTimeout = timeout
3636
}
3737
}
3838

39-
// WithSessionStopTimeout returns an OpenSessionOption that specifies the time that the client should wait for a
39+
// WithSessionStopTimeout returns an CreateSessionOption that specifies the time that the client should wait for a
4040
// response to the StopSession request from the server before it terminates the gRPC stream and tries to reconnect.
4141
//
4242
// If this is not set, the client uses the default time 1 second.
43-
func WithSessionStopTimeout(timeout time.Duration) OpenSessionOption {
44-
return func(c *OpenSessionOptions) {
43+
func WithSessionStopTimeout(timeout time.Duration) CreateSessionOption {
44+
return func(c *CreateSessionOptions) {
4545
c.SessionStartTimeout = timeout
4646
}
4747
}
4848

49-
// WithSessionKeepAliveTimeout returns an OpenSessionOption that specifies the time that the client will wait before it
50-
// terminates the gRPC stream and tries to reconnect if no successful responses have been received from the server.
49+
// WithSessionKeepAliveTimeout returns an CreateSessionOption that specifies the time that the client will wait before
50+
// it terminates the gRPC stream and tries to reconnect if no successful responses have been received from the server.
5151
//
5252
// If this is not set, the client uses the default time 10 seconds.
53-
func WithSessionKeepAliveTimeout(timeout time.Duration) OpenSessionOption {
54-
return func(c *OpenSessionOptions) {
53+
func WithSessionKeepAliveTimeout(timeout time.Duration) CreateSessionOption {
54+
return func(c *CreateSessionOptions) {
5555
c.SessionKeepAliveTimeout = timeout
5656
}
5757
}
5858

59-
// WithSessionReconnectDelay returns an OpenSessionOption that specifies the time that the client will wait before it
59+
// WithSessionReconnectDelay returns an CreateSessionOption that specifies the time that the client will wait before it
6060
// tries to reconnect the underlying gRPC stream in case of error.
6161
//
6262
// If this is not set, the client uses the default time 500 milliseconds.
63-
func WithSessionReconnectDelay(delay time.Duration) OpenSessionOption {
64-
return func(c *OpenSessionOptions) {
63+
func WithSessionReconnectDelay(delay time.Duration) CreateSessionOption {
64+
return func(c *CreateSessionOptions) {
6565
c.SessionReconnectDelay = delay
6666
}
6767
}
6868

69-
// OpenSessionOption configures how we open a new session.
70-
type OpenSessionOption func(c *OpenSessionOptions)
69+
// CreateSessionOption configures how we create a new session.
70+
type CreateSessionOption func(c *CreateSessionOptions)
7171

72-
// OpenSessionOptions configure an OpenSession call. OpenSessionOptions are set by the OpenSessionOption values passed
73-
// to the OpenSession function.
74-
type OpenSessionOptions struct {
72+
// CreateSessionOptions configure an CreateSession call. CreateSessionOptions are set by the CreateSessionOption values
73+
// passed to the CreateSession function.
74+
type CreateSessionOptions struct {
7575
Description string
7676
SessionTimeout time.Duration
7777
SessionStartTimeout time.Duration
@@ -95,18 +95,18 @@ func WithEphemeral(ephemeral bool) AcquireSemaphoreOption {
9595
// acquired by another session.
9696
//
9797
// If this is not set, the client waits for the acquire operation result until the operation or session context is done.
98-
// You can reset the default value of this timeout by calling the WithNoAcquireTimeout method.
98+
// You can reset the default value of this timeout by calling the WithAcquireInfiniteTimeout method.
9999
func WithAcquireTimeout(timeout time.Duration) AcquireSemaphoreOption {
100100
return func(c *Ydb_Coordination.SessionRequest_AcquireSemaphore) {
101101
c.TimeoutMillis = uint64(timeout.Milliseconds())
102102
}
103103
}
104104

105-
// WithNoAcquireTimeout returns an AcquireSemaphoreOption which disables the timeout after which the operation fails if
106-
// it is still waiting in the queue.
105+
// WithAcquireInfiniteTimeout returns an AcquireSemaphoreOption which disables the timeout after which the operation
106+
// fails if it is still waiting in the queue.
107107
//
108108
// This is the default behavior. You can set the specific timeout by calling the WithAcquireTimeout method.
109-
func WithNoAcquireTimeout() AcquireSemaphoreOption {
109+
func WithAcquireInfiniteTimeout() AcquireSemaphoreOption {
110110
return func(c *Ydb_Coordination.SessionRequest_AcquireSemaphore) {
111111
c.TimeoutMillis = math.MaxUint64
112112
}

examples/coordination/lock/main.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"os"
88
"os/signal"
99
"sync"
10+
"syscall"
1011
"time"
1112

1213
environ "github.com/ydb-platform/ydb-go-sdk-auth-environ"
@@ -62,7 +63,7 @@ func init() {
6263
}
6364

6465
func main() {
65-
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
66+
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
6667
defer cancel()
6768

6869
db, err := ydb.Open(ctx, dsn,
@@ -90,7 +91,7 @@ func main() {
9091
for {
9192
fmt.Println("waiting for a lock...")
9293

93-
session, err := db.Coordination().OpenSession(ctx, path)
94+
session, err := db.Coordination().CreateSession(ctx, path)
9495
if err != nil {
9596
fmt.Println("failed to open session", err)
9697

examples/coordination/workers/main.go

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"os"
99
"os/signal"
1010
"sync"
11+
"syscall"
1112
"time"
1213

1314
environ "github.com/ydb-platform/ydb-go-sdk-auth-environ"
@@ -74,13 +75,8 @@ func init() {
7475
}
7576

7677
func main() {
77-
ctx, cancel := context.WithCancel(context.Background())
78-
c := make(chan os.Signal, 1)
79-
signal.Notify(c, os.Interrupt)
80-
defer func() {
81-
signal.Stop(c)
82-
cancel()
83-
}()
78+
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
79+
defer cancel()
8480

8581
db, err := ydb.Open(ctx, dsn,
8682
environ.WithEnvironCredentials(ctx),
@@ -114,7 +110,7 @@ func main() {
114110

115111
fmt.Println("starting tasks")
116112
for {
117-
session, err := db.Coordination().OpenSession(ctx, path)
113+
session, err := db.Coordination().CreateSession(ctx, path)
118114
if err != nil {
119115
fmt.Println("failed to open session", err)
120116

@@ -137,16 +133,16 @@ func main() {
137133
select {
138134
case <-semaphoreCtx.Done():
139135
break loop
140-
case <-c:
141-
fmt.Println("exiting")
142-
143-
return
144136
case lease := <-leaseChan:
145137
go doWork(lease.lease, lease.semaphoreName)
146138
tasksStarted++
147139
if tasksStarted == capacity {
148140
break loop
149141
}
142+
case <-ctx.Done():
143+
fmt.Println("exiting")
144+
145+
return
150146
}
151147
}
152148

@@ -156,11 +152,11 @@ func main() {
156152
wg.Wait()
157153

158154
select {
159-
case <-session.Context().Done():
160-
case <-c:
155+
case <-ctx.Done():
161156
fmt.Println("exiting")
162157

163158
return
159+
case <-session.Context().Done():
164160
}
165161
}
166162
}

internal/coordination/client.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -249,8 +249,8 @@ func (c *Client) describeNode(
249249
}, nil
250250
}
251251

252-
func newOpenSessionConfig(opts ...options.OpenSessionOption) *options.OpenSessionOptions {
253-
c := defaultOpenSessionConfig()
252+
func newCreateSessionConfig(opts ...options.CreateSessionOption) *options.CreateSessionOptions {
253+
c := defaultCreateSessionConfig()
254254
for _, o := range opts {
255255
if o != nil {
256256
o(c)
@@ -260,7 +260,7 @@ func newOpenSessionConfig(opts ...options.OpenSessionOption) *options.OpenSessio
260260
return c
261261
}
262262

263-
func (c *Client) sessionOpened(s *session) {
263+
func (c *Client) sessionCreated(s *session) {
264264
c.mutex.Lock()
265265
defer c.mutex.Unlock()
266266

@@ -283,8 +283,8 @@ func (c *Client) closeSessions(ctx context.Context) {
283283
}
284284
}
285285

286-
func defaultOpenSessionConfig() *options.OpenSessionOptions {
287-
return &options.OpenSessionOptions{
286+
func defaultCreateSessionConfig() *options.CreateSessionOptions {
287+
return &options.CreateSessionOptions{
288288
Description: "YDB Go SDK",
289289
SessionTimeout: time.Second * 5,
290290
SessionStartTimeout: time.Second * 1,
@@ -294,16 +294,16 @@ func defaultOpenSessionConfig() *options.OpenSessionOptions {
294294
}
295295
}
296296

297-
func (c *Client) OpenSession(
297+
func (c *Client) CreateSession(
298298
ctx context.Context,
299299
path string,
300-
opts ...options.OpenSessionOption,
300+
opts ...options.CreateSessionOption,
301301
) (coordination.Session, error) {
302302
if c == nil {
303303
return nil, xerrors.WithStackTrace(errNilClient)
304304
}
305305

306-
return openSession(ctx, c, path, newOpenSessionConfig(opts...))
306+
return createSession(ctx, c, path, newCreateSessionConfig(opts...))
307307
}
308308

309309
func (c *Client) Close(ctx context.Context) error {

0 commit comments

Comments
 (0)