diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index bf16884..553fd9f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -31,5 +31,6 @@ jobs: runs-on: ${{ matrix.os }} steps: - uses: actions/checkout@v4 - - uses: astral-sh/setup-uv@v3 + - uses: astral-sh/setup-uv@v6 + - run: uv python install - run: uv run pytest diff --git a/.gitignore b/.gitignore index 8eb2d31..ac51e21 100644 --- a/.gitignore +++ b/.gitignore @@ -48,6 +48,7 @@ cover/ # Sphinx documentation docs/_build/ docs/build/ +docs/source/_autosummary/ # PyBuilder .pybuilder/ @@ -74,6 +75,7 @@ venv/ ENV/ env.bak/ venv.bak/ +pyrightconfig.json # Spyder project settings .spyderproject diff --git a/README.md b/README.md index d426fa3..99a6820 100644 --- a/README.md +++ b/README.md @@ -65,6 +65,10 @@ Building is as follows: # uv run sphinx-build -M html docs/source/ docs/build/ ``` +In case of major restructuring, it may be needed to clean up the contents of `docs/_autosummary` and potentially other rst files in `docs`, +followed by re-running the build. +Manually triggering sphinx-apidoc is unnecessary. + ## Credits Base implementation originally by Pieter Moens , diff --git a/docs/source/_autosummary/obelisk.asynchronous.client.Client.rst b/docs/source/_autosummary/obelisk.asynchronous.client.Client.rst deleted file mode 100644 index 95f23a5..0000000 --- a/docs/source/_autosummary/obelisk.asynchronous.client.Client.rst +++ /dev/null @@ -1,37 +0,0 @@ -obelisk.asynchronous.client.Client -================================== - -.. currentmodule:: obelisk.asynchronous.client - -.. autoclass:: Client - :members: - :show-inheritance: - :inherited-members: - - - .. automethod:: __init__ - - - .. rubric:: Methods - - .. autosummary:: - - ~Client.__init__ - ~Client.http_post - - - - - - .. rubric:: Attributes - - .. autosummary:: - - ~Client.grace_period - ~Client.token - ~Client.token_expires - ~Client.retry_strategy - ~Client.kind - ~Client.log - - \ No newline at end of file diff --git a/docs/source/_autosummary/obelisk.asynchronous.client.rst b/docs/source/_autosummary/obelisk.asynchronous.client.rst deleted file mode 100644 index ad556c4..0000000 --- a/docs/source/_autosummary/obelisk.asynchronous.client.rst +++ /dev/null @@ -1,14 +0,0 @@ -obelisk.asynchronous.client -=========================== - -.. automodule:: obelisk.asynchronous.client - - - .. rubric:: Classes - - .. autosummary:: - :toctree: - :template: custom-class-template.rst - - Client - \ No newline at end of file diff --git a/docs/source/_autosummary/obelisk.asynchronous.consumer.Consumer.rst b/docs/source/_autosummary/obelisk.asynchronous.consumer.Consumer.rst deleted file mode 100644 index 6128483..0000000 --- a/docs/source/_autosummary/obelisk.asynchronous.consumer.Consumer.rst +++ /dev/null @@ -1,40 +0,0 @@ -obelisk.asynchronous.consumer.Consumer -====================================== - -.. currentmodule:: obelisk.asynchronous.consumer - -.. autoclass:: Consumer - :members: - :show-inheritance: - :inherited-members: - - - .. automethod:: __init__ - - - .. rubric:: Methods - - .. autosummary:: - - ~Consumer.__init__ - ~Consumer.http_post - ~Consumer.query - ~Consumer.query_time_chunked - ~Consumer.single_chunk - - - - - - .. rubric:: Attributes - - .. 
autosummary:: - - ~Consumer.grace_period - ~Consumer.token - ~Consumer.token_expires - ~Consumer.retry_strategy - ~Consumer.kind - ~Consumer.log - - \ No newline at end of file diff --git a/docs/source/_autosummary/obelisk.asynchronous.consumer.rst b/docs/source/_autosummary/obelisk.asynchronous.consumer.rst deleted file mode 100644 index 1a4159e..0000000 --- a/docs/source/_autosummary/obelisk.asynchronous.consumer.rst +++ /dev/null @@ -1,14 +0,0 @@ -obelisk.asynchronous.consumer -============================= - -.. automodule:: obelisk.asynchronous.consumer - - - .. rubric:: Classes - - .. autosummary:: - :toctree: - :template: custom-class-template.rst - - Consumer - \ No newline at end of file diff --git a/docs/source/_autosummary/obelisk.asynchronous.consumer_test.rst b/docs/source/_autosummary/obelisk.asynchronous.consumer_test.rst deleted file mode 100644 index d81d359..0000000 --- a/docs/source/_autosummary/obelisk.asynchronous.consumer_test.rst +++ /dev/null @@ -1,13 +0,0 @@ -obelisk.asynchronous.consumer\_test -=================================== - -.. automodule:: obelisk.asynchronous.consumer_test - - - .. rubric:: Functions - - .. autosummary:: - :toctree: - - test_demo_igent - \ No newline at end of file diff --git a/docs/source/_autosummary/obelisk.asynchronous.consumer_test.test_demo_igent.rst b/docs/source/_autosummary/obelisk.asynchronous.consumer_test.test_demo_igent.rst deleted file mode 100644 index f02fbb0..0000000 --- a/docs/source/_autosummary/obelisk.asynchronous.consumer_test.test_demo_igent.rst +++ /dev/null @@ -1,6 +0,0 @@ -obelisk.asynchronous.consumer\_test.test\_demo\_igent -===================================================== - -.. currentmodule:: obelisk.asynchronous.consumer_test - -.. autofunction:: test_demo_igent \ No newline at end of file diff --git a/docs/source/_autosummary/obelisk.asynchronous.producer.Producer.rst b/docs/source/_autosummary/obelisk.asynchronous.producer.Producer.rst deleted file mode 100644 index a9226ad..0000000 --- a/docs/source/_autosummary/obelisk.asynchronous.producer.Producer.rst +++ /dev/null @@ -1,38 +0,0 @@ -obelisk.asynchronous.producer.Producer -====================================== - -.. currentmodule:: obelisk.asynchronous.producer - -.. autoclass:: Producer - :members: - :show-inheritance: - :inherited-members: - - - .. automethod:: __init__ - - - .. rubric:: Methods - - .. autosummary:: - - ~Producer.__init__ - ~Producer.http_post - ~Producer.send - - - - - - .. rubric:: Attributes - - .. autosummary:: - - ~Producer.grace_period - ~Producer.token - ~Producer.token_expires - ~Producer.retry_strategy - ~Producer.kind - ~Producer.log - - \ No newline at end of file diff --git a/docs/source/_autosummary/obelisk.asynchronous.producer.rst b/docs/source/_autosummary/obelisk.asynchronous.producer.rst deleted file mode 100644 index 2df3161..0000000 --- a/docs/source/_autosummary/obelisk.asynchronous.producer.rst +++ /dev/null @@ -1,14 +0,0 @@ -obelisk.asynchronous.producer -============================= - -.. automodule:: obelisk.asynchronous.producer - - - .. rubric:: Classes - - .. autosummary:: - :toctree: - :template: custom-class-template.rst - - Producer - \ No newline at end of file diff --git a/docs/source/_autosummary/obelisk.asynchronous.rst b/docs/source/_autosummary/obelisk.asynchronous.rst deleted file mode 100644 index 06cd884..0000000 --- a/docs/source/_autosummary/obelisk.asynchronous.rst +++ /dev/null @@ -1,17 +0,0 @@ -obelisk.asynchronous -==================== - -.. 
automodule:: obelisk.asynchronous - - -.. rubric:: Modules - -.. autosummary:: - :toctree: - :template: custom-module-template.rst - :recursive: - - client - consumer - consumer_test - producer diff --git a/docs/source/_autosummary/obelisk.exceptions.AuthenticationError.rst b/docs/source/_autosummary/obelisk.exceptions.AuthenticationError.rst deleted file mode 100644 index 5285b6b..0000000 --- a/docs/source/_autosummary/obelisk.exceptions.AuthenticationError.rst +++ /dev/null @@ -1,6 +0,0 @@ -obelisk.exceptions.AuthenticationError -====================================== - -.. currentmodule:: obelisk.exceptions - -.. autoexception:: AuthenticationError \ No newline at end of file diff --git a/docs/source/_autosummary/obelisk.exceptions.ObeliskError.rst b/docs/source/_autosummary/obelisk.exceptions.ObeliskError.rst deleted file mode 100644 index 59f8ffa..0000000 --- a/docs/source/_autosummary/obelisk.exceptions.ObeliskError.rst +++ /dev/null @@ -1,6 +0,0 @@ -obelisk.exceptions.ObeliskError -=============================== - -.. currentmodule:: obelisk.exceptions - -.. autoexception:: ObeliskError \ No newline at end of file diff --git a/docs/source/_autosummary/obelisk.exceptions.rst b/docs/source/_autosummary/obelisk.exceptions.rst deleted file mode 100644 index 2f0a8fa..0000000 --- a/docs/source/_autosummary/obelisk.exceptions.rst +++ /dev/null @@ -1,14 +0,0 @@ -obelisk.exceptions -================== - -.. automodule:: obelisk.exceptions - - - .. rubric:: Exceptions - - .. autosummary:: - :toctree: - - AuthenticationError - ObeliskError - \ No newline at end of file diff --git a/docs/source/_autosummary/obelisk.rst b/docs/source/_autosummary/obelisk.rst deleted file mode 100644 index ca0754f..0000000 --- a/docs/source/_autosummary/obelisk.rst +++ /dev/null @@ -1,18 +0,0 @@ -obelisk -======= - -.. automodule:: obelisk - - -.. rubric:: Modules - -.. autosummary:: - :toctree: - :template: custom-module-template.rst - :recursive: - - asynchronous - exceptions - strategies - sync - types diff --git a/docs/source/_autosummary/obelisk.strategies.retry.ExponentialBackoffStrategy.rst b/docs/source/_autosummary/obelisk.strategies.retry.ExponentialBackoffStrategy.rst deleted file mode 100644 index e131a4a..0000000 --- a/docs/source/_autosummary/obelisk.strategies.retry.ExponentialBackoffStrategy.rst +++ /dev/null @@ -1,34 +0,0 @@ -obelisk.strategies.retry.ExponentialBackoffStrategy -=================================================== - -.. currentmodule:: obelisk.strategies.retry - -.. autoclass:: ExponentialBackoffStrategy - :members: - :show-inheritance: - :inherited-members: - - - .. automethod:: __init__ - - - .. rubric:: Methods - - .. autosummary:: - - ~ExponentialBackoffStrategy.__init__ - ~ExponentialBackoffStrategy.make - - - - - - .. rubric:: Attributes - - .. autosummary:: - - ~ExponentialBackoffStrategy.max_retries - ~ExponentialBackoffStrategy.backoff - ~ExponentialBackoffStrategy.max_backoff - - \ No newline at end of file diff --git a/docs/source/_autosummary/obelisk.strategies.retry.ImmediateRetryStrategy.rst b/docs/source/_autosummary/obelisk.strategies.retry.ImmediateRetryStrategy.rst deleted file mode 100644 index 4bacb29..0000000 --- a/docs/source/_autosummary/obelisk.strategies.retry.ImmediateRetryStrategy.rst +++ /dev/null @@ -1,26 +0,0 @@ -obelisk.strategies.retry.ImmediateRetryStrategy -=============================================== - -.. currentmodule:: obelisk.strategies.retry - -.. 
autoclass:: ImmediateRetryStrategy - :members: - :show-inheritance: - :inherited-members: - - - .. automethod:: __init__ - - - .. rubric:: Methods - - .. autosummary:: - - ~ImmediateRetryStrategy.__init__ - ~ImmediateRetryStrategy.make - - - - - - \ No newline at end of file diff --git a/docs/source/_autosummary/obelisk.strategies.retry.NoRetryStrategy.rst b/docs/source/_autosummary/obelisk.strategies.retry.NoRetryStrategy.rst deleted file mode 100644 index 0dec53f..0000000 --- a/docs/source/_autosummary/obelisk.strategies.retry.NoRetryStrategy.rst +++ /dev/null @@ -1,26 +0,0 @@ -obelisk.strategies.retry.NoRetryStrategy -======================================== - -.. currentmodule:: obelisk.strategies.retry - -.. autoclass:: NoRetryStrategy - :members: - :show-inheritance: - :inherited-members: - - - .. automethod:: __init__ - - - .. rubric:: Methods - - .. autosummary:: - - ~NoRetryStrategy.__init__ - ~NoRetryStrategy.make - - - - - - \ No newline at end of file diff --git a/docs/source/_autosummary/obelisk.strategies.retry.RetryEvaluator.rst b/docs/source/_autosummary/obelisk.strategies.retry.RetryEvaluator.rst deleted file mode 100644 index 684ce8c..0000000 --- a/docs/source/_autosummary/obelisk.strategies.retry.RetryEvaluator.rst +++ /dev/null @@ -1,26 +0,0 @@ -obelisk.strategies.retry.RetryEvaluator -======================================= - -.. currentmodule:: obelisk.strategies.retry - -.. autoclass:: RetryEvaluator - :members: - :show-inheritance: - :inherited-members: - - - .. automethod:: __init__ - - - .. rubric:: Methods - - .. autosummary:: - - ~RetryEvaluator.__init__ - ~RetryEvaluator.should_retry - - - - - - \ No newline at end of file diff --git a/docs/source/_autosummary/obelisk.strategies.retry.RetryStrategy.rst b/docs/source/_autosummary/obelisk.strategies.retry.RetryStrategy.rst deleted file mode 100644 index efa8a23..0000000 --- a/docs/source/_autosummary/obelisk.strategies.retry.RetryStrategy.rst +++ /dev/null @@ -1,26 +0,0 @@ -obelisk.strategies.retry.RetryStrategy -====================================== - -.. currentmodule:: obelisk.strategies.retry - -.. autoclass:: RetryStrategy - :members: - :show-inheritance: - :inherited-members: - - - .. automethod:: __init__ - - - .. rubric:: Methods - - .. autosummary:: - - ~RetryStrategy.__init__ - ~RetryStrategy.make - - - - - - \ No newline at end of file diff --git a/docs/source/_autosummary/obelisk.strategies.retry.rst b/docs/source/_autosummary/obelisk.strategies.retry.rst deleted file mode 100644 index 88abac3..0000000 --- a/docs/source/_autosummary/obelisk.strategies.retry.rst +++ /dev/null @@ -1,18 +0,0 @@ -obelisk.strategies.retry -======================== - -.. automodule:: obelisk.strategies.retry - - - .. rubric:: Classes - - .. autosummary:: - :toctree: - :template: custom-class-template.rst - - ExponentialBackoffStrategy - ImmediateRetryStrategy - NoRetryStrategy - RetryEvaluator - RetryStrategy - \ No newline at end of file diff --git a/docs/source/_autosummary/obelisk.strategies.rst b/docs/source/_autosummary/obelisk.strategies.rst deleted file mode 100644 index ad62a2e..0000000 --- a/docs/source/_autosummary/obelisk.strategies.rst +++ /dev/null @@ -1,14 +0,0 @@ -obelisk.strategies -================== - -.. automodule:: obelisk.strategies - - -.. rubric:: Modules - -.. 
autosummary:: - :toctree: - :template: custom-module-template.rst - :recursive: - - retry diff --git a/docs/source/_autosummary/obelisk.sync.consumer.Consumer.rst b/docs/source/_autosummary/obelisk.sync.consumer.Consumer.rst deleted file mode 100644 index 8394ebd..0000000 --- a/docs/source/_autosummary/obelisk.sync.consumer.Consumer.rst +++ /dev/null @@ -1,35 +0,0 @@ -obelisk.sync.consumer.Consumer -============================== - -.. currentmodule:: obelisk.sync.consumer - -.. autoclass:: Consumer - :members: - :show-inheritance: - :inherited-members: - - - .. automethod:: __init__ - - - .. rubric:: Methods - - .. autosummary:: - - ~Consumer.__init__ - ~Consumer.query - ~Consumer.query_time_chunked - ~Consumer.single_chunk - - - - - - .. rubric:: Attributes - - .. autosummary:: - - ~Consumer.loop - ~Consumer.async_consumer - - \ No newline at end of file diff --git a/docs/source/_autosummary/obelisk.sync.consumer.rst b/docs/source/_autosummary/obelisk.sync.consumer.rst deleted file mode 100644 index c742ea6..0000000 --- a/docs/source/_autosummary/obelisk.sync.consumer.rst +++ /dev/null @@ -1,14 +0,0 @@ -obelisk.sync.consumer -===================== - -.. automodule:: obelisk.sync.consumer - - - .. rubric:: Classes - - .. autosummary:: - :toctree: - :template: custom-class-template.rst - - Consumer - \ No newline at end of file diff --git a/docs/source/_autosummary/obelisk.sync.consumer_test.rst b/docs/source/_autosummary/obelisk.sync.consumer_test.rst deleted file mode 100644 index f8d450d..0000000 --- a/docs/source/_autosummary/obelisk.sync.consumer_test.rst +++ /dev/null @@ -1,14 +0,0 @@ -obelisk.sync.consumer\_test -=========================== - -.. automodule:: obelisk.sync.consumer_test - - - .. rubric:: Functions - - .. autosummary:: - :toctree: - - test_demo_igent - test_two_instances - \ No newline at end of file diff --git a/docs/source/_autosummary/obelisk.sync.consumer_test.test_demo_igent.rst b/docs/source/_autosummary/obelisk.sync.consumer_test.test_demo_igent.rst deleted file mode 100644 index 2b56e9b..0000000 --- a/docs/source/_autosummary/obelisk.sync.consumer_test.test_demo_igent.rst +++ /dev/null @@ -1,6 +0,0 @@ -obelisk.sync.consumer\_test.test\_demo\_igent -============================================= - -.. currentmodule:: obelisk.sync.consumer_test - -.. autofunction:: test_demo_igent \ No newline at end of file diff --git a/docs/source/_autosummary/obelisk.sync.consumer_test.test_two_instances.rst b/docs/source/_autosummary/obelisk.sync.consumer_test.test_two_instances.rst deleted file mode 100644 index 727977a..0000000 --- a/docs/source/_autosummary/obelisk.sync.consumer_test.test_two_instances.rst +++ /dev/null @@ -1,6 +0,0 @@ -obelisk.sync.consumer\_test.test\_two\_instances -================================================ - -.. currentmodule:: obelisk.sync.consumer_test - -.. autofunction:: test_two_instances \ No newline at end of file diff --git a/docs/source/_autosummary/obelisk.sync.producer.Producer.rst b/docs/source/_autosummary/obelisk.sync.producer.Producer.rst deleted file mode 100644 index 7db923a..0000000 --- a/docs/source/_autosummary/obelisk.sync.producer.Producer.rst +++ /dev/null @@ -1,33 +0,0 @@ -obelisk.sync.producer.Producer -============================== - -.. currentmodule:: obelisk.sync.producer - -.. autoclass:: Producer - :members: - :show-inheritance: - :inherited-members: - - - .. automethod:: __init__ - - - .. rubric:: Methods - - .. autosummary:: - - ~Producer.__init__ - ~Producer.send - - - - - - .. rubric:: Attributes - - .. 
autosummary:: - - ~Producer.loop - ~Producer.async_producer - - \ No newline at end of file diff --git a/docs/source/_autosummary/obelisk.sync.producer.rst b/docs/source/_autosummary/obelisk.sync.producer.rst deleted file mode 100644 index 336177e..0000000 --- a/docs/source/_autosummary/obelisk.sync.producer.rst +++ /dev/null @@ -1,14 +0,0 @@ -obelisk.sync.producer -===================== - -.. automodule:: obelisk.sync.producer - - - .. rubric:: Classes - - .. autosummary:: - :toctree: - :template: custom-class-template.rst - - Producer - \ No newline at end of file diff --git a/docs/source/_autosummary/obelisk.sync.rst b/docs/source/_autosummary/obelisk.sync.rst deleted file mode 100644 index d014992..0000000 --- a/docs/source/_autosummary/obelisk.sync.rst +++ /dev/null @@ -1,16 +0,0 @@ -obelisk.sync -============ - -.. automodule:: obelisk.sync - - -.. rubric:: Modules - -.. autosummary:: - :toctree: - :template: custom-module-template.rst - :recursive: - - consumer - consumer_test - producer diff --git a/docs/source/_autosummary/obelisk.types.Datapoint.rst b/docs/source/_autosummary/obelisk.types.Datapoint.rst deleted file mode 100644 index 556b9e3..0000000 --- a/docs/source/_autosummary/obelisk.types.Datapoint.rst +++ /dev/null @@ -1,64 +0,0 @@ -obelisk.types.Datapoint -======================= - -.. currentmodule:: obelisk.types - -.. autoclass:: Datapoint - :members: - :show-inheritance: - :inherited-members: - - - .. automethod:: __init__ - - - .. rubric:: Methods - - .. autosummary:: - - ~Datapoint.__init__ - ~Datapoint.construct - ~Datapoint.copy - ~Datapoint.dict - ~Datapoint.from_orm - ~Datapoint.json - ~Datapoint.model_construct - ~Datapoint.model_copy - ~Datapoint.model_dump - ~Datapoint.model_dump_json - ~Datapoint.model_json_schema - ~Datapoint.model_parametrized_name - ~Datapoint.model_post_init - ~Datapoint.model_rebuild - ~Datapoint.model_validate - ~Datapoint.model_validate_json - ~Datapoint.model_validate_strings - ~Datapoint.parse_file - ~Datapoint.parse_obj - ~Datapoint.parse_raw - ~Datapoint.schema - ~Datapoint.schema_json - ~Datapoint.update_forward_refs - ~Datapoint.validate - - - - - - .. rubric:: Attributes - - .. autosummary:: - - ~Datapoint.model_computed_fields - ~Datapoint.model_config - ~Datapoint.model_extra - ~Datapoint.model_fields - ~Datapoint.model_fields_set - ~Datapoint.timestamp - ~Datapoint.value - ~Datapoint.dataset - ~Datapoint.metric - ~Datapoint.source - ~Datapoint.userId - - \ No newline at end of file diff --git a/docs/source/_autosummary/obelisk.types.IngestMode.rst b/docs/source/_autosummary/obelisk.types.IngestMode.rst deleted file mode 100644 index 448d4c1..0000000 --- a/docs/source/_autosummary/obelisk.types.IngestMode.rst +++ /dev/null @@ -1,27 +0,0 @@ -obelisk.types.IngestMode -======================== - -.. currentmodule:: obelisk.types - -.. autoclass:: IngestMode - :members: - :show-inheritance: - :inherited-members: - - - .. automethod:: __init__ - - - - - - - .. rubric:: Attributes - - .. autosummary:: - - ~IngestMode.DEFAULT - ~IngestMode.STREAM_ONLY - ~IngestMode.STORE_ONLY - - \ No newline at end of file diff --git a/docs/source/_autosummary/obelisk.types.ObeliskKind.rst b/docs/source/_autosummary/obelisk.types.ObeliskKind.rst deleted file mode 100644 index 2df7c80..0000000 --- a/docs/source/_autosummary/obelisk.types.ObeliskKind.rst +++ /dev/null @@ -1,26 +0,0 @@ -obelisk.types.ObeliskKind -========================= - -.. currentmodule:: obelisk.types - -.. 
autoclass:: ObeliskKind - :members: - :show-inheritance: - :inherited-members: - - - .. automethod:: __init__ - - - - - - - .. rubric:: Attributes - - .. autosummary:: - - ~ObeliskKind.CLASSIC - ~ObeliskKind.HFS - - \ No newline at end of file diff --git a/docs/source/_autosummary/obelisk.types.QueryResult.rst b/docs/source/_autosummary/obelisk.types.QueryResult.rst deleted file mode 100644 index 0d2e9e8..0000000 --- a/docs/source/_autosummary/obelisk.types.QueryResult.rst +++ /dev/null @@ -1,60 +0,0 @@ -obelisk.types.QueryResult -========================= - -.. currentmodule:: obelisk.types - -.. autoclass:: QueryResult - :members: - :show-inheritance: - :inherited-members: - - - .. automethod:: __init__ - - - .. rubric:: Methods - - .. autosummary:: - - ~QueryResult.__init__ - ~QueryResult.construct - ~QueryResult.copy - ~QueryResult.dict - ~QueryResult.from_orm - ~QueryResult.json - ~QueryResult.model_construct - ~QueryResult.model_copy - ~QueryResult.model_dump - ~QueryResult.model_dump_json - ~QueryResult.model_json_schema - ~QueryResult.model_parametrized_name - ~QueryResult.model_post_init - ~QueryResult.model_rebuild - ~QueryResult.model_validate - ~QueryResult.model_validate_json - ~QueryResult.model_validate_strings - ~QueryResult.parse_file - ~QueryResult.parse_obj - ~QueryResult.parse_raw - ~QueryResult.schema - ~QueryResult.schema_json - ~QueryResult.update_forward_refs - ~QueryResult.validate - - - - - - .. rubric:: Attributes - - .. autosummary:: - - ~QueryResult.model_computed_fields - ~QueryResult.model_config - ~QueryResult.model_extra - ~QueryResult.model_fields - ~QueryResult.model_fields_set - ~QueryResult.items - ~QueryResult.cursor - - \ No newline at end of file diff --git a/docs/source/_autosummary/obelisk.types.TimestampPrecision.rst b/docs/source/_autosummary/obelisk.types.TimestampPrecision.rst deleted file mode 100644 index f165ddf..0000000 --- a/docs/source/_autosummary/obelisk.types.TimestampPrecision.rst +++ /dev/null @@ -1,27 +0,0 @@ -obelisk.types.TimestampPrecision -================================ - -.. currentmodule:: obelisk.types - -.. autoclass:: TimestampPrecision - :members: - :show-inheritance: - :inherited-members: - - - .. automethod:: __init__ - - - - - - - .. rubric:: Attributes - - .. autosummary:: - - ~TimestampPrecision.SECONDS - ~TimestampPrecision.MILLISECONDS - ~TimestampPrecision.MICROSECONDS - - \ No newline at end of file diff --git a/docs/source/_autosummary/obelisk.types.rst b/docs/source/_autosummary/obelisk.types.rst deleted file mode 100644 index 99f53b0..0000000 --- a/docs/source/_autosummary/obelisk.types.rst +++ /dev/null @@ -1,18 +0,0 @@ -obelisk.types -============= - -.. automodule:: obelisk.types - - - .. rubric:: Classes - - .. autosummary:: - :toctree: - :template: custom-class-template.rst - - Datapoint - IngestMode - ObeliskKind - QueryResult - TimestampPrecision - \ No newline at end of file diff --git a/docs/source/_templates/custom-class-template.rst b/docs/source/_templates/autosummary/class.rst similarity index 68% rename from docs/source/_templates/custom-class-template.rst rename to docs/source/_templates/autosummary/class.rst index b29757c..d57822d 100644 --- a/docs/source/_templates/custom-class-template.rst +++ b/docs/source/_templates/autosummary/class.rst @@ -3,19 +3,21 @@ .. currentmodule:: {{ module }} .. autoclass:: {{ objname }} - :members: :show-inheritance: - :inherited-members: {% block methods %} .. automethod:: __init__ + :no-index: {% if methods %} .. rubric:: {{ _('Methods') }} .. 
autosummary:: {% for item in methods %} - ~{{ name }}.{{ item }} + {% if item not in inherited_members %} + .. automethod:: {{ item }} + :no-index-entry: + {%- endif %} {%- endfor %} {% endif %} {% endblock %} @@ -26,7 +28,10 @@ .. autosummary:: {% for item in attributes %} - ~{{ name }}.{{ item }} + {% if item not in inherited_members %} + .. autoattribute:: {{ item }} + :no-index-entry: + {%- endif %} {%- endfor %} {% endif %} {% endblock %} diff --git a/docs/source/_templates/custom-module-template.rst b/docs/source/_templates/autosummary/module.rst similarity index 93% rename from docs/source/_templates/custom-module-template.rst rename to docs/source/_templates/autosummary/module.rst index 41e1e91..2b788aa 100644 --- a/docs/source/_templates/custom-module-template.rst +++ b/docs/source/_templates/autosummary/module.rst @@ -1,5 +1,6 @@ {{ fullname | escape | underline}} +.. currentmodule:: {{ module }} .. automodule:: {{ fullname }} {% block attributes %} @@ -32,7 +33,6 @@ .. autosummary:: :toctree: - :template: custom-class-template.rst {% for item in classes %} {{ item }} {%- endfor %} @@ -57,7 +57,6 @@ .. autosummary:: :toctree: - :template: custom-module-template.rst :recursive: {% for item in modules %} {{ item }} diff --git a/docs/source/conf.py b/docs/source/conf.py index 25a54b2..935ecd9 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -27,10 +27,11 @@ 'sphinx.ext.autodoc', 'sphinx.ext.autosummary', 'sphinx.ext.viewcode', - 'numpydoc', 'sphinx.ext.doctest', + 'sphinx.ext.intersphinx', ] autosummary_generate = True +autosummary_ignore_module_all = False templates_path = ['_templates'] exclude_patterns = [] @@ -42,3 +43,15 @@ html_theme = 'alabaster' html_static_path = ['_static'] + +autodoc_default_options = { + 'inherited-members': False, + 'undoc-members': True, +} +autoclass_content = 'class' +autodoc_member_order = 'groupwise' + +intersphinx_mapping = { + 'python': ('https://docs.python.org/3', None), + 'pydantic': ('https://docs.pydantic.dev/latest', None), +} diff --git a/docs/source/index.rst b/docs/source/index.rst index b306562..7e1ef2c 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -7,7 +7,6 @@ It brings retry strategies and optional async support .. autosummary:: :toctree: _autosummary - :template: custom-module-template.rst :recursive: obelisk diff --git a/src/obelisk/__init__.py b/src/obelisk/__init__.py index fc049b3..4d6f978 100644 --- a/src/obelisk/__init__.py +++ b/src/obelisk/__init__.py @@ -4,10 +4,10 @@ each with a synchronous and async API. The PyPi package name is ``obelisk-py``, the Python module is called ``obelisk``. -Your starting point will be one of the Consumer or Producer instances in :mod:`~.sync` or :mod:`~.asynchronous` depending on your preferred API. +Your starting point will be one of the Obelisk instances in :mod:`~.sync` or :mod:`~.asynchronous` depending on your preferred API. -Each of the `sync` or `asynchronous` modules will contain two main classes, those being Producer and Consumer. -Those submit or fetch data respectively. +The Obelisk classes in these modules both implement the same interface, +but the asynchronous implementation returns Coroutines. Error handling -------------- diff --git a/src/obelisk/asynchronous/__init__.py b/src/obelisk/asynchronous/__init__.py index a0f4184..852a74b 100644 --- a/src/obelisk/asynchronous/__init__.py +++ b/src/obelisk/asynchronous/__init__.py @@ -1,7 +1,10 @@ """ This module contains the asynchronous API to Obelisk-py. -These methods all return a :any:`Coroutine`. 
+These methods all return an :external+python:class:`collections.abc.Awaitable`. -Relevant entrance points are :class:`.producer.Producer` to publish data to Obelisk, -or :class:`.consumer.Consumer` to consume data. +Relevant entrance points are :class:`client.Obelisk`. +It can be imported from the :mod:`.client` module, or directly from this one. """ +__all__= ['Obelisk', 'core'] +from .client import Obelisk +from . import core diff --git a/src/obelisk/asynchronous/base.py b/src/obelisk/asynchronous/base.py new file mode 100644 index 0000000..0787c1e --- /dev/null +++ b/src/obelisk/asynchronous/base.py @@ -0,0 +1,191 @@ +from datetime import datetime, timedelta +import logging +import base64 +from typing import Any, Optional + +import httpx + +from obelisk.exceptions import AuthenticationError +from obelisk.strategies.retry import RetryStrategy, \ + NoRetryStrategy +from obelisk.types import ObeliskKind + + +class BaseClient: + """ + Base class handling Obelisk auth and doing the core HTTP communication. + Only exists in asynchronous variety, as it is not usually directly useful for user code. + """ + + _client: str = "" + _secret: str = "" + + token: Optional[str] = None + """Current authentication token""" + token_expires: Optional[datetime] = None + """Deadline after which token is no longer useable""" + + grace_period: timedelta = timedelta(seconds=10) + """Controls how much before the expiration deadline a token will be refreshed.""" + retry_strategy: RetryStrategy + kind: ObeliskKind + + log: logging.Logger + + def __init__(self, client: str, secret: str, + retry_strategy: RetryStrategy = NoRetryStrategy(), + kind: ObeliskKind = ObeliskKind.CLASSIC) -> None: + self._client = client + self._secret = secret + self.retry_strategy = retry_strategy + self.kind = kind + + self.log = logging.getLogger('obelisk') + + async def _get_token(self): + auth_string = str(base64.b64encode( + f'{self._client}:{self._secret}'.encode('utf-8')), 'utf-8') + headers = { + 'Authorization': f'Basic {auth_string}', + 'Content-Type': ('application/json' + if self.kind.use_json_auth else 'application/x-www-form-urlencoded') + } + payload = { + 'grant_type': 'client_credentials' + } + + async with httpx.AsyncClient() as client: + response = None + last_error = None + retry = self.retry_strategy.make() + while not response or await retry.should_retry(): + try: + request = await client.post( + self.kind.token_url, + json=payload if self.kind.use_json_auth else None, + data=payload if not self.kind.use_json_auth else None, + headers=headers) + + response = request.json() + except Exception as e: + last_error = e + self.log.error(e) + continue + + if response is None and last_error is not None: + raise last_error + + if request.status_code != 200: + if 'error' in response: + self.log.warning(f"Could not authenticate, {response['error']}") + raise AuthenticationError + + self.token = response['access_token'] + self.token_expires = (datetime.now() + + timedelta(seconds=response['expires_in'])) + + async def _verify_token(self): + if (self.token is None + or self.token_expires < (datetime.now() - self.grace_period)): + retry = self.retry_strategy.make() + first = True + while first or await retry.should_retry(): + first = False + try: + await self._get_token() + return + except: + continue + + async def http_post(self, url: str, data: Any = None, + params: Optional[dict] = None) -> httpx.Response: + """ + Send an HTTP POST request to Obelisk, + with proper auth. 
+ + Possibly refreshes the authentication token and performs backoff as per `retry_strategy`. + This method is not of stable latency because of these properties. + + No validation is performed on the input data, + callers are responsible for formatting it in a method Obelisk understands. + """ + + await self._verify_token() + + headers = { + 'Authorization': f'Bearer {self.token}', + 'Content-Type': 'application/json' + } + if params is None: + params = {} + async with httpx.AsyncClient() as client: + response = None + retry = self.retry_strategy.make() + last_error = None + while not response or await retry.should_retry(): + if response is not None: + self.log.debug(f"Retrying, last response: {response.status_code}") + + try: + response = await client.post(url, + json=data, + params={k: v for k, v in params.items() if + v is not None}, + headers=headers) + + if response.status_code // 100 == 2: + return response + except Exception as e: + self.log.error(e) + last_error = e + continue + + if not response and last_error: + raise last_error + return response + + + async def http_get(self, url: str, params: Optional[dict] = None) -> httpx.Response: + """ + Send an HTTP GET request to Obelisk, + with proper auth. + + Possibly refreshes the authentication token and performs backoff as per `retry_strategy`. + This method is not of stable latency because of these properties. + + No validation is performed on the input data, + callers are responsible for formatting it in a method Obelisk understands. + """ + + await self._verify_token() + + headers = { + 'Authorization': f'Bearer {self.token}', + 'Content-Type': 'application/json' + } + if params is None: + params = {} + async with httpx.AsyncClient() as client: + response = None + retry = self.retry_strategy.make() + last_error = None + while not response or await retry.should_retry(): + if response is not None: + self.log.debug(f"Retrying, last response: {response.status_code}") + + try: + response = await client.get(url, + params={k: v for k, v in params.items() if + v is not None}, + headers=headers) + + if response.status_code // 100 == 2: + return response + except Exception as e: + self.log.error(e) + last_error = e + continue + + if not response and last_error: + raise last_error + return response diff --git a/src/obelisk/asynchronous/client.py b/src/obelisk/asynchronous/client.py index a0d7aa3..2cd656a 100644 --- a/src/obelisk/asynchronous/client.py +++ b/src/obelisk/asynchronous/client.py @@ -1,172 +1,283 @@ +import json from datetime import datetime, timedelta -import logging -import base64 -from typing import Any, Optional +from math import floor +from typing import AsyncGenerator, Generator, List, Literal, Optional import httpx +from pydantic import ValidationError -from obelisk.exceptions import AuthenticationError, ObeliskError -from obelisk.strategies.retry import RetryStrategy, \ - NoRetryStrategy -from obelisk.types import ObeliskKind +from obelisk.exceptions import ObeliskError +from obelisk.types import Datapoint, IngestMode, QueryResult, TimestampPrecision +from obelisk.asynchronous.base import BaseClient -class Client: + +class Obelisk(BaseClient): """ - Base class handling Obelisk auth and doing the core HTTP communication. - Only exists in asynchronous variety, as it is not usually directly useful for user code. + Component that contains all the logic to consume data from + the Obelisk API (e.g. historical data, sse). 
+ + Obelisk API Documentation: + https://obelisk.docs.apiary.io/ """ - _client: str = "" - _secret: str = "" - - token: Optional[str] = None - """Current authentication token""" - token_expires: Optional[datetime] = None - """Deadline after which token is no longer useable""" - - grace_period: timedelta = timedelta(seconds=10) - """Controls how much before the expiration deadline a token will be refreshed.""" - retry_strategy: RetryStrategy - kind: ObeliskKind - - log: logging.Logger - - _token_url = 'https://obelisk.ilabt.imec.be/api/v3/auth/token' - _root_url = 'https://obelisk.ilabt.imec.be/api/v3' - _metadata_url = 'https://obelisk.ilabt.imec.be/api/v3/catalog/graphql' - _events_url = 'https://obelisk.ilabt.imec.be/api/v3/data/query/events' - _ingest_url = 'https://obelisk.ilabt.imec.be/api/v3/data/ingest' - _streams_url = 'https://obelisk.ilabt.imec.be/api/v3/data/streams' - - def __init__(self, client: str, secret: str, - retry_strategy: RetryStrategy = NoRetryStrategy(), - kind: ObeliskKind = ObeliskKind.CLASSIC) -> None: - self._client = client - self._secret = secret - self.retry_strategy = retry_strategy - self.kind = kind - - self.log = logging.getLogger('obelisk') - - if self.kind == ObeliskKind.HFS: - self._token_url = 'https://obelisk-hfs.discover.ilabt.imec.be/auth/realms/obelisk-hfs/protocol/openid-connect/token' - self._root_url = 'https://obelisk-hfs.discover.ilabt.imec.be' - self._events_url = 'https://obelisk-hfs.discover.ilabt.imec.be/data/query/events' - self._ingest_url = 'https://obelisk-hfs.discover.ilabt.imec.be/data/ingest' - else: - self._token_url = 'https://obelisk.ilabt.imec.be/api/v3/auth/token' - self._root_url = 'https://obelisk.ilabt.imec.be/api/v3' - self._metadata_url = 'https://obelisk.ilabt.imec.be/api/v3/catalog/graphql' - self._events_url = 'https://obelisk.ilabt.imec.be/api/v3/data/query/events' - self._ingest_url = 'https://obelisk.ilabt.imec.be/api/v3/data/ingest' - self._streams_url = 'https://obelisk.ilabt.imec.be/api/v3/data/streams' - - async def _get_token(self): - auth_string = str(base64.b64encode( - f'{self._client}:{self._secret}'.encode('utf-8')), 'utf-8') - headers = { - 'Authorization': f'Basic {auth_string}', - 'Content-Type': ('application/x-www-form-urlencoded' - if self.kind == ObeliskKind.HFS else 'application/json') - } + async def fetch_single_chunk( + self, + datasets: List[str], + metrics: Optional[List[str]] = None, + fields: Optional[List[str]] = None, + from_timestamp: Optional[int] = None, + to_timestamp: Optional[int] = None, + order_by: Optional[dict] = None, + filter_: Optional[dict] = None, + limit: Optional[int] = None, + limit_by: Optional[dict] = None, + cursor: Optional[str] = None, + ) -> QueryResult: + """ + Queries one chunk of events from Obelisk for given parameters, + does not handle paging over Cursors. + + Parameters + ---------- + + datasets : List[str] + List of Dataset IDs. + metrics : Optional[List[str]] = None + List of Metric IDs or wildcards (e.g. `*::number`), defaults to all metrics. + fields : Optional[List[str]] = None + List of fields to return in the result set. + Defaults to `[metric, source, value]` + from_timestamp : Optional[int] = None + Limit output to events after (and including) + this UTC millisecond timestamp, if present. + to_timestamp : Optional[int] = None + Limit output to events before (and excluding) + this UTC millisecond timestamp, if present. + order_by : Optional[dict] = None + Specifies the ordering of the output, + defaults to ascending by timestamp. 
+ See Obelisk docs for format. Caller is responsible for validity. + filter_ : Optional[dict] = None + Limit output to events matching the specified Filter expression. + See Obelisk docs, caller is responsible for validity. + limit : Optional[int] = None + Limit output to a maximum number of events. + Also determines the page size. + Default is server-determined, usually 2500. + limit_by : Optional[dict] = None + Limit the combination of a specific set of Index fields + to a specified maximum number. + cursor : Optional[str] = None + Specifies the next cursor, + used when paging through large result sets. + """ + + # pylint: disable=too-many-arguments + data_range = {"datasets": datasets} + if metrics is not None: + data_range["metrics"] = metrics + payload = { - 'grant_type': 'client_credentials' + "dataRange": data_range, + "cursor": cursor, + "fields": fields, + "from": from_timestamp, + "to": to_timestamp, + "orderBy": order_by, + "filter": filter_, + "limit": limit, + "limitBy": limit_by, } + response = await self.http_post( + self.kind.query_url, data={k: v for k, v in payload.items() if v is not None} + ) + if response.status_code != 200: + self.log.warning(f"Unexpected status code: {response.status_code}") + raise ObeliskError(response.status_code, response.reason_phrase) + + try: + js = response.json() + return QueryResult.model_validate(js) + except json.JSONDecodeError as e: + msg = f"Obelisk response is not a JSON object: {e}" + self.log.warning(msg) + raise ObeliskError(msg) + except ValidationError as e: + msg = f"Response cannot be validated: {e}" + self.log.warning(msg) + raise ObeliskError(msg) + + async def query( + self, + datasets: List[str], + metrics: Optional[List[str]] = None, + fields: Optional[List[str]] = None, + from_timestamp: Optional[int] = None, + to_timestamp: Optional[int] = None, + order_by: Optional[dict] = None, + filter_: Optional[dict] = None, + limit: Optional[int] = None, + limit_by: Optional[dict] = None, + ) -> List[Datapoint]: + """ + Queries data from obelisk, + automatically iterating when a cursor is returned. + + Parameters + ---------- + + datasets : List[str] + List of Dataset IDs. + metrics : Optional[List[str]] = None + List of Metric IDs or wildcards (e.g. `*::number`), defaults to all metrics. + fields : Optional[List[str]] = None + List of fields to return in the result set. + Defaults to `[metric, source, value]` + from_timestamp : Optional[int] = None + Limit output to events after (and including) + this UTC millisecond timestamp, if present. + to_timestamp : Optional[int] = None + Limit output to events before (and excluding) + this UTC millisecond timestamp, if present. + order_by : Optional[dict] = None + Specifies the ordering of the output, + defaults to ascending by timestamp. + See Obelisk docs for format. Caller is responsible for validity. + filter_ : Optional[dict] = None + Limit output to events matching the specified Filter expression. + See Obelisk docs, caller is responsible for validity. + limit : Optional[int] = None + Limit output to a maximum number of events. + Also determines the page size. + Default is server-determined, usually 2500. + limit_by : Optional[dict] = None + Limit the combination of a specific set of Index fields + to a specified maximum number. 
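+
+        A minimal usage sketch (illustrative only; the client credentials,
+        dataset ID, metric name and timestamps below are placeholders)::
+
+            client = Obelisk(client="<client-id>", secret="<client-secret>")
+            points = await client.query(
+                datasets=["<dataset-id>"],
+                metrics=["temperature::number"],
+                from_timestamp=1704067200000,
+                to_timestamp=1704153600000,
+            )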
+ """ + + cursor: Optional[str] | Literal[True] = True + result_set: List[Datapoint] = [] + + while cursor: + actual_cursor = cursor if cursor is not True else None + result: QueryResult = await self.fetch_single_chunk( + datasets=datasets, + metrics=metrics, + fields=fields, + from_timestamp=from_timestamp, + to_timestamp=to_timestamp, + order_by=order_by, + filter_=filter_, + limit=limit, + limit_by=limit_by, + cursor=actual_cursor, + ) + result_set.extend(result.items) + cursor = result.cursor + + if limit and len(result_set) >= limit: + """On Obelisk HFS, limit is actually page size, + so continuing to read the cursor will result in a larger than desired + set of results. + + On the other hand, if the limit is very large, + we may need to iterate before we reach the desired limit after all. + """ + break + + return result_set + - async with httpx.AsyncClient() as client: - response = None - request = None - last_error = None - retry = self.retry_strategy.make() - while not response: - try: - request = await client.post( - self._token_url, - json=payload if self.kind == ObeliskKind.CLASSIC else None, - data=payload if self.kind == ObeliskKind.HFS else None, - headers=headers) - - response = request.json() - except Exception as e: - last_error = e - self.log.error(e) - if await retry.should_retry(): - continue - else: - break - - if not response or not request: - raise (last_error if last_error is not None else ObeliskError("No response")) - - if request.status_code != 200: - if 'error' in response: - self.log.warning(f"Could not authenticate, {response['error']}") - raise AuthenticationError - - self.token = response['access_token'] - self.token_expires = (datetime.now() - + timedelta(seconds=response['expires_in'])) - - async def _verify_token(self): - if (self.token is None - or self.token_expires < (datetime.now() - self.grace_period)): - retry = self.retry_strategy.make() - first = True - while first or await retry.should_retry(): - first = False - try: - await self._get_token() - return - except: - continue - - async def http_post(self, url: str, data: Any = None, - params: Optional[dict] = None) -> httpx.Response: + async def query_time_chunked( + self, + datasets: List[str], + metrics: List[str], + from_time: datetime, + to_time: datetime, + jump: timedelta, + filter_: Optional[dict] = None, + direction: Literal["asc", "desc"] = "asc", + ) -> AsyncGenerator[List[Datapoint], None]: """ - Send an HTTP POST request to Obelisk, - with proper auth. + Fetches all data matching the provided filters, + yielding one chunk at a time. + One "chunk" may require several Obelisk calls to resolve cursors. + By necessity iterates over time, no other ordering is supported. - Possibly refreshes the authentication token and performs backoff as per `retry_strategy`. - This method is not of stable latency because of these properties. + Parameters + ---------- - No validation is performed on the input data, - callers are responsible for formatting it in a method Obelisk understands. + datasets : List[str] + Dataset IDs to query from + metrics : List[str] + IDs of metrics to query + from_time : `datetime.datetime` + Start time to fetch from + to_time : `datetime.datetime` + End time to fetch until. + jump : `datetime.timedelta` + Size of one yielded chunk + filter_ : Optional[dict] = None + Obelisk filter, caller is responsible for correct format + direction : Literal['asc', 'desc'] = 'asc' + Yield older data or newer data first, defaults to older first. 
""" - await self._verify_token() + current_start = from_time + while current_start < to_time: + yield await self.query( + datasets=datasets, + metrics=metrics, + from_timestamp=floor(current_start.timestamp() * 1000), + to_timestamp=floor((current_start + jump).timestamp() * 1000 - 1), + order_by={"field": ["timestamp"], "ordering": direction}, + filter_=filter_, + ) + current_start += jump - headers = { - 'Authorization': f'Bearer {self.token}', - 'Content-Type': 'application/json' + async def send( + self, + dataset: str, + data: List[dict], + precision: TimestampPrecision = TimestampPrecision.MILLISECONDS, + mode: IngestMode = IngestMode.DEFAULT, + ) -> httpx.Response: + """ + Publishes data to Obelisk + + Parameters + ---------- + dataset : str + ID for the dataset to publish to + data : List[dict] + List of Obelisk-acceptable datapoints. + Exact format varies between Classic or HFS, + caller is responsible for formatting. + precision : :class:`~obelisk.types.TimestampPrecision` = TimestampPrecision.MILLISECONDS + Precision used in the numeric timestamps contained in data. + Ensure it matches to avoid weird errors. + mode : :class:`~obelisk.types.IngestMode` = IngestMode.DEFAULT + See docs for :class:`~obelisk.types.IngestMode`. + + Raises + ------ + + ObeliskError + When the resulting status code is not 204, an empty :exc:`~obelisk.exceptions.ObeliskError` is raised. + """ + + params = { + "datasetId": dataset, + "timestampPrecision": precision.value, + "mode": mode.value, } - if params is None: - params = {} - async with httpx.AsyncClient() as client: - response = None - retry = self.retry_strategy.make() - last_error = None - while not response: - if response is not None: - self.log.debug(f"Retrying, last response: {response.status_code}") - - try: - response = await client.post(url, - json=data, - params={k: v for k, v in params.items() if - v is not None}, - headers=headers) - - if response.status_code // 100 == 2: - return response - except Exception as e: - self.log.error(e) - last_error = e - if await retry.should_retry(): - continue - else: - break - - if not response and last_error: - raise last_error - return response + + response = await self.http_post( + f"{self.kind.ingest_url}/{dataset}", data=data, params=params + ) + if response.status_code != 204: + msg = f"An error occured during data ingest. Status {response.status_code}, message: {response.text}" + self.log.warning(msg) + raise ObeliskError(msg) + return response diff --git a/src/obelisk/asynchronous/consumer.py b/src/obelisk/asynchronous/consumer.py deleted file mode 100644 index 878b85c..0000000 --- a/src/obelisk/asynchronous/consumer.py +++ /dev/null @@ -1,218 +0,0 @@ -import json -from datetime import datetime, timedelta -from typing import List, Literal, Generator, Optional - -from pydantic import ValidationError - -from obelisk.asynchronous.client import Client -from obelisk.exceptions import ObeliskError -from obelisk.types import QueryResult, Datapoint - -from math import floor - - -class Consumer(Client): - """ - Component that contains all the logic to consume data from - the Obelisk API (e.g. historical data, sse). 
- - Obelisk API Documentation: - https://obelisk.docs.apiary.io/ - """ - - async def single_chunk(self, datasets: List[str], metrics: Optional[List[str]] = None, - fields: Optional[List[str]] = None, - from_timestamp: Optional[int] = None, - to_timestamp: Optional[int] = None, - order_by: Optional[dict] = None, - filter_: Optional[dict] = None, - limit: Optional[int] = None, - limit_by: Optional[dict] = None, - cursor: Optional[str] = None) -> QueryResult: - """ - Queries one chunk of events from Obelisk for given parameters, - does not handle paging over Cursors. - - Parameters - ---------- - - datasets : List[str] - List of Dataset IDs. - metrics : Optional[List[str]] = None - List of Metric IDs or wildcards (e.g. `*::number`), defaults to all metrics. - fields : Optional[List[str]] = None - List of fields to return in the result set. - Defaults to `[metric, source, value]` - from_timestamp : Optional[int] = None - Limit output to events after (and including) - this UTC millisecond timestamp, if present. - to_timestamp : Optional[int] = None - Limit output to events before (and excluding) - this UTC millisecond timestamp, if present. - order_by : Optional[dict] = None - Specifies the ordering of the output, - defaults to ascending by timestamp. - See Obelisk docs for format. Caller is responsible for validity. - filter_ : Optional[dict] = None - Limit output to events matching the specified Filter expression. - See Obelisk docs, caller is responsible for validity. - limit : Optional[int] = None - Limit output to a maximum number of events. - Also determines the page size. - Default is server-determined, usually 2500. - limit_by : Optional[dict] = None - Limit the combination of a specific set of Index fields - to a specified maximum number. - cursor : Optional[str] = None - Specifies the next cursor, - used when paging through large result sets. - """ - - # pylint: disable=too-many-arguments - data_range = { - 'datasets': datasets - } - if metrics is not None: - data_range['metrics'] = metrics - - payload = { - 'dataRange': data_range, - 'cursor': cursor, - 'fields': fields, - 'from': from_timestamp, - 'to': to_timestamp, - 'orderBy': order_by, - 'filter': filter_, - 'limit': limit, - 'limitBy': limit_by - } - response = await self.http_post(self._events_url, - data={k: v for k, v in payload.items() if - v is not None}) - if response.status_code != 200: - self.log.warning(f"Unexpected status code: {response.status_code}") - raise ObeliskError(response.status_code, response.reason_phrase) - - try: - js = response.json() - return QueryResult.model_validate(js) - except json.JSONDecodeError as e: - msg = f'Obelisk response is not a JSON object: {e}' - self.log.warning(msg) - raise ObeliskError(msg) - except ValidationError as e: - msg = f"Response cannot be validated: {e}" - self.log.warning(msg) - raise ObeliskError(msg) - - - async def query(self, datasets: List[str], metrics:Optional[List[str]] = None, - fields:Optional[List[str]] = None, - from_timestamp: Optional[int] = None, to_timestamp: Optional[int] = None, - order_by: Optional[dict] = None, - filter_: Optional[dict] = None, - limit: Optional[int] = None, - limit_by: Optional[dict] = None) -> List[Datapoint]: - """ - Queries data from obelisk, - automatically iterating when a cursor is returned. - - Parameters - ---------- - - datasets : List[str] - List of Dataset IDs. - metrics : Optional[List[str]] = None - List of Metric IDs or wildcards (e.g. `*::number`), defaults to all metrics. 
- fields : Optional[List[str]] = None - List of fields to return in the result set. - Defaults to `[metric, source, value]` - from_timestamp : Optional[int] = None - Limit output to events after (and including) - this UTC millisecond timestamp, if present. - to_timestamp : Optional[int] = None - Limit output to events before (and excluding) - this UTC millisecond timestamp, if present. - order_by : Optional[dict] = None - Specifies the ordering of the output, - defaults to ascending by timestamp. - See Obelisk docs for format. Caller is responsible for validity. - filter_ : Optional[dict] = None - Limit output to events matching the specified Filter expression. - See Obelisk docs, caller is responsible for validity. - limit : Optional[int] = None - Limit output to a maximum number of events. - Also determines the page size. - Default is server-determined, usually 2500. - limit_by : Optional[dict] = None - Limit the combination of a specific set of Index fields - to a specified maximum number. - """ - - cursor: Optional[str] | Literal[True] = True - result_set: List[Datapoint] = [] - - while cursor: - actual_cursor = cursor if cursor is not True else None - result: QueryResult = await self.single_chunk(datasets=datasets, - metrics=metrics, fields=fields, - from_timestamp=from_timestamp, - to_timestamp=to_timestamp, - order_by=order_by, filter_=filter_, - limit=limit, - limit_by=limit_by, - cursor=actual_cursor) - result_set.extend(result.items) - cursor = result.cursor - - if limit and len(result_set) >= limit: - """On Obelisk HFS, limit is actually page size, - so continuing to read the cursor will result in a larger than desired - set of results. - - On the other hand, if the limit is very large, - we may need to iterate before we reach the desired limit after all. - """ - break - - return result_set - - - async def query_time_chunked(self, datasets: List[str], metrics: List[str], - from_time: datetime, to_time: datetime, - jump: timedelta, filter_: Optional[dict] = None, - direction: Literal['asc', 'desc'] = 'asc' - ) -> Generator[List[Datapoint], None, None]: - """ - Fetches all data matching the provided filters, - yielding one chunk at a time. - One "chunk" may require several Obelisk calls to resolve cursors. - By necessity iterates over time, no other ordering is supported. - - Parameters - ---------- - - datasets : List[str] - Dataset IDs to query from - metrics : List[str] - IDs of metrics to query - from_time : `datetime.datetime` - Start time to fetch from - to_time : `datetime.datetime` - End time to fetch until. - jump : `datetime.timedelta` - Size of one yielded chunk - filter_ : Optional[dict] = None - Obelisk filter, caller is responsible for correct format - direction : Literal['asc', 'desc'] = 'asc' - Yield older data or newer data first, defaults to older first. - """ - - current_start = from_time - while current_start < to_time: - yield await self.query(datasets=datasets, metrics=metrics, - from_timestamp=floor(current_start.timestamp() * 1000), - to_timestamp=floor((current_start + jump).timestamp() * 1000 - 1), - order_by={"field": ["timestamp"], "ordering": direction}, - filter_=filter_) - current_start += jump diff --git a/src/obelisk/asynchronous/core.py b/src/obelisk/asynchronous/core.py new file mode 100644 index 0000000..8dc204b --- /dev/null +++ b/src/obelisk/asynchronous/core.py @@ -0,0 +1,260 @@ +""" +This module contains the asynchronous API to interface with Obelisk CORE. +These methods all return a :class:`Awaitable`. 
+ +Relevant entrance points are :class:`Client`. + +This API vaguely resembles that of clients to previous Obelisk versions, +but also significantly diverts from it where the underlying Obelisk CORE API does so. +""" +from obelisk.asynchronous.base import BaseClient +from obelisk.exceptions import ObeliskError +from obelisk.types.core import FieldName, Filter + +from datetime import datetime, timedelta +import httpx +import json +from pydantic import BaseModel, AwareDatetime, ConfigDict, Field, ValidationError, model_validator +from typing import Annotated, AsyncIterator, Dict, Iterator, List, Literal, Optional, Any, cast, get_args +from typing_extensions import Self +from numbers import Number + + +DataType = Literal['number', 'number[]', 'json', 'bool', 'string'] +"""The possible types of data Obelisk can accept""" + + +def type_suffix(metric: str) -> DataType: + split = metric.split('::') + + if len(split) != 2: + raise ValueError("Incorrect amount of type qualifiers") + + suffix = split[1] + if suffix not in get_args(DataType): + raise ValueError(f"Invalid type suffix, should be one of {', '.join(get_args(DataType))}") + return cast(DataType, suffix) + + +Aggregator = Literal['last', 'min', 'mean', 'max', 'count', 'stddev'] +"""Type of aggregation Obelisk can process""" + + +Datapoint = Dict[str, Any] +"""Datapoints resulting from queries are modeled as simple dicts, as fields can come and go depending on query.""" + + +class ObeliskPosition(BaseModel): + """ + Format for coordinates as expected by Obelisk. + """ + + lat: float + """Latitude""" + lng: float + """Longitude""" + elevation: float + + +class IncomingDatapoint(BaseModel): + """ A datapoint to be submitted to Obelisk. These are validated quite extensively, but not fully. + .. automethod:: check_metric_type(self) + """ + timestamp: Optional[AwareDatetime] = None + metric: str + value: Any + labels: Optional[Dict[str, str]] = None + location: Optional[ObeliskPosition] = None + + @model_validator(mode='after') + def check_metric_type(self) -> Self: + suffix = type_suffix(self.metric) + + if suffix == 'number' and not isinstance(self.value, Number): + raise ValueError(f"Type suffix mismatch, expected number, got {type(self.value)}") + + if suffix == 'number[]': + if type(self.value) is not list or any([not isinstance(x, Number) for x in self.value]): + raise ValueError("Type suffix mismatch, expected value of number[]") + + # Do not check json, most things should be serialisable + + if suffix == 'bool' and type(self.value) is not bool: + raise ValueError(f"Type suffix mismatch, expected bool, got {type(self.value)}") + + if suffix == 'string' and type(self.value) is not str: + raise ValueError(f"Type suffix mismatch, expected bool, got {type(self.value)}") + + return self + + +class QueryParams(BaseModel): + dataset: str + groupBy: Optional[List[FieldName]] = None + aggregator: Optional[Aggregator] = None + fields: Optional[List[FieldName]] = None + orderBy: Optional[List[str]] = None # More complex than just FieldName, can be prefixed with - to invert sort + dataType: Optional[DataType] = None + filter_: Annotated[Optional[str|Filter], Field(serialization_alias='filter')] = None + """Filter in `RSQL format `__ Suffix to avoid collisions.""" + cursor: Optional[str] = None + limit: int = 1000 + + model_config = ConfigDict(arbitrary_types_allowed=True) + + @model_validator(mode='after') + def check_datatype_needed(self) -> Self: + if self.fields is None or 'value' in self.fields: + if self.dataType is None: + raise 
ValueError("Value field requested, must specify datatype") + + return self + + def to_dict(self) -> Dict: + return self.model_dump(exclude_none=True, by_alias=True) + + +class ChunkedParams(BaseModel): + dataset: str + groupBy: Optional[List[FieldName]] = None + aggregator: Optional[Aggregator] = None + fields: Optional[List[FieldName]] = None + orderBy: Optional[List[str]] = None # More complex than just FieldName, can be prefixed with - to invert sort + dataType: Optional[DataType] = None + filter_: Optional[str | Filter] = None + """Underscore suffix to avoid name collisions""" + start: datetime + end: datetime + jump: timedelta = timedelta(hours=1) + + model_config = ConfigDict(arbitrary_types_allowed=True) + + @model_validator(mode='after') + def check_datatype_needed(self) -> Self: + if self.fields is None or 'value' in self.fields: + if self.dataType is None: + raise ValueError("Value field requested, must specify datatype") + + return self + + def chunks(self) -> Iterator[QueryParams]: + current_start = self.start + while current_start < self.end: + current_end = current_start + self.jump + filter_=f'timestamp>={current_start.isoformat()};timestamp<{current_end.isoformat()}' + if self.filter_: + filter_ += f';{self.filter_}' + + yield QueryParams( + dataset=self.dataset, + groupBy=self.groupBy, + aggregator=self.aggregator, + fields=self.fields, + orderBy=self.orderBy, + dataType=self.dataType, + filter_=filter_ + ) + + current_start += self.jump + + + +class QueryResult(BaseModel): + cursor: Optional[str] = None + items: List[Datapoint] + + +class Client(BaseClient): + page_limit: int = 250 + """How many datapoints to request per page in a cursored fetch""" + + async def send( + self, + dataset: str, + data: List[IncomingDatapoint], + ) -> httpx.Response: + """ + Publishes data to Obelisk + + Parameters + ---------- + dataset : str + ID for the dataset to publish to + data : List[IncomingDatapoint] + List of Obelisk-acceptable datapoints. + Exact format varies between Classic or HFS, + caller is responsible for formatting. + + Raises + ------ + + ObeliskError + When the resulting status code is not 204, an :exc:`~obelisk.exceptions.ObeliskError` is raised. + """ + + response = await self.http_post( + f"{self.kind.root_url}/{dataset}/data/ingest", data=[x.model_dump(mode='json') for x in data] + ) + if response.status_code != 204: + msg = f"An error occured during data ingest. 
Status {response.status_code}, message: {response.text}" + self.log.warning(msg) + raise ObeliskError(msg) + return response + + async def fetch_single_chunk( + self, + params: QueryParams + ) -> QueryResult: + response = await self.http_get( + f"{self.kind.root_url}/{params.dataset}/data/query", + params=params.to_dict() + ) + + if response.status_code != 200: + self.log.warning(f"Unexpected status code: {response.status_code}") + raise ObeliskError(response.status_code, response.reason_phrase) + + try: + js = response.json() + return QueryResult.model_validate(js) + except json.JSONDecodeError as e: + msg = f"Obelisk response is not a JSON object: {e}" + self.log.warning(msg) + raise ObeliskError(msg) + except ValidationError as e: + msg = f"Response cannot be validated: {e}" + self.log.warning(msg) + raise ObeliskError(msg) + + async def query( + self, + params: QueryParams + ) -> List[Datapoint]: + params.cursor = None + result_set: List[Datapoint] = [] + result_limit = params.limit + + # Obelisk CORE does not actually stop emitting a cursor when done, limit serves as page limit + params.limit = self.page_limit + + while True: + result: QueryResult = await self.fetch_single_chunk( + params + ) + result_set.extend(result.items) + params.cursor = result.cursor + + if len(result_set) >= result_limit: + break + + return result_set + + async def query_time_chunked( + self, + params: ChunkedParams + ) -> AsyncIterator[List[Datapoint]]: + for chunk in params.chunks(): + yield await self.query( + chunk + ) + diff --git a/src/obelisk/asynchronous/producer.py b/src/obelisk/asynchronous/producer.py deleted file mode 100644 index 697b217..0000000 --- a/src/obelisk/asynchronous/producer.py +++ /dev/null @@ -1,54 +0,0 @@ -from typing import List - -import httpx - -from obelisk.asynchronous.client import Client -from obelisk.exceptions import ObeliskError -from obelisk.types import IngestMode, TimestampPrecision - - -class Producer(Client): - """ - Allows publishing of data to Obelisk. - """ - - async def send(self, dataset: str, data: List[dict], - precision: TimestampPrecision = TimestampPrecision.MILLISECONDS, - mode: IngestMode = IngestMode.DEFAULT) -> httpx.Response: - """ - Publishes data to Obelisk - - Parameters - ---------- - dataset : str - ID for the dataset to publish to - data : List[dict] - List of Obelisk-acceptable datapoints. - Exact format varies between Classic or HFS, - caller is responsible for formatting. - precision : :class:`~obelisk.types.TimestampPrecision` = TimestampPrecision.MILLISECONDS - Precision used in the numeric timestamps contained in data. - Ensure it matches to avoid weird errors. - mode : :class:`~obelisk.types.IngestMode` = IngestMode.DEFAULT - See docs for :class:`~obelisk.types.IngestMode`. - - Raises - ------ - - ObeliskError - When the resulting status code is not 204, an empty :exc:`~obelisk.exceptions.ObeliskError` is raised. - """ - - params = { - 'datasetId': dataset, - 'timestampPrecision': precision.value, - 'mode': mode.value - } - - response = await self.http_post(f'{self._ingest_url}/{dataset}', data=data, - params=params) - if response.status_code != 204: - msg = f'An error occured during data ingest. 
Status {response.status_code}, message: {response.text}' - self.log.warning(msg) - raise ObeliskError(msg) - return response diff --git a/src/obelisk/sync/__init__.py b/src/obelisk/sync/__init__.py index 801ac08..66cf554 100644 --- a/src/obelisk/sync/__init__.py +++ b/src/obelisk/sync/__init__.py @@ -1,9 +1,7 @@ """ -This module provides wrappers for the classes in `obelisk.asynchronous` with a synchronous API. +This module provides wrappers for the classes in :mod:`obelisk.asynchronous` with a synchronous API. These hold on to a private event loop and block until a result is available. -There is no synchronous alternative to `obelisk.asynchronous.client.Client`. - Note ---- @@ -11,3 +9,5 @@ This is because it is internally nothing more than a wrapper over the asynchronous implementation. Use the asynchronous implementation in these situations. """ +__all__ = ["Obelisk"] +from .client import Obelisk diff --git a/src/obelisk/sync/consumer.py b/src/obelisk/sync/client.py similarity index 54% rename from src/obelisk/sync/consumer.py rename to src/obelisk/sync/client.py index 84e7cad..9aa4508 100644 --- a/src/obelisk/sync/consumer.py +++ b/src/obelisk/sync/client.py @@ -1,16 +1,22 @@ import asyncio from datetime import datetime, timedelta -from typing import List, Literal, Generator, Optional from math import floor +from typing import Generator, List, Literal, Optional -from obelisk.asynchronous.consumer import \ - Consumer as AsyncConsumer -from obelisk.strategies.retry import RetryStrategy, \ - NoRetryStrategy -from obelisk.types import QueryResult, Datapoint, ObeliskKind +import httpx +from obelisk.asynchronous import Obelisk as AsyncObelisk +from obelisk.strategies.retry import NoRetryStrategy, RetryStrategy +from obelisk.types import ( + Datapoint, + IngestMode, + ObeliskKind, + QueryResult, + TimestampPrecision, +) -class Consumer: + +class Obelisk: """ Component that contains all the logic to consume data from the Obelisk API (e.g. historical data, sse). 
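# Example (editor's sketch, not part of this patch): using the new CORE client
# from obelisk/asynchronous/core.py above. The constructor arguments are assumed
# to mirror the other clients (client, secret, kind), since BaseClient is not
# shown in this diff; dataset ID, metric name, and credentials are placeholders.
import asyncio
from datetime import datetime, timezone

from obelisk.asynchronous.core import Client, IncomingDatapoint, QueryParams
from obelisk.types import ObeliskKind


async def main() -> None:
    core = Client(client="<client-id>", secret="<client-secret>", kind=ObeliskKind.CORE)

    # Ingest: each value is checked against the metric's ::type suffix.
    await core.send(
        dataset="<dataset-id>",
        data=[IncomingDatapoint(
            timestamp=datetime.now(timezone.utc),
            metric="room.temperature::number",
            value=21.5,
        )],
    )

    # Query: dataType is mandatory here because the default field set includes `value`.
    points = await core.query(QueryParams(dataset="<dataset-id>", dataType="number", limit=500))
    print(len(points))


asyncio.run(main())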
@@ -23,22 +29,32 @@ class Consumer: loop: asyncio.AbstractEventLoop """Event loop used to run interal async operations""" - async_consumer: AsyncConsumer + async_obelisk: AsyncObelisk """The actual implementation this synchronous wrapper refers to""" - def __init__(self, client: str, secret: str, - retry_strategy: RetryStrategy = NoRetryStrategy(), - kind: ObeliskKind = ObeliskKind.CLASSIC): - self.async_consumer = AsyncConsumer(client, secret, retry_strategy, kind) + def __init__( + self, + client: str, + secret: str, + retry_strategy: RetryStrategy = NoRetryStrategy(), + kind: ObeliskKind = ObeliskKind.CLASSIC, + ): + self.async_obelisk = AsyncObelisk(client, secret, retry_strategy, kind) self.loop = asyncio.get_event_loop() - def single_chunk(self, datasets: List[str], metrics: Optional[List[str]] = None, - fields: Optional[dict] = None, - from_timestamp: Optional[int] = None, to_timestamp: Optional[int] = None, - order_by: Optional[dict] = None, - filter_: Optional[dict] = None, - limit: Optional[int] = None, limit_by: Optional[dict] = None, - cursor: Optional[str] = None) -> QueryResult: + def fetch_single_chunk( + self, + datasets: List[str], + metrics: Optional[List[str]] = None, + fields: Optional[dict] = None, + from_timestamp: Optional[int] = None, + to_timestamp: Optional[int] = None, + order_by: Optional[dict] = None, + filter_: Optional[dict] = None, + limit: Optional[int] = None, + limit_by: Optional[dict] = None, + cursor: Optional[str] = None, + ) -> QueryResult: """ Queries one chunk of events from Obelisk for given parameters, does not handle paging over Cursors. @@ -78,23 +94,36 @@ def single_chunk(self, datasets: List[str], metrics: Optional[List[str]] = None, used when paging through large result sets. """ - self.async_consumer.log.info("Starting task") + self.async_obelisk.log.info("Starting task") task = self.loop.create_task( - self.async_consumer.single_chunk(datasets, metrics, fields, from_timestamp, - to_timestamp, order_by, filter_, - limit, limit_by, cursor)) - self.async_consumer.log.info("Blocking...") + self.async_obelisk.fetch_single_chunk( + datasets, + metrics, + fields, + from_timestamp, + to_timestamp, + order_by, + filter_, + limit, + limit_by, + cursor, + ) + ) + self.async_obelisk.log.info("Blocking...") return self.loop.run_until_complete(task) - - def query(self, datasets: List[str], metrics: Optional[List[str]] = None, - fields: Optional[dict] = None, - from_timestamp: Optional[int] = None, - to_timestamp: Optional[int] = None, - order_by: Optional[dict] = None, - filter_: Optional[dict] = None, - limit: Optional[int] = None, - limit_by: Optional[dict] = None) -> List[Datapoint]: + def query( + self, + datasets: List[str], + metrics: Optional[List[str]] = None, + fields: Optional[dict] = None, + from_timestamp: Optional[int] = None, + to_timestamp: Optional[int] = None, + order_by: Optional[dict] = None, + filter_: Optional[dict] = None, + limit: Optional[int] = None, + limit_by: Optional[dict] = None, + ) -> List[Datapoint]: """ Queries data from obelisk, automatically iterating when a cursor is returned. @@ -131,19 +160,31 @@ def query(self, datasets: List[str], metrics: Optional[List[str]] = None, to a specified maximum number. 
""" - task = self.loop.create_task( - self.async_consumer.query(datasets, metrics, fields, from_timestamp, - to_timestamp, order_by, filter_, limit, - limit_by)) + self.async_obelisk.query( + datasets, + metrics, + fields, + from_timestamp, + to_timestamp, + order_by, + filter_, + limit, + limit_by, + ) + ) return self.loop.run_until_complete(task) - - def query_time_chunked(self, datasets: List[str], metrics: List[str], - from_time: datetime, to_time: datetime, - jump: timedelta, filter_: Optional[dict] = None, - direction: Literal['asc', 'desc'] = 'asc' - ) -> Generator[List[Datapoint], None, None]: + def query_time_chunked( + self, + datasets: List[str], + metrics: List[str], + from_time: datetime, + to_time: datetime, + jump: timedelta, + filter_: Optional[dict] = None, + direction: Literal["asc", "desc"] = "asc", + ) -> Generator[List[Datapoint], None, None]: """ Fetches all data matching the provided filters, yielding one chunk at a time. @@ -171,9 +212,48 @@ def query_time_chunked(self, datasets: List[str], metrics: List[str], current_start = from_time while current_start < to_time: - yield self.query(datasets=datasets, metrics=metrics, - from_timestamp=floor(current_start.timestamp() * 1000), - to_timestamp=floor((current_start + jump).timestamp() * 1000 - 1), - order_by={"field": ["timestamp"], "ordering": direction}, - filter_=filter_) + yield self.query( + datasets=datasets, + metrics=metrics, + from_timestamp=floor(current_start.timestamp() * 1000), + to_timestamp=floor((current_start + jump).timestamp() * 1000 - 1), + order_by={"field": ["timestamp"], "ordering": direction}, + filter_=filter_, + ) current_start += jump + + def send( + self, + dataset: str, + data: List[dict], + precision: TimestampPrecision = TimestampPrecision.MILLISECONDS, + mode: IngestMode = IngestMode.DEFAULT, + ) -> httpx.Response: + """ + Publishes data to Obelisk + + Parameters + ---------- + dataset : str + ID for the dataset to publish to + data : List[dict] + List of Obelisk-acceptable datapoints. + Exact format varies between Classic or HFS, + caller is responsible for formatting. + precision : TimestampPrecision = TimestampPrecision.MILLISECONDS + Precision used in the numeric timestamps contained in data. + Ensure it matches to avoid weird errors. + mode : IngestMode = IngestMode.DEFAULT + See docs for :class:`~obelisk.types.IngestMode`. + + Raises + ------ + + ObeliskError + When the resulting status code is not 204, an empty :exc:`~obelisk.exceptions.ObeliskError` is raised. + """ + + task = self.loop.create_task( + self.async_obelisk.send(dataset, data, precision, mode) + ) + return self.loop.run_until_complete(task) diff --git a/src/obelisk/sync/producer.py b/src/obelisk/sync/producer.py deleted file mode 100644 index 80ad66d..0000000 --- a/src/obelisk/sync/producer.py +++ /dev/null @@ -1,58 +0,0 @@ -import asyncio -from typing import List - -import httpx - -from obelisk.asynchronous.producer import \ - Producer as AsyncProducer -from obelisk.strategies.retry import RetryStrategy, \ - NoRetryStrategy -from obelisk.types import IngestMode, TimestampPrecision, \ - ObeliskKind - - -class Producer: - """ - Synchronous equivalient of :class:`~obelisk.asynchronous.producer.Producer`, - to publish data to Obelisk. 
- """ - - loop: asyncio.AbstractEventLoop - async_producer: AsyncProducer - - def __init__(self, client: str, secret: str, - retry_strategy: RetryStrategy = NoRetryStrategy(), - kind: ObeliskKind = ObeliskKind.CLASSIC): - self.async_producer = AsyncProducer(client, secret, retry_strategy, kind) - self.loop = asyncio.get_event_loop() - - def send(self, dataset: str, data: List[dict], - precision: TimestampPrecision = TimestampPrecision.MILLISECONDS, - mode: IngestMode = IngestMode.DEFAULT) -> httpx.Response: - """ - Publishes data to Obelisk - - Parameters - ---------- - dataset : str - ID for the dataset to publish to - data : List[dict] - List of Obelisk-acceptable datapoints. - Exact format varies between Classic or HFS, - caller is responsible for formatting. - precision : TimestampPrecision = TimestampPrecision.MILLISECONDS - Precision used in the numeric timestamps contained in data. - Ensure it matches to avoid weird errors. - mode : IngestMode = IngestMode.DEFAULT - See docs for :class:`~obelisk.types.IngestMode`. - - Raises - ------ - - ObeliskError - When the resulting status code is not 204, an empty :exc:`~obelisk.exceptions.ObeliskError` is raised. - """ - - task = self.loop.create_task( - self.async_producer.send(dataset, data, precision, mode)) - return self.loop.run_until_complete(task) diff --git a/src/obelisk/types.py b/src/obelisk/types.py deleted file mode 100644 index f4ce39a..0000000 --- a/src/obelisk/types.py +++ /dev/null @@ -1,55 +0,0 @@ -from enum import Enum -from typing import List, Any, Optional - -from pydantic import BaseModel - - -class IngestMode(str, Enum): - """ - Classic Obelisk accepts three ways of submitting new data. - This integrates with the concept of Streams, - which is a way to process datapoints as they come in. - - The default submission method is to publish both to long-term storage - and stream. - This IngestMode can be changed to change this default. - - Does not apply to HFS - """ - - DEFAULT = 'default' - STREAM_ONLY = 'stream_only' - STORE_ONLY = 'store_only' - - -class TimestampPrecision(str, Enum): - """ - When ingesting data it is important to specify which precision provided UNIX timestamps are in. - If a provided timestamp is in seconds, - but interpreted by Obelisk as milliseconds, it would erroneously be somewhere in the past. - """ - - __choices__ = ('SECONDS', 'MILLISECONDS', 'MICROSECONDS') - - SECONDS = 'seconds' - MILLISECONDS = 'milliseconds' - MICROSECONDS = 'microseconds' - - -class Datapoint(BaseModel, extra='allow'): - timestamp: int - value: Any - dataset: Optional[str] = None - metric: Optional[str] = None - source: Optional[str] = None - userId: Optional[int] = None # Only if HFS and no other name for field - - -class QueryResult(BaseModel): - items: List[Datapoint] - cursor: Optional[str] = None - - -class ObeliskKind(str, Enum): - CLASSIC = 'classic' - HFS = 'hfs' diff --git a/src/obelisk/types/__init__.py b/src/obelisk/types/__init__.py new file mode 100644 index 0000000..05e8b81 --- /dev/null +++ b/src/obelisk/types/__init__.py @@ -0,0 +1,116 @@ +from enum import Enum +from typing import List, Any, Optional + +from pydantic import BaseModel + + +class IngestMode(str, Enum): + """ + Classic Obelisk accepts three ways of submitting new data. + This integrates with the concept of Streams, + which is a way to process datapoints as they come in. + + The default submission method is to publish both to long-term storage + and stream. + This IngestMode can be changed to change this default. 
+ + Does not apply to HFS + """ + + DEFAULT = 'default' + STREAM_ONLY = 'stream_only' + STORE_ONLY = 'store_only' + + +class TimestampPrecision(str, Enum): + """ + When ingesting data it is important to specify which precision provided UNIX timestamps are in. + If a provided timestamp is in seconds, + but interpreted by Obelisk as milliseconds, it would erroneously be somewhere in the past. + """ + + __choices__ = ('SECONDS', 'MILLISECONDS', 'MICROSECONDS') + + SECONDS = 'seconds' + MILLISECONDS = 'milliseconds' + MICROSECONDS = 'microseconds' + + +class Datapoint(BaseModel, extra='allow'): + timestamp: int + value: Any + dataset: Optional[str] = None + metric: Optional[str] = None + source: Optional[str] = None + userId: Optional[int] = None # Only if HFS and no other name for field + + +class QueryResult(BaseModel): + items: List[Datapoint] + cursor: Optional[str] = None + + +class ObeliskKind(str, Enum): + CLASSIC = 'classic' + HFS = 'hfs' + CORE = 'core' + + @property + def token_url(self) -> str: + match self: + case ObeliskKind.CLASSIC: + return 'https://obelisk.ilabt.imec.be/api/v3/auth/token' + case ObeliskKind.HFS: + return 'https://obelisk-hfs.discover.ilabt.imec.be/auth/realms/obelisk-hfs/protocol/openid-connect/token' + case ObeliskKind.CORE: + return 'https://auth.obelisk.discover.ilabt.imec.be/realms/obelisk/protocol/openid-connect/token' + + @property + def root_url(self) -> str: + match self: + case ObeliskKind.CLASSIC: + return 'https://obelisk.ilabt.imec.be/api/v3' + case ObeliskKind.HFS: + return 'https://obelisk-hfs.discover.ilabt.imec.be' + case ObeliskKind.CORE: + return 'https://obelisk.discover.ilabt.imec.be/datasets' + + @property + def query_url(self) -> str: + match self: + case ObeliskKind.CLASSIC: + return 'https://obelisk.ilabt.imec.be/api/v3/data/query/events' + case ObeliskKind.HFS: + return 'https://obelisk-hfs.discover.ilabt.imec.be/data/query/events' + case ObeliskKind.CORE: + raise NotImplementedError() + + @property + def ingest_url(self) -> str: + match self: + case ObeliskKind.CLASSIC: + return 'https://obelisk.ilabt.imec.be/api/v3/data/ingest' + case ObeliskKind.HFS: + return 'https://obelisk-hfs.discover.ilabt.imec.be/data/ingest' + case ObeliskKind.CORE: + raise NotImplementedError() + + @property + def stream_url(self) -> str | None: + match self: + case ObeliskKind.CLASSIC: + return 'https://obelisk.ilabt.imec.be/api/v3/data/streams' + case ObeliskKind.HFS: + return None + case ObeliskKind.CORE: + raise NotImplementedError() + + @property + def use_json_auth(self) -> bool: + match self: + case ObeliskKind.CLASSIC: + return True + case ObeliskKind.HFS: + return False + case ObeliskKind.CORE: + return False diff --git a/src/obelisk/types/core.py b/src/obelisk/types/core.py new file mode 100644 index 0000000..74be833 --- /dev/null +++ b/src/obelisk/types/core.py @@ -0,0 +1,147 @@ +""" +Types specific to Obelisk CORE, including an RSQL filter implementation + +To create a filter, look at :class:`Filter`. +Example: + +>>> from datetime import datetime +>>> f = (Filter().add_and( +... Comparison.equal('source', 'test source'), +... Comparison.is_in('metricType', ['number', 'number[]']), +... ).add_or( +... Comparison.less('timestamp', datetime.fromtimestamp(1757422128)) +... 
)) +>>> print(f) +((('source'=='test source');('metricType'=in=('number', 'number[]'))),('timestamp'<'2025-09-09T14:48:48')) +""" +from __future__ import annotations +from abc import ABC +from datetime import datetime +from typing import Any, Iterable, List + + +FieldName = str # TODO: validate field names? +"""https://obelisk.pages.ilabt.imec.be/obelisk-core/query.html#available-data-point-fields +Field names are not validated at this time, due to the inherent complexity. +""" + + +class Constraint(ABC): + pass + + +class Comparison(): + left: FieldName + right: Any + op: str + + def __init__(self, left: FieldName, right: Any, op: str): + self.left = left + self.right = right + self.op = op + + def __str__(self) -> str: + right = self._sstr(self.right) + if not right.startswith('('): + right = f"'{right}'" + + return f"('{self.left}'{self.op}{right})" + + @staticmethod + def _sstr(item: Any): + """Smart string conversion""" + if isinstance(item, datetime): + return item.isoformat() + return str(item) + + @staticmethod + def _iterable_to_group(iter: Iterable[Any]) -> str: + """Produces a group of the form ("a","b")""" + return str(tuple([Comparison._sstr(x) for x in iter])) + + @classmethod + def equal(cls, left: FieldName, right: Any) -> Comparison: + return Comparison(left, right, "==") + + @classmethod + def not_equal(cls, left: FieldName, right: Any) -> Comparison: + return Comparison(left, right, "!=") + + @classmethod + def less(cls, left: FieldName, right: Any) -> Comparison: + return Comparison(left, right, "<") + + @classmethod + def less_equal(cls, left: FieldName, right: Any) -> Comparison: + return Comparison(left, right, "<=") + + @classmethod + def greater(cls, left: FieldName, right: Any) -> Comparison: + return Comparison(left, right, ">") + + @classmethod + def greater_equal(cls, left: FieldName, right: Any) -> Comparison: + return Comparison(left, right, ">=") + + @classmethod + def is_in(cls, left: FieldName, right: Iterable[Any]) -> Comparison: + return Comparison(left, cls._iterable_to_group(right), "=in=") + + @classmethod + def is_not_in(cls, left: FieldName, right: Iterable[Any]) -> Comparison: + return Comparison(left, cls._iterable_to_group(right), "=out=") + + @classmethod + def null(cls, left: FieldName) -> Comparison: + return Comparison(left, "", "=null=") + + @classmethod + def not_null(cls, left: FieldName) -> Comparison: + return Comparison(left, "", "=notnull=") + + +Item = Constraint | Comparison + + +class And(Constraint): + content: List[Item] + + def __init__(self, *args: Item): + self.content = list(args) + + def __str__(self) -> str: + return "(" + ";".join([str(x) for x in self.content]) + ")" + + +class Or(Constraint): + content: List[Item] + + def __init__(self, *args: Item): + self.content = list(args) + + def __str__(self) -> str: + return "(" + ",".join([str(x) for x in self.content]) + ")" + + +class Filter(): + content: Item | None = None + + def __init__(self, content: Constraint | None = None): + self.content = content + + def __str__(self) -> str: + return str(self.content) + + def add_and(self, *other: Item) -> Filter: + if self.content is None: + self.content = And(*other) + else: + self.content = And(self.content, *other) + return self + + def add_or(self, *other: Item) -> Filter: + if self.content is None: + self.content = Or(*other) + else: + self.content = Or(self.content, *other) + return self diff --git a/src/tests/asynchronous/__init__.py b/src/tests/asynchronous/__init__.py new file mode 100644 index 0000000..e69de29 diff --git 
a/src/obelisk/asynchronous/consumer_test.py b/src/tests/asynchronous/client_test.py similarity index 62% rename from src/obelisk/asynchronous/consumer_test.py rename to src/tests/asynchronous/client_test.py index 7348792..f6ff4a7 100644 --- a/src/obelisk/asynchronous/consumer_test.py +++ b/src/tests/asynchronous/client_test.py @@ -1,15 +1,16 @@ import pytest -from .consumer import Consumer - -pytest_plugins = ('pytest_asyncio',) +from obelisk.asynchronous import Obelisk +# Intentionally public client, restricted to public datasets. client_id = "682c6c46604b3b3be35429df" client_secret = "7136832d-01be-456a-a1fe-25c7f9e130c5" +pytest_plugins = ('pytest_asyncio',) + @pytest.mark.asyncio -async def test_demo_igent(): - consumer = Consumer(client=client_id, secret=client_secret) - result = await consumer.single_chunk( +async def test_fetch_demo_igent(): + consumer = Obelisk(client=client_id, secret=client_secret) + result = await consumer.fetch_single_chunk( datasets=["612f6c39cbceda0ea9753d95"], metrics=["org.dyamand.types.common.Temperature::number"], from_timestamp=1740924034000, diff --git a/src/tests/asynchronous/core_test.py b/src/tests/asynchronous/core_test.py new file mode 100644 index 0000000..8a19112 --- /dev/null +++ b/src/tests/asynchronous/core_test.py @@ -0,0 +1,6 @@ +from obelisk.asynchronous.core import QueryParams + +def test_query_param_serialize(): + q = QueryParams(dataset="83989232", filter_="(metric=='smartphone.application::string')", dataType='string') + dump = q.to_dict() + assert "filter" in dump diff --git a/src/tests/sync/__init__.py b/src/tests/sync/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/obelisk/sync/consumer_test.py b/src/tests/sync/client_test.py similarity index 68% rename from src/obelisk/sync/consumer_test.py rename to src/tests/sync/client_test.py index f053cfb..cecc4ed 100644 --- a/src/obelisk/sync/consumer_test.py +++ b/src/tests/sync/client_test.py @@ -1,11 +1,11 @@ -from .consumer import Consumer +from obelisk.sync import Obelisk client_id = "682c6c46604b3b3be35429df" client_secret = "7136832d-01be-456a-a1fe-25c7f9e130c5" -def test_demo_igent(): - consumer = Consumer(client=client_id,secret=client_secret) - result = consumer.single_chunk( +def test_demo_igent_fetch(): + consumer = Obelisk(client=client_id, secret=client_secret) + result = consumer.fetch_single_chunk( datasets=["612f6c39cbceda0ea9753d95"], metrics=["org.dyamand.types.common.Temperature::number"], from_timestamp=1740924034000, @@ -16,16 +16,16 @@ def test_demo_igent(): assert len(result.items) == 2 def test_two_instances(): - consumer_one = Consumer(client=client_id,secret=client_secret) - consumer_two = Consumer(client=client_id,secret=client_secret) - result_one = consumer_one.single_chunk( + consumer_one = Obelisk(client=client_id, secret=client_secret) + consumer_two = Obelisk(client=client_id, secret=client_secret) + result_one = consumer_one.fetch_single_chunk( datasets=["612f6c39cbceda0ea9753d95"], metrics=["org.dyamand.types.common.Temperature::number"], from_timestamp=1740924034000, to_timestamp=1741100614258, limit=2 ) - result_two = consumer_two.single_chunk( + result_two = consumer_two.fetch_single_chunk( datasets=["612f6c39cbceda0ea9753d95"], metrics=["org.dyamand.types.common.Temperature::number"], from_timestamp=1740924034000, diff --git a/src/tests/typetest/__init__.py b/src/tests/typetest/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/tests/typetest/filter_test.py b/src/tests/typetest/filter_test.py new file mode 
100644 index 0000000..7c9eea4 --- /dev/null +++ b/src/tests/typetest/filter_test.py @@ -0,0 +1,19 @@ +from obelisk.types.core import Filter, Comparison +from datetime import datetime + + +def test_basic_filter(): + test_dt = datetime.now() + f = Filter() \ + .add_and( + Comparison.equal('source', 'test source'), + )\ + .add_or( + Comparison.less('timestamp', test_dt) + )\ + .add_or( + Comparison.is_in('metricType', ['number', 'number[]']), + ) + + expected = f"(((('source'=='test source')),('timestamp'<'{test_dt.isoformat()}')),('metricType'=in=('number', 'number[]')))" + assert str(f) == expected
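# Example (editor's sketch, not part of this patch): combining the RSQL Filter
# builder with the CORE QueryParams introduced in this change. The dataset ID is
# a placeholder; the rendered string is passed explicitly, although the filter_
# annotation also accepts a Filter instance directly.
from obelisk.asynchronous.core import QueryParams
from obelisk.types.core import Comparison, Filter

f = Filter().add_and(
    Comparison.equal("source", "gateway-1"),
    Comparison.is_in("metricType", ["number", "number[]"]),
)

params = QueryParams(dataset="<dataset-id>", dataType="number", filter_=str(f))

# The trailing-underscore field serializes under the alias "filter".
assert "filter" in params.to_dict()
print(params.to_dict()["filter"])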