@@ -16,6 +16,8 @@ use std::future::Future;
16
16
use std:: io;
17
17
use std:: sync:: Arc ;
18
18
use std:: time:: Duration ;
19
+ use std:: time:: SystemTime ;
20
+ use std:: time:: UNIX_EPOCH ;
19
21
20
22
use codeq:: Encode ;
21
23
use databend_common_base:: runtime:: spawn_named;
@@ -41,6 +43,19 @@ use crate::queue::PermitEvent;
41
43
use crate :: PermitEntry ;
42
44
use crate :: PermitKey ;
43
45
46
+ #[ derive( Clone , Debug ) ]
47
+ pub ( crate ) enum SeqPolicy {
48
+ /// Use a sequence number generator to ensure globally unique sequence numbers.
49
+ ///
50
+ /// Retry if the seq generator is updated by other process.
51
+ GeneratorKey { generator_key : String } ,
52
+
53
+ /// Use the current timestamp as the sequence number.
54
+ ///
55
+ /// Retry if duplicated
56
+ TimeBased ,
57
+ }
58
+
44
59
/// The acquirer is responsible for acquiring a semaphore permit.
45
60
///
46
61
/// It is used to acquire a semaphore by creating a new [`PermitEntry`] in the meta-service.
@@ -61,12 +76,12 @@ pub(crate) struct Acquirer {
61
76
/// For example, the [`Acquirer`] crashed abnormally.
62
77
pub ( crate ) lease : Duration ,
63
78
64
- /// The key of the permit sequence number generator .
79
+ /// The way to generate sequence number for a [`PermitKey`] .
65
80
///
66
81
/// It is used to generate a globally unique sequence number for each [`Permit`].
67
82
/// [`Permit`] will be sorted in meta-service by this sequence number
68
83
/// and to determine the order to be acquired.
69
- pub ( crate ) seq_generator_key : String ,
84
+ pub ( crate ) seq_policy : SeqPolicy ,
70
85
71
86
/// The meta client to interact with the meta-service.
72
87
pub ( crate ) meta_client : Arc < ClientHandle > ,
@@ -88,9 +103,6 @@ pub(crate) struct Acquirer {
88
103
impl Acquirer {
89
104
/// Acquires a new semaphore permit and returns a [`Permit`] handle.
90
105
pub async fn acquire ( mut self ) -> Result < Permit , AcquireError > {
91
- let mut sleep_time = Duration :: from_millis ( 10 ) ;
92
- let max_sleep_time = Duration :: from_secs ( 1 ) ;
93
-
94
106
self . stat . start ( ) ;
95
107
96
108
let permit_entry = PermitEntry {
@@ -101,10 +113,90 @@ impl Acquirer {
101
113
. encode_to_vec ( )
102
114
. map_err ( |e| conn_io_error ( e, "encode semaphore entry" ) . context ( & self . ctx ) ) ?;
103
115
104
- // The sem_key is the key of the semaphore entry.
105
- let permit_key = loop {
116
+ // Step 1: Create a new semaphore entry with the key format `{prefix}/queue/{seq:020}`.
117
+
118
+ let permit_key = match self . seq_policy . clone ( ) {
119
+ SeqPolicy :: GeneratorKey { generator_key } => {
120
+ self . enqueue_permit_entry ( & permit_entry, & generator_key)
121
+ . await ?
122
+ }
123
+ SeqPolicy :: TimeBased => self . enqueue_time_based_permit_entry ( & permit_entry) . await ?,
124
+ } ;
125
+
126
+ // Step 2: The sem entry is inserted, keep it alive by extending the lease.
127
+
128
+ let leaser_cancel_tx = self . spawn_extend_lease_task ( permit_key. clone ( ) , val_bytes) ;
129
+
130
+ // Step 3: Wait for the semaphore to be acquired or removed.
131
+
132
+ while let Some ( sem_event) = self . permit_event_rx . recv ( ) . await {
133
+ info ! (
134
+ "Acquirer({}): received semaphore event: {:?}" ,
135
+ self . ctx, sem_event
136
+ ) ;
137
+
138
+ self . stat . on_receive_event ( & sem_event) ;
139
+
140
+ match sem_event {
141
+ PermitEvent :: Acquired ( ( seq, _) ) => {
142
+ if seq == permit_key. seq {
143
+ self . stat . on_acquire ( ) ;
144
+
145
+ info ! (
146
+ "{} acquired: {}->{}" ,
147
+ self . ctx, permit_key, self . acquirer_id
148
+ ) ;
149
+ break ;
150
+ }
151
+ }
152
+ PermitEvent :: Removed ( ( seq, _) ) => {
153
+ if seq == permit_key. seq {
154
+ self . stat . on_remove ( ) ;
155
+
156
+ warn ! (
157
+ "semaphore removed before acquired: {}->{}" ,
158
+ permit_key, self . acquirer_id
159
+ ) ;
160
+ return Err ( AcquireError :: EarlyRemoved ( EarlyRemoved :: new (
161
+ permit_key. clone ( ) ,
162
+ permit_entry. clone ( ) ,
163
+ ) ) ) ;
164
+ }
165
+ }
166
+ }
167
+ }
168
+
169
+ let permit = Permit :: new (
170
+ self . permit_event_rx ,
171
+ permit_key,
172
+ permit_entry,
173
+ self . stat ,
174
+ self . subscriber_cancel_tx ,
175
+ leaser_cancel_tx,
176
+ ) ;
177
+
178
+ Ok ( permit)
179
+ }
180
+
181
+ /// Add a new semaphore entry to the `<prefix>/queue` in the meta-service.
182
+ ///
183
+ /// In this method, the sequence number is generated in meta-service, by updating a generator key.
184
+ /// The semaphore entry is inserted with this seq number as key.
185
+ async fn enqueue_permit_entry (
186
+ & mut self ,
187
+ permit_entry : & PermitEntry ,
188
+ seq_generator_key : & str ,
189
+ ) -> Result < PermitKey , ConnectionClosed > {
190
+ let mut sleep_time = Duration :: from_millis ( 50 ) ;
191
+ let max_sleep_time = Duration :: from_secs ( 2 ) ;
192
+
193
+ let val_bytes = permit_entry
194
+ . encode_to_vec ( )
195
+ . map_err ( |e| conn_io_error ( e, "encode semaphore entry" ) . context ( & self . ctx ) ) ?;
196
+
197
+ loop {
106
198
// Step 1: Get a new globally unique sequence number.
107
- let sem_seq = self . next_global_unique_seq ( ) . await ?;
199
+ let sem_seq = self . next_global_unique_seq ( seq_generator_key ) . await ?;
108
200
109
201
self . stat . on_finish_get_seq ( ) ;
110
202
@@ -121,7 +213,7 @@ impl Acquirer {
121
213
122
214
let txn = pb:: TxnRequest :: default ( ) ;
123
215
124
- let cond = pb:: TxnCondition :: eq_seq ( & self . seq_generator_key , sem_seq) ;
216
+ let cond = pb:: TxnCondition :: eq_seq ( seq_generator_key, sem_seq) ;
125
217
let op = pb:: TxnOp :: put_with_ttl ( & sem_key_str, val_bytes. clone ( ) , Some ( self . lease ) ) ;
126
218
let txn = txn. push_if_then ( [ cond] , [ op] ) ;
127
219
@@ -144,7 +236,7 @@ impl Acquirer {
144
236
self . acquirer_id, sem_seq
145
237
) ;
146
238
self . stat . on_insert_seq ( sem_key. seq ) ;
147
- break sem_key;
239
+ return Ok ( sem_key) ;
148
240
} else {
149
241
info ! (
150
242
"acquire semaphore: enqueue conflict: acquirer: {}, sem_seq: {}; sleep {:?} and retry" ,
@@ -154,70 +246,76 @@ impl Acquirer {
154
246
tokio:: time:: sleep ( sleep_time) . await ;
155
247
sleep_time = std:: cmp:: min ( sleep_time * 3 / 2 , max_sleep_time) ;
156
248
}
157
- } ;
249
+ }
250
+ }
158
251
159
- // Step 3: The sem entry is inserted, keep it alive by extending the lease.
252
+ /// Add a new semaphore entry to the `<prefix>/queue` in the meta-service.
253
+ ///
254
+ /// In this method, the sequence number is generated with current timestamp,
255
+ /// which means it may not be globally unique, on which case it will be retried,
256
+ /// and it may not be in strict order, meaning more semaphore entries than the `capacity` may be acquired.
257
+ async fn enqueue_time_based_permit_entry (
258
+ & mut self ,
259
+ permit_entry : & PermitEntry ,
260
+ ) -> Result < PermitKey , ConnectionClosed > {
261
+ let val_bytes = permit_entry
262
+ . encode_to_vec ( )
263
+ . map_err ( |e| conn_io_error ( e, "encode semaphore entry" ) . context ( & self . ctx ) ) ?;
160
264
161
- let leaser_cancel_tx = self . spawn_extend_lease_task ( permit_key. clone ( ) , val_bytes) ;
265
+ loop {
266
+ let sem_seq = SystemTime :: now ( )
267
+ . duration_since ( UNIX_EPOCH )
268
+ . unwrap ( )
269
+ . as_micros ( ) as u64 ;
162
270
163
- // Step 4: Wait for the semaphore to be acquired or removed.
271
+ self . stat . on_finish_get_seq ( ) ;
164
272
165
- while let Some ( sem_event) = self . permit_event_rx . recv ( ) . await {
166
- info ! (
167
- "Acquirer({}): received semaphore event: {:?}" ,
168
- self . ctx, sem_event
169
- ) ;
273
+ // Create a new semaphore entry with the key format `{prefix}/queue/{seq:020}`.
274
+ let sem_key = PermitKey :: new ( self . prefix . clone ( ) , sem_seq) ;
275
+ let sem_key_str = sem_key. format_key ( ) ;
170
276
171
- self . stat . on_receive_event ( & sem_event ) ;
277
+ let upsert = UpsertKV :: insert ( & sem_key_str , & val_bytes ) . with_ttl ( self . lease ) ;
172
278
173
- match sem_event {
174
- PermitEvent :: Acquired ( ( seq, _) ) => {
175
- if seq == permit_key. seq {
176
- self . stat . on_acquire ( ) ;
279
+ let res = self . meta_client . upsert_kv ( upsert) . await ;
280
+ let resp = res. map_err ( |e| {
281
+ conn_io_error (
282
+ e,
283
+ format ! (
284
+ "insert semaphore (seq={} entry={}) in transaction" ,
285
+ & sem_key, & permit_entry
286
+ ) ,
287
+ )
288
+ . context ( & self . ctx )
289
+ } ) ?;
177
290
178
- info ! (
179
- "{} acquired: {}->{}" ,
180
- self . ctx, permit_key, self . acquirer_id
181
- ) ;
182
- break ;
183
- }
184
- }
185
- PermitEvent :: Removed ( ( seq, _) ) => {
186
- if seq == permit_key. seq {
187
- self . stat . on_remove ( ) ;
291
+ self . stat . on_finish_try_insert_seq ( ) ;
188
292
189
- warn ! (
190
- "semaphore removed before acquired: {}->{}" ,
191
- permit_key, self . acquirer_id
192
- ) ;
193
- return Err ( AcquireError :: EarlyRemoved ( EarlyRemoved :: new (
194
- permit_key. clone ( ) ,
195
- permit_entry. clone ( ) ,
196
- ) ) ) ;
197
- }
198
- }
293
+ if resp. result . is_some ( ) {
294
+ info ! (
295
+ "acquire semaphore: enqueue done: acquirer_id: {}, sem_seq: {}" ,
296
+ self . acquirer_id, sem_seq
297
+ ) ;
298
+ self . stat . on_insert_seq ( sem_key. seq ) ;
299
+ return Ok ( sem_key) ;
300
+ } else {
301
+ info ! (
302
+ "acquire semaphore: enqueue failed(duplicated key): acquirer: {}, sem_seq: {}; retry at once" ,
303
+ self . acquirer_id, sem_seq
304
+ ) ;
199
305
}
200
306
}
201
-
202
- let permit = Permit :: new (
203
- self . permit_event_rx ,
204
- permit_key,
205
- permit_entry,
206
- self . stat ,
207
- self . subscriber_cancel_tx ,
208
- leaser_cancel_tx,
209
- ) ;
210
-
211
- Ok ( permit)
212
307
}
213
308
214
309
/// Gets a new globally unique sequence number by updating a key in the meta-service.
215
310
///
216
311
/// This method uses the meta client to perform an upsert operation on the sequence
217
312
/// generator key, which atomically increments the sequence number and returns the
218
313
/// new value.
219
- async fn next_global_unique_seq ( & self ) -> Result < u64 , ConnectionClosed > {
220
- let upsert = UpsertKV :: update ( & self . seq_generator_key , b"" ) ;
314
+ async fn next_global_unique_seq (
315
+ & self ,
316
+ seq_generator_key : & str ,
317
+ ) -> Result < u64 , ConnectionClosed > {
318
+ let upsert = UpsertKV :: update ( seq_generator_key, b"" ) ;
221
319
222
320
let res = self . meta_client . upsert_kv ( upsert) . await ;
223
321
let resp = res. map_err ( |e| {
0 commit comments