Skip to content

Commit 0862a3f

Browse files
author
Alan Christie
committed
feat: Use of params rather than header values for offsets
1 parent 8cea1b0 commit 0862a3f

File tree

3 files changed

+42
-39
lines changed

3 files changed

+42
-39
lines changed

README.md

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -212,38 +212,42 @@ for clarity: -
212212
}
213213
}
214214

215+
## Version 3
216+
Version 3 uses **params** for the specification of historical events. Version 2
217+
used header values to provide these values.
218+
215219
## Connecting to sockets (historical events)
216220
The streaming service keeps historical events based on a maximum age and file size.
217221
Consequently you can connect to your websocket and retrieve these historical events
218222
as long as they remain in the backend streaming queue. You identify the
219-
start-time of your events by using **headers** in the websocket request for your
223+
start-time of your events by using **params** in the websocket request for your
220224
stream's **LOCATION**.
221225

222-
If you do not provide any header value your socket will only deliver new events.
226+
If you do not provide any value your socket will only deliver new events.
223227

224228
You can select the start of your events buy providing either an **ordinal**,
225-
**timestamp**, or **datetime** string. The first event delivered will the next
229+
**timestamp**, or **datetime**. The first event delivered will be the next
226230
event after the given reference. For example, if you provide **ordinal** `100`,
227231
the next event you can expect to receive is an event with **ordinal** `101`.
228232

229233
- To stream from a specific **ordinal**, provide it as the numerical value
230-
of the header property `X-StreamOffsetFromOrdinal`.
234+
of the request parameter `stream_from_ordinal`.
231235

232236
- To stream from a specific **timestamp**, provide it as the numerical value
233-
of the header property `X-StreamOffsetFromTimestamp`.
237+
of the request parameter `stream_from_timestamp`.
234238

235239
- To stream from a specific **datetime**, provide the date/time string as the value
236-
of the header property `X-StreamOffsetFromDatetime`. The datetime string is extremely
240+
of the request parameter `stream_from_datetime`. The datetime string is extremely
237241
flexible and is interpreted by the [python-dateutil] package's `parse` function.
238242
UTC is used as the reference for messages, and the string will be interpreted as a
239243
UTC value if it has no timezone specification. If you are in CEST for example, and
240244
it is `13:33`, and you want to retrieve times from 13:33 (local time), then you
241245
will need to provide a string value that has the date you are interested in,
242246
ans the time set to `11:33` (the UTC time for 13:33 CEST) or specify `13:33+02:00`.
243247

244-
You can only provide one type of historical reference. If you provide a header
245-
value for `X-StreamOffsetFromOrdinal` for example, you cannot also provide
246-
a value for `X-StreamOffsetFromTimestamp`.
248+
You can only provide one type of historical reference. If you provide a
249+
value for `stream_from_ordinal` for example, you cannot also provide
250+
a value for `stream_from_timestamp`.
247251

248252
## Stream storage
249253
The Account Server (AS) relies on [RabbitMQ] for its event streaming service,
@@ -255,7 +259,7 @@ will just deliver the first event it has available.
255259

256260
You will know that you have lost messages if you keep a record of the most recent
257261
message **ordinal**. If you received **ordinal** `100` and then, some time later,
258-
re-connect using an `X-StreamOffsetFromOrdinal` value of `100`, and the first message
262+
re-connect using an `stream_from_ordinal` value of `100`, and the first message
259263
your receive has the **ordinal** `150` then you can assume 49 messages have been lost.
260264

261265
Having said all this the AS stream storage configuration is generous and, depending

app/app.py

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,13 @@
77
import time
88
import uuid as python_uuid
99
from logging.config import dictConfig
10-
from typing import Annotated, Any
10+
from typing import Any
1111
from urllib.parse import ParseResult, urlparse
1212

