Skip to content

Commit 15f1894

Browse files
authored
ref(defaults): Make batch size parameters optional (#344)
1 parent cb1edcc commit 15f1894

14 files changed

+130
-61
lines changed

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -72,3 +72,6 @@ utoipa = { version = "5.4.0", default-features = false }
7272
utoipa-swagger-ui = { version = "9.0.2", default-features = false }
7373
uuid = { version = "1.17.0", default-features = false }
7474
x509-cert = { version = "0.2.2", default-features = false }
75+
76+
[profile.bench]
77+
debug = true

etl-api/src/configs/pipeline.rs

Lines changed: 44 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -1,19 +1,32 @@
1-
use crate::configs::store::Store;
21
use etl_config::shared::{BatchConfig, PgConnectionConfig, PipelineConfig};
32
use serde::{Deserialize, Serialize};
43
use utoipa::ToSchema;
54

6-
const DEFAULT_BATCH_MAX_SIZE: usize = 1000000;
5+
use crate::configs::store::Store;
6+
7+
const DEFAULT_BATCH_MAX_SIZE: usize = 100000;
78
const DEFAULT_BATCH_MAX_FILL_MS: u64 = 10000;
89
const DEFAULT_TABLE_ERROR_RETRY_DELAY_MS: u64 = 10000;
910
const DEFAULT_MAX_TABLE_SYNC_WORKERS: u16 = 4;
1011

12+
/// Batch processing configuration for pipelines.
13+
#[derive(Clone, Debug, Serialize, Deserialize, ToSchema)]
14+
#[serde(rename_all = "snake_case")]
15+
pub struct ApiBatchConfig {
16+
/// Maximum number of items in a batch for table copy and event streaming.
17+
#[schema(example = 1000)]
18+
pub max_size: Option<usize>,
19+
/// Maximum time, in milliseconds, to wait for a batch to fill before processing.
20+
#[schema(example = 1000)]
21+
pub max_fill_ms: Option<u64>,
22+
}
23+
1124
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
1225
pub struct FullApiPipelineConfig {
1326
#[schema(example = "my_publication")]
1427
pub publication_name: String,
1528
#[serde(skip_serializing_if = "Option::is_none")]
16-
pub batch: Option<BatchConfig>,
29+
pub batch: Option<ApiBatchConfig>,
1730
#[schema(example = 1000)]
1831
#[serde(skip_serializing_if = "Option::is_none")]
1932
pub table_error_retry_delay_ms: Option<u64>,
@@ -26,7 +39,10 @@ impl From<StoredPipelineConfig> for FullApiPipelineConfig {
2639
fn from(value: StoredPipelineConfig) -> Self {
2740
Self {
2841
publication_name: value.publication_name,
29-
batch: Some(value.batch),
42+
batch: Some(ApiBatchConfig {
43+
max_size: Some(value.batch.max_size),
44+
max_fill_ms: Some(value.batch.max_fill_ms),
45+
}),
3046
table_error_retry_delay_ms: Some(value.table_error_retry_delay_ms),
3147
max_table_sync_workers: Some(value.max_table_sync_workers),
3248
}
@@ -40,7 +56,7 @@ pub struct PartialApiPipelineConfig {
4056
pub publication_name: Option<String>,
4157
#[schema(example = r#"{"max_size": 1000000, "max_fill_ms": 10000}"#)]
4258
#[serde(skip_serializing_if = "Option::is_none")]
43-
pub batch: Option<BatchConfig>,
59+
pub batch: Option<ApiBatchConfig>,
4460
#[schema(example = 1000)]
4561
#[serde(skip_serializing_if = "Option::is_none")]
4662
pub table_error_retry_delay_ms: Option<u64>,
@@ -78,8 +94,13 @@ impl StoredPipelineConfig {
7894
self.publication_name = value;
7995
}
8096

81-
if let Some(value) = partial.batch {
82-
self.batch = value;
97+
if let Some(value) = partial.batch
98+
&& let (Some(max_size), Some(max_fill_ms)) = (value.max_size, value.max_fill_ms)
99+
{
100+
self.batch = BatchConfig {
101+
max_size,
102+
max_fill_ms,
103+
};
83104
}
84105

85106
if let Some(value) = partial.table_error_retry_delay_ms {
@@ -96,12 +117,20 @@ impl Store for StoredPipelineConfig {}
96117

97118
impl From<FullApiPipelineConfig> for StoredPipelineConfig {
98119
fn from(value: FullApiPipelineConfig) -> Self {
99-
Self {
100-
publication_name: value.publication_name,
101-
batch: value.batch.unwrap_or(BatchConfig {
120+
let batch = value
121+
.batch
122+
.map(|b| BatchConfig {
123+
max_size: b.max_size.unwrap_or(DEFAULT_BATCH_MAX_SIZE),
124+
max_fill_ms: b.max_fill_ms.unwrap_or(DEFAULT_BATCH_MAX_FILL_MS),
125+
})
126+
.unwrap_or(BatchConfig {
102127
max_size: DEFAULT_BATCH_MAX_SIZE,
103128
max_fill_ms: DEFAULT_BATCH_MAX_FILL_MS,
104-
}),
129+
});
130+
131+
Self {
132+
publication_name: value.publication_name,
133+
batch,
105134
table_error_retry_delay_ms: value
106135
.table_error_retry_delay_ms
107136
.unwrap_or(DEFAULT_TABLE_ERROR_RETRY_DELAY_MS),
@@ -196,9 +225,9 @@ mod tests {
196225

197226
let partial = PartialApiPipelineConfig {
198227
publication_name: Some("new_publication".to_string()),
199-
batch: Some(BatchConfig {
200-
max_size: 2000,
201-
max_fill_ms: 8000,
228+
batch: Some(ApiBatchConfig {
229+
max_size: Some(1000),
230+
max_fill_ms: Some(8000),
202231
}),
203232
table_error_retry_delay_ms: Some(5000),
204233
max_table_sync_workers: None,
@@ -207,7 +236,7 @@ mod tests {
207236
stored.merge(partial);
208237

209238
assert_eq!(stored.publication_name, "new_publication");
210-
assert_eq!(stored.batch.max_size, 2000);
239+
assert_eq!(stored.batch.max_size, 1000);
211240
assert_eq!(stored.batch.max_fill_ms, 8000);
212241
assert_eq!(stored.table_error_retry_delay_ms, 5000);
213242
assert_eq!(stored.max_table_sync_workers, 2);

etl-api/tests/pipelines.rs

Lines changed: 8 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1,11 +1,12 @@
1+
use etl_api::configs::pipeline::ApiBatchConfig;
12
use etl_api::routes::pipelines::{
23
CreatePipelineRequest, CreatePipelineResponse, GetPipelineReplicationStatusResponse,
34
GetPipelineVersionResponse, ReadPipelineResponse, ReadPipelinesResponse,
45
RollbackTableStateRequest, RollbackTableStateResponse, RollbackType,
56
SimpleTableReplicationState, UpdatePipelineConfigRequest, UpdatePipelineConfigResponse,
67
UpdatePipelineRequest, UpdatePipelineVersionRequest,
78
};
8-
use etl_config::shared::{BatchConfig, PgConnectionConfig};
9+
use etl_config::shared::PgConnectionConfig;
910
use etl_postgres::sqlx::test_utils::drop_pg_database;
1011
use etl_telemetry::tracing::init_test_tracing;
1112
use reqwest::StatusCode;
@@ -755,10 +756,12 @@ async fn pipeline_config_can_be_updated() {
755756

756757
// Act
757758
let update_request = UpdatePipelineConfigRequest {
758-
config: partially_updated_optional_pipeline_config(ConfigUpdateType::Batch(BatchConfig {
759-
max_size: 10_000,
760-
max_fill_ms: 100,
761-
})),
759+
config: partially_updated_optional_pipeline_config(ConfigUpdateType::Batch(
760+
ApiBatchConfig {
761+
max_size: Some(10_000),
762+
max_fill_ms: Some(100),
763+
},
764+
)),
762765
};
763766
let response = app
764767
.update_pipeline_config(&tenant_id, pipeline_id, &update_request)

etl-api/tests/snapshots/destinations_pipelines__an_existing_destination_and_pipeline_can_be_updated-2.snap

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -5,9 +5,13 @@ expression: response.config
55
FullApiPipelineConfig {
66
publication_name: "updated_publication",
77
batch: Some(
8-
BatchConfig {
9-
max_size: 2000,
10-
max_fill_ms: 10,
8+
ApiBatchConfig {
9+
max_size: Some(
10+
2000,
11+
),
12+
max_fill_ms: Some(
13+
10,
14+
),
1115
},
1216
),
1317
table_error_retry_delay_ms: Some(

etl-api/tests/snapshots/destinations_pipelines__destination_and_pipeline_can_be_created-2.snap

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -5,9 +5,13 @@ expression: response.config
55
FullApiPipelineConfig {
66
publication_name: "publication",
77
batch: Some(
8-
BatchConfig {
9-
max_size: 1000,
10-
max_fill_ms: 5,
8+
ApiBatchConfig {
9+
max_size: Some(
10+
1000,
11+
),
12+
max_fill_ms: Some(
13+
5,
14+
),
1115
},
1216
),
1317
table_error_retry_delay_ms: Some(

etl-api/tests/snapshots/pipelines__all_pipelines_can_be_read-2.snap

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -5,9 +5,13 @@ expression: pipeline.config
55
FullApiPipelineConfig {
66
publication_name: "updated_publication",
77
batch: Some(
8-
BatchConfig {
9-
max_size: 2000,
10-
max_fill_ms: 10,
8+
ApiBatchConfig {
9+
max_size: Some(
10+
2000,
11+
),
12+
max_fill_ms: Some(
13+
10,
14+
),
1115
},
1216
),
1317
table_error_retry_delay_ms: Some(

etl-api/tests/snapshots/pipelines__all_pipelines_can_be_read.snap

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -5,9 +5,13 @@ expression: pipeline.config
55
FullApiPipelineConfig {
66
publication_name: "publication",
77
batch: Some(
8-
BatchConfig {
9-
max_size: 1000,
10-
max_fill_ms: 5,
8+
ApiBatchConfig {
9+
max_size: Some(
10+
1000,
11+
),
12+
max_fill_ms: Some(
13+
5,
14+
),
1115
},
1216
),
1317
table_error_retry_delay_ms: Some(

etl-api/tests/snapshots/pipelines__an_existing_pipeline_can_be_read.snap

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -5,9 +5,13 @@ expression: response.config
55
FullApiPipelineConfig {
66
publication_name: "publication",
77
batch: Some(
8-
BatchConfig {
9-
max_size: 1000,
10-
max_fill_ms: 5,
8+
ApiBatchConfig {
9+
max_size: Some(
10+
1000,
11+
),
12+
max_fill_ms: Some(
13+
5,
14+
),
1115
},
1216
),
1317
table_error_retry_delay_ms: Some(

etl-api/tests/snapshots/pipelines__an_existing_pipeline_can_be_updated.snap

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -5,9 +5,13 @@ expression: response.config
55
FullApiPipelineConfig {
66
publication_name: "updated_publication",
77
batch: Some(
8-
BatchConfig {
9-
max_size: 2000,
10-
max_fill_ms: 10,
8+
ApiBatchConfig {
9+
max_size: Some(
10+
2000,
11+
),
12+
max_fill_ms: Some(
13+
10,
14+
),
1115
},
1216
),
1317
table_error_retry_delay_ms: Some(

etl-api/tests/snapshots/pipelines__pipeline_config_can_be_updated-2.snap

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -5,9 +5,13 @@ expression: response.config
55
FullApiPipelineConfig {
66
publication_name: "publication",
77
batch: Some(
8-
BatchConfig {
9-
max_size: 10000,
10-
max_fill_ms: 100,
8+
ApiBatchConfig {
9+
max_size: Some(
10+
10000,
11+
),
12+
max_fill_ms: Some(
13+
100,
14+
),
1115
},
1216
),
1317
table_error_retry_delay_ms: Some(

0 commit comments

Comments
 (0)