11import sys
22import os
33
4+
45sys .path .append (os .path .abspath (os .path .join (os .path .dirname (__file__ ), "../src" )))
56
7+ from cascade .dataflow .operator import StatefulOperator , StatelessOperator
68from cascade .dataflow .dataflow import Event , InitClass , InvokeMethod , OpNode
79from cascade .dataflow .optimization .dead_node_elim import dead_node_elimination
810from cascade .runtime .python_runtime import PythonClientSync , PythonRuntime
1113from deathstar_movie_review .entities .movie import MovieId , movie_id_op , movie_info_op , plot_op
1214from deathstar_movie_review .entities .frontend import frontend_op , text_op , unique_id_op , frontend_df_serial
1315
16+ import cascade
1417
15-
16- def test_deathstar_movie_demo_python ():
17- print ("starting" )
18+ def init_python_runtime () -> tuple [PythonRuntime , PythonClientSync ]:
1819 runtime = PythonRuntime ()
20+ for op in cascade .core .operators .values ():
21+ if isinstance (op , StatefulOperator ):
22+ runtime .add_operator (op )
23+ elif isinstance (op , StatelessOperator ):
24+ runtime .add_stateless_operator (op )
1925
20- # make sure we're running the serial version
21- prev_df = frontend_op .dataflow
22- frontend_op .dataflow = frontend_df_serial ()
23-
26+ runtime .run ()
27+ return runtime , PythonClientSync (runtime )
2428
25- print (frontend_op .dataflow .to_dot ())
26- dead_node_elimination ([], [frontend_op ])
27- print (frontend_op .dataflow .to_dot ())
29+ import time
30+ def test_deathstar_movie_demo_python ():
31+ print ("starting" )
32+ cascade .core .clear ()
33+ exec (f'import deathstar_movie_review.entities.entities' )
34+ cascade .core .init ()
2835
29- runtime .add_operator (compose_review_op )
30- runtime .add_operator (user_op )
31- runtime .add_operator (movie_info_op )
32- runtime .add_operator (movie_id_op )
33- runtime .add_operator (plot_op )
34- runtime .add_stateless_operator (frontend_op )
35- runtime .add_stateless_operator (unique_id_op )
36- runtime .add_stateless_operator (text_op )
36+ runtime , client = init_python_runtime ()
37+ user_op = cascade .core .operators ["User" ]
38+ compose_op = cascade .core .operators ["ComposeReview" ]
39+ movie_op = cascade .core .operators ["MovieId" ]
40+ frontend_op = cascade .core .operators ["Frontend" ]
3741
38- runtime . run ()
39- client = PythonClientSync ( runtime )
42+ for df in cascade . core . dataflows . values ():
43+ print ( df . to_dot () )
4044
41- init_user = OpNode (User , InitClass (), read_key_from = "username" )
42- username = "username_1"
45+ username = "myUsername"
4346 user_data = {
4447 "userId" : "user1" ,
4548 "FirstName" : "firstname" ,
@@ -48,27 +51,32 @@ def test_deathstar_movie_demo_python():
4851 "Password" : "****" ,
4952 "Salt" : "salt"
5053 }
54+
5155 print ("testing user create" )
52- event = Event (init_user , {"username" : username , "user_data" : user_data }, None )
56+
57+ event = user_op .dataflows ["__init__" ].generate_event ({"username" : username , "user_data" : user_data }, username )
5358 result = client .send (event )
54- assert isinstance (result , User ) and result .username == username
59+ print (result )
60+ assert result .username == username
5561
5662 print ("testing compose review" )
57- req_id = 1
63+ req_id = "1"
5864 movie_title = "Cars 2"
5965 movie_id = 1
6066
6167 # make the review
62- init_compose_review = OpNode (ComposeReview , InitClass (), read_key_from = "req_id" )
63- event = Event (init_compose_review , {"req_id" : req_id }, None )
68+ event = compose_op .dataflows ["__init__" ].generate_event ({"req_id" : req_id }, req_id )
6469 result = client .send (event )
6570 print ("review made" )
6671
6772
68- # make the movie
69- init_movie = OpNode (MovieId , InitClass (), read_key_from = "title" )
70- event = Event (init_movie , {"title" : movie_title , "movie_id" : movie_id }, None )
73+
74+ # # make the movie
75+ # init_movie = OpNode(MovieId, InitClass(), read_key_from="title")
76+ event = movie_op .dataflows ["__init__" ].generate_event ({"title" : movie_title , "movie_id" : movie_id }, movie_title )
7177 result = client .send (event )
78+ # event = Event(init_movie, {"title": movie_title, "movie_id": movie_id}, None)
79+ # result = client.send(event)
7280 print ("movie made" )
7381
7482 # compose the review
@@ -80,11 +88,11 @@ def test_deathstar_movie_demo_python():
8088 "text" : "good movie!"
8189 }
8290
83- event = Event (
84- frontend_op . dataflow . entry ,
85- review_data ,
86- frontend_op . dataflow )
87- result = client . send ( event )
91+ r_data = { r + "_0" : v for r , v in review_data . items ()}
92+
93+ event = frontend_op . dataflows [ "compose" ]. generate_event ( r_data )
94+ result = client . send ( event , block = False )
95+
8896 print (result )
8997 print ("review composed" )
9098
@@ -96,19 +104,19 @@ def test_deathstar_movie_demo_python():
96104 {"req_id" : req_id },
97105 None
98106 )
107+ event = compose_op .dataflows ["get_data" ].generate_event ({"req_id" : req_id }, req_id )
99108 result = client .send (event )
109+ print (result )
110+ print (runtime .statefuloperators ["ComposeReview" ].states ["1" ].review_data )
111+ # time.sleep(0.5)
112+
113+ # result = client.send(event)
100114 expected = {
101115 "userId" : user_data ["userId" ],
102116 "movieId" : movie_id ,
103117 "text" : review_data ["text" ]
104118 }
105- print (result , expected )
119+ # print(result, expected)
106120 assert "review_id" in result
107121 del result ["review_id" ] # randomly generated
108- assert result == expected
109-
110- print ("Success!" )
111-
112- # put the df back
113- frontend_op .dataflow = prev_df
114-
122+ assert result == expected
0 commit comments