Skip to content

Commit 838c59c

Browse files
authored
Merge pull request #11 from delftdata/dependency_graph
Rework IR, add automatic parallelization
2 parents 8ff4224 + fc7900a commit 838c59c

File tree

104 files changed

+4991
-2817
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

104 files changed

+4991
-2817
lines changed

.gitignore

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,11 @@ __pycache__
88
*.egg-info
99
build
1010

11+
.vscode/
12+
1113
# Experiment artifacts
1214
*.png
13-
*.pkl
15+
*.pkl
16+
*.csv
17+
nohup.out
18+
*.zip

.vscode/launch.json

Lines changed: 0 additions & 17 deletions
This file was deleted.

.vscode/settings.json

Lines changed: 0 additions & 8 deletions
This file was deleted.

deathstar_hotel_reservation/demo.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ def user_login_workload_generator():
268268
def benchmark_runner(proc_num) -> dict[int, dict]:
269269
print(f'Generator: {proc_num} starting')
270270
client = FlinkClientSync("deathstar", "ds-out", "localhost:9092", True)
271-
deathstar_generator = user_login_workload_generator()
271+
deathstar_generator = deathstar_workload_generator()
272272
start = timer()
273273

274274
for _ in range(bursts):
Lines changed: 99 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -1,100 +1,100 @@
11

