Skip to content

Commit 01299d8

Browse files
committed
play working for offsets
1 parent 1394513 commit 01299d8

File tree

3 files changed

+43
-20
lines changed

3 files changed

+43
-20
lines changed

src/saluki/listen.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ def listen(
2323
c = Consumer(
2424
{
2525
"bootstrap.servers": broker,
26-
"group.id": "saluki",
26+
"group.id": "saluki-listen",
2727
"auto.offset.reset": "latest",
2828
"enable.auto.commit": False,
2929
}

src/saluki/main.py

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -93,13 +93,6 @@ def main() -> None:
9393
# saluki consume x messages of y or z schema
9494
# saluki consume -f pl72,6s4t mybroker:9092/XXX_runInfo -m 10 # get the last pl72 run starts or 6s4t run stops#
9595

96-
# saluki bury - dump data on topic to file
97-
# saluki bury mybroker:9092/topicname -p 0 -o startoffset finishoffset outputfile
98-
# saluki bury mybroker:9092/topicname -p 0 -t starttimestamp finishtimestamp outputfile
99-
100-
# saluki dig - push data from dump generated by saluki bury to topic
101-
# saluki dig mybroker:9092/topicname -p 0 outputfile
102-
10396
play_parser = sub_parsers.add_parser(
10497
_PLAY,
10598
help="replay mode - replay data into another topic",
@@ -117,14 +110,6 @@ def main() -> None:
117110
g.add_argument(
118111
"-t", "--timestamps", help="timestamps to replay between", type=str, nargs=2
119112
)
120-
g.add_argument(
121-
"-c",
122-
"--chunk",
123-
help="forward in chunks. ie to avoid storing a huge list in memory",
124-
default=0,
125-
type=int,
126-
required=False,
127-
)
128113

129114
if len(sys.argv) == 1:
130115
parser.print_help()
@@ -171,7 +156,6 @@ def main() -> None:
171156
dest_topic,
172157
args.offsets,
173158
args.timestamps,
174-
args.chunks,
175159
)
176160
print("replayed")
177161
elif args.command == _SNIFF:

src/saluki/play.py

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
from confluent_kafka import Consumer, Producer
1+
import logging
2+
from confluent_kafka import Consumer, Producer, TopicPartition
3+
4+
logger = logging.getLogger("saluki")
25

36

47
def play(
@@ -8,8 +11,21 @@ def play(
811
dest_topic: str,
912
offsets: list[int] | None,
1013
timestamps: list[int] | None,
11-
chunks: int,
1214
) -> None:
15+
"""
16+
Replay data from src_topic to dest_topic between the offsets OR timestamps specified.
17+
This currently assumes contiguous data in a topic (ie. no log compaction) and only uses partition 0.
18+
19+
:param src_broker: The source broker, including port.
20+
:param src_topic: The topic to replay data from.
21+
:param dest_broker: The destination broker, including port.
22+
:param dest_topic: The topic to replay data to.
23+
:param offsets: The start and finish offsets to replay data from.
24+
:param timestamps: The start and finish timestamps to replay data from.
25+
"""
26+
27+
print(f"ARGS: {src_broker}, {src_topic}, {dest_broker}, {dest_topic}, {offsets}, {timestamps}")
28+
1329
consumer = Consumer(
1430
{
1531
"bootstrap.servers": src_broker,
@@ -21,4 +37,27 @@ def play(
2137
"bootstrap.servers": dest_broker,
2238
}
2339
)
24-
pass
40+
src_partition = 0
41+
42+
if timestamps is not None:
43+
start_offset, stop_offset = consumer.offsets_for_times([
44+
TopicPartition(src_topic, src_partition, timestamps[0]),
45+
TopicPartition(src_topic, src_partition, timestamps[1]),
46+
])
47+
else:
48+
start_offset = TopicPartition(src_topic, src_partition, offsets[0])
49+
stop_offset = TopicPartition(src_topic, src_partition, offsets[1])
50+
consumer.assign([start_offset])
51+
52+
num_messages = stop_offset.offset - start_offset.offset + 1
53+
54+
try:
55+
msgs = consumer.consume(num_messages)
56+
[producer.produce(dest_topic, message.value(), message.key()) for message in msgs]
57+
producer.flush()
58+
except Exception:
59+
logger.exception("Got exception while consuming:")
60+
finally:
61+
logger.debug(f"Closing consumer {consumer}")
62+
consumer.close()
63+

0 commit comments

Comments
 (0)