Description
Search before asking
- I searched the issues and found no similar feature request.
Description
Storing objects in Plasma lets Ray tasks share data efficiently (zero-copy). We propose a Plasma-like object store interface that supports additional functionality for different situations (e.g., better support for Kubernetes). In other words, multiple Plasma object-store variants with different functionality could coexist, and users could choose a variant in ray.init().
The change is confined to the Plasma client and will not break existing logic. We first need to expose the interface of the Plasma client by adding an abstract base class, IPlasmaClientImpl, for store-specific clients to inherit, and by making PlasmaClientImpl inherit from it. This allows other object stores (e.g., vineyard) to stand in for Plasma, so Ray can put objects into a third-party object store instead of Plasma.
    class IPlasmaClientImpl {
     public:
      // Connect to the store server.
      virtual Status Connect(const std::string &store_socket_name,
                             const std::string &manager_socket_name,
                             int release_delay = 0, int num_retries = -1) = 0;

      // Create a blob with the given size.
      virtual Status CreateAndSpillIfNeeded(const ObjectID &object_id,
                                            const ray::rpc::Address &owner_address,
                                            int64_t data_size, const uint8_t *metadata,
                                            int64_t metadata_size,
                                            std::shared_ptr<Buffer> *data,
                                            plasma::flatbuf::ObjectSource source,
                                            int device_num = 0) = 0;

      // Create a blob immediately, for clients that cannot wait.
      virtual Status TryCreateImmediately(const ObjectID &object_id, int64_t data_size,
                                          const uint8_t *metadata, int64_t metadata_size,
                                          std::shared_ptr<Buffer> *data,
                                          plasma::flatbuf::ObjectSource source,
                                          int device_num) = 0;

      // Get objects by their object IDs.
      virtual Status Get(const std::vector<ObjectID> &object_ids, int64_t timeout_ms,
                         std::vector<ObjectBuffer> *object_buffers,
                         bool is_from_worker) = 0;

      // Notify the server that this client no longer needs the object.
      virtual Status Release(const ObjectID &object_id) = 0;

      // Check whether the object exists.
      virtual Status Contains(const ObjectID &object_id, bool *has_object) = 0;

      // Abort an unsealed object as if it had never been created.
      virtual Status Abort(const ObjectID &object_id) = 0;

      // Seal the object, making it immutable.
      virtual Status Seal(const ObjectID &object_id) = 0;

      // Delete the objects only when they (1) exist, (2) have been sealed, and
      // (3) are not in use by any client. Otherwise, do nothing.
      virtual Status Delete(const std::vector<ObjectID> &object_ids) = 0;

      // Delete objects until num_bytes bytes have been freed.
      virtual Status Evict(int64_t num_bytes, int64_t &num_bytes_evicted) = 0;

      // Disconnect from the store server.
      virtual Status Disconnect() = 0;

      // Query the capacity of the object store.
      virtual int64_t store_capacity() = 0;
    };
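To illustrate how a variant might plug in behind such an interface, here is a minimal sketch that compiles without Ray headers. Everything in it is hypothetical and not part of Ray or of the proposed PR: IObjectStoreClient is a heavily reduced stand-in for IPlasmaClientImpl, bool replaces Status, std::string replaces ObjectID, and InMemoryStoreClient, MakeStoreClient, and DemoLifecycle are names invented for illustration. It assumes that only sealed objects count as present, and it omits client reference counting, metadata, spilling, and eviction.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical reduced interface; see the assumptions listed above.
class IObjectStoreClient {
 public:
  virtual ~IObjectStoreClient() = default;
  virtual bool Create(const std::string &id, int64_t data_size,
                      std::shared_ptr<std::vector<uint8_t>> *data) = 0;
  virtual bool Seal(const std::string &id) = 0;
  virtual bool Get(const std::string &id,
                   std::shared_ptr<const std::vector<uint8_t>> *data) = 0;
  virtual bool Contains(const std::string &id) = 0;
  virtual bool Delete(const std::string &id) = 0;
};

// Toy variant that keeps objects in process memory instead of shared memory.
class InMemoryStoreClient : public IObjectStoreClient {
 public:
  bool Create(const std::string &id, int64_t data_size,
              std::shared_ptr<std::vector<uint8_t>> *data) override {
    if (data_size < 0 || objects_.count(id) > 0) return false;
    Entry entry;
    entry.buffer =
        std::make_shared<std::vector<uint8_t>>(static_cast<size_t>(data_size));
    *data = entry.buffer;  // The caller fills this buffer before sealing.
    objects_.emplace(id, std::move(entry));
    return true;
  }
  bool Seal(const std::string &id) override {
    auto it = objects_.find(id);
    if (it == objects_.end() || it->second.sealed) return false;
    it->second.sealed = true;
    return true;
  }
  bool Get(const std::string &id,
           std::shared_ptr<const std::vector<uint8_t>> *data) override {
    auto it = objects_.find(id);
    if (it == objects_.end() || !it->second.sealed) return false;
    *data = it->second.buffer;  // Shares the buffer; no bytes are copied.
    return true;
  }
  bool Contains(const std::string &id) override {
    auto it = objects_.find(id);
    return it != objects_.end() && it->second.sealed;
  }
  // Deletes only objects that exist and are sealed; returns false (and does
  // nothing) otherwise. The "not in use by any client" rule is not modeled.
  bool Delete(const std::string &id) override {
    auto it = objects_.find(id);
    if (it == objects_.end() || !it->second.sealed) return false;
    objects_.erase(it);
    return true;
  }

 private:
  struct Entry {
    std::shared_ptr<std::vector<uint8_t>> buffer;
    bool sealed = false;
  };
  std::unordered_map<std::string, Entry> objects_;
};

// Hypothetical factory: picks a variant by name, similar in spirit to
// choosing one through a ray.init() option.
std::unique_ptr<IObjectStoreClient> MakeStoreClient(const std::string &variant) {
  if (variant == "in_memory") return std::make_unique<InMemoryStoreClient>();
  throw std::invalid_argument("unknown object store variant: " + variant);
}

// Walks one object through create, seal, get, and delete.
bool DemoLifecycle() {
  auto client = MakeStoreClient("in_memory");
  std::shared_ptr<std::vector<uint8_t>> buf;
  if (!client->Create("obj-1", 3, &buf)) return false;
  assert(buf != nullptr);
  (*buf)[0] = 42;
  const bool hidden_before_seal = !client->Contains("obj-1");
  if (!client->Seal("obj-1")) return false;
  std::shared_ptr<const std::vector<uint8_t>> out;
  const bool readable = client->Get("obj-1", &out) && (*out)[0] == 42;
  return hidden_before_seal && readable && client->Delete("obj-1");
}
```

Calling DemoLifecycle() from a main function exercises the whole path. The caller only ever holds a pointer to the base class, which is what would let a third-party store replace Plasma without changes at the call sites.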
Use case
For example, the Plasma store is currently an internal component of the raylet, so its lifetime is tied to the raylet. Systems that are not integrated with Ray cannot share objects via Plasma, so they have to share them through the filesystem, which introduces heavy overhead when the data is complex (e.g., serializing a graph takes a long time). If the core worker can put results in a third-party object store, out-of-Ray systems can get and read them directly.
We plan to submit a PR that exposes the interface above. If it is accepted, we will implement new client variants (integrating with other off-the-shelf object stores) in a follow-up PR to provide more functionality.
Related issues
No response
Are you willing to submit a PR?
- Yes I am willing to submit a PR!