Skip to content

Commit d395b73

Browse files
maxzhengedenhill
authored andcommitted
Restructure code to be more Pythonic and adhere to PEP8 style (#340, @maxzheng)
1 parent cd90d42 commit d395b73

File tree

1 file changed

+53
-21
lines changed

1 file changed

+53
-21
lines changed

README.md

Lines changed: 53 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,12 @@ Usage
3939
```python
4040
from confluent_kafka import Producer
4141

42+
4243
p = Producer({'bootstrap.servers': 'mybroker,mybroker2'})
44+
4345
for data in some_data_source:
4446
p.produce('mytopic', data.encode('utf-8'))
47+
4548
p.flush()
4649
```
4750

@@ -51,17 +54,29 @@ p.flush()
5154
```python
5255
from confluent_kafka import Consumer, KafkaError
5356

54-
c = Consumer({'bootstrap.servers': 'mybroker', 'group.id': 'mygroup',
55-
'default.topic.config': {'auto.offset.reset': 'smallest'}})
57+
58+
c = Consumer({
59+
'bootstrap.servers': 'mybroker',
60+
'group.id': 'mygroup',
61+
'default.topic.config': {
62+
'auto.offset.reset': 'smallest'
63+
}
64+
})
65+
5666
c.subscribe(['mytopic'])
57-
running = True
58-
while running:
67+
68+
while True:
5969
msg = c.poll()
60-
if not msg.error():
61-
print('Received message: %s' % msg.value().decode('utf-8'))
62-
elif msg.error().code() != KafkaError._PARTITION_EOF:
63-
print(msg.error())
64-
running = False
70+
71+
if msg.error():
72+
if msg.error().code() == KafkaError._PARTITION_EOF:
73+
continue
74+
else:
75+
print(msg.error())
76+
break
77+
78+
print('Received message: {}'.format(msg.value().decode('utf-8')))
79+
6580
c.close()
6681
```
6782

@@ -71,12 +86,17 @@ c.close()
7186
from confluent_kafka import avro
7287
from confluent_kafka.avro import AvroProducer
7388

89+
7490
value_schema = avro.load('ValueSchema.avsc')
7591
key_schema = avro.load('KeySchema.avsc')
7692
value = {"name": "Value"}
7793
key = {"name": "Key"}
7894

79-
avroProducer = AvroProducer({'bootstrap.servers': 'mybroker,mybroker2', 'schema.registry.url': 'http://schem_registry_host:port'}, default_key_schema=key_schema, default_value_schema=value_schema)
95+
avroProducer = AvroProducer({
96+
'bootstrap.servers': 'mybroker,mybroker2',
97+
'schema.registry.url': 'http://schem_registry_host:port'
98+
}, default_key_schema=key_schema, default_value_schema=value_schema)
99+
80100
avroProducer.produce(topic='my_topic', value=value, key=key)
81101
avroProducer.flush()
82102
```
@@ -88,21 +108,33 @@ from confluent_kafka import KafkaError
88108
from confluent_kafka.avro import AvroConsumer
89109
from confluent_kafka.avro.serializer import SerializerError
90110

91-
c = AvroConsumer({'bootstrap.servers': 'mybroker,mybroker2', 'group.id': 'groupid', 'schema.registry.url': 'http://127.0.0.1:8081'})
111+
112+
c = AvroConsumer({
113+
'bootstrap.servers': 'mybroker,mybroker2',
114+
'group.id': 'groupid',
115+
'schema.registry.url': 'http://127.0.0.1:8081'})
116+
92117
c.subscribe(['my_topic'])
93-
running = True
94-
while running:
118+
119+
while True:
95120
try:
96121
msg = c.poll(10)
97-
if msg:
98-
if not msg.error():
99-
print(msg.value())
100-
elif msg.error().code() != KafkaError._PARTITION_EOF:
101-
print(msg.error())
102-
running = False
122+
103123
except SerializerError as e:
104-
print("Message deserialization failed for %s: %s" % (msg, e))
105-
running = False
124+
print("Message deserialization failed for {}: {}".format(msg, e))
125+
break
126+
127+
if msg is None:
128+
continue
129+
130+
if msg.error():
131+
if msg.error().code() == KafkaError._PARTITION_EOF:
132+
continue
133+
else:
134+
print(msg.error())
135+
break
136+
137+
print(msg.value())
106138

107139
c.close()
108140
```

0 commit comments

Comments
 (0)