Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 18 additions & 16 deletions nodestream/pipeline/value_providers/jmespath_value_provider.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from abc import ABC, abstractmethod
from collections.abc import Sequence
from typing import Any, Iterable, Type

import jmespath
Expand All @@ -8,6 +9,7 @@
from .context import ProviderContext
from .value_provider import ValueProvider, ValueProviderException


# `QueryStrategy` is here to provide the seam for different optimizations
# for executing jmespath queries. We can either execute a "fully fledged"
# jmespath query or we can implement some simple access patterns that
Expand Down Expand Up @@ -62,8 +64,8 @@ class JmespathValueProvider(ValueProvider):
def install_yaml_tag(cls, loader: Type[SafeLoader]):
loader.add_constructor(
"!jmespath",
lambda loader, node: cls.from_string_expression(
loader.construct_scalar(node)
lambda loader_param, node: cls.from_string_expression(
loader_param.construct_scalar(node)
),
)

Expand All @@ -74,24 +76,26 @@ def from_string_expression(cls, expression: str):
def __init__(self, strategy: QueryStrategy) -> None:
self.strategy = strategy

def search(self, context: ProviderContext):
raw_search = self.strategy.search(context)
if raw_search is None:
return
if isinstance(raw_search, list):
yield from raw_search
else:
yield raw_search

def single_value(self, context: ProviderContext) -> Any:
try:
return next(self.search(context), None)
return self.strategy.search(context)
except Exception as e:
raise ValueProviderException(str(context.document), self) from e

def many_values(self, context: ProviderContext) -> Iterable[Any]:
try:
yield from self.search(context)
result = self.strategy.search(context)
if not result:
return

if isinstance(result, (bool, str, int, float)):
yield result
elif isinstance(result, Sequence):
yield from result
elif result:
yield result

return
except Exception as e:
raise ValueProviderException(str(context.document), self) from e

Expand All @@ -101,7 +105,5 @@ def __str__(self) -> str:

SafeDumper.add_representer(
JmespathValueProvider,
lambda dumper, jmespath: dumper.represent_scalar(
"!jmespath", str(jmespath.strategy)
),
lambda dumper, jp: dumper.represent_scalar("!jmespath", str(jp.strategy)),
)
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ markers = [
"integration: marks the test as an integration test (deselect with '-m \"not integration\"')",
"e2e: marks the test as an end-to-end test (deselect with '-m \"not e2e\"')",
]
log_cli = false
log_cli_level = "INFO"
asyncio_default_fixture_loop_scope = "function"

[tool.poetry.scripts]
nodestream = 'nodestream.cli.application:run'
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
{"first_name": "Zach", "last_name": "Probst"},
{"first_name": "Chad", "last_name": "Cloes"},
],
"project": {"tags": ["graphdb", "python"]},
"project": {"tags": ["graphdb", "python"], "name": "project_name"},
}


Expand Down
Original file line number Diff line number Diff line change
@@ -1,44 +1,104 @@
from collections.abc import Generator

import pytest
from hamcrest import assert_that, equal_to, has_length, none

from nodestream.pipeline.value_providers import JmespathValueProvider
from nodestream.model import DesiredIngestion
from nodestream.pipeline.value_providers import JmespathValueProvider, ProviderContext
from nodestream.pipeline.value_providers.value_provider import ValueProviderException


