Add adaptive metadata upload batch sizing #659
Merged

Commits (30):
84632f2 Add adaptive metadata upload batch sizing (guysmoilov)
ded5007 Handle non-retryable metadata upload failures (guysmoilov)
567c830 Allow retry shrink on partial metadata batches (guysmoilov)
4a767ab Add retry backoff for metadata upload failures (guysmoilov)
0f5e9d7 Honor min batch size on retry failures (guysmoilov)
82e4503 Mock retry backoff sleep in upload tests (guysmoilov)
996b7fc Avoid reusing known-bad upload batch size (guysmoilov)
2338c94 Extract metadata upload retry backoff constants (guysmoilov)
b884e9e Align metadata upload backoff cap with schedule (guysmoilov)
4c1132c Refactor adaptive metadata upload sizing and retries (guysmoilov)
6ae0a58 Fix adaptive upload expected batch sequence test (guysmoilov)
1b6356b Extract AdaptiveBatcher into dagshub.common for reuse (guysmoilov)
3a944ae Support unbounded iterables in AdaptiveBatcher (guysmoilov)
6cea20f Format config.py line length (black) (guysmoilov)
a52a0aa Fix review issues in AdaptiveBatcher (guysmoilov)
f6dfec9 Make batch growth factor and retry backoff configurable, add tests (guysmoilov)
349c25c Fix batch size stall at bad_batch_size - 1, clear stale bounds (guysmoilov)
3e0ae68 Simplify batch sizing functions with clear strategy comments (guysmoilov)
3c1c817 Clear stale good/bad bounds when they become incoherent (guysmoilov)
232ebbf Improve failure fallback convergence, add bad-bound clearing test (guysmoilov)
2c44d83 Fix retryable error check for wrapped exceptions in DataEngineGqlError (guysmoilov)
0453a33 Align metadata upload tests with adaptive batching behavior (guysmoilov)
00294fe Handle tail-batch retry edge case and docstring mismatch (guysmoilov)
5cd02a7 Raise adaptive upload max default and clarify max indirection (guysmoilov)
dbf7a68 Use explicit max config name for adaptive upload sizing (guysmoilov)
53ca65a Handle missing TransportConnectionFailed in supported gql versions (guysmoilov)
946134f Review fixes (guysmoilov)
841972c removed LEGACY_ nonsense (guysmoilov)
ebde20c Refine adaptive batching search behavior (guysmoilov)
e07343c Show adaptive batch size progress again (guysmoilov)
New file (@@ -0,0 +1,261 @@):

```python
import itertools
import logging
import time
from dataclasses import dataclass
from typing import Callable, Iterable, List, Optional, Sized, TypeVar

import rich.progress

import dagshub.common.config as dgs_config
from dagshub.common.rich_util import get_rich_progress

logger = logging.getLogger(__name__)

T = TypeVar("T")

MIN_TARGET_BATCH_TIME_SECONDS = 0.01


@dataclass
class AdaptiveBatchConfig:
    max_batch_size: int
    min_batch_size: int
    initial_batch_size: int
    target_batch_time_seconds: float
    batch_growth_factor: int
    retry_backoff_base_seconds: float
    retry_backoff_max_seconds: float

    @classmethod
    def from_values(
        cls,
        max_batch_size: Optional[int] = None,
        min_batch_size: Optional[int] = None,
        initial_batch_size: Optional[int] = None,
        target_batch_time_seconds: Optional[float] = None,
        batch_growth_factor: Optional[int] = None,
        retry_backoff_base_seconds: Optional[float] = None,
        retry_backoff_max_seconds: Optional[float] = None,
    ) -> "AdaptiveBatchConfig":
        if max_batch_size is None:
            max_batch_size = dgs_config.dataengine_metadata_upload_batch_size_max
        if min_batch_size is None:
            min_batch_size = dgs_config.dataengine_metadata_upload_batch_size_min
        if initial_batch_size is None:
            initial_batch_size = dgs_config.dataengine_metadata_upload_batch_size_initial
        if target_batch_time_seconds is None:
            target_batch_time_seconds = dgs_config.dataengine_metadata_upload_target_batch_time_seconds
        if batch_growth_factor is None:
            batch_growth_factor = dgs_config.adaptive_batch_growth_factor
        if retry_backoff_base_seconds is None:
            retry_backoff_base_seconds = dgs_config.adaptive_batch_retry_backoff_base_seconds
        if retry_backoff_max_seconds is None:
            retry_backoff_max_seconds = dgs_config.adaptive_batch_retry_backoff_max_seconds

        normalized_max_batch_size = max(1, max_batch_size)
        normalized_min_batch_size = max(1, min(min_batch_size, normalized_max_batch_size))
        normalized_initial_batch_size = max(
            normalized_min_batch_size,
            min(initial_batch_size, normalized_max_batch_size),
        )
        normalized_target_batch_time_seconds = max(target_batch_time_seconds, MIN_TARGET_BATCH_TIME_SECONDS)
        return cls(
            max_batch_size=normalized_max_batch_size,
            min_batch_size=normalized_min_batch_size,
            initial_batch_size=normalized_initial_batch_size,
            target_batch_time_seconds=normalized_target_batch_time_seconds,
            batch_growth_factor=max(2, batch_growth_factor),
            retry_backoff_base_seconds=max(0.0, retry_backoff_base_seconds),
            retry_backoff_max_seconds=max(0.0, retry_backoff_max_seconds),
        )


def _clamp(value: int, lo: int, hi: int) -> int:
    return max(lo, min(hi, value))


def _next_batch_after_success(
    batch_size: int,
    config: AdaptiveBatchConfig,
    bad_batch_size: Optional[int],
) -> int:
    """Pick the next batch size after a fast successful batch.

    Strategy:
    - If we have a previous slow/failing size, binary-search toward it as a soft upper hint.
    - Otherwise, multiply by the growth factor.
    - If the midpoint rounds back to the current size, advance by 1 so the search
      keeps moving. That may revisit the previous failing size, because these hints
      are soft signals rather than permanent bans.
    """
    if bad_batch_size is not None and batch_size < bad_batch_size:
        # Binary search: try the midpoint between current and the soft upper hint.
        candidate = (batch_size + bad_batch_size) // 2
    else:
        # No upper hint (or we've already reached it): grow aggressively.
        candidate = batch_size * config.batch_growth_factor

    # Always make forward progress in the search.
    candidate = max(candidate, batch_size + 1)

    return _clamp(candidate, config.min_batch_size, config.max_batch_size)


def _next_batch_after_retryable_failure(
    batch_size: int,
    config: AdaptiveBatchConfig,
    good_batch_size: Optional[int],
    bad_batch_size: Optional[int],
) -> int:
    """Pick the next batch size after a failed or slow batch.

    Strategy:
    - If we have a known-good lower bound, binary-search between it and the
      failing size.
    - Otherwise, probe the midpoint between config.min_batch_size and the
      largest allowed size below the failing batch.
    - Must be strictly less than the current size (so we converge downward).
    """
    if batch_size <= config.min_batch_size:
        return config.min_batch_size

    ceiling = batch_size - 1  # must shrink
    if bad_batch_size is not None:
        ceiling = min(ceiling, bad_batch_size - 1)

    if good_batch_size is not None and good_batch_size < ceiling:
        # Binary search: try the midpoint between good and failing
        candidate = (good_batch_size + ceiling) // 2
    else:
        # No good lower bound: probe midpoint of the valid range
        candidate = (config.min_batch_size + ceiling) // 2

    return _clamp(candidate, config.min_batch_size, ceiling)


def _get_retry_delay_seconds(consecutive_retryable_failures: int, config: AdaptiveBatchConfig) -> float:
    if config.retry_backoff_base_seconds <= 0.0 or config.retry_backoff_max_seconds <= 0.0:
        return 0.0

    attempt_number = max(1, consecutive_retryable_failures)
    delay = config.retry_backoff_base_seconds * (2 ** (attempt_number - 1))
    return min(delay, config.retry_backoff_max_seconds)


class AdaptiveBatcher:
    """Sends items in adaptively-sized batches, growing on success and shrinking on failure."""

    def __init__(
        self,
        is_retryable: Callable[[Exception], bool],
        config: Optional[AdaptiveBatchConfig] = None,
        progress_label: str = "Uploading",
    ):
        self._config = config if config is not None else AdaptiveBatchConfig.from_values()
        self._is_retryable = is_retryable
        self._progress_label = progress_label

    def run(self, items: Iterable[T], operation: Callable[[List[T]], None]) -> None:
        total: Optional[int] = len(items) if isinstance(items, Sized) else None
        if total == 0:
            return

        config = self._config
        current_batch_size = config.initial_batch_size
        # Consume the source iterable incrementally across retries and successes.
        it = iter(items)
        pending: List[T] = []

        progress = get_rich_progress(rich.progress.MofNCompleteColumn())
        total_task = progress.add_task(f"{self._progress_label}...", total=total)

        last_good_batch_size: Optional[int] = None
        last_bad_batch_size: Optional[int] = None
        consecutive_retryable_failures = 0
        processed = 0

        with progress:
            while True:
                # Draw from pending (failed-batch leftovers) first, then the source iterator
                batch = pending[:current_batch_size]
                pending = pending[current_batch_size:]
                if len(batch) < current_batch_size:
                    batch.extend(itertools.islice(it, current_batch_size - len(batch)))
                if not batch:
                    break
                batch_size = len(batch)

                logger.debug(f"{self._progress_label}: {batch_size} entries...")

                start_time = time.monotonic()
                try:
                    operation(batch)
                except Exception as exc:
                    if not self._is_retryable(exc):
                        logger.error(
                            f"{self._progress_label} failed with a non-retryable error; aborting.",
                            exc_info=True,
                        )
                        raise

                    exhausted_shrink = batch_size <= config.min_batch_size and batch_size == current_batch_size
                    if exhausted_shrink:
                        logger.error(
                            f"{self._progress_label} failed at minimum batch size ({batch_size}); aborting.",
                            exc_info=True,
                        )
                        raise

                    consecutive_retryable_failures += 1
                    time.sleep(_get_retry_delay_seconds(consecutive_retryable_failures, config))

                    last_bad_batch_size = (
                        batch_size if last_bad_batch_size is None else min(last_bad_batch_size, batch_size)
                    )
                    if last_good_batch_size is not None and last_good_batch_size >= last_bad_batch_size:
                        last_good_batch_size = None
                    if batch_size < config.min_batch_size:
                        # Tail batches below configured min cannot be split further.
                        # Retry that exact size once before treating it as exhausted.
                        current_batch_size = batch_size
                    else:
                        current_batch_size = _next_batch_after_retryable_failure(
                            batch_size, config, last_good_batch_size, last_bad_batch_size
                        )
                    logger.warning(
                        f"{self._progress_label} failed for batch size {batch_size} "
                        f"({exc.__class__.__name__}: {exc}). Retrying with batch size {current_batch_size}."
                    )
                    # Re-queue the failed batch items for retry with smaller batch size
                    pending = batch + pending
                    continue

                # On success.
                elapsed = time.monotonic() - start_time
                consecutive_retryable_failures = 0
                processed += batch_size
                progress.update(total_task, advance=batch_size)

                if elapsed <= config.target_batch_time_seconds:
                    last_good_batch_size = (
                        batch_size if last_good_batch_size is None else max(last_good_batch_size, batch_size)
                    )
                    # Clear the soft upper hint if we succeeded fast at or above it.
                    if last_bad_batch_size is not None and batch_size >= last_bad_batch_size:
                        last_bad_batch_size = None
                    current_batch_size = _next_batch_after_success(batch_size, config, last_bad_batch_size)
                else:
                    logger.debug(
                        f"{self._progress_label} batch size {batch_size} took {elapsed:.2f}s "
                        f"(target {config.target_batch_time_seconds:.2f}s); shrinking."
                    )
                    last_bad_batch_size = (
                        batch_size if last_bad_batch_size is None else min(last_bad_batch_size, batch_size)
                    )
                    if last_good_batch_size is not None and last_good_batch_size >= last_bad_batch_size:
                        last_good_batch_size = None
                    current_batch_size = _next_batch_after_retryable_failure(
                        batch_size, config, last_good_batch_size, last_bad_batch_size
                    )

        progress.update(total_task, completed=processed, total=processed, refresh=True)
```
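The two sizing helpers are pure functions, so the search behavior can be checked in isolation. The sketch below copies their logic into a trimmed-down form and walks one grow/shrink/bisect cycle; the `Cfg` defaults and example numbers are illustrative, not the library's configured values.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Cfg:
    # Illustrative bounds, not the dagshub defaults.
    min_batch_size: int = 1
    max_batch_size: int = 1000
    batch_growth_factor: int = 2


def _clamp(value: int, lo: int, hi: int) -> int:
    return max(lo, min(hi, value))


def next_after_success(batch_size: int, cfg: Cfg, bad: Optional[int]) -> int:
    # Bisect toward the soft upper hint if one exists, else grow multiplicatively.
    if bad is not None and batch_size < bad:
        candidate = (batch_size + bad) // 2
    else:
        candidate = batch_size * cfg.batch_growth_factor
    candidate = max(candidate, batch_size + 1)  # always make forward progress
    return _clamp(candidate, cfg.min_batch_size, cfg.max_batch_size)


def next_after_failure(batch_size: int, cfg: Cfg, good: Optional[int], bad: Optional[int]) -> int:
    # Shrink strictly below the failing size, bisecting against a known-good floor.
    if batch_size <= cfg.min_batch_size:
        return cfg.min_batch_size
    ceiling = batch_size - 1
    if bad is not None:
        ceiling = min(ceiling, bad - 1)
    if good is not None and good < ceiling:
        candidate = (good + ceiling) // 2
    else:
        candidate = (cfg.min_batch_size + ceiling) // 2
    return _clamp(candidate, cfg.min_batch_size, ceiling)


cfg = Cfg()
# Fast success at 100 with no hints: multiplicative growth.
print(next_after_success(100, cfg, None))        # 200
# Failure at 200 with known-good 100: bisect downward, (100 + 199) // 2.
print(next_after_failure(200, cfg, 100, 200))    # 149
# Fast success at 149 with bad hint 200: bisect upward, (149 + 200) // 2.
print(next_after_success(149, cfg, 200))         # 174
```

Each probe halves the gap between the known-good and known-bad sizes, so the search settles near the largest size the server tolerates in logarithmically many batches.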
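The backoff in `_get_retry_delay_seconds` is a capped exponential: the delay doubles with each consecutive retryable failure until it hits the configured ceiling. A standalone sketch, with a base of 0.5s and a cap of 4s chosen purely for illustration:

```python
def retry_delay(attempt: int, base: float = 0.5, cap: float = 4.0) -> float:
    """Capped exponential backoff: base * 2^(attempt-1), clamped to cap."""
    if base <= 0.0 or cap <= 0.0:
        return 0.0  # backoff disabled
    n = max(1, attempt)
    return min(base * 2 ** (n - 1), cap)


print([retry_delay(i) for i in range(1, 7)])  # [0.5, 1.0, 2.0, 4.0, 4.0, 4.0]
```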
Review discussion on `AdaptiveBatchConfig.from_values()`:

> This could all be redone with a `default_factory` on each dataclass field: https://docs.python.org/3/library/dataclasses.html#default-factory-functions

> `default_factory` would not cover what `from_values()` is doing here: reading runtime config plus normalizing and clamping related fields.

> How would it not cover it? It is literally just a function that runs at init-time.

> Because `default_factory`, as far as I understand, can't initialize fields based on a function of other fields; it's only a function to initialize each field individually. For interactions between fields that determine field values you need... a constructor.

> The only value that actually is dependent on another value in that whole constructor is `min_batch_size`; everything else is derived from configuration values. If you really want to clamp `min_batch_size`, you can do it in `__post_init__()`: https://docs.python.org/3/library/dataclasses.html#dataclasses.__post_init__
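For reference, the `__post_init__` route the last comment suggests would look roughly like this. `ClampedConfig` is a hypothetical sketch with made-up defaults, not code from this PR; it only demonstrates clamping interdependent fields after dataclass initialization.

```python
from dataclasses import dataclass


@dataclass
class ClampedConfig:
    # Illustrative defaults only.
    max_batch_size: int = 1000
    min_batch_size: int = 1

    def __post_init__(self) -> None:
        # Runs after the generated __init__, so fields can be
        # normalized against each other here.
        self.max_batch_size = max(1, self.max_batch_size)
        self.min_batch_size = max(1, min(self.min_batch_size, self.max_batch_size))


cfg = ClampedConfig(max_batch_size=0, min_batch_size=50)
print(cfg.max_batch_size, cfg.min_batch_size)  # 1 1
```

The trade-off is that `__post_init__` handles cross-field clamping but not reading runtime config for unset values, which is why a classmethod constructor can still be the simpler home for both.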