33import os
44import time
55import csv
6- from confluent_kafka import Producer
76from timeit import default_timer as timer
87from multiprocessing import Pool
98
2019
2120
class DeathstarDemo:
    """Driver that wires the DeathStar travel-reservation entities into a dataflow runtime."""

    def __init__(self):
        # One init node per stateful entity; each entity type reads its
        # partition key from a differently-named field.
        self.init_user = OpNode(User, InitClass(), read_key_from="user_id")
        self.init_hotel = OpNode(Hotel, InitClass(), read_key_from="key")
        self.init_flight = OpNode(Flight, InitClass(), read_key_from="id")
2826
29- def init_runtime (self ):
30- self .runtime .init (bundle_time = 100 , bundle_size = 1000 )
31- self .runtime .add_operator (FlinkOperator (hotel_op ))
32- self .runtime .add_operator (FlinkOperator (flight_op ))
33- self .runtime .add_operator (FlinkOperator (user_op ))
34- self .runtime .add_stateless_operator (FlinkStatelessOperator (search_op ))
35- self .runtime .add_stateless_operator (FlinkStatelessOperator (recommend_op ))
27+ def init_runtime (self , runtime , ** kwargs ):
28+ self .runtime = runtime
29+ self .runtime .init (** kwargs )
30+ self .runtime .add_operator (hotel_op )
31+ self .runtime .add_operator (flight_op )
32+ self .runtime .add_operator (user_op )
33+ self .runtime .add_stateless_operator (search_op )
34+ self .runtime .add_stateless_operator (recommend_op )
3635
3736
3837 def populate (self ):
@@ -176,87 +175,81 @@ def populate(self):
176175 }, None )
177176 self .runtime .send (event , flush = True )
178177
179- class DeathstarClient :
180- def __init__ (self , input_topic = "input-topic" , output_topic = "output-topic" , kafka_broker = "localhost:9092" ):
181- self .client = FlinkClientSync (input_topic , output_topic , kafka_broker , True )
182- self .producer = Producer ({'bootstrap.servers' : kafka_broker })
183-
184- def send (self , event : Event , flush : bool = False ):
185- return self .client .send (event , flush )
186-
187- def search_hotel (self ):
188- in_date = random .randint (9 , 23 )
189- out_date = random .randint (in_date + 1 , 24 )
190-
191- if in_date < 10 :
192- in_date_str = f"2015-04-0{ in_date } "
def search_hotel():
    """Build a hotel-search Event with a random stay window and location.

    The date strings are currently unused — only lat/lon are sent to the
    search dataflow (see the comment below) — but the random draws are kept
    so the request mirrors the original DeathStar workload shape.
    """
    in_date = random.randint(9, 23)
    out_date = random.randint(in_date + 1, 24)

    # Zero-pad the day-of-month with a format spec instead of branching
    # on `< 10` (replaces two duplicated if/else blocks).
    in_date_str = f"2015-04-{in_date:02d}"
    out_date_str = f"2015-04-{out_date:02d}"

    # Random point in a small box around (38.0235, -122.095).
    lat = 38.0235 + (random.randint(0, 481) - 240.5) / 1000.0
    lon = -122.095 + (random.randint(0, 325) - 157.0) / 1000.0

    # We don't really use the in_date, out_date information
    return Event(search_op.dataflow.entry, {"lat": lat, "lon": lon}, search_op.dataflow)
def recommend(req_param=None):
    """Build a recommendation Event.

    When *req_param* is omitted, a fair coin flip chooses between the
    "distance" and "price" requirement kinds.
    """
    if req_param is None:
        coin = random.random()
        req_param = "distance" if coin < 0.5 else "price"

    # Random point in a small box around (38.0235, -122.095).
    lat = 38.0235 + (random.randint(0, 481) - 240.5) / 1000.0
    lon = -122.095 + (random.randint(0, 325) - 157.0) / 1000.0

    payload = {"requirement": req_param, "lat": lat, "lon": lon}
    return Event(recommend_op.dataflow.entry, payload, recommend_op.dataflow)
def user_login(succesfull=True):
    """Build a login Event for a random user.

    When *succesfull* (sic — parameter name kept for caller compatibility)
    is False an empty password is sent, presumably making the login attempt
    fail — confirm against the User.login implementation.
    """
    uid = random.randint(0, 500)
    if succesfull:
        password = str(uid) * 10
    else:
        password = ""
    payload = {"user_key": f"Cornell_{uid}", "password": password}
    return Event(OpNode(User, InvokeMethod("login"), read_key_from="user_key"), payload, None)
