Skip to content

Commit 47e531f

Browse files
authored
Merge pull request #189 from AllenNeuralDynamics/feat-decouple-ads-from-watchdog
Decouple watchdog from aind-data-schema
2 parents d004d7b + 9090b50 commit 47e531f

File tree

3 files changed

+154
-247
lines changed

3 files changed

+154
-247
lines changed

src/clabe/data_transfer/aind_watchdog.py

Lines changed: 59 additions & 168 deletions
Original file line numberDiff line numberDiff line change
@@ -1,58 +1,35 @@
1-
import importlib.util
2-
3-
if importlib.util.find_spec("aind_data_transfer_service") is None:
4-
raise ImportError(
5-
"The 'aind_data_transfer_service' package is required to use this module. \
6-
Install the optional dependencies defined in `project.toml' \
7-
by running `pip install .[aind-services]`"
8-
)
9-
101
import datetime
11-
import importlib.metadata
122
import json
133
import logging
144
import os
155
import subprocess
166
from os import PathLike
177
from pathlib import Path, PurePosixPath
18-
from typing import TYPE_CHECKING, Any, Callable, ClassVar, Dict, List, Optional, Union
8+
from typing import Callable, ClassVar, Dict, List, Literal, Optional, Union
199

2010
import aind_data_transfer_service.models.core
2111
import pydantic
2212
import requests
23-
import semver
2413
import yaml
25-
from aind_data_schema.core.metadata import CORE_FILES
14+
from aind_behavior_services import AindBehaviorSessionModel
2615
from pydantic import BaseModel, SerializeAsAny, TypeAdapter
2716
from requests.exceptions import HTTPError
28-
from typing_extensions import deprecated
2917

30-
from .. import ui
3118
from ..services import ServiceSettings
3219
from ._aind_watchdog_models import (
3320
DEFAULT_TRANSFER_ENDPOINT,
3421
BucketType,
3522
ManifestConfig,
36-
Modality,
37-
Platform,
3823
WatchConfig,
3924
)
4025
from ._base import DataTransfer
4126

42-
if TYPE_CHECKING:
43-
from ..data_mapper.aind_data_schema import Session
44-
else:
45-
Session = Any
46-
47-
4827
logger = logging.getLogger(__name__)
4928

5029
TransferServiceTask = Dict[
5130
str, Union[aind_data_transfer_service.models.core.Task, Dict[str, aind_data_transfer_service.models.core.Task]]
5231
]
5332

54-
_AIND_DATA_SCHEMA_PKG_VERSION = semver.Version.parse(importlib.metadata.version("aind_data_schema"))
55-
5633

5734
class WatchdogSettings(ServiceSettings):
5835
"""
@@ -67,18 +44,20 @@ class WatchdogSettings(ServiceSettings):
6744
destination: Path
6845
schedule_time: Optional[datetime.time] = datetime.time(hour=20)
6946
project_name: str
70-
platform: Platform = "behavior"
71-
capsule_id: Optional[str] = None
72-
script: Optional[Dict[str, List[str]]] = None
7347
s3_bucket: BucketType = "private"
74-
mount: Optional[str] = None
75-
force_cloud_sync: bool = True
48+
force_cloud_sync: bool = False
7649
transfer_endpoint: str = DEFAULT_TRANSFER_ENDPOINT
7750
delete_modalities_source_after_success: bool = False
7851
extra_identifying_info: Optional[dict] = None
7952
upload_tasks: Optional[SerializeAsAny[TransferServiceTask]] = None
8053
job_type: str = "default"
81-
extra_modalities: Optional[List[Modality]] = None
54+
extra_modality_data: Optional[Dict[str, List[Path]]] = pydantic.Field(
55+
default=None, description="Additional modality data to include in the transfer"
56+
)
57+
mount: Optional[None] = pydantic.Field(default=None, deprecated=True)
58+
platform: Literal["behavior"] = pydantic.Field(default="behavior", deprecated=True)
59+
capsule_id: Optional[None] = pydantic.Field(default=None, deprecated=True)
60+
script: Optional[Dict[str, List[str]]] = pydantic.Field(default=None, deprecated=True)
8261

8362

8463
class WatchdogDataTransferService(DataTransfer[WatchdogSettings]):
@@ -103,11 +82,9 @@ def __init__(
10382
self,
10483
source: PathLike,
10584
settings: WatchdogSettings,
106-
aind_data_schema_session: Session,
85+
session: AindBehaviorSessionModel,
10786
*,
10887
validate: bool = True,
109-
session_name: Optional[str] = None,
110-
ui_helper: Optional[ui.UiHelper] = None,
11188
email_from_experimenter_builder: Optional[
11289
Callable[[str], str]
11390
] = lambda user_name: f"{user_name}@alleninstitute.org",
@@ -118,16 +95,15 @@ def __init__(
11895
Args:
11996
source: The source directory or file to monitor
12097
settings: Configuration for the watchdog service
121-
aind_data_schema_session: The session data schema
98+
session: The session data from aind-behavior-services
12299
validate: Whether to validate the project name
123100
session_name: Name of the session
124-
ui_helper: UI helper for user prompts
125101
email_from_experimenter_builder: Function to build email from experimenter name
126102
"""
127103
self._settings = settings
128104
self._source = source
129105

130-
self._aind_data_schema_session: Session = aind_data_schema_session
106+
self._session = session
131107

132108
_default_exe = os.environ.get("WATCHDOG_EXE", None)
133109
_default_config = os.environ.get("WATCHDOG_CONFIG", None)
@@ -148,8 +124,6 @@ def __init__(
148124

149125
self._watch_config = WatchConfig.model_validate(self._read_yaml(self.config_path))
150126

151-
self._ui_helper = ui_helper or ui.DefaultUIHelper()
152-
self._session_name = session_name
153127
self._email_from_experimenter_builder = email_from_experimenter_builder
154128

155129
def transfer(self) -> None:
@@ -176,23 +150,10 @@ def transfer(self) -> None:
176150

177151
logger.debug("Creating watchdog manifest config.")
178152

179-
if _AIND_DATA_SCHEMA_PKG_VERSION.major < 2:
180-
logger.warning(
181-
"Using deprecated AIND data schema version %s. Consider upgrading.", _AIND_DATA_SCHEMA_PKG_VERSION
182-
)
183-
self._manifest_config = self._create_manifest_config_from_ads_session(
184-
ads_session=self._aind_data_schema_session,
185-
session_name=self._session_name,
186-
)
187-
else:
188-
self._manifest_config = self._create_manifest_config_from_ads_acquisition(
189-
ads_session=self._aind_data_schema_session,
190-
session_name=self._session_name,
191-
)
192-
193153
if self._watch_config is None:
194154
raise ValueError("Watchdog config is not set.")
195155

156+
self._manifest_config = self._create_manifest_from_session(session=self._session)
196157
assert self._manifest_config.name is not None, "Manifest config name must be set."
197158
_manifest_path = self.dump_manifest_config(
198159
path=Path(self._watch_config.flag_dir) / self._manifest_config.name
@@ -255,21 +216,15 @@ def is_valid_project_name(self) -> bool:
255216
project_names = self._get_project_names()
256217
return self._settings.project_name in project_names
257218

258-
def _create_manifest_config_from_ads_acquisition(
259-
self,
260-
ads_session: Session,
261-
ads_schemas: Optional[List[os.PathLike]] = None,
262-
session_name: Optional[str] = None,
263-
) -> ManifestConfig:
219+
def _create_manifest_from_session(self, session: AindBehaviorSessionModel) -> ManifestConfig:
264220
"""
265-
Creates a ManifestConfig from an aind-data-schema acquisition.
221+
Creates a ManifestConfig from an aind-behavior-services session.
266222
267-
For aind-data-schema versions >= 2.0. Converts acquisition metadata into
223+
Converts session metadata into
268224
a manifest configuration for the watchdog service.
269225
270226
Args:
271-
ads_session: The aind-data-schema acquisition data
272-
ads_schemas: Optional list of schema files
227+
session: The aind-behavior-services session data
273228
session_name: Name of the session
274229
275230
Returns:
@@ -278,10 +233,9 @@ def _create_manifest_config_from_ads_acquisition(
278233
Raises:
279234
ValueError: If the project name is invalid
280235
"""
281-
processor_full_name = ",".join(ads_session.experimenters) or os.environ.get("USERNAME", "unknown")
282236

283-
if (len(ads_session.experimenters) > 0) and self._email_from_experimenter_builder is not None:
284-
user_email = self._email_from_experimenter_builder(ads_session.experimenters[0])
237+
if (len(session.experimenter) > 0) and self._email_from_experimenter_builder is not None:
238+
user_email = self._email_from_experimenter_builder(session.experimenter[0])
285239
else:
286240
user_email = None
287241

@@ -293,97 +247,24 @@ def _create_manifest_config_from_ads_acquisition(
293247
if self._settings.project_name not in project_names:
294248
raise ValueError(f"Project name {self._settings.project_name} not found in {project_names}")
295249

296-
ads_schemas = self._find_ads_schemas(source) if ads_schemas is None else ads_schemas
297-
modalities = {
298-
str(modality.abbreviation): [Path(path.resolve()) for path in [source / str(modality.abbreviation)]]
299-
for modality in ads_session.data_streams[0].modalities
300-
}
301-
if self._settings.extra_modalities is not None:
302-
for modality in self._settings.extra_modalities:
303-
modalities[str(modality)] = [Path(path.resolve()) for path in [source / str(modality)]]
304-
305-
_manifest_config = ManifestConfig(
306-
name=session_name,
307-
modalities=modalities,
308-
subject_id=int(ads_session.subject_id),
309-
acquisition_datetime=ads_session.acquisition_start_time,
310-
schemas=[Path(value) for value in ads_schemas],
311-
destination=Path(destination),
312-
mount=self._settings.mount,
313-
processor_full_name=processor_full_name,
314-
project_name=self._settings.project_name,
315-
schedule_time=self._settings.schedule_time,
316-
platform=self._settings.platform,
317-
capsule_id=self._settings.capsule_id,
318-
s3_bucket=self._settings.s3_bucket,
319-
script=self._settings.script if self._settings.script else {},
320-
force_cloud_sync=self._settings.force_cloud_sync,
321-
transfer_endpoint=self._settings.transfer_endpoint,
322-
delete_modalities_source_after_success=self._settings.delete_modalities_source_after_success,
323-
extra_identifying_info=self._settings.extra_identifying_info,
324-
)
325-
326-
_manifest_config = self._make_transfer_args(
327-
_manifest_config,
328-
add_default_tasks=True,
329-
extra_tasks=self._settings.upload_tasks or {},
330-
job_type=self._settings.job_type,
331-
user_email=user_email,
332-
)
333-
return _manifest_config
334-
335-
@deprecated("Use _create_manifest_config_from_ads_acquisition instead. This will be removed in a future release.")
336-
def _create_manifest_config_from_ads_session(
337-
self,
338-
ads_session: Session,
339-
ads_schemas: Optional[List[os.PathLike]] = None,
340-
session_name: Optional[str] = None,
341-
) -> ManifestConfig:
342-
"""
343-
Creates a ManifestConfig from an aind-data-schema session.
344-
345-
Deprecated: Use _create_manifest_config_from_ads_acquisition instead.
346-
347-
Args:
348-
ads_session: The aind-data-schema session data
349-
ads_schemas: Optional list of schema files
350-
session_name: Name of the session
351-
352-
Returns:
353-
A ManifestConfig object
354-
355-
Raises:
356-
ValueError: If the project name is invalid
357-
"""
358-
processor_full_name = ",".join(ads_session.experimenter_full_name) or os.environ.get("USERNAME", "unknown")
359-
360-
if (len(ads_session.experimenter_full_name) > 0) and self._email_from_experimenter_builder is not None:
361-
user_email = self._email_from_experimenter_builder(ads_session.experimenter_full_name[0])
362-
else:
363-
user_email = None
364-
365-
destination = Path(self._settings.destination).resolve()
366-
source = Path(self._source).resolve()
367-
368-
if self._validate_project_name:
369-
project_names = self._get_project_names()
370-
if self._settings.project_name not in project_names:
371-
raise ValueError(f"Project name {self._settings.project_name} not found in {project_names}")
250+
_modality_candidates = self._find_modality_candidates(source)
372251

373-
ads_schemas = self._find_ads_schemas(source) if ads_schemas is None else ads_schemas
252+
if self._settings.extra_modality_data is not None:
253+
for modality, paths in self._settings.extra_modality_data.items():
254+
if modality in _modality_candidates:
255+
_modality_candidates[modality].extend(paths)
256+
else:
257+
_modality_candidates[modality] = paths
374258

375259
_manifest_config = ManifestConfig(
376-
name=session_name,
377-
modalities={
378-
str(modality.abbreviation): [Path(path.resolve()) for path in [source / str(modality.abbreviation)]]
379-
for modality in ads_session.data_streams[0].stream_modalities
380-
},
381-
subject_id=int(ads_session.subject_id),
382-
acquisition_datetime=ads_session.session_start_time,
383-
schemas=[Path(value) for value in ads_schemas],
260+
name=self._session.session_name,
261+
modalities={m: [Path(p) for p in paths] for m, paths in _modality_candidates.items()},
262+
subject_id=int(session.subject),
263+
acquisition_datetime=session.date,
264+
schemas=[Path(value) for value in self._find_schema_candidates(source)],
384265
destination=Path(destination),
385266
mount=self._settings.mount,
386-
processor_full_name=processor_full_name,
267+
processor_full_name=",".join(session.experimenter),
387268
project_name=self._settings.project_name,
388269
schedule_time=self._settings.schedule_time,
389270
platform=self._settings.platform,
@@ -509,9 +390,9 @@ def _interpolate_from_manifest(
509390
return _adapter.validate_json(updated_literal)
510391

511392
@staticmethod
512-
def _find_ads_schemas(source: PathLike) -> List[PathLike]:
393+
def _find_schema_candidates(source: PathLike) -> List[Path]:
513394
"""
514-
Finds aind-data-schema schema files in the source directory.
395+
Finds JSON files located directly in the source directory (non-recursive).
515396
516397
Args:
517398
source: The source directory to search
@@ -520,11 +401,30 @@ def _find_ads_schemas(source: PathLike) -> List[PathLike]:
520401
A list of schema file paths
521402
"""
522403
json_files = []
523-
for core_file in CORE_FILES:
524-
json_file = Path(source) / f"{core_file}.json"
525-
if json_file.exists():
526-
json_files.append(json_file)
527-
return [path for path in json_files]
404+
for file in Path(source).glob("*.json"):
405+
json_files.append(file)
406+
return json_files
407+
408+
@staticmethod
409+
def _find_modality_candidates(source: PathLike) -> Dict[str, List[Path]]:
410+
"""
411+
Finds modality files in the source directory.
412+
413+
Args:
414+
source: The source directory to search
415+
Returns:
416+
A dictionary mapping each recognized modality directory name to a list containing its resolved path
417+
"""
418+
# The assumption is that the modality directories are named after the modality abbreviations
419+
_candidates = aind_data_transfer_service.models.core.Modality.abbreviation_map.keys()
420+
modality_dirs = {}
421+
for _dir in Path(source).iterdir():
422+
if _dir.is_dir() and _dir.name in _candidates:
423+
modality_dirs[_dir.name] = [_dir.resolve()]
424+
continue
425+
if _dir.is_dir():
426+
logger.warning("Directory %s is not a recognized modality directory. Will not be appended", _dir.name)
427+
return modality_dirs
528428

529429
@staticmethod
530430
def _get_project_names(
@@ -660,12 +560,3 @@ def _read_yaml(path: PathLike) -> dict:
660560
"""
661561
with open(path, "r", encoding="utf-8") as f:
662562
return yaml.safe_load(f)
663-
664-
def prompt_input(self) -> bool:
665-
"""
666-
Prompts the user to confirm whether to generate a manifest.
667-
668-
Returns:
669-
True if the user confirms, False otherwise
670-
"""
671-
return self._ui_helper.prompt_yes_no_question("Would you like to generate a watchdog manifest (Y/N)?")

tests/data_transfer/test_aind_watchdog.py

Whitespace-only changes.

0 commit comments

Comments
 (0)