1414 RawBytesMessageMapper ,
1515 RawTextMessageMapper ,
1616)
17+ from csp .utils .datetime import utc_now
1718
1819from .kafka_utils import _precreate_topic
1920
@@ -117,14 +118,14 @@ def graph(count: int):
117118 csp .stop_engine (stop )
118119
119120 count = 5
120- results = csp .run (graph , count , starttime = datetime . utcnow (), endtime = timedelta (seconds = 30 ), realtime = True )
121+ results = csp .run (graph , count , starttime = utc_now (), endtime = timedelta (seconds = 30 ), realtime = True )
121122 assert len (results ["sub_data" ]) >= 5
122123 print (results )
123124 for result in results ["sub_data" ]:
124125 assert result [1 ].mapped_partition >= 0
125126 assert result [1 ].mapped_offset >= 0
126127 assert result [1 ].mapped_live is not None
127- assert result [1 ].mapped_timestamp < datetime . utcnow ()
128+ assert result [1 ].mapped_timestamp < utc_now ()
128129 assert results ["sub_data" ][- 1 ][1 ].mapped_live
129130
130131 @pytest .mark .skipif (not os .environ .get ("CSP_TEST_KAFKA" ), reason = "Skipping kafka adapter tests" )
@@ -204,9 +205,7 @@ def graph(symbols: list, count: int):
204205
205206 symbols = ["AAPL" , "MSFT" ]
206207 count = 100
207- results = csp .run (
208- graph , symbols , count , starttime = datetime .utcnow (), endtime = timedelta (seconds = 30 ), realtime = True
209- )
208+ results = csp .run (graph , symbols , count , starttime = utc_now (), endtime = timedelta (seconds = 30 ), realtime = True )
210209 for symbol in symbols :
211210 pub = results [f"pall_{ symbol } " ]
212211 sub = results [f"sall_{ symbol } " ]
@@ -231,7 +230,7 @@ def pub_graph():
231230 csp .stop_engine (stop )
232231 # csp.print('pub', struct)
233232
234- csp .run (pub_graph , starttime = datetime . utcnow (), endtime = timedelta (seconds = 30 ), realtime = True )
233+ csp .run (pub_graph , starttime = utc_now (), endtime = timedelta (seconds = 30 ), realtime = True )
235234
236235 # grab start/end times
237236 def get_times_graph ():
@@ -251,9 +250,7 @@ def get_times_graph():
251250 # csp.print('sub', data)
252251 # csp.print('status', kafkaadapter.status())
253252
254- all_data = csp .run (get_times_graph , starttime = datetime .utcnow (), endtime = timedelta (seconds = 30 ), realtime = True )[
255- "data"
256- ]
253+ all_data = csp .run (get_times_graph , starttime = utc_now (), endtime = timedelta (seconds = 30 ), realtime = True )["data" ]
257254 min_time = all_data [0 ][1 ].dt
258255
259256 def get_data (start_offset , expected_count ):
@@ -276,7 +273,7 @@ def get_data(start_offset, expected_count):
276273 get_data ,
277274 KafkaStartOffset .EARLIEST ,
278275 10 ,
279- starttime = datetime . utcnow (),
276+ starttime = utc_now (),
280277 endtime = timedelta (seconds = 30 ),
281278 realtime = True ,
282279 )["data" ]
@@ -288,7 +285,7 @@ def get_data(start_offset, expected_count):
288285 get_data ,
289286 KafkaStartOffset .LATEST ,
290287 1 ,
291- starttime = datetime . utcnow (),
288+ starttime = utc_now (),
292289 endtime = timedelta (seconds = 1 ),
293290 realtime = True ,
294291 )["data" ]
@@ -306,7 +303,7 @@ def get_data(start_offset, expected_count):
306303 stime = all_data [2 ][1 ].dt + timedelta (milliseconds = 1 )
307304 expected = [x for x in all_data if x [1 ].dt >= stime ]
308305 res = csp .run (
309- get_data , stime , len (expected ), starttime = datetime . utcnow (), endtime = timedelta (seconds = 30 ), realtime = True
306+ get_data , stime , len (expected ), starttime = utc_now (), endtime = timedelta (seconds = 30 ), realtime = True
310307 )["data" ]
311308 assert len (res ) == len (expected )
312309
@@ -376,9 +373,7 @@ def graph(symbols: list, count: int):
376373
377374 symbols = ["AAPL" , "MSFT" ]
378375 count = 10
379- results = csp .run (
380- graph , symbols , count , starttime = datetime .utcnow (), endtime = timedelta (seconds = 30 ), realtime = True
381- )
376+ results = csp .run (graph , symbols , count , starttime = utc_now (), endtime = timedelta (seconds = 30 ), realtime = True )
382377 # print(results)
383378 for symbol in symbols :
384379 pub = results [f"pub_{ symbol } " ]
@@ -405,7 +400,7 @@ def graph_sub():
405400
406401 # With bug this would deadlock
407402 with pytest .raises (RuntimeError ):
408- csp .run (graph_sub , starttime = datetime . utcnow (), endtime = timedelta (seconds = 2 ), realtime = True )
403+ csp .run (graph_sub , starttime = utc_now (), endtime = timedelta (seconds = 2 ), realtime = True )
409404 kafkaadapter2 = KafkaAdapterManager (** kafkaadapterkwargs )
410405
411406 def graph_pub ():
@@ -414,7 +409,7 @@ def graph_pub():
414409
415410 # With bug this would deadlock
416411 with pytest .raises (RuntimeError ):
417- csp .run (graph_pub , starttime = datetime . utcnow (), endtime = timedelta (seconds = 2 ), realtime = True )
412+ csp .run (graph_pub , starttime = utc_now (), endtime = timedelta (seconds = 2 ), realtime = True )
418413
419414 @pytest .mark .skipif (not os .environ .get ("CSP_TEST_KAFKA" ), reason = "Skipping kafka adapter tests" )
420415 def test_invalid_broker (self , kafkaadapterkwargs ):
@@ -434,7 +429,7 @@ def graph_sub():
434429
435430 # With bug this would deadlock
436431 with pytest .raises (RuntimeError ):
437- csp .run (graph_sub , starttime = datetime . utcnow (), endtime = timedelta (seconds = 2 ), realtime = True )
432+ csp .run (graph_sub , starttime = utc_now (), endtime = timedelta (seconds = 2 ), realtime = True )
438433
439434 kafkaadapter2 = KafkaAdapterManager (** dict_with_broker )
440435
@@ -444,7 +439,7 @@ def graph_pub():
444439
445440 # With bug this would deadlock
446441 with pytest .raises (RuntimeError ):
447- csp .run (graph_pub , starttime = datetime . utcnow (), endtime = timedelta (seconds = 2 ), realtime = True )
442+ csp .run (graph_pub , starttime = utc_now (), endtime = timedelta (seconds = 2 ), realtime = True )
448443
449444 @pytest .mark .skipif (not os .environ .get ("CSP_TEST_KAFKA" ), reason = "Skipping kafka adapter tests" )
450445 def test_meta_field_map_tick_timestamp_from_field (self , kafkaadapterkwargs ):
@@ -464,7 +459,7 @@ def graph_sub():
464459 )
465460
466461 with pytest .raises (ValueError ):
467- csp .run (graph_sub , starttime = datetime . utcnow (), endtime = timedelta (seconds = 2 ), realtime = True )
462+ csp .run (graph_sub , starttime = utc_now (), endtime = timedelta (seconds = 2 ), realtime = True )
468463
469464 @pytest .mark .skipif (not os .environ .get ("CSP_TEST_KAFKA" ), reason = "Skipping kafka adapter tests" )
470465 def test_conf_options (self ):
@@ -499,7 +494,7 @@ def pub_graph():
499494 stop = csp .filter (stop , stop )
500495 csp .stop_engine (stop )
501496
502- csp .run (pub_graph , starttime = datetime . utcnow (), endtime = timedelta (seconds = 5 ), realtime = True )
497+ csp .run (pub_graph , starttime = utc_now (), endtime = timedelta (seconds = 5 ), realtime = True )
503498
504499 def sub_graph ():
505500 kafkaadapter = KafkaAdapterManager (broker = kafkabroker , start_offset = KafkaStartOffset .EARLIEST )
@@ -526,7 +521,7 @@ def sub_graph():
526521 stop = csp .and_ (* stop_flags )
527522 csp .stop_engine (csp .filter (stop , stop ))
528523
529- res = csp .run (sub_graph , starttime = datetime . utcnow (), endtime = timedelta (seconds = 5 ), realtime = True )
524+ res = csp .run (sub_graph , starttime = utc_now (), endtime = timedelta (seconds = 5 ), realtime = True )
530525 burst = res ["burst" ]
531526 assert len (burst ) == 1
532527 assert isinstance (burst [0 ][1 ], list )
0 commit comments