@@ -93,17 +93,15 @@ def main() -> None:
9393 # saluki dig - push data from dump generated by saluki bury to topic
9494 # saluki dig mybroker:9092/topicname -p 0 outputfile
9595
96- # saluki sniff - broker metadata ie. topic watermarks and num_messages.
97- # saluki sniff mybroker:9092
98-
9996 play_parser = sub_parsers .add_parser (
10097 _PLAY ,
10198 help = "replay mode - replay data into another topic" ,
102- parents = [topic_parser ],
99+ parents = [],
103100 )
104- play_parser .add_argument ("-o" , "--offset" , help = "offsets to replay between (inclusive)" , type = int , nargs = 2 )
105- play_parser .add_argument ("-t" , "--timestamp" , help = "timestamps to replay between" , type = str , nargs = 2 )
106-
101+ play_parser .add_argument ("topics" , type = str , nargs = 2 , help = "SRC topic DEST topic" )
102+ g = play_parser .add_mutually_exclusive_group (required = True )
103+ g .add_argument ("-o" , "--offsets" , help = "offsets to replay between (inclusive)" , type = int , nargs = 2 )
104+ g .add_argument ("-t" , "--timestamps" , help = "timestamps to replay between" , type = str , nargs = 2 )
107105
108106 if len (sys .argv ) == 1 :
109107 parser .print_help ()
@@ -127,8 +125,18 @@ def main() -> None:
127125 args .go_forwards ,
128126 )
129127 elif args .command == _PLAY :
130- pass
131- #play(src_broker, src_topic, dest_broker, dest_topic, args.offset, args.timestamp)
128+ src_broker , src_topic = parse_kafka_uri (args .topics [0 ])
129+ dest_broker , dest_topic = parse_kafka_uri (args .topics [1 ])
130+
131+ print (f"SOURCE BROKER: { src_broker } , SOURCE TOPIC: { src_topic } , DEST BROKER: { dest_broker } , DEST TOPIC: { dest_topic } " )
132+ if args .offsets is not None :
133+ print (f"Replaying { src_broker } /{ src_topic } between offsets { args .offsets [0 ]} and { args .offsets [1 ]} to { dest_broker } /{ dest_topic } " )
134+ elif args .timestamps is not None :
135+ print (f"Replaying { src_broker } /{ src_topic } between timestamps { args .timestamps [0 ]} and { args .timestamps [1 ]} to { dest_broker } /{ dest_topic } " )
136+
137+ if input ("OK? (y/n)" ).lower () == 'y' :
138+ #play(src_broker, src_topic, dest_broker, dest_topic, args.offset, args.timestamp)
139+ print ("replayed" )
132140 elif args .command == _SNIFF :
133141 sniff (args .broker )
134142 elif args .command == _BURY :
0 commit comments