2-
import os
3-
import sys
4-
5-
# import cascade
6-
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../src")))
7-
8-
from cascade.runtime.python_runtime import PythonClientSync, PythonRuntime
9-
from cascade.runtime.flink_runtime import FlinkClientSync, FlinkRuntime
10-
from deathstar_hotel_reservation.demo import DeathstarDemo, recommend, reserve, search_hotel, user_login
11-
import time
12-
import pytest
13-
14-
@pytest.mark.integration
15-
def test_deathstar_demo():
16-
ds = DeathstarDemo()
17-
ds.init_runtime(FlinkRuntime("deathstardemo-test", "dsd-out"))
18-
ds.runtime.run(run_async=True)
19-
print("Populating, press enter to go to the next step when done")
20-
ds.populate()
21-
22-
client = FlinkClientSync("deathstardemo-test", "dsd-out")
23-
input()
24-
print("testing user login")
25-
event = user_login()
26-
client.send(event)
27-
28-
input()
29-
print("testing reserve")
30-
event = reserve()
31-
client.send(event)
32-
33-
input()
34-
print("testing search")
35-
event = search_hotel()
36-
client.send(event)
37-
38-
input()
39-
print("testing recommend (distance)")
40-
time.sleep(0.5)
41-
event = recommend(req_param="distance")
42-
client.send(event)
43-
44-
input()
45-
print("testing recommend (price)")
46-
time.sleep(0.5)
47-
event = recommend(req_param="price")
48-
client.send(event)
49-
50-
print(client._futures)
51-
input()
52-
print("done!")
53-
print(client._futures)
54-
55-
def test_deathstar_demo_python():
56-
ds = DeathstarDemo()
57-
ds.init_runtime(PythonRuntime())
58-
ds.runtime.run()
59-
print("Populating, press enter to go to the next step when done")
60-
ds.populate()
61-
62-
time.sleep(0.1)
63-
64-
client = PythonClientSync(ds.runtime)
65-
print("testing user login")
66-
event = user_login()
67-
result = client.send(event)
68-
assert result == True
69-
event = user_login(succesfull=False)
70-
result = client.send(event)
71-
assert result == False
72-
73-
print("testing reserve")
74-
event = reserve()
75-
result = client.send(event)
76-
assert result == True
77-
78-
return
79-
print("testing search")
80-
event = search_hotel()
81-
result = client.send(event)
82-
print(result)
83-
84-
print("testing recommend (distance)")
85-
time.sleep(0.5)
86-
event = recommend(req_param="distance")
87-
result = client.send(event)
88-
print(result)
89-
90-
print("testing recommend (price)")
91-
time.sleep(0.5)
92-
event = recommend(req_param="price")
93-
result = client.send(event)
94-
print(result)
95-
96-
print("done!")
97-
98-
99-
if __name__ == "__main__":
100-
test_deathstar_demo()
2+
# import os
3+
# import sys
4+
5+
# # import cascade
6+
# sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../src")))
7+
8+
# from cascade.runtime.python_runtime import PythonClientSync, PythonRuntime
9+
# from cascade.runtime.flink_runtime import FlinkClientSync, FlinkRuntime
10+
# from deathstar_hotel_reservation.demo import DeathstarDemo, recommend, reserve, search_hotel, user_login
11+
# import time
12+
# import pytest
13+
14+
# @pytest.mark.integration
15+
# def test_deathstar_demo():
16+
# ds = DeathstarDemo()
17+
# ds.init_runtime(FlinkRuntime("deathstardemo-test", "dsd-out"))
18+
# ds.runtime.run(run_async=True)
19+
# print("Populating, press enter to go to the next step when done")
20+
# ds.populate()
21+
22+
# client = FlinkClientSync("deathstardemo-test", "dsd-out")
23+
# input()
24+
# print("testing user login")
25+
# event = user_login()
26+
# client.send(event)
27+
28+
# input()
29+
# print("testing reserve")
30+
# event = reserve()
31+
# client.send(event)
32+
33+
# input()
34+
# print("testing search")
35+
# event = search_hotel()
36+
# client.send(event)
37+
38+
# input()
39+
# print("testing recommend (distance)")
40+
# time.sleep(0.5)
41+
# event = recommend(req_param="distance")
42+
# client.send(event)
43+
44+
# input()
45+
# print("testing recommend (price)")
46+
# time.sleep(0.5)
47+
# event = recommend(req_param="price")
48+
# client.send(event)
49+
50+
# print(client._futures)
51+
# input()
52+
# print("done!")
53+
# print(client._futures)
54+
55+
# def test_deathstar_demo_python():
56+
# ds = DeathstarDemo()
57+
# ds.init_runtime(PythonRuntime())
58+
# ds.runtime.run()
59+
# print("Populating, press enter to go to the next step when done")
60+
# ds.populate()
61+
62+
# time.sleep(0.1)
63+
64+
# client = PythonClientSync(ds.runtime)
65+
# print("testing user login")
66+
# event = user_login()
67+
# result = client.send(event)
68+
# assert result == True
69+
# event = user_login(succesfull=False)
70+
# result = client.send(event)
71+
# assert result == False
72+
73+
# print("testing reserve")
74+
# event = reserve()
75+
# result = client.send(event)
76+
# assert result == True
77+
78+
# return
79+
# print("testing search")
80+
# event = search_hotel()
81+
# result = client.send(event)
82+
# print(result)
83+
84+
# print("testing recommend (distance)")
85+
# time.sleep(0.5)
86+
# event = recommend(req_param="distance")
87+
# result = client.send(event)
88+
# print(result)
89+
90+
# print("testing recommend (price)")
91+
# time.sleep(0.5)
92+
# event = recommend(req_param="price")
93+
# result = client.send(event)
94+
# print(result)
95+
96+
# print("done!")
97+
98+
99+
# if __name__ == "__main__":
100+
# test_deathstar_demo()

deathstar_movie_review/demo.py

Lines changed: 23 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
from typing import Literal
2+
import cascade
3+
from cascade.dataflow.dataflow import DataflowRef
24
from cascade.dataflow.optimization.dead_node_elim import dead_node_elimination
5+
from cascade.dataflow.optimization.parallelization import parallelize_until_if
36
from cascade.runtime.flink_runtime import FlinkRuntime
4-
5-
from .entities.user import user_op
6-
from .entities.compose_review import compose_review_op
7-
from .entities.frontend import frontend_df_parallel, frontend_df_serial, frontend_op, text_op, unique_id_op
8-
from .entities.movie import movie_id_op, movie_info_op, plot_op
7+
from tests.integration.flink.utils import create_topics, init_flink_runtime
98

109
import os
11-
from confluent_kafka.admin import AdminClient, NewTopic
1210

1311
KAFKA_BROKER = "localhost:9092"
1412
KAFKA_FLINK_BROKER = "kafka:9093" # If running a flink cluster and kafka inside docker, the broker url might be different
@@ -17,67 +15,32 @@
1715
OUT_TOPIC = "ds-movie-out"
1816
INTERNAL_TOPIC = "ds-movie-internal"
1917

