diff --git a/winton_kafka_streams/processor/_record_collector.py b/winton_kafka_streams/processor/_record_collector.py index cf76f71..7c2d358 100644 --- a/winton_kafka_streams/processor/_record_collector.py +++ b/winton_kafka_streams/processor/_record_collector.py @@ -35,6 +35,10 @@ def send(self, topic, key, value, timestamp, while not produced: try: + if isinstance(timestamp, tuple) and timestamp: + timestamp = timestamp[-1] + if isinstance(timestamp, float): + timestamp = int(timestamp) self.producer.produce(topic, ser_value, ser_key, partition, self.on_delivery, partitioner, timestamp) self.producer.poll(0) # Ensure previous message's delivery reports are served produced = True