|
| 1 | +import csv |
| 2 | + |
| 3 | +from confluent_kafka import Producer |
| 4 | +from confluent_kafka.serialization import SerializationContext, MessageField, StringSerializer |
| 5 | +from confluent_kafka.schema_registry import SchemaRegistryClient |
| 6 | +from confluent_kafka.schema_registry.avro import AvroSerializer |
| 7 | + |
| 8 | + |
class Review:
    """
    A single product review record.

    Plain value holder: every constructor argument is stored unchanged on
    the attribute of the same name.

    Args:
        id (int): Review id.

        product_id (str): Id of the product purchased.

        user_id (str): Id of the user who wrote the review.

        timestamp (int): Timestamp of the purchase.

        summary (str): Review summary text.
    """

    def __init__(self, id, product_id, user_id, timestamp, summary):
        # Identifiers first, then the review payload.
        self.id, self.product_id, self.user_id = id, product_id, user_id
        self.timestamp, self.summary = timestamp, summary
| 30 | + |
| 31 | + |
| 32 | + |
def reading_to_dict(review, ctx):
    """
    Convert a Review instance into a plain dict for serialization.

    Args:
        review (Review): Review instance to convert.

        ctx (SerializationContext): Metadata pertaining to the serialization
            operation. Accepted to match the serializer callback signature;
            not used by this function.

    Returns:
        dict: Mapping with the keys ``id``, ``product_id``, ``user_id``,
        ``timestamp`` and ``summary`` (in that order), each holding the value
        of the same-named attribute of ``review``.
    """
    field_names = ("id", "product_id", "user_id", "timestamp", "summary")
    return {name: getattr(review, name) for name in field_names}
| 51 | + |
| 52 | + |
def delivery_report(err, msg):
    """
    Print the outcome of a message delivery.

    Args:
        err (KafkaError): The error that occurred, or None on success.

        msg (Message): The message that was produced or failed.

    Note:
        In the delivery report callback, Message.key() and Message.value()
        are in the binary format produced by any configured serializers, not
        the objects originally passed to produce(). To hand the original
        key/value objects to the callback, use a bound callback or a lambda
        that carries them along.
        The value printed here is msg.key(), i.e. whatever key was set on the
        message when it was produced.
    """
    if err is None:
        template = 'Review record with Id {} successfully produced to Topic:{} Partition: [{}] at offset {}'
        details = (msg.key(), msg.topic(), msg.partition(), msg.offset())
    else:
        template = "Delivery failed for Review record {}: {}"
        details = (msg.key(), err)
    print(template.format(*details))
| 77 | + |
| 78 | + |
def main():
    """
    Produce every review in ``reviews.csv`` to the ``product_reviews`` topic.

    Loads the Avro schema from ``review.avsc``, builds the Schema Registry
    client, the serializers and the producer from the placeholder
    configuration below, then reads the CSV row by row. Each data row is
    turned into a Review (columns: id, product_id, user_id, timestamp,
    summary), keyed by its product id, Avro-encoded and handed to the
    producer with ``delivery_report`` as the delivery callback.

    The first CSV row is treated as a header and skipped; blank rows are
    ignored. Pending records are always flushed before returning, including
    when the run is interrupted with ^C or a row fails to parse.

    Raises:
        ValueError: If the id or timestamp column of a row is not an integer.
        IndexError: If a non-blank row has fewer than five columns.
    """
    topic = 'product_reviews'
    schema = 'review.avsc'

    cc_config = {
        'bootstrap.servers': '<BOOTSTRAP SERVERS ENDPOINT>',
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'PLAIN',
        'sasl.username': '<KAFKA API KEY>',
        'sasl.password': '<KAFKA API SECRET>'
    }

    sr_config = {
        'url': '<SR ENDPOINT URL>',
        'basic.auth.user.info': '<SR API KEY>:<SR API SECRET>'
    }

    with open(schema) as f:
        schema_str = f.read()

    schema_registry_client = SchemaRegistryClient(sr_config)

    avro_serializer = AvroSerializer(schema_registry_client,
                                     schema_str,
                                     reading_to_dict)
    string_serializer = StringSerializer('utf_8')

    producer = Producer(cc_config)

    # The contexts only depend on the topic, so build them once instead of
    # once per record.
    key_ctx = SerializationContext(topic, MessageField.KEY)
    value_ctx = SerializationContext(topic, MessageField.VALUE)

    print("Producing review records to topic {}. ^C to exit.".format(topic))

    try:
        # newline='' is what the csv module requires so that newlines
        # embedded in quoted fields are parsed correctly.
        with open('reviews.csv', 'r', newline='') as f:
            reader = csv.reader(f, delimiter=',')
            # Skip the header through the reader (not the raw file) so a
            # quoted multi-line header is consumed whole, and an empty file
            # does not raise StopIteration.
            next(reader, None)
            for row in reader:
                if not row:
                    # csv.reader yields [] for blank lines; nothing to send.
                    continue

                review = Review(id=int(row[0]),
                                product_id=row[1],
                                user_id=row[2],
                                timestamp=int(row[3]),
                                summary=row[4])
                key = string_serializer(str(review.product_id), key_ctx)
                value = avro_serializer(review, value_ctx)

                # Serve delivery callbacks from earlier produce() calls
                # without blocking. poll() takes seconds, so the previous
                # poll(10000) could stall for hours on a single record.
                producer.poll(0)
                while True:
                    try:
                        producer.produce(topic=topic,
                                         key=key,
                                         value=value,
                                         on_delivery=delivery_report)
                        break
                    except BufferError:
                        # Local queue is full: give the client time to
                        # deliver queued records, then retry this one.
                        producer.poll(1)
    except KeyboardInterrupt:
        # ^C is the advertised way to stop; fall through to the flush.
        print("Interrupted, flushing pending records.")
    finally:
        # Block until every queued record is delivered or has failed, so
        # nothing is silently dropped on exit.
        producer.flush()
| 128 | + |
| 129 | + |
# Run the producer only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
0 commit comments