Description
To make adaptive decisions about shuffling, broadcasting, and so on at run time, we need to keep track of some basic metadata. In cudf-polars, this metadata is propagated through the network using a distinct Channel between every node. After #811, we will be able to use the same Channel for both data and metadata payloads. However, for cudf-polars to use any C++ Join/GroupBy nodes, we will (probably) still need cudf-polars and RapidsMPF to align on a common, standard metadata schema.
At a minimum, the channel metadata should keep track of the expected local chunk count and whether the local data coming through the channel has already been broadcast (duplicated) across all ranks. To avoid re-shuffling data unnecessarily, we also want to track the partitioning status of the data being sent into an actor.
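As a rough illustration of why these fields matter, the sketch below shows the kind of decision an actor could make from them before a hash join. The function name `needs_shuffle` and its arguments are hypothetical and are not part of cudf-polars or RapidsMPF. It also assumes that only data partitioned on exactly the join keys can skip the shuffle, and it ignores the partition count.

```python
from __future__ import annotations


def needs_shuffle(
    partitioned_on: tuple[str, ...] | None,
    duplicated: bool,
    join_keys: tuple[str, ...],
) -> bool:
    """Return True if incoming data must be shuffled before a hash join.

    Hypothetical helper: assumes that data hash-partitioned on exactly the
    join keys, or duplicated on every rank, can be joined in place.
    """
    if duplicated:
        # Every rank already holds a full copy, so no data movement is needed.
        return False
    # Data already partitioned on the join keys does not need a re-shuffle.
    return partitioned_on != join_keys
```

With this sketch, `needs_shuffle(("a",), False, ("a",))` is `False`, while unpartitioned, non-duplicated input still requires a shuffle.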
Current Metadata Schema
The current Metadata definition in cudf-polars looks like:
    class Metadata:
        """Metadata payload for an ETL workload."""

        local_count: int
        """Local chunk-count estimate for the current rank."""
        global_count: int | None
        """Global chunk-count estimate across all ranks."""
        partitioning: HashPartitioned | None
        """How the data is partitioned, or None if not partitioned."""
        duplicated: bool
        """Whether the data is duplicated (identical) on all workers."""

    class HashPartitioned:
        """Hash-partitioning metadata."""

        columns: tuple[str, ...]
        """Columns the data is hash-partitioned on."""
        scope: Literal["local", "global"]
        """Whether data is partitioned locally (within a rank) or globally (across all ranks)."""
        count: int
        """The modulus used for hash partitioning (number of partitions)."""

Partitioning metadata
An important open question is how best to encode the partitioning status of the data being sent through a channel. For now, the partitioning attribute can either be empty (None) or be set to HashPartitioned. This makes it easy to keep track of data that has already been globally shuffled. However, it does not make it easy to distinguish between data that was shuffled directly into N_g global chunks and data that was first shuffled between R ranks and then into N_l local chunks.
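To make the distinction concrete, here is a minimal sketch of the two layouts. The hash-to-chunk formulas are assumptions chosen for illustration only, not the actual cudf-polars or RapidsMPF behaviour, and the function names are hypothetical. Both layouts produce R * N_l chunks in total, so a total count by itself does not reveal which one was used.

```python
def direct_chunk(h: int, n_global: int) -> int:
    """Chunk id when a row hash is mapped straight into n_global chunks."""
    return h % n_global


def two_stage_chunk(h: int, n_ranks: int, n_local: int) -> tuple[int, int]:
    """(rank, local chunk) when rows are split across ranks, then locally.

    Assumption: the second stage reuses the same hash value, divided by
    n_ranks so that it is independent of the rank assignment.
    """
    return h % n_ranks, (h // n_ranks) % n_local


R, N_l = 2, 3
N_g = R * N_l  # both layouts yield the same total number of chunks

hashes = range(12)  # stand-in hash values for 12 distinct keys
direct = {h: direct_chunk(h, N_g) for h in hashes}
staged = {h: two_stage_chunk(h, R, N_l) for h in hashes}

# Flatten (rank, local chunk) to one id, assuming rank-major ordering.
staged_flat = {h: rank * N_l + local for h, (rank, local) in staged.items()}

# Same total chunk count, but rows land in different chunks.
layouts_differ = direct != staged_flat
```

In this sketch, equal hashes still land in the same chunk under either scheme, but the chunk a given key maps to differs, which is the information a downstream actor would need in order to decide whether a re-shuffle can be skipped.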