Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 197 additions & 3 deletions src/forge/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
# LICENSE file in the root directory of this source tree.

from abc import ABC, abstractmethod
from typing import Any, Mapping

from monarch.actor import endpoint
from typing import Any, List, Mapping

from forge.controller import ForgeActor

from forge.types import Action, Message, Observation, Scalar, State

from monarch.actor import endpoint


class Transform(ABC):
"""Abstract base class for observation transforms.
Expand Down Expand Up @@ -90,6 +90,200 @@ async def update_weights(self):
pass


class StoreInterface(ABC):
"""
Abstract base class for a KV store. This closely follows the interface of
torchstore.
"""

# TODO(yuxuanh): add this to torchstore.
@abstractmethod
async def numel(self, prefix=None) -> int:
"""Return the number of keys starting with the given prefix.
The prefix matching follows reverse domain name notation convention.

Args:
prefix (str): The prefix to match against stored keys.
For example, "xyz" matches "xyz.abc.def" but "xy" does not.
Note: None is the prefix of all keys, while "" is the prefix of keys
starting with "." and "" itself.

Returns:
int: The number of keys matching the prefix in the store.
"""
pass

@abstractmethod
async def keys(self, prefix=None) -> List[str]:
"""Return an iterable of all keys in the store matching the given prefix.
The prefix matching follows reverse domain name notation convention.

Args:
prefix (str): The prefix to match against stored keys.
For example, "xyz" matches "xyz.abc.def" but "xy" does not.
Note: None is the prefix of all keys, while "" is the prefix of keys
starting with "." and "" itself.

Returns:
Iterable[K]: An iterable containing all keys in the buffer.
"""
pass

@abstractmethod
async def put(self, key: str, value: Any) -> None:
"""
Add a key-value pair to the buffer.

Args:
key (K): The key to store the value under
val (V): The value to store in the buffer

Returns:
None
"""
pass

@abstractmethod
async def get(self, key: str) -> Any:
"""
Get a key-value pair from the store.

Args:
key (K): The key to get

Returns:
V: The value stored under the key

Raises:
KeyError: If the key does not exist in the store
"""
pass

@abstractmethod
async def exists(self, key: str) -> bool:
"""
Check if a key exists in the store.
"""
pass

# TODO(yuxuanh): add this to torchstore.
@abstractmethod
async def release(self, key: str) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting idea: what's the inspiration here?

Copy link
Contributor Author

@casteryh casteryh Sep 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Say the trainer is at step 10 and will no longer need stuff from step 5. A reasonable thing to do would be simply mark all keys starting with replay_buffer.step_10 as released and move on, instead of waiting it to be actually deleted.

While from the torchstore side it's probably easier to implement this as instant deletion right now, it would be nice to have this semantics, for if and when we hit a scale where this matters.

Copy link
Contributor Author

@casteryh casteryh Sep 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However, on the other hand, since everything is implemented in Python. It's probably fast enough to just delete instantly since we don't deallocate memory when deleting. Indeed, currently all keys are held by a single process Controller actor in torchstore right now - so it makes less sense to reinvent GC ourself.

Things do get complicated if we need to shard the controller. And it's much easier to just not make any promises.

In this regard, we should probably remove the delete[_all] methods all together, as it would be a nightmare to do it correctly in a distributed setting.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There could be some perf gains here where we notify controller of delete and then let storage volumes garbage collect later.

"""
Release a key-value pair from the store.
This is a hint to the store that the key-value pair is no longer needed.
It is not guaranteed that the key-value pair will be deleted immediately.
As such, users should filter out the keys they don't want instead of relying on this method to ensure that the keys are deleted.

Args:
key (K): The key to release

Returns:
None
"""
pass

# TODO(yuxuanh): add this to torchstore.
@abstractmethod
async def release_all(self, prefix: str) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the difference between release and delete?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And delete?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant the difference between two functions release and delete

"""
Release all key-value pairs from the store that match the given prefix.
The prefix matching follows reverse domain name notation convention.

This is a hint to the store that the key-value pair is no longer needed.
It is not guaranteed that the key-value pair will be deleted immediately.
As such, users should filter out the keys they don't want instead of relying on this method to ensure that the keys are deleted.

Args:
prefix (str): The prefix to match against stored keys.
For example, "xyz" matches "xyz.abc.def" but "xy" does not.
Note: None is the prefix of all keys, while "" is the prefix of keys
starting with "." and "" itself.

Returns:
None
"""
pass

# TODO(yuxuanh): add this to torchstore.
@abstractmethod
async def pop(self, key: str) -> Any:
"""
Get a key-value pair from the store, and delete it from the store.

Args:
key (K): The key to get

Returns:
V: The value stored under the key

Raises:
KeyError: If the key does not exist in the store
"""

# TODO(yuxuanh): add this to torchstore.
@abstractmethod
async def pop_all(self, prefix: str | None) -> Any:
"""
Get a key-value pair from the store, and delete it from the store.

Args:
key (K): The key to get

Returns:
V: The value stored under the key

Raises:
KeyError: If the key does not exist in the store
"""

# TODO: add this to torchstore.
@abstractmethod
async def delete(self, key: str) -> None:
"""
Delete a key-value pair from the store.

Args:
key (K): The key to delete

Returns:
None

Raises:
KeyError: If the key does not exist in the store
"""
pass

# TODO(yuxuanh): add this to torchstore.
@abstractmethod
async def delete_all(self, prefix: str | None) -> None:
"""
Delete all key-value pairs from the store that match the given prefix.
The prefix matching follows reverse domain name notation convention.

This is a hint to the store that the key-value pair is no longer needed.
It is not guaranteed that the key-value pair will be deleted immediately.
As such, users should filter out the keys they don't want instead of relying on this method to ensure that the keys are deleted.

Args:
prefix (str): The prefix to match against stored keys.
For example, "xyz" matches "xyz.abc.def" but "xy" does not.
Note: None is the prefix of all keys, while "" is the prefix of keys
starting with "." and "" itself.


Args:
key (K): The key to delete

Returns:
None

Raises:
KeyError: If the key does not exist in the store
"""
pass


class BaseTokenizer(ABC):
"""
Abstract token encoding model that implements ``encode`` and ``decode`` methods.
Expand Down
Loading