
Commit 6158512

goloroden and claude authored
feat: add pandas DataFrame integration for event analysis (#164)
* feat: add pandas DataFrame integration

  Add support for converting event streams to pandas DataFrames for data analysis and exploration. The events_to_dataframe() function accepts an AsyncGenerator of events and returns a DataFrame with all event fields as columns.

  - Add eventsourcingdb.pandas module with events_to_dataframe() function
  - Add comprehensive test suite following TDD approach
  - Add pandas as optional dependency group
  - Update README with usage examples and flattening guide

* fix: ensure DataFrame has correct columns for empty event streams

  When converting an empty event stream, the DataFrame now includes all expected columns instead of being completely empty. This is achieved by explicitly defining the columns parameter when creating the DataFrame.

* fix: resolve type checking error for DataFrame columns

  Use conditional logic to create the DataFrame with the columns parameter only when the event list is empty, avoiding type checking issues with pyright while maintaining correct behavior.

* fix: add type ignore for pandas DataFrame columns parameter

  Add a type: ignore annotation for the columns parameter when creating an empty DataFrame, as pyright's pandas stubs don't recognize list[str] as a valid type despite it working correctly at runtime.

* fix: use pd.Index for DataFrame columns to satisfy type checker

  Set DataFrame columns using pd.Index after creation instead of passing the columns parameter to the constructor, resolving pyright type checking errors while maintaining correct functionality.

* fix: use pyright-specific ignore for DataFrame columns parameter

  Revert to using the columns parameter with the correct pyright ignore syntax (reportArgumentType) instead of attempting to set the columns property after creation, which fails for empty DataFrames.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude <[email protected]>
1 parent e797728 commit 6158512

File tree

4 files changed: +368 -0 lines changed

README.md

Lines changed: 54 additions & 0 deletions

@@ -314,6 +314,60 @@ async for row in rows:

    await rows.aclose()
```

### Converting Events to pandas DataFrame

For data analysis and exploration, you can convert event streams to pandas DataFrames. To use this feature, install the client SDK with pandas support:

```shell
pip install eventsourcingdb[pandas]
```
Import the `events_to_dataframe` function from the `eventsourcingdb.pandas` module and pass an event stream to it:

```python
from eventsourcingdb import Client, ReadEventsOptions
from eventsourcingdb.pandas import events_to_dataframe

events = client.read_events(
    subject = '/books',
    options = ReadEventsOptions(
        recursive = True
    ),
)

df = await events_to_dataframe(events)
```
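Note that `events_to_dataframe` consumes the stream to completion, so the result can be inspected immediately. A minimal inspection sketch (the exact output depends on the stored events):

```python
# Quick inspection of the converted events
print(df.head())    # first few events as rows
print(df.dtypes)    # 'time' holds datetimes, 'data' stays a dict column
print(len(df), 'events total')
```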
The resulting DataFrame includes all event fields as columns: `event_id`, `time`, `source`, `subject`, `type`, `data`, `spec_version`, `data_content_type`, `predecessor_hash`, `hash`, `trace_parent`, `trace_state`, and `signature`.

The `data` field remains as a dict column, which you can access directly:

```python
# Access the data field
for index, row in df.iterrows():
    print(row['data'])
```
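Because every event field is an ordinary column, standard pandas operations apply directly. A small exploration sketch follows; it is a hedged example, and `pd.to_datetime` is used defensively in case pandas inferred the `time` column as `object`:

```python
import pandas as pd

# Count how many events exist per event type
print(df['type'].value_counts())

# Count events per day, using the 'time' column as a datetime index
df['time'] = pd.to_datetime(df['time'], utc=True)
events_per_day = df.set_index('time').resample('D').size()
print(events_per_day)
```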
#### Flattening the Data Field

For analysis of specific event types, you may want to flatten the `data` field into separate columns. Use pandas' `json_normalize` function:

```python
import pandas as pd

# Filter for a specific event type first
book_acquired_df = df[df['type'] == 'io.eventsourcingdb.library.book-acquired']

# Flatten the data field; json_normalize returns a fresh RangeIndex,
# so realign it with the filtered rows before joining
flattened_df = book_acquired_df.join(
    pd.json_normalize(book_acquired_df['data']).set_index(book_acquired_df.index)
)

# Now you can access data fields as columns
print(flattened_df['title'])
print(flattened_df['author'])
```
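One caveat the example above does not cover: if a key inside `data` collides with one of the event columns (a payload field named `type` or `source`, say), the join yields duplicate column names. A defensive variant that prefixes the flattened columns (the `data.` prefix is just an illustrative choice):

```python
# Prefix flattened payload columns to avoid clashing with event columns
flattened_df = book_acquired_df.join(
    pd.json_normalize(book_acquired_df['data'])
    .add_prefix('data.')
    .set_index(book_acquired_df.index)
)

print(flattened_df['data.title'])
```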
### Observing Events

To observe all events of a subject, call the `observe_events` function with the subject as the first argument and an options object as the second argument. Set the `recursive` option to `False`. This ensures that only events of the given subject are returned, not events of nested subjects.

eventsourcingdb/pandas.py

Lines changed: 62 additions & 0 deletions

@@ -0,0 +1,62 @@

```python
from typing import AsyncGenerator

import pandas as pd

from .event.event import Event

__all__ = ["events_to_dataframe"]


async def events_to_dataframe(events: AsyncGenerator[Event, None]) -> pd.DataFrame:
    """
    Convert an async stream of events to a pandas DataFrame.

    All event fields are included as columns. The 'data' field remains
    as a dict column - use pd.json_normalize() for flattening if needed.

    Args:
        events: An async generator of Event objects

    Returns:
        A pandas DataFrame with all event fields as columns
    """
    event_list = []

    async for event in events:
        event_dict = {
            "event_id": event.event_id,
            "time": event.time,
            "source": event.source,
            "subject": event.subject,
            "type": event.type,
            "data": event.data,
            "spec_version": event.spec_version,
            "data_content_type": event.data_content_type,
            "predecessor_hash": event.predecessor_hash,
            "hash": event.hash,
            "trace_parent": event.trace_parent,
            "trace_state": event.trace_state,
            "signature": event.signature,
        }
        event_list.append(event_dict)

    if len(event_list) == 0:
        return pd.DataFrame(
            columns=[  # pyright: ignore[reportArgumentType]
                "event_id",
                "time",
                "source",
                "subject",
                "type",
                "data",
                "spec_version",
                "data_content_type",
                "predecessor_hash",
                "hash",
                "trace_parent",
                "trace_state",
                "signature",
            ]
        )

    return pd.DataFrame(event_list)
```
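Since `events_to_dataframe` only reads attributes from each yielded object, it can be exercised without a running database. A hedged sketch using `types.SimpleNamespace` as a duck-typed stand-in for `Event` (the field values are purely illustrative; a real `Event` would come from `client.read_events`, and a type checker would flag the stand-in):

```python
import asyncio
from datetime import datetime, timezone
from types import SimpleNamespace

from eventsourcingdb.pandas import events_to_dataframe


async def fake_events():
    # Only attribute access is needed, so SimpleNamespace stands in for Event.
    yield SimpleNamespace(
        event_id="0",
        time=datetime.now(timezone.utc),
        source="https://www.eventsourcingdb.io",
        subject="/books/42",
        type="io.eventsourcingdb.library.book-acquired",
        data={"title": "2001 - A Space Odyssey", "author": "Arthur C. Clarke"},
        spec_version="1.0",
        data_content_type="application/json",
        predecessor_hash="0" * 64,
        hash="0" * 64,
        trace_parent=None,
        trace_state=None,
        signature=None,
    )


df = asyncio.run(events_to_dataframe(fake_events()))
print(df[["subject", "type", "data"]])
```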

pyproject.toml

Lines changed: 4 additions & 0 deletions

```diff
@@ -22,6 +22,10 @@ dev = [
     "bandit==1.8.6",
     "pyright==1.1.407",
     "twine==6.2.0",
+    "pandas>=2.0.0",
+]
+pandas = [
+    "pandas>=2.0.0",
 ]

 [build-system]
```

tests/test_events_to_dataframe.py

Lines changed: 248 additions & 0 deletions

@@ -0,0 +1,248 @@

```python
from datetime import datetime

import pandas as pd
import pytest

from eventsourcingdb import EventCandidate, ReadEventsOptions
from eventsourcingdb.pandas import events_to_dataframe

from .conftest import TestData
from .shared.database import Database


class TestEventsToDataframe:
    @staticmethod
    @pytest.mark.asyncio
    async def test_returns_empty_dataframe_for_empty_event_stream(
        database: Database,
    ) -> None:
        client = database.get_client()

        events = client.read_events("/nonexistent", ReadEventsOptions(recursive=False))
        df = await events_to_dataframe(events)

        assert isinstance(df, pd.DataFrame)
        assert len(df) == 0
        assert list(df.columns) == [
            "event_id",
            "time",
            "source",
            "subject",
            "type",
            "data",
            "spec_version",
            "data_content_type",
            "predecessor_hash",
            "hash",
            "trace_parent",
            "trace_state",
            "signature",
        ]

    @staticmethod
    @pytest.mark.asyncio
    async def test_returns_dataframe_with_single_event(
        database: Database, test_data: TestData
    ) -> None:
        client = database.get_client()

        await client.write_events(
            [
                EventCandidate(
                    source=test_data.TEST_SOURCE_STRING,
                    subject=test_data.REGISTERED_SUBJECT,
                    type=test_data.REGISTERED_TYPE,
                    data=test_data.JANE_DATA,
                    trace_parent=test_data.TRACE_PARENT_1,
                )
            ]
        )

        events = client.read_events(
            test_data.REGISTERED_SUBJECT, ReadEventsOptions(recursive=False)
        )
        df = await events_to_dataframe(events)

        assert isinstance(df, pd.DataFrame)
        assert len(df) == 1

        assert df.iloc[0]["source"] == test_data.TEST_SOURCE_STRING
        assert df.iloc[0]["subject"] == test_data.REGISTERED_SUBJECT
        assert df.iloc[0]["type"] == test_data.REGISTERED_TYPE
        assert df.iloc[0]["data"] == test_data.JANE_DATA
        assert df.iloc[0]["trace_parent"] == test_data.TRACE_PARENT_1

    @staticmethod
    @pytest.mark.asyncio
    async def test_returns_dataframe_with_multiple_events(
        prepared_database: Database, test_data: TestData
    ) -> None:
        client = prepared_database.get_client()

        events = client.read_events("/users", ReadEventsOptions(recursive=True))
        df = await events_to_dataframe(events)

        assert isinstance(df, pd.DataFrame)
        expected_event_count = 4
        assert len(df) == expected_event_count

        assert df.iloc[0]["data"] == test_data.JANE_DATA
        assert df.iloc[1]["data"] == test_data.JANE_DATA
        assert df.iloc[2]["data"] == test_data.JOHN_DATA
        assert df.iloc[3]["data"] == test_data.JOHN_DATA

    @staticmethod
    @pytest.mark.asyncio
    async def test_dataframe_has_correct_column_names(
        database: Database, test_data: TestData
    ) -> None:
        client = database.get_client()

        await client.write_events(
            [
                EventCandidate(
                    source=test_data.TEST_SOURCE_STRING,
                    subject=test_data.REGISTERED_SUBJECT,
                    type=test_data.REGISTERED_TYPE,
                    data=test_data.JANE_DATA,
                )
            ]
        )

        events = client.read_events(
            test_data.REGISTERED_SUBJECT, ReadEventsOptions(recursive=False)
        )
        df = await events_to_dataframe(events)

        expected_columns = [
            "event_id",
            "time",
            "source",
            "subject",
            "type",
            "data",
            "spec_version",
            "data_content_type",
            "predecessor_hash",
            "hash",
            "trace_parent",
            "trace_state",
            "signature",
        ]
        assert list(df.columns) == expected_columns

    @staticmethod
    @pytest.mark.asyncio
    async def test_data_field_remains_as_dict(
        database: Database, test_data: TestData
    ) -> None:
        client = database.get_client()

        await client.write_events(
            [
                EventCandidate(
                    source=test_data.TEST_SOURCE_STRING,
                    subject=test_data.REGISTERED_SUBJECT,
                    type=test_data.REGISTERED_TYPE,
                    data=test_data.JANE_DATA,
                )
            ]
        )

        events = client.read_events(
            test_data.REGISTERED_SUBJECT, ReadEventsOptions(recursive=False)
        )
        df = await events_to_dataframe(events)

        assert isinstance(df.iloc[0]["data"], dict)
        assert df.iloc[0]["data"] == test_data.JANE_DATA

    @staticmethod
    @pytest.mark.asyncio
    async def test_time_field_is_datetime_object(
        database: Database, test_data: TestData
    ) -> None:
        client = database.get_client()

        await client.write_events(
            [
                EventCandidate(
                    source=test_data.TEST_SOURCE_STRING,
                    subject=test_data.REGISTERED_SUBJECT,
                    type=test_data.REGISTERED_TYPE,
                    data=test_data.JANE_DATA,
                )
            ]
        )

        events = client.read_events(
            test_data.REGISTERED_SUBJECT, ReadEventsOptions(recursive=False)
        )
        df = await events_to_dataframe(events)

        assert isinstance(df.iloc[0]["time"], datetime)

    @staticmethod
    @pytest.mark.asyncio
    async def test_optional_fields_can_be_none(
        database: Database, test_data: TestData
    ) -> None:
        client = database.get_client()

        await client.write_events(
            [
                EventCandidate(
                    source=test_data.TEST_SOURCE_STRING,
                    subject=test_data.REGISTERED_SUBJECT,
                    type=test_data.REGISTERED_TYPE,
                    data=test_data.JANE_DATA,
                )
            ]
        )

        events = client.read_events(
            test_data.REGISTERED_SUBJECT, ReadEventsOptions(recursive=False)
        )
        df = await events_to_dataframe(events)

        assert df.iloc[0]["trace_parent"] is None or pd.isna(df.iloc[0]["trace_parent"])
        assert df.iloc[0]["trace_state"] is None or pd.isna(df.iloc[0]["trace_state"])
        assert df.iloc[0]["signature"] is None or pd.isna(df.iloc[0]["signature"])

    @staticmethod
    @pytest.mark.asyncio
    async def test_all_event_fields_are_present(
        database: Database, test_data: TestData
    ) -> None:
        client = database.get_client()

        await client.write_events(
            [
                EventCandidate(
                    source=test_data.TEST_SOURCE_STRING,
                    subject=test_data.REGISTERED_SUBJECT,
                    type=test_data.REGISTERED_TYPE,
                    data=test_data.JANE_DATA,
                    trace_parent=test_data.TRACE_PARENT_1,
                )
            ]
        )

        events = client.read_events(
            test_data.REGISTERED_SUBJECT, ReadEventsOptions(recursive=False)
        )
        df = await events_to_dataframe(events)

        row = df.iloc[0]

        assert isinstance(row["event_id"], str)
        assert isinstance(row["time"], datetime)
        assert row["source"] == test_data.TEST_SOURCE_STRING
        assert row["subject"] == test_data.REGISTERED_SUBJECT
        assert row["type"] == test_data.REGISTERED_TYPE
        assert row["data"] == test_data.JANE_DATA
        assert isinstance(row["spec_version"], str)
        assert isinstance(row["data_content_type"], str)
        assert isinstance(row["predecessor_hash"], str)
        assert isinstance(row["hash"], str)
        assert row["trace_parent"] == test_data.TRACE_PARENT_1
```
