33 changes: 25 additions & 8 deletions .github/workflows/unittest.yml
@@ -22,6 +22,17 @@ jobs:
unittest:
# The type of runner that the job will run on
runs-on: ubuntu-latest
strategy:
matrix:
config: [latest]
include:
- config: latest
PYARROW_VERSION: "6.0.1"
NUMPY_VERSION: "1.21.5"
TF_VERSION: "2.8.0"
PYSPARK_VERSION: "3.0.0"
ARROW_PRE_0_15_IPC_FORMAT: "0"
PY: "3.9"

# Steps represent a sequence of tasks that will be executed as part of the job
steps:
@@ -40,15 +51,15 @@ jobs:
- name: build and run unit tests
run: |
sleep 30
export PYARROW_VERSION="6.0.1"
export NUMPY_VERSION="1.21.5"
export TF_VERSION="2.8.0"
export PY="3.9"
export PYSPARK_VERSION="3.0.0"
export ARROW_PRE_0_15_IPC_FORMAT="0"
export PYARROW_VERSION=${{matrix.PYARROW_VERSION}}
export NUMPY_VERSION=${{matrix.NUMPY_VERSION}}
export TF_VERSION=${{matrix.TF_VERSION}}
export PY=${{matrix.PY}}
export PYSPARK_VERSION=${{matrix.PYSPARK_VERSION}}
export ARROW_PRE_0_15_IPC_FORMAT=${{matrix.ARROW_PRE_0_15_IPC_FORMAT}}
export RUN="docker exec -e ARROW_PRE_0_15_IPC_FORMAT=$ARROW_PRE_0_15_IPC_FORMAT petastorm_ci bash /run_in_venv.sh ${PY}"
export PYTEST="pytest --timeout=360 -v --color=yes --cov=./ --cov-report xml:coverage.xml"
$RUN pip install -U pip "setuptools<70"
$RUN pip install -e /petastorm/[test,tf,torch,docs,opencv]
$RUN pip install --upgrade numpy==$NUMPY_VERSION
$RUN pip install -U pyarrow==${PYARROW_VERSION} tensorflow==${TF_VERSION} pyspark==${PYSPARK_VERSION}
@@ -77,6 +88,12 @@ jobs:
petastorm/tests/test_pytorch_utils.py
$RUN $PYTEST -Y --cov-append petastorm/tests/test_tf_autograph.py

- name: codecov
uses: codecov/codecov-action@v5
with:
token: ${{ secrets.CODECOV_TOKEN }}
required: true

draft_release:
needs: unittest
# Only come with a tag
@@ -129,4 +146,4 @@ jobs:
uses: pypa/gh-action-pypi-publish@release/v1
with:
user: __token__
password: ${{ secrets.PYPI_API_TOKEN }}
password: ${{ secrets.PYPI_API_TOKEN }}
19 changes: 16 additions & 3 deletions petastorm/arrow_reader_worker.py
@@ -101,7 +101,10 @@ def __init__(self, worker_id, publish_func, args):
self._transformed_schema = args[7]
self._arrow_filters = args[8]
self._shuffle_rows = args[9]
self._random_state = np.random.RandomState(seed=args[10])
self._random_seed = args[10]

# Initialize random number generator
self._rng = np.random.default_rng(self._random_seed)

if self._ngram:
raise NotImplementedError('ngrams are not supported by ArrowReaderWorker')
@@ -289,9 +292,19 @@ def _read_with_shuffle_row_drop(self, piece, pq_file, column_names, shuffle_row_

# pyarrow would fail if we request column names that the dataset is partitioned by
table = piece.read(columns=column_names - partition_names, partitions=self._dataset.partitions)

# Handle row shuffling based on shuffle_rows setting
if self._shuffle_rows:
indices = self._random_state.permutation(table.num_rows)
table = table.take(indices)
if self._random_seed is not None and self._random_seed != 0:
# Deterministic randomization: use provided seed
indices = self._rng.permutation(table.num_rows)
else:
# Non-deterministic randomization: use np.random directly
indices = np.random.permutation(table.num_rows)
else:
# Deterministic natural order: shuffle_rows=False
indices = np.arange(table.num_rows)
table = table.take(indices)

# Drop columns we did not explicitly request. This may happen when a table is partitioned. Besides columns
# requested, pyarrow will also return partition values. Having these unexpected fields will break some
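
The shuffle branch above only uses the seeded generator when a non-zero seed was supplied; otherwise it falls back to numpy's global RNG, and with shuffling disabled it keeps the natural order. A minimal, self-contained sketch of that decision (the helper name and the toy table are illustrative, not part of the patch):

import numpy as np
import pyarrow as pa

def shuffled_indices(num_rows, shuffle_rows, seed=None):
    # Mirrors the branch added to ArrowReaderWorker._read_with_shuffle_row_drop
    if shuffle_rows:
        if seed is not None and seed != 0:
            # Deterministic shuffle: a seeded Generator reproduces the same permutation
            return np.random.default_rng(seed).permutation(num_rows)
        # Non-deterministic shuffle: global numpy RNG
        return np.random.permutation(num_rows)
    # shuffle_rows=False: keep the natural row order
    return np.arange(num_rows)

table = pa.table({'x': list(range(10))})
table = table.take(shuffled_indices(table.num_rows, shuffle_rows=True, seed=42))

Note that in the worker itself the Generator is created once in __init__, so successive row groups read by the same worker get different but reproducible permutations.
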
4 changes: 2 additions & 2 deletions petastorm/reader.py
@@ -42,7 +42,7 @@

# Ventilator guarantees that no more than workers_count * (1 + _VENTILATE_EXTRA_ROWGROUPS) row groups are processed at a
# moment by a worker pool. This guarantees that we don't run out of memory if the data consumer is slower than the Reader.
_VENTILATE_EXTRA_ROWGROUPS = 2
_VENTILATE_EXTRA_ROWGROUPS = 3

LOCAL_DISK_CACHE = 'local-disk'
NULL_CACHE = 'null'
@@ -475,7 +475,7 @@ def __init__(self, pyarrow_filesystem, dataset_path, schema_fields=None,
self.ventilator = self._create_ventilator(filtered_row_group_indexes, shuffle_row_groups,
normalized_shuffle_row_drop_partitions,
self.num_epochs, worker_predicate,
self._workers_pool.workers_count + _VENTILATE_EXTRA_ROWGROUPS,
self._workers_pool.workers_count * (1 + _VENTILATE_EXTRA_ROWGROUPS),
seed)

# 5. Start workers pool
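
Together with the bump of _VENTILATE_EXTRA_ROWGROUPS from 2 to 3, switching the ventilator budget from addition to multiplication grows the number of in-flight row groups substantially. A back-of-envelope sketch (the worker count of 10 is only an example, not from the patch):

workers_count = 10                                            # example pool size
VENTILATE_EXTRA_ROWGROUPS = 3                                 # new value of _VENTILATE_EXTRA_ROWGROUPS
old_limit = workers_count + 2                                 # previous sizing: 12 row groups in flight
new_limit = workers_count * (1 + VENTILATE_EXTRA_ROWGROUPS)   # new sizing: 40 row groups in flight
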
8 changes: 5 additions & 3 deletions petastorm/workers_pool/tests/test_workers_pool.py
@@ -141,15 +141,17 @@ def test_stop_when_result_queue_is_full(self):
SLEEP_DELTA = 0.01
TIMEOUT = 20
QUEUE_SIZE = 2
WORKERS_COUNT = 10

pool = ThreadPool(10, results_queue_size=QUEUE_SIZE)
pool = ThreadPool(WORKERS_COUNT, results_queue_size=QUEUE_SIZE)
pool.start(WorkerIdGeneratingWorker)

for _ in range(100):
for _ in range(1000):
pool.ventilate()

expected_queue_size = WORKERS_COUNT * max(5, QUEUE_SIZE // WORKERS_COUNT)
cumulative_wait = 0
while pool.results_qsize() != QUEUE_SIZE:
while pool.results_qsize() != expected_queue_size:
time.sleep(SLEEP_DELTA)
cumulative_wait += SLEEP_DELTA
# Make sure we wait no longer than the timeout. Otherwise, something is very wrong
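
With these constants the expected steady-state backlog follows from the per-worker result queues introduced in thread_pool.py, which are bounded at 5 items each. A sketch of the arithmetic:

WORKERS_COUNT = 10
QUEUE_SIZE = 2
per_worker_capacity = max(5, QUEUE_SIZE // WORKERS_COUNT)    # max(5, 0) == 5
expected_queue_size = WORKERS_COUNT * per_worker_capacity    # 10 * 5 == 50 buffered results
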
64 changes: 48 additions & 16 deletions petastorm/workers_pool/thread_pool.py
@@ -90,16 +90,21 @@ def __init__(self, workers_count, results_queue_size=50, profiling_enabled=False
"""
self._seed = random.randint(0, 100000)
self._workers = []
self._ventilator_queue = None
self._ventilator_queues = []
self.workers_count = workers_count
self._results_queue_size = results_queue_size
# Worker threads will watch this event and gracefully shutdown when the event is set
self._stop_event = Event()
self._profiling_enabled = profiling_enabled

# Count of items ventilated by the pool
self._ventilated_items = 0
self._ventilated_items_processed = 0
# Count of items ventilated by each worker
self._ventilated_items_by_worker = [0 for _ in range(self.workers_count)]
# Count of items processed by each worker
self._ventilated_items_processed_by_worker = [0 for _ in range(self.workers_count)]
self._ventilator = None
self._get_results_worker_id = 0

def start(self, worker_class, worker_args=None, ventilator=None):
"""Starts worker threads.
@@ -115,14 +120,19 @@ class must implement :class:`.WorkerBase` protocol
raise RuntimeError('ThreadPool({}) cannot be reused! stop_event set? {}'
.format(len(self._workers), self._stop_event.is_set()))

# Set up a channel to send work
self._ventilator_queue = queue.Queue()
self._results_queue = queue.Queue(self._results_queue_size)
# Set up a channel for each worker to send work
self._ventilator_queues = [queue.Queue() for _ in range(self.workers_count)]
# Set up a channel for each worker to send results
self._results_queues = [queue.Queue(5) for _ in range(self.workers_count)]
self._workers = []
for worker_id in range(self.workers_count):
worker_impl = worker_class(worker_id, self._stop_aware_put, worker_args)
new_thread = WorkerThread(worker_impl, self._stop_event, self._ventilator_queue,
self._results_queue, self._profiling_enabled)
# Create a closure that captures the worker_id for this specific worker
def make_publish_func(worker_id):
return lambda data: self._stop_aware_put(worker_id, data)

worker_impl = worker_class(worker_id, make_publish_func(worker_id), worker_args)
new_thread = WorkerThread(worker_impl, self._stop_event, self._ventilator_queues[worker_id],
self._results_queues[worker_id], self._profiling_enabled)
# Make the thread daemonic. Since it only reads it's ok to abort while running - no resource corruption
# will occur.
new_thread.daemon = True
@@ -139,8 +149,22 @@ class must implement :class:`.WorkerBase` protocol
def ventilate(self, *args, **kargs):
"""Sends a work item to a worker process. Will result in ``worker.process(...)`` call with arbitrary arguments.
"""
current_worker_id = self._ventilated_items % self.workers_count
self._ventilated_items += 1
self._ventilator_queue.put((args, kargs))
self._ventilated_items_by_worker[current_worker_id] += 1
self._ventilator_queues[current_worker_id].put((args, kargs))

def current_worker_done(self, worker_id):
# Check if the current worker has processed all the items it was assigned and if the results queue is empty
return (self._ventilated_items_processed_by_worker[worker_id] == self._ventilated_items_by_worker[worker_id]
and self._results_queues[worker_id].empty())

def all_workers_done(self):
# Check if all workers have processed all the items they were assigned and if the results queues are empty
for i in range(self.workers_count):
if not self.current_worker_done(i):
return False
return True

def get_results(self):
"""Returns results from worker pool or re-raise worker's exception if any happen in worker thread.
@@ -151,20 +175,28 @@ def get_results(self):
:return: arguments passed to ``publish_func(...)`` by a worker. If no more results are anticipated,
:class:`.EmptyResultError`.
"""

while True:
# If there is no more work to do, raise an EmptyResultError
if self._results_queue.empty() and self._ventilated_items == self._ventilated_items_processed:
if self.all_workers_done():
# We also need to check if we are using a ventilator and if it is completed
if not self._ventilator or self._ventilator.completed():
raise EmptyResultError()

# If the current worker is done, we need to get the result from the next worker
if self.current_worker_done(self._get_results_worker_id):
self._get_results_worker_id = (self._get_results_worker_id + 1) % self.workers_count
continue

try:
result = self._results_queue.get(timeout=_VERIFY_END_OF_VENTILATION_PERIOD)
result = self._results_queues[self._get_results_worker_id].get(
block=True, timeout=_VERIFY_END_OF_VENTILATION_PERIOD
)
if isinstance(result, VentilatedItemProcessedMessage):
self._ventilated_items_processed += 1
self._ventilated_items_processed_by_worker[self._get_results_worker_id] += 1
if self._ventilator:
self._ventilator.processed_item()
# Move to the next worker
self._get_results_worker_id = (self._get_results_worker_id + 1) % self.workers_count
continue
elif isinstance(result, Exception):
self.stop()
@@ -197,15 +229,15 @@ def join(self):
stats = pstats.Stats(w.prof)
stats.sort_stats('cumulative').print_stats()

def _stop_aware_put(self, data):
def _stop_aware_put(self, worker_id, data):
"""This method is called to write the results to the results queue. We use ``put`` in a non-blocking way so we
can gracefully terminate the worker thread without being stuck on :func:`Queue.put`.

The method raises :class:`.WorkerTerminationRequested` exception that should be passed through all the way up to
:func:`WorkerThread.run` which will gracefully terminate main worker loop."""
while True:
try:
self._results_queue.put(data, block=True, timeout=IO_TIMEOUT_INTERVAL_S)
self._results_queues[worker_id].put(data, block=True, timeout=IO_TIMEOUT_INTERVAL_S)
return
except queue.Full:
pass
@@ -214,7 +246,7 @@ def _stop_aware_put(self, worker_id, data):
raise WorkerTerminationRequested()

def results_qsize(self):
return self._results_queue.qsize()
return sum(q.qsize() for q in self._results_queues)

@property
def diagnostics(self):
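
Each worker now gets its own ventilator queue and its own bounded results queue, and get_results() walks the workers round-robin via _get_results_worker_id. A condensed sketch of that consumption pattern using plain queue.Queue objects (two illustrative workers, no WorkerThread plumbing):

import queue

results_queues = [queue.Queue(5), queue.Queue(5)]    # one bounded queue per worker
results_queues[0].put('a0')
results_queues[1].put('b0')
results_queues[0].put('a1')

worker_id, drained = 0, []
while any(not q.empty() for q in results_queues):
    try:
        drained.append(results_queues[worker_id].get_nowait())
    except queue.Empty:
        pass
    # Advance to the next worker, like _get_results_worker_id in the pool
    worker_id = (worker_id + 1) % len(results_queues)
print(drained)    # alternates between workers: ['a0', 'b0', 'a1']
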
16 changes: 11 additions & 5 deletions petastorm/workers_pool/ventilator.py
@@ -98,7 +98,8 @@ def __init__(self,
self._items_to_ventilate = items_to_ventilate
self._iterations_remaining = iterations
self._randomize_item_order = randomize_item_order
self._random_state = np.random.RandomState(seed=random_seed)
self._random_seed = random_seed
self._rng = np.random.default_rng(self._random_seed)
self._iterations = iterations

# For the default max ventilation queue size we will use the size of the items to ventilate
@@ -136,15 +137,20 @@ def reset(self):
self.start()

def _ventilate(self):
# Randomize the item order before starting the ventilation if randomize_item_order is set
if self._randomize_item_order:
if self._random_seed is not None and self._random_seed != 0:
# Deterministic randomization: use provided seed
self._items_to_ventilate = list(self._rng.permutation(self._items_to_ventilate))
else:
# Non-deterministic randomization: use np.random
self._items_to_ventilate = list(np.random.permutation(self._items_to_ventilate))

while True:
# Stop condition is when no iterations are remaining or there are no items to ventilate
if self.completed():
break

# If we are ventilating the first item, we check if we would like to randomize the item order
if self._current_item_to_ventilate == 0 and self._randomize_item_order:
self._random_state.shuffle(self._items_to_ventilate)

# Block until queue has room, but use continue to allow for checking if stop has been called
if self._ventilated_items_count - self._processed_items_count >= self._max_ventilation_queue_size:
sleep(self._ventilation_interval)
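
As in the reader worker, the ventilator now shuffles its work items once, up front, and only uses the seeded generator when a non-zero seed is given, which makes the epoch ordering reproducible. A small sketch (the dicts stand in for the real ventilation kwargs):

import numpy as np

items = [{'piece_index': i} for i in range(5)]    # illustrative stand-ins for ventilation items
order_a = list(np.random.default_rng(123).permutation(items))
order_b = list(np.random.default_rng(123).permutation(items))
assert order_a == order_b    # the same non-zero seed yields the same ordering
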
2 changes: 1 addition & 1 deletion setup.py
@@ -39,7 +39,7 @@
'psutil>=4.0.0',
'pyspark>=2.1.0',
'pyzmq>=14.0.0',
'pyarrow>=0.17.1',
'pyarrow>=6.0.1',
'six>=1.5.0',
'fsspec',
'setuptools<70', # Prevent compatibility issues with newer setuptools