diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs index a79b146bf..9e36b014f 100644 --- a/src/builder/analyzer.rs +++ b/src/builder/analyzer.rs @@ -657,14 +657,15 @@ impl AnalyzerContext { let source_factory = get_source_factory(&import_op.spec.source.kind)?; let (output_type, executor) = source_factory .build( + &import_op.name, serde_json::Value::Object(import_op.spec.source.spec), self.flow_ctx.clone(), ) .await?; - let op_name = import_op.name.clone(); + let op_name = import_op.name; let primary_key_schema = Box::from(output_type.typ.key_schema()); - let output = op_scope.add_op_output(import_op.name, output_type)?; + let output = op_scope.add_op_output(op_name.clone(), output_type)?; let concur_control_options = import_op .spec diff --git a/src/ops/factory_bases.rs b/src/ops/factory_bases.rs index 3570c7ae5..dd8bbc23d 100644 --- a/src/ops/factory_bases.rs +++ b/src/ops/factory_bases.rs @@ -218,6 +218,7 @@ pub trait SourceFactoryBase: SourceFactory + Send + Sync + 'static { async fn build_executor( self: Arc, + source_name: &str, spec: Self::Spec, context: Arc, ) -> Result>; @@ -237,6 +238,7 @@ pub trait SourceFactoryBase: SourceFactory + Send + Sync + 'static { impl SourceFactory for T { async fn build( self: Arc, + source_name: &str, spec: serde_json::Value, context: Arc, ) -> Result<( @@ -245,8 +247,9 @@ impl SourceFactory for T { )> { let spec: T::Spec = serde_json::from_value(spec)?; let output_schema = self.get_output_schema(&spec, &context).await?; - let executor = self.build_executor(spec, context); - Ok((output_schema, executor)) + let source_name = source_name.to_string(); + let executor = async move { self.build_executor(&source_name, spec, context).await }; + Ok((output_schema, Box::pin(executor))) } } diff --git a/src/ops/interface.rs b/src/ops/interface.rs index 1902025ed..ad05b7f2d 100644 --- a/src/ops/interface.rs +++ b/src/ops/interface.rs @@ -162,6 +162,7 @@ pub trait SourceExecutor: Send + Sync { pub trait SourceFactory { async fn build( self: Arc, + source_name: &str, spec: serde_json::Value, context: Arc, ) -> Result<( diff --git a/src/ops/sources/amazon_s3.rs b/src/ops/sources/amazon_s3.rs index 8c1763557..d07d51858 100644 --- a/src/ops/sources/amazon_s3.rs +++ b/src/ops/sources/amazon_s3.rs @@ -331,6 +331,7 @@ impl SourceFactoryBase for Factory { async fn build_executor( self: Arc, + _source_name: &str, spec: Spec, _context: Arc, ) -> Result> { diff --git a/src/ops/sources/azure_blob.rs b/src/ops/sources/azure_blob.rs index 053fc2dc7..304ed68b9 100644 --- a/src/ops/sources/azure_blob.rs +++ b/src/ops/sources/azure_blob.rs @@ -213,6 +213,7 @@ impl SourceFactoryBase for Factory { async fn build_executor( self: Arc, + _source_name: &str, spec: Spec, context: Arc, ) -> Result> { diff --git a/src/ops/sources/google_drive.rs b/src/ops/sources/google_drive.rs index 7560f3a71..4415c8523 100644 --- a/src/ops/sources/google_drive.rs +++ b/src/ops/sources/google_drive.rs @@ -494,6 +494,7 @@ impl SourceFactoryBase for Factory { async fn build_executor( self: Arc, + _source_name: &str, spec: Spec, _context: Arc, ) -> Result> { diff --git a/src/ops/sources/local_file.rs b/src/ops/sources/local_file.rs index 29e233ded..3f48e7db8 100644 --- a/src/ops/sources/local_file.rs +++ b/src/ops/sources/local_file.rs @@ -163,6 +163,7 @@ impl SourceFactoryBase for Factory { async fn build_executor( self: Arc, + _source_name: &str, spec: Spec, _context: Arc, ) -> Result> { diff --git a/src/ops/sources/postgres.rs b/src/ops/sources/postgres.rs index 2755f08ff..ac467454e 100644 --- a/src/ops/sources/postgres.rs +++ b/src/ops/sources/postgres.rs @@ -774,6 +774,7 @@ impl SourceFactoryBase for Factory { async fn build_executor( self: Arc, + _source_name: &str, spec: Spec, context: Arc, ) -> Result> {