Skip to content

Commit 2f14433

Browse files
committed
add skeleton for play()
1 parent 7470ef5 commit 2f14433

File tree

3 files changed

+18
-3
lines changed

3 files changed

+18
-3
lines changed

src/saluki/consume.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ def consume(
2929
c = Consumer(
3030
{
3131
"bootstrap.servers": broker,
32-
"group.id": "saluki",
32+
"group.id": "saluki-consume",
3333
"session.timeout.ms": 6000,
3434
"auto.offset.reset": "latest",
3535
"enable.auto.offset.store": False,

src/saluki/main.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
from saluki.consume import consume
66
from saluki.listen import listen
7+
from saluki.play import play
78
from saluki.sniff import sniff
89
from saluki.utils import parse_kafka_uri
910

@@ -88,7 +89,8 @@ def main() -> None:
8889
# saluki consume -f pl72,6s4t mybroker:9092/XXX_runInfo -m 10 # get the last pl72 run starts or 6s4t run stops#
8990

9091
# saluki bury - dump data on topic to file
91-
# saluki bury mybroker:9092/topicname -p 0 -f offsetortimestamp -t offsetortimestamp outputfile
92+
# saluki bury mybroker:9092/topicname -p 0 -o startoffset finishoffset outputfile
93+
# saluki bury mybroker:9092/topicname -p 0 -t starttimestamp finishtimestamp outputfile
9294

9395
# saluki dig - push data from dump generated by saluki bury to topic
9496
# saluki dig mybroker:9092/topicname -p 0 outputfile
@@ -102,6 +104,7 @@ def main() -> None:
102104
g = play_parser.add_mutually_exclusive_group(required=True)
103105
g.add_argument("-o", "--offsets", help="offsets to replay between (inclusive)", type=int, nargs=2)
104106
g.add_argument("-t", "--timestamps", help="timestamps to replay between", type=str, nargs=2)
107+
g.add_argument("-c", "--chunk", help="forward in chunks. ie to avoid storing a huge list in memory", default=0, type=int, required=False)
105108

106109
if len(sys.argv) == 1:
107110
parser.print_help()
@@ -135,7 +138,7 @@ def main() -> None:
135138
print(f"Replaying {src_broker}/{src_topic} between timestamps {args.timestamps[0]} and {args.timestamps[1]} to {dest_broker}/{dest_topic}")
136139

137140
if input("OK? (y/n)").lower() == 'y':
138-
#play(src_broker, src_topic, dest_broker, dest_topic, args.offset, args.timestamp)
141+
play(src_broker, src_topic, dest_broker, dest_topic, args.offsets, args.timestamps, args.chunks)
139142
print("replayed")
140143
elif args.command == _SNIFF:
141144
sniff(args.broker)

src/saluki/play.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
from confluent_kafka import Consumer, Producer
2+
3+
4+
def play(src_broker: str, src_topic: str, dest_broker: str, dest_topic: str, offsets:list[int]|None, timestamps:list[int]|None, chunks:int) -> None:
5+
consumer = Consumer( {
6+
"bootstrap.servers": src_broker,
7+
"group.id": "saluki-play",
8+
})
9+
producer = Producer({
10+
"bootstrap.servers": dest_broker,
11+
})
12+
pass

0 commit comments

Comments
 (0)