Skip to content

Commit e9ca7eb

Browse files
burmeciaCopilot
andauthored
chore(s3vectors): add batch_size server option (#527)
* chore(s3vectors): add batch_size server option * Update docs/catalog/s3vectors.md Co-authored-by: Copilot <[email protected]> --------- Co-authored-by: Copilot <[email protected]>
1 parent 0b5b9a9 commit e9ca7eb

File tree

3 files changed

+39
-18
lines changed

3 files changed

+39
-18
lines changed

docs/catalog/s3vectors.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,10 @@ We need to provide Postgres with the credentials to connect to S3 Vectors. We ca
9898
);
9999
```
100100

101+
#### Additional Server Options
102+
103+
- `batch_size` - Controls the batch size of vectors read from or written to remote. Minimum value of 1, maximum value of 500, default value of 300.
104+
101105
### Create a schema
102106

103107
We recommend creating a schema to hold all the foreign tables:

wrappers/src/fdw/s3vectors_fdw/s3vectors_fdw.rs

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ pub(crate) struct S3VectorsFdw {
3232
tgt_cols: Vec<Column>,
3333
quals: Vec<Qual>,
3434
row_limit: Option<i64>,
35+
batch_size: usize,
3536

3637
// for vectors selection
3738
vectors_stream: Option<PaginationStream<Result<ListVectorsOutput, SdkError<ListVectorsError>>>>,
@@ -69,7 +70,7 @@ impl S3VectorsFdw {
6970
.list_vectors()
7071
.set_vector_bucket_name(self.bucket_name.clone())
7172
.set_index_name(self.index_name.clone())
72-
.set_max_results(Some(500))
73+
.set_max_results(Some(self.batch_size as _))
7374
.set_return_data(Some(true))
7475
.set_return_metadata(Some(true))
7576
.into_paginator()
@@ -197,12 +198,36 @@ impl S3VectorsFdw {
197198

198199
Err(S3VectorsFdwError::QueryNotSupported)
199200
}
201+
202+
fn flush_vectors(&mut self) -> S3VectorsFdwResult<()> {
203+
if self.insert_vectors.is_empty() {
204+
return Ok(());
205+
}
206+
207+
let _ = self.rt.block_on(
208+
self.client
209+
.put_vectors()
210+
.set_vector_bucket_name(self.bucket_name.clone())
211+
.set_index_name(self.index_name.clone())
212+
.set_vectors(Some(self.insert_vectors.clone()))
213+
.send(),
214+
)?;
215+
216+
self.insert_vectors.clear();
217+
218+
Ok(())
219+
}
200220
}
201221

202222
impl ForeignDataWrapper<S3VectorsFdwError> for S3VectorsFdw {
203223
fn new(server: ForeignServer) -> S3VectorsFdwResult<Self> {
204224
let rt = create_async_runtime()?;
205225

226+
let batch_size = require_option_or("batch_size", &server.options, "300")
227+
.parse::<usize>()
228+
.unwrap_or(300)
229+
.clamp(1, 500);
230+
206231
// get AWS credentials from server options
207232
let creds = {
208233
match server.options.get("vault_access_key_id") {
@@ -258,6 +283,7 @@ impl ForeignDataWrapper<S3VectorsFdwError> for S3VectorsFdw {
258283
tgt_cols: Vec::new(),
259284
quals: Vec::new(),
260285
row_limit: None,
286+
batch_size,
261287
vectors_stream: None,
262288
curr_vectors: Vec::new(),
263289
scan_initialised: false,
@@ -392,6 +418,10 @@ impl ForeignDataWrapper<S3VectorsFdwError> for S3VectorsFdw {
392418
let vector = builder.build()?;
393419
self.insert_vectors.push(vector);
394420

421+
if self.insert_vectors.len() >= self.batch_size {
422+
self.flush_vectors()?;
423+
}
424+
395425
Ok(())
396426
}
397427

@@ -415,21 +445,7 @@ impl ForeignDataWrapper<S3VectorsFdwError> for S3VectorsFdw {
415445
}
416446

417447
fn end_modify(&mut self) -> S3VectorsFdwResult<()> {
418-
if self.insert_vectors.is_empty() {
419-
return Ok(());
420-
}
421-
422-
let _ = self.rt.block_on(
423-
self.client
424-
.put_vectors()
425-
.set_vector_bucket_name(self.bucket_name.clone())
426-
.set_index_name(self.index_name.clone())
427-
.set_vectors(Some(self.insert_vectors.clone()))
428-
.send(),
429-
)?;
430-
431-
self.insert_vectors.clear();
432-
448+
self.flush_vectors()?;
433449
Ok(())
434450
}
435451

@@ -446,7 +462,7 @@ impl ForeignDataWrapper<S3VectorsFdwError> for S3VectorsFdw {
446462
.client
447463
.list_indexes()
448464
.set_vector_bucket_name(Some(bucket_name.to_owned()))
449-
.set_max_results(Some(500));
465+
.set_max_results(Some(self.batch_size as _));
450466

451467
if let Some(token) = next_token {
452468
request = request.set_next_token(Some(token));

wrappers/src/fdw/s3vectors_fdw/tests.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ mod tests {
1919
OPTIONS (
2020
aws_access_key_id 'test',
2121
aws_secret_access_key 'test',
22-
endpoint_url 'http://localhost:4444'
22+
endpoint_url 'http://localhost:4444',
23+
batch_size '200'
2324
)"#,
2425
None,
2526
&[],

0 commit comments

Comments
 (0)