1313
import shortuuid
1414
from dateutil.parser import parse
1515
from fastapi import (
1616
FastAPI,
17-
Header,
1817
HTTPException,
1918
WebSocket,
2019
WebSocketDisconnect,
@@ -195,9 +194,9 @@ class EventStreamGetResponse(BaseModel):
195194
async def event_stream(
196195
websocket: WebSocket,
197196
uuid: str,
198-
x_streamfromdatetime: Annotated[str | None, Header()] = None,
199-
x_streamfromordinal: Annotated[str | None, Header()] = None,
200-
x_streamfromtimestamp: Annotated[str | None, Header()] = None,
197+
stream_from_datetime: str | None = None,
198+
stream_from_ordinal: int | None = None,
199+
stream_from_timestamp: int | None = None,
201200
):
202201
"""The websocket handler for the event-stream.
203202
The UUID is returned to the AS when the web-socket is created
@@ -213,14 +212,13 @@ async def event_stream(
213212
await websocket.accept()
214213
_LOGGER.info("Accepted connection (uuid=%s)", uuid)
215214

216-
# Custom request headers.
217-
# The following are used to identify the first event in a stream: -
215+
# The following parameters are used to identify the first event in a stream: -
218216
#
219-
# X-StreamFromDatetime - an ISO8601 date/time string
220-
# X-StreamFromTimestamp - an event timestamp (integer) from a prior message
221-
# X-StreamFromOrdinal - a message ordinal (integer 0..N)
217+
# stream_from_datetime - an ISO8601 date/time string
218+
# stream_from_ordinal - a message ordinal (integer 0..N)
219+
# stream_from_timestamp - an event timestamp (integer) from a prior message
222220
#
223-
# Only one of the above is expected.
221+
# Only one of the above is permitted.
224222
num_stream_from_specified: int = 0
225223
header_value_error: bool = False
226224
header_value_error_msg: str = ""
@@ -229,11 +227,11 @@ async def event_stream(
229227
OffsetType.NEXT
230228
)
231229
# Was a streaming offset provided?
232-
if x_streamfromdatetime:
230+
if stream_from_datetime:
233231
num_stream_from_specified += 1
234232
try:
235-
_LOGGER.info("Found X-StreamFromDatetime=%s", x_streamfromdatetime)
236-
from_datetime = parse(x_streamfromdatetime)
233+
_LOGGER.info("Given stream_from_datetime=%s", stream_from_datetime)
234+
from_datetime = parse(stream_from_datetime)
237235
# We need a RabbitMQ stream timestamp,
238236
# which is milliseconds since the universal time epoch (1 Jan, 1970).
239237
# It's easy to get 'seconds', which we multiply by 1,000
@@ -243,34 +241,34 @@ async def event_stream(
243241
)
244242
except: # pylint: disable=bare-except
245243
header_value_error = True
246-
header_value_error_msg = "Unable to parse X-StreamFromDatetime value"
247-
if x_streamfromordinal:
248-
_LOGGER.info("Found X-StreamFromOrdinal=%s", x_streamfromordinal)
244+
header_value_error_msg = "Unable to parse stream_from_datetime value"
245+
if stream_from_ordinal:
246+
_LOGGER.info("Given stream_from_ordinal=%d", stream_from_ordinal)
249247
num_stream_from_specified += 1
250248
try:
251-
from_ordinal = int(x_streamfromordinal)
249+
from_ordinal = int(stream_from_ordinal)
252250
offset_specification = ConsumerOffsetSpecification(
253251
OffsetType.OFFSET, from_ordinal
254252
)
255253
except ValueError:
256254
header_value_error = True
257-
header_value_error_msg = "X-StreamFromOrdinal must be an integer"
258-
if x_streamfromtimestamp:
259-
_LOGGER.info("Found X-StreamFromTimestamp=%s", x_streamfromtimestamp)
255+
header_value_error_msg = "stream_from_ordinal must be an integer"
256+
if stream_from_timestamp:
257+
_LOGGER.info("Found stream_from_timestamp=%s", stream_from_timestamp)
260258
num_stream_from_specified += 1
261259
try:
262-
from_timestamp = int(x_streamfromtimestamp)
260+
from_timestamp = int(stream_from_timestamp)
263261
offset_specification = ConsumerOffsetSpecification(
264262
OffsetType.TIMESTAMP, from_timestamp
265263
)
266264
except ValueError:
267265
header_value_error = True
268-
header_value_error_msg = "X-StreamFromTimestamp must be an integer"
266+
header_value_error_msg = "stream_from_timestamp must be an integer"
269267

270268
# Replace any error with a 'too many values provided' error if necessary
271269
if num_stream_from_specified > 1:
272270
header_value_error = True
273-
header_value_error_msg = "Cannot provide more than one X-StreamFrom variable"
271+
header_value_error_msg = "Cannot provide more than one 'stream_from_' variable"
274272

275273
if header_value_error:
276274
await websocket.close(

ws_listener.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,20 @@
1111

1212
def main(c_args: argparse.Namespace):
1313
"""Connect to the WebSocket and just read messages."""
14-
headers = {}
14+
params = ""
1515
if c_args.datetime_offset:
16-
headers["X-StreamFromDatetime"] = c_args.datetime_offset
16+
params = f"?stream_from_datetime={c_args.datetime_offset}"
1717
elif c_args.timestamp_offset:
18-
headers["X-StreamFromTimestamp"] = c_args.timestamp_offset
18+
params = f"?stream_from_timestamp={c_args.timestamp_offset}"
1919
elif c_args.ordinal_offset:
20-
headers["X-StreamFromOrdinal"] = c_args.ordinal_offset
20+
params = f"?stream_from_ordinal={c_args.ordinal_offset}"
2121

2222
total_bytes = 0
2323
total_messages = 0
2424
min_bytes = 1_000_000
2525
max_bytes = 0
26-
ws = Client.connect(c_args.location, headers=headers)
26+
url = f"{c_args.location}{params}"
27+
ws = Client.connect(url)
2728
try:
2829
while True:
2930

0 commit comments

Comments
 (0)