Skip to content

Commit 349b00b

Browse files
authored
example: add custom_source_hn example (#1200)
1 parent 9a0064e commit 349b00b

File tree

5 files changed

+337
-0
lines changed

5 files changed

+337
-0
lines changed

examples/custom_source_hn/.env

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
# Postgres database address for cocoindex
2+
COCOINDEX_DATABASE_URL=postgres://cocoindex:cocoindex@localhost/cocoindex
3+
4+
# Fallback to CPU for operations not supported by MPS on Mac.
5+
# It's no-op for other platforms.
6+
PYTORCH_ENABLE_MPS_FALLBACK=1
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
# Example environment variables for HackerNews Custom Source Example
2+
# Copy this to .env and fill in your actual database connection string
3+
4+
# PostgreSQL database connection URL
5+
# Format: postgresql://username:password@host:port/database
6+
COCOINDEX_DATABASE_URL=postgresql://username:password@localhost:5432/cocoindex_hackernews
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# HackerNews Custom Source Example
2+
3+
[![GitHub](https://img.shields.io/github/stars/cocoindex-io/cocoindex?color=5B5BD6)](https://github.com/cocoindex-io/cocoindex)
4+
5+
In this example, we use [CocoIndex Custom Source](https://cocoindex.io/docs/custom_ops/custom_targets) to define a source to get HackerNews recent content, by calling [HackerNews API](https://hn.algolia.com/api).
6+
We build index for HackerNews threads and their comments, and provides a lightweight query handler to search by keywords.
7+
8+
We appreciate a star ⭐ at [CocoIndex Github](https://github.com/cocoindex-io/cocoindex) if this is helpful.
9+
10+
## Steps
11+
12+
### Indexing Flow
13+
14+
1. We define a custom source connector `HackerNews` to get HackerNews recent threads by calling HackerNews API.
15+
2. We build index for HackerNews threads and their comments.
16+
17+
## Prerequisite
18+
19+
[Install Postgres](https://cocoindex.io/docs/getting_started/installation#-install-postgres) if you don't have one.
20+
21+
## Run
22+
23+
Install dependencies:
24+
25+
```bash
26+
pip install -e .
27+
```
28+
29+
Update the target:
30+
31+
```bash
32+
cocoindex update --setup main
33+
```
34+
35+
Each time when you run the `update` command, cocoindex will only re-process threads that have changed, and keep the target in sync with the recent 500 threads from HackerNews.
36+
37+
You can also run `update` command in live mode, which will keep the target in sync with the source continuously:
38+
39+
```bash
40+
cocoindex update --setup -L main.py
41+
```
42+
43+
## CocoInsight
44+
45+
I used CocoInsight (Free beta now) to troubleshoot the index generation and understand the data lineage of the pipeline.
46+
It just connects to your local CocoIndex server, with Zero pipeline data retention. Run following command to start CocoInsight:
47+
48+
```
49+
cocoindex server -ci -L main
50+
```
51+
52+
Then open the CocoInsight UI at [https://cocoindex.io/cocoinsight](https://cocoindex.io/cocoinsight).

examples/custom_source_hn/main.py

Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
"""
2+
HackerNews Custom Source Example
3+
4+
This example demonstrates how to use a custom source with CocoIndex to index
5+
and search HackerNews threads.
6+
"""
7+
8+
import cocoindex
9+
import os
10+
import functools
11+
from psycopg_pool import ConnectionPool
12+
from datetime import timedelta, datetime
13+
from typing import Any, AsyncIterator, NamedTuple
14+
import aiohttp
15+
import dataclasses
16+
17+
from cocoindex.op import (
18+
NON_EXISTENCE,
19+
SourceSpec,
20+
NO_ORDINAL,
21+
source_connector,
22+
PartialSourceRow,
23+
PartialSourceRowData,
24+
)
25+
26+
27+
class _HackerNewsThreadKey(NamedTuple):
28+
"""Row key type for HackerNews source."""
29+
30+
thread_id: str
31+
32+
33+
@dataclasses.dataclass
34+
class _HackerNewsComment:
35+
id: str
36+
author: str | None
37+
text: str | None
38+
created_at: datetime | None
39+
40+
41+
@dataclasses.dataclass
42+
class _HackerNewsThread:
43+
"""Value type for HackerNews source."""
44+
45+
author: str | None
46+
text: str
47+
url: str | None
48+
created_at: datetime | None
49+
comments: list[_HackerNewsComment]
50+
51+
52+
# Define the source spec that users will instantiate
53+
class HackerNewsSource(SourceSpec):
54+
"""Source spec for HackerNews API."""
55+
56+
tag: str | None = None
57+
max_results: int = 100
58+
59+
60+
@source_connector(
61+
spec_cls=HackerNewsSource,
62+
key_type=_HackerNewsThreadKey,
63+
value_type=_HackerNewsThread,
64+
)
65+
class HackerNewsConnector:
66+
"""Custom source connector for HackerNews API."""
67+
68+
_spec: HackerNewsSource
69+
_session: aiohttp.ClientSession
70+
71+
def __init__(self, spec: HackerNewsSource, session: aiohttp.ClientSession):
72+
self._spec = spec
73+
self._session = session
74+
75+
@staticmethod
76+
async def create(spec: HackerNewsSource) -> "HackerNewsConnector":
77+
"""Create a HackerNews connector from the spec."""
78+
return HackerNewsConnector(spec, aiohttp.ClientSession())
79+
80+
async def _ensure_session(self) -> aiohttp.ClientSession:
81+
"""Ensure we have an active HTTP session."""
82+
if self._session is None or self._session.closed:
83+
self._session = aiohttp.ClientSession()
84+
return self._session
85+
86+
async def list(
87+
self,
88+
) -> AsyncIterator[PartialSourceRow[_HackerNewsThreadKey, _HackerNewsThread]]:
89+
"""List HackerNews threads using the search API."""
90+
session = await self._ensure_session()
91+
92+
# Use HackerNews search API
93+
search_url = "https://hn.algolia.com/api/v1/search_by_date"
94+
params: dict[str, Any] = {"hitsPerPage": self._spec.max_results}
95+
if self._spec.tag:
96+
params["tags"] = self._spec.tag
97+
async with session.get(search_url, params=params) as response:
98+
response.raise_for_status()
99+
data = await response.json()
100+
for hit in data.get("hits", []):
101+
if thread_id := hit.get("objectID", None):
102+
utime = hit.get("updated_at")
103+
ordinal = (
104+
int(datetime.fromisoformat(utime).timestamp())
105+
if utime
106+
else NO_ORDINAL
107+
)
108+
yield PartialSourceRow(
109+
key=_HackerNewsThreadKey(thread_id=thread_id),
110+
data=PartialSourceRowData(ordinal=ordinal),
111+
)
112+
113+
async def get_value(
114+
self, key: _HackerNewsThreadKey
115+
) -> PartialSourceRowData[_HackerNewsThread]:
116+
"""Get a specific HackerNews thread by ID using the items API."""
117+
session = await self._ensure_session()
118+
119+
# Use HackerNews items API to get full thread with comments
120+
item_url = f"https://hn.algolia.com/api/v1/items/{key.thread_id}"
121+
122+
async with session.get(item_url) as response:
123+
response.raise_for_status()
124+
data = await response.json()
125+
126+
if not data:
127+
return PartialSourceRowData(
128+
value=NON_EXISTENCE,
129+
ordinal=NO_ORDINAL,
130+
content_version_fp=None,
131+
)
132+
return PartialSourceRowData(
133+
value=HackerNewsConnector._parse_hackernews_thread(data)
134+
)
135+
136+
def provides_ordinal(self) -> bool:
137+
"""Indicate that this source provides ordinal information."""
138+
return True
139+
140+
@staticmethod
141+
def _parse_hackernews_thread(data: dict[str, Any]) -> _HackerNewsThread:
142+
comments: list[_HackerNewsComment] = []
143+
144+
def _add_comments(parent: dict[str, Any]) -> None:
145+
children = parent.get("children", None)
146+
if not children:
147+
return
148+
for child in children:
149+
ctime = child.get("created_at")
150+
if comment_id := child.get("id", None):
151+
comments.append(
152+
_HackerNewsComment(
153+
id=str(comment_id),
154+
author=child.get("author", ""),
155+
text=child.get("text", ""),
156+
created_at=datetime.fromisoformat(ctime) if ctime else None,
157+
)
158+
)
159+
_add_comments(child)
160+
161+
_add_comments(data)
162+
163+
ctime = data.get("created_at")
164+
text = data.get("title", "")
165+
if more_text := data.get("text", None):
166+
text += "\n\n" + more_text
167+
return _HackerNewsThread(
168+
author=data.get("author"),
169+
text=text,
170+
url=data.get("url"),
171+
created_at=datetime.fromisoformat(ctime) if ctime else None,
172+
comments=comments,
173+
)
174+
175+
176+
@cocoindex.flow_def(name="HackerNewsIndex")
177+
def hackernews_flow(
178+
flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope
179+
) -> None:
180+
"""
181+
Define a flow that indexes HackerNews threads and their comments.
182+
"""
183+
184+
# Add the custom source to the flow
185+
data_scope["threads"] = flow_builder.add_source(
186+
HackerNewsSource(tag="story", max_results=500),
187+
refresh_interval=timedelta(minutes=1),
188+
)
189+
190+
# Create collectors for different types of searchable content
191+
message_index = data_scope.add_collector()
192+
193+
# Process each thread
194+
with data_scope["threads"].row() as thread:
195+
# Index the main thread content
196+
message_index.collect(
197+
id=thread["thread_id"],
198+
thread_id=thread["thread_id"],
199+
content_type="thread",
200+
author=thread["author"],
201+
text=thread["text"],
202+
url=thread["url"],
203+
created_at=thread["created_at"],
204+
)
205+
206+
# Index individual comments
207+
with thread["comments"].row() as comment:
208+
message_index.collect(
209+
id=comment["id"],
210+
thread_id=thread["thread_id"],
211+
content_type="comment",
212+
author=comment["author"],
213+
text=comment["text"],
214+
url="",
215+
created_at=comment["created_at"],
216+
)
217+
218+
# Export to database tables
219+
message_index.export(
220+
"hn_messages",
221+
cocoindex.targets.Postgres(),
222+
primary_key_fields=["id"],
223+
)
224+
225+
226+
@functools.cache
227+
def connection_pool() -> ConnectionPool:
228+
"""Get a connection pool to the database."""
229+
return ConnectionPool(os.environ["COCOINDEX_DATABASE_URL"])
230+
231+
232+
@hackernews_flow.query_handler()
233+
def search_text(query: str) -> cocoindex.QueryOutput:
234+
"""Search HackerNews threads by title and content."""
235+
table_name = cocoindex.utils.get_target_default_name(hackernews_flow, "hn_messages")
236+
237+
with connection_pool().connection() as conn:
238+
with conn.cursor() as cur:
239+
# Simple text search using PostgreSQL's text search capabilities
240+
cur.execute(
241+
f"""
242+
SELECT id, thread_id, author, content_type, text, created_at,
243+
ts_rank(to_tsvector('english', text), plainto_tsquery('english', %s)) as rank
244+
FROM {table_name}
245+
WHERE to_tsvector('english', text) @@ plainto_tsquery('english', %s)
246+
ORDER BY rank DESC, created_at DESC
247+
""",
248+
(query, query),
249+
)
250+
251+
results = []
252+
for row in cur.fetchall():
253+
results.append(
254+
{
255+
"id": row[0],
256+
"thread_id": row[1],
257+
"author": row[2],
258+
"content_type": row[3],
259+
"text": row[4],
260+
"created_at": row[5].isoformat(),
261+
}
262+
)
263+
264+
return cocoindex.QueryOutput(results=results)
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
[project]
2+
name = "custom-source-hn"
3+
version = "0.1.0"
4+
description = "Example using custom source for HackerNews API integration with cocoindex."
5+
requires-python = ">=3.11"
6+
dependencies = ["cocoindex>=0.2.21", "aiohttp>=3.8.0", "psycopg[binary,pool]"]
7+
8+
[tool.setuptools]
9+
packages = []

0 commit comments

Comments
 (0)