feat: support IndexRange and PartitionBlock seed selection strategy #8
Conversation
src/data_designer/engine/column_generators/generators/seed_dataset.py
read_query = f"""
    SELECT * EXCLUDE (row_num) FROM (
        SELECT *, row_number() OVER () as row_num
        FROM '{self._dataset_uri}'
    ) sub
    WHERE row_num > {self._index_range.start} AND row_num <= {self._index_range.end + 1}
    {shuffle_query}
"""
When the sampling strategy is "shuffle", does this shuffle the entire dataset before indexing, or does it shuffle the indexed array?
I think it shuffles the indexed array, because the WHERE clause is most likely applied first to filter the rows.
Is that what we want? Seems like "shuffle" should mean "shuffle the entire dataset" so that all records are equally likely to be present in a given indexed array. I'm not sure it actually matters too much, though, since all records are treated independently – assuming the entire seed dataset is used.
https://duckdb.org/docs/stable/sql/query_syntax/orderby
Per the docs, ORDER BY is applied after the WHERE clause filtering.
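To make the two readings concrete, here's a toy DuckDB sketch contrasting the orderings (the table and index range are illustrative, not the PR's code):

import duckdb

conn = duckdb.connect()
# Toy table with 100 rows so the difference is easy to see.
conn.execute("CREATE TABLE t AS SELECT range AS id FROM range(100)")

# What the code above does: WHERE filters the index range first,
# then ORDER BY RANDOM() shuffles only those rows.
slice_then_shuffle = """
    SELECT * FROM (
        SELECT *, row_number() OVER () AS row_num FROM t
    ) sub
    WHERE row_num > 10 AND row_num <= 20
    ORDER BY RANDOM()
"""

# The alternative being discussed: shuffle the whole table first, so any
# record can land in the selected range.
shuffle_then_slice = """
    SELECT * FROM (
        SELECT *, row_number() OVER () AS row_num
        FROM (SELECT * FROM t ORDER BY RANDOM())
    ) sub
    WHERE row_num > 10 AND row_num <= 20
"""

print(conn.execute(slice_then_shuffle).fetchall())  # always ids 10..19, in random order
print(conn.execute(shuffle_then_slice).fetchall())  # 10 ids drawn from the whole table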
It would be weird to shuffle first and then give exact indices...
Let's think about the user's desired outcomes:
- Shuffled, no repeat: "Just give me one entry per row, I don't care about anything else"
- Shuffled, repeat: "I just want a bunch of random outcomes, I'm using my inputs as another sampler."
- Ordered, no repeat: "I NEED a one-to-one, in order correspondence with this data."
- Ordered, repeat: "I want multiple epochs/passes over this data, and the same number of them per sample."
How this relates to PartitionBlock:
- Shuffled, no repeat: Split into partitions, randomize order within partition, no problem.
- Shuffled, repeat: Depends on how exactly we want to preserve statistics. The true statistic would come from shuffling and then splitting. If the partitions were exactly the same size it would be okay, but that is not true in general (especially for the last partition, which may carry a remainder). However, there is a fix: we can scale the number of samples produced from each PartitionBlock according to the proportion of the block's size relative to the full dataset. That should converge to the same distribution as shuffling the original.
- Ordered, no repeat: Split into partitions, create a record for each row, no problem.
- Ordered, repeat: Split into partitions, create N records for each row, no problem.
I think doing the proportional sampling per PartitionBlock post-partitioning would be desirable, since it parallelizes easily versus needing an operation over the full dataset.
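For illustration, a minimal sketch of that proportional allocation (the helper name and signature are hypothetical, not the PR's implementation):

def samples_per_block(block_sizes: list[int], total_samples: int) -> list[int]:
    """Allocate total_samples across blocks proportionally to block size."""
    dataset_size = sum(block_sizes)
    raw = [total_samples * size / dataset_size for size in block_sizes]
    counts = [int(r) for r in raw]  # floor allocation
    # Hand out the remainder to the blocks with the largest fractional parts.
    leftover = total_samples - sum(counts)
    by_fraction = sorted(range(len(raw)), key=lambda i: raw[i] - counts[i], reverse=True)
    for i in by_fraction[:leftover]:
        counts[i] += 1
    return counts

# Four blocks where the last one holds the remainder rows:
print(samples_per_block([1000, 1000, 1000, 250], total_samples=651))
# -> [201, 200, 200, 50]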
The exact partition that is chosen is hidden from the user.
We resolve blocks to indices automatically for the user, in order, based on how they define the partitions.
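For example, resolving a block address to an index range could look like this (function and parameter names are illustrative, not the actual implementation):

def block_to_index_range(block_index: int, num_blocks: int, dataset_size: int) -> tuple[int, int]:
    """Map a block address to inclusive (start, end) indices; the last block absorbs any remainder."""
    block_size = dataset_size // num_blocks
    start = block_index * block_size
    end = dataset_size - 1 if block_index == num_blocks - 1 else start + block_size - 1
    return start, end

# 10 blocks over 1005 rows: block 9 covers indices 900..1004 (includes the remainder)
print(block_to_index_range(block_index=9, num_blocks=10, dataset_size=1005))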
For posterity, we chatted on Slack about this. To avoid a skewed distribution getting stuck in the last partition (in the shuffle case), we'd need to pre-shuffle the whole dataset, which is an expensive operation.
I ran a test to see how long it would take DuckDB to shuffle a really large dataset. As an example, we took the web step dataset, which is about 164 GB.
TL;DR
DuckDB took about half an hour to shuffle the entire dataset and write it to 1000 partitions in a different destination:
srun Execution
srun --account=llmservice_sdg_research --partition=cpu --nodes=1 --ntasks=1 --cpus-per-task=8 --mem=128G --time=2:00:00 bash -c "source /lustre/fsw/portfolios/llmservice/users/nmulepati/test_shuffle/.venv/bin/activate && python shuffle.py"
Output
Starting shuffle and write...
Configuring DuckDB settings...
Reading and shuffling data...
Source: /lustre/fsw/portfolios/llmservice/users/etramel/datasets/stem_pretrain/dataprep/v2_16k_datamix/*.parquet
Destination: /lustre/fsw/portfolios/llmservice/users/nmulepati/test_shuffle/tmp_data
Initial partitioning complete.
Shuffle and write completed in 1733.94 seconds
Output written to: /lustre/fsw/portfolios/llmservice/users/nmulepati/test_shuffle/tmp_data
shuffle.py
import duckdb
import time
start = time.perf_counter()
print("Starting shuffle and write...", flush=True)
# Connect to DuckDB
conn = duckdb.connect()
# Configure DuckDB for memory-efficient operation
# Conservative memory settings to avoid OOM
print("Configuring DuckDB settings...", flush=True)
conn.execute("SET memory_limit='110GB'") # Leave headroom for system
conn.execute("SET threads=4") # Reduce threads to save memory
conn.execute("SET temp_directory='/tmp/duckdb_temp'")
conn.execute("SET preserve_insertion_order=false")
# Enable disk-based operations for large datasets
conn.execute("SET max_memory='110GB'")
# Source and destination paths
data_path = "/lustre/fsw/portfolios/llmservice/users/etramel/datasets/stem_pretrain/dataprep/v2_16k_datamix/*.parquet"
output_path = "/lustre/fsw/portfolios/llmservice/users/nmulepati/test_shuffle/tmp_data"
print("Reading and shuffling data...", flush=True)
print(f"Source: {data_path}", flush=True)
print(f"Destination: {output_path}", flush=True)
# Memory-efficient shuffle: just add random partition key, no ORDER BY
# The random partitioning itself provides the shuffle effect
query = f"""
COPY (
SELECT *, (random() * 1000)::INTEGER as _partition_key
FROM '{data_path}'
) TO '{output_path}' (
FORMAT PARQUET,
PARTITION_BY (_partition_key),
OVERWRITE_OR_IGNORE true
)
"""
conn.execute(query)
print("Initial partitioning complete.", flush=True)
elapsed = time.perf_counter() - start
print(f"Shuffle and write completed in {elapsed:.2f} seconds")
print(f"Output written to: {output_path}")There was a problem hiding this comment.
OK, I've optimized the select query to use LIMIT and OFFSET followed by ORDER BY. The previous implementation was still doing a full table scan. 483363d
An example run pulling a 1000-item slice from the 164 GB partitioned dataset took under a second:
Starting select...
Start: 435971.227842381
End: 10999
Limit: 1000
Offset: 10000
Query:
SELECT * from (
SELECT * from '/lustre/fsw/portfolios/llmservice/users/etramel/datasets/stem_pretrain/dataprep/v2_16k_datamix/*.parquet' LIMIT 1000 OFFSET 10000
) ORDER BY RANDOM()
Executing query...
Fetched 1000 rows
Select completed in 0.88 seconds
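For reference, a sketch of how that query might be built (variable and function names are illustrative; commit 483363d has the actual change):

def build_slice_query(dataset_uri: str, start: int, end: int, shuffle: bool) -> str:
    """Slice rows [start, end] (inclusive, 0-based) via LIMIT/OFFSET, then optionally shuffle the slice.

    Relies on DuckDB preserving scan order (preserve_insertion_order, the default)
    so the LIMIT/OFFSET slice is deterministic.
    """
    limit = end - start + 1
    order_by = "ORDER BY RANDOM()" if shuffle else ""
    return f"""
        SELECT * FROM (
            SELECT * FROM '{dataset_uri}' LIMIT {limit} OFFSET {start}
        ) {order_by}
    """

# Matches the logged query above: limit 1000, offset 10000
print(build_slice_query("data/*.parquet", start=10000, end=10999, shuffle=True))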
Added new docstring.
johnnygreco left a comment:
Nice @nabinchha! Good to go right after we merge #11 (hopefully this doesn't cause any strange conflicts for you)
Minor conflicts! I'll merge after I have an approval.

SeedConfig (works for either sampling strategy):
- IndexRange allows you to directly specify the start/end (inclusive) indices in the seed dataset that should be used for sampling.
- PartitionBlock allows you to specify the above via a block address (index), by telling Data Designer how many blocks there are and what the index of the block is.
Example notebook with workflow
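As a sketch, the two strategies might be configured like so (these dataclasses are illustrative stand-ins; the PR's actual signatures may differ):

from dataclasses import dataclass

# Illustrative stand-ins for the PR's config types; real fields may differ.
@dataclass
class IndexRange:
    start: int  # inclusive
    end: int    # inclusive

@dataclass
class PartitionBlock:
    num_blocks: int
    block_index: int

# IndexRange: sample from rows 100..199 (inclusive) of the seed dataset.
index_selection = IndexRange(start=100, end=199)

# PartitionBlock: split the seed dataset into 10 blocks and use the 3rd one.
block_selection = PartitionBlock(num_blocks=10, block_index=2)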
Example preview output
Using the same query against a HF repo