Commit d34bcda

feat: implement AI ingestion pipeline
Parent: 7c7f529

6 files changed: +561 additions, -5 deletions

posthog/client.py

Lines changed: 5 additions & 0 deletions
@@ -262,6 +262,10 @@ def __init__(
 
         self.project_root = project_root
 
+        self.use_ai_ingestion_pipeline = os.environ.get(
+            "LLMA_INGESTION_PIPELINE", "false"
+        ).lower() in ("true", "1", "yes")
+
         # personal_api_key: This should be a generated Personal API Key, private
         self.personal_api_key = personal_api_key
         if debug:

@@ -309,6 +313,7 @@ def __init__(
                 retries=max_retries,
                 timeout=timeout,
                 historical_migration=historical_migration,
+                use_ai_ingestion_pipeline=self.use_ai_ingestion_pipeline,
             )
             self.consumers.append(consumer)
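The pipeline is opt-in: the flag is read from the environment once, when the client is constructed, and passed down to every Consumer the client spawns. A minimal usage sketch (not part of this commit, and assuming the SDK's usual constructor arguments):

import os

# The flag must be set before the client is constructed,
# since it is read in __init__.
os.environ["LLMA_INGESTION_PIPELINE"] = "true"  # "1" and "yes" also work

from posthog import Posthog

client = Posthog("<project_api_key>", host="https://us.i.posthog.com")
# client.use_ai_ingestion_pipeline is now True, so each consumer thread
# routes $ai_* events to the AI endpoint instead of the /batch endpoint.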

posthog/consumer.py

Lines changed: 72 additions & 5 deletions
@@ -136,17 +138,82 @@ def fatal_exception(exc):
             # retry on all other errors (eg. network)
             return False
 
+        if self.use_ai_ingestion_pipeline:
+            ai_events = []
+            non_ai_events = []
+
+            for item in batch:
+                event_name = item.get("event", "")
+                if event_name.startswith("$ai_"):
+                    ai_events.append(item)
+                else:
+                    non_ai_events.append(item)
+
+            for ai_event in ai_events:
+                self._send_ai_event(ai_event, fatal_exception)
+
+            if non_ai_events:
+
+                @backoff.on_exception(
+                    backoff.expo,
+                    Exception,
+                    max_tries=self.retries + 1,
+                    giveup=fatal_exception,
+                )
+                def send_batch_request():
+                    batch_post(
+                        self.api_key,
+                        self.host,
+                        gzip=self.gzip,
+                        timeout=self.timeout,
+                        batch=non_ai_events,
+                        historical_migration=self.historical_migration,
+                    )
+
+                send_batch_request()
+        else:
+            @backoff.on_exception(
+                backoff.expo,
+                Exception,
+                max_tries=self.retries + 1,
+                giveup=fatal_exception,
+            )
+            def send_request():
+                batch_post(
+                    self.api_key,
+                    self.host,
+                    gzip=self.gzip,
+                    timeout=self.timeout,
+                    batch=batch,
+                    historical_migration=self.historical_migration,
+                )
+
+            send_request()
+
+    def _send_ai_event(self, event, fatal_exception):
+        """Send a single AI event to the /i/v0/ai endpoint."""
+        from posthog.request import ai_post
+        from posthog.utils import extract_ai_blob_properties
+
+        # Extract blob properties from the event
+        properties = event.get("properties", {})
+        cleaned_properties, blobs = extract_ai_blob_properties(properties)
+
         @backoff.on_exception(
             backoff.expo, Exception, max_tries=self.retries + 1, giveup=fatal_exception
         )
-        def send_request():
-            batch_post(
+        def send_ai_request():
+            ai_post(
                 self.api_key,
                 self.host,
                 gzip=self.gzip,
                 timeout=self.timeout,
-                batch=batch,
-                historical_migration=self.historical_migration,
+                event_name=event.get("event"),
+                distinct_id=event.get("distinct_id"),
+                properties=cleaned_properties,
+                blobs=blobs,
+                timestamp=event.get("timestamp"),
+                uuid=event.get("uuid"),
             )
 
-        send_request()
+        send_ai_request()
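A note on the helper: _send_ai_event imports extract_ai_blob_properties from posthog.utils, one of the six changed files not shown in this excerpt. Judging from the multipart spec in posthog/request.py below, it plausibly splits the large payload properties out of the inline properties dict. The sketch below is an assumption, including the choice of blob property names:

# Hypothetical sketch of extract_ai_blob_properties; the real helper lives in
# posthog/utils.py, which this excerpt does not show. The set of blob property
# names here is assumed, not confirmed by the commit.
AI_BLOB_PROPERTIES = {"$ai_input", "$ai_output_choices"}

def extract_ai_blob_properties(properties):
    """Split properties into (inline_properties, blobs) for multipart upload."""
    cleaned = {}
    blobs = {}
    for key, value in properties.items():
        if key in AI_BLOB_PROPERTIES:
            blobs[key] = value  # uploaded as its own "event.properties.<key>" part
        else:
            cleaned[key] = value  # stays in the inline "event.properties" part
    return cleaned, blobs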

posthog/request.py

Lines changed: 165 additions & 0 deletions
@@ -1,9 +1,11 @@
 import json
 import logging
+import secrets
 from datetime import date, datetime
 from gzip import GzipFile
 from io import BytesIO
 from typing import Any, Optional, Union
+from uuid import uuid4
 
 import requests
 from dateutil.tz import tzutc

@@ -193,3 +195,166 @@ def default(self, obj: Any):
             return obj.isoformat()
 
         return json.JSONEncoder.default(self, obj)
+
+
+def build_ai_multipart_request(
+    event_name: str,
+    distinct_id: str,
+    properties: dict[str, Any],
+    blobs: dict[str, Any],
+    timestamp: Optional[str] = None,
+    event_uuid: Optional[str] = None,
+) -> tuple[bytes, str]:
+    """
+    Build a multipart/form-data request body for AI events.
+
+    Args:
+        event_name: The event name (e.g., "$ai_generation")
+        distinct_id: The distinct ID for the event
+        properties: Event properties (without blob properties)
+        blobs: Dictionary of blob properties to include as separate parts
+        timestamp: Optional timestamp for the event
+        event_uuid: Optional UUID for the event
+
+    Returns:
+        Tuple of (body_bytes, boundary) for the multipart request
+
+    Format follows the /i/v0/ai endpoint spec:
+        Part 1: "event" - JSON with {uuid, event, distinct_id, timestamp}
+        Part 2: "event.properties" - JSON with non-blob properties
+        Part 3+: "event.properties.$ai_input" etc. - Blob data as JSON
+    """
+    # Generate a random boundary that's unlikely to appear in the data
+    boundary = "----WebKitFormBoundary" + secrets.token_hex(16)
+
+    # Ensure we have a UUID
+    if event_uuid is None:
+        event_uuid = str(uuid4())
+
+    # Build the event part
+    event_data = {
+        "uuid": event_uuid,
+        "event": event_name,
+        "distinct_id": distinct_id,
+    }
+    if timestamp is not None:
+        event_data["timestamp"] = timestamp
+
+    # Build multipart body
+    parts = []
+
+    # Part 1: event
+    parts.append(f"--{boundary}\r\n".encode())
+    parts.append(b'Content-Disposition: form-data; name="event"\r\n')
+    parts.append(b"Content-Type: application/json\r\n\r\n")
+    parts.append(json.dumps(event_data, cls=DatetimeSerializer).encode("utf-8"))
+    parts.append(b"\r\n")
+
+    # Part 2: event.properties
+    parts.append(f"--{boundary}\r\n".encode())
+    parts.append(b'Content-Disposition: form-data; name="event.properties"\r\n')
+    parts.append(b"Content-Type: application/json\r\n\r\n")
+    parts.append(json.dumps(properties, cls=DatetimeSerializer).encode("utf-8"))
+    parts.append(b"\r\n")
+
+    # Part 3+: blob parts
+    for blob_name, blob_value in blobs.items():
+        parts.append(f"--{boundary}\r\n".encode())
+        parts.append(
+            f'Content-Disposition: form-data; name="event.properties.{blob_name}"\r\n'.encode()
+        )
+        parts.append(b"Content-Type: application/json\r\n\r\n")
+        parts.append(json.dumps(blob_value, cls=DatetimeSerializer).encode("utf-8"))
+        parts.append(b"\r\n")
+
+    # Final boundary
+    parts.append(f"--{boundary}--\r\n".encode())
+
+    # Combine all parts
+    body = b"".join(parts)
+
+    return body, boundary
+
+
+def ai_post(
+    api_key: str,
+    host: Optional[str] = None,
+    gzip: bool = False,
+    timeout: int = 15,
+    **kwargs,
+) -> requests.Response:
+    """
+    Post an AI event to the /i/v0/ai endpoint using multipart/form-data.
+
+    Args:
+        api_key: The PostHog API key
+        host: The host to post to
+        gzip: Whether to gzip compress the request
+        timeout: Request timeout in seconds
+        **kwargs: Event parameters including event_name, distinct_id, properties, blobs, etc.
+
+    Returns:
+        The response from the server
+
+    Raises:
+        APIError: If the request fails
+    """
+    log = logging.getLogger("posthog")
+
+    # Extract event parameters
+    event_name = kwargs.get("event_name")
+    distinct_id = kwargs.get("distinct_id")
+    properties = kwargs.get("properties", {})
+    blobs = kwargs.get("blobs", {})
+    timestamp = kwargs.get("timestamp")
+    event_uuid = kwargs.get("uuid")
+
+    # Build multipart request
+    body, boundary = build_ai_multipart_request(
+        event_name=event_name,
+        distinct_id=distinct_id,
+        properties=properties,
+        blobs=blobs,
+        timestamp=timestamp,
+        event_uuid=event_uuid,
+    )
+
+    # Optionally gzip compress the body if enabled and body is large enough
+    # Spec recommends compression for requests > 10KB
+    data = body
+    headers = {
+        "Content-Type": f"multipart/form-data; boundary={boundary}",
+        "Authorization": f"Bearer {api_key}",
+        "User-Agent": USER_AGENT,
+    }
+
+    if gzip or len(body) > 10 * 1024:  # Compress if gzip enabled or body > 10KB
+        headers["Content-Encoding"] = "gzip"
+        buf = BytesIO()
+        with GzipFile(fileobj=buf, mode="w") as gz:
+            gz.write(body)
+        data = buf.getvalue()
+        log.debug("Compressed AI event from %d bytes to %d bytes", len(body), len(data))
+
+    url = remove_trailing_slash(host or DEFAULT_HOST) + "/i/v0/ai"
+    log.debug("Posting AI event to %s", url)
+    log.debug(
+        "Event: %s, Distinct ID: %s, Blobs: %s",
+        event_name,
+        distinct_id,
+        list(blobs.keys()),
+    )
+
+    res = _session.post(url, data=data, headers=headers, timeout=timeout)
+
+    if res.status_code == 200:
+        log.debug("AI event uploaded successfully")
+        return res
+
+    # Handle errors
+    try:
+        payload = res.json()
+        log.debug("Received error response: %s", payload)
+        raise APIError(res.status_code, payload.get("detail", "Unknown error"))
+    except (KeyError, ValueError):
+        raise APIError(res.status_code, res.text)
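To make the wire format concrete, here is roughly what build_ai_multipart_request produces for a small event (illustrative values only; the boundary's 32 random hex characters are abbreviated as <hex>):

body, boundary = build_ai_multipart_request(
    event_name="$ai_generation",
    distinct_id="user-123",
    properties={"$ai_model": "gpt-4o"},
    blobs={"$ai_input": [{"role": "user", "content": "hi"}]},
)

# body decodes to:
#
# ------WebKitFormBoundary<hex>
# Content-Disposition: form-data; name="event"
# Content-Type: application/json
#
# {"uuid": "<generated>", "event": "$ai_generation", "distinct_id": "user-123"}
# ------WebKitFormBoundary<hex>
# Content-Disposition: form-data; name="event.properties"
# Content-Type: application/json
#
# {"$ai_model": "gpt-4o"}
# ------WebKitFormBoundary<hex>
# Content-Disposition: form-data; name="event.properties.$ai_input"
# Content-Type: application/json
#
# [{"role": "user", "content": "hi"}]
# ------WebKitFormBoundary<hex>--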
