Skip to content

Commit 3ee677c

Browse files
authored
feat: add sampling functionality to fsspec indexers (#189)
* feat: add sampling functionality to fsspec indexers
* add e2e test
* tidy
* release
1 parent 977fc0a commit 3ee677c

File tree

5 files changed

+117
-2
lines changed

5 files changed

+117
-2
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
## 0.1.1-dev1
1+
## 0.1.1
22

33
### Enhancements
44

55
* **Update KDB.AI vectorstore integration to 1.4**
66
* **Add sqlite and postgres source connectors**
7+
* **Add sampling functionality for indexers in fsspec connectors**
78

89
## 0.1.0
910

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
from unstructured_ingest.v2.processes.connectors.fsspec.fsspec import (
2+
FsspecIndexer,
3+
)
4+
5+
6+
def test_fsspec_indexer_sampling_happy_path():
    """sample_n_files returns exactly n entries, each drawn from the input listing."""
    indexer = FsspecIndexer(
        connection_config="fake_connection_config",
        index_config="fake_index_config",
        connector_type="fake_connector_type",
    )

    all_files = [{"name": "fake_file.txt"}, {"name": "fake_file2.txt"}, {"name": "fake_file3.txt"}]

    sampled_files = indexer.sample_n_files(all_files, 2)
    assert len(sampled_files) == 2
    # Hoist the invariant name list out of the loop instead of rebuilding it per item.
    all_names = [file["name"] for file in all_files]
    for sampled_file in sampled_files:
        # isinstance is the idiomatic type check (no E721 suppression needed).
        assert isinstance(sampled_file, dict)
        assert sampled_file["name"] in all_names
21+
22+
23+
def test_fsspec_indexer_sampling_no_files():
    """Sampling from an empty listing yields an empty result."""
    indexer = FsspecIndexer(
        connection_config="fake_connection_config",
        index_config="fake_index_config",
        connector_type="fake_connector_type",
    )

    sampled_files = indexer.sample_n_files([], 2)

    assert len(sampled_files) == 0
34+
35+
36+
def test_fsspec_indexer_sampling_bigger_sample_size():
    """Requesting more files than exist returns every file unchanged."""
    indexer = FsspecIndexer(
        connection_config="fake_connection_config",
        index_config="fake_index_config",
        connector_type="fake_connector_type",
    )

    all_files = [{"name": "fake_file.txt"}, {"name": "fake_file2.txt"}, {"name": "fake_file3.txt"}]

    sampled_files = indexer.sample_n_files(all_files, 10)
    assert len(sampled_files) == 3
    # Hoist the invariant name list out of the loop instead of rebuilding it per item.
    all_names = [file["name"] for file in all_files]
    for sampled_file in sampled_files:
        # isinstance is the idiomatic type check (no E721 suppression needed).
        assert isinstance(sampled_file, dict)
        assert sampled_file["name"] in all_names

test_e2e/src/s3-sample.sh

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
#!/usr/bin/env bash
# E2E test: run the s3 source connector with --sample-n-files and verify that
# exactly the requested number of structured outputs is produced.

set -e

SRC_PATH=$(dirname "$(realpath "$0")")
SCRIPT_DIR=$(dirname "$SRC_PATH")
cd "$SCRIPT_DIR"/.. || exit 1
OUTPUT_FOLDER_NAME=s3-sample
OUTPUT_ROOT=${OUTPUT_ROOT:-$SCRIPT_DIR}
OUTPUT_DIR=$OUTPUT_ROOT/structured-output/$OUTPUT_FOLDER_NAME
WORK_DIR=$OUTPUT_ROOT/workdir/$OUTPUT_FOLDER_NAME
DOWNLOAD_DIR=$SCRIPT_DIR/download/$OUTPUT_FOLDER_NAME
max_processes=${MAX_PROCESSES:=$(python3 -c "import os; print(os.cpu_count())")}

# shellcheck disable=SC1091
source "$SCRIPT_DIR"/cleanup.sh
# shellcheck disable=SC2317
function cleanup() {
  cleanup_dir "$OUTPUT_DIR"
  cleanup_dir "$WORK_DIR"
}
trap cleanup EXIT

RUN_SCRIPT=${RUN_SCRIPT:-./unstructured_ingest/main.py}
PYTHONPATH=${PYTHONPATH:-.} "$RUN_SCRIPT" \
  s3 \
  --api-key "$UNS_PAID_API_KEY" \
  --partition-by-api \
  --partition-endpoint "https://api.unstructuredapp.io" \
  --num-processes "$max_processes" \
  --download-dir "$DOWNLOAD_DIR" \
  --metadata-exclude coordinates,filename,file_directory,metadata.data_source.date_processed,metadata.last_modified,metadata.detection_class_prob,metadata.parent_id,metadata.category_depth \
  --strategy fast \
  --preserve-downloads \
  --reprocess \
  --output-dir "$OUTPUT_DIR" \
  --verbose \
  --remote-url s3://utic-dev-tech-fixtures/small-pdf-set/ \
  --anonymous \
  --work-dir "$WORK_DIR" \
  --sample-n-files 3

NUM_FILES=$(find "$OUTPUT_DIR" -type f | wc -l)
EXPECTED_NUM_FILES=3

if [ "$NUM_FILES" -ne "$EXPECTED_NUM_FILES" ]; then
  # Fail loudly: the original check exited 1 with no diagnostic, which makes
  # CI failures hard to debug.
  echo "ERROR: expected $EXPECTED_NUM_FILES output files but found $NUM_FILES in $OUTPUT_DIR" >&2
  exit 1
fi

unstructured_ingest/__version__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "0.1.1-dev1" # pragma: no cover
1+
__version__ = "0.1.1" # pragma: no cover

unstructured_ingest/v2/processes/connectors/fsspec/fsspec.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
import random
34
from dataclasses import dataclass, field
45
from pathlib import Path
56
from typing import TYPE_CHECKING, Any, Generator, Optional, TypeVar
@@ -63,6 +64,7 @@ def __init__(self, **data):
6364

6465
class FsspecIndexerConfig(FileConfig, IndexerConfig):
6566
recursive: bool = False
67+
sample_n_files: Optional[int] = None
6668

6769

6870
class FsspecAccessConfig(AccessConfig):
@@ -128,8 +130,23 @@ def get_file_data(self) -> list[dict[str, Any]]:
128130
filtered_files = [
129131
file for file in files if file.get("size") > 0 and file.get("type") == "file"
130132
]
133+
134+
if self.index_config.sample_n_files:
135+
filtered_files = self.sample_n_files(filtered_files, self.index_config.sample_n_files)
136+
131137
return filtered_files
132138

139+
def sample_n_files(self, files: list[dict[str, Any]], n: int) -> list[dict[str, Any]]:
    """Return a random sample of ``n`` entries from ``files``.

    Args:
        files: Raw fsspec file-info dicts to sample from.
        n: Number of files requested in the sample.

    Returns:
        ``n`` randomly chosen (distinct) entries, or the full ``files`` list
        when the listing is not larger than ``n`` (a warning is logged in
        that case, since no down-sampling occurred).
    """
    if len(files) <= n:
        # Lazy %-style args keep formatting out of the hot path and match
        # stdlib logging convention; the emitted message text is unchanged.
        logger.warning(
            "number of files to be sampled=%s is not smaller than the number"
            " of files found (%s). Returning all of the files as the"
            " sample.",
            n,
            len(files),
        )
        return files

    return random.sample(files, n)
149+
133150
def get_metadata(self, file_data: dict) -> FileDataSourceMetadata:
    # Abstract hook: concrete fsspec connector subclasses override this to
    # build source metadata from a raw fsspec file-info dict.
    raise NotImplementedError()
135152

0 commit comments

Comments
 (0)