Skip to content

Commit 84d2501

Browse files
committed
better queue batch size limits
1 parent 32e4db5 commit 84d2501

File tree

2 files changed

+8
-5
lines changed

2 files changed

+8
-5
lines changed

src/batch_queue.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ use delta_kernel::arrow::record_batch::RecordBatch;
33
use std::sync::Arc;
44
use std::time::Duration;
55
use tokio::sync::mpsc;
6-
use tokio_stream::StreamExt;
76
use tokio_stream::wrappers::ReceiverStream;
7+
use tokio_stream::StreamExt;
88
use tracing::{error, info};
99

1010
#[derive(Debug)]
@@ -16,7 +16,10 @@ pub struct BatchQueue {
1616
impl BatchQueue {
1717
pub fn new(db: Arc<crate::database::Database>, interval_ms: u64, max_rows: usize) -> Self {
1818
// Make channel capacity configurable via environment variable
19-
let channel_capacity = std::env::var("TIMEFUSION_BATCH_QUEUE_CAPACITY").unwrap_or_else(|_| "1000".to_string()).parse::<usize>().unwrap_or(1000);
19+
let channel_capacity = std::env::var("TIMEFUSION_BATCH_QUEUE_CAPACITY")
20+
.unwrap_or_else(|_| "100000000".to_string())
21+
.parse::<usize>()
22+
.unwrap_or(100_000_000);
2023

2124
let (tx, rx) = mpsc::channel(channel_capacity);
2225
let shutdown = tokio_util::sync::CancellationToken::new();

src/main.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use dotenv::dotenv;
66
use std::{env, sync::Arc};
77
use timefusion::batch_queue::BatchQueue;
88
use timefusion::database::Database;
9-
use tokio::time::{Duration, sleep};
9+
use tokio::time::{sleep, Duration};
1010
use tracing::{error, info};
1111
use tracing_subscriber::EnvFilter;
1212

@@ -24,8 +24,8 @@ async fn main() -> anyhow::Result<()> {
2424

2525
// Setup batch processing with configurable params
2626
let interval_ms = env::var("BATCH_INTERVAL_MS").ok().and_then(|v| v.parse().ok()).unwrap_or(1000);
27-
let max_size = env::var("MAX_BATCH_SIZE").ok().and_then(|v| v.parse().ok()).unwrap_or(1000);
28-
let enable_queue = env::var("ENABLE_BATCH_QUEUE").unwrap_or_else(|_| "false".to_string()) == "true";
27+
let max_size = env::var("MAX_BATCH_SIZE").ok().and_then(|v| v.parse().ok()).unwrap_or(100_000);
28+
let enable_queue = env::var("ENABLE_BATCH_QUEUE").unwrap_or_else(|_| "true".to_string()) == "true";
2929

3030
// Create batch queue
3131
let batch_queue = Arc::new(BatchQueue::new(Arc::new(db.clone()), interval_ms, max_size));

0 commit comments

Comments
 (0)