Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions sdk/python/feast/infra/compute_engines/backends/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ def from_name(name: str) -> DataFrameBackend:
@staticmethod
def infer_from_entity_df(entity_df) -> Optional[DataFrameBackend]:
if (
not entity_df
or isinstance(entity_df, pyarrow.Table)
entity_df is None
or isinstance(entity_df, pd.DataFrame)
or isinstance(entity_df, pyarrow.Table)
or (isinstance(entity_df, str) and not entity_df)
):
return PandasBackend()

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from unittest.mock import MagicMock, patch

import pandas as pd
import pyarrow
import pytest

from feast.infra.compute_engines.backends.factory import BackendFactory
from feast.infra.compute_engines.backends.pandas_backend import PandasBackend


class TestBackendFactoryFromName:
def test_pandas_backend(self):
backend = BackendFactory.from_name("pandas")
assert isinstance(backend, PandasBackend)

@patch(
"feast.infra.compute_engines.backends.factory.BackendFactory._get_polars_backend"
)
def test_polars_backend(self, mock_get_polars):
mock_backend = MagicMock()
mock_get_polars.return_value = mock_backend

result = BackendFactory.from_name("polars")
assert result is mock_backend

def test_unsupported_name_raises(self):
with pytest.raises(ValueError, match="Unsupported backend name"):
BackendFactory.from_name("dask")


class TestBackendFactoryInferFromEntityDf:
def test_pandas_dataframe_returns_pandas_backend(self):
"""A non-empty pandas DataFrame is detected via isinstance check."""
df = pd.DataFrame({"a": [1, 2]})
backend = BackendFactory.infer_from_entity_df(df)
assert isinstance(backend, PandasBackend)

def test_empty_pandas_dataframe_returns_pandas_backend(self):
"""An empty pandas DataFrame returns PandasBackend."""
df = pd.DataFrame()
backend = BackendFactory.infer_from_entity_df(df)
assert isinstance(backend, PandasBackend)

def test_pyarrow_table(self):
table = pyarrow.table({"a": [1, 2]})
backend = BackendFactory.infer_from_entity_df(table)
assert isinstance(backend, PandasBackend)

def test_none_input(self):
backend = BackendFactory.infer_from_entity_df(None)
assert isinstance(backend, PandasBackend)

def test_empty_string_input(self):
backend = BackendFactory.infer_from_entity_df("")
assert isinstance(backend, PandasBackend)
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from unittest.mock import MagicMock

from feast.infra.compute_engines.base import ComputeEngine
from feast.infra.compute_engines.local.compute import (
LocalComputeEngine,
LocalComputeEngineConfig,
)


class TestLocalComputeEngineConfig:
def test_default_type(self):
config = LocalComputeEngineConfig()
assert config.type == "local"

def test_default_backend_none(self):
config = LocalComputeEngineConfig()
assert config.backend is None

def test_custom_backend(self):
config = LocalComputeEngineConfig(backend="polars")
assert config.backend == "polars"


class TestLocalComputeEngine:
def _make_engine(self):
return LocalComputeEngine(
repo_config=MagicMock(),
offline_store=MagicMock(),
online_store=MagicMock(),
)

def test_is_compute_engine(self):
engine = self._make_engine()
assert isinstance(engine, ComputeEngine)

def test_update_is_noop(self):
engine = self._make_engine()
# Should not raise
engine.update(
project="test",
views_to_delete=[],
views_to_keep=[],
entities_to_delete=[],
entities_to_keep=[],
)

def test_teardown_is_noop(self):
engine = self._make_engine()
# Should not raise
engine.teardown_infra(
project="test",
fvs=[],
entities=[],
)

def test_stores_config(self):
repo_config = MagicMock()
offline = MagicMock()
online = MagicMock()
engine = LocalComputeEngine(
repo_config=repo_config,
offline_store=offline,
online_store=online,
)
assert engine.repo_config is repo_config
assert engine.offline_store is offline
assert engine.online_store is online
100 changes: 100 additions & 0 deletions sdk/python/tests/unit/infra/compute_engines/test_local_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
from unittest.mock import MagicMock

import pytest

from feast.infra.common.materialization_job import MaterializationJobStatus
from feast.infra.compute_engines.local.job import (
LocalMaterializationJob,
LocalRetrievalJob,
)


class TestLocalMaterializationJob:
def test_status(self):
job = LocalMaterializationJob(
job_id="test-job-1",
status=MaterializationJobStatus.SUCCEEDED,
)
assert job.status() == MaterializationJobStatus.SUCCEEDED

def test_job_id(self):
job = LocalMaterializationJob(
job_id="test-job-1",
status=MaterializationJobStatus.RUNNING,
)
assert job.job_id() == "test-job-1"

def test_error_none_by_default(self):
job = LocalMaterializationJob(
job_id="test-job-1",
status=MaterializationJobStatus.SUCCEEDED,
)
assert job.error() is None

def test_error_stored(self):
err = RuntimeError("something failed")
job = LocalMaterializationJob(
job_id="test-job-1",
status=MaterializationJobStatus.ERROR,
error=err,
)
assert job.error() is err

def test_should_not_be_retried(self):
job = LocalMaterializationJob(
job_id="test-job-1",
status=MaterializationJobStatus.ERROR,
)
assert job.should_be_retried() is False

def test_url_is_none(self):
job = LocalMaterializationJob(
job_id="test-job-1",
status=MaterializationJobStatus.SUCCEEDED,
)
assert job.url() is None


