Skip to content

Commit 8ebc28f

Browse files
authored
Feat: Add support for rendering records as LLM documents (#48)
1 parent 1964a8b commit 8ebc28f

File tree

9 files changed

+284
-163
lines changed

9 files changed

+284
-163
lines changed
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
2+
"""Methods for converting Airbyte records into documents."""
3+
from __future__ import annotations
4+
5+
from typing import TYPE_CHECKING, Any
6+
7+
import yaml
8+
from pydantic import BaseModel
9+
10+
from airbyte.documents import Document
11+
12+
13+
if TYPE_CHECKING:
14+
from collections.abc import Iterable
15+
16+
17+
def _to_title_case(name: str, /) -> str:
    """Convert a snake_case string to title case.

    Unlike Python's built-in `str.title` method, this function doesn't lowercase the rest of the
    string. This is useful for converting "snake_case" to "Title Case" without negatively affecting
    strings that are already in title case or camel case.

    Empty segments (produced by an empty input or by leading, trailing, or doubled underscores)
    are skipped, so they neither raise an `IndexError` nor add stray spaces to the result.
    """
    # The `if word` filter guards the `word[0]` access against empty segments.
    return " ".join(word[0].upper() + word[1:] for word in name.split("_") if word)
25+
26+
27+
class CustomRenderingInstructions(BaseModel):
    """Instructions for rendering a stream's records as documents."""

    # NOTE(review): this model is not referenced elsewhere in the visible code; the field
    # meanings below are inferred from their names and should be confirmed.

    # Presumably the record property to use as the document title, if any.
    title_property: str | None
    # Presumably the record properties to render as the document body.
    content_properties: list[str]
    # Presumably the record properties to render as frontmatter -- TODO confirm.
    frontmatter_properties: list[str]
    # Presumably the record properties to attach as document metadata.
    metadata_properties: list[str]
34+
35+
36+
class DocumentRenderer(BaseModel):
    """Renders a stream's records as markdown documents."""

    # Record property rendered as an H1 title at the top of the document, if set.
    title_property: str | None
    # Record properties rendered as the document body. When unset or empty, the metadata is
    # rendered as a YAML block instead.
    content_properties: list[str] | None
    # Record properties attached to the document as metadata. When unset or empty, every
    # property that is neither content nor the title is used, resolved per record.
    metadata_properties: list[str] | None
    # Whether to also render the metadata as a YAML block inside the document content.
    render_metadata: bool = False

    # TODO: Add primary key and cursor key support:
    # primary_key_properties: list[str]
    # cursor_property: str | None

    def render_document(self, record: dict[str, Any]) -> Document:
        """Render a record as a document.

        The document will be rendered as a markdown document, with content, frontmatter, and an
        optional title. If there are multiple properties to render as content, they will be rendered
        beneath H2 section headers. If there is only one property to render as content, it will be
        rendered without a section header. If a title property is specified, it will be rendered as
        an H1 header at the top of the document.

        Args:
            record: The record to render, keyed by property name.

        Returns:
            A `Document` whose `content` is the rendered markdown and whose `metadata` holds the
            selected metadata properties of the record.

        Raises:
            KeyError: If a configured title, content, or metadata property is missing from the
                record.
        """
        content_properties = self.content_properties or []

        # Resolve the metadata keys into a local for each record. Writing the derived list back
        # to `self.metadata_properties` would freeze the first record's keys and apply them to
        # every later record, even when their keys differ.
        metadata_properties = self.metadata_properties or [
            key
            for key in record
            if key not in content_properties and key != self.title_property
        ]
        metadata = {key: record[key] for key in metadata_properties}

        content = ""
        if self.title_property:
            content += f"# {record[self.title_property]}\n\n"
        if self.render_metadata or not content_properties:
            # With no content properties, the YAML metadata block is the document body.
            content += "```yaml\n"
            content += yaml.dump(metadata)
            content += "```\n"

        # TODO: Add support for primary key and doc ID generation:
        # doc_id: str = (
        #     "-".join(str(record[key]) for key in self.primary_key_properties)
        #     if self.primary_key_properties
        #     else str(hash(record))
        # )

        if len(content_properties) == 1:
            # Only one property to render as content; no need for section headers.
            content += str(record[content_properties[0]])
        elif content_properties:
            # Multiple properties to render as content; use H2 section headers.
            content += "\n".join(
                f"## {_to_title_case(key)}\n\n{record[key]}\n\n" for key in content_properties
            )

        return Document(
            # id=doc_id,  # TODO: Add support for primary key and doc ID generation.
            content=content,
            metadata=metadata,
        )

    def render_documents(self, records: Iterable[dict[str, Any]]) -> Iterable[Document]:
        """Render an iterable of records as documents, lazily, one document per record."""
        for record in records:
            yield self.render_document(record=record)

airbyte/datasets/_base.py

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,28 @@
22
from __future__ import annotations
33

44
from abc import ABC, abstractmethod
5-
from collections.abc import Iterator, Mapping
6-
from typing import Any, cast
5+
from collections.abc import Iterable, Iterator
6+
from typing import TYPE_CHECKING, Any, cast
77

88
from pandas import DataFrame
99

10+
from airbyte._util.document_rendering import DocumentRenderer
11+
12+
13+
if TYPE_CHECKING:
14+
from airbyte_protocol.models import ConfiguredAirbyteStream
15+
16+
from airbyte.documents import Document
17+
1018

1119
class DatasetBase(ABC):
1220
"""Base implementation for all datasets."""
1321

22+
def __init__(self, stream_metadata: ConfiguredAirbyteStream) -> None:
    """Store the configured stream metadata on the instance."""
    self._stream_metadata = stream_metadata
24+
1425
@abstractmethod
15-
def __iter__(self) -> Iterator[Mapping[str, Any]]:
26+
def __iter__(self) -> Iterator[dict[str, Any]]:
1627
"""Return the iterator of records."""
1728
raise NotImplementedError
1829

@@ -25,3 +36,27 @@ def to_pandas(self) -> DataFrame:
2536
# expects an iterator of dict objects. This cast is safe because we know
2637
# duck typing is correct for this use case.
2738
return DataFrame(cast(Iterator[dict[str, Any]], self))
39+
40+
def to_documents(
    self,
    title_property: str | None = None,
    content_properties: list[str] | None = None,
    metadata_properties: list[str] | None = None,
    *,
    render_metadata: bool = False,
) -> Iterable[Document]:
    """Return an iterator of documents rendered from this dataset's records.

    If metadata_properties is not set, all properties that are not content will be added to
    the metadata.

    If render_metadata is True, metadata will be rendered in the document, as well as
    the main content. Otherwise, metadata will be attached to the document but not rendered.
    """
    # Rendering is delegated to a DocumentRenderer configured with the same options.
    renderer = DocumentRenderer(
        title_property=title_property,
        content_properties=content_properties,
        metadata_properties=metadata_properties,
        render_metadata=render_metadata,
    )
    # Iterating `self` yields the dataset's records.
    yield from renderer.render_documents(self)

airbyte/datasets/_lazy.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,24 @@
1111
if TYPE_CHECKING:
1212
from collections.abc import Iterator, Mapping
1313

14+
from airbyte_protocol.models import ConfiguredAirbyteStream
15+
1416

1517
class LazyDataset(DatasetBase):
1618
"""A dataset that is loaded incrementally from a source or a SQL query."""
1719

1820
def __init__(
1921
self,
20-
iterator: Iterator[Mapping[str, Any]],
22+
iterator: Iterator[dict[str, Any]],
23+
stream_metadata: ConfiguredAirbyteStream,
2124
) -> None:
22-
self._iterator: Iterator[Mapping[str, Any]] = iterator
23-
super().__init__()
25+
self._iterator: Iterator[dict[str, Any]] = iterator
26+
super().__init__(
27+
stream_metadata=stream_metadata,
28+
)
2429

2530
@overrides
26-
def __iter__(self) -> Iterator[Mapping[str, Any]]:
31+
def __iter__(self) -> Iterator[dict[str, Any]]:
2732
return self._iterator
2833

2934
def __next__(self) -> Mapping[str, Any]:

airbyte/datasets/_sql.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33

44
from __future__ import annotations
55

6-
from collections.abc import Mapping
76
from typing import TYPE_CHECKING, Any, cast
87

98
from overrides import overrides
@@ -39,18 +38,22 @@ def __init__(
3938
self._cache: CacheBase = cache
4039
self._stream_name: str = stream_name
4140
self._query_statement: Selectable = query_statement
42-
super().__init__()
41+
super().__init__(
42+
stream_metadata=cache.processor._get_stream_config( # noqa: SLF001 # Member is private until we have a public API for it.
43+
stream_name=stream_name
44+
),
45+
)
4346

4447
@property
4548
def stream_name(self) -> str:
4649
return self._stream_name
4750

48-
def __iter__(self) -> Iterator[Mapping[str, Any]]:
51+
def __iter__(self) -> Iterator[dict[str, Any]]:
4952
with self._cache.processor.get_sql_connection() as conn:
5053
for row in conn.execute(self._query_statement):
5154
# Access to private member required because SQLAlchemy doesn't expose a public API.
5255
# https://pydoc.dev/sqlalchemy/latest/sqlalchemy.engine.row.RowMapping.html
53-
yield cast(Mapping[str, Any], row._mapping) # noqa: SLF001
56+
yield cast(dict[str, Any], row._mapping) # noqa: SLF001
5457

5558
def __len__(self) -> int:
5659
"""Return the number of records in the dataset.
@@ -100,7 +103,11 @@ class CachedDataset(SQLDataset):
100103
underlying table as a SQLAlchemy Table object.
101104
"""
102105

103-
def __init__(self, cache: CacheBase, stream_name: str) -> None:
106+
def __init__(
107+
self,
108+
cache: CacheBase,
109+
stream_name: str,
110+
) -> None:
104111
"""We construct the query statement by selecting all columns from the table.
105112
106113
This prevents the need to scan the table schema to construct the query statement.

airbyte/documents.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
2+
"""Methods for converting Airbyte records into documents."""
3+
from __future__ import annotations
4+
5+
from typing import TYPE_CHECKING, Any
6+
7+
from pydantic import BaseModel
8+
9+
10+
if TYPE_CHECKING:
11+
import datetime
12+
13+
14+
MAX_SINGLE_LINE_LENGTH = 60
15+
AIRBYTE_DOCUMENT_RENDERING = "airbyte_document_rendering"
16+
TITLE_PROPERTY = "title_property"
17+
CONTENT_PROPS = "content_properties"
18+
METADATA_PROPERTIES = "metadata_properties"
19+
20+
21+
class Document(BaseModel):
    """A PyAirbyte document is a specific projection on top of a record.

    Documents have the following structure:
    - id (str): A unique string identifier for the document.
    - content (str): A string representing the record when rendered as a document.
    - metadata (dict[str, Any]): Associated metadata about the document, such as the record's IDs
      and/or URLs.
    - last_modified (datetime | None): An optional timestamp; presumably when the underlying
      record was last modified (to be confirmed).

    This class is duck-typed to be compatible with LangChain project's `Document` class.
    """

    id: str | None = None
    content: str
    metadata: dict[str, Any]
    # NOTE(review): `datetime` is imported only under `TYPE_CHECKING` in this module, while
    # pydantic resolves field annotations at runtime -- confirm this annotation resolves when
    # `last_modified` is actually supplied.
    last_modified: datetime.datetime | None = None

    def __str__(self) -> str:
        """Return the document's rendered content."""
        return self.content

    @property
    def page_content(self) -> str:
        """Return the content of the document.

        This is an alias for the `content` property, and is provided for duck-type compatibility
        with the LangChain project's `Document` class.
        """
        return self.content

airbyte/source.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747

4848
from airbyte._executor import Executor
4949
from airbyte.caches import CacheBase
50+
from airbyte.documents import Document
5051

5152

5253
@contextmanager
@@ -351,7 +352,34 @@ def _with_missing_columns(records: Iterable[dict[str, Any]]) -> Iterator[dict[st
351352
),
352353
)
353354
)
354-
return LazyDataset(iterator)
355+
return LazyDataset(
356+
iterator,
357+
stream_metadata=configured_stream,
358+
)
359+
360+
def get_documents(
    self,
    stream: str,
    title_property: str | None = None,
    content_properties: list[str] | None = None,
    metadata_properties: list[str] | None = None,
    *,
    render_metadata: bool = False,
) -> Iterable[Document]:
    """Read a stream from the connector and return the records as documents.

    If metadata_properties is not set, all properties that are not content will be added to
    the metadata.

    If render_metadata is True, metadata will be rendered in the document, as well as
    the main content.
    """
    # Read the stream's records, then delegate rendering to the dataset's `to_documents`.
    return self.get_records(stream).to_documents(
        title_property=title_property,
        content_properties=content_properties,
        metadata_properties=metadata_properties,
        render_metadata=render_metadata,
    )
355383

356384
def check(self) -> None:
357385
"""Call check on the connector.
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
2+
"""This example script demonstrates how to render documents from a source."""
3+
from __future__ import annotations
4+
5+
import rich
6+
7+
import airbyte as ab
8+
9+
10+
def main() -> None:
    """Read GitHub issues and print each one as a rendered markdown document."""
    # Import the submodule explicitly: `import rich` alone does not guarantee that
    # `rich.markdown` is available as an attribute of the package.
    from rich.markdown import Markdown

    read_result = ab.get_source(
        "source-github",
        config={
            "repositories": ["airbytehq/quickstarts"],
            "credentials": {"personal_access_token": ab.get_secret("GITHUB_PERSONAL_ACCESS_TOKEN")},
        },
        streams=["issues"],
    ).read()

    separator = "-" * 40
    for doc in read_result["issues"].to_documents(
        title_property="title",
        content_properties=["body"],
        metadata_properties=["state", "url", "number"],
        # primary_key_properties=["id"],
        # cursor_property="updated_at",
        render_metadata=True,
    ):
        # Print each document followed by a horizontal separator line.
        rich.print(Markdown(str(doc) + "\n\n" + separator))
29+
30+
31+
if __name__ == "__main__":
32+
main()

0 commit comments

Comments
 (0)