Skip to content

Commit 7f31937

Browse files
committed
get timestamps working for replay and consume
1 parent a72ad7d commit 7f31937

File tree

5 files changed

+19
-23
lines changed

5 files changed

+19
-23
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ name = "saluki"
77
dynamic = ["version"]
88
dependencies = [
99
"ess-streaming-data-types",
10-
"confluent-kafka",
10+
"confluent-kafka>=2.12.1", # for produce_batch in play()
1111
"python-dateutil",
1212
"tzdata"
1313
]

src/saluki/consume.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ def consume(
1616
offset: int | None = None,
1717
go_forwards: bool = False,
1818
schemas_to_filter_to: list[str] | None = None,
19+
timestamp: int|None = None,
1920
) -> None:
2021
"""
2122
consume from a topic and deserialise each message
@@ -27,6 +28,7 @@ def consume(
2728
:param offset: offset to consume from/to
2829
:param go_forwards: whether to consume forwards or backwards
2930
:param schemas_to_filter_to: schemas in messages to filter to
31+
:param timestamp: optional timestamp (ms since epoch) to use as the starting point
3032
:return: None
3133
"""
3234
c = Consumer(
@@ -41,6 +43,12 @@ def consume(
4143
}
4244
)
4345

46+
if timestamp is not None:
47+
offset = c.offsets_for_times(
48+
[TopicPartition(topic, partition, timestamp)]
49+
)[0].offset
50+
logger.debug(f"offset for timestamp {timestamp} is {offset}")
51+
4452
if go_forwards:
4553
if offset is None:
4654
raise ValueError("Can't go forwards without an offset")

src/saluki/main.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,13 @@ def main() -> None:
7171
required=False,
7272
default=1,
7373
)
74-
consumer_mode_parser.add_argument(
75-
"-o", "--offset", help="offset to consume from", type=int, required=False
76-
)
74+
7775
consumer_mode_parser.add_argument("-g", "--go-forwards", required=False, action="store_true")
76+
cg = consumer_mode_parser.add_mutually_exclusive_group(required=False)
77+
cg.add_argument(
78+
"-o", "--offset", help="offset to consume from", type=int,
79+
)
80+
cg.add_argument("-t", "--timestamp", help="timestamp to consume from", type=dateutil_parsable_or_unix_timestamp)
7881

7982
listen_parser = sub_parsers.add_parser( # noqa: F841
8083
_LISTEN,
@@ -118,7 +121,7 @@ def main() -> None:
118121
elif args.command == _CONSUME:
119122
broker, topic = parse_kafka_uri(args.topic)
120123
consume(
121-
broker, topic, args.partition, args.messages, args.offset, args.go_forwards, args.filter
124+
broker, topic, args.partition, args.messages, args.offset, args.go_forwards, args.filter, args.timestamp
122125
)
123126
elif args.command == _PLAY:
124127
src_broker, src_topic = parse_kafka_uri(args.topics[0])

src/saluki/play.py

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import logging
22
import uuid
3-
from time import sleep
43
from confluent_kafka import Consumer, Producer, TopicPartition
54

65
logger = logging.getLogger("saluki")
@@ -17,6 +16,7 @@ def play(
1716
"""
1817
Replay data from src_topic to dest_topic between the offsets OR timestamps specified.
1918
This currently assumes contiguous data in a topic (i.e. no log compaction) and uses partition 0.
19+
It also does not copy message timestamps.
2020
2121
:param src_broker: The source broker, including port.
2222
:param src_topic: The topic to replay data from.
@@ -62,27 +62,12 @@ def play(
6262

6363
num_messages = stop_offset.offset - start_offset.offset + 1
6464

65-
def delivery_report(err, msg):
66-
""" Called once for each message produced to indicate delivery result.
67-
Triggered by poll() or flush()."""
68-
if err is not None:
69-
logger.error('Message delivery failed: {}'.format(err))
70-
else:
71-
logger.debug('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
72-
7365
try:
7466
msgs = consumer.consume(num_messages)
7567
logger.debug(f"finished consuming {num_messages} messages")
7668
consumer.close()
77-
# logger.debug(f"{msgs}")
78-
for message in msgs:
79-
producer.poll(0)
80-
producer.produce(dest_topic, message.value(), message.key(), callback=delivery_report)
81-
# producer.produce_batch(dest_topic, [{'key': message.key(), 'value': message.value()} for message in msgs])
82-
# producer.poll()
69+
producer.produce_batch(dest_topic, [{'key': message.key(), 'value': message.value()} for message in msgs])
8370
logger.debug(f"flushing producer. len(p): {len(producer)}")
84-
# while len(producer): producer.flush()
85-
8671
producer.flush(timeout=10)
8772

8873
logger.debug(f"length after flushing: {len(producer)}")

src/saluki/utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ def dateutil_parsable_or_unix_timestamp(inp: str) -> int:
101101
try:
102102
try:
103103
return int(round(parse(inp).timestamp() * 1000))
104-
except ParserError:
104+
except (ParserError, OverflowError):
105105
logger.debug(f"Failed to parse {inp} as a dateutil parsable. Falling back to unix timestamp")
106106
return int(inp)
107107
except ValueError:

0 commit comments

Comments
 (0)