224217
225- def reserve (self ):
226- hotel_id = random .randint (0 , 99 )
227- flight_id = random .randint (0 , 99 )
228-
229- # user = User("user1", "pass")
230- # user.order(flight, hotel)
231- user_id = "Cornell_" + str (random .randint (0 , 500 ))
232-
233- return Event (
234- user_op .dataflows ["order" ].entry ,
235- {
236- "user_key" : user_id ,
237- "flight_key" : str (flight_id ),
238- "hotel_key" : str (hotel_id )
239- },
240- user_op .dataflows ["order" ])
241-
242- def deathstar_workload_generator (self ):
243- search_ratio = 0.6
244- recommend_ratio = 0.39
245- user_ratio = 0.005
246- reserve_ratio = 0.005
247- c = 0
248- while True :
249- coin = random .random ()
250- if coin < search_ratio :
251- yield self .search_hotel ()
252- elif coin < search_ratio + recommend_ratio :
253- yield self .recommend ()
254- elif coin < search_ratio + recommend_ratio + user_ratio :
255- yield self .user_login ()
256- else :
257- yield self .reserve ()
258- c += 1
def reserve():
    """Build an 'order' Event reserving a random flight and hotel for a random user."""
    # Draw order matters for RNG reproducibility: hotel, then flight, then user.
    hotel_key = str(random.randint(0, 99))
    flight_key = str(random.randint(0, 99))

    # user = User("user1", "pass")
    # user.order(flight, hotel)
    user_key = f"Cornell_{random.randint(0, 500)}"

    order_flow = user_op.dataflows["order"]
    return Event(
        order_flow.entry,
        {
            "user_key": user_key,
            "flight_key": flight_key,
            "hotel_key": hotel_key
        },
        order_flow)
def deathstar_workload_generator():
    """Yield an endless stream of DeathStar workload Events.

    Request mix, decided by one uniform draw per event:
    60% hotel searches, 39% recommendations, 0.5% user logins, and the
    remaining ~0.5% probability mass falls through to reservations.

    (The original also kept an event counter ``c`` and an explicit
    ``reserve_ratio``; both were unused and have been removed.)
    """
    search_ratio = 0.6
    recommend_ratio = 0.39
    user_ratio = 0.005
    while True:
        coin = random.random()
        if coin < search_ratio:
            yield search_hotel()
        elif coin < search_ratio + recommend_ratio:
            yield recommend()
        elif coin < search_ratio + recommend_ratio + user_ratio:
            yield user_login()
        else:
            yield reserve()
260253threads = 1
261254messages_per_second = 10
262255sleeps_per_second = 10
@@ -266,8 +259,8 @@ def deathstar_workload_generator(self):
266259
267260def benchmark_runner (proc_num ) -> dict [int , dict ]:
268261 print (f'Generator: { proc_num } starting' )
269- client = DeathstarClient ("deathstar" , "ds-out" )
270- deathstar_generator = client . deathstar_workload_generator ()
262+ client = FlinkClientSync ("deathstar" , "ds-out" , "localhost:9092" , True )
263+ deathstar_generator = deathstar_workload_generator ()
271264 # futures: dict[int, dict] = {}
272265 start = timer ()
273266 for _ in range (seconds ):
@@ -281,8 +274,8 @@ def benchmark_runner(proc_num) -> dict[int, dict]:
281274 # params = event.variable_map
282275 client .send (event )
283276 # futures[event._id] = {"event": f'{func_name} {key}->{params}'}
284-
285- # styx .flush()
277+
278+ client .flush ()
286279 sec_end = timer ()
287280 lps = sec_end - sec_start
288281 if lps < 1 :
@@ -298,14 +291,14 @@ def benchmark_runner(proc_num) -> dict[int, dict]:
298291 done = False
299292 while not done :
300293 done = True
301- for event_id , fut in client .client . _futures .items ():
294+ for event_id , fut in client ._futures .items ():
302295 result = fut ["ret" ]
303296 if result is None :
304297 done = False
305298 time .sleep (0.5 )
306299 break
307- futures = client .client . _futures
308- client .client . close ()
300+ futures = client ._futures
301+ client .close ()
309302 return futures
310303
311304
@@ -318,7 +311,7 @@ def write_dict_to_csv(futures_dict, filename):
318311 filename (str): The name of the CSV file to write to.
319312 """
320313 # Define the column headers
321- headers = ["event_id" , "sent" , "sent_t" , "ret" , "ret_t" ]
314+ headers = ["event_id" , "sent" , "sent_t" , "ret" , "ret_t" , "latency" ]
322315
323316 # Open the file for writing
324317 with open (filename , mode = 'w' , newline = '' , encoding = 'utf-8' ) as csvfile :
@@ -335,13 +328,14 @@ def write_dict_to_csv(futures_dict, filename):
335328 "sent" : event_data .get ("sent" ),
336329 "sent_t" : event_data .get ("sent_t" ),
337330 "ret" : event_data .get ("ret" ),
338- "ret_t" : event_data .get ("ret_t" )
331+ "ret_t" : event_data .get ("ret_t" ),
332+ "latency" : event_data ["ret_t" ][1 ] - event_data ["sent_t" ][1 ]
339333 }
340334 writer .writerow (row )
341335
342336def main ():
343- ds = DeathstarDemo ("deathstar" , "ds-out" )
344- ds .init_runtime ()
337+ ds = DeathstarDemo ()
338+ ds .init_runtime (FlinkRuntime ( "deathstar" , "ds-out" ), bundle_time = 5 , bundle_size = 10 )
345339 ds .runtime .run (run_async = True )
346340 ds .populate ()
347341
0 commit comments