Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 33 additions & 33 deletions apps/integration/poetry.lock

Large diffs are not rendered by default.

329 changes: 183 additions & 146 deletions apps/jupyter/poetry.lock

Large diffs are not rendered by default.

329 changes: 183 additions & 146 deletions apps/remote-processor-service/poetry.lock

Large diffs are not rendered by default.

66 changes: 33 additions & 33 deletions lib/poetry-lock/poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion lib/poetry-lock/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ fsspec = "2024.2.0" # sycamore (via datasets)
# The following command fails, which prevents upgrading pytest to 8.1.1:
# poetry run pytest lib/sycamore/sycamore/tests/unit/scans/test_materialized_scan.py lib/sycamore/sycamore/tests/unit/test_docset.py
pytest = "7.4.0"
ray = "^2.52.1"
ray = "^2.54.0"
pandas = "2.3.0" # Bumped to a version that supports 3.13.
scipy = "^1.15.0"
weaviate-client = "^4.16.10"
Expand Down
329 changes: 183 additions & 146 deletions lib/remote-processors/poetry.lock

Large diffs are not rendered by default.

339 changes: 180 additions & 159 deletions lib/sycamore/poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion lib/sycamore/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ python = ">=3.10,<3.14"

pandas = "^2.3.0"
pdf2image = "^1.16.3"
ray = { extras = ["default"], version = "^2.52.1" }
ray = { extras = ["default"], version = "^2.54.0" }

pyarrow = "^20.0.0"
numpy = ">=2.2.0"
Expand Down
6 changes: 3 additions & 3 deletions lib/sycamore/sycamore/connectors/file/file_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import boto3
import mimetypes
from typing import Any, Optional, Union, Tuple, Callable, TYPE_CHECKING, cast
from typing import Any, Literal, Optional, Union, Tuple, Callable, TYPE_CHECKING, cast
import logging

from functools import partial
Expand Down Expand Up @@ -228,12 +228,12 @@ def execute(self, **kwargs) -> "Dataset":

# TODO: Consider refactoring to use kwargs = self._get_read_args() for better extensibility
# when adding new read arguments in the future
partition_filter: Optional[Callable[[dict[str, str]], bool]] = None
partition_filter: Optional[PathPartitionFilter] = None
if self._path_filter is not None:
partition_filter = PathPartitionFilter(
cast(PathPartitionParser, RayPathParser()), partial(self._path_filter, read_binary=True)
)
shuffle = None if partition_filter is None else "files"
shuffle: Optional[Literal["files"]] = None if partition_filter is None else "files"

try:
files = read_binary_files(
Expand Down
7 changes: 5 additions & 2 deletions lib/sycamore/sycamore/connectors/file/file_writer_ray.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Callable, Iterable, Optional
from typing import Any, Callable, Iterable, Optional, TYPE_CHECKING

import posixpath
from pyarrow.fs import FileSystem, FileType
Expand All @@ -13,6 +13,9 @@
from sycamore.data import Document, MetadataDocument
from sycamore.utils.time_trace import TimeTrace

if TYPE_CHECKING:
import pyarrow as pa


class _FileDataSink(Datasink):
def __init__(
Expand All @@ -33,7 +36,7 @@ def __init__(
self._makedirs = makedirs
self._include_metadata = include_metadata

def on_write_start(self) -> None:
def on_write_start(self, pyarrow_schema: Optional["pa.Schema"] = None) -> None:
if not self._makedirs:
return

Expand Down
15 changes: 13 additions & 2 deletions lib/sycamore/sycamore/connectors/iceberg/iceberg_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,19 @@ def f(batch: "pa.Table") -> "pa.Table":
def execute(self, **kwargs):
_ = self._get_table() # Creates the table if it does not exist.
dataset = self.child().execute(**kwargs)
dataset.map_batches(self._to_property_dict(), batch_format="pyarrow").write_iceberg(
self._table_id, catalog_kwargs=self._catalog_kwargs, **kwargs

# There seems to be an issue with how recent versions of Ray handle
# Iceberg schema evolution. When map_batches and the write are fused,
# the combined schema contains the schema from the docs_to_pyarrow table
# (correct) plus a docs column, which is null. That column apparently
# comes from the schema before the map_batches and is present because of
# the schema-evolution code. The workaround here gives the two operators
# different resource requirements (num_cpus) so that they won't be
# fused. Eventually it would be preferable to disable schema evolution
# outright, but there does not appear to be a way to do that currently.
batches = dataset.map_batches(self._to_property_dict(), batch_format="pyarrow", num_cpus=0.1)
batches.write_iceberg(
self._table_id, catalog_kwargs=self._catalog_kwargs, **kwargs, ray_remote_args={"num_cpus": 0.2}
)
return dataset

Expand Down
11 changes: 7 additions & 4 deletions lib/sycamore/sycamore/grouped_data.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, Callable
from typing import Any, Callable, cast, TYPE_CHECKING

from sycamore import DocSet
from sycamore.data import Document, MetadataDocument
Expand All @@ -9,11 +9,14 @@
from ray.data import Dataset


def filter_meta(row):
def filter_meta(row: dict[str, bytes]) -> bool:
doc = Document.from_row(row)
return not isinstance(doc, MetadataDocument)


_cast_from_row = cast(Callable[[dict[str, Any]], dict[str, Any]], Document.from_row)


class AggregateCount(NonCPUUser, NonGPUUser, Transform):
def __init__(self, child: Node, key: str):
super().__init__(child)
Expand All @@ -40,7 +43,7 @@ def to_doc(self, row: dict):

def execute(self, **kwargs) -> "Dataset":
dataset = self.child().execute()
grouped = dataset.filter(filter_meta).map(Document.from_row).groupby(self._grouped_key)
grouped = dataset.filter(filter_meta).map(_cast_from_row).groupby(self._grouped_key)
aggregated = grouped.map_groups(self.group_udf)
serialized = aggregated.map(self.to_doc)
return serialized
Expand Down Expand Up @@ -82,7 +85,7 @@ def to_doc(self, row: dict):

def execute(self, **kwargs) -> "Dataset":
dataset = self.child().execute()
grouped = dataset.filter(filter_meta).map(Document.from_row).groupby(self._grouped_key)
grouped = dataset.filter(filter_meta).map(_cast_from_row).groupby(self._grouped_key)
aggregated = grouped.map_groups(self.group_udf)
serialized = aggregated.map(self.to_doc)
return serialized
Expand Down
8 changes: 5 additions & 3 deletions lib/sycamore/sycamore/materialize.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from pathlib import Path
from typing import Any, Optional, Tuple, Union, TYPE_CHECKING, cast
from typing import Any, Literal, Optional, Tuple, Union, TYPE_CHECKING, cast
import inspect

from sycamore.context import Context
Expand Down Expand Up @@ -424,11 +424,13 @@ def execute(self, **kwargs) -> "Dataset":
partition_filter = PathPartitionFilter(
cast(PathPartitionParser, RayPathParser()), self._path_filter
)
shuffle = None if partition_filter is None else "files"
shuffle: Optional[Literal["files"]] = None if partition_filter is None else "files"

try:
assert self._root is not None

files = read_binary_files(
self._root,
str(self._root),
filesystem=self._fs,
file_extensions=["pickle"],
partition_filter=partition_filter,
Expand Down
Loading
Loading