
Commit 8c0fc88

DATA 4228: Update SDKs and CLI (#934)
1 parent f887544 commit 8c0fc88

File tree

3 files changed: +23 -3 lines


src/viam/app/data_client.py

Lines changed: 17 additions & 3 deletions
@@ -270,6 +270,9 @@ class DataPipeline:
         enabled: bool
         """Whether the data pipeline is enabled"""
 
+        data_source_type: TabularDataSourceType.ValueType
+        """The type of data source for the data pipeline"""
+
         @classmethod
         def from_proto(cls, data_pipeline: ProtoDataPipeline) -> Self:
             return cls(
@@ -281,6 +284,7 @@ def from_proto(cls, data_pipeline: ProtoDataPipeline) -> Self:
                 created_on=data_pipeline.created_on.ToDatetime(),
                 updated_at=data_pipeline.updated_at.ToDatetime(),
                 enabled=data_pipeline.enabled,
+                data_source_type=data_pipeline.data_source_type,
             )
 
     @dataclass
@@ -1883,7 +1887,14 @@ async def list_data_pipelines(self, organization_id: str) -> List[DataPipeline]:
         response: ListDataPipelinesResponse = await self._data_pipelines_client.ListDataPipelines(request, metadata=self._metadata)
         return [DataClient.DataPipeline.from_proto(pipeline) for pipeline in response.data_pipelines]
 
-    async def create_data_pipeline(self, organization_id: str, name: str, mql_binary: List[Dict[str, Any]], schedule: str) -> str:
+    async def create_data_pipeline(
+        self,
+        organization_id: str,
+        name: str,
+        mql_binary: List[Dict[str, Any]],
+        schedule: str,
+        data_source_type: TabularDataSourceType.ValueType = TabularDataSourceType.TABULAR_DATA_SOURCE_TYPE_STANDARD,
+    ) -> str:
         """Create a new data pipeline.
 
         ::
@@ -1892,7 +1903,8 @@ async def create_data_pipeline(self, organization_id: str, name: str, mql_binary
                 organization_id="<YOUR-ORGANIZATION-ID>",
                 name="<YOUR-PIPELINE-NAME>",
                 mql_binary=[<YOUR-MQL-PIPELINE-AGGREGATION>],
-                schedule="<YOUR-SCHEDULE>"
+                schedule="<YOUR-SCHEDULE>",
+                data_source_type=TabularDataSourceType.TABULAR_DATA_SOURCE_TYPE_STANDARD,
             )
 
         Args:
@@ -1902,12 +1914,14 @@ async def create_data_pipeline(self, organization_id: str, name: str, mql_binary
             mql_binary (List[Dict[str, Any]]): The MQL pipeline to run, as a list of MongoDB aggregation pipeline stages.
             schedule (str): A cron expression representing the expected execution schedule in UTC (note this also
                 defines the input time window; an hourly schedule would process 1 hour of data at a time).
+            data_source_type (TabularDataSourceType): The type of data source to use for the pipeline.
+                Defaults to TabularDataSourceType.TABULAR_DATA_SOURCE_TYPE_STANDARD.
 
         Returns:
             str: The ID of the newly created pipeline.
         """
         binary: List[bytes] = [bson.encode(query) for query in mql_binary]
-        request = CreateDataPipelineRequest(organization_id=organization_id, name=name, mql_binary=binary, schedule=schedule)
+        request = CreateDataPipelineRequest(organization_id=organization_id, name=name, mql_binary=binary, schedule=schedule, data_source_type=data_source_type)
         response: CreateDataPipelineResponse = await self._data_pipelines_client.CreateDataPipeline(request, metadata=self._metadata)
         return response.id
 
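To make the new parameter concrete, here is a minimal usage sketch (not part of this commit). It assumes a valid API key and organization ID; the pipeline name and the single $group stage are illustrative, and the connection boilerplate follows the SDK's ViamClient/DialOptions pattern:

    import asyncio

    from viam.app.viam_client import ViamClient
    from viam.proto.app.data import TabularDataSourceType
    from viam.rpc.dial import DialOptions

    async def main() -> None:
        # Hypothetical credentials; substitute real values.
        viam_client = await ViamClient.create_from_dial_options(
            DialOptions.with_api_key("<API-KEY>", "<API-KEY-ID>")
        )
        try:
            pipeline_id = await viam_client.data_client.create_data_pipeline(
                organization_id="<YOUR-ORGANIZATION-ID>",
                name="hourly-counts",  # illustrative name
                # One dict per MQL aggregation stage; the client bson-encodes each stage.
                mql_binary=[{"$group": {"_id": "$location_id", "count": {"$sum": 1}}}],
                # Hourly cron schedule in UTC; each run processes one hour of data.
                schedule="0 * * * *",
                # New in this commit; may be omitted, since it defaults to STANDARD.
                data_source_type=TabularDataSourceType.TABULAR_DATA_SOURCE_TYPE_STANDARD,
            )
            print(pipeline_id)
        finally:
            viam_client.close()

    asyncio.run(main())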

tests/mocks/services.py

Lines changed: 1 addition & 0 deletions
@@ -1147,6 +1147,7 @@ async def CreateDataPipeline(self, stream: Stream[CreateDataPipelineRequest, CreateDataPipelineResponse]) -> None:
         self.mql_binary = request.mql_binary
         self.schedule = request.schedule
         self.org_id = request.organization_id
+        self.data_source_type = request.data_source_type
         await stream.send_message(CreateDataPipelineResponse(id=self.create_response))
 
     async def GetDataPipeline(self, stream: Stream[GetDataPipelineRequest, GetDataPipelineResponse]) -> None:
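A note on the mock's design: MockDataPipelines records each request field on the service instance and replies with a canned response, so tests can assert exactly what the client put on the wire. Recording request.data_source_type here is what lets the updated test below verify that the client's default (TABULAR_DATA_SOURCE_TYPE_STANDARD) actually reaches the server.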

tests/test_data_pipelines.py

Lines changed: 5 additions & 0 deletions
@@ -6,6 +6,7 @@
 from viam.app.data_client import DataClient
 from viam.proto.app.datapipelines import DataPipeline, DataPipelineRun, DataPipelineRunStatus
 from viam.utils import datetime_to_timestamp
+from viam.proto.app.data import TabularDataSourceType
 
 from .mocks.services import MockDataPipelines
 
@@ -15,6 +16,8 @@
 SCHEDULE = "0 0 * * *"
 UPDATED_SCHEDULE = "0 1 * * *"
 MQL_BINARY = []
+DATA_SOURCE_TYPE = TabularDataSourceType.TABULAR_DATA_SOURCE_TYPE_UNSPECIFIED
+STANDARD_DATA_SOURCE_TYPE = TabularDataSourceType.TABULAR_DATA_SOURCE_TYPE_STANDARD
 
 TIMESTAMP = datetime.fromtimestamp(0)
 TIMESTAMP_PROTO = datetime_to_timestamp(TIMESTAMP)
@@ -28,6 +31,7 @@
     enabled=True,
     created_on=TIMESTAMP_PROTO,
     updated_at=TIMESTAMP_PROTO,
+    data_source_type=DATA_SOURCE_TYPE,
 )
 PROTO_DATA_PIPELINES = [PROTO_DATA_PIPELINE]
 
@@ -84,6 +88,7 @@ async def test_create_data_pipeline(self, service: MockDataPipelines):
         assert service.org_id == ORG_ID
         assert service.schedule == SCHEDULE
         assert service.mql_binary == MQL_BINARY
+        assert service.data_source_type == STANDARD_DATA_SOURCE_TYPE
 
     async def test_get_data_pipeline(self, service: MockDataPipelines):
         async with ChannelFor([service]) as channel:
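A companion check not included in this commit, sketched here against the fixtures above, would confirm that from_proto carries the new field through to the dataclass:

    # Sketch only: reuses PROTO_DATA_PIPELINE and DATA_SOURCE_TYPE defined above.
    def test_from_proto_carries_data_source_type() -> None:
        pipeline = DataClient.DataPipeline.from_proto(PROTO_DATA_PIPELINE)
        # UNSPECIFIED, matching the value set on the proto fixture.
        assert pipeline.data_source_type == DATA_SOURCE_TYPE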
