Skip to content

Commit 96e361f

Browse files
authored
Merge pull request #1160 from ydb-platform/coordination
Coordination service client with Semaphore support
2 parents 4efbc7a + c9ae1ad commit 96e361f

File tree

22 files changed

+4047
-81
lines changed

22 files changed

+4047
-81
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Added experimental support of semaphores over coordination service client
2+
13
## v3.59.3
24
* Fixed `gstack` logic for parsing `ast.BlockStmt`
35

coordination/coordination.go

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@ package coordination
22

33
import (
44
"context"
5+
"fmt"
6+
"math"
7+
"time"
58

9+
"github.com/ydb-platform/ydb-go-sdk/v3/coordination/options"
610
"github.com/ydb-platform/ydb-go-sdk/v3/scheme"
711
)
812

@@ -11,4 +15,187 @@ type Client interface {
1115
AlterNode(ctx context.Context, path string, config NodeConfig) (err error)
1216
DropNode(ctx context.Context, path string) (err error)
1317
DescribeNode(ctx context.Context, path string) (_ *scheme.Entry, _ *NodeConfig, err error)
18+
19+
// CreateSession starts a new session. This method blocks until the server session is created. The context provided
20+
// may be used to cancel the invocation. If the method completes successfully, the session remains alive even if
21+
// the context is canceled.
22+
//
23+
// To ensure resources are not leaked, one of the following actions must be performed:
24+
//
25+
// - call Close on the Session,
26+
// - close the Client which the session was created with,
27+
// - call any method of the Session until the ErrSessionClosed is returned.
28+
//
29+
// # Experimental
30+
//
31+
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
32+
CreateSession(ctx context.Context, path string, opts ...options.CreateSessionOption) (Session, error)
33+
}
34+
35+
const (
36+
// MaxSemaphoreLimit defines the maximum value of the limit parameter in the Session.CreateSemaphore method.
37+
MaxSemaphoreLimit = math.MaxUint64
38+
39+
// Exclusive is just a shortcut for the maximum semaphore limit value. You can use this to acquire a semaphore in
40+
// the exclusive mode if it was created with the limit value of MaxSemaphoreLimit, which is always true for
41+
// ephemeral semaphores.
42+
Exclusive = math.MaxUint64
43+
44+
// Shared is just a shortcut for the minimum semaphore limit value (1). You can use this to acquire a semaphore in
45+
// the shared mode if it was created with the limit value of MaxSemaphoreLimit, which is always true for ephemeral
46+
// semaphores.
47+
Shared = 1
48+
)
49+
50+
// Session defines a coordination service backed session.
51+
//
52+
// In general, Session API is concurrency-friendly, you can safely access all of its methods concurrently.
53+
//
54+
// The client guarantees that sequential calls of the methods are sent to the server in the same order. However, the
55+
// session client may reorder and suspend some of the requests without violating correctness of the execution. This also
56+
// applies to the situations when the underlying gRPC stream has been recreated due to network or server issues.
57+
//
58+
// The client automatically keeps the underlying gRPC stream alive by sending keep-alive (ping-pong) requests. If the
59+
// client can no longer consider the session alive, it immediately cancels the session context which also leads to
60+
// cancellation of contexts of all semaphore leases created by this session.
61+
type Session interface {
62+
// Close closes the coordination service session. It cancels all active requests on the server and notifies every
63+
// pending or waiting for response request on the client side. It also cancels the session context and tries to
64+
// stop the session gracefully on the server. If the ctx is canceled, this will not wait for the server session to
65+
// become stopped and returns immediately with an error. Once this function returns with no error, all subsequent
66+
// calls will be noop.
67+
Close(ctx context.Context) error
68+
69+
// Context returns the context of the session. It is canceled when the underlying server session is over or if the
70+
// client could not get any successful response from the server before the session timeout (see
71+
// options.WithSessionTimeout).
72+
Context() context.Context
73+
74+
// CreateSemaphore creates a new semaphore. This method waits until the server successfully creates a new semaphore
75+
// or returns an error.
76+
//
77+
// This method is not idempotent. If the request has been sent to the server but no reply has been received, it
78+
// returns the ErrOperationStatusUnknown error.
79+
CreateSemaphore(ctx context.Context, name string, limit uint64, opts ...options.CreateSemaphoreOption) error
80+
81+
// UpdateSemaphore changes semaphore data. This method waits until the server successfully updates the semaphore or
82+
// returns an error.
83+
//
84+
// This method is not idempotent. The client will automatically retry in the case of network or server failure
85+
// unless it leaves the client state inconsistent.
86+
UpdateSemaphore(ctx context.Context, name string, opts ...options.UpdateSemaphoreOption) error
87+
88+
// DeleteSemaphore deletes an existing semaphore. This method waits until the server successfully deletes the
89+
// semaphore or returns an error.
90+
//
91+
// This method is not idempotent. If the request has been sent to the server but no reply has been received, it
92+
// returns the ErrOperationStatusUnknown error.
93+
DeleteSemaphore(ctx context.Context, name string, opts ...options.DeleteSemaphoreOption) error
94+
95+
// DescribeSemaphore returns the state of the semaphore.
96+
//
97+
// This method is idempotent. The client will automatically retry in the case of network or server failure.
98+
DescribeSemaphore(
99+
ctx context.Context,
100+
name string,
101+
opts ...options.DescribeSemaphoreOption,
102+
) (*SemaphoreDescription, error)
103+
104+
// AcquireSemaphore acquires the semaphore. If you acquire an ephemeral semaphore (see options.WithEphemeral), its
105+
// limit will be set to MaxSemaphoreLimit. Later requests override previous operations with the same semaphore, e.g.
106+
// to reduce acquired count, change timeout or attached data.
107+
//
108+
// This method blocks until the semaphore is acquired, an error is returned from the server or the session is
109+
// closed. If the operation context was canceled but the server replied that the semaphore was actually acquired,
110+
// the client will automatically release the semaphore.
111+
//
112+
// Semaphore waiting is fair: the semaphore guarantees that other sessions invoking the AcquireSemaphore method
113+
// acquire permits in the order which they were called (FIFO). If a session invokes the AcquireSemaphore method
114+
// multiple times while the first invocation is still in process, the position in the queue remains unchanged.
115+
//
116+
// This method is idempotent. The client will automatically retry in the case of network or server failure.
117+
AcquireSemaphore(
118+
ctx context.Context,
119+
name string,
120+
count uint64,
121+
opts ...options.AcquireSemaphoreOption,
122+
) (Lease, error)
123+
124+
// SessionID returns a server-generated identifier of the session. This value is permanent and unique within the
125+
// coordination service node.
126+
SessionID() uint64
127+
128+
// Reconnect forcibly shuts down the underlying gRPC stream and initiates a new one. This method is highly unlikely
129+
// to be of use in a typical application but is extremely useful for testing an API implementation.
130+
Reconnect()
131+
}
132+
133+
// Lease is the object which defines the rights of the session to the acquired semaphore. Lease is alive until its
134+
// context is not canceled. This may happen implicitly, when the associated session becomes lost or closed, or
135+
// explicitly, if someone calls the Release method of the lease.
136+
type Lease interface {
137+
// Context returns the context of the lease. It is canceled when the session it was created by was lost or closed,
138+
// or if the lease was released by calling the Release method.
139+
Context() context.Context
140+
141+
// Release releases the acquired lease to the semaphore. It also cancels the context of the lease. This method does
142+
// not take a ctx argument, but you can cancel the execution of it by closing the session or canceling its context.
143+
Release() error
144+
145+
// Session returns the session which this lease was created by.
146+
Session() Session
147+
}
148+
149+
// SemaphoreDescription describes the state of a semaphore.
150+
type SemaphoreDescription struct {
151+
// Name is the name of the semaphore.
152+
Name string
153+
154+
// Limit is the maximum number of tokens that may be acquired.
155+
Limit uint64
156+
157+
// Count is the number of tokens currently acquired by its owners.
158+
Count uint64
159+
160+
// Ephemeral semaphores are deleted when there are no owners and waiters left.
161+
Ephemeral bool
162+
163+
// Data is user-defined data attached to the semaphore.
164+
Data []byte
165+
166+
// Owner is the list of current owners of the semaphore.
167+
Owners []*SemaphoreSession
168+
169+
// Waiter is the list of current waiters of the semaphore.
170+
Waiters []*SemaphoreSession
171+
}
172+
173+
// SemaphoreSession describes an owner or a waiter of this semaphore.
174+
type SemaphoreSession struct {
175+
// SessionID is the id of the session which tried to acquire the semaphore.
176+
SessionID uint64
177+
178+
// Count is the number of tokens for the acquire operation.
179+
Count uint64
180+
181+
// OrderId is a monotonically increasing id which determines locking order.
182+
OrderID uint64
183+
184+
// Data is user-defined data attached to the acquire operation.
185+
Data []byte
186+
187+
// Timeout is the timeout for the operation in the waiter queue. If this is time.Duration(math.MaxInt64) the session
188+
// will wait for the semaphore until the operation is canceled.
189+
Timeout time.Duration
190+
}
191+
192+
func (d *SemaphoreDescription) String() string {
193+
return fmt.Sprintf(
194+
"{Name: %q Limit: %d Count: %d Ephemeral: %t Data: %q Owners: %s Waiters: %s}",
195+
d.Name, d.Limit, d.Count, d.Ephemeral, d.Data, d.Owners, d.Waiters)
196+
}
197+
198+
func (s *SemaphoreSession) String() string {
199+
return fmt.Sprintf("{SessionID: %d Count: %d OrderID: %d Data: %q TimeoutMillis: %v}",
200+
s.SessionID, s.Count, s.OrderID, s.Data, s.Timeout)
14201
}

