Merged
61 commits
e3b7419
feat: Initial attempt at stream-based websocket logic
Apr 25, 2025
d13aca2
feat: Partial work on stream
Apr 25, 2025
39455f3
fix: Now awaits _consume()
Apr 30, 2025
30434b7
fix: Better log and load balancer mode
Apr 30, 2025
048b00f
fix: Better log
Apr 30, 2025
d634608
fix: Avoid create stream, and use of decoder
Apr 30, 2025
79d7b87
fix: Messages now sent through socket (and attempt to stop consumer)
Apr 30, 2025
5c7c115
fix: Now closes consumer (when stopped)
Apr 30, 2025
3455e53
feat: Better (?) shutdown
Apr 30, 2025
7eb6fb7
fix: Msg now sent as string (still locking up)
Apr 30, 2025
bc819ba
fix: Experiment with timestamp offset
Apr 30, 2025
436668b
build: Add python-dateutil
May 1, 2025
294e7d0
fix: Initial support for stream headers (offset specification)
May 1, 2025
446fa00
fix: Log offset specification
May 1, 2025
7aaa55c
build: Fix dateutil package
May 1, 2025
8c3e270
fix: Fix BAD_REQUEST
May 1, 2025
2ca05e2
fix: Adjust log (offset and timestamp)
May 1, 2025
ab9f872
feat: Event stream now inserts offset and timestamp to generated mess…
May 1, 2025
5aecfd7
fix: Properties now at the end of the string and in maps using ess_ p…
May 1, 2025
4373ac6
fix: Log header values
May 1, 2025
d510ede
feat: Attempt of fix msg transmission
May 1, 2025
24fe596
fix: Revert changes
May 1, 2025
6a9a6c5
fix: Attempt to fix timestamp from datetime
May 1, 2025
706037b
fix: Attempt to handle StreamDoesNotExist
May 1, 2025
a22c807
feat: Faster handling of missing streams
May 1, 2025
c11a673
feat: Attempt to append offset and timestamp message
May 1, 2025
9e0424f
feat: Attempt to detect encoding
May 1, 2025
2439331
fix: Attempt to fix message
May 1, 2025
efe780b
fix: Remove msg type/encoding display
May 1, 2025
41b8f05
fix: Another attempt at message decoding
May 1, 2025
91066ce
feat: Attempt to patch proto string
May 1, 2025
2baade3
fix: Trim string
May 1, 2025
32ac80e
fix: Skip recoding
May 1, 2025
9a74b41
feat: Adds keys to json string (if used)
May 1, 2025
e427e08
feat: Better ESS properties in protobuf string
May 1, 2025
c552086
feat: Reduce log (no offset/timestamp)
May 1, 2025
2ce019f
docs: Doc tweak
May 2, 2025
afef082
docs: Doc tweak (headers)
May 2, 2025
6472635
fix: Better close for websockets (close rather than exception)
May 2, 2025
bfe81e2
style: Log tweak
May 2, 2025
a45ace4
style: Slimmer logging
May 2, 2025
98b8899
style: Remove some log
May 2, 2025
fa50a8f
feat: Initial memcached logic
May 21, 2025
17d9715
fix: Fix deployment typos
May 21, 2025
de186f1
fix: Use of KeepaliveOpts and memcached initialised on startup
May 21, 2025
388ebe0
fix: Better log location (memcached)
May 21, 2025
f577dc4
fix: Attempt to detect change of ES UUID
May 21, 2025
cd203cc
ci: Remove unused variable
May 21, 2025
20c8f04
style: Logging changes
May 21, 2025
3046835
style: Another log tweak
May 21, 2025
8dd5568
fix: Correct the memcache key value (unique to socket)
May 21, 2025
6ea3f37
ci: Update memcached configuration
May 21, 2025
8df2b7f
fix: Fix undefined variable
May 21, 2025
96194a9
fix: Removed confusing global variables
May 21, 2025
64e1936
fix: Fix messages and correct byte to string comparison
May 21, 2025
69e20de
style: Logging corrections
May 21, 2025
2a5b1ba
style: More logging adjustments
May 21, 2025
90f343d
fix: Fix decode of None
May 21, 2025
98614e8
fix: Fix uuid4() to string
May 21, 2025
c298640
style: Log tweak
May 21, 2025
0a9fad2
fix: Handle RuntimeError closing socket
May 21, 2025
5 changes: 4 additions & 1 deletion .pylintrc
@@ -3,4 +3,7 @@
# Disable some distracting checks.
# See http://pylint.pycqa.org/en/latest/user_guide/message-control.html
disable = R0801,
- too-few-public-methods
+ too-few-public-methods,
+ too-many-arguments,
+ too-many-locals,
+ too-many-statements
63 changes: 63 additions & 0 deletions README.md
@@ -173,6 +173,69 @@ by the client using the command: -
the one used by RabbitMQ in the docker-compose file. If you need to change this
you can provide a different AMQP URL as a 2nd argument on the command line.

## Version 1
Version 1 uses the `pika` package and relies on a classic exchange-based
RabbitMQ Topic Queue.

## Version 2
Version 2 uses the `rstream` package and relies on a RabbitMQ stream.

Version 2 also extends messages prior to forwarding them to their socket clients.
It does this by appending the message's **offset** in the stream (an `integer`,
which we refer to as an **ordinal**), and the message's **timestamp** (also an `integer`),
both of which are provided by the backing RabbitMQ stream as the messages are received
by the Event Stream's consumer. The **offset** can be used as a unique message identifier.

When received as a protobuf string the values are appended to the end of the original
message using `|` as a delimiter. Here is an example, with the message split at the
delimiter for clarity: -

    accountserver.MerchantProcessingCharge
    |timestamp: "2025-04-30T19:20:37.926+00:00" merchant_kind: "DATA_MANAGER" [...]
    |offset: 2
    |timestamp: 1746042171620
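
A minimal sketch of how a client might separate the appended values from the original protobuf string. It assumes the last two `|`-delimited fields are always `offset: N` and `timestamp: N`, exactly as in the example above; the function name `split_ess_properties` is hypothetical and not part of this project.

```python
def split_ess_properties(message: str) -> tuple[str, dict[str, int]]:
    """Split an extended protobuf string into its original text and appended values.

    Assumption: the final two '|'-delimited fields are 'offset: N' and
    'timestamp: N', in that order, as shown in the README example.
    """
    # Split from the right so any '|' inside the original message is left alone.
    body, offset_field, timestamp_field = message.rsplit("|", 2)
    properties: dict[str, int] = {}
    for field in (offset_field, timestamp_field):
        key, _, value = field.partition(":")
        properties[key.strip()] = int(value)
    return body, properties


# The README example, joined back into a single string.
example = (
    "accountserver.MerchantProcessingCharge"
    '|timestamp: "2025-04-30T19:20:37.926+00:00" merchant_kind: "DATA_MANAGER" [...]'
    "|offset: 2"
    "|timestamp: 1746042171620"
)
body, props = split_ess_properties(example)
print(props["offset"], props["timestamp"])
```

Splitting from the right keeps the original message intact even though it also contains a `timestamp:` field of its own.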

JSON strings will have these values added to the received dictionary using the
keys `ess_offset` and `ess_timestamp`, again, displayed here over several lines
for clarity: -

    {
        "ess_offset": 2,
        "ess_timestamp": 1746042171620,
        "message_type": "accountserver.MerchantProcessingCharge",
        "message_body": {
            "timestamp": "2025-04-30T19:20:37.926+00:00",
            "merchant_kind": "DATA_MANAGER"
        }
    }
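
As an illustration, a client could read the two added keys like this. The sketch assumes `ess_timestamp` is a count of milliseconds since the Unix epoch in UTC, which fits the example value but is not stated explicitly above.

```python
import json
from datetime import datetime, timedelta, timezone

# The README example as it might arrive on the socket (a single JSON string).
raw = (
    '{"ess_offset": 2, "ess_timestamp": 1746042171620, '
    '"message_type": "accountserver.MerchantProcessingCharge", '
    '"message_body": {"timestamp": "2025-04-30T19:20:37.926+00:00", '
    '"merchant_kind": "DATA_MANAGER"}}'
)

event = json.loads(raw)
offset = event["ess_offset"]

# Assumption: ess_timestamp is milliseconds since 1970-01-01T00:00:00Z.
# Integer timedelta arithmetic avoids floating-point rounding.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
received_at = EPOCH + timedelta(milliseconds=event["ess_timestamp"])

print(offset, received_at.isoformat())
```

Under that assumption the example timestamp falls on 2025-04-30, shortly after the `timestamp` carried in the message body.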

## Connecting to sockets (historical events)
The streaming service keeps historical events based on a maximum age and file size.
Consequently you can connect to your websocket and retrieve these historical events
as long as they remain in the backend streaming queue. You identify the
start-time of your events by using **headers** in the websocket request for your
stream's **LOCATION**.

If you do not provide any header value your socket will only deliver new events.

You can select the start of your events by providing either an **ordinal**,
**timestamp**, or **datetime** string.

To stream from a specific **ordinal** (**offset**), provide it as the numerical value
of the header property `X-StreamOffsetFromOrdinal`.

To stream from a specific **timestamp**, provide it as the numerical value
of the header property `X-StreamOffsetFromTimestamp`.

To stream from a specific **datetime**, provide the date/time string as the value
of the header property `X-StreamOffsetFromDatetime`. The datetime string is extremely
flexible and is interpreted by the **python-dateutil** package's `parse` function.
UTC is used as the reference for messages and the string will be interpreted as a
UTC value if it has no timezone specification. For example, if you are in CEST and
it is `13:33` and you want to retrieve events from 13:33 (local time) then you will need
to provide a **datetime** string value that has the time set to
`11:33` (the UTC time for 13:33 CEST) or specify `13:33+02:00`.
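
The header choices above can be sketched with the standard library alone. The helper name `stream_offset_header` and the date used are hypothetical, and rejecting more than one value is an assumption: the text does not say how the service treats several offset headers at once.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def stream_offset_header(
    ordinal: Optional[int] = None,
    timestamp: Optional[int] = None,
    date_time: Optional[str] = None,
) -> dict[str, str]:
    """Return the header that selects where streaming starts.

    An empty dict means no header is sent, so only new events are delivered.
    """
    chosen = {
        "X-StreamOffsetFromOrdinal": ordinal,
        "X-StreamOffsetFromTimestamp": timestamp,
        "X-StreamOffsetFromDatetime": date_time,
    }
    headers = {name: str(value) for name, value in chosen.items() if value is not None}
    if len(headers) > 1:
        # Assumption: only one starting point should be given per request.
        raise ValueError("provide at most one of ordinal, timestamp or date_time")
    return headers


# 13:33 CEST (UTC+02:00), sent with an explicit zone or converted to UTC.
cest = timezone(timedelta(hours=2))
local = datetime(2025, 5, 1, 13, 33, tzinfo=cest)
with_zone = stream_offset_header(date_time=local.isoformat())
as_utc = stream_offset_header(
    date_time=local.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M")
)
print(with_zone)
print(as_utc)
```

The resulting dictionary would be passed as additional request headers when opening the websocket; the parameter for doing so depends on the client library you use.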

---

[black]: https://black.readthedocs.io/en/stable
3 changes: 3 additions & 0 deletions ansible/parameters-template.yaml
@@ -11,3 +11,6 @@ ess_image_tag: SetMe
ess_service_account: SetMe
ess_ws_hostname: SetMe
ess_ampq_url: SetMe

ess_shared_volume_volume_storageclass: SetMe
ess_log_volume_volume_storageclass: SetMe
26 changes: 26 additions & 0 deletions ansible/roles/app/templates/deployment.yaml.j2
@@ -111,6 +111,32 @@ spec:
{% if ess_api_mem_limit %}
memory: {{ ess_api_mem_limit }}
{% endif %}
{% endif %}

- name: memcached
image: memcached:{{ ess_memcached_tag }}
ports:
- containerPort: 11211
name: memcached
env:
- name: MEMCACHED_MAX_CONNECTIONS
value: '{{ ess_memcached_max_connections }}'
- name: MEMCACHED_MAX_ITEM_SIZE
value: '36'
- name: MEMCACHED_MEMORY_LIMIT
value: '{{ ess_memcached_memory_limit }}'
resources:
requests:
cpu: {{ ess_memcached_cpu_request }}
memory: {{ ess_memcached_mem_request }}
{% if ess_memcached_cpu_limit or ess_memcached_mem_limit %}
limits:
{% if ess_memcached_cpu_limit %}
cpu: {{ ess_memcached_cpu_limit }}
{% endif %}
{% if ess_memcached_mem_limit %}
memory: {{ ess_memcached_mem_limit }}
{% endif %}
{% endif %}

volumes:
15 changes: 15 additions & 0 deletions ansible/roles/app/vars/main.yaml
@@ -11,5 +11,20 @@ ess_cert_manager_issuer_base: letsencrypt-{{ ess_ingress_class }}
# One of 'production', 'staging' or blank (for no SSL).
ess_cert_issuer:

# memcached image tag
ess_memcached_tag: 1.6.38-alpine3.21
ess_memcached_memory_limit: 200m
# memcached resources
ess_memcached_cpu_request: 100m
ess_memcached_cpu_limit:
ess_memcached_mem_request: 256Mi
ess_memcached_mem_limit: 256Mi

# Environment
# Number of connections
ess_memcached_max_connections: 4
# Max item size - we use uuid4() strings
ess_memcached_max_item_size: 36

ess_priority_class:
ess_api_termination_grace_period_seconds: 30
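
For reference, the value 36 given for `ess_memcached_max_item_size` matches the length of a `uuid4()` value once it is converted to its canonical string form, which a short check can confirm. This is only an illustration of the comment in the vars above, not code taken from this change.

```python
import uuid

# The canonical text form is 32 hex digits plus 4 hyphens.
key = str(uuid.uuid4())
print(len(key))  # 36
```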