Skip to content

Commit e25df4d

Browse files
committed
Do not pass around the queue object, but initialize one for the object
1 parent bdef7f9 commit e25df4d

File tree

3 files changed

+122
-26
lines changed

3 files changed

+122
-26
lines changed

caso/messenger/ssm.py

Lines changed: 14 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -68,19 +68,17 @@ def __init__(self):
6868
except Exception as err:
6969
LOG.error(f"Failed to create path {CONF.ssm.output_path} because {err}")
7070
raise err
71+
self.queue = dirq.QueueSimple.QueueSimple(CONF.ssm.output_path)
7172

72-
def _push_message_cloud(
73-
self, queue: dirq.QueueSimple.QueueSimple, entries: typing.List[str]
74-
):
73+
def _push_message_cloud(self, entries: typing.List[str]):
7574
"""Push a compute message, formatted following the CloudRecord."""
7675
message = f"APEL-cloud-message: v{self.version_cloud}\n"
7776
aux = "\n%%\n".join(entries)
7877
message += f"{aux}\n"
79-
queue.add(message.encode("utf-8"))
78+
self.queue.add(message.encode("utf-8"))
8079

8180
def _push_message_json(
8281
self,
83-
queue: dirq.QueueSimple.QueueSimple,
8482
entries: typing.List[str],
8583
msg_type: str,
8684
version: str,
@@ -91,25 +89,19 @@ def _push_message_json(
9189
"Version": version,
9290
"UsageRecords": [json.loads(r) for r in entries],
9391
}
94-
queue.add(json.dumps(message))
92+
self.queue.add(json.dumps(message))
9593

96-
def _push_message_ip(
97-
self, queue: dirq.QueueSimple.QueueSimple, entries: typing.List[str]
98-
):
94+
def _push_message_ip(self, entries: typing.List[str]):
9995
"""Push an IP message."""
100-
self._push_message_json(
101-
queue, entries, "APEL Public IP message", self.version_ip
102-
)
96+
self._push_message_json(entries, "APEL Public IP message", self.version_ip)
10397

104-
def _push_message_accelerator(
105-
self, queue: dirq.QueueSimple.QueueSimple, entries: typing.List[str]
106-
):
98+
def _push_message_accelerator(self, entries: typing.List[str]):
10799
"""Push an accelerator message."""
108100
self._push_message_json(
109-
queue, entries, "APEL-accelerator-message", self.version_accelerator
101+
entries, "APEL-accelerator-message", self.version_accelerator
110102
)
111103

112-
def _push_message_storage(self, queue, entries):
104+
def _push_message_storage(self, entries):
113105
"""Push a storage message."""
114106
ns = {"xmlns:sr": "http://eu-emi.eu/namespaces/2011/02/storagerecord"}
115107
root = ETree.Element("sr:StorageUsageRecords", attrib=ns)
@@ -136,31 +128,29 @@ def _push_message_storage(self, queue, entries):
136128
ETree.SubElement(sr, "sr:EndTime").text = record.measure_time.isoformat()
137129
capacity = str(int(record.capacity * 1073741824)) # 1 GiB = 2^30
138130
ETree.SubElement(sr, "sr:ResourceCapacityUsed").text = capacity
139-
queue.add(ETree.tostring(root))
131+
self.queue.add(ETree.tostring(root))
140132

141133
def _push(self, entries_cloud, entries_ip, entries_accelerator, entries_storage):
142134
"""Push all messages, dividing them into smaller chunks.
143135
144136
This method gets lists of messages to be pushed in smaller chucks as per GGUS
145137
ticket 143436: https://ggus.eu/index.php?mode=ticket_info&ticket_id=143436
146138
"""
147-
queue = dirq.QueueSimple.QueueSimple(CONF.ssm.output_path)
148-
149139
for i in range(0, len(entries_cloud), CONF.ssm.max_size):
150140
entries = entries_cloud[i : i + CONF.ssm.max_size] # noqa(E203)
151-
self._push_message_cloud(queue, entries)
141+
self._push_message_cloud(entries)
152142

153143
for i in range(0, len(entries_ip), CONF.ssm.max_size):
154144
entries = entries_ip[i : i + CONF.ssm.max_size] # noqa(E203)
155-
self._push_message_ip(queue, entries)
145+
self._push_message_ip(entries)
156146

157147
for i in range(0, len(entries_accelerator), CONF.ssm.max_size):
158148
entries = entries_accelerator[i : i + CONF.ssm.max_size] # noqa(E203)
159-
self._push_message_accelerator(queue, entries)
149+
self._push_message_accelerator(entries)
160150

161151
for i in range(0, len(entries_storage), CONF.ssm.max_size):
162152
entries = entries_storage[i : i + CONF.ssm.max_size] # noqa(E203)
163-
self._push_message_storage(queue, entries)
153+
self._push_message_storage(entries)
164154

165155
def push(self, records):
166156
"""Push all records to SSM.

caso/tests/conftest.py

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -310,12 +310,40 @@ def expected_entries_cloud():
310310
return ssm_entries
311311

312312

313+
@pytest.fixture
314+
def expected_message_cloud():
315+
"""Get a fixture for a complete Cloud message."""
316+
message = (
317+
"APEL-cloud-message: v0.4\n"
318+
"CloudComputeService: Fake Cloud Service\n"
319+
f"CloudType: {cloud_type}\nCpuCount: 8\nCpuDuration: 3456000\n"
320+
"Disk: 250\nEndTime: 1685044746\nFQAN: VO 1 FQAN\nGlobalUserName: User DN\n"
321+
"ImageId: b39a8ed9-e15d-4b71-ada2-daf88efbac0a\n"
322+
"LocalGroupId: 03b6a6c4-cf2b-48b9-82f1-69c52b9f30af\n"
323+
"LocalUserId: a4519d7d-f60a-4908-9d63-7d9e17422188\nMachineName: VM Name 1\n"
324+
"Memory: 16\nPublicIPCount: 7\nSiteName: TEST-Site\nStartTime: 1684612746\n"
325+
"Status: started\nVMUUID: 721cf1db-0e0f-4c24-a5ea-cd75e0f303e8\n"
326+
"WallDuration: 432000\n"
327+
"%%"
328+
"\nCloudComputeService: Fake Cloud Service\n"
329+
f"CloudType: {cloud_type}\nCpuCount: 8\nCpuDuration: 3456000\n"
330+
"Disk: 250\nEndTime: 1685044746\nFQAN: VO 2 FQAN\nGlobalUserName: User DN\n"
331+
"ImageId: b39a8ed9-e15d-4b71-ada2-daf88efbac0a\n"
332+
"LocalGroupId: 03b6a6c4-cf2b-48b9-82f1-69c52b9f30af\n"
333+
"LocalUserId: a4519d7d-f60a-4908-9d63-7d9e17422188\nMachineName: VM Name 2\n"
334+
"Memory: 16\nPublicIPCount: 7\nSiteName: TEST-Site\nStartTime: 1684526346\n"
335+
"Status: completed\nVMUUID: a53738e1-13eb-4047-800c-067d14ce3d22\n"
336+
"WallDuration: 432000\n"
337+
)
338+
return message.encode("utf-8")
339+
340+
313341
@pytest.fixture
314342
def expected_entries_ip():
315343
"""Get a fixture for all IP entries."""
316344
ssm_entries = [
317345
'{"SiteName": "TEST-Site", '
318-
'"CloudType": "caso/4.1.0.0rc1 (OpenStack)", '
346+
f'"CloudType": "{cloud_type}", '
319347
'"CloudComputeService": "Fake Cloud Service", '
320348
'"uuid": "e3c5aeef-37b8-4332-ad9f-9d068f156dc2", '
321349
'"LocalUser": "a4519d7d-f60a-4908-9d63-7d9e17422188", '
@@ -326,7 +354,7 @@ def expected_entries_ip():
326354
'"IPVersion": 4, '
327355
'"IPCount": 10}',
328356
'{"SiteName": "TEST-Site", '
329-
'"CloudType": "caso/4.1.0.0rc1 (OpenStack)", '
357+
f'"CloudType": "{cloud_type}", '
330358
'"CloudComputeService": "Fake Cloud Service", '
331359
'"uuid": "5c50720e-a653-4d70-9b0e-d4388687fcbc", '
332360
'"LocalUser": "3391a44e-3728-478d-abde-b86c25356571", '
@@ -338,3 +366,35 @@ def expected_entries_ip():
338366
'"IPCount": 20}',
339367
]
340368
return ssm_entries
369+
370+
371+
@pytest.fixture
372+
def expected_message_ip():
373+
"""Get a fixture for a complete IP message."""
374+
message = (
375+
'{"Type": "APEL Public IP message", "Version": "0.2", "UsageRecords": ['
376+
'{"SiteName": "TEST-Site", '
377+
f'"CloudType": "{cloud_type}", '
378+
'"CloudComputeService": "Fake Cloud Service", '
379+
'"uuid": "e3c5aeef-37b8-4332-ad9f-9d068f156dc2", '
380+
'"LocalUser": "a4519d7d-f60a-4908-9d63-7d9e17422188", '
381+
'"GlobalUserName": "User 1 DN", '
382+
'"LocalGroup": "03b6a6c4-cf2b-48b9-82f1-69c52b9f30af", '
383+
'"FQAN": "VO 1 FQAN", '
384+
'"MeasurementTime": 1685044746, '
385+
'"IPVersion": 4, '
386+
'"IPCount": 10}, '
387+
'{"SiteName": "TEST-Site", '
388+
f'"CloudType": "{cloud_type}", '
389+
'"CloudComputeService": "Fake Cloud Service", '
390+
'"uuid": "5c50720e-a653-4d70-9b0e-d4388687fcbc", '
391+
'"LocalUser": "3391a44e-3728-478d-abde-b86c25356571", '
392+
'"GlobalUserName": "User 2 DN", '
393+
'"LocalGroup": "2dae43c4-1889-4e63-b172-d4e99381e30a", '
394+
'"FQAN": "VO 2 FQAN", '
395+
'"MeasurementTime": 1685044746, '
396+
'"IPVersion": 6, '
397+
'"IPCount": 20}'
398+
"]}"
399+
)
400+
return message

caso/tests/test_ssm.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ def test_empty_records_does_nothing(monkeypatch):
2626
"""Test that empty records do nothing."""
2727
with monkeypatch.context() as m:
2828
m.setattr("caso.utils.makedirs", lambda x: None)
29+
m.setattr("dirq.QueueSimple.QueueSimple", lambda x: None)
2930
messenger = ssm.SSMMessenger()
3031

3132
assert messenger.push([]) is None
@@ -35,6 +36,7 @@ def test_weird_record_raises(monkeypatch):
3536
"""Test that empty records do nothing."""
3637
with monkeypatch.context() as m:
3738
m.setattr("caso.utils.makedirs", lambda x: None)
39+
m.setattr("dirq.QueueSimple.QueueSimple", lambda x: None)
3840
messenger = ssm.SSMMessenger()
3941

4042
with pytest.raises(caso.exception.CasoError):
@@ -49,6 +51,7 @@ def mock_push(entries_cloud, entries_ip, entries_accelerator, entries_storage):
4951

5052
with monkeypatch.context() as m:
5153
m.setattr("caso.utils.makedirs", lambda x: None)
54+
m.setattr("dirq.QueueSimple.QueueSimple", lambda x: None)
5255
messenger = ssm.SSMMessenger()
5356

5457
m.setattr(messenger, "_push", mock_push)
@@ -63,6 +66,7 @@ def mock_push(entries_cloud, entries_ip, entries_accelerator, entries_storage):
6366

6467
with monkeypatch.context() as m:
6568
m.setattr("caso.utils.makedirs", lambda x: None)
69+
m.setattr("dirq.QueueSimple.QueueSimple", lambda x: None)
6670
messenger = ssm.SSMMessenger()
6771

6872
m.setattr(messenger, "_push", mock_push)
@@ -84,7 +88,49 @@ def mock_push(entries_cloud, entries_ip, entries_accelerator, entries_storage):
8488

8589
with monkeypatch.context() as m:
8690
m.setattr("caso.utils.makedirs", lambda x: None)
91+
m.setattr("dirq.QueueSimple.QueueSimple", lambda x: None)
8792
messenger = ssm.SSMMessenger()
8893

8994
m.setattr(messenger, "_push", mock_push)
9095
messenger.push(cloud_record_list + ip_record_list)
96+
97+
98+
class _MockQueue:
99+
@staticmethod
100+
def add(message):
101+
"""Add a message to the fake queue."""
102+
pass
103+
104+
105+
def test_complete_cloud_message(
106+
monkeypatch, expected_entries_cloud, expected_message_cloud
107+
):
108+
"""Test a complete cloud message."""
109+
110+
def mock_add(message):
111+
assert message == expected_message_cloud
112+
113+
with monkeypatch.context() as m:
114+
m.setattr("caso.utils.makedirs", lambda x: None)
115+
m.setattr("dirq.QueueSimple.QueueSimple", lambda x: _MockQueue())
116+
messenger = ssm.SSMMessenger()
117+
118+
m.setattr(messenger.queue, "add", mock_add)
119+
messenger._push_message_cloud(expected_entries_cloud)
120+
121+
122+
def test_complete_ip_message(monkeypatch, expected_entries_ip, expected_message_ip):
123+
"""Test a complete cloud message."""
124+
125+
def mock_add(message):
126+
print(message)
127+
print(expected_message_ip)
128+
assert message == expected_message_ip
129+
130+
with monkeypatch.context() as m:
131+
m.setattr("caso.utils.makedirs", lambda x: None)
132+
m.setattr("dirq.QueueSimple.QueueSimple", lambda x: _MockQueue())
133+
messenger = ssm.SSMMessenger()
134+
135+
m.setattr(messenger.queue, "add", mock_add)
136+
messenger._push_message_ip(expected_entries_ip)

0 commit comments

Comments
 (0)