20-
EXPERIMENT: Literal["baseline", "pipelined", "parallel"] = os.getenv("EXPERIMENT", "baseline")
21-
22-
def create_topics(*required_topics):
23-
conf = {
24-
"bootstrap.servers": KAFKA_BROKER
25-
}
26-
27-
admin_client = AdminClient(conf)
28-
29-
# Fetch existing topics
30-
existing_topics = admin_client.list_topics(timeout=5).topics.keys()
31-
32-
# Find missing topics
33-
missing_topics = [topic for topic in required_topics if topic not in existing_topics]
34-
35-
if missing_topics:
36-
print(f"Creating missing topics: {missing_topics}")
37-
38-
# Define new topics (default: 1 partition, replication factor 1)
39-
new_topics = [NewTopic(topic, num_partitions=1, replication_factor=1) for topic in missing_topics]
40-
41-
# Create topics
42-
futures = admin_client.create_topics(new_topics)
43-
44-
# Wait for topic creation to complete
45-
for topic, future in futures.items():
46-
try:
47-
future.result() # Block until the operation is complete
48-
print(f"Topic '{topic}' created successfully")
49-
except Exception as e:
50-
print(f"Failed to create topic '{topic}': {e}")
51-
else:
52-
print("All required topics exist.")
18+
EXPERIMENT: Literal["baseline", "parallel"] = os.getenv("EXPERIMENT", "baseline")
5319

5420

5521
def main():
5622
create_topics(IN_TOPIC, OUT_TOPIC, INTERNAL_TOPIC)
5723

58-
runtime = FlinkRuntime(IN_TOPIC, OUT_TOPIC, internal_topic=INTERNAL_TOPIC)
59-
runtime.init(kafka_broker=KAFKA_FLINK_BROKER,bundle_time=5, bundle_size=10)
60-
61-
if EXPERIMENT == "baseline":
62-
frontend_op.dataflow = frontend_df_serial()
63-
elif EXPERIMENT == "pipelined":
64-
frontend_op.dataflow = frontend_df_serial()
65-
dead_node_elimination([], [frontend_op])
66-
elif EXPERIMENT == "parallel":
67-
frontend_op.dataflow = frontend_df_parallel()
68-
69-
print(frontend_op.dataflow.to_dot())
24+
runtime = init_flink_runtime("deathstar_movie_review.entities.entities", IN_TOPIC, OUT_TOPIC, INTERNAL_TOPIC, kafka_broker=KAFKA_FLINK_BROKER,bundle_time=5, bundle_size=10, thread_mode=True, parallelism=None)
25+
7026
print(f"Creating dataflow [{EXPERIMENT}]")
7127

72-
runtime.add_operator(compose_review_op)
73-
runtime.add_operator(user_op)
74-
runtime.add_operator(movie_info_op)
75-
runtime.add_operator(movie_id_op)
76-
runtime.add_operator(plot_op)
77-
runtime.add_stateless_operator(frontend_op)
78-
runtime.add_stateless_operator(unique_id_op)
79-
runtime.add_stateless_operator(text_op)
80-
28+
# for parallel experiment
29+
df_baseline = cascade.core.dataflows[DataflowRef("Frontend", "compose")]
30+
df_parallel, _ = parallelize_until_if(df_baseline)
31+
df_parallel.name = "compose_parallel"
32+
cascade.core.dataflows[DataflowRef("Frontend", "compose_parallel")] = df_parallel
33+
runtime.add_dataflow(df_parallel)
34+
35+
# for prefetch experiment
36+
df_baseline = cascade.core.dataflows[DataflowRef("MovieId", "upload_movie_prefetch")]
37+
df_parallel, _ = parallelize_until_if(df_baseline)
38+
df_parallel.name = "upload_movie_prefetch_parallel"
39+
cascade.core.dataflows[DataflowRef("MovieId", "upload_movie_prefetch_parallel")] = df_parallel
40+
runtime.add_dataflow(df_parallel)
41+
42+
print(cascade.core.dataflows.keys())
43+
8144
runtime.run()
8245

8346
if __name__ == "__main__":

0 commit comments

Comments
 (0)