-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy paths3echo_collector.py
More file actions
151 lines (131 loc) · 5.83 KB
/
s3echo_collector.py
File metadata and controls
151 lines (131 loc) · 5.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
from __future__ import annotations
import minio
import workflows.recipe
from workflows.services.common_service import CommonService
from dlstbx.util import iris
from dlstbx.util.iris import get_minio_client, update_dcid_info_file
class S3EchoCollector(CommonService):
    """
    A service that keeps status of uploads to S3 Echo object store and does
    garbage collection of unreferenced data.

    Two recipe-wrapped queues are consumed:

    * ``s3echo.start`` -> :meth:`on_start`: records the requesting ``rpid``
      against each data collection and forwards upload/watch messages.
    * ``s3echo.end`` -> :meth:`on_end`: drops the ``rpid`` reference again and
      removes the data collection's objects when the stored info permits it.
    """

    # Human readable service name
    _service_name = "S3EchoCollector"

    # Logger name
    _logger_name = "dlstbx.services.s3echocollector"

    # STFC S3 Echo credentials
    _s3echo_credentials = "/dls_sw/apps/zocalo/secrets/credentials-echo-mx.cfg"

    def initializing(self):
        """
        Create the object store client and subscribe to the ``s3echo.start``
        and ``s3echo.end`` queues.

        Both subscriptions use manual acknowledgement; the handlers ack inside
        a transport transaction.
        """
        self.log.info(f"{S3EchoCollector._service_name} starting")

        self.minio_client: minio.Minio = get_minio_client(
            S3EchoCollector._s3echo_credentials
        )

        self._message_delay = 5

        workflows.recipe.wrap_subscribe(
            self._transport,
            "s3echo.start",
            self.on_start,
            acknowledgement=True,
            log_extender=self.extend_log,
        )
        workflows.recipe.wrap_subscribe(
            self._transport,
            "s3echo.end",
            self.on_end,
            acknowledgement=True,
            log_extender=self.extend_log,
        )

    def on_start(self, rw, header, message):
        """
        Process request for uploading images to S3 Echo object store.

        Reads ``bucket`` and ``rpid`` from the recipe step parameters, plus
        either ``images`` (together with ``dcid``) or ``related_images`` (an
        iterable of ``(dcid, image_master_file)`` pairs). ``images`` takes
        precedence when both are present. The bucket is created if missing.

        For every data collection the info file is updated with ``rpid``; when
        that call returns a falsy value an ``upload`` message listing the
        image files is sent. Finally the collected ``{name: (dcid, path)}``
        mapping is stored in the recipe environment under ``s3echo_upload``
        and the incoming message is forwarded to the ``watch`` output.

        :param rw: recipe wrapper for the received message.
        :param header: transport header of the received message.
        :param message: message payload; forwarded unchanged to ``watch``.
        """
        # Conditionally acknowledge receipt of the message
        txn = rw.transport.transaction_begin(subscription_id=header["subscription"])
        rw.transport.ack(header, transaction=txn)

        params = rw.recipe_step["parameters"]
        minio_client = get_minio_client(S3EchoCollector._s3echo_credentials)

        bucket_name = params["bucket"]
        if not minio_client.bucket_exists(bucket_name):
            minio_client.make_bucket(bucket_name)
        rpid = int(params["rpid"])

        s3echo_upload_files = {}
        if images := params.get("images"):
            dcid = int(params["dcid"])
            # NOTE(review): a falsy return presumably means no info file
            # existed for this dcid yet - confirm in dlstbx.util.iris.
            response_info = update_dcid_info_file(
                minio_client, bucket_name, dcid, 0, rpid, self.log
            )
            try:
                image_files = iris.get_image_files(images, self.log)
                s3echo_upload_files.update(
                    {name: (dcid, pth) for name, pth in image_files.items()}
                )
                # Kept inside the try block (as in the related_images branch)
                # so a failed file lookup cannot reach this statement with
                # image_files unbound.
                if not response_info:
                    self.log.debug("Sending message to upload endpoint")
                    rw.send_to(
                        "upload",
                        {"s3echo_upload": {dcid: image_files}},
                        transaction=txn,
                    )
            except Exception:
                self.log.exception("Error uploading image files to S3 Echo")
            rw.environment.update({"s3echo_upload": s3echo_upload_files})
            self.log.debug("Sending message to watch endpoint")
            rw.send_to("watch", message, transaction=txn)
        elif related_images := params.get("related_images"):
            for dcid, image_master_file in related_images:
                response_info = update_dcid_info_file(
                    minio_client, bucket_name, dcid, 0, rpid, self.log
                )
                try:
                    image_files = iris.get_related_images_files_from_h5(
                        image_master_file, self.log
                    )
                    s3echo_upload_files.update(
                        {name: (dcid, pth) for name, pth in image_files.items()}
                    )
                    if not response_info:
                        self.log.debug("Sending message to upload endpoint")
                        rw.send_to(
                            "upload",
                            {"s3echo_upload": {dcid: image_files}},
                            transaction=txn,
                        )
                except Exception:
                    # Best effort: a failure for one data collection must not
                    # prevent the remaining ones from being processed.
                    self.log.exception("Error uploading image files to S3 Echo")
            rw.environment.update({"s3echo_upload": s3echo_upload_files})
            self.log.debug("Sending message to watch endpoint")
            rw.send_to("watch", message, transaction=txn)

        rw.transport.transaction_commit(txn)

    def on_end(self, rw, header, message):
        """
        Remove reference to image data in S3 Echo object store after end of
        processing.

        Reads ``bucket`` and ``rpid`` from the recipe step parameters and
        iterates over ``related_images`` (``(dcid, _)`` pairs); when that
        parameter is absent or ``None`` the single ``dcid`` parameter is used
        instead. For each data collection the stored info is read and then:

        * nothing could be read: a warning is logged;
        * ``status`` is -1, or ``status`` is 1 and ``pid`` equals ``[rpid]``:
          every object named ``{dcid}_*`` is removed from the bucket;
        * otherwise: the info file is updated with ``-rpid``.

        :param rw: recipe wrapper for the received message.
        :param header: transport header of the received message.
        :param message: message payload (unused).
        """
        # Conditionally acknowledge receipt of the message
        txn = rw.transport.transaction_begin(subscription_id=header["subscription"])
        rw.transport.ack(header, transaction=txn)

        params = rw.recipe_step["parameters"]
        minio_client = get_minio_client(S3EchoCollector._s3echo_credentials)

        bucket_name = params["bucket"]
        rpid = int(params["rpid"])

        # Build the single-dcid fallback lazily: recipes that only provide
        # related_images need not carry a "dcid" parameter at all.
        related_images = params.get("related_images")
        if related_images is None:
            related_images = [(int(params["dcid"]), None)]

        for dcid, _ in related_images:
            response_info = update_dcid_info_file(
                minio_client, bucket_name, dcid, None, None, self.log
            )
            if not response_info:
                self.log.warning(f"No {dcid}_info data read from the object store")
            elif response_info["status"] == -1 or (
                response_info["status"] == 1 and response_info["pid"] == [rpid]
            ):
                # NOTE(review): presumably -1 marks a failed upload and
                # status 1 with pid == [rpid] means this job holds the last
                # reference - confirm against update_dcid_info_file.
                prefix = f"{dcid}_"
                # Let the server filter by prefix instead of listing the whole
                # bucket; collect the names first, then delete.
                dc_objects = {
                    obj.object_name
                    for obj in minio_client.list_objects(bucket_name, prefix=prefix)
                    if obj.object_name is not None
                    and obj.object_name.startswith(prefix)
                }
                for obj_name in dc_objects:
                    minio_client.remove_object(bucket_name, obj_name)
            else:
                update_dcid_info_file(
                    minio_client, bucket_name, dcid, None, -rpid, self.log
                )

        rw.transport.transaction_commit(txn)