Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions neuro_san/deploy/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,13 @@ ENV AGENT_EXTERNAL_RESERVATIONS_STORAGE=""
# to use for cross-pod reservations storage.
ENV AGENT_RESERVATIONS_S3_BUCKET=""

# If AGENT_EXTERNAL_RESERVATIONS_STORAGE is set,
# then this parameter specifies the period in seconds for checking the external storage for expired reservations.
# Value of 0 means no periodic checks will be performed by the service,
# in this case it is expected that the external storage itself will handle expired reservations,
# or it will be done on demand by other tools.
ENV AGENT_RESERVATIONS_EXTERNAL_STORAGE_CHECK_PERIOD_SECONDS="0"

# A hocon file with MCP servers information to be used by LangChainMcpAdapter
# for connecting to external MCP servers with authentication and tool filtering.
ENV MCP_SERVERS_INFO_FILE=""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@ class AbstractReservationsStorage(ReservationsStorage, Startable):
Specific logic for adding, retrieving, and expiring reservations is left to concrete implementations.
"""

def __init__(self, storage_name: str = "", check_expirations_interval_seconds: float = 60.0):
def __init__(self, storage_name: str = "", check_expirations_interval_seconds: float = 0.0):
"""
Constructor
:param storage_name: A string name for this storage, used for logging purposes.
:param check_expirations_interval_seconds: The number of seconds between checks for expired reservations.
If set to 0 or negative, the background thread will not be started.
"""
super().__init__()
self._thread: threading.Thread = None
Expand All @@ -50,9 +51,13 @@ def __init__(self, storage_name: str = "", check_expirations_interval_seconds: f
self._name: str = storage_name

def start(self):
self._thread = threading.Thread(target=self._run_loop, daemon=True)
self._thread.start()
self._logger.debug("%s: Expiration cleanup thread started.", self._name)
if self._check_interval_seconds > 0:
self._thread = threading.Thread(target=self._run_loop, daemon=True)
self._thread.start()
self._logger.debug("%s: Expiration cleanup thread started with period %f sec.",
self._name, self._check_interval_seconds)
else:
self._logger.debug("%s: Expiration cleanup thread not started.", self._name)

def stop(self, timeout: Optional[float] = None):
"""
Expand Down
36 changes: 27 additions & 9 deletions neuro_san/service/watcher/temp_networks/s3_reservations_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,36 @@ class S3ReservationsStorage(AbstractReservationsStorage):
Stores reservations as JSON objects in an S3 bucket, with each reservation
stored in its associated agent spec as metadata.
"""
# pylint: disable=too-many-instance-attributes

def __init__(self, bucket_name: str = "", prefix: str = "reservations/",
check_expirations_interval_seconds: float = 60.0):
check_expirations_interval_seconds: float = 0.0):
"""
Initialize S3 reservations storage.

:param bucket_name: S3 bucket name (defaults to AGENT_RESERVATIONS_S3_BUCKET env var)
:param prefix: S3 key prefix for reservation objects
:param check_expirations_interval_seconds: How often to check for expired reservations
:param check_expirations_interval_seconds: How often to check for expired reservations.
If 0 or negative, expiration checks are disabled.
"""
# Our default for check_expirations_interval_seconds is 0
# because S3 expiration check is generally a significant execution load,
# and we may want to run it externally on demand rather than on a fixed schedule inside the service.
super().__init__(storage_name="s3_storage",
check_expirations_interval_seconds=check_expirations_interval_seconds)
self.logger: Logger = getLogger(self.__class__.__name__)

# Check if expiration interval is set by environment variable,
# and adjust it if so (overriding the constructor parameter)
envvar_name: str = "AGENT_RESERVATIONS_EXTERNAL_STORAGE_CHECK_PERIOD_SECONDS"
envvar_value: str = os.getenv(envvar_name, "0")
try:
expiration_check_period_seconds: float = float(envvar_value)
self._check_interval_seconds = expiration_check_period_seconds
except ValueError:
self.logger.warning(
"Invalid value for %s, must be a number. Got: %s", envvar_name, envvar_value)

# Configure bucket name from parameter or environment variable
env_bucket: str = os.getenv("AGENT_RESERVATIONS_S3_BUCKET", "")
self.bucket_name: str = bucket_name or env_bucket
Expand All @@ -80,7 +98,6 @@ def __init__(self, bucket_name: str = "", prefix: str = "reservations/",
self.last_sync_timestamp: float = 0.0
self.converter = ReservationDictionaryConverter()
self.max_keys_per_page: int = 1000 # Max allowed by S3 API for ListObjectsV2
self.logger: Logger = getLogger(self.__class__.__name__)

def start(self):
"""
Expand Down Expand Up @@ -213,12 +230,13 @@ def add_reservations(self, reservations_dict: Dict[Reservation, Any],

# Store as JSON object in S3 with proper content type
json_body: str = dumps(agent_spec, indent=4) # Pretty-printed JSON
self.s3_client.put_object(
Bucket=self.bucket_name,
Key=key,
Body=json_body,
ContentType="application/json"
)

put_function = partial(self.s3_client.put_object,
Bucket=self.bucket_name,
Key=key,
Body=json_body,
ContentType="application/json")
self._do_with_retries(put_function)

self.logger.debug("%s: Successfully stored reservation %s in S3", self._name, reservation_id)

Expand Down
Loading