Skip to content

Commit bb430f3

Browse files
authored
chore(query): make async function works within local counters (#18178)
* chore(query): make async function works in accumulating mode
* chore(query): make async function works in accumulating mode
* chore(query): make async function works in accumulating mode
* chore(query): add settings sequence_step_size
1 parent d6fe352 commit bb430f3

File tree

10 files changed

+221
-38
lines changed

10 files changed

+221
-38
lines changed

src/query/service/src/pipelines/builders/builder_async_function.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,15 @@ impl PipelineBuilder {
2424
self.build_pipeline(&async_function.input)?;
2525

2626
let operators = TransformAsyncFunction::init_operators(&async_function.async_func_descs)?;
27+
let sequence_counters =
28+
TransformAsyncFunction::create_sequence_counters(async_function.async_func_descs.len());
29+
2730
self.main_pipeline.add_async_transformer(|| {
2831
TransformAsyncFunction::new(
2932
self.ctx.clone(),
3033
async_function.async_func_descs.clone(),
3134
operators.clone(),
35+
sequence_counters.clone(),
3236
)
3337
});
3438

src/query/service/src/pipelines/builders/builder_fill_missing_columns.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,15 @@ impl PipelineBuilder {
5252
default_expr_binder
5353
.split_async_default_exprs(source_schema.clone(), default_schema.clone())?
5454
{
55+
let sequence_counters =
56+
TransformAsyncFunction::create_sequence_counters(async_funcs.len());
57+
5558
pipeline.try_add_async_transformer(|| {
5659
Ok(TransformAsyncFunction::new(
5760
ctx.clone(),
5861
async_funcs.clone(),
5962
BTreeMap::new(),
63+
sequence_counters.clone(),
6064
))
6165
})?;
6266
if new_default_schema != new_default_schema_no_cast {

src/query/service/src/pipelines/builders/builder_mutation.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ use crate::pipelines::processors::transforms::build_expression_transform;
4646
use crate::pipelines::processors::transforms::AsyncFunctionBranch;
4747
use crate::pipelines::processors::transforms::CastSchemaBranch;
4848
use crate::pipelines::processors::transforms::TransformAddComputedColumns;
49+
use crate::pipelines::processors::transforms::TransformAsyncFunction;
4950
use crate::pipelines::processors::transforms::TransformBranchedAsyncFunction;
5051
use crate::pipelines::processors::transforms::TransformBranchedCastSchema;
5152
use crate::pipelines::processors::transforms::TransformResortAddOnWithoutSourceSchema;
@@ -159,8 +160,11 @@ impl PipelineBuilder {
159160
default_expr_binder
160161
.split_async_default_exprs(input_schema.clone(), default_schema.clone())?
161162
{
163+
let sequence_counters =
164+
TransformAsyncFunction::create_sequence_counters(async_funcs.len());
162165
async_function_branches.insert(idx, AsyncFunctionBranch {
163166
async_func_descs: async_funcs,
167+
sequence_counters,
164168
});
165169

166170
if new_default_schema != new_default_schema_no_cast {
@@ -201,6 +205,7 @@ impl PipelineBuilder {
201205

202206
if !async_function_branches.is_empty() {
203207
let branches = Arc::new(async_function_branches);
208+
204209
let mut builder = self
205210
.main_pipeline
206211
.try_create_async_transform_pipeline_builder_with_len(

src/query/service/src/pipelines/processors/transforms/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ pub use transform_add_computed_columns::TransformAddComputedColumns;
5050
pub use transform_add_const_columns::TransformAddConstColumns;
5151
pub use transform_add_internal_columns::TransformAddInternalColumns;
5252
pub use transform_add_stream_columns::TransformAddStreamColumns;
53+
pub use transform_async_function::SequenceCounters;
5354
pub use transform_async_function::TransformAsyncFunction;
5455
pub use transform_branched_async_function::AsyncFunctionBranch;
5556
pub use transform_branched_async_function::TransformBranchedAsyncFunction;

src/query/service/src/pipelines/processors/transforms/transform_async_function.rs

Lines changed: 157 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@
1313
// limitations under the License.
1414

1515
use std::collections::BTreeMap;
16+
use std::sync::atomic::AtomicU64;
17+
use std::sync::atomic::Ordering;
1618
use std::sync::Arc;
1719

20+
use databend_common_base::base::tokio::sync::RwLock;
1821
use databend_common_exception::Result;
19-
use databend_common_expression::types::DataType;
2022
use databend_common_expression::types::UInt64Type;
2123
use databend_common_expression::DataBlock;
2224
use databend_common_expression::FromData;
@@ -30,34 +32,181 @@ use crate::sessions::QueryContext;
3032
use crate::sql::executor::physical_plans::AsyncFunctionDesc;
3133
use crate::sql::plans::AsyncFunctionArgument;
3234

35+
/// Locally cached range of sequence values that is handed out in batches.
///
/// The values still available are the half-open range `current..max`.
/// A `current` of `0` marks a counter that has not received a batch yet.
pub struct SequenceCounter {
    // Next value to hand out; stays `0` until the first batch is installed.
    current: AtomicU64,
    // Exclusive upper bound of the cached batch.
    max: AtomicU64,
}

impl SequenceCounter {
    /// Creates an empty counter from which nothing can be reserved.
    fn new() -> Self {
        Self {
            current: AtomicU64::new(0),
            max: AtomicU64::new(0),
        }
    }

    /// Tries to reserve `count` consecutive values from the cached batch.
    ///
    /// Returns `Some((start, end))` with the reserved half-open range
    /// `start..end`, or `None` when the counter is still empty or the batch
    /// cannot cover `count` values (including when `current + count` would
    /// overflow `u64`).
    ///
    /// Concurrent callers are handled by retrying the compare-and-swap, so a
    /// lost race no longer reports a spurious failure. `update_batch` must not
    /// run concurrently with this method; callers are expected to serialize it
    /// externally.
    fn try_reserve(&self, count: u64) -> Option<(u64, u64)> {
        self.current
            .fetch_update(Ordering::SeqCst, Ordering::Relaxed, |current| {
                // `0` is the "no batch installed" marker.
                if current == 0 {
                    return None;
                }
                let max = self.max.load(Ordering::Relaxed);
                // `checked_add` rejects requests that would wrap around.
                current.checked_add(count).filter(|&end| end <= max)
            })
            .ok()
            // The closure already proved that `start + count` does not overflow.
            .map(|start| (start, start + count))
    }

    /// Installs a new batch covering `start..start + count`.
    ///
    /// The upper bound saturates at `u64::MAX` instead of wrapping.
    fn update_batch(&self, start: u64, count: u64) {
        self.current.store(start, Ordering::SeqCst);
        self.max.store(start.saturating_add(count), Ordering::SeqCst);
    }
}
83+
84+
/// A list of sequence counters, each behind its own `Arc<RwLock<_>>` so that
/// clones of the list refer to the same underlying counters.
pub type SequenceCounters = Vec<Arc<RwLock<SequenceCounter>>>;
86+
3387
/// Transform state for evaluating the async functions described by
/// `async_func_descs` on incoming data blocks.
pub struct TransformAsyncFunction {
    ctx: Arc<QueryContext>,
    // key is the index of async_func_desc
    pub(crate) operators: BTreeMap<usize, Arc<DictionaryOperator>>,
    async_func_descs: Vec<AsyncFunctionDesc>,
    // Sequence counters indexed like `async_func_descs` (entry `i` belongs to
    // description `i`); the inner `Arc`s let several transforms share them.
    pub(crate) sequence_counters: SequenceCounters,
}
3995

4096
impl TransformAsyncFunction {
97+
/// Creates a transform from its parts.
///
/// `sequence_counters` is stored as given; it is indexed by position in
/// `async_func_descs`, so it is presumably expected to have the same length
/// (the visible callers build it with `create_sequence_counters(len)`).
pub(crate) fn new(
    ctx: Arc<QueryContext>,
    async_func_descs: Vec<AsyncFunctionDesc>,
    operators: BTreeMap<usize, Arc<DictionaryOperator>>,
    sequence_counters: SequenceCounters,
) -> Self {
    Self {
        ctx,
        async_func_descs,
        operators,
        sequence_counters,
    }
}
52111

112+
// Create a new shared sequence counters map
113+
pub(crate) fn create_sequence_counters(size: usize) -> SequenceCounters {
114+
(0..size)
115+
.map(|_| Arc::new(RwLock::new(SequenceCounter::new())))
116+
.collect()
117+
}
118+
53119
// transform add sequence nextval column.
54-
async fn transform_sequence(
55-
&self,
120+
pub async fn transform_sequence(
121+
ctx: Arc<QueryContext>,
56122
data_block: &mut DataBlock,
123+
counter_lock: Arc<RwLock<SequenceCounter>>,
57124
sequence_name: &String,
58-
data_type: &DataType,
59125
) -> Result<()> {
60-
transform_sequence(&self.ctx, data_block, sequence_name, data_type).await
126+
let count = data_block.num_rows() as u64;
127+
let column = if count == 0 {
128+
UInt64Type::from_data(vec![])
129+
} else {
130+
// Get or create the sequence counter
131+
let counter = counter_lock.read().await;
132+
133+
// Try to reserve sequence numbers from the counter
134+
if let Some((start, _end)) = counter.try_reserve(count) {
135+
// We have enough sequence numbers in the current batch
136+
let range = start..start + count;
137+
UInt64Type::from_data(range.collect::<Vec<u64>>())
138+
} else {
139+
// drop the read lock and get the write lock
140+
drop(counter);
141+
let counter = counter_lock.write().await;
142+
{
143+
// try reserve again
144+
if let Some((start, _end)) = counter.try_reserve(count) {
145+
// We have enough sequence numbers in the current batch
146+
let range = start..start + count;
147+
UInt64Type::from_data(range.collect::<Vec<u64>>())
148+
} else {
149+
// We need to fetch more sequence numbers
150+
let tenant = ctx.get_tenant();
151+
let catalog = ctx.get_default_catalog()?;
152+
153+
// Get current state of the counter
154+
let current = counter.current.load(Ordering::Relaxed);
155+
let max = counter.max.load(Ordering::Relaxed);
156+
// Calculate how many sequence numbers we need to fetch
157+
// If there are remaining numbers, we'll use them first
158+
let remaining = max.saturating_sub(current);
159+
let to_fetch = count.saturating_sub(remaining);
160+
161+
let step_size = ctx.get_settings().get_sequence_step_size()?;
162+
let batch_size = to_fetch.max(step_size);
163+
164+
// Calculate batch size - take the larger of count or step_size
165+
let req = GetSequenceNextValueReq {
166+
ident: SequenceIdent::new(&tenant, sequence_name),
167+
count: batch_size,
168+
};
169+
170+
let resp = catalog.get_sequence_next_value(req).await?;
171+
let start = resp.start;
172+
173+
// If we have remaining numbers, use them first
174+
if remaining > 0 {
175+
// Then add the new batch after the remaining numbers
176+
counter.update_batch(start, batch_size);
177+
178+
// Return a combined range: first the remaining numbers, then the new ones
179+
let mut numbers = Vec::with_capacity(count as usize);
180+
181+
// Add the remaining numbers
182+
let remaining_to_use = remaining.min(count);
183+
numbers.extend(
184+
(current..current + remaining_to_use).collect::<Vec<u64>>(),
185+
);
186+
187+
// Add numbers from the new batch if needed
188+
if remaining_to_use < count {
189+
let new_needed = count - remaining_to_use;
190+
numbers.extend((start..start + new_needed).collect::<Vec<u64>>());
191+
// Update the counter to reflect that we've used some of the new batch
192+
counter.current.store(start + new_needed, Ordering::SeqCst);
193+
}
194+
195+
UInt64Type::from_data(numbers)
196+
} else {
197+
// No remaining numbers, just use the new batch
198+
counter.update_batch(start + count, batch_size - count);
199+
// Return the sequence numbers needed for this request
200+
let range = start..start + count;
201+
UInt64Type::from_data(range.collect::<Vec<u64>>())
202+
}
203+
}
204+
}
205+
}
206+
};
207+
208+
data_block.add_column(column);
209+
Ok(())
61210
}
62211
}
63212

@@ -70,10 +219,11 @@ impl AsyncTransform for TransformAsyncFunction {
70219
for (i, async_func_desc) in self.async_func_descs.iter().enumerate() {
71220
match &async_func_desc.func_arg {
72221
AsyncFunctionArgument::SequenceFunction(sequence_name) => {
73-
self.transform_sequence(
222+
Self::transform_sequence(
223+
self.ctx.clone(),
74224
&mut data_block,
225+
self.sequence_counters[i].clone(),
75226
sequence_name,
76-
&async_func_desc.data_type,
77227
)
78228
.await?;
79229
}
@@ -92,28 +242,3 @@ impl AsyncTransform for TransformAsyncFunction {
92242
Ok(data_block)
93243
}
94244
}
95-
96-
pub async fn transform_sequence(
97-
ctx: &Arc<QueryContext>,
98-
data_block: &mut DataBlock,
99-
sequence_name: &String,
100-
_data_type: &DataType,
101-
) -> Result<()> {
102-
let count = data_block.num_rows() as u64;
103-
let column = if count == 0 {
104-
UInt64Type::from_data(vec![])
105-
} else {
106-
let tenant = ctx.get_tenant();
107-
let catalog = ctx.get_default_catalog()?;
108-
let req = GetSequenceNextValueReq {
109-
ident: SequenceIdent::new(&tenant, sequence_name),
110-
count,
111-
};
112-
let resp = catalog.get_sequence_next_value(req).await?;
113-
let range = resp.start..resp.start + count;
114-
UInt64Type::from_data(range.collect::<Vec<u64>>())
115-
};
116-
data_block.add_column(column);
117-
118-
Ok(())
119-
}

src/query/service/src/pipelines/processors/transforms/transform_branched_async_function.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ use databend_common_expression::DataBlock;
2121
use databend_common_expression::SourceSchemaIndex;
2222
use databend_common_pipeline_transforms::processors::AsyncTransform;
2323

24-
use crate::pipelines::processors::transforms::transform_async_function::transform_sequence;
24+
use crate::pipelines::processors::transforms::transform_async_function::SequenceCounters;
25+
use crate::pipelines::processors::transforms::TransformAsyncFunction;
2526
use crate::sessions::QueryContext;
2627
use crate::sql::executor::physical_plans::AsyncFunctionDesc;
2728
use crate::sql::plans::AsyncFunctionArgument;
@@ -34,6 +35,7 @@ pub struct TransformBranchedAsyncFunction {
3435

3536
/// Async functions to evaluate for one branch, together with their counters.
pub struct AsyncFunctionBranch {
    // Descriptions of the async functions evaluated for this branch.
    pub async_func_descs: Vec<AsyncFunctionDesc>,
    // Sequence counters indexed like `async_func_descs`: entry `i` is used for
    // description `i`.
    pub sequence_counters: SequenceCounters,
}
3840

3941
#[async_trait::async_trait]
@@ -53,16 +55,20 @@ impl AsyncTransform for TransformBranchedAsyncFunction {
5355
return Ok(block);
5456
};
5557

56-
let AsyncFunctionBranch { async_func_descs } = branch;
58+
let AsyncFunctionBranch {
59+
async_func_descs,
60+
sequence_counters,
61+
} = branch;
5762

58-
for async_func_desc in async_func_descs.iter() {
63+
for (i, async_func_desc) in async_func_descs.iter().enumerate() {
5964
match &async_func_desc.func_arg {
6065
AsyncFunctionArgument::SequenceFunction(sequence_name) => {
61-
transform_sequence(
62-
&self.ctx,
66+
let counter_lock = sequence_counters[i].clone();
67+
TransformAsyncFunction::transform_sequence(
68+
self.ctx.clone(),
6369
&mut block,
70+
counter_lock,
6471
sequence_name,
65-
&async_func_desc.data_type,
6672
)
6773
.await?;
6874
}

src/query/settings/src/settings_default.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,13 @@ impl DefaultSettings {
150150
scope: SettingScope::Both,
151151
range: Some(SettingRange::Numeric(1..=u64::MAX)),
152152
}),
153+
("sequence_step_size", DefaultSettingValue {
154+
value: UserSettingValue::UInt64(65536),
155+
desc: "Sets the sequence step size for nextval function.",
156+
mode: SettingMode::Both,
157+
scope: SettingScope::Both,
158+
range: Some(SettingRange::Numeric(1..=u64::MAX)),
159+
}),
153160
("week_start", DefaultSettingValue {
154161
value: UserSettingValue::UInt64(1),
155162
desc: "Specifies the first day of the week.(Used by week-related date functions)",

src/query/settings/src/settings_getter_setter.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,11 @@ impl Settings {
192192
self.try_set_u64("max_block_size", val)
193193
}
194194

195+
// Returns the `sequence_step_size` setting (the sequence step size used by the
// nextval function), read as a u64.
pub fn get_sequence_step_size(&self) -> Result<u64> {
    self.try_get_u64("sequence_step_size")
}
199+
195200
// Max block size for parquet reader
196201
pub fn get_parquet_max_block_size(&self) -> Result<u64> {
197202
self.try_get_u64("parquet_max_block_size")

0 commit comments

Comments
 (0)