class TestLocalRetrievalJob:
def test_full_feature_names(self):
job = LocalRetrievalJob(
plan=None,
context=MagicMock(),
full_feature_names=True,
)
assert job.full_feature_names is True

def test_full_feature_names_false(self):
job = LocalRetrievalJob(
plan=None,
context=MagicMock(),
full_feature_names=False,
)
assert job.full_feature_names is False

def test_error_none_by_default(self):
job = LocalRetrievalJob(plan=None, context=MagicMock())
assert job.error() is None

def test_error_stored(self):
err = ValueError("bad data")
job = LocalRetrievalJob(plan=None, context=MagicMock(), error=err)
assert job.error() is err

def test_on_demand_feature_views_default_empty(self):
job = LocalRetrievalJob(plan=None, context=MagicMock())
assert job.on_demand_feature_views == []

def test_metadata_default_none(self):
job = LocalRetrievalJob(plan=None, context=MagicMock())
assert job.metadata is None

def test_to_remote_storage_raises(self):
job = LocalRetrievalJob(plan=None, context=MagicMock())
with pytest.raises(NotImplementedError, match="Remote storage"):
job.to_remote_storage()

def test_to_sql_raises(self):
job = LocalRetrievalJob(plan=None, context=MagicMock())
with pytest.raises(NotImplementedError, match="SQL generation"):
job.to_sql()
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from datetime import datetime
from unittest.mock import MagicMock

from feast.infra.common.materialization_job import (
MaterializationJobStatus,
MaterializationTask,
)


class TestMaterializationJobStatus:
def test_all_statuses_defined(self):
expected = {
"WAITING",
"RUNNING",
"AVAILABLE",
"ERROR",
"CANCELLING",
"CANCELLED",
"SUCCEEDED",
"PAUSED",
"RETRYING",
}
actual = {s.name for s in MaterializationJobStatus}
assert actual == expected


class TestMaterializationTask:
def test_creation(self):
mock_fv = MagicMock()
task = MaterializationTask(
project="my_project",
feature_view=mock_fv,
start_time=datetime(2024, 1, 1),
end_time=datetime(2024, 1, 2),
)
assert task.project == "my_project"
assert task.feature_view is mock_fv
assert task.start_time == datetime(2024, 1, 1)
assert task.end_time == datetime(2024, 1, 2)

def test_default_only_latest(self):
mock_fv = MagicMock()
task = MaterializationTask(
project="p",
feature_view=mock_fv,
start_time=datetime(2024, 1, 1),
end_time=datetime(2024, 1, 2),
)
assert task.only_latest is True

def test_default_disable_event_timestamp(self):
mock_fv = MagicMock()
task = MaterializationTask(
project="p",
feature_view=mock_fv,
start_time=datetime(2024, 1, 1),
end_time=datetime(2024, 1, 2),
)
assert task.disable_event_timestamp is False
90 changes: 90 additions & 0 deletions sdk/python/tests/unit/infra/compute_engines/test_topo_sort.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
from unittest.mock import MagicMock

from feast.infra.compute_engines.algorithms.topo import (
topological_sort,
topological_sort_multiple,
)
from feast.infra.compute_engines.dag.node import DAGNode


def _make_node(name, inputs=None):
"""Create a mock DAGNode."""
node = MagicMock(spec=DAGNode)
node.name = name
node.inputs = inputs or []
return node


class TestTopologicalSort:
def test_single_node(self):
root = _make_node("root")
result = topological_sort(root)
assert len(result) == 1
assert result[0] is root

def test_linear_chain(self):
"""A -> B -> C should produce [A, B, C]."""
a = _make_node("A")
b = _make_node("B", inputs=[a])
c = _make_node("C", inputs=[b])

result = topological_sort(c)
assert len(result) == 3
# Dependencies must come before dependents
assert result.index(a) < result.index(b)
assert result.index(b) < result.index(c)

def test_diamond_dependency(self):
"""
A → B
A → C
B,C → D
Should visit A before B and C, and B/C before D.
"""
a = _make_node("A")
b = _make_node("B", inputs=[a])
c = _make_node("C", inputs=[a])
d = _make_node("D", inputs=[b, c])

result = topological_sort(d)
assert len(result) == 4
assert result.index(a) < result.index(b)
assert result.index(a) < result.index(c)
assert result.index(b) < result.index(d)
assert result.index(c) < result.index(d)

def test_no_duplicates(self):
"""Shared dependencies should appear only once."""
shared = _make_node("shared")
b = _make_node("B", inputs=[shared])
c = _make_node("C", inputs=[shared])
root = _make_node("root", inputs=[b, c])

result = topological_sort(root)
assert result.count(shared) == 1


class TestTopologicalSortMultiple:
def test_multiple_roots_no_overlap(self):
r1 = _make_node("root1")
r2 = _make_node("root2")

result = topological_sort_multiple([r1, r2])
assert len(result) == 2
assert r1 in result
assert r2 in result

def test_multiple_roots_with_shared_dep(self):
shared = _make_node("shared")
r1 = _make_node("root1", inputs=[shared])
r2 = _make_node("root2", inputs=[shared])

result = topological_sort_multiple([r1, r2])
assert len(result) == 3
assert result.count(shared) == 1
assert result.index(shared) < result.index(r1)
assert result.index(shared) < result.index(r2)

def test_empty_roots(self):
result = topological_sort_multiple([])
assert result == []
Loading
Loading