Skip to content

Commit 0992de5

Browse files
Merge pull request #1 from InformaticsMatters/sc-3299-rstream
Initial rstream logic
2 parents 5b2b3a5 + 0a9fad2 commit 0992de5

File tree

10 files changed

+1049
-769
lines changed

10 files changed

+1049
-769
lines changed

.pylintrc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,7 @@
33
# Disable some distracting checks.
44
# See http://pylint.pycqa.org/en/latest/user_guide/message-control.html
55
disable = R0801,
6-
too-few-public-methods
6+
too-few-public-methods,
7+
too-many-arguments,
8+
too-many-locals,
9+
too-many-statements

README.md

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,69 @@ by the client using the command: -
173173
the one used by RabbitMQ in the docker-compose file. If you need to change this
174174
you can provide a different AMPQ URL as a 2nd argument on the command line.
175175

176+
## Version 1
177+
Version 1 uses the `pika` package and relies on a classic exchange-based
178+
RabbitMQ Topic Queue.
179+
180+
## Version 2
181+
Version 2 uses the `rstream` package and relies on a RabbitMQ stream.
182+
183+
Version 2 also extends messages prior to forwarding them to their socket clients.
184+
It does this by appending the messages's **offset** in the stream (an `integer`,
185+
which we refer to as an **ordinal**)), and the message's **timestamp** (also an `integer`),
186+
both of which are provided by the backing RabbitMQ stream as the messages are received
187+
by the Event Stream's consumer. The **offset** can be used as a unique message identifier.
188+
189+
When received as a protobuf string the values are appended to the end of the original
190+
message using `|` as a delimiter. Here is an example, with the message split at the
191+
delimiter for clarity: -
192+
193+
accountserver.MerchantProcessingCharge
194+
|timestamp: "2025-04-30T19:20:37.926+00:00" merchant_kind: "DATA_MANAGER" [...]
195+
|offset: 2
196+
|timestamp: 1746042171620
197+
198+
JSON strings will have these values added to the received dictionary using the
199+
keys `ess_offset` and `ess_timestamp`, again, displayed here over several lines
200+
for clarity: -
201+
202+
{
203+
"ess_offset": 2,
204+
"ess_timestamp": 1746042171620,
205+
"message_type": "accountserver.MerchantProcessingCharge",
206+
"message_body": {
207+
"timestamp": "2025-04-30T19:20:37.926+00:00",
208+
"merchant_kind": "DATA_MANAGER"
209+
}
210+
}
211+
212+
## Connecting to sockets (historical events)
213+
The streaming service keeps historical events based on a maximum age and file size.
214+
Consequently you can connect to your websocket and retrieve these historical events
215+
as long as they remain in the backend streaming queue. You identify the
216+
start-time of your events by using **headers** in the websocket request for your
217+
stream's **LOCATION**.
218+
219+
If you do not provide any header value your socket will only deliver new events.
220+
221+
You can select the start of your events buy providing either an **ordinal**,
222+
**timestamp**, or **datetime** string.
223+
224+
To stream from a specific **ordinal** (**offset**), provide it as the numerical value
225+
of the header property `X-StreamOffsetFromOrdinal`.
226+
227+
To stream from a specific **timestamp**, provide it as the numerical value
228+
of the header property `X-StreamOffsetFromTimestamp`.
229+
230+
To stream from a specific **datetime**, provide the date/time string as the value
231+
of the header property `X-StreamOffsetFromDatetime`. The datetime string is extremely
232+
flexible and is interpreted by the **python-dateutil** package's `parse` function.
233+
UTC is used as the reference for messages and the string will be interpreted as a
234+
UTC value if it has no timezone specification. For example, if you are in CEST and
235+
it is `13:33` and you want to retrieve times from 13:33 (local time) then you will need
236+
to provide a **datetime** string value that has the time set to
237+
`11:33` (the UTC time for 13:33 CEST) or specify `13:33+02:00`
238+
176239
---
177240

178241
[black]: https://black.readthedocs.io/en/stable

ansible/parameters-template.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,6 @@ ess_image_tag: SetMe
1111
ess_service_account: SetMe
1212
ess_ws_hostname: SetMe
1313
ess_ampq_url: SetMe
14+
15+
ess_shared_volume_volume_storageclass: SetMe
16+
ess_log_volume_volume_storageclass: SetMe

ansible/roles/app/templates/deployment.yaml.j2

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,32 @@ spec:
111111
{% if ess_api_mem_limit %}
112112
memory: {{ ess_api_mem_limit }}
113113
{% endif %}
114+
{% endif %}
115+
116+
- name: memcached
117+
image: memcached:{{ ess_memcached_tag }}
118+
ports:
119+
- containerPort: 11211
120+
name: memcached
121+
env:
122+
- name: MEMCACHED_MAX_CONNECTIONS
123+
value: '{{ ess_memcached_max_connections }}'
124+
- name: MEMCACHED_MAX_ITEM_SIZE
125+
value: '36'
126+
- name: MEMCACHED_MEMORY_LIMIT
127+
value: '{{ ess_memcached_memory_limit }}'
128+
resources:
129+
requests:
130+
cpu: {{ ess_memcached_cpu_request }}
131+
memory: {{ ess_memcached_mem_request }}
132+
{% if ess_memcached_cpu_limit or ess_memcached_mem_limit %}
133+
limits:
134+
{% if ess_memcached_cpu_limit %}
135+
cpu: {{ ess_memcached_cpu_limit }}
136+
{% endif %}
137+
{% if ess_memcached_mem_limit %}
138+
memory: {{ ess_memcached_mem_limit }}
139+
{% endif %}
114140
{% endif %}
115141

116142
volumes:

ansible/roles/app/vars/main.yaml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,20 @@ ess_cert_manager_issuer_base: letsencrypt-{{ ess_ingress_class }}
1111
# One of 'production', 'staging' or blank (for no SSL).
1212
ess_cert_issuer:
1313

14+
# memcached image tag
15+
ess_memcached_tag: 1.6.38-alpine3.21
16+
ess_memcached_memory_limit: 200m
17+
# memcached resources
18+
ess_memcached_cpu_request: 100m
19+
ess_memcached_cpu_limit:
20+
ess_memcached_mem_request: 256Mi
21+
ess_memcached_mem_limit: 256Mi
22+
23+
# Environment
24+
# Number of connections
25+
ess_memcached_max_connections: 4
26+
# Max item size - we use uuid4() strings
27+
ess_memcached_max_item_size: 36
28+
1429
ess_priority_class:
1530
ess_api_termination_grace_period_seconds: 30

0 commit comments

Comments
 (0)