Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions backend/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,6 @@ dmypy.json

# Cython debug symbols
cython_debug/

# DS_Store
**/.DS_Store
33 changes: 31 additions & 2 deletions backend/tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,53 @@

from app.services.crucible_svc import CrucibleService
from tests.unit.fake_elastic import FakeAsyncElasticsearch
from tests.unit.fake_elastic_service import FakeElasticService
from tests.unit.fake_splunk import FakeSplunkService


@pytest.fixture
def fake_config(monkeypatch):
"""Provide a fake configuration"""

vyper = Vyper(config_name="ocpperf")
vyper.set("TEST.url", "http://elastic.example.com:9200")
vyper.set("TEST.indice", "cdmv9dev-hce") # Created for test_hce.py
vyper.set(
"telco.config.job_url", "https://jenkins.telco.example.com/job/telco-tests"
) # Created for test_telco.py
monkeypatch.setattr("app.config.get_config", lambda: vyper)


@pytest.fixture
def fake_elastic(monkeypatch, fake_config):
"""Replace the actual elastic client with a fake"""
fake_elastic = FakeAsyncElasticsearch("http://elastic.example.com:9200")
monkeypatch.setattr(
"app.services.crucible_svc.AsyncElasticsearch",
lambda *args, **kwargs: fake_elastic,
)
return fake_elastic


@pytest.fixture
def fake_elastic_service(monkeypatch, fake_config):
"""Replace the actual ElasticService with fake for commons testing"""
fake_elastic_service = FakeElasticService("TEST")
for path in ("hce", "ocm", "ols", "ocp", "quay", "utils"):
monkeypatch.setattr(
f"app.api.v1.commons.{path}.ElasticService",
lambda *args, **kwargs: fake_elastic_service,
)
return fake_elastic_service


@pytest.fixture
def fake_splunk(monkeypatch, fake_config):
"""Replace the actual SplunkService with fake for telco testing"""
fake_splunk = FakeSplunkService("TEST")
monkeypatch.setattr(
"app.services.crucible_svc.AsyncElasticsearch", FakeAsyncElasticsearch
"app.api.v1.commons.telco.SplunkService", lambda configpath: fake_splunk
)
return fake_splunk


@pytest.fixture
Expand Down
146 changes: 146 additions & 0 deletions backend/tests/unit/fake_elastic_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
from collections import defaultdict
from typing import Any, Optional

from app.services.search import ElasticService


class FakeElasticService(ElasticService):
"""
Fake ElasticService for testing commons functions without actual Elasticsearch connections.

This fake implementation provides canned responses for ElasticService methods used in commons modules:
- post(): Used by getData() methods
- filterPost(): Used by getFilterData() methods

"""

def __init__(self, configpath: str = "TEST", index: str = ""):
self.configpath = configpath
self.index = index
self.data = defaultdict(list)
# Error simulation flags
self.post_error = None
self.filterPost_error = None

# Testing helpers to manage fake post/filterPost responses
def set_post_response(
self,
response_type: str,
data_list: Optional[list[dict[str, Any]]] = None,
filter_data: Optional[list[dict[str, Any]]] = None,
summary: Optional[dict[str, Any]] = None,
upstream_list: Optional[list[str]] = None,
total: Optional[int] = None,
repeat: int = 1,
error: Optional[Exception] = None,
):
"""Set a canned response or error for ElasticService methods (post/filterPost)

Args:
response_type: "post" for getData responses, "filterPost" for getFilterData responses
data_list: list of source data objects (for post responses, total auto-calculated from length)
filter_data: filter aggregation data (for filterPost responses)
summary: summary data (for filterPost responses)
upstream_list: list of upstream job names (for filterPost responses)
total: total count (for filterPost responses only, auto-calculated for post responses)
repeat: how many times to return this response
error: Exception to raise instead of returning response data
"""
if error is not None:
# Set error instead of response data
if response_type == "post":
self.post_error = error
elif response_type == "filterPost":
self.filterPost_error = error
else:
raise ValueError(
f"Invalid response_type: {response_type}. Must be 'post' or 'filterPost'"
)
return

# Set normal response data
if response_type == "post":
# Format for getData responses
hits = []
if data_list:
for d in data_list:
hits.append({"_source": d})
# Auto-calculate total from data_list length
calculated_total = len(data_list or [])
response = {"data": hits, "total": calculated_total}
elif response_type == "filterPost":
# Format for getFilterData responses
if total is None:
raise ValueError("total parameter is required for filterPost responses")
response = {
"total": total,
"filterData": filter_data or [],
"summary": summary or {},
}
if upstream_list:
response["upstreamList"] = upstream_list
else:
raise ValueError(
f"Invalid response_type: {response_type}. Must be 'post' or 'filterPost'"
)

# Store in a special key for commons responses
commons_key = f"commons_{response_type}"
if commons_key not in self.data:
self.data[commons_key] = []
for c in range(repeat):
self.data[commons_key].append(response)

