Skip to content

Commit 9332b0a

Browse files
committed
wip
1 parent e1dfa6c commit 9332b0a

File tree

16 files changed

+297
-210
lines changed

16 files changed

+297
-210
lines changed

src/epu_data_intake/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ python tests/epu_data_intake/epu_output_simulator.py ../test-dir \
9898
> for instantiating a new grid entity in the internal datastore.
9999
100100
A `watch` operation is designed to gracefully handle one of the following invocation scenarios:
101+
101102
1. watcher launched _before_ EPU starts writing to filesystem - only watcher is necessary
102103
2. watcher launched _after_ EPU starts writing to filesystem - both parser and watcher are
103104
necessary to pick up pre-existing and new writes

src/epu_data_intake/__main__.py

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,12 @@
88
from pathlib import Path
99
from watchdog.observers import Observer
1010

11+
from src.epu_data_intake.core_api_client import SmartEMCoreAPIClient
1112
from src.epu_data_intake.data_model import EpuAcquisitionSessionStore
1213
from src.epu_data_intake.fs_parser import EpuParser
1314
from src.epu_data_intake.fs_watcher import (
1415
DEFAULT_PATTERNS,
15-
RateLimitedHandler,
16+
RateLimitedFilesystemEventHandler,
1617
)
1718

1819
# Create a callback to handle the verbose flag at the root level
@@ -174,7 +175,10 @@ def watch_directory(
174175
dry_run: bool = typer.Option(
175176
False, "--dry_run", "-n", help="Enables dry run mode, writing data in-memory and not posting to Core's HTTP API"
176177
),
177-
# TODO consider providing via env but allowing override via CLI. Investigate how Win env vars work or if they do even
178+
# TODO
179+
# - consider providing via env but allowing override via CLI.
180+
# - Investigate how Win env vars work.
181+
# - Accept a hard-coded value at build time
178182
api_url: str = typer.Option(
179183
"http://127.0.0.1:8000", "--api-url", "-a", help="URL for the Core API (required unless in dry run mode)"
180184
),
@@ -196,26 +200,35 @@ def watch_directory(
196200
logging.error(f"Error: Directory {path} does not exist")
197201
raise typer.Exit(1)
198202

199-
if not dry_run and not api_url:
200-
logging.error("Error: API URL must be provided when not in dry run mode")
201-
raise typer.Exit(1)
203+
if not dry_run:
204+
try:
205+
import asyncio
202206

203-
logging.info(f"Starting to watch directory: {str(path)} (including subdirectories) for patterns: {patterns}")
207+
async def check_api():
208+
async with SmartEMCoreAPIClient(api_url) as client:
209+
status_data = await client.get_status()
210+
return status_data
204211

205-
observer = Observer()
212+
status_result = asyncio.run(check_api())
213+
logging.info(f"API is reachable at {api_url} - Status: {status_result.get('status', 'unknown')}")
214+
except Exception as e:
215+
logging.error(f"Error: API at {api_url} is not reachable: {str(e)}")
216+
raise typer.Exit(1)
217+
218+
logging.info(f"Starting to watch directory: {str(path)} (including subdirectories) for patterns: {patterns}")
206219

207220
# TODO verify verbose parameter is still needed. If it's *only* used for logging verbosity - check if logging
208221
# that's already set up here is being used, making this param redundant
209-
handler = RateLimitedHandler(patterns, log_interval, verbose)
210-
handler.set_watch_dir(path)
211-
handler.init_datastore(dry_run, api_url)
222+
handler = RateLimitedFilesystemEventHandler(path, dry_run, api_url, verbose, log_interval, patterns)
212223

213224
logging.info("Parsing existing directory contents...")
214-
# TODO settle a potential race condition if one exists:
225+
# TODO settle a potential race condition between parser and watcher if one exists:
215226
handler.datastore = EpuParser.parse_epu_output_dir(handler.datastore, verbose)
216227
# TODO ^ verify verbose parameter is still needed. If it's *only* used for logging verbosity - check if logging
217228
# that's already set up here is being used, making this param redundant
229+
218230
logging.info("..done! Now listening for new filesystem events")
231+
observer = Observer()
219232
observer.schedule(handler, str(path), recursive=True)
220233

221234
def handle_exit(signum, frame):

src/epu_data_intake/core_api_client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class MicrographStatus(str, Enum):
4848
# Request Models
4949
class AcquisitionCreateRequest(BaseModel):
5050
name: str
51-
epu_id: str | None = None
51+
id: str | None = None
5252
status: AcquisitionStatus | None = None
5353
start_time: datetime | None = None
5454
end_time: datetime | None = None
@@ -61,7 +61,7 @@ class AcquisitionCreateRequest(BaseModel):
6161

6262
class AcquisitionUpdateRequest(BaseModel):
6363
name: str | None = None
64-
epu_id: str | None = None
64+
id: str | None = None
6565
status: AcquisitionStatus | None = None
6666
start_time: datetime | None = None
6767
end_time: datetime | None = None

src/epu_data_intake/core_api_client_adapter.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ def create(self, entity_type: str, entity_id: str, entity: T,
8080
# Handle acquisition creation - no parent
8181
request = AcquisitionCreateRequest(
8282
name=entity.name,
83-
epu_id=entity.id,
83+
id=entity.id,
8484
start_time=entity.start_time,
8585
storage_path=entity.storage_path,
8686
atlas_path=entity.atlas_path,

src/epu_data_intake/data_model.py

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
from datetime import datetime
55
from pathlib import Path
66
from typing import Generic, TypeVar
7+
from uuid import uuid4
8+
from base64 import urlsafe_b64encode
79

810
T = TypeVar("T")
911

@@ -16,6 +18,10 @@ class EntityStore(Generic[T]):
1618
_in_memory_only: bool = True
1719
_parent_entity: tuple[str, str] = None # (entity_type, entity_id) tuple
1820

21+
@staticmethod
22+
def generate_uuid():
23+
return urlsafe_b64encode(uuid4().bytes).decode('ascii').rstrip('=')
24+
1925
def set_api_client(self, api_client, entity_type: str, in_memory_only: bool = True):
2026
self._api_client = api_client
2127
self._entity_type = entity_type
@@ -24,24 +30,25 @@ def set_api_client(self, api_client, entity_type: str, in_memory_only: bool = Tr
2430
def set_parent_entity(self, parent_type: str, parent_id: str):
2531
self._parent_entity = (parent_type, parent_id)
2632

27-
def add(self, id: str, item: T) -> None:
28-
is_update = id in self._items
29-
self._items[id] = item
33+
def add(self, uuid: str | None, item: T) -> None:
34+
if not uuid: uuid = EntityStore.generate_uuid()
35+
is_new = uuid not in self._items
36+
self._items[uuid] = item
3037

3138
if not self._in_memory_only and self._api_client:
3239
try:
33-
if is_update:
34-
self._api_client.update(self._entity_type, id, item, self._parent_entity)
40+
if is_new:
41+
self._api_client.create(self._entity_type, uuid, item, self._parent_entity)
3542
else:
36-
self._api_client.create(self._entity_type, id, item, self._parent_entity)
43+
self._api_client.update(self._entity_type, uuid, item, self._parent_entity)
3744
except Exception as e:
38-
logging.error(f"API sync failed for {self._entity_type}/{id}: {str(e)}")
45+
logging.error(f"API sync failed for {self._entity_type}/{uuid}: {str(e)}")
3946

40-
def get(self, id: str) -> T | None:
41-
return self._items.get(id)
47+
def get(self, uuid: str) -> T | None:
48+
return self._items.get(uuid)
4249

43-
def exists(self, id: str) -> bool:
44-
return id in self._items
50+
def exists(self, uuid: str) -> bool:
51+
return uuid in self._items
4552

4653
def ids(self) -> set[str]:
4754
return set(self._items.keys())
@@ -256,9 +263,9 @@ class AtlasData:
256263

257264
@dataclass
258265
class EpuSessionData:
259-
name: str
260-
id: str
261-
start_time: datetime
266+
name: str = "Unknown"
267+
id: str = EntityStore.generate_uuid()
268+
start_time: datetime | None = None
262269
atlas_path: str | None = None
263270
storage_path: str | None = None # Path of parent directory containing the epu session dir
264271
clustering_mode: str | None = None
@@ -307,19 +314,22 @@ class EpuAcquisitionSessionStore:
307314
root_dir: Path
308315
in_memory_only: bool = False
309316
api_client = None
317+
uuid: str | None = None
310318

311-
def __init__(self, root_dir: str, dry_run: bool = False, api_url: str = None):
319+
def __init__(self, root_dir: str, in_memory_only: bool = False, api_url: str = None):
312320
self.root_dir = Path(root_dir)
313-
self.in_memory_only = dry_run
321+
self.in_memory_only = in_memory_only
322+
self.acquisition = EpuSessionData()
314323
self.grids = EntityStore()
315324

316-
if not dry_run and api_url:
325+
if not self.in_memory_only:
317326
from src.epu_data_intake.core_api_client_adapter import ApiClientAdapter
318327
self.api_client = ApiClientAdapter(api_url)
319-
self.grids.set_api_client(self.api_client, "grid", dry_run)
328+
self.api_client.create("acquisition", self.acquisition.id, self.acquisition, None)
329+
self.grids.set_api_client(self.api_client, "grid", in_memory_only)
320330

321-
def add_grid(self, grid_id: str, grid: Grid):
322-
"""Add a grid with proper API configuration"""
331+
def add_grid(self, grid: Grid, grid_id: str | None = None):
332+
grid_id = grid_id or EntityStore.generate_uuid()
323333

324334
# Configure the grid's EntityStores for the hierarchy:
325335
# session -> grid -> gridsquare -> foilhole -> micrograph
@@ -363,7 +373,7 @@ def get_grid_by_path(self, path: str):
363373
return None
364374

365375
def __str__(self):
366-
result = f"\nEPU Acquisition Summary:\n Root dir: {self.root_dir}\n Grids: {len(self.grids)}\n"
376+
result = f"\nEPU Acquisition Summary:\n UUID: {self.uuid}\n Root dir: {self.root_dir}\n Grids: {len(self.grids)}\n"
367377

368378
# Add each grid's string representation. TODO debug: this doesn't seem to work
369379
for grid_id, grid in self.grids.items():

src/epu_data_intake/fs_parser.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -695,7 +695,7 @@ def parse_epu_output_dir(datastore: EpuAcquisitionSessionStore, verbose: bool =
695695
# Start with locating all EpuSession.dm files - init a grid for each found
696696
for epu_session_manifest in list(datastore.root_dir.glob("**/*EpuSession.dm")):
697697
grid = EpuParser.parse_grid_dir(str(Path(epu_session_manifest).parent), verbose)
698-
datastore.grids.add(grid.session_data.name, grid)
698+
datastore.add_grid(grid)
699699

700700
return datastore
701701

src/epu_data_intake/fs_watcher.py

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
]
4242

4343

44-
class RateLimitedHandler(FileSystemEventHandler):
44+
class RateLimitedFilesystemEventHandler(FileSystemEventHandler):
4545
"""File system event handler with rate limiting capabilities.
4646
4747
This handler processes file system events from watchdog, specifically watching
@@ -70,7 +70,8 @@ class RateLimitedHandler(FileSystemEventHandler):
7070
datastore: EpuAcquisitionSessionStore | None = None
7171
watch_dir: Path | None = None
7272

73-
def __init__(self, patterns: list[str], log_interval: float = 10.0, verbose: bool = False):
73+
# TODO test with a lower log_interval value, set lowest possible default, better naming
74+
def __init__(self, watch_dir, dry_run: bool = False, api_url: str | None = None, verbose: bool = False, log_interval: float = 10.0, patterns: list[str] = DEFAULT_PATTERNS):
7475
self.last_log_time = time.time()
7576
self.log_interval = log_interval
7677
self.patterns = patterns
@@ -82,7 +83,10 @@ def __init__(self, patterns: list[str], log_interval: float = 10.0, verbose: boo
8283
# Maintain a buffer of "orphaned" files - files that appear to belong to a grid that doesn't exist yet
8384
self.orphaned_files = {} # path -> (event, timestamp, file_stat)
8485

85-
# TODO unit test thi method
86+
self._set_watch_dir(watch_dir)
87+
self._init_datastore(dry_run, api_url)
88+
89+
# TODO unit test this method
8690
def matches_pattern(self, path: str) -> bool:
8791
try:
8892
rel_path = str(Path(path).relative_to(self.watch_dir))
@@ -91,29 +95,28 @@ def matches_pattern(self, path: str) -> bool:
9195
except ValueError:
9296
return False
9397

94-
def set_watch_dir(self, path: Path):
95-
self.watch_dir = path.absolute() # TODO this could cause problems in Win
98+
# TODO on Win there are primary and secondary output dirs - work directly with primary if possible, otherwise
99+
# operate across both. Note: data is first written to primary output dir then later maybe partially copied
100+
# to secondary dir.
101+
def _set_watch_dir(self, path: Path):
102+
self.watch_dir = path.absolute() # TODO this could cause problems in Win - test!
96103

97-
def init_datastore(self, dry_run: bool = False, api_url: str = None):
98-
logging.info(f"Instantiated new datastore, dry run: {dry_run}")
104+
def _init_datastore(self, dry_run: bool = False, api_url: str = None):
99105
self.datastore = EpuAcquisitionSessionStore(str(self.watch_dir), dry_run, api_url)
106+
logging.debug(f"Instantiated new datastore, " + ("in-memory only" if self.datastore.in_memory_only else "data will be permanently saved on the backend"))
107+
if self.datastore.in_memory_only:
108+
logging.info(f"Acquisition session uuid assigned: {self.datastore.uuid}")
100109

110+
# TODO Enhancement: log all events to graylog (if reachable) for session debugging and playback
101111
def on_any_event(self, event):
102-
if self.watch_dir is None:
103-
raise RuntimeError("watch_dir not initialized - call set_watch_dir() first")
104-
if self.datastore is None:
105-
raise RuntimeError("datastore not initialized - call init_datastore() first")
106-
107-
# Enhancement: record all events to graylog (if reachable) for session debugging and playback
108-
109112
if event.is_directory or not self.matches_pattern(event.src_path):
110113
if event.is_directory:
111-
logging.info(f"Skipping non-matching path: {event.src_path}")
114+
logging.debug(f"Skipping non-matching path: {event.src_path}")
112115
return
113116

114117
if event.event_type not in self.watched_event_types:
115118
if event.is_directory:
116-
logging.info(f"Skipping non-matching event type: {event.event_type}")
119+
logging.debug(f"Skipping non-matching event type: {event.event_type}")
117120
return
118121

119122
current_time = time.time()
@@ -135,18 +138,15 @@ def on_any_event(self, event):
135138
if new_file_detected and re.search(EpuParser.session_dm_pattern, event.src_path):
136139
assert self.datastore.get_grid_by_path(event.src_path) is None # guaranteed because is a new file
137140
grid = Grid(str(Path(event.src_path).parent.resolve()))
138-
session_data = EpuParser.parse_epu_session_manifest(
139-
str(Path(event.src_path).resolve())
140-
) # just to get the name really, techdebt
141-
self.datastore.grids.add(session_data.name, grid)
141+
self.datastore.add_grid(grid)
142142

143143
# try to work out which grid the touched file relates to
144144
grid_id = self.datastore.get_grid_by_path(event.src_path)
145145
if grid_id is None:
146146
# This must be an orphaned file since it matched one of patterns for files we are interested in,
147147
# but a containing grid doesn't exist yet - store it for when we have the grid.
148148
if self.verbose:
149-
logging.info(
149+
logging.debug(
150150
f"Could not determine which grid this data belongs to: {event.src_path}, adding to orphans"
151151
)
152152
self.orphaned_files[event.src_path] = (event, current_time, file_stat)
@@ -169,27 +169,27 @@ def on_any_event(self, event):
169169
self._on_micrograph_detected(path, grid_id, new_file_detected)
170170

171171
def _on_session_detected(self, path: str, grid_id, is_new_file: bool = True):
172-
logging.info(f"Session manifest {'detected' if is_new_file else 'updated'}: {path}")
172+
logging.debug(f"Session manifest {'detected' if is_new_file else 'updated'}: {path}")
173173
session_data = EpuParser.parse_epu_session_manifest(path)
174174
gridstore = self.datastore.grids.get(grid_id)
175175

176176
if gridstore and session_data != gridstore.session_data:
177177
# Create the acquisition first if we're not in dry run mode
178178
if not self.datastore.in_memory_only and self.datastore.api_client:
179-
success = self.datastore.api_client.create("acquisition", session_data.id, session_data)
179+
success = self.datastore.api_client.create("acquisition", self.datastore.uuid, session_data)
180180
if success:
181-
logging.info(f"Created acquisition for session {session_data.name}")
181+
logging.info(f"Created acquisition for session #{self.datastore.uuid} {session_data.name}")
182182

183183
gridstore.session_data = session_data
184-
logging.info(f"Updated session data for grid {grid_id}")
184+
logging.debug(f"Updated session data for grid {grid_id}")
185185
logging.info(gridstore.session_data)
186186

187187
def _process_orphaned_files(self, grid_id):
188188
"""Process any orphaned files that belong to this grid"""
189189
for path, (event, timestamp, file_stat) in self.orphaned_files.items():
190190
# Check if this orphaned file belongs to the new grid
191191
if self.datastore.get_grid_by_path(path) == grid_id:
192-
logging.info(f"Processing previously orphaned file: {path}")
192+
logging.debug(f"Processing previously orphaned file: {path}")
193193
self.on_any_event(event) # Process the file as if we just received the event
194194

195195
# Create a new dictionary excluding the processed files
@@ -198,15 +198,15 @@ def _process_orphaned_files(self, grid_id):
198198
}
199199

200200
def _on_atlas_detected(self, path: str, grid_id, is_new_file: bool = True):
201-
logging.info(f"Atlas {'detected' if is_new_file else 'updated'}: {path}")
201+
logging.debug(f"Atlas {'detected' if is_new_file else 'updated'}: {path}")
202202
gridstore = self.datastore.grids.get(grid_id)
203203
atlas_data = EpuParser.parse_atlas_manifest(path)
204204
if atlas_data != gridstore.atlas_data:
205205
gridstore.atlas_data = atlas_data
206206
logging.info(gridstore.atlas_data)
207207

208208
def _on_gridsquare_metadata_detected(self, path: str, grid_id, is_new_file: bool = True):
209-
logging.info(f"Gridsquare metadata {'detected' if is_new_file else 'updated'}: {path}")
209+
logging.debug(f"Gridsquare metadata {'detected' if is_new_file else 'updated'}: {path}")
210210

211211
gridsquare_id = EpuParser.gridsquare_dm_file_pattern.search(path).group(1)
212212
assert gridsquare_id is not None, f"gridsquare_id should not be None: {gridsquare_id}"

0 commit comments

Comments
 (0)