Skip to content

Commit 8879481

Browse files
goloroden authored and claude committed
feat: add pandas DataFrame integration
Add support for converting event streams to pandas DataFrames for data analysis and exploration. The events_to_dataframe() function accepts an AsyncGenerator of events and returns a DataFrame with all event fields as columns. - Add eventsourcingdb.pandas module with events_to_dataframe() function - Add comprehensive test suite following TDD approach - Add pandas as optional dependency group - Update README with usage examples and flattening guide 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent ca69441 commit 8879481

File tree

4 files changed

+349
-0
lines changed

4 files changed

+349
-0
lines changed

README.md

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,60 @@ async for row in rows:
314314
await rows.aclose()
315315
```
316316

317+
### Converting Events to pandas DataFrame
318+
319+
For data analysis and exploration, you can convert event streams to pandas DataFrames. To use this feature, install the client SDK with pandas support:
320+
321+
```shell
322+
pip install eventsourcingdb[pandas]
323+
```
324+
325+
Import the `events_to_dataframe` function from the `eventsourcingdb.pandas` module and pass an event stream to it:
326+
327+
```python
328+
from eventsourcingdb import Client, ReadEventsOptions
329+
from eventsourcingdb.pandas import events_to_dataframe
330+
331+
events = client.read_events(
332+
subject = '/books',
333+
options = ReadEventsOptions(
334+
recursive = True
335+
),
336+
)
337+
338+
df = await events_to_dataframe(events)
339+
```
340+
341+
The resulting DataFrame includes all event fields as columns: `event_id`, `time`, `source`, `subject`, `type`, `data`, `spec_version`, `data_content_type`, `predecessor_hash`, `hash`, `trace_parent`, `trace_state`, and `signature`.
342+
343+
The `data` field remains as a dict column, which you can access directly:
344+
345+
```python
346+
# Access the data field
347+
for index, row in df.iterrows():
348+
print(row['data'])
349+
```
350+
351+
#### Flattening the Data Field
352+
353+
For analysis of specific event types, you may want to flatten the `data` field into separate columns. Use pandas' `json_normalize` function:
354+
355+
```python
356+
import pandas as pd
357+
358+
# Filter for a specific event type first
359+
book_acquired_df = df[df['type'] == 'io.eventsourcingdb.library.book-acquired']
360+
361+
# Flatten the data field
362+
flattened_df = book_acquired_df.join(
363+
pd.json_normalize(book_acquired_df['data'])
364+
)
365+
366+
# Now you can access data fields as columns
367+
print(flattened_df['title'])
368+
print(flattened_df['author'])
369+
```
370+
317371
### Observing Events
318372

319373
To observe all events of a subject, call the `observe_events` function with the subject as the first argument and an options object as the second argument. Set the `recursive` option to `False`. This ensures that only events of the given subject are returned, not events of nested subjects.

eventsourcingdb/pandas.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
from typing import AsyncGenerator
2+
3+
import pandas as pd
4+
5+
from .event.event import Event
6+
7+
__all__ = ["events_to_dataframe"]
8+
9+
10+
async def events_to_dataframe(events: AsyncGenerator["Event", None]) -> pd.DataFrame:
    """
    Convert an async stream of events to a pandas DataFrame.

    All event fields are included as columns. The 'data' field remains
    as a dict column - use pd.json_normalize() for flattening if needed.

    Args:
        events: An async generator of Event objects

    Returns:
        A pandas DataFrame with one row per event. An empty stream yields
        an empty DataFrame that still carries the full column schema.
    """
    # Fixed column order; the keys double as the Event attribute names.
    columns = [
        "event_id",
        "time",
        "source",
        "subject",
        "type",
        "data",
        "spec_version",
        "data_content_type",
        "predecessor_hash",
        "hash",
        "trace_parent",
        "trace_state",
        "signature",
    ]

    rows = [
        {column: getattr(event, column) for column in columns}
        async for event in events
    ]

    # Pass columns explicitly: without it, an empty stream would produce
    # pd.DataFrame([]) with NO columns, breaking the documented schema.
    return pd.DataFrame(rows, columns=columns)

pyproject.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ dev = [
2222
"bandit==1.8.6",
2323
"pyright==1.1.407",
2424
"twine==6.2.0",
25+
"pandas>=2.0.0",
26+
]
27+
pandas = [
28+
"pandas>=2.0.0",
2529
]
2630

2731
[build-system]

tests/test_events_to_dataframe.py

Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
from datetime import datetime
2+
3+
import pandas as pd
4+
import pytest
5+
6+
from eventsourcingdb import EventCandidate, ReadEventsOptions
7+
from eventsourcingdb.pandas import events_to_dataframe
8+
9+
from .conftest import TestData
10+
from .shared.database import Database
11+
12+
13+
class TestEventsToDataframe:
    """Integration tests for events_to_dataframe() against a live database.

    Each test writes events through the client, reads them back as an
    async stream, and verifies the shape and content of the resulting
    pandas DataFrame. The `database` / `prepared_database` / `test_data`
    pytest fixtures are provided by conftest — presumably `prepared_database`
    is pre-seeded with four /users events; verify against conftest.
    """

    @staticmethod
    @pytest.mark.asyncio
    async def test_returns_empty_dataframe_for_empty_event_stream(
        database: Database,
    ) -> None:
        # An empty stream must still yield the full 13-column schema.
        client = database.get_client()

        events = client.read_events("/nonexistent", ReadEventsOptions(recursive=False))
        df = await events_to_dataframe(events)

        assert isinstance(df, pd.DataFrame)
        assert len(df) == 0
        assert list(df.columns) == [
            "event_id",
            "time",
            "source",
            "subject",
            "type",
            "data",
            "spec_version",
            "data_content_type",
            "predecessor_hash",
            "hash",
            "trace_parent",
            "trace_state",
            "signature",
        ]

    @staticmethod
    @pytest.mark.asyncio
    async def test_returns_dataframe_with_single_event(
        database: Database, test_data: TestData
    ) -> None:
        # Round-trip a single event and check its field values in the row.
        client = database.get_client()

        await client.write_events(
            [
                EventCandidate(
                    source=test_data.TEST_SOURCE_STRING,
                    subject=test_data.REGISTERED_SUBJECT,
                    type=test_data.REGISTERED_TYPE,
                    data=test_data.JANE_DATA,
                    trace_parent=test_data.TRACE_PARENT_1,
                )
            ]
        )

        events = client.read_events(
            test_data.REGISTERED_SUBJECT, ReadEventsOptions(recursive=False)
        )
        df = await events_to_dataframe(events)

        assert isinstance(df, pd.DataFrame)
        assert len(df) == 1

        assert df.iloc[0]["source"] == test_data.TEST_SOURCE_STRING
        assert df.iloc[0]["subject"] == test_data.REGISTERED_SUBJECT
        assert df.iloc[0]["type"] == test_data.REGISTERED_TYPE
        assert df.iloc[0]["data"] == test_data.JANE_DATA
        assert df.iloc[0]["trace_parent"] == test_data.TRACE_PARENT_1

    @staticmethod
    @pytest.mark.asyncio
    async def test_returns_dataframe_with_multiple_events(
        prepared_database: Database, test_data: TestData
    ) -> None:
        # Uses the pre-seeded database; rows must preserve stream order.
        client = prepared_database.get_client()

        events = client.read_events("/users", ReadEventsOptions(recursive=True))
        df = await events_to_dataframe(events)

        assert isinstance(df, pd.DataFrame)
        expected_event_count = 4
        assert len(df) == expected_event_count

        assert df.iloc[0]["data"] == test_data.JANE_DATA
        assert df.iloc[1]["data"] == test_data.JANE_DATA
        assert df.iloc[2]["data"] == test_data.JOHN_DATA
        assert df.iloc[3]["data"] == test_data.JOHN_DATA

    @staticmethod
    @pytest.mark.asyncio
    async def test_dataframe_has_correct_column_names(
        database: Database, test_data: TestData
    ) -> None:
        # Column names and their order form the documented public schema.
        client = database.get_client()

        await client.write_events(
            [
                EventCandidate(
                    source=test_data.TEST_SOURCE_STRING,
                    subject=test_data.REGISTERED_SUBJECT,
                    type=test_data.REGISTERED_TYPE,
                    data=test_data.JANE_DATA,
                )
            ]
        )

        events = client.read_events(
            test_data.REGISTERED_SUBJECT, ReadEventsOptions(recursive=False)
        )
        df = await events_to_dataframe(events)

        expected_columns = [
            "event_id",
            "time",
            "source",
            "subject",
            "type",
            "data",
            "spec_version",
            "data_content_type",
            "predecessor_hash",
            "hash",
            "trace_parent",
            "trace_state",
            "signature",
        ]
        assert list(df.columns) == expected_columns

    @staticmethod
    @pytest.mark.asyncio
    async def test_data_field_remains_as_dict(
        database: Database, test_data: TestData
    ) -> None:
        # The 'data' column must NOT be flattened; callers opt in via
        # pd.json_normalize() as shown in the README.
        client = database.get_client()

        await client.write_events(
            [
                EventCandidate(
                    source=test_data.TEST_SOURCE_STRING,
                    subject=test_data.REGISTERED_SUBJECT,
                    type=test_data.REGISTERED_TYPE,
                    data=test_data.JANE_DATA,
                )
            ]
        )

        events = client.read_events(
            test_data.REGISTERED_SUBJECT, ReadEventsOptions(recursive=False)
        )
        df = await events_to_dataframe(events)

        assert isinstance(df.iloc[0]["data"], dict)
        assert df.iloc[0]["data"] == test_data.JANE_DATA

    @staticmethod
    @pytest.mark.asyncio
    async def test_time_field_is_datetime_object(
        database: Database, test_data: TestData
    ) -> None:
        # 'time' should survive conversion as a datetime, not a string.
        client = database.get_client()

        await client.write_events(
            [
                EventCandidate(
                    source=test_data.TEST_SOURCE_STRING,
                    subject=test_data.REGISTERED_SUBJECT,
                    type=test_data.REGISTERED_TYPE,
                    data=test_data.JANE_DATA,
                )
            ]
        )

        events = client.read_events(
            test_data.REGISTERED_SUBJECT, ReadEventsOptions(recursive=False)
        )
        df = await events_to_dataframe(events)

        assert isinstance(df.iloc[0]["time"], datetime)

    @staticmethod
    @pytest.mark.asyncio
    async def test_optional_fields_can_be_none(
        database: Database, test_data: TestData
    ) -> None:
        # Optional fields left unset on write come back as None/NaN;
        # accept either since pandas may coerce None to NaN per dtype.
        client = database.get_client()

        await client.write_events(
            [
                EventCandidate(
                    source=test_data.TEST_SOURCE_STRING,
                    subject=test_data.REGISTERED_SUBJECT,
                    type=test_data.REGISTERED_TYPE,
                    data=test_data.JANE_DATA,
                )
            ]
        )

        events = client.read_events(
            test_data.REGISTERED_SUBJECT, ReadEventsOptions(recursive=False)
        )
        df = await events_to_dataframe(events)

        assert df.iloc[0]["trace_parent"] is None or pd.isna(df.iloc[0]["trace_parent"])
        assert df.iloc[0]["trace_state"] is None or pd.isna(df.iloc[0]["trace_state"])
        assert df.iloc[0]["signature"] is None or pd.isna(df.iloc[0]["signature"])

    @staticmethod
    @pytest.mark.asyncio
    async def test_all_event_fields_are_present(
        database: Database, test_data: TestData
    ) -> None:
        # Type-level sanity check on every server-populated field.
        client = database.get_client()

        await client.write_events(
            [
                EventCandidate(
                    source=test_data.TEST_SOURCE_STRING,
                    subject=test_data.REGISTERED_SUBJECT,
                    type=test_data.REGISTERED_TYPE,
                    data=test_data.JANE_DATA,
                    trace_parent=test_data.TRACE_PARENT_1,
                )
            ]
        )

        events = client.read_events(
            test_data.REGISTERED_SUBJECT, ReadEventsOptions(recursive=False)
        )
        df = await events_to_dataframe(events)

        row = df.iloc[0]

        assert isinstance(row["event_id"], str)
        assert isinstance(row["time"], datetime)
        assert row["source"] == test_data.TEST_SOURCE_STRING
        assert row["subject"] == test_data.REGISTERED_SUBJECT
        assert row["type"] == test_data.REGISTERED_TYPE
        assert row["data"] == test_data.JANE_DATA
        assert isinstance(row["spec_version"], str)
        assert isinstance(row["data_content_type"], str)
        assert isinstance(row["predecessor_hash"], str)
        assert isinstance(row["hash"], str)
        assert row["trace_parent"] == test_data.TRACE_PARENT_1

0 commit comments

Comments
 (0)