mengke-mk
Contributor

This .md is our REP-002, which allows a third-party object store to serve as a Plasma alternative. We propose to provide a plasma-like object store with additional functionality targeted at different situations.

Related Issues: #22948 and #22795.

@jjyao
Contributor

jjyao commented Mar 10, 2022

@zhe-thoughts Should we comment on this PR or create a corresponding issue for comments? What's the process here? Does merging the PR mean the REP is approved?

@zhe-thoughts
Collaborator

Thanks for the proposal @septicmk !

Should we comment on this PR or create a corresponding issue for comments? What's the process here? Does merging the PR mean the REP is approved?

To @jjyao 's question: the designated Shepherd should work with the proposer to merge the PR. Then the reviews should happen on the corresponding GH issue. @septicmk you didn't suggest a shepherd, but based on the area I assigned to @ericl and @scv119 (maybe Eric and Chen can decide who should be the main shepherd here)

@scv119
Contributor

scv119 commented Mar 11, 2022

To @jjyao 's question: the designated Shepherd should work with the proposer to merge the PR. Then the reviews should happen on the corresponding GH issue.

If I understand the process correctly, we need to first finalize the design before merging this PR.

@scv119
Contributor

scv119 commented Mar 11, 2022

I think there are two parts not so clear to me:

We propose to define a protocol here that both vineyard and plasma should follow. For example, the protocol will define when an object adds or removes a reference and when an object can be evicted or spilled. In other words, the protocol defines the server-side impact on lifecycle management of calling the above ClientImplInterface API on the client side. Since most of the concerns about the integration may arise in this part, we may need further discussion of these protocols.

Can you clarify about the protocol?

Object spilling

When there is insufficient memory to create a new object even after evicting all evictable objects, the local object manager will try to spill objects to external storage. The spilling code path lives outside Plasma (it is invoked via a callback), so we can reuse it for a third-party object store.
Protocol #5: Spilling only happens during object creation (we trigger spilling when memory usage > threshold (trigger)) (spill).
Protocol #6: Spilling only happens when eviction cannot meet the requirement (creating an object first triggers eviction).
Protocol #7: Spilling only happens for primary objects (the spill_objects_callback handles the spilling).

Also, today plasma calls back into Ray to spill when it runs out of memory. It's not clear to me from this design doc how the new API supports spilling.

@mengke-mk
Contributor Author

Can you clarify about the protocol?

The protocols here describe the side effects that the APIs defined in the REP have on the object lifecycle. Since lifecycle management is hidden on the server side, we need to clarify what happens to the lifecycle when we call the client APIs. For example, the reference count increases ONLY when we call Get() or CreateAndSpillIfNeeded(), and decreases ONLY when we call Disconnect() or Release(). So we list eight protocols below, defining how the lifecycle of a plasma object changes when we call the APIs. If a third-party object store does the same, the raylet and core worker can expect it to behave as plasma does. In other words, these protocols guarantee that if an object is deletable, evictable, or spillable in plasma, then it is also deletable, evictable, or spillable in a third-party object store.
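
To make the reference-count rules concrete, here is a minimal sketch in Python, assuming a toy in-memory store. `ToyStore`, its method names, and the per-client bookkeeping are hypothetical illustrations, not Ray or vineyard APIs. The `delete` conditions follow Protocol #8 as worded in the REP.

```python
from dataclasses import dataclass


@dataclass
class _Entry:
    data: bytes
    sealed: bool = False
    ref_count: int = 0


class ToyStore:
    """Hypothetical in-memory stand-in for a plasma-like server (not Ray code)."""

    def __init__(self):
        self._objects = {}   # object_id -> _Entry
        self._held = {}      # client_id -> {object_id: refs held by that client}

    def _add_ref(self, client_id, object_id, delta):
        held = self._held.setdefault(client_id, {})
        held[object_id] = held.get(object_id, 0) + delta
        if held[object_id] == 0:
            del held[object_id]
        self._objects[object_id].ref_count += delta

    def create(self, client_id, object_id, data):
        # Creation takes a reference for the creating client.
        self._objects[object_id] = _Entry(data)
        self._add_ref(client_id, object_id, +1)

    def seal(self, object_id):
        self._objects[object_id].sealed = True

    def get(self, client_id, object_id):
        # Get takes a reference.
        self._add_ref(client_id, object_id, +1)
        return self._objects[object_id].data

    def release(self, client_id, object_id):
        # Release drops one reference held by this client, if it holds any.
        if self._held.get(client_id, {}).get(object_id, 0) > 0:
            self._add_ref(client_id, object_id, -1)

    def disconnect(self, client_id):
        # Disconnect drops every reference the client still holds.
        for object_id, n in list(self._held.get(client_id, {}).items()):
            self._add_ref(client_id, object_id, -n)
        self._held.pop(client_id, None)

    def delete(self, object_id):
        # Delete only if the object exists, is sealed, and is unused; else no-op.
        entry = self._objects.get(object_id)
        if entry is not None and entry.sealed and entry.ref_count == 0:
            del self._objects[object_id]
            return True
        return False

    def ref_count(self, object_id):
        return self._objects[object_id].ref_count
```

A third-party client implementation could be checked against the same sequence of calls (create, get, release, disconnect, delete) to confirm that its observable reference counts match.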

Also today plasma spilling calls back to Ray when it runs out of memory. It's not clear to me in this design doc, and also it requires some API change as well.

Today plasma spilling relies on the callback. A PlasmaStoreRunner runs on a thread within the raylet, so triggering the callback directly calls methods in the raylet. As a separate process, a third-party object store cannot do that. We are therefore going to put a PlasmaProxy on a thread in the raylet in place of the PlasmaStoreRunner. When the third-party object store triggers a callback, it sends a request to the PlasmaProxy via IPC, which then calls the raylet methods as plasma does. We may not need any API changes in the current client interface: a third-party object store will not do the spilling itself; it only triggers spilling at a given threshold by sending a request to the PlasmaProxy, which invokes the spilling callback.
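
As a rough illustration of that flow, the sketch below runs a proxy on a thread and lets a "store" trigger raylet-side callbacks through a blocking request/reply channel. Everything here is an assumption made for illustration: `ToyPlasmaProxy`, the `call` helper, the message format, and the use of `multiprocessing.Pipe` as the IPC channel are not part of the REP.

```python
import threading
from multiprocessing import Pipe


class ToyPlasmaProxy:
    """Hypothetical proxy: runs raylet-side callbacks on behalf of an external store."""

    def __init__(self, conn, callbacks):
        self._conn = conn            # raylet end of the channel
        self._callbacks = callbacks  # name -> callable, registered by the raylet
        self._thread = threading.Thread(target=self._serve, daemon=True)

    def start(self):
        self._thread.start()

    def join(self):
        self._thread.join()

    def _serve(self):
        while True:
            name, args = self._conn.recv()   # blocks until the store sends a request
            if name == "shutdown":
                self._conn.send(("ok", None))
                return
            callback = self._callbacks.get(name)
            if callback is None:
                self._conn.send(("error", f"unknown callback {name!r}"))
            else:
                self._conn.send(("ok", callback(*args)))


def call(store_conn, name, *args):
    """Store-side helper: blocking request/reply over the channel."""
    store_conn.send((name, args))
    status, value = store_conn.recv()
    if status != "ok":
        raise RuntimeError(value)
    return value


if __name__ == "__main__":
    raylet_conn, store_conn = Pipe()
    proxy = ToyPlasmaProxy(raylet_conn, {"spill_objects": lambda nbytes: True})
    proxy.start()
    call(store_conn, "spill_objects", 1024)   # store asks the raylet side to spill
    call(store_conn, "shutdown")
    proxy.join()
```

Each callback in this sketch costs one round trip on the channel, which is the overhead that would replace today's in-process function call.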

@scv119
Contributor

scv119 commented Mar 12, 2022

Thanks for the clarification. The reference counting part makes a lot of sense.
Should we get a bit more concrete into the spilling part as it's also important here? also cc @rkooo567 on this.

@mengke-mk
Contributor Author

Hi @scv119, I think we can always replace the function calls between plasma and Ray with blocking IPC requests. If spilling relies only on callbacks and Ray does not call plasma functions directly, then I think it is OK to make that replacement by introducing some IPC between the PlasmaProxy and the third-party object store (with spilling triggered in the PlasmaProxy). In the WORST case, we can allow the third-party object store to ignore spilling altogether, meaning it never triggers spilling during object creation. I think that would still be functional to some extent (if we treat spilling as a feature unique to plasma, with the third-party store providing other features instead). So could you elaborate on what your concern is?

@scv119
Contributor

scv119 commented Mar 15, 2022

Yup; we need to decide where the spilling happens.

  • option 1, spilling happens above the storage implementation (as of today). If that's the case, we need to define the API that the storage uses to trigger spilling.
  • option 2, spilling happens inside the storage. For this we need to revisit Ray's current assumptions around spilling. There are a bunch of features built around spilling happening outside of the storage (such as object transfer, spill manager etc.) and they may or may not break.

@mengke-mk
Contributor Author

We do not intend to change Ray's current spilling mechanism, since that would be too heavy for these PRs. Thus option 1 looks more attractive to me. So, does option 1 mean the client side will take the initiative to trigger spilling? I think the relevant API today is CreateAndSpillIfNeeded(), which triggers spilling when there is not enough space during object creation. A third-party object store will check its storage usage and decide whether to trigger spilling. Is this API enough for spilling? As for vineyard, we will add a new handler to process the CreateAndSpill request in the vineyard codebase.
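
A minimal sketch of what "evict first, then trigger spilling during creation" could look like on the store side is shown below. It is written under stated assumptions rather than taken from Ray: `ToyCapacityStore`, the callback signature `spill(candidates, bytes_needed) -> spilled ids`, and the rule that only unreferenced primary objects are offered for spilling are all hypothetical.

```python
class ToyCapacityStore:
    """Hypothetical store: evict first, then ask the raylet-side callback to spill."""

    def __init__(self, capacity, spill_objects_callback):
        self.capacity = capacity
        self.spill = spill_objects_callback   # supplied by the raylet (or a proxy)
        self.objects = {}                     # object_id -> dict(size, ref_count, primary)

    def used(self):
        return sum(o["size"] for o in self.objects.values())

    def _evict(self, needed):
        # Evict unreferenced, non-primary copies until `needed` bytes are free.
        evictable = [k for k, o in self.objects.items()
                     if o["ref_count"] == 0 and not o["primary"]]
        for oid in evictable:
            if self.capacity - self.used() >= needed:
                break
            del self.objects[oid]

    def create_and_spill_if_needed(self, object_id, size, primary=True):
        if self.capacity - self.used() < size:
            self._evict(size)                      # eviction is tried first
        if self.capacity - self.used() < size:     # eviction was not enough
            candidates = [k for k, o in self.objects.items()
                          if o["primary"] and o["ref_count"] == 0]
            shortfall = size - (self.capacity - self.used())
            for oid in self.spill(candidates, shortfall):
                del self.objects[oid]              # spilled copies leave memory
        if self.capacity - self.used() < size:
            raise MemoryError("object store full")
        # The creator holds a reference until it calls release().
        self.objects[object_id] = {"size": size, "ref_count": 1, "primary": primary}

    def release(self, object_id):
        self.objects[object_id]["ref_count"] -= 1
```

In this sketch the store never writes anything to external storage itself; it only decides when to invoke the callback, which matches option 1.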


#### Object spilling

When there is insufficient memory to create a new object even after evicting all evictable objects, the local object manager will try to spill objects to external storage. The spilling code path lives outside Plasma (it is invoked via a callback), so we can reuse it for a third-party object store.
Contributor

Today object spilling works in the following way:

  • The underlying object store provides an API to query whether an object is spillable.
  • When we create the plasma store, we pass it a callback that allows it to trigger spilling.

We need to capture this in the storage interface; otherwise the new storage is unlikely to work.

Contributor Author

mengke-mk commented Mar 16, 2022

This API is confusing, since it is invoked inside the spilling_callback (here). Can we deprecate this API and make the query happen directly on the server side (meaning that when the store passes a list of objects to the spilling_callback, it guarantees that all of them are spillable)?

Contributor

Shouldn't each object store have their own spilling implementation (e.g. where to spill, whether encryption is needed), clients only control what is spillable or not?

**Protocol #8:** `Delete()` removes an object only when (1) it exists, (2) it has been sealed, and (3) it is not used by any client; otherwise it does nothing.

We believe that if a third-party object store client follows the above protocols when implementing the `ClientImplInterface`, then the Ray logic can be built seamlessly on top of the third-party object store.

Contributor

In addition to the spill callback that allows the storage to trigger spilling, there are 3 other callbacks passed to the storage: object_store_full_callback, add_object_callback, and delete_object_callback.

Without implementing those callbacks, Ray will not work properly.

Contributor Author

mengke-mk commented Mar 16, 2022

We can keep these callbacks. The third-party object store is a separate process rather than a thread within Ray (and it should not be launched by the raylet; when the raylet starts, the third-party object store is already there). In that situation, it is unrealistic to pass these callbacks to the third-party object store, so we have to process them on the Ray side. We can introduce a PlasmaProxy (a thread in the raylet) to process the callbacks: when the third-party store triggers a callback, it sends a request to the PlasmaProxy, which then executes the callback.

Contributor

I see, @septicmk can we add a section for that in the doc?

Contributor Author

I will add descriptions for that in step 3.

Contributor

For add_object_callback, is this triggered only when the object is added by Ray?

Contributor

Yes that's correct.

- Deprecation
- No API will be deprecated.


Contributor

One thing we also need to touch upon is how we run this storage with the Raylet, and how we ensure fate-sharing between them.

Contributor Author

The third-party object store may not share fate with the Raylet. For example, vineyard (another plasma-like object store) runs as a daemon pod in a k8s cluster. This means the data is still stored in vineyard when the raylet exits, so other systems running outside Ray can get data from the Ray world in a zero-copy way. This is what we intend to bring to Ray with vineyard.

Contributor

I see. Should the Raylet handle the error (i.e. crash itself) if vineyard runs into issues? Otherwise the workers will run into SEGFAULTs.

Contributor Author

Yup, the PlasmaProxy will share fate with the raylet. If vineyard runs into issues, the PlasmaProxy will report it (we can use a heartbeat here). I think Ray can treat the PlasmaProxy as the third-party object store instance; it will throw exceptions just like the original plasma runner.

Contributor

Wouldn't this break all invariants within Ray when you start a new raylet though (there could be lots of duplicated object ids for example)? Do you plan to not restart a new raylet? Maybe we need to at least make object id not deterministic to make this work?

Contributor

Can we add a section about failure modes: what happens if the raylet crashes, whether workers can still get objects from the third-party object store, what happens if the third-party object store crashes, etc. I'm worried that a failure mode different from what the Ray code depends on now may cause problems.

Contributor Author

Please refer to the summary below.


Ray keeps two types of "reference count": one is the `ref_count` in plasma, which tracks the local plasma object, and the other is the `reference_count` in core_worker, which tracks the primary copy. In this proposal, we will not change the logic of `reference_count`, and the vineyard server will have its own `ref_count`.

**Protocol #1**: An object will increase its `ref_count` iff `Get(...)` or `CreateAndSpillIfNeeded(...)` is invoked. ([Create](https://github.com/ray-project/ray/blob/master/src/ray/object_manager/plasma/store.cc#L193), [Get](https://github.com/ray-project/ray/blob/master/src/ray/object_manager/plasma/store.cc#L106))
Contributor

What about TryCreateImmediately ?

Contributor Author

I will fix this in the next commit.

@rkooo567
Contributor

I think the proposal itself sounds reasonable. But my biggest concern is if the existing behaviors / APIs are stable enough to settle on protocol / APIs.

It seems like there are some arbitrary APIs. For example, the semantics of Delete or Abort vs Delete. The protocol itself also seems to just fit into existing implementation (which is reasonable); but my question is, do we plan to improve this sooner or later? @scv119


**[solution]** We can keep these callbacks. The third-party object store is a separate process rather than a thread within Ray, which means it should not be launched by the raylet; when the raylet starts, the third-party object store is already there. We therefore have to process these callbacks on the Ray side, so we introduce a `PlasmaProxy` (a thread in the raylet) to process them.

We put the `PlasmaProxy` on a thread in the raylet in place of the `PlasmaStoreRunner`. When the third-party object store triggers a callback, it sends a request to the `PlasmaProxy` via IPC, which then calls the raylet methods as plasma does. We may not need any API changes in the current client interface: a third-party object store will not do the spilling itself; it only triggers spilling at a given threshold by sending a request to the `PlasmaProxy`, which invokes the spilling callback.
Contributor

I think we need to actually pin the objects until they are spilled, so we might need an additional protocol like "shouldn't unpin objects once spilling is initiated".

Release will be called by the raylet once the spilling is completed.

Contributor Author

Yes, the server should pin the object until objects are spilled. I will add this in the next commit.
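
One possible shape for that extra protocol is sketched below, assuming the server tracks in-flight spills next to its reference counts. `PinningStore` and its method names are hypothetical; `finish_spill` stands in for the raylet calling Release once the spill completes.

```python
class PinningStore:
    """Hypothetical server-side bookkeeping for objects that are being spilled."""

    def __init__(self):
        self.ref_count = {}      # object_id -> references currently held
        self.spilling = set()    # objects whose spill has started but not finished

    def add(self, object_id):
        self.ref_count[object_id] = 0

    def begin_spill(self, object_id):
        # Take a reference on behalf of the raylet so the object stays in memory.
        self.ref_count[object_id] += 1
        self.spilling.add(object_id)

    def finish_spill(self, object_id):
        # Corresponds to the raylet calling Release once the spill is complete.
        self.spilling.discard(object_id)
        self.ref_count[object_id] -= 1

    def can_evict(self, object_id):
        # An object with a spill in flight must never be evicted or deleted.
        return self.ref_count[object_id] == 0 and object_id not in self.spilling
```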


## Summary - Plasma Alternative
### General Motivation
Ray is a general-purpose and powerful computing framework that makes it simple to scale any compute-intensive Python workload. By storing objects in Plasma, data can be shared efficiently (zero-copy) between Ray tasks. We propose to provide a plasma-like object store with additional functionality targeted at different situations (e.g. to better support [Kubernetes](https://kubernetes.io/zh/)). In other words, we can have multiple plasma object-store variants with different functionality at the same time (e.g. users can choose a variant at `ray.init()`).
Contributor

nit: https://kubernetes.io/zh/ -> https://kubernetes.io

Contributor Author

I will fix this in the next commit.


To share objects with out-of-Ray systems, vineyard provides APIs to persist a Ray object as an object that external systems can recognize (e.g. a vineyard object in this proposal). The conversion should be zero-copy.
```python
# users get objects from ray then put (zero-copy) them into vineyard or vice versa.
```
Contributor

Put is not zero-copy right? Get is zero-copy.

Contributor Author

The put here makes a Ray object visible to other vineyard clients. Since the Ray object already lives in the vineyard store in this case, the put is zero-copy.

import ray
import vineyard as v6d

ray.init(plasma_store_impl="vineyard")
Contributor

We need more design on this API as well: a single plasma_store_impl string is probably not enough. For example, how do we know which vineyard server to connect to, and what other configs are needed to create a vineyard client?

Contributor Author

Yes, you are right, I will add related parameters in the next commit.


**Protocol #1**: An object will increase its `ref_count` iff `Get(...)` or `CreateAndSpillIfNeeded(...)` is invoked. ([Create](https://github.com/ray-project/ray/blob/master/src/ray/object_manager/plasma/store.cc#L193), [Get](https://github.com/ray-project/ray/blob/master/src/ray/object_manager/plasma/store.cc#L106))

**Protocol #2:** A object will decrease its `refer_count` iff the `release(...)` or `Disconnect(...)`is invoked. ([Release](https://github.com/ray-project/ray/blob/master/src/ray/object_manager/plasma/store.cc#L268), [DIsconnect](https://github.com/ray-project/ray/blob/master/src/ray/object_manager/plasma/store.cc#L337))
Contributor

nit: refer_count -> ref_count, DIsconnect -> Disconnect

Also it's better to have code link to a particular commit otherwise it will change in the future.

Contributor Author

I will fix this in the next commit.



**[solution]** We can keep these callbacks. The third-party object store is a separate process rather than a thread within Ray, which means it should not be launched by the raylet; when the raylet starts, the third-party object store is already there. We therefore have to process these callbacks on the Ray side, so we introduce a `PlasmaProxy` (a thread in the raylet) to process them.

We put the `PlasmaProxy` on a thread in the raylet in place of the `PlasmaStoreRunner`. When the third-party object store triggers a callback, it sends a request to the `PlasmaProxy` via IPC, which then calls the raylet methods as plasma does. We may not need any API changes in the current client interface: a third-party object store will not do the spilling itself; it only triggers spilling at a given threshold by sending a request to the `PlasmaProxy`, which invokes the spilling callback.
Contributor

I think we need to define the PlasmaProxy IPC protocol in this doc as well. Is PlasmaProxy generic for all object stores, or will we have an XXXPlasmaProxy for each object store?

Also, the callback will incur IPC overhead instead of the current in-process function call, so we may need to discuss performance as well.

Contributor Author

Please refer to the summary below.

@@ -0,0 +1,185 @@

Contributor

I think besides the explicitly defined PlasmaClientInterface, we also need to find all the hidden/implicit assumptions/dependencies Ray has on the object store. One thing I can think of is the scheduler: currently Ray treats object store memory as a resource and tries to avoid scheduling tasks on nodes where the object store is almost full. Does this still work with a third-party object store, especially when other systems add objects to it without Ray's awareness?

Contributor Author

Please refer to the summary below.

@mengke-mk
Contributor Author

mengke-mk commented Mar 17, 2022

A summary to the concerns above:

Q1: Visibility control

Wouldn't this break all invariants within Ray when you start a new raylet though (there could be lots of duplicated object ids for example)? Do you plan to not restart a new raylet? Maybe we need to at least make object id not deterministic to make this work? (by @rkooo567)

I think third-party object stores should not break the invariants within Ray.

[with visibility control] For a vineyard-like third-party object store that has a session/visibility/isolation mechanism, we can open a new session for each raylet (it is ok to have duplicated object ids that are separated in different sessions.)

[w/o visibility control] For other third-party object stores that do not provide such isolation control, we can add prefixes to the duplicated object ids to distinguish them (this should be handled in the xxxClientImpl, which inherits from the ClientImplInterface).
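
A minimal sketch of the prefixing idea follows, assuming the backing store behaves like a shared key-value map. `PrefixedClient` and the `raylet_instance_id` parameter are hypothetical names, and a real client implementation would wrap the store's own client rather than a dict.

```python
class PrefixedClient:
    """Hypothetical ClientImplInterface-style wrapper that namespaces object IDs."""

    def __init__(self, backend, raylet_instance_id):
        self._backend = backend                    # any dict-like shared store
        self._prefix = f"{raylet_instance_id}:"    # must be unique per raylet incarnation

    def _key(self, object_id):
        return self._prefix + object_id

    def put(self, object_id, value):
        self._backend[self._key(object_id)] = value

    def get(self, object_id):
        return self._backend.get(self._key(object_id))

    def contains(self, object_id):
        return self._key(object_id) in self._backend
```

With this wrapper, a restarted raylet that reuses the same object id does not see the copy written by its predecessor, as long as each incarnation gets a distinct prefix.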

Q2: Failure

Can we add a section about failure modes: what happen if raylet crashes, can worker still get objects from the third-party object store, what happens if third-party object-store crashes, etc. I'm worried that a different failure mode than what ray code is depending on now may cause problems. (by @jjyao )

I think the third-party object store's failure mode should be consistent with plasma's, meaning it should behave the same as plasma when the object store or raylet crashes. We introduce the XXXPlasmaProxy to handle failures. The third-party store should provide an internal mechanism to block connections when the XXXPlasmaProxy runs into issues.

[Situation 1] When the raylet crashes, workers cannot get objects from the third-party object store. The third-party object store learns of the crash via the XXXPlasmaProxy heartbeat and closes the connections of the related clients (those in the same session as the XXXPlasmaProxy). This is a recoverable failure for vineyard: when the raylet restarts, it can connect to the previous session.

[Situation 2] When the third-party store crashes, workers will not be able to use the objects. The XXXPlasmaProxy learns the fate of the third-party object store just as in Situation 1. Since the XXXPlasmaProxy runs on a raylet thread, it can report the failure just as if the plasma runner had crashed. This is an unrecoverable failure for vineyard, since the raylet cannot control vineyard's fate.

[Situation 3] When the XXXPlasmaProxy crashes, the third-party store behaves as if the raylet had crashed: it closes the connection and forbids clients in the same session from getting objects. The raylet behaves as if the object store had crashed: it throws exceptions just like the original plasma runner. This is a recoverable failure for vineyard: the raylet can restart the PlasmaProxy and then connect to the previous session.
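
The heartbeat-based detection in the three situations could look roughly like the sketch below. It is only an illustration under assumptions: `HeartbeatMonitor`, `SessionGate`, the timeout value, and the injected clock are hypothetical, and a real store would close client connections rather than raise an exception.

```python
import time


class HeartbeatMonitor:
    """Hypothetical liveness tracker that either side could run for its peer."""

    def __init__(self, timeout_s, clock=time.monotonic):
        self._timeout_s = timeout_s
        self._clock = clock
        self._last_seen = clock()

    def beat(self):
        # Called whenever a heartbeat message arrives from the peer.
        self._last_seen = self._clock()

    def peer_alive(self):
        return self._clock() - self._last_seen <= self._timeout_s


class SessionGate:
    """Blocks object access for a session once its proxy stops sending heartbeats."""

    def __init__(self, monitor):
        self._monitor = monitor

    def get(self, objects, object_id):
        if not self._monitor.peer_alive():
            raise ConnectionError("proxy heartbeat lost; session is blocked")
        return objects[object_id]
```

Because the monitor is symmetric, the same class could serve the store watching the proxy (Situations 1 and 3) and the proxy watching the store (Situation 2).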

Q3: Spilling

Shouldn't each object store have their own spilling implementation (e.g. where to spill, whether encryption is needed), clients only control what is spillable or not? (by @jjyao )

For now, I think we have two options for the spilling implementation:

[Option 1] Follow the current implementation: spilling is provided by the raylet (in the form of a callback), and the spillable query is provided by the object store. We can delegate execution of the callback to the XXXPlasmaProxy on a raylet thread. Third-party object stores send requests to the XXXPlasmaProxy when there is not enough space. There are a lot of callbacks here, which means we do have to introduce IPC. I think this is a straightforward way to achieve our goal.

[Option 2] Refactor the current implementation and move the whole spilling code path to the store side, with clients only controlling what is spillable. I think this is cleaner for the object store interface, since spilling an object falls within the ambit of an object store rather than the object manager. However, I think we should not include a change to the spilling mechanism in this REP, since it is orthogonal to our proposal. Maybe someone can start another proposal? We are also willing to get involved in the spilling refactoring.

Q4: PlasmaProxy

I think we need to define the PlasmaProxy IPC protocol in this doc as well. Is PlasmaProxy generic for all object stores or we will have XXXPlasmaProxy for each object store? (by @jjyao )

Also the callback will have an IPC overhead instead of current in-process function call, we may need to discuss performance as well. (by @jjyao )

Yes, we will have an XXXPlasmaProxy for each object store; I think this makes the integration of a third-party object store clearer and more maintainable to implement. Maybe we can have a virtual class PlasmaProxyInterface here (to be added to this REP later). As for vineyard, the VineyardPlasmaProxy will take the callbacks (spilling, add, delete, ...) in its Start() method and execute them when required.

[callback overhead] I think the cost of a callback depends on the underlying store implementation, and the abstraction here is compatible with different object stores. For plasma, the PlasmaProxy executes the callback via an in-process function call; for standalone store services like vineyard, the VineyardPlasmaProxy executes the callback via IPC. Supporting third-party object stores will not incur a performance hit on the current code path (the plasma version).
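
For discussion, the interface could be sketched as follows in Python. Only the name `PlasmaProxyInterface` and the idea that `Start()` receives the callbacks come from the comment above; the `trigger` method, the two subclasses, and the pickle-based transport are hypothetical stand-ins for the in-process and IPC paths.

```python
import pickle
from abc import ABC, abstractmethod


class PlasmaProxyInterface(ABC):
    """Hypothetical shape of the virtual class mentioned above."""

    @abstractmethod
    def start(self, callbacks):
        """Register raylet callbacks (name -> callable)."""

    @abstractmethod
    def trigger(self, name, *args):
        """Run the named callback on the raylet side and return its result."""


class InProcessProxy(PlasmaProxyInterface):
    """Plasma-style path: a plain function call, no IPC involved."""

    def start(self, callbacks):
        self._callbacks = dict(callbacks)

    def trigger(self, name, *args):
        return self._callbacks[name](*args)


def pickle_round_trip(name, args):
    """Stand-in transport: serializes the request as an IPC channel would."""
    return pickle.loads(pickle.dumps((name, args)))


class RemoteProxy(PlasmaProxyInterface):
    """Stand-alone store path: every request crosses a (simulated) IPC boundary."""

    def __init__(self, transport=pickle_round_trip):
        self._transport = transport

    def start(self, callbacks):
        self._callbacks = dict(callbacks)

    def trigger(self, name, *args):
        name, args = self._transport(name, args)
        return self._callbacks[name](*args)
```

The raylet would depend only on the interface, so the plasma path keeps its direct call while a stand-alone store pays the serialization and transport cost.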

Q5: hidden/implicit assumptions/dependencies

I think besides the explicitly defined PlasmaClientInterface, we also need to find all the hidden/implicit assumptions/dependencies Ray has on the object-store. One thing I can think of is the scheduler, currently, Ray treats object-store memory as a resource and it will try to avoid scheduling tasks on the node where the object store is almost full. Does this still work with third-party object-store especially when other systems will add objects to it that's out of the awareness of Ray. (by @jjyao)

Yes, that is why I introduced the protocols in this REP: to make explicit the assumptions Ray has about the object store. For now, Ray has only three ways to interact with the underlying object store:

[Client API] The raylet or core worker sends requests to the server side and gets what it wants in the reply. The third-party object store can do the same, as the interface/protocols in this REP define, which just fits into the existing implementation (as mentioned by @rkooo567). I think the APIs/protocols in this REP are enough to be consistent with the plasma CLIENT.

[Callbacks] The raylet passes callbacks to the plasma server so that the server side can execute raylet methods. This can be handled by the XXXPlasmaProxy, as mentioned above.

[Call method directly] The raylet directly calls plasma server methods. This can also be handled by the XXXPlasmaProxy: we can implement all the PlasmaStoreRunner functions in the XXXPlasmaProxy and forward them to the third-party store.

For the scheduler case mentioned by @jjyao, I believe the raylet must query (or compute) the object store usage in one of the above ways. We can do the same with the third-party object store, either by adding an API on the client side or a method in the XXXPlasmaProxy. Either way, I think we can always achieve the same functionality as plasma for hidden/implicit assumptions/dependencies. But I really need your help to enumerate all the cases where Ray has hidden dependencies on the object store that are not captured by the client APIs, so that we can implement them as unit tests for consistency checks.

We will add more sections in REP later.

@rkooo567
Contributor

[Call method directly] Raylet directly call plasma server methods. This can also be handled by XXXPlasmaProxy we can implement all the PlasmaStoreRunner functions in XXXPlasmaProxy, and forward them to a third-party store in this case.

This is more of a warning/heads-up (I think it is difficult to solve when the object store runs in a separate process and communicates over IPC), but I believe we use this path to obtain the memory usage of the plasma store, and the reason we use a direct function call is to avoid obtaining stale memory usage. If this returns very stale memory usage, it may cause issues in memory management.

[with visibility control] For a vineyard-like third-party object store that has a session/visibility/isolation mechanism, we can open a new session for each raylet (it is ok to have duplicated object ids that are separated in different sessions.)

[Situation 1] When the raylet crashes, workers cannot get objects from the third-party object store. The third-party object store learns of the crash via the heartbeat of XXXPlasmaProxy and then closes the connections of the related clients (those in the same session as XXXPlasmaProxy). This is a recoverable failure for vineyard: when the raylet restarts, it can connect to the previous session.

Aren't these two conflicting? The first section seems to suggest opening a new session for every raylet restart, whereas the second section says to connect to the previous session. Also, the term "session" is not well defined and seems to be Vineyard-specific terminology.

Overall, the new failure model is still a bit confusing to me (e.g., raylet failure is recoverable from Vineyard's perspective given what you are saying, whereas the plasma store doesn't have these semantics). It would be great to get more details here, with more high-level generalized invariants. For example, one of the invariants we should keep is "when the raylet is restarted, the storage should guarantee the object id from the previous raylet should be isolated to the new one".

@mengke-mk
Contributor Author

mengke-mk commented Mar 18, 2022

but I believe we use this to obtain the memory usage of the plasma store, and the reason why we use the direct function call is to avoid to obtain the stale memory usage. If this returns very stale memory usage, there's a possibility it will cause some issues in the memory management.

If the IPC of a third-party object store really hinders the Ray scheduler (e.g. when obtaining the memory usage of the store, for which Ray today uses a direct function call to avoid stale values), we can share metadata between Ray and the third-party object store via shared memory (SHM). For example, the third-party object store writes its memory usage to SHM after each creation/deletion, and Ray reads the memory usage from SHM when required.
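As a rough illustration of the SHM idea above, here is a minimal Python sketch using the standard `multiprocessing.shared_memory` module. The class names (`UsagePublisher`, `UsageReader`) and the single 64-bit counter layout are assumptions made for this example; they are not part of Ray or vineyard, and the sketch does not address torn reads or other synchronization concerns.

```python
import struct
from multiprocessing import shared_memory

# Hypothetical layout: one unsigned 64-bit little-endian integer (bytes in use).
USAGE_FORMAT = "<Q"
USAGE_SIZE = struct.calcsize(USAGE_FORMAT)


class UsagePublisher:
    """Object store side: owns the segment and rewrites it after each create/delete."""

    def __init__(self):
        self._shm = shared_memory.SharedMemory(create=True, size=USAGE_SIZE)
        self.name = self._shm.name  # the reader attaches using this name
        self.publish(0)

    def publish(self, bytes_in_use):
        struct.pack_into(USAGE_FORMAT, self._shm.buf, 0, bytes_in_use)

    def close(self):
        self._shm.close()
        self._shm.unlink()  # only the owner removes the segment


class UsageReader:
    """Scheduler side: attaches by name and reads the latest value without an IPC round trip."""

    def __init__(self, name):
        self._shm = shared_memory.SharedMemory(name=name)

    def read(self):
        return struct.unpack_from(USAGE_FORMAT, self._shm.buf, 0)[0]

    def close(self):
        self._shm.close()
```

In this sketch the reader never blocks on the store; freshness depends only on how promptly the store calls `publish` after each creation or deletion.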

The first section seems to suggest opening a new session for all raylet restarts whereas the second section says to connect to the previous session.

The first section means that a third-party object store should maintain the invariant "when the raylet is restarted, the storage should guarantee the object id from the previous raylet should be isolated to the new one". Vineyard can meet this requirement thanks to its session/isolation mechanism.

The second section covers restarting a raylet after a crash. If we set plasma_store_socket_name to the socket name of the previous vineyard session, we can still get the objects created before the crash. If the raylet restarts in a new session, it cannot see the objects from before the failure, and vineyard's GC handles the dangling objects of the previous session.

Overall, the new failure model is still a bit confusing to me (e.g., raylet failure is recoverable for Vineyard perspective given what you are saying whereas plasma store doesn't have these semantics).

By "the failure is recoverable for vineyard" I mean that vineyard can recover only when the raylet and XXXPlasmaProxy crash; it cannot recover when vineyard itself crashes, because vineyard runs in another process and Ray cannot restart it. This may differ from the plasma store.
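The two restart behaviours described above (reconnecting to the previous session versus starting in a new one) can be sketched with a toy in-memory model. `SessionedStore` and its methods are hypothetical names for this example only and do not reflect vineyard's actual API.

```python
class SessionedStore:
    """Toy model: object IDs are namespaced per session, so IDs may repeat across sessions."""

    def __init__(self):
        self._sessions = {}  # session name -> {object_id: data}

    def connect(self, session):
        # Connecting to an existing session reattaches to its objects;
        # connecting to an unknown session starts with an empty namespace.
        self._sessions.setdefault(session, {})
        return session

    def put(self, session, object_id, data):
        self._sessions[session][object_id] = data

    def get(self, session, object_id):
        return self._sessions[session].get(object_id)

    def collect(self, session):
        # Stand-in for GC of a dangling session; returns how many objects were dropped.
        return len(self._sessions.pop(session, {}))
```

A raylet that restarts and calls `connect` with the old session name sees the objects from before the crash, while one that connects with a new name sees none of them, which is the isolation invariant requested earlier in the thread.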

**Opportunities**: Vineyard can enhance the ray object store with more functionalities for modern data-intensive applications. To be specific, the benefits are four-fold:

- O1: Sharing data with systems out of Ray. Vineyard is a daemon in cluster that any system can put/get objects into/from it.
- O2: Composability. Objects will not be sealed twice when we create a new object form a existing one(e.g. add a new column for dataframe).
Contributor

form -> from

Contributor Author

should be fixed in the next commit.


## Summary - Plasma Alternative
### General Motivation
Ray is a general-purpose and powerful computing framework and makes it simple to scale any compute-intensive Python workload. By storing the objects in Plasma, data can be efficiently (0-copy) shared between Ray tasks. We propose to provide a plasma-like object store with more functionalities target for different situations (e.g. to better support [Kubernetes](https://kubernetes.io/)). In other words, we can simultaneously have multiple plasma object-store variants with different functionalities. (e.g. users can choose a variant at `ray.init()`).
Contributor

target -> targeted

Contributor Author

should be fixed in the next commit.

### Shepherd of the Proposal (should be a senior committer)
To make the review process more productive, the owner of each proposal should identify a **shepherd** (should be a senior Ray committer). The shepherd is responsible for working with the owner and making sure the proposal is in good shape (with necessary information) before marking it as ready for broader review.

TBF
Contributor

remove this?

Contributor Author

Now the shepherds are @scv119 and @rkooo567, according to ray-project/ray#22948

- (**Step 1**) Add a new option named `plasma_store_impl` to not launch a plasma runner in third-party mode.
- (**Step 1**) Add a new virtual class named `ClientImplInterface` for third-party object store to implement.
- (**Step 2**) Add a new `VineyardClientImp` to implement the `ClientImplInterface` and follow the above protocols.
- (**Step 3**) Add a new `PlasmaRunnerProxy` as a helper to execute the callback of raylet.
Contributor

@scv119 scv119 Mar 21, 2022

We need a bit more clarity on how PlasmaRunnerProxy is integrated into Ray. My suggestion would be to replace ObjectStoreRunner depending on the configuration.

Contributor Author

Yes, ObjectStoreRunner will be replaced in a third-party mode.
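To make the shape of Steps 1 and 2 in the hunk above more concrete, here is a hedged Python sketch of a pluggable client selected by a `plasma_store_impl` option. Only the names `plasma_store_impl` and `ClientImplInterface` come from the REP; the method set, the option values `"plasma"` and `"vineyard"`, the stand-in classes, and `make_client` are assumptions for illustration, and the real interface would be a C++ virtual class.

```python
from abc import ABC, abstractmethod


class ClientImplInterface(ABC):
    """Hypothetical rendering; the methods shown are assumed for illustration."""

    @abstractmethod
    def create(self, object_id, data):
        ...

    @abstractmethod
    def get(self, object_id):
        ...


class DictBackedClient(ClientImplInterface):
    """Shared in-memory stand-in so the example is runnable."""

    def __init__(self):
        self._objects = {}

    def create(self, object_id, data):
        self._objects[object_id] = data

    def get(self, object_id):
        return self._objects[object_id]


class PlasmaClientStandIn(DictBackedClient):
    launches_plasma_runner = True  # default mode keeps the in-process plasma runner


class VineyardClientStandIn(DictBackedClient):
    launches_plasma_runner = False  # third-party mode: no plasma runner is launched


_IMPLS = {"plasma": PlasmaClientStandIn, "vineyard": VineyardClientStandIn}


def make_client(plasma_store_impl="plasma"):
    """Pick a client implementation from the (assumed) option value."""
    try:
        return _IMPLS[plasma_store_impl]()
    except KeyError:
        raise ValueError(f"unknown plasma_store_impl: {plasma_store_impl!r}") from None
```

The point of the sketch is that callers depend only on `ClientImplInterface`, so swapping the runner (as agreed in the reply above) does not change call sites.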


Ray keeps two types of "reference count": one is `ref_count` in plasma, which tracks the local plasma object, and the other is `reference_count` in core_worker, which tracks the primary copy. In this proposal, we will not change the logic of `reference_count`, and the vineyard server will have its own `ref_count`.

**Protocol #1**: An object increases its `ref_count` iff `Get(...)`, `TryCreateImmediately(...)`, or `CreateAndSpillIfNeeded(...)` is invoked. ([Create](https://github.com/ray-project/ray/blob/bb4ff42eeca50a5b91dd569caafdd71bf771b66e/src/ray/object_manager/plasma/store.cc#L198), [Get](https://github.com/ray-project/ray/blob/bb4ff42eeca50a5b91dd569caafdd71bf771b66e/src/ray/object_manager/plasma/store.cc#L110))
Contributor

Would Invariant be a better name?

Contributor Author

Changed to Invariant now.
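The first protocol/invariant quoted in this thread (the `ref_count` grows only on create and get) could be checked against a third-party client with a small model like the one below. `RefCountingStore`, `release`, and `contains` are hypothetical names for this sketch; in particular, the decrement path is an assumption, since the quoted text only specifies when the count increases.

```python
class RefCountingStore:
    """Toy store whose ref_count increases only in create() and get()."""

    def __init__(self):
        self._objects = {}
        self._ref_count = {}

    def create(self, object_id, data):
        if object_id in self._objects:
            raise KeyError(f"object {object_id!r} already exists")
        self._objects[object_id] = data
        self._ref_count[object_id] = 1  # the creator holds the first reference

    def get(self, object_id):
        data = self._objects[object_id]
        self._ref_count[object_id] += 1  # each successful get adds a reference
        return data

    def contains(self, object_id):
        return object_id in self._objects  # must not touch ref_count

    def release(self, object_id):
        # Assumed counterpart to create/get; not specified in the quoted text.
        self._ref_count[object_id] -= 1

    def ref_count(self, object_id):
        return self._ref_count[object_id]
```

A consistency test for a real client would follow the same pattern: perform an operation, then assert that the server-side count changed only if the operation was a create or a get.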


#### Object Delete

**Protocol #9:** `Delete()` removes an object only when it (1) exists, (2) has been sealed, and (3) is not used by any client; otherwise it does nothing.
Contributor

There is also an invariant on the object_added and object_deleted callbacks, i.e. when an object is added the object_added callback is called, and the object_deleted callback is called when a sealed object is deleted.

Contributor Author

Added Invariant #11 to guarantee that all callbacks are called when their conditions are met.
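The delete rule and the callback invariant discussed in this thread can be modelled together. The sketch below is illustrative only: `CallbackStore` and its methods are hypothetical, and firing `object_added` at seal time is an assumption, since the thread only says the callback is called "when an object is added".

```python
class CallbackStore:
    """Toy store: delete() acts only on existing, sealed, unreferenced objects."""

    def __init__(self, object_added, object_deleted):
        self._object_added = object_added
        self._object_deleted = object_deleted
        self._entries = {}  # object_id -> {"sealed": bool, "ref_count": int}

    def create(self, object_id):
        self._entries[object_id] = {"sealed": False, "ref_count": 1}

    def seal(self, object_id):
        self._entries[object_id]["sealed"] = True
        self._object_added(object_id)  # assumption: "added" means sealed

    def release(self, object_id):
        self._entries[object_id]["ref_count"] -= 1

    def delete(self, object_id):
        entry = self._entries.get(object_id)
        if entry is None or not entry["sealed"] or entry["ref_count"] > 0:
            return False  # any failed condition: do nothing
        del self._entries[object_id]
        self._object_deleted(object_id)  # fires only for a sealed object
        return True
```

Returning a boolean from `delete` is a convenience for testing; the quoted rule only requires that a delete which fails any of the three conditions has no effect.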

@scv119
Contributor

scv119 commented Mar 23, 2022

@rkooo567 @jjyao any more comments?


We believe that if a third-party object store client follows the above invariants when implementing the `ClientImplInterface`, then the Ray logic can be seamlessly built on top of the third-party object store.

#### Duplicate IDs in multiple raylet.
Contributor

more like Object with same IDs from different Ray clusters

Contributor

The namespace of object id is cluster not raylet.

Contributor Author

Should be fixed in next commit.

@mengke-mk mengke-mk force-pushed the plasma_alternative branch from 2a7e3e1 to f7170e8 Compare March 24, 2022 16:23
@mengke-mk
Contributor Author

Hi @scv119 @rkooo567 @jjyao, thanks for your comments! Have we reached a consensus, or do we still need more discussion about this proposal?

Contributor

@scv119 scv119 left a comment

LGTM, let's see if @jjyao or @rkooo567 have more comments.

@scv119 scv119 merged commit dc2366e into ray-project:main Mar 30, 2022
@mengke-mk mengke-mk deleted the plasma_alternative branch March 31, 2022 01:20
@ericl
Contributor

ericl commented Apr 9, 2022

Hmm, we should probably not merge REPs until they are approved by committer vote in the future.

(To be clear, this REP has not been accepted. A committer vote needs to be organized.)


**Opportunities**: Vineyard can enhance the ray object store with more functionalities for modern data-intensive applications. To be specific, the benefits are four-fold:

- O1: Sharing data with systems out of Ray. Vineyard is a daemon in cluster that any system can put/get objects into/from it.
Contributor

I know what it can do with Vineyard, but could you share more details about the benefit of this? A real-world user journey would help. Also, could we build it on top of the current implementation instead of making the plasma store pluggable?

Comment on lines +149 to +153
**Invariant #5:** Spilling only happens during object creation. (We trigger spilling when memory usage > threshold ([trigger](https://github.com/ray-project/ray/blob/bb4ff42eeca50a5b91dd569caafdd71bf771b66e/src/ray/object_manager/plasma/store.cc#L174)).) ([spill](https://github.com/ray-project/ray/blob/bb4ff42eeca50a5b91dd569caafdd71bf771b66e/src/ray/object_manager/plasma/create_request_queue.cc#L97))

**Invariant #6:** Spilling only happens when eviction cannot meet the requirement. ([create object will first trigger eviction](https://github.com/ray-project/ray/blob/bb4ff42eeca50a5b91dd569caafdd71bf771b66e/src/ray/object_manager/plasma/object_lifecycle_manager.cc#L191))

**Invariant #7:** Spilling only happens for primary objects. (The spill_objects_callback will handle the spilling.)
Contributor

I think these invariants are too strict; we should try to avoid them. I actually see some benefit in relaxing them. For example, with #5 in place, we can't spill objects on demand. The user might want to spill an object and persist it so it can be reused later across sessions. Or the system might spill an object in advance because the node is idle and the object is used on another node that is very busy and under memory pressure. The same goes for #6.

Also, for #7, it's not necessary to restrict spilling to primary objects. Theoretically it can happen anywhere. Maybe on the node with more CPU and IO?

Contributor

Is it possible for us to define the lifecycle in Ray so that it can be decoupled from the underlying storage? For example, the underlying storage could be just a key-value store.
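For readers weighing Invariants #5 to #7 as quoted in this thread, the ordering they impose can be sketched as follows. This is a simplified model, not plasma's implementation: `SpillingStore` is a hypothetical name, a single `capacity` stands in for the memory threshold, and treating every non-primary copy as evictable is an assumption made for the example.

```python
class SpillingStore:
    """Toy store: evict non-primary copies first, then spill primaries, only in create()."""

    def __init__(self, capacity, spill_objects_callback):
        self._capacity = capacity
        self._spill = spill_objects_callback
        self._objects = {}  # object_id -> (size, is_primary)

    def used(self):
        return sum(size for size, _ in self._objects.values())

    def _free(self, needed, primary):
        """Remove objects of one kind until `needed` bytes fit; return the removed IDs."""
        removed = []
        for object_id, (_, is_primary) in list(self._objects.items()):
            if self.used() + needed <= self._capacity:
                break
            if is_primary == primary:
                del self._objects[object_id]
                removed.append(object_id)
        return removed

    def create(self, object_id, size, is_primary):
        # Invariant #6: eviction of non-primary copies is tried first.
        self._free(size, primary=False)
        if self.used() + size > self._capacity:
            # Invariants #5 and #7: spilling happens only here, and only for primaries.
            spilled = self._free(size, primary=True)
            if spilled:
                self._spill(spilled)
        if self.used() + size > self._capacity:
            raise MemoryError("object does not fit even after eviction and spilling")
        self._objects[object_id] = (size, is_primary)
```

Relaxing the invariants as the reviewer suggests would amount to exposing a spill entry point outside `create`, or allowing `_free(..., primary=True)` to run before eviction has been attempted.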

@fishbone
Contributor

I also have concerns about the increasing complexity of the system. Right now, the plasma store can just be an embedded store in Raylet, which is simple and clean. Although it's like a service, we can actually make it just a component (easier to manage). With this change, we need to treat the plasma store as an independent service on each node, so the lifecycle of the two will change dramatically.

I would like the system to define a clean abstraction before implementation so that the maintenance cost is controllable. Could you add more details about this?

@wuisawesome

I have 2 questions here.

  1. Is this meant to be an internal refactor? Or is this meant for users to plug different stores in?

  2. If users are supposed to plug something in, how do they do that? Do they have to statically link? Dynamically link? Do we have to maintain ABI compatibility for a c++ interface?

My concern here is that this seems very difficult to maintain.

@kfstorm
Member

kfstorm commented Apr 15, 2022

I'm also concerned about the maintainability of code and the stability of the invariants.

Previously, the invariants were just implementation details, and they changed several times. Now there's an external component that depends on them. What if we want to change the invariants in the future? How should we cooperate? I'm worried that the evolution of the object store may slow down due to this external dependency.

On the other hand, one thing I can think of that would benefit Ray but is not mentioned in the REP is that we can avoid cross-node data transfer on the same host if the Ray cluster is deployed on Kubernetes. Maybe we can go deeper on this.

Also curious about the real use cases.

@mengke-mk
Contributor Author

@iycheng @wuisawesome @kfstorm, thanks for your comments! I can see that most of the concerns are about maintainability. I think we have to make trade-offs among maintainability, performance, and invariance when considering this proposal. Maintainability refers to good modularization and decoupling. Performance refers to fewer IPCs and fresher information for the scheduler. Invariance refers to keeping the behavior consistent with the current plasma implementation. IMHO, we cannot achieve maintainability, performance, and invariance at the same time. So we have to make a choice, and this is what we have been doing these past weeks: trying to figure out what we have to retain and what we can give up.

In the previous discussion, most of the concerns were about invariance, such as keeping the same behavior in the failure model and spilling. So we tried to list the invariants as @rkooo567 suggested.

It'll be great if we can get more details here with more high-level generalized invariants. For example, one of the invariants we should keep is "when the raylet is restarted, the storage should guarantee the object id from the previous raylet should be isolated to the new one". (@rkooo567)

Another concern is about performance as @rkooo567 and @jjyao said:

and the reason why we use the direct function call is to avoid obtaining the stale memory usage. If this returns very stale memory usage, there's a possibility it will cause some issues in the memory management.(@rkooo567)

Also the callback will have an IPC overhead instead of a current in-process function call, we may need to discuss performance as well. (@jjyao)

So we try to minimize IPC calls and avoid getting "stale memory usage". This is why we introduce the XXXPlasmaProxy to retain the callbacks.

And now we come to the maintainability. But before addressing the concerns above, let's forget the current REP temporarily and think about how to make trade-offs among maintainability, performance, and invariance for integrating with a third-party object store. As mentioned above, we have three options here by choosing two of these three goals:

  • [Maintainability + Performance] As @iycheng suggests, we can first decouple plasma from raylet. Before integrating a third-party object store, we would move the code paths for callbacks, spilling, and life-cycle out of plasma (maybe into ObjectManager) and make plasma a clean, pure local object store. This is a HUGE change that obviously breaks the invariance, but it is easy to maintain and easy to optimize for higher performance. If this option is chosen, we are also willing to get involved in the refactoring.
  • [Maintainability + Invariance] If you can tolerate more IPCs and stale memory usage, there is another option that neither changes the plasma client interface nor adds a new XXXPlasmaProxy. We can process plasma client requests in the plasma store by forwarding them to the vineyard server (or another third-party object store). This means two IPC calls instead of one (request path: plasma client ---> plasma store/vineyard client ---> vineyard server). With this option, the change only happens in the plasma store, so it is easy to maintain and naturally preserves the current invariants.
  • [Invariance + Performance] This is the current solution of this REP: we do not intend to change any plasma logic, and just follow what plasma does in raylet for spilling, lifecycle, and the failure model. The plasma store today is not, as @iycheng said, something that "can just be an embedding store in Raylet which is simple and clean". Actually, the plasma store is tightly coupled with raylet: it runs as a thread, passing callbacks in and out (ray/#8897). I also don't know the evolution direction of the plasma store: is it going to become more tightly coupled with raylet, or the opposite? Therefore the only thing I can do is follow the current restrictions.

I think we have to make a choice among the three options and reach a consensus on the trade-offs, since it is hard to come up with a perfect plan. (If you have any better ideas about the integration, please let me know; that would be quite helpful.)


Now, we can address the concerns for option 3:

also could we build it on top of the current implementation instead of making the plasma store pluggable? (@iycheng)

Third-party object stores may have a variety of benefits based on their design, and making the plasma store pluggable may make it easier to keep their code paths independent. If we can loosen the invariance restrictions, a third-party store may be allowed to do nothing when memory is full (just panic). However, a third-party object store is only for special scenarios (where it may do better), and users can always choose the default plasma store. (Users must be aware of what they gain, and at what cost, by switching to a third-party object store.)

Is it possible for us to define the lifecycle in ray so that it can be decoupled with the underlying storage? Like the underlying storage is just a key-value store. (@iycheng)

If we choose option 1 and make the plasma store a clean key-value store, this will be OK.

I think these invariants are too strict we should try to avoid them. I actually see some benefit there. For example, without # 5, we can’t spill objects on demand. The user might want to spill the object and persist it so it can be reused later across sessions. Or the system will spill the object in advance because the node is idle and the object is used in the other node which is very busy and that node has memory pressure. The same for # 6. (@iycheng)

Also for # 7, it’s not necessary to restrict spilling only on primary objects. Theoretically, it can happen anywhere. Maybe the node with more CPU and IO? (@iycheng)

It is ok to loosen the restriction, but your suggestions conflict with the shepherd (@scv119). Please decide whether the third-party object store has to do spilling or not.

Is this meant to be an internal refactor? Or is this meant for users to plug different stores in? (@wuisawesome)

This proposal aims to provide Ray with new abilities by integrating with a third-party object store, and we want vineyard to be the first one to integrate. The integration involves some internal refactoring to make third-party object stores easier to integrate, and also allows users to choose between stores.

If users are supposed to plug something in, how do they do that? Do they have to statically link? Dynamically link? Do we have to maintain ABI compatibility for a c++ interface? (@wuisawesome)

Both static and dynamic linking can work, and we could export C interfaces for better ABI compatibility. On the cooperation side, we will also continuously maintain the code to keep it compatible.

What if we want to change the invariants in the future? How should we cooperate? I'm worried the evolution of object stores may slow down due to this external dependency. (@kfstorm)

I think this depends on the trade-offs that the Ray maintainers make. In this proposal, we treat the invariants as something that won't change easily in the future.

@scv119
Contributor

scv119 commented Apr 25, 2022

Hey @septicmk. Thanks for submitting the REP and contributing to the Ray community. After a vote among the Ray committers, we concluded that we won't accept this REP at this time. Concretely:

  1. The ray committers are generally in favor of the direction of having a clean interface for plasma.
  2. The main concern is the feasibility and maintainability of the proposed interface; this requires a fully working prototype to validate.
  3. Also, there are questions about if we have fully explored the design spaces and if there exists a better alternative.

Again, thanks for putting in the effort to come up with the REP. As we discussed offline, the Vineyard and Ray teams will collaborate in the following mode:

  1. Ray will not support an alternative plasma store at the moment, and there is no guarantee that the interface won't change.
  2. Instead of the proposed REP, the Vineyard team will prototype and maintain the integration code as an addon/plug-in/patch in one of their repos. As they prototype, they might work with the Ray team and submit PRs to clean up Ray's plasma interface where that brings a net benefit to Ray.
  3. If the community accepts the API changes, and a net benefit to Ray can be demonstrated, and the introduction of Vineyard does not complicate the Ray development process, we may reconsider the plasma alternative REP.

9 participants