Skip to content

Commit afb18a3

Browse files
authored
feat: plumbing source name to SourceFactory (#953)
1 parent d320f23 commit afb18a3

File tree

8 files changed

+14
-4
lines changed

8 files changed

+14
-4
lines changed

src/builder/analyzer.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -657,14 +657,15 @@ impl AnalyzerContext {
657657
let source_factory = get_source_factory(&import_op.spec.source.kind)?;
658658
let (output_type, executor) = source_factory
659659
.build(
660+
&import_op.name,
660661
serde_json::Value::Object(import_op.spec.source.spec),
661662
self.flow_ctx.clone(),
662663
)
663664
.await?;
664665

665-
let op_name = import_op.name.clone();
666+
let op_name = import_op.name;
666667
let primary_key_schema = Box::from(output_type.typ.key_schema());
667-
let output = op_scope.add_op_output(import_op.name, output_type)?;
668+
let output = op_scope.add_op_output(op_name.clone(), output_type)?;
668669

669670
let concur_control_options = import_op
670671
.spec

src/ops/factory_bases.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,7 @@ pub trait SourceFactoryBase: SourceFactory + Send + Sync + 'static {
218218

219219
async fn build_executor(
220220
self: Arc<Self>,
221+
source_name: &str,
221222
spec: Self::Spec,
222223
context: Arc<FlowInstanceContext>,
223224
) -> Result<Box<dyn SourceExecutor>>;
@@ -237,6 +238,7 @@ pub trait SourceFactoryBase: SourceFactory + Send + Sync + 'static {
237238
impl<T: SourceFactoryBase> SourceFactory for T {
238239
async fn build(
239240
self: Arc<Self>,
241+
source_name: &str,
240242
spec: serde_json::Value,
241243
context: Arc<FlowInstanceContext>,
242244
) -> Result<(
@@ -245,8 +247,9 @@ impl<T: SourceFactoryBase> SourceFactory for T {
245247
)> {
246248
let spec: T::Spec = serde_json::from_value(spec)?;
247249
let output_schema = self.get_output_schema(&spec, &context).await?;
248-
let executor = self.build_executor(spec, context);
249-
Ok((output_schema, executor))
250+
let source_name = source_name.to_string();
251+
let executor = async move { self.build_executor(&source_name, spec, context).await };
252+
Ok((output_schema, Box::pin(executor)))
250253
}
251254
}
252255

src/ops/interface.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ pub trait SourceExecutor: Send + Sync {
162162
pub trait SourceFactory {
163163
async fn build(
164164
self: Arc<Self>,
165+
source_name: &str,
165166
spec: serde_json::Value,
166167
context: Arc<FlowInstanceContext>,
167168
) -> Result<(

src/ops/sources/amazon_s3.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,7 @@ impl SourceFactoryBase for Factory {
331331

332332
async fn build_executor(
333333
self: Arc<Self>,
334+
_source_name: &str,
334335
spec: Spec,
335336
_context: Arc<FlowInstanceContext>,
336337
) -> Result<Box<dyn SourceExecutor>> {

src/ops/sources/azure_blob.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ impl SourceFactoryBase for Factory {
213213

214214
async fn build_executor(
215215
self: Arc<Self>,
216+
_source_name: &str,
216217
spec: Spec,
217218
context: Arc<FlowInstanceContext>,
218219
) -> Result<Box<dyn SourceExecutor>> {

src/ops/sources/google_drive.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -494,6 +494,7 @@ impl SourceFactoryBase for Factory {
494494

495495
async fn build_executor(
496496
self: Arc<Self>,
497+
_source_name: &str,
497498
spec: Spec,
498499
_context: Arc<FlowInstanceContext>,
499500
) -> Result<Box<dyn SourceExecutor>> {

src/ops/sources/local_file.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ impl SourceFactoryBase for Factory {
163163

164164
async fn build_executor(
165165
self: Arc<Self>,
166+
_source_name: &str,
166167
spec: Spec,
167168
_context: Arc<FlowInstanceContext>,
168169
) -> Result<Box<dyn SourceExecutor>> {

src/ops/sources/postgres.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -774,6 +774,7 @@ impl SourceFactoryBase for Factory {
774774

775775
async fn build_executor(
776776
self: Arc<Self>,
777+
_source_name: &str,
777778
spec: Spec,
778779
context: Arc<FlowInstanceContext>,
779780
) -> Result<Box<dyn SourceExecutor>> {

0 commit comments

Comments
 (0)