Skip to content

Commit de4de15

Browse files
committed
Fix record format, and use correct message format for publication
IP records are to be sent as JSON messages, with the format defined in the following schema [1]. This change fixes both the record and the SSM publisher, so that we use the correct versions and message formats. [1]: https://github.com/gregcorbett/apel/blob/ip_accounting/apel/common/message_schemas.py
1 parent 073ca07 commit de4de15

File tree

2 files changed

+37
-30
lines changed

2 files changed

+37
-30
lines changed

caso/messenger/ssm.py

Lines changed: 32 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
# under the License.
1616

1717
import abc
18+
import json
1819
import warnings
1920

2021
import dirq.QueueSimple
@@ -47,13 +48,26 @@
4748

4849
@six.add_metaclass(abc.ABCMeta)
4950
class _SSMBaseMessenger(caso.messenger.BaseMessenger):
50-
version = None
51-
separator = "%%"
51+
compute_version = None
52+
ip_version = None
5253

5354
def __init__(self):
5455
# FIXME(aloga): try except here
5556
utils.makedirs(CONF.ssm.output_path)
56-
self.header = "APEL-cloud-message: v%s" % self.version
57+
58+
def push_compute_message(self, queue, entries):
59+
message = f"APEL-cloud-message: v{self.compute_version}\n"
60+
aux = "%%\n".join(entries)
61+
message += f"{aux}\n"
62+
queue.add(message)
63+
64+
def push_ip_message(self, queue, entries):
65+
message = {
66+
"Type": "APEL Public IP message",
67+
"Version": self.ip_version,
68+
"UsageRecords": entries,
69+
}
70+
queue.add(json.dumps(message))
5771

5872
def push(self, records):
5973
if not records:
@@ -65,42 +79,34 @@ def push(self, records):
6579
if isinstance(record, caso.record.CloudRecord):
6680
aux = ""
6781
for k, v in six.iteritems(record.as_dict(
68-
version=self.version)):
82+
version=self.compute_version)):
6983
if v is not None:
7084
aux += "%s: %s\n" % (k, v)
7185
entries_cloud.append(aux)
7286
else:
73-
entries_ip.append(record.as_json(version=self.version))
74-
75-
# FIXME(aloga): try except here
76-
queue = dirq.QueueSimple.QueueSimple(CONF.ssm.output_path)
77-
78-
# Divide message into smaller chunks as per GGUS #143436
79-
# https://ggus.eu/index.php?mode=ticket_info&ticket_id=143436
80-
for i in range(0, len(entries_cloud), CONF.ssm.max_size):
81-
message = "%s\n" % self.header
87+
entries_ip.append(record.as_dict(version=self.ip_version))
8288

83-
sep = "%s\n" % self.separator
84-
message += "%s" % sep.join(entries_cloud[i:i +
85-
CONF.ssm.max_size])
86-
87-
queue.add(message)
88-
89-
# Send entries for IP record
90-
for i in range(0, len(entries_ip), CONF.ssm.max_size):
91-
message = "IP message v%s\n" % self.version
89+
# FIXME(aloga): try except here
90+
queue = dirq.QueueSimple.QueueSimple(CONF.ssm.output_path)
9291

93-
message += "%s\n" % entries_ip[i:i + CONF.ssm.max_size]
92+
# Divide message into smaller chunks as per GGUS #143436
93+
# https://ggus.eu/index.php?mode=ticket_info&ticket_id=143436
94+
for i in range(0, len(entries_cloud), CONF.ssm.max_size):
95+
entries = entries_cloud[i:i + CONF.ssm.max_size]
96+
self.push_compute_message(queue, entries)
9497

95-
queue.add(message)
98+
for i in range(0, len(entries_ip), CONF.ssm.max_size):
99+
entries = entries_ip[i:i + CONF.ssm.max_size]
100+
self.push_ip_message(queue, entries)
96101

97102

98103
class SSMMessengerV02(_SSMBaseMessenger):
99-
version = "0.2"
104+
compute_version = "0.2"
100105

101106

102107
class SSMMessengerV04(_SSMBaseMessenger):
103-
version = "0.4"
108+
compute_version = "0.4"
109+
ip_version = "0.2"
104110

105111

106112
class SsmMessenger(SSMMessengerV02):

caso/record.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ class IPRecord(object):
240240
241241
"""
242242

243-
version = "0.4"
243+
version = "0.2"
244244

245245
_V02_fields = [
246246
"MeasurementTime",
@@ -256,7 +256,7 @@ class IPRecord(object):
256256
]
257257

258258
_version_field_map = {
259-
"0.4": _V02_fields,
259+
"0.2": _V02_fields,
260260
}
261261

262262
def __init__(
@@ -316,8 +316,8 @@ def map(self):
316316
),
317317
'SiteName': self.site,
318318
'CloudType': self.cloud_type,
319-
'LocalUserId': self.user_id,
320-
'LocalGroupId': self.group_id,
319+
'LocalUser': self.user_id,
320+
'LocalGroup': self.group_id,
321321
'FQAN': self.fqan,
322322
'GlobalUserName': self.user_dn,
323323
'IPVersion': self.ip_version,
@@ -327,4 +327,5 @@ def map(self):
327327
return d
328328

329329
def as_json(self, version=None):
330+
print(self.map)
330331
return json.dumps(self.as_dict(version=version))

0 commit comments

Comments
 (0)