Skip to content

Commit 4d32ae6

Browse files
committed
Add IngestMode
1 parent 2591963 commit 4d32ae6

File tree

2 files changed

+11
-0
lines changed

2 files changed

+11
-0
lines changed

src/obelisk/asynchronous/core.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939

4040
from obelisk.strategies.retry import NoRetryStrategy, RetryStrategy
4141
from obelisk.types import ObeliskKind
42+
from obelisk.types.core import IngestMode
4243

4344

4445
DataType = Literal["number", "number[]", "json", "bool", "string"]
@@ -289,6 +290,7 @@ async def send(
289290
self,
290291
dataset: str,
291292
data: list[IncomingDatapoint],
293+
ingest_mode: IngestMode = IngestMode.BOTH,
292294
) -> httpx.Response:
293295
"""
294296
Publishes data to Obelisk
@@ -312,6 +314,7 @@ async def send(
312314
response = await self.http_post(
313315
f"{self.kind.root_url}/{dataset}/data/ingest",
314316
data=[x.model_dump(mode="json") for x in data],
317+
params={"mode": ingest_mode.value}
315318
)
316319
if response.status_code != 204:
317320
msg = f"An error occured during data ingest. Status {response.status_code}, message: {response.text}"

src/obelisk/types/core.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from datetime import datetime
2121
from typing import Any
2222
from collections.abc import Iterable
23+
from enum import Enum
2324

2425

2526
FieldName = str
@@ -199,3 +200,10 @@ def add_or(self, *other: Item) -> Filter:
199200
else:
200201
self.content = Or(self.content, *other)
201202
return self
203+
204+
205+
class IngestMode(str, Enum):
206+
"""Whether the ingested datapoints should be streamed, stored, or both (the default)"""
207+
BOTH = "DEFAULT"
208+
STREAM = "STREAM_ONLY"
209+
STORE = "STORE_ONLY"

0 commit comments

Comments
 (0)