coordination/errors.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package coordination
2+
3+
import "errors"
4+
5+
var (
6+
// ErrOperationStatusUnknown indicates that the request has been sent to the server but no reply has been received.
7+
// The client usually automatically retries calls of that kind, but there are cases when it is not possible:
8+
// - the request is not idempotent, non-idempotent requests are never retried,
9+
// - the session was lost and its context is canceled.
10+
ErrOperationStatusUnknown = errors.New("operation status is unknown")
11+
12+
// ErrSessionClosed indicates that the Session object is closed.
13+
ErrSessionClosed = errors.New("session is closed")
14+
15+
// ErrAcquireTimeout indicates that the Session.AcquireSemaphore method could not acquire the semaphore before the
16+
// operation timeout (see options.WithAcquireTimeout).
17+
ErrAcquireTimeout = errors.New("acquire semaphore timeout")
18+
)

coordination/example_test.go

Lines changed: 105 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@ import (
66

77
"github.com/ydb-platform/ydb-go-sdk/v3"
88
"github.com/ydb-platform/ydb-go-sdk/v3/coordination"
9+
"github.com/ydb-platform/ydb-go-sdk/v3/coordination/options"
910
)
1011

1112
//nolint:errcheck
12-
func Example() {
13+
func Example_createDropNode() {
1314
ctx := context.TODO()
1415
db, err := ydb.Open(ctx, "grpc://localhost:2136/local")
1516
if err != nil {
@@ -41,3 +42,106 @@ func Example() {
4142
}
4243
fmt.Printf("node description: %+v\nnode config: %+v\n", e, c)
4344
}
45+
46+
func Example_semaphore() {
47+
ctx := context.TODO()
48+
db, err := ydb.Open(ctx, "grpc://localhost:2136/local")
49+
if err != nil {
50+
fmt.Printf("failed to connect: %v", err)
51+
52+
return
53+
}
54+
defer db.Close(ctx) // cleanup resources
55+
// create node
56+
err = db.Coordination().CreateNode(ctx, "/local/test", coordination.NodeConfig{
57+
Path: "",
58+
SelfCheckPeriodMillis: 1000,
59+
SessionGracePeriodMillis: 1000,
60+
ReadConsistencyMode: coordination.ConsistencyModeStrict,
61+
AttachConsistencyMode: coordination.ConsistencyModeStrict,
62+
RatelimiterCountersMode: coordination.RatelimiterCountersModeDetailed,
63+
})
64+
if err != nil {
65+
fmt.Printf("failed to create node: %v", err)
66+
67+
return
68+
}
69+
defer func() {
70+
dropNodeErr := db.Coordination().DropNode(ctx, "/local/test")
71+
if dropNodeErr != nil {
72+
fmt.Printf("failed to drop node: %v\n", dropNodeErr)
73+
}
74+
}()
75+
76+
e, c, err := db.Coordination().DescribeNode(ctx, "/local/test")
77+
if err != nil {
78+
fmt.Printf("failed to describe node: %v\n", err)
79+
80+
return
81+
}
82+
fmt.Printf("node description: %+v\nnode config: %+v\n", e, c)
83+
84+
s, err := db.Coordination().CreateSession(ctx, "/local/test")
85+
if err != nil {
86+
fmt.Printf("failed to create session: %v\n", err)
87+
88+
return
89+
}
90+
defer s.Close(ctx)
91+
fmt.Printf("session 1 created, id: %d\n", s.SessionID())
92+
93+
err = s.CreateSemaphore(ctx, "my-semaphore", 20, options.WithCreateData([]byte{1, 2, 3}))
94+
if err != nil {
95+
fmt.Printf("failed to create semaphore: %v", err)
96+
97+
return
98+
}
99+
fmt.Printf("semaphore my-semaphore created\n")
100+
101+
lease, err := s.AcquireSemaphore(ctx, "my-semaphore", 10)
102+
if err != nil {
103+
fmt.Printf("failed to acquire semaphore: %v", err)
104+
105+
return
106+
}
107+
defer func() {
108+
releaseErr := lease.Release()
109+
if releaseErr != nil {
110+
fmt.Printf("failed to release lease: %v", releaseErr)
111+
}
112+
}()
113+
114+
fmt.Printf("session 1 acquired semaphore 10\n")
115+
116+
s.Reconnect()
117+
fmt.Printf("session 1 reconnected\n")
118+
119+
desc, err := s.DescribeSemaphore(
120+
ctx,
121+
"my-semaphore",
122+
options.WithDescribeOwners(true),
123+
options.WithDescribeWaiters(true),
124+
)
125+
if err != nil {
126+
fmt.Printf("failed to describe semaphore: %v", err)
127+
128+
return
129+
}
130+
fmt.Printf("session 1 described semaphore %v\n", desc)
131+
132+
err = lease.Release()
133+
if err != nil {
134+
fmt.Printf("failed to release semaphore: %v", err)
135+
136+
return
137+
}
138+
fmt.Printf("session 1 released semaphore my-semaphore\n")
139+
140+
err = s.DeleteSemaphore(ctx, "my-semaphore", options.WithForceDelete(true))
141+
if err != nil {
142+
fmt.Printf("failed to delete semaphore: %v", err)
143+
144+
return
145+
}
146+
fmt.Printf("deleted semaphore my-semaphore\n")
147+
}

0 commit comments

Comments
 (0)