
Conversation

AGhafaryy
Contributor

No description provided.

@AGhafaryy AGhafaryy requested review from zprobst and ccloes as code owners June 27, 2025 23:18

codecov bot commented Jun 27, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 98.25%. Comparing base (d32b3cd) to head (f06c0ba).

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #426      +/-   ##
==========================================
+ Coverage   98.21%   98.25%   +0.03%     
==========================================
  Files         152      154       +2     
  Lines        6111     6249     +138     
==========================================
+ Hits         6002     6140     +138     
  Misses        109      109              
| Flag                 | Coverage Δ                   |
|----------------------|------------------------------|
| 3.10-macos-latest    | 98.23% <100.00%> (+0.05%) ⬆️ |
| 3.10-ubuntu-latest   | 98.22% <100.00%> (+0.04%) ⬆️ |
| 3.10-windows-latest  | 98.22% <100.00%> (+0.04%) ⬆️ |
| 3.11-macos-latest    | 98.22% <100.00%> (+0.02%) ⬆️ |
| 3.11-ubuntu-latest   | 98.22% <100.00%> (+0.04%) ⬆️ |
| 3.11-windows-latest  | 98.22% <100.00%> (+0.04%) ⬆️ |
| 3.12-macos-latest    | 98.22% <100.00%> (+0.04%) ⬆️ |
| 3.12-ubuntu-latest   | 98.22% <100.00%> (+0.04%) ⬆️ |
| 3.12-windows-latest  | 98.22% <100.00%> (+0.04%) ⬆️ |
| 3.13-macos-latest    | 98.23% <100.00%> (+0.05%) ⬆️ |
| 3.13-ubuntu-latest   | 98.22% <100.00%> (+0.04%) ⬆️ |
| 3.13-windows-latest  | 98.22% <100.00%> (+0.04%) ⬆️ |

Flags with carried forward coverage won't be shown.

☔ View full report in Codecov by Sentry.

return headers

@property
def _normalized_query(self) -> str:
Contributor

@angelosantos4 angelosantos4 Jul 2, 2025

Note: for base queries the search command is implied:

index=index_a

But for subqueries the search keyword is required:

index=index_a | join [search index=index_b]
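For illustration, a minimal sketch of that normalization (this is not the PR's actual _normalized_query implementation):

```python
@property
def _normalized_query(self) -> str:
    query = self.query.strip()
    # A base query like "index=index_a" implies the leading "search" command;
    # adding it explicitly keeps the string valid when it is embedded in
    # contexts such as "| join [search ...]".
    if not query.startswith(("search ", "|")):
        query = f"search {query}"
    return query
```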

Comment on lines +236 to +249
try:
    root = ET.fromstring(response.text)
    for elem in root.iter():
        if (
            "dispatchState" in elem.tag
            or elem.get("name") == "dispatchState"
        ):
            dispatch_state = elem.text
            break
except Exception as e:
    self.logger.warning(
        "Failed to parse job status",
        extra={"error": str(e), "search_id": search_id},
    )
Contributor

I see there is a pattern here between parsing things as JSON and parsing them as XML. I would recommend creating a parser class that will take an arbitrary object of one of the two types and get the corresponding object a bit more declaratively.

class SplunkResponseParser:
    def parse(object: Json | XML):
        ...

Member

to riff on this - create a parser interface that you can call parse on and it returns to you a consistent result. Switch on the return type and create an instance of either a JsonSplunkResponseParser or XmlSplunkResponseParser. This can be done as a factory method on SplunkResponseParser.

This is a refactoring strategy called "replace conditional with polymorphism".
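A rough sketch of that shape (all class and method names below are illustrative, not from the PR):

```python
import json
from abc import ABC, abstractmethod
from typing import Any
from xml.etree import ElementTree


class SplunkResponseParser(ABC):
    @abstractmethod
    def parse(self, body: str) -> dict[str, Any]: ...

    @classmethod
    def for_content_type(cls, content_type: str) -> "SplunkResponseParser":
        # Factory method: switch once on the content type, then let
        # polymorphism do the rest.
        if "json" in content_type:
            return JsonSplunkResponseParser()
        return XmlSplunkResponseParser()


class JsonSplunkResponseParser(SplunkResponseParser):
    def parse(self, body: str) -> dict[str, Any]:
        return json.loads(body)


class XmlSplunkResponseParser(SplunkResponseParser):
    def parse(self, body: str) -> dict[str, Any]:
        root = ElementTree.fromstring(body)
        return {elem.get("name"): elem.text for elem in root.iter() if elem.get("name")}
```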

Contributor

+1 to the interface idea.

Comment on lines +226 to +231
job_status = response.json()
dispatch_state = (
    job_status.get("entry", [{}])[0]
    .get("content", {})
    .get("dispatchState")
)
Contributor

I think there are heavy assumptions here about the state of the JSON. Do we know why it is the first field in the entry list? I fear that a parallel job run from a different Splunk extractor will create a job, and it will end up at the top of the list here. Or is it the case that a search can have multiple jobs? Why is the first one the one we care about?

Contributor

Maybe:

for entry in job_status.get("entry", []):
    # check that some condition holds for every entry, or that this entry
    # is the one we are looking for
    ...

Comment on lines +266 to +267
await asyncio.sleep(2) # Wait 2 seconds before checking again
wait_count += 2
Contributor

I think it is more standard to do something akin to:

attempts = 0
while should_continue:
    should_continue = success_condition and attempts < MAX_ATTEMPTS
    attempts += 1

Member

in #425 we're discussing adding a library like tenacity to handle things like this as well.
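For reference, a minimal sketch of what that might look like if tenacity were adopted (the decorator arguments and helper below are illustrative, not a proposed design):

```python
from tenacity import retry, retry_if_result, stop_after_attempt, wait_fixed


def _still_running(dispatch_state: str) -> bool:
    return dispatch_state not in ("DONE", "FAILED")


@retry(
    retry=retry_if_result(_still_running),  # keep polling until a terminal state
    wait=wait_fixed(2),                     # seconds between checks
    stop=stop_after_attempt(150),           # roughly the current 300-second cap
)
async def check_job_status(client, search_id: str) -> str:
    # Fetch the job status and return its dispatchState (body omitted here).
    ...
```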

Contributor

Link to tenacity

elif dispatch_state == "FAILED":
    raise RuntimeError(f"Search job failed: {search_id}")

await asyncio.sleep(2)  # Wait 2 seconds before checking again
Contributor

You can also replace this with a global variable SPLUNK_STATUS_CHECK_PERIOD_SECONDS.

Contributor

I would very much prefer this. My mantra is "prefer no magic numbers/constant strings". Ideally, most constants should be extracted into config or something like it, but at the very least make them module level constants until it's clear which ones need to change often.
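For illustration, the extraction both comments describe could be as small as (the second constant name here is invented):

```python
SPLUNK_STATUS_CHECK_PERIOD_SECONDS = 2
SPLUNK_MAX_WAIT_SECONDS = 300
```

Call sites then read `await asyncio.sleep(SPLUNK_STATUS_CHECK_PERIOD_SECONDS)` instead of `await asyncio.sleep(2)`.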

Comment on lines +202 to +204
async def _wait_for_job_completion(
    self, client: AsyncClient, search_id: str, max_wait_seconds: int = 300
):
Contributor

Try typing the return types. Instead of nesting try/except blocks, you can return a completion state of True or False in order to hand the expected state off to the overarching function.
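For illustration, a standalone sketch of that idea (the URL handling, constant, and JSON shape are assumptions pieced together from the excerpts in this PR, not its actual code):

```python
import asyncio

from httpx import AsyncClient

SPLUNK_STATUS_CHECK_PERIOD_SECONDS = 2  # illustrative constant


async def wait_for_job_completion(
    client: AsyncClient, status_url: str, max_wait_seconds: int = 300
) -> bool:
    """Return True when the job reaches DONE, False on FAILED or timeout."""
    waited = 0
    while waited < max_wait_seconds:
        response = await client.get(status_url, params={"output_mode": "json"})
        dispatch_state = (
            response.json()
            .get("entry", [{}])[0]
            .get("content", {})
            .get("dispatchState")
        )
        if dispatch_state == "DONE":
            return True
        if dispatch_state == "FAILED":
            return False
        await asyncio.sleep(SPLUNK_STATUS_CHECK_PERIOD_SECONDS)
        waited += SPLUNK_STATUS_CHECK_PERIOD_SECONDS
    return False
```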

"search": self._normalized_query,
"earliest_time": self.earliest_time,
"latest_time": self.latest_time,
"max_count": str(self.max_count),
Contributor

Why turn to string here? Maybe have the interface expect a string instead.

Comment on lines +25 to +42
class SplunkExtractor(Extractor):
    @classmethod
    def from_file_data(
        cls,
        base_url: str,
        query: str,
        auth_token: Optional[str] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
        earliest_time: str = "-24h",
        latest_time: str = "now",
        verify_ssl: bool = True,
        request_timeout_seconds: int = 300,
        max_count: int = 10000,
        app: str = "search",
        user: Optional[str] = None,
        chunk_size: int = 1000,
    ) -> "SplunkExtractor":
Contributor

Try to separate the functionality into a Client and have extract_records use it.
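A very rough sketch of that split (all names are illustrative and the nodestream import path is an assumption, not taken from this PR):

```python
from nodestream.pipeline import Extractor  # assumed import path


class SplunkClient:
    """Owns the HTTP details: auth, job creation, polling, result paging."""

    def __init__(self, base_url: str, auth_token: str | None = None) -> None:
        self.base_url = base_url
        self.auth_token = auth_token

    async def run_search(self, query: str):
        # Placeholder: create the job, wait for completion, stream result chunks.
        yield {"query": query}


class SplunkExtractor(Extractor):
    """Stays thin: holds a client and yields whatever it returns."""

    def __init__(self, client: SplunkClient, query: str) -> None:
        self.client = client
        self.query = query

    async def extract_records(self):
        async for record in self.client.run_search(self.query):
            yield record
```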

Member

Agreed

Comment on lines +280 to +281
"count": str(self.chunk_size),
"offset": str(self.offset),
Contributor

Why are these strings? Do we need to change the input parameters to just take stringified integers?

Comment on lines +291 to +296
if response.status_code != 200:
    raise HTTPStatusError(
        f"Failed to get job results: {response.status_code}",
        request=response.request,
        response=response,
    )
Contributor

What if the error is a 504? It could be the case that we hit the endpoint often and they try to rate-limit us.

Contributor

IMO:

  • 401/403/404 - stop trying
  • 2xx are mostly ok. Limiting to 200 may be a problem, but it rarely is.
  • 3xx should almost never be seen, because the underlying HTTP library should handle them and do whatever redirecting is necessary
  • 4xx/5xx - log and try again with backoff
  • Connection/DNS errors - log and try again with backoff
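A sketch of that policy with httpx (the constants and helper name are illustrative; real retry handling would presumably live in the shared client idea discussed below):

```python
import asyncio

import httpx

MAX_ATTEMPTS = 5     # illustrative
BACKOFF_SECONDS = 2  # illustrative


async def get_with_backoff(client: httpx.AsyncClient, url: str) -> httpx.Response:
    for attempt in range(1, MAX_ATTEMPTS + 1):
        try:
            response = await client.get(url)
        except httpx.RequestError:
            # Connection/DNS errors: retry with backoff.
            pass
        else:
            if response.status_code in (401, 403, 404):
                response.raise_for_status()  # stop trying immediately
            if 200 <= response.status_code < 300:
                return response
            # Other 4xx/5xx (429, 504, ...): fall through and retry with backoff.
        await asyncio.sleep(BACKOFF_SECONDS * attempt)
    raise RuntimeError(f"Giving up on {url} after {MAX_ATTEMPTS} attempts")
```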

Contributor

Also, I feel like this indicates there might be a way to make a more pluggable "client" idea once we figure out a unified retry logic. 🤔

Contributor

We have an attempt to do this in our internal library but have not had the time to introduce it to nodestream. It does everything from status check handling, retrying, and error handling to safe JSON loading.

@angelosantos4
Contributor

Also note that there is a Splunk client supported by Splunk itself.

Not sure if there is much distinguishing the two, but it might make a lot of the abstraction easier, like the job handling:
https://docs.splunk.com/DocumentationStatic/PythonSDK/1.1/client.html


async def resume_from_checkpoint(self, checkpoint_object):
    """Resume extraction from a checkpoint."""
    if checkpoint_object:
Member

This check is not required; nodestream does this. This should only be getting called with a non-None object.


Comment on lines +119 to +122
def get_jobs_endpoint(self) -> str:
    """Get the Splunk jobs endpoint."""
    return f"{self.base_url}/servicesNS/{self.user}/{self.app}/search/jobs"

Member

This can be computed in the constructor - I don't see a clear reason to generate this string constantly.
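For example (the holder class is purely illustrative; in the PR this would just be attributes set in the extractor's constructor):

```python
class SplunkEndpoints:
    def __init__(self, base_url: str, user: str, app: str) -> None:
        # Formatted once instead of on every call to get_jobs_endpoint().
        self.jobs = f"{base_url}/servicesNS/{user}/{app}/search/jobs"

    def results(self, search_id: str) -> str:
        return f"{self.jobs}/{search_id}/results"
```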

"""Get the results endpoint for a specific search job."""
return f"{self.base_url}/servicesNS/{self.user}/{self.app}/search/jobs/{search_id}/results"

async def _create_search_job(self, client: AsyncClient) -> str:
Member

This function has a lot going on:

  • formulates a request body
  • makes the request
  • parses the response in one of two different possible return types.

For a simpler request, that may be fine, but given the legwork required, it's easy to get lost in this function. I'd recommend breaking this down: make this function tell a "story" while other functions describe the details.
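A hedged sketch of that "story" shape; the helper names are invented for illustration and the snippet is not runnable outside the extractor class:

```python
async def _create_search_job(self, client: AsyncClient) -> str:
    payload = self._build_search_payload()    # formulate the request body
    response = await client.post(self.get_jobs_endpoint(), data=payload)
    return self._extract_search_id(response)  # parse JSON or XML for the sid
```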

Contributor

Also, shouldn't this be search/v2/jobs/{search_id}/results or am I looking at the wrong docs?

Contributor

actually actually, do we want to support a streaming splunk search result from search/v2/jobs/export?

Contributor

Sorry, more splunk admin/hacker knowledge is being unlocked:

when we say "splunk extractor" we should consider supporting (and being clear about which we're supporting):

  • ad-hoc "streaming" queries (time-bound, no job-id required)
  • running a "job" and then getting the results (requires creating the job and then getting the results, what this PR covers)
  • accessing "scheduled" search results (by schedule name)



    )
    return search_id

async def _wait_for_job_completion(
Contributor

My IDE flags this as having too much "cognitive complexity" (a.k.a. too many nested branches and loops).

It would be good to refactor chunks of this into smaller functions to reduce the amount of brainpower required to figure out what's going on.

    )
except (json.JSONDecodeError, KeyError, IndexError):
    # Try XML parsing
    import xml.etree.ElementTree as ET
Contributor

It would be more "pythonic" to say:

Suggested change
import xml.etree.ElementTree as ET
from xml.etree import ElementTree

typically the community reserves aliases for modules (and almost always lowercase only):

    import pandas as pd
    import numpy as np

Comment on lines +175 to +178
mock_response = mocker.MagicMock()
mock_response.status_code = 201
mock_response.json.return_value = {"sid": "json123"}

Contributor

We should prefer using responses or pytest-httpx to handle this mocking, they're more careful about rejecting unexpected calls and emulating an actual http call.
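For example, a sketch with pytest-httpx (the URL and payload are illustrative):

```python
import httpx


def test_create_search_job_returns_sid(httpx_mock):
    # pytest-httpx raises for any request that does not match a registered response.
    httpx_mock.add_response(
        method="POST",
        url="https://splunk.example.com/services/search/jobs",
        status_code=201,
        json={"sid": "json123"},
    )
    with httpx.Client() as client:
        response = client.post("https://splunk.example.com/services/search/jobs")
    assert response.status_code == 201
    assert response.json()["sid"] == "json123"
```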


# Helper property tests
def test_splunk_extractor_auth_property_with_token(splunk_extractor):
    assert_that(splunk_extractor._auth, equal_to(None))  # Token goes in header
Contributor

In general I would avoid testing internal/private properties and functions, to avoid making tests that rely on implementation details.

These tests can be handled by responses or pytest-httpx by having the mock library watch for the expected auth headers.
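A sketch of asserting the auth header through pytest-httpx instead of the private _auth property (the URL and token are illustrative):

```python
import httpx


def test_token_is_sent_as_authorization_header(httpx_mock):
    httpx_mock.add_response(
        url="https://splunk.example.com/services/search/jobs",
        match_headers={"Authorization": "Bearer test-token"},
        json={"sid": "json123"},
    )
    with httpx.Client(headers={"Authorization": "Bearer test-token"}) as client:
        response = client.get("https://splunk.example.com/services/search/jobs")
    assert response.json()["sid"] == "json123"
```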

Comment on lines +395 to +405
assert_that(results, has_length(3))
assert_that(
    results[0],
    has_entries(
        {
            "_time": "2023-01-01T10:00:00",
            "host": "server1",
            "message": "Login successful",
        }
    ),
)
Contributor

why not assert the whole result?

Suggested change
assert_that(results, has_length(3))
assert_that(
    results[0],
    has_entries(
        {
            "_time": "2023-01-01T10:00:00",
            "host": "server1",
            "message": "Login successful",
        }
    ),
)
assert results == [
    {
        "_time": "2023-01-01T10:00:00",
        "host": "server1",
        "message": "Login successful",
    },
    {...},
    {...},
]


# Should handle gracefully and return empty results
assert_that(results, has_length(0))
assert_that(splunk_extractor.is_done, equal_to(True))
Contributor

Hamcrest is great IMO for lists, but it just feels ugly for equalities. This is purely me being a hater of hamcrest.

Suggested change
assert_that(splunk_extractor.is_done, equal_to(True))
assert splunk_extractor.is_done
