Skip to content

Commit be69f7c

Browse files
committed
- Added support for JSON batch encoding/decoding.
1 parent 3d014fe commit be69f7c

File tree

4 files changed

+228
-16
lines changed

4 files changed

+228
-16
lines changed

README.md

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,19 @@ Check out the [CloudEvents spec](https://github.com/cloudevents/spec/blob/v1.0/s
88
This package has no dependencies beyond the Python standard library with the base install.
99
Optionally depends on the `avro` package for Avro encode/decode functionality.
1010

11+
## Features
12+
13+
* Implements CloudEvents 1.0 spec.
14+
* JSON and JSON batch encoding/decoding.
15+
* Avro encoding/decoding.
16+
* Simple API.
17+
1118
## News
1219

20+
### 0.2.3 - (*2020-09-30*)
21+
22+
* Added support for encoding/decoding batch events in JSON.
23+
1324
### 0.2.2 - (*2020-09-29*)
1425

1526
* First public release.
@@ -30,6 +41,8 @@ Install with JSON and Avro codecs:
3041

3142
## Usage:
3243

44+
### Creating Events
45+
3346
Create a CloudEvent with required attributes:
3447

3548
```python
@@ -101,6 +114,8 @@ Extension attributes can be accessed using the `attribute` method:
101114
assert event.attribute("external1") == "foo/bar"
102115
```
103116

117+
### Encoding/Decoding Events in JSON
118+
104119
Encode an event in JSON:
105120

106121
```python
@@ -111,6 +126,30 @@ encoded_event = Json.encode(event)
111126

112127
Note that blank fields won't be encoded.
113128

129+
Encode a batch of events in JSON:
130+
131+
```python
132+
from spce import CloudEvent, Json
133+
134+
event_batch = [
135+
CloudEvent(
136+
type="OximeterMeasured",
137+
source="oximeter/123",
138+
id="1000",
139+
datacontenttype="application/json",
140+
data=r'{"spo2": 99})',
141+
),
142+
CloudEvent(
143+
type="OximeterMeasured",
144+
source="oximeter/123",
145+
id="1001",
146+
datacontenttype="application/json",
147+
data=b'\x01binarydata\x02',
148+
),
149+
]
150+
encoded_batch = Json.encode(event_batch)
151+
```
152+
114153
Decode an event in JSON:
115154

116155
```python
@@ -132,6 +171,34 @@ text = """
132171
decoded_event = Json.decode(text)
133172
```
134173

174+
Decode a batch of events in JSON:
175+
176+
```python
177+
text = r'''
178+
[
179+
{
180+
"type":"OximeterMeasured",
181+
"source":"oximeter/123",
182+
"id":"1000",
183+
"specversion":"1.0",
184+
"datacontenttype": "application/json",
185+
"data": "{\"spo2\": 99}"
186+
},
187+
{
188+
"type":"OximeterMeasured",
189+
"source":"oximeter/123",
190+
"id":"1001",
191+
"specversion":"1.0",
192+
"datacontenttype": "application/json",
193+
"data_base64": "AWJpbmFyeWRhdGEC"
194+
}
195+
]
196+
'''
197+
decoded_events = Json.decode(text)
198+
```
199+
200+
### Encoding/Decoding Events in Avro
201+
135202
Encode an event in Avro:
136203

137204
```python

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
setup(
2121
name='spce',
22-
version='0.2.2',
22+
version='0.2.3',
2323
packages=['spce'],
2424
url='https://github.com/scaleplandev/spce-python',
2525
license='Apache 2.0',

spce/json.py

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import json
1616
from base64 import b64encode, b64decode
17+
from typing import Union, Iterable
1718

1819
from .cloudevents import CloudEvent
1920

@@ -25,24 +26,38 @@ class Json:
2526
_ENCODER = json.JSONEncoder()
2627

2728
@classmethod
28-
def encode(cls, event: CloudEvent):
29-
kvs = []
30-
encoder = cls._ENCODER
31-
for attr, value in event._attributes.items():
32-
if value:
33-
kvs.append('"%s":%s' % (attr, encoder.encode(value)))
34-
if event._data:
35-
if event._has_binary_data:
36-
kvs.append('"data_base64":%s' % encoder.encode(b64encode(event._data).decode()))
37-
else:
38-
kvs.append('"data":%s' % encoder.encode(event._data))
39-
return "{%s}" % ",".join(kvs)
29+
def encode(cls, event: Union[CloudEvent, Iterable[CloudEvent]]) -> str:
30+
if isinstance(event, Iterable):
31+
encoded = [cls.encode(e) for e in event]
32+
return "[%s]" % ",".join(encoded)
33+
elif isinstance(event, CloudEvent):
34+
kvs = []
35+
encoder = cls._ENCODER
36+
for attr, value in event._attributes.items():
37+
if value:
38+
kvs.append('"%s":%s' % (attr, encoder.encode(value)))
39+
if event._data:
40+
if event._has_binary_data:
41+
kvs.append('"data_base64":%s' % encoder.encode(b64encode(event._data).decode()))
42+
else:
43+
kvs.append('"data":%s' % encoder.encode(event._data))
44+
return "{%s}" % ",".join(kvs)
45+
else:
46+
raise TypeError("JSON.encode cannot encode %s" % type(event))
4047

4148
@classmethod
42-
def decode(cls, text: str) -> CloudEvent:
49+
def decode(cls, text: str) -> Union[CloudEvent, Iterable[CloudEvent]]:
4350
d = json.loads(text)
51+
if isinstance(d, dict):
52+
return CloudEvent(**cls._normalize_data(d))
53+
elif isinstance(d, Iterable):
54+
return [CloudEvent(**cls._normalize_data(it)) for it in d]
55+
else:
56+
raise TypeError("JSON.decode cannot decode %s" % type(d))
57+
58+
@classmethod
59+
def _normalize_data(cls, d: dict) -> dict:
4460
if "data_base64" in d:
4561
d["data"] = b64decode(d["data_base64"])
4662
del d["data_base64"]
47-
48-
return CloudEvent(**d)
63+
return d

tests/json_test.py

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,72 @@ def test_encode_extension_attribute(self):
120120
'''
121121
self.assertEqual(json.loads(target), json.loads(encoded))
122122

123+
def test_encode_batch_0_items(self):
124+
self.assertEqual("[]", Json.encode([]))
125+
126+
def test_encode_batch_1_item(self):
127+
event_batch = [
128+
CloudEvent(
129+
type="OximeterMeasured",
130+
source="oximeter/123",
131+
id="1000",
132+
datacontenttype="application/json",
133+
data=json.dumps({"spo2": 99}),
134+
)
135+
]
136+
encoded_batch = Json.encode(event_batch)
137+
target = r'''
138+
[{
139+
"type":"OximeterMeasured",
140+
"source":"oximeter/123",
141+
"id":"1000",
142+
"specversion":"1.0",
143+
"datacontenttype": "application/json",
144+
"data": "{\"spo2\": 99}"
145+
}]
146+
'''
147+
self.assertEqual(json.loads(target), json.loads(encoded_batch))
148+
149+
def test_encode_batch_2_items(self):
150+
event_batch = [
151+
CloudEvent(
152+
type="OximeterMeasured",
153+
source="oximeter/123",
154+
id="1000",
155+
datacontenttype="application/json",
156+
data=json.dumps({"spo2": 99}),
157+
),
158+
CloudEvent(
159+
type="OximeterMeasured",
160+
source="oximeter/123",
161+
id="1001",
162+
datacontenttype="application/json",
163+
data=b'\x01binarydata\x02',
164+
),
165+
]
166+
encoded_batch = Json.encode(event_batch)
167+
target = r'''
168+
[
169+
{
170+
"type":"OximeterMeasured",
171+
"source":"oximeter/123",
172+
"id":"1000",
173+
"specversion":"1.0",
174+
"datacontenttype": "application/json",
175+
"data": "{\"spo2\": 99}"
176+
},
177+
{
178+
"type":"OximeterMeasured",
179+
"source":"oximeter/123",
180+
"id":"1001",
181+
"specversion":"1.0",
182+
"datacontenttype": "application/json",
183+
"data_base64": "AWJpbmFyeWRhdGEC"
184+
}
185+
]
186+
'''
187+
self.assertEqual(json.loads(target), json.loads(encoded_batch))
188+
123189

124190
class JsonDecoderTests(unittest.TestCase):
125191

@@ -222,3 +288,67 @@ def test_decode_extension_attribute(self):
222288
)
223289
event = Json.decode(encoded_event)
224290
self.assertEqual(target, event)
291+
292+
def test_decode_batch_0_items(self):
293+
self.assertEqual([], Json.decode("[]"))
294+
295+
def test_decode_batch_1_item(self):
296+
encoded_batch = r'''
297+
[{
298+
"type":"OximeterMeasured",
299+
"source":"oximeter/123",
300+
"id":"1000",
301+
"specversion":"1.0",
302+
"datacontenttype": "application/json",
303+
"data": "{\"spo2\": 99}"
304+
}]
305+
'''
306+
target = [
307+
CloudEvent(
308+
type="OximeterMeasured",
309+
source="oximeter/123",
310+
id="1000",
311+
datacontenttype="application/json",
312+
data=json.dumps({"spo2": 99}),
313+
)
314+
]
315+
self.assertEqual(target, Json.decode(encoded_batch))
316+
317+
def test_decode_batch_2_items(self):
318+
encoded_batch = r'''
319+
[
320+
{
321+
"type":"OximeterMeasured",
322+
"source":"oximeter/123",
323+
"id":"1000",
324+
"specversion":"1.0",
325+
"datacontenttype": "application/json",
326+
"data": "{\"spo2\": 99}"
327+
},
328+
{
329+
"type":"OximeterMeasured",
330+
"source":"oximeter/123",
331+
"id":"1001",
332+
"specversion":"1.0",
333+
"datacontenttype": "application/json",
334+
"data_base64": "AWJpbmFyeWRhdGEC"
335+
}
336+
]
337+
'''
338+
target = [
339+
CloudEvent(
340+
type="OximeterMeasured",
341+
source="oximeter/123",
342+
id="1000",
343+
datacontenttype="application/json",
344+
data=json.dumps({"spo2": 99}),
345+
),
346+
CloudEvent(
347+
type="OximeterMeasured",
348+
source="oximeter/123",
349+
id="1001",
350+
datacontenttype="application/json",
351+
data=b'\x01binarydata\x02',
352+
),
353+
]
354+
self.assertEqual(target, Json.decode(encoded_batch))

0 commit comments

Comments
 (0)