Skip to content

Commit 80b2955

Browse files
committed
add streamlit example
1 parent c6ecb62 commit 80b2955

File tree

5 files changed

+239
-2
lines changed

5 files changed

+239
-2
lines changed

.github/workflows/run-examples.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,6 @@ jobs:
2424
python -m pip install --upgrade pip
2525
pip install .
2626
- name: Install pytest other dependencies
27-
run: pip install pytest pytest-xdist setuptools gymnasium torch osmnx deap
27+
run: pip install pytest pytest-xdist setuptools gymnasium torch osmnx deap streamlit
2828
- name: Run examples with pytest
2929
run: pytest -n auto tests/test_examples.py --durations=0 -v
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
import streamlit as st
2+
from uxsim import *
3+
import matplotlib.pyplot as plt
4+
5+
st.set_page_config(page_title="Coordinated Signal Control", layout="wide")
6+
st.title("Coordinated Signal Control")
7+
st.write(
8+
"""
9+
This is an interactive simulation that shows how traffic conditions change depending on signal control parameters.
10+
Traffic demand flows from one end of a single road to the other.
11+
There is no entry or exit along the way.
12+
The traffic flow simulator [UXsim](https://github.com/toruseo/UXsim) is used.
13+
""")
14+
15+
# Simulation model (accepts parameters)
16+
class UXsim_model:
17+
def __init__(self, demand_flow=0.3, green_split=0.5, offset1=0, offset2=0, offset3=0):
18+
self.demand_flow = demand_flow
19+
self.green_split = green_split
20+
self.offsets = [0, offset1, offset2, offset3, 0]
21+
self.n_nodes = 5
22+
self.cycle = 120
23+
self.W = None
24+
25+
def create_world(self):
26+
self.W = World(
27+
name="signal_series",
28+
deltan=5,
29+
tmax=1500,
30+
print_mode=0, save_mode=0, show_mode=1,
31+
random_seed=0
32+
)
33+
34+
# Create nodes
35+
for i in range(self.n_nodes):
36+
if i != 0 and i != self.n_nodes - 1:
37+
if i == 2:
38+
signal = [self.cycle * self.green_split, self.cycle * (1 - self.green_split)]
39+
else:
40+
signal = [self.cycle * 0.5, self.cycle * 0.5]
41+
else:
42+
signal = [0]
43+
self.W.addNode(f"node{i}", 0, i, signal=signal, signal_offset=self.cycle-self.offsets[i])
44+
45+
# Create links
46+
for i in range(self.n_nodes - 1):
47+
j = i + 1
48+
self.W.addLink(f"link{i}-{j}", f"node{i}", f"node{j}", length=300, free_flow_speed=10, signal_group=0)
49+
self.W.addLink(f"link{j}-{i}", f"node{j}", f"node{i}", length=300, free_flow_speed=10, signal_group=0)
50+
51+
# Add demand
52+
self.W.adddemand(orig="node0", dest=f"node{self.n_nodes - 1}", t_start=0, t_end=1000, flow=self.demand_flow, attribute="group1")
53+
54+
def show_network(self):
55+
"""Display network diagram"""
56+
if self.W is None:
57+
self.create_world()
58+
self.W.show_network(show_id=0, figsize=(3,3), network_font_size=6)
59+
return plt.gcf()
60+
61+
def run_simulation(self):
62+
"""Run simulation"""
63+
if self.W is None:
64+
self.create_world()
65+
self.W.exec_simulation()
66+
67+
def get_stats(self):
68+
"""Get statistics"""
69+
if len(self.W.VEHICLES) > 0:
70+
avg_travel_time = self.W.analyzer.total_travel_time / len(self.W.VEHICLES)/self.W.DELTAN
71+
else:
72+
avg_travel_time = 0
73+
74+
return {
75+
'total_travel_time': self.W.analyzer.total_travel_time,
76+
'avg_travel_time': avg_travel_time
77+
}
78+
79+
def show_time_space_diagram(self):
80+
"""Display time-space trajectory diagram"""
81+
self.W.analyzer.time_space_diagram_traj_links([f"link{i}-{i + 1}" for i in range(self.n_nodes - 1)])
82+
return plt.gcf()
83+
84+
# Initial simulation for display (default parameters)
85+
initial_sim = UXsim_model()
86+
87+
# Road network structure
88+
st.subheader("Road Network Structure")
89+
fig1 = initial_sim.show_network()
90+
st.pyplot(fig1, use_container_width=False)
91+
92+
# Sliders
93+
st.subheader("Scenario Parameter Settings")
94+
demand_flow = st.slider("Traffic Demand", min_value=0.0, max_value=1.0, value=0.3, step=0.05)
95+
green_split = st.slider("Green Time Ratio at Signal 2", min_value=0.0, max_value=1.0, value=0.5, step=0.05)
96+
o3 = st.slider("Offset at Signal 3", min_value=0, max_value=120, value=0, step=5)
97+
o2 = st.slider("Offset at Signal 2", min_value=0, max_value=120, value=0, step=5)
98+
o1 = st.slider("Offset at Signal 1", min_value=0, max_value=120, value=0, step=5)
99+
100+
# Run simulation with parameters
101+
sim = UXsim_model(demand_flow, green_split, o1, o2, o3)
102+
sim.run_simulation()
103+
104+
# Show statistics
105+
st.subheader("Statistics")
106+
stats = sim.get_stats()
107+
st.write("Total Travel Time:", f"{stats['total_travel_time']/60:.1f}", "vehicle-minutes")
108+
st.write("Average Travel Time:", f"{stats['avg_travel_time']:.1f}", "seconds")
109+
110+
# Time-space diagram
111+
st.subheader("Time-Space Trajectory Diagram")
112+
fig2 = sim.show_time_space_diagram()
113+
st.pyplot(fig2)
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
import streamlit as st
2+
from uxsim import *
3+
import matplotlib.pyplot as plt
4+
5+
st.set_page_config(page_title="信号の系統制御", layout="wide")
6+
st.title("信号の系統制御")
7+
st.write(
8+
"""
9+
信号制御パラメータにより,交通状況がどう変化するかをインタラクティブに試せるシミュレーションです.
10+
一本の道路の端から端まで指定した交通需要が流れます.
11+
途中の出入りはありません.
12+
交通流シミュレータ[UXsim](https://github.com/toruseo/UXsim)を使用しています.
13+
""")
14+
15+
# シミュレーションモデル(パラメータを受け取る)
16+
class UXsim_model:
17+
def __init__(self, demand_flow=0.3, green_split=0.5, offset1=0, offset2=0, offset3=0):
18+
self.demand_flow = demand_flow
19+
self.green_split = green_split
20+
self.offsets = [0, offset1, offset2, offset3, 0]
21+
self.n_nodes = 5
22+
self.cycle = 120
23+
self.W = None
24+
25+
def create_world(self):
26+
self.W = World(
27+
name="signal_series",
28+
deltan=5,
29+
tmax=1500,
30+
print_mode=0, save_mode=0, show_mode=1,
31+
random_seed=0
32+
)
33+
34+
# ノード作成
35+
for i in range(self.n_nodes):
36+
if i != 0 and i != self.n_nodes - 1:
37+
if i == 2:
38+
signal = [self.cycle * self.green_split, self.cycle * (1 - self.green_split)]
39+
else:
40+
signal = [self.cycle * 0.5, self.cycle * 0.5]
41+
else:
42+
signal = [0]
43+
self.W.addNode(f"node{i}", 0, i, signal=signal, signal_offset=self.cycle-self.offsets[i])
44+
45+
# リンク作成
46+
for i in range(self.n_nodes - 1):
47+
j = i + 1
48+
self.W.addLink(f"link{i}-{j}", f"node{i}", f"node{j}", length=300, free_flow_speed=10, signal_group=0)
49+
self.W.addLink(f"link{j}-{i}", f"node{j}", f"node{i}", length=300, free_flow_speed=10, signal_group=0)
50+
51+
# 需要追加
52+
self.W.adddemand(orig="node0", dest=f"node{self.n_nodes - 1}", t_start=0, t_end=1000, flow=self.demand_flow, attribute="group1")
53+
54+
def show_network(self):
55+
"""ネットワーク図を表示"""
56+
if self.W is None:
57+
self.create_world()
58+
self.W.show_network(show_id=0, figsize=(3,3), network_font_size=6)
59+
return plt.gcf()
60+
61+
def run_simulation(self):
62+
"""シミュレーション実行"""
63+
if self.W is None:
64+
self.create_world()
65+
self.W.exec_simulation()
66+
67+
def get_stats(self):
68+
"""統計情報を取得"""
69+
if len(self.W.VEHICLES) > 0:
70+
avg_travel_time = self.W.analyzer.total_travel_time / len(self.W.VEHICLES)/self.W.DELTAN
71+
else:
72+
avg_travel_time = 0
73+
74+
return {
75+
'total_travel_time': self.W.analyzer.total_travel_time,
76+
'avg_travel_time': avg_travel_time
77+
}
78+
79+
def show_time_space_diagram(self):
80+
"""時空間軌跡図を表示"""
81+
self.W.analyzer.time_space_diagram_traj_links([f"link{i}-{i + 1}" for i in range(self.n_nodes - 1)])
82+
return plt.gcf()
83+
84+
# 初期表示用のシミュレーション(デフォルトパラメータ)
85+
initial_sim = UXsim_model()
86+
87+
# 道路ネットワーク構成
88+
st.subheader("道路ネットワーク構成")
89+
fig1 = initial_sim.show_network()
90+
st.pyplot(fig1, use_container_width=False)
91+
92+
# スライダー
93+
st.subheader("シナリオパラメータ設定")
94+
demand_flow = st.slider("交通需要", min_value=0.0, max_value=1.0, value=0.3, step=0.05)
95+
green_split = st.slider("青時間比 at 信号2", min_value=0.0, max_value=1.0, value=0.5, step=0.05)
96+
o3 = st.slider("オフセット at 信号3", min_value=0, max_value=120, value=0, step=5)
97+
o2 = st.slider("オフセット at 信号2", min_value=0, max_value=120, value=0, step=5)
98+
o1 = st.slider("オフセット at 信号1", min_value=0, max_value=120, value=0, step=5)
99+
100+
# パラメータが設定されたシミュレーション実行
101+
sim = UXsim_model(demand_flow, green_split, o1, o2, o3)
102+
sim.run_simulation()
103+
104+
# 統計表示
105+
st.subheader("統計情報")
106+
stats = sim.get_stats()
107+
st.write("総旅行時間:", f"{stats['total_travel_time']/60:.1f}", "台分")
108+
st.write("平均旅行時間:", f"{stats['avg_travel_time']:.1f}", "秒")
109+
110+
# 時空間図
111+
st.subheader("時空間車両軌跡図")
112+
fig2 = sim.show_time_space_diagram()
113+
st.pyplot(fig2)

tests/test_examples.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,17 @@ def test_example_runs(example_script):
2727
"""Test that a Python example script runs successfully."""
2828
# Build the script path
2929
script_path = os.path.join(examples_dir, example_script)
30+
31+
if "streamlit" in example_script:
32+
# Run streamlit app in headless mode
33+
cmd = ['streamlit', 'run', script_path, '--headless', '--server.port=8501', '--server.enableCORS=false']
34+
timeout = 60
35+
else:
36+
# Regular Python script
37+
cmd = ['python', script_path]
38+
timeout = None
39+
3040
# Run the script as a separate process
31-
result = subprocess.run(['python', script_path], capture_output=True, text=True)
41+
result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
3242
# Assert that the script ran successfully
3343
assert result.returncode == 0, f"Script {example_script} failed with output:\n{result.stdout}\n{result.stderr}"

uxsim/analyzer.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,7 @@ def time_space_diagram_traj_links(s, linkslist, figsize=(12,4), plot_signal=True
517517
plt.show()
518518
else:
519519
plt.close("all")
520+
520521

521522
@catch_exceptions_and_warn()
522523
def cumulative_curves(s, links=None, figsize=(6,4)):

0 commit comments

Comments
 (0)