def test_single_value_present(blank_context_with_document):
subject = JmespathValueProvider.from_string_expression("team.name")
assert_that(
subject.single_value(blank_context_with_document), equal_to("nodestream")
assert subject.single_value(blank_context_with_document) == "nodestream"


def test_single_value_present_complicated(blank_context_with_document):
subject = JmespathValueProvider.from_string_expression("*.name")
assert subject.single_value(blank_context_with_document) == [
"nodestream",
"project_name",
]


def test_single_value_present_complicated_always_arr(blank_context_with_document):
context = ProviderContext(
{"toplevel": {"level2": [{"name": "first"}, {"name": "second"}]}},
DesiredIngestion(),
)
subject = JmespathValueProvider.from_string_expression("toplevel.level2[].name")
assert subject.single_value(context) == [
"first",
"second",
]


def test_many_values_present_complicated_always_arr(blank_context_with_document):
context = ProviderContext(
{"toplevel": {"level2": [{"name": "first"}, {"name": "second"}]}},
DesiredIngestion(),
)
subject = JmespathValueProvider.from_string_expression("toplevel.level2[].name")
result = subject.many_values(context)

assert isinstance(result, Generator)
assert next(result) == "first"
assert next(result) == "second"
with pytest.raises(StopIteration):
next(result)


def test_single_value_missing(blank_context_with_document):
subject = JmespathValueProvider.from_string_expression("team.description")
assert_that(subject.single_value(blank_context_with_document), none())
assert subject.single_value(blank_context_with_document) is None


def test_single_value_is_list(blank_context_with_document):
subject = JmespathValueProvider.from_string_expression("project.tags")
result = subject.single_value(blank_context_with_document)
assert_that(result, equal_to("graphdb"))
assert result == ["graphdb", "python"]


def test_multiple_values_missing(blank_context_with_document):
def test_many_values_missing(blank_context_with_document):
subject = JmespathValueProvider.from_string_expression("team.description")
assert_that(list(subject.many_values(blank_context_with_document)), has_length(0))
result = list(subject.many_values(blank_context_with_document))
assert result == []


def test_many_values_dict_return():
context = ProviderContext({"a": {"b": {"name": "example"}}}, DesiredIngestion())
subject = JmespathValueProvider.from_string_expression("a.b")
result = list(subject.many_values(context))
assert result == [{"name": "example"}]

def test_multiple_values_returns_one_value(blank_context_with_document):

def test_single_value_single_level_array():
context = ProviderContext(
{"toplevel": ["test1", "test2", "test3"]}, DesiredIngestion()
)
subject = JmespathValueProvider.from_string_expression("toplevel")
result = list(subject.single_value(context))
assert result == ["test1", "test2", "test3"]


def test_many_values_single_level_array():
context = ProviderContext(
{"toplevel": ["test1", "test2", "test3"]}, DesiredIngestion()
)
subject = JmespathValueProvider.from_string_expression("toplevel")
result = list(subject.many_values(context))
assert result == ["test1", "test2", "test3"]


def test_many_values_returns_one_value(blank_context_with_document):
subject = JmespathValueProvider.from_string_expression("team.name")
result = list(subject.many_values(blank_context_with_document))
assert_that(result, has_length(1))
assert_that(result[0], equal_to("nodestream"))
assert result == ["nodestream"]


def test_multiple_values_hit(blank_context_with_document):
def test_many_values_hit(blank_context_with_document):
subject = JmespathValueProvider.from_string_expression("project.tags")
result = subject.many_values(blank_context_with_document)
assert_that(list(result), equal_to(["graphdb", "python"]))
result = list(subject.many_values(blank_context_with_document))
assert result == ["graphdb", "python"]


def test_single_value_error(blank_context_with_document):
Expand All @@ -55,14 +115,14 @@ def test_single_value_error(blank_context_with_document):
assert some_text_from_document in error_message


def test_multiple_values_error(blank_context_with_document):
def test_many_values_error(blank_context_with_document):
# this will error because team2 does not exist causing the join to throw an error
expression_with_error = "join('/', [team.name || '', team2.name])"
subject = JmespathValueProvider.from_string_expression(expression_with_error)

with pytest.raises(Exception) as e_info:
generator = subject.many_values(blank_context_with_document)
list(generator)
with pytest.raises(ValueProviderException) as e_info:
list(subject.many_values(blank_context_with_document))

error_message = str(e_info.value)

assert expression_with_error in error_message
Loading