Skip to content

Commit b098f18

Browse files
authored
chore(query): use time base semaphore (#17879)
1 parent c948883 commit b098f18

File tree

2 files changed

+39
-1
lines changed

2 files changed

+39
-1
lines changed

src/meta/store/src/lib.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,44 @@ impl MetaStore {
125125
}
126126
}
127127
}
128+
129+
pub async fn new_acquired_by_time(
130+
&self,
131+
prefix: impl ToString,
132+
capacity: u64,
133+
id: impl ToString,
134+
lease: Duration,
135+
) -> Result<Permit, AcquireError> {
136+
match self {
137+
MetaStore::L(v) => {
138+
let mut local_lock_map = v.locks.lock().await;
139+
140+
let acquire_res = match local_lock_map.entry(prefix.to_string()) {
141+
Entry::Occupied(v) => v.get().clone(),
142+
Entry::Vacant(v) => v
143+
.insert(Arc::new(TokioSemaphore::new(capacity as usize)))
144+
.clone(),
145+
};
146+
147+
match acquire_res.acquire_owned().await {
148+
Ok(guard) => Ok(Permit {
149+
stat: SharedAcquirerStat::new(),
150+
fu: Box::pin(async move {
151+
let _guard = guard;
152+
Ok(())
153+
}),
154+
}),
155+
Err(_e) => Err(AcquireError::ConnectionClosed(ConnectionClosed::new_str(
156+
"",
157+
))),
158+
}
159+
}
160+
MetaStore::R(grpc_client) => {
161+
Semaphore::new_acquired_by_time(grpc_client.clone(), prefix, capacity, id, lease)
162+
.await
163+
}
164+
}
165+
}
128166
}
129167

130168
#[async_trait::async_trait]

src/query/service/src/sessions/queue_mgr.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ impl<Data: QueueData> QueueManager<Data> {
175175
let start_time = SystemTime::now();
176176
let acquire_res = match self.global_statement_queue {
177177
true => {
178-
let semaphore_acquire = self.meta_store.new_acquired(
178+
let semaphore_acquire = self.meta_store.new_acquired_by_time(
179179
data.get_lock_key(),
180180
self.permits as u64,
181181
data.get_key(), // ID of this acquirer

0 commit comments

Comments
 (0)