-
Notifications
You must be signed in to change notification settings - Fork 58
Expand file tree
/
Copy pathseismic-consumer.py
More file actions
44 lines (34 loc) · 1.04 KB
/
seismic-consumer.py
File metadata and controls
44 lines (34 loc) · 1.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from quixstreams import Application
import json
import time
def main():
app = Application(
# set broker(s)
broker_address="localhost:19092",
# set the logging level
loglevel="DEBUG",
# define the consumer group
consumer_group="seismic_reader",
# set the offset reset - either earliest/latest
auto_offset_reset="earliest",
)
with app.get_consumer() as consumer:
consumer.subscribe(["eu_seismic"])
while True:
msg = consumer.poll(1)
if msg is None:
print("Waiting...")
elif msg.error() is not None:
raise Exception(msg.error())
else:
key = msg.key().decode("utf8")
value = json.loads(msg.value())
offset = msg.offset()
print(f"{offset} {key} {value}")
consumer.store_offsets(msg)
time.sleep(10)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
pass