Skip to content

Commit 1351a30

Browse files
Various fixes for instrument server problems (#409)
Addresses a few issues that have come up when running the web UI and instrument server. * There is no longer a client so all references to ClientEnvironment are changed to Session * Register processing job referred to a murfey_session_id which didn't exist * em-spa-refine processing jobs were not made by the instrument server * Copied the prometheus transferred file count function into the multigrid watcher as it was only in the client
1 parent 672d18c commit 1351a30

File tree

2 files changed

+39
-7
lines changed

2 files changed

+39
-7
lines changed

src/murfey/client/multigrid_control.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,13 @@ def rsync_result(update: RSyncerUpdate):
223223
self.rsync_processes[update.base_path].enqueue(update.file_path)
224224

225225
self.rsync_processes[source].subscribe(rsync_result)
226+
self.rsync_processes[source].subscribe(
227+
partial(
228+
self._increment_transferred_files_prometheus,
229+
destination=destination,
230+
source=str(source),
231+
)
232+
)
226233
self.rsync_processes[source].subscribe(
227234
partial(
228235
self._increment_transferred_files,
@@ -399,6 +406,7 @@ def _start_dc(self, json, from_form: bool = False):
399406
"em-spa-extract",
400407
"em-spa-class2d",
401408
"em-spa-class3d",
409+
"em-spa-refine",
402410
):
403411
capture_post(
404412
f"{str(self._environment.url.geturl())}/visits/{str(self._environment.visit)}/{self.session_id}/register_processing_job",
@@ -465,6 +473,33 @@ def _increment_file_count(
465473
}
466474
requests.post(url, json=data)
467475

476+
# Prometheus can handle higher traffic so update for every transferred file rather
477+
# than batching as we do for the Murfey database updates in _increment_transferred_files
478+
def _increment_transferred_files_prometheus(
479+
self, update: RSyncerUpdate, source: str, destination: str
480+
):
481+
if update.outcome is TransferResult.SUCCESS:
482+
url = f"{str(self._environment.url.geturl())}/visits/{str(self._environment.visit)}/increment_rsync_transferred_files_prometheus"
483+
data_files = (
484+
[update]
485+
if update.file_path.suffix in self._data_suffixes
486+
and any(
487+
substring in update.file_path.name
488+
for substring in self._data_substrings
489+
)
490+
else []
491+
)
492+
data = {
493+
"source": source,
494+
"destination": destination,
495+
"session_id": self.session_id,
496+
"increment_count": 1,
497+
"bytes": update.file_size,
498+
"increment_data_count": len(data_files),
499+
"data_bytes": sum(f.file_size for f in data_files),
500+
}
501+
requests.post(url, json=data)
502+
468503
def _increment_transferred_files(
469504
self, updates: List[RSyncerUpdate], source: str, destination: str
470505
):

src/murfey/server/__init__.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -171,15 +171,15 @@ def get_job_ids(tilt_series_id: int, appid: int) -> JobIDs:
171171
db.ProcessingJob,
172172
db.DataCollection,
173173
db.DataCollectionGroup,
174-
db.ClientEnvironment,
174+
db.Session,
175175
)
176176
.where(db.TiltSeries.id == tilt_series_id)
177177
.where(db.DataCollection.tag == db.TiltSeries.tag)
178178
.where(db.ProcessingJob.id == db.AutoProcProgram.pj_id)
179179
.where(db.AutoProcProgram.id == appid)
180180
.where(db.ProcessingJob.dc_id == db.DataCollection.id)
181181
.where(db.DataCollectionGroup.id == db.DataCollection.dcg_id)
182-
.where(db.ClientEnvironment.session_id == db.TiltSeries.session_id)
182+
.where(db.Session.id == db.TiltSeries.session_id)
183183
).all()
184184
return JobIDs(
185185
dcgid=results[0][4].id,
@@ -1745,11 +1745,7 @@ def _register_3d_batch(message: dict, _db=murfey_db, demo: bool = False):
17451745
other_options = dict(feedback_params)
17461746

17471747
visit_name = (
1748-
_db.exec(
1749-
select(db.ClientEnvironment).where(
1750-
db.ClientEnvironment.session_id == message["session_id"]
1751-
)
1752-
)
1748+
_db.exec(select(db.Session).where(db.Session.id == message["session_id"]))
17531749
.one()
17541750
.visit
17551751
)
@@ -2739,6 +2735,7 @@ def feedback_callback(header: dict, message: dict) -> None:
27392735
_transport_object.transport.ack(header)
27402736
return None
27412737
elif message["register"] == "processing_job":
2738+
murfey_session_id = message["session_id"]
27422739
logger.info("registering processing job")
27432740
assert isinstance(global_state["data_collection_ids"], dict)
27442741
dc = murfey_db.exec(

0 commit comments

Comments
 (0)