Skip to content

Commit 536b2ae

Browse files
committed
Add refresh_options to ImportOpSpec.
1 parent 8f10543 commit 536b2ae

File tree

4 files changed

+19
-12
lines changed

4 files changed

+19
-12
lines changed

src/base/spec.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1-
use std::ops::Deref;
2-
3-
use serde::{Deserialize, Serialize};
1+
use crate::prelude::*;
42

53
use super::schema::{EnrichedValueType, FieldSchema};
4+
use std::ops::Deref;
65

76
#[derive(Debug, Clone, Serialize, Deserialize)]
87
#[serde(tag = "kind")]
@@ -163,9 +162,17 @@ pub struct OpSpec {
163162
pub spec: serde_json::Map<String, serde_json::Value>,
164163
}
165164

165+
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
166+
pub struct SourceRefreshOptions {
167+
pub refresh_interval: Option<chrono::Duration>,
168+
}
169+
166170
#[derive(Debug, Clone, Serialize, Deserialize)]
167171
pub struct ImportOpSpec {
168172
pub source: OpSpec,
173+
174+
#[serde(default)]
175+
pub refresh_options: SourceRefreshOptions,
169176
}
170177

171178
/// Transform data using a given operator.

src/builder/analyzer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -668,6 +668,7 @@ impl AnalyzerContext<'_> {
668668
.typ
669669
.clone(),
670670
name: op_name,
671+
refresh_options: import_op.spec.refresh_options,
671672
})
672673
};
673674
Ok(result_fut)

src/builder/flow_builder.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -380,13 +380,14 @@ impl FlowBuilder {
380380
self.root_data_scope_ref.clone()
381381
}
382382

383-
#[pyo3(signature = (kind, op_spec, target_scope, name))]
383+
#[pyo3(signature = (kind, op_spec, target_scope, name, refresh_options=None))]
384384
pub fn add_source(
385385
&mut self,
386386
kind: String,
387387
op_spec: py::Pythonized<serde_json::Map<String, serde_json::Value>>,
388388
target_scope: Option<DataScopeRef>,
389389
name: String,
390+
refresh_options: Option<py::Pythonized<spec::SourceRefreshOptions>>,
390391
) -> PyResult<DataSlice> {
391392
if let Some(target_scope) = target_scope {
392393
if !Arc::ptr_eq(&target_scope.0, &self.root_data_scope_ref.0) {
@@ -402,6 +403,7 @@ impl FlowBuilder {
402403
kind,
403404
spec: op_spec.into_inner(),
404405
},
406+
refresh_options: refresh_options.map(|o| o.into_inner()).unwrap_or_default(),
405407
},
406408
};
407409
let analyzer_ctx = AnalyzerContext {

src/builder/plan.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
1-
use std::sync::Arc;
1+
use crate::prelude::*;
22

3-
use serde::Serialize;
4-
5-
use crate::base::schema::ValueType;
6-
use crate::base::value;
73
use crate::execution::db_tracking_setup;
84
use crate::ops::interface::*;
95
use crate::utils::fingerprint::{Fingerprint, Fingerprinter};
@@ -60,7 +56,8 @@ pub struct AnalyzedImportOp {
6056
pub source_id: i32,
6157
pub executor: Box<dyn SourceExecutor>,
6258
pub output: AnalyzedOpOutput,
63-
pub primary_key_type: ValueType,
59+
pub primary_key_type: schema::ValueType,
60+
pub refresh_options: spec::SourceRefreshOptions,
6461
}
6562

6663
pub struct AnalyzedFunctionExecInfo {
@@ -69,7 +66,7 @@ pub struct AnalyzedFunctionExecInfo {
6966

7067
/// Fingerprinter of the function's behavior.
7168
pub fingerprinter: Fingerprinter,
72-
pub output_type: ValueType,
69+
pub output_type: schema::ValueType,
7370
}
7471

7572
pub struct AnalyzedTransformOp {
@@ -106,7 +103,7 @@ pub struct AnalyzedExportOp {
106103
pub executor: Arc<dyn ExportTargetExecutor>,
107104
pub query_target: Option<Arc<dyn QueryTarget>>,
108105
pub primary_key_def: AnalyzedPrimaryKeyDef,
109-
pub primary_key_type: ValueType,
106+
pub primary_key_type: schema::ValueType,
110107
/// idx for value fields - excluding the primary key field.
111108
pub value_fields: Vec<u32>,
112109
/// If true, value is never changed on the same primary key.

0 commit comments

Comments
 (0)