|
3 | 3 | //! This module implements the main sink that orchestrates the flow of events |
4 | 4 | //! from Vector to Delta Lake tables. |
5 | 5 |
|
6 | | -use std::num::NonZeroUsize; |
| 6 | +use std::sync::Arc; |
| 7 | + |
| 8 | +use tracing::Span; |
7 | 9 |
|
8 | 10 | use crate::sinks::prelude::*; |
9 | 11 | use crate::sinks::util::builder::SinkBuilderExt; |
| 12 | +use crate::sinks::util::request_builder::default_request_builder_concurrency_limit; |
10 | 13 |
|
11 | 14 | use super::request_builder::{DeltaLakeRequest, DeltaLakeRequestBuilder}; |
12 | 15 |
|
@@ -48,18 +51,22 @@ where |
48 | 51 |
|
49 | 52 | async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> { |
50 | 53 | let batch_settings = self.batch_settings.as_byte_size_config(); |
51 | | - let request_builder = self.request_builder; |
52 | | - let concurrency = NonZeroUsize::new(8).expect("static"); |
| 54 | + let request_builder = Arc::new(self.request_builder); |
| 55 | + let concurrency = default_request_builder_concurrency_limit(); |
| 56 | + |
| 57 | + // Preserve span context for tracing propagation into concurrent tasks |
| 58 | + let span = Arc::new(Span::current()); |
53 | 59 |
|
54 | 60 | input |
55 | | - // Batch events by size/time |
56 | 61 | .batched(batch_settings) |
57 | | - // Build requests (convert directly to RecordBatch - no Parquet round-trip) |
58 | 62 | .concurrent_map(concurrency, move |events| { |
59 | | - let builder = request_builder.clone(); |
60 | | - Box::pin(async move { builder.build_request(events) }) |
| 63 | + let builder = Arc::clone(&request_builder); |
| 64 | + let span = Arc::clone(&span); |
| 65 | + Box::pin(async move { |
| 66 | + let _entered = span.enter(); |
| 67 | + builder.build_request(events) |
| 68 | + }) |
61 | 69 | }) |
62 | | - // Filter out failed request builds |
63 | 70 | .filter_map(|request| async { |
64 | 71 | match request { |
65 | 72 | Err(error) => { |
|
69 | 76 | Ok(req) => Some(req), |
70 | 77 | } |
71 | 78 | }) |
72 | | - // Send to Delta Lake service |
73 | 79 | .into_driver(self.service) |
74 | 80 | .run() |
75 | 81 | .await |
|
0 commit comments