# Mock ElasticService methods
async def post(
self,
query,
indice=None,
size=10000,
start_date=None,
end_date=None,
timestamp_field=None,
**kwargs,
):
"""Mock the ElasticService.post method"""
if self.post_error:
raise self.post_error

# Check if a response has been registered
if "commons_post" in self.data and len(self.data["commons_post"]) > 0:
return self.data["commons_post"].pop(0)

# Raise exception if no response registered - indicates broken test
raise Exception(
"No mock data was defined for ElasticService.post() - call set_post_response() first"
)

async def filterPost(
self,
start_datetime,
end_datetime,
aggregate,
refiner,
timestamp_field="timestamp",
indice=None,
**kwargs,
):
"""Mock the ElasticService.filterPost method"""
if self.filterPost_error:
raise self.filterPost_error

# Check if a response has been registered
if (
"commons_filterPost" in self.data
and len(self.data["commons_filterPost"]) > 0
):
return self.data["commons_filterPost"].pop(0)

# Raise exception if no response registered - indicates broken test
raise Exception(
"No mock data was defined for ElasticService.filterPost() - call set_post_response() first"
)

async def close(self):
"""Mock the ElasticService.close method - no-op for testing"""
pass
157 changes: 157 additions & 0 deletions backend/tests/unit/fake_splunk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Optional

from app.services.splunk import SplunkService


@dataclass
class SplunkRequest:
"""Represents a request made to the fake Splunk service for testing verification."""

query: Optional[str]
size: int
offset: int
sort: Optional[str]
searchList: Optional[str]
configpath: str

def __eq__(self, other) -> bool:
return (
self.query == other.query
and self.size == other.size
and self.offset == other.offset
and self.sort == other.sort
and self.searchList == other.searchList
and self.configpath == other.configpath
)


class FakeSplunkService(SplunkService):
configpath: str
data: dict[str, Any]
query_error: Optional[Exception]
filterPost_error: Optional[Exception]

"""
Fake SplunkService for testing telco functions without actual Splunk connections.
This fake implementation follows the same pattern as FakeAsyncElasticsearch,
providing canned responses for SplunkService methods used in telco.py:
- query(): Used by telco.getData()
- filterPost(): Used by telco.getFilterData()

Usage:
fake_splunk = FakeSplunkService()

# For getData() tests
fake_splunk.set_query_response(data_list=[...], total=100)

# For getFilterData() tests
fake_splunk.set_filter_response(data_list=[...], summary={...}, total=50)

# For error testing:
fake_splunk.set_query_response(error=Exception("Connection failed"))
"""

def __init__(self, configpath: str = "TEST"):
self.configpath = configpath
self.data = defaultdict(list)
# Error simulation flags
self.query_error = None
self.filterPost_error = None

# Testing helpers to manage fake searches
def set_query_response(
self,
data_list: Optional[list[dict[str, Any]]] = None,
total: int = 0,
error: Optional[Exception] = None,
return_none: bool = False,
):
"""
Set a canned response for SplunkService.query() method.

This method is used by telco.getData() to retrieve time-series data.

Args:
data_list: List of telco data objects to return
total: Total count of results
error: Exception to raise instead of returning response data
return_none: If True, return None instead of a response dict
"""
if error is not None:
self.query_error = error
return

# Clear any previous error
self.query_error = None

if return_none:
response = None
else:
response = {"data": data_list or [], "total": total}

self.data["query_responses"].append(response)

def set_filter_response(
self,
data_list: Optional[list[dict[str, Any]]] = None,
summary: Optional[dict[str, Any]] = None,
total: int = 0,
error: Optional[Exception] = None,
):
"""
Set a canned response for SplunkService.filterPost() method.

This method is used by telco.getFilterData() to retrieve aggregation data.

Args:
data_list: List of aggregation data objects to return
summary: Summary statistics (e.g., {"success": 50, "failure": 10})
total: Total count of results
error: Exception to raise instead of returning response data
"""
if error is not None:
self.filterPost_error = error
return

# Clear any previous error
self.filterPost_error = None

response = {"data": data_list or [], "summary": summary or {}, "total": total}

self.data["filter_responses"].append(response)

# Mock SplunkService methods
async def query(
self,
query: Optional[str] = None,
size: int = 10,
offset: int = 0,
sort: Optional[str] = None,
searchList: Optional[str] = None,
):
# Check for error simulation
if self.query_error:
raise self.query_error

# Return canned response or default empty response
if self.data["query_responses"]:
return self.data["query_responses"].pop(0)

return {"data": [], "total": 0}

async def filterPost(
self,
query: Optional[str] = None,
searchList: Optional[str] = None,
):
# Check for error simulation
if self.filterPost_error:
raise self.filterPost_error

# Return canned response or default empty response
if self.data["filter_responses"]:
return self.data["filter_responses"].pop(0)

return {"data": [], "summary": {}, "total": 0}
Loading
Loading