diff --git a/tests/test_other_functions.py b/tests/test_other_functions.py index 31dad37..b2eba38 100644 --- a/tests/test_other_functions.py +++ b/tests/test_other_functions.py @@ -946,6 +946,30 @@ def test_change_jam_density(): #W.analyzer.time_space_diagram_traj_links(["link1","link2","link3"]) +def test_get_linkstats(): + W = World(name="simple", tmax=2000, show_mode=1) + + W.addNode("start", x=0, y=0) + W.addNode("end", x=7500, y=0) + + l = W.addLink("road", start_node="start", end_node="end", length=10000, free_flow_speed=10) + + W.adddemand(orig="start", dest="end", t_start=100, t_end=600, volume=500) + + W.exec_simulation() + + t = 50 + assert equal_tolerance(l.num_vehicles_t(t), 0) + assert equal_tolerance(l.average_density(t), 0.0) + assert equal_tolerance(l.average_speed(t), 10.0) + assert equal_tolerance(l.average_flow(t), 0.0) + + t = 1000 + assert equal_tolerance(l.num_vehicles_t(t), 500) + assert equal_tolerance(l.average_density(t), 0.05) + assert equal_tolerance(l.average_speed(t), 10.0) + assert equal_tolerance(l.average_flow(t), 0.5) + def test_user_functions(): # define custom user_functions diff --git a/uxsim/DTAsolvers/DTAsolvers.py b/uxsim/DTAsolvers/DTAsolvers.py index a7bdd5d..c1f2c4c 100644 --- a/uxsim/DTAsolvers/DTAsolvers.py +++ b/uxsim/DTAsolvers/DTAsolvers.py @@ -39,9 +39,10 @@ def __init__(s, func_World): There are some theoretical background for it; but intuitively speaking, the steady state can be considered as a realistic state that people's rational behavior will reach. This method is based on the following literature: - Ishihara, M., & Iryo, T. (2015). Dynamic Traffic Assignment by Markov Chain. Journal of Japan Society of Civil Engineers, Ser. D3 (Infrastructure Planning and Management), 71(5), I_503-I_509. (in Japanese). https://doi.org/10.2208/jscejipm.71.I_503 - Iryo, T., Urata, J., & Kawase, R. (2024). Traffic Flow Simulator and Travel Demand Simulators for Assessing Congestion on Roads After a Major Earthquake. In APPLICATION OF HIGH-PERFORMANCE COMPUTING TO EARTHQUAKE-RELATED PROBLEMS (pp. 413-447). https://doi.org/10.1142/9781800614635_0007 - Iryo, T., Watling, D., & Hazelton, M. (2024). Estimating Markov Chain Mixing Times: Convergence Rate Towards Equilibrium of a Stochastic Process Traffic Assignment Model. Transportation Science. https://doi.org/10.1287/trsc.2024.0523 + + - Ishihara, M., & Iryo, T. (2015). Dynamic Traffic Assignment by Markov Chain. Journal of Japan Society of Civil Engineers, Ser. D3 (Infrastructure Planning and Management), 71(5), I_503-I_509. (in Japanese). https://doi.org/10.2208/jscejipm.71.I_503 + - Iryo, T., Urata, J., & Kawase, R. (2024). Traffic Flow Simulator and Travel Demand Simulators for Assessing Congestion on Roads After a Major Earthquake. In APPLICATION OF HIGH-PERFORMANCE COMPUTING TO EARTHQUAKE-RELATED PROBLEMS (pp. 413-447). https://doi.org/10.1142/9781800614635_0007 + - Iryo, T., Watling, D., & Hazelton, M. (2024). Estimating Markov Chain Mixing Times: Convergence Rate Towards Equilibrium of a Stochastic Process Traffic Assignment Model. Transportation Science. https://doi.org/10.1287/trsc.2024.0523 """ s.func_World = func_World s.W_sol = None #final solution @@ -346,7 +347,9 @@ def __init__(s, func_World): Apart from these point, this algorithm is almost the same to `SolverDUE` and thus expected to have the desirable properties discussed in the papers cited above. This algorithm is a loose generalization of "DSO game" of the following paper. + - Satsukawa, K., Wada, K., & Watling, D. (2022). Dynamic system optimal traffic assignment with atomic users: Convergence and stability. Transportation Research Part B: Methodological, 155, 188-209. + In this paper, it is theoretically guaranteed that their DSO game algorithm converges to the global optimal DSO state. However, it may take a long time. This `SolverDSO_D2D` speeds up the solution process significantly, but it weakens the convergence guarantee. """ @@ -1362,370 +1365,3 @@ def plot_vehicle_stats(s, orig=None, dest=None): plt.ylabel("travel time") plt.legend() plt.show() - -# does not work very well -# class SolverDUE_departure_time_choice: -# def __init__(s, func_World, desired_arrival_time, time_windows, alpha=1, beta=0.3, gamma=0.9, insensitive_ratio=0.333): -# """ -# Solve quasi Dynamic User Equilibrium (DUE) problem with departure time choice using day-to-day dynamics. WIP - -# Parameters -# ---------- -# func_World : function -# function that returns a World object with nodes, links, and demand specifications -# desired_arrival_time : float -# desired arrival time to the destination in second -# time_windows : list of float -# Time windows for departure time. List of the start time of each time window in second. The final element denotes the end of the windows. -# For example, `time_windows=[0,600,1200,1800]' means there are 3 windows, namely, 0-600 s, 600-1200 s, and 1200-1800 s. -# alpha : float -# travel time penalty coefficient per second -# beta : float -# early arrival penalty coefficient per second -# gamma : float -# late arrival penalty coefficient per second -# insensitive_ratio : float -# travelers become insensitive to the cost difference smaller than this ratio (i.e, bounded rationality, epsilon-Nash equilibirum) - -# Notes -# ----- -# This function computes a near dynamic user equilibirum state as a steady state of day-to-day dynamical routing game. - -# This considers departure time choice as well as route choice. - -# Specifically, on day `i`, vehicles choose their route based on actual travel time on day `i-1` with the same departure time. -# If there are shorter travel time route, they will change with probability `swap_prob`. -# This process is repeated until `max_iter` day. -# It is expected that this process eventually reach a steady state. -# Due to the problem complexity, it does not necessarily reach or converge to Nash equilibrium or any other stationary points. -# However, in the literature, it is argued that the steady state can be considered as a reasonable proxy for Nash equilibrium or dynamic equilibrium state. -# There are some theoretical background for it; but intuitively speaking, the steady state can be considered as a realistic state that people's rational behavior will reach. - -# This method is based on the following literature: -# Route choice equilibrium: -# Ishihara, M., & Iryo, T. (2015). Dynamic Traffic Assignment by Markov Chain. Journal of Japan Society of Civil Engineers, Ser. D3 (Infrastructure Planning and Management), 71(5), I_503-I_509. (in Japanese). https://doi.org/10.2208/jscejipm.71.I_503 -# Iryo, T., Urata, J., & Kawase, R. (2024). Traffic Flow Simulator and Travel Demand Simulators for Assessing Congestion on Roads After a Major Earthquake. In APPLICATION OF HIGH-PERFORMANCE COMPUTING TO EARTHQUAKE-RELATED PROBLEMS (pp. 413-447). https://doi.org/10.1142/9781800614635_0007 -# Iryo, T., Watling, D., & Hazelton, M. (2024). Estimating Markov Chain Mixing Times: Convergence Rate Towards Equilibrium of a Stochastic Process Traffic Assignment Model. Transportation Science. https://doi.org/10.1287/trsc.2024.0523 - -# Departure time choice equilibrium: -# Satsukawa, K., Wada, K., & Iryo, T. (2024). Stability analysis of a departure time choice problem with atomic vehicle models. Transportation Research Part B: Methodological, 189, 103039. https://doi.org/10.1016/j.trb.2024.103039 - -# """ -# s.func_World = func_World - -# s.desired_arrival_time = desired_arrival_time -# s.time_windows = time_windows -# s.alpha = alpha -# s.beta = beta -# s.gamma = gamma -# s.insensitive_ratio = insensitive_ratio - -# s.W_sol = None #final solution -# s.W_intermid_solution = None #latest solution in the iterative process. Can be used when a user terminates the solution algorithm -# s.W_minimum_cost_gap = None #solution with minimum cost gap, considered as one closest to equilibrium state. -# s.dfs_link = [] - -# warnings.warn("DTA solver is experimental and may not work as expected. It is functional but unstable. `SolverDUE_departure_time_choice` is alpha-stage.") - -# def solve(s, max_iter, n_routes_per_od=10, swap_prob=0.05, print_progress=True, callback=None): -# """ -# Solve quasi Dynamic User Equilibrium (DUE) problem using day-to-day dynamics. WIP - -# Parameters -# ---------- -# max_iter : int -# maximum number of iterations -# n_routes_per_od : int -# number of routes to enumerate for each OD pair -# swap_prob : float -# probability of route swap -# print_progress : bool -# whether to print the information - -# Returns -# ------- -# W : World -# World object with quasi DUE solution (if properly converged) - -# Notes -# ----- -# `self.W_sol` is the final solution. -# `self.W_intermid_solution` is the latest solution in the iterative process. Can be used when a user terminates the solution algorithm. -# """ -# s.start_time = time.time() - -# W_orig = s.func_World() -# if print_progress: -# W_orig.print_scenario_stats() - -# # enumerate routes for each OD pair -# n_routes_per_od = n_routes_per_od - -# dict_od_to_vehid = defaultdict(lambda: []) -# for key, veh in W_orig.VEHICLES.items(): -# o = veh.orig.name -# d = veh.dest.name -# dict_od_to_vehid[o,d].append(key) - -# # dict_od_to_routes = {} -# # for o,d in dict_od_to_vehid.keys(): -# # routes = enumerate_k_shortest_routes(W_orig, o, d, k=n_routes_per_od) -# # dict_od_to_routes[o,d] = routes - -# if W_orig.finalized == False: -# W_orig.finalize_scenario() -# dict_od_to_routes = enumerate_k_random_routes(W_orig, k=n_routes_per_od) - -# if print_progress: -# print(f"number of OD pairs: {len(dict_od_to_routes.keys())}, number of routes: {sum([len(val) for val in dict_od_to_routes.values()])}, number of departure time windows: {len(s.time_windows)}") - -# # day-to-day dynamics -# s.ttts = [] -# s.n_swaps = [] -# s.potential_swaps = [] -# s.cost_gaps = [] -# s.route_log = [] -# s.dep_t_log = [] -# s.cost_log = [] -# cost_gap_minimum = 9999999999999999999999 -# swap_prob = swap_prob -# max_iter = max_iter - -# print("solving DUE...") -# for i in range(max_iter): -# W = s.func_World() -# if i != max_iter-1: -# W.vehicle_logging_timestep_interval = 3 - -# if i != 0: -# for key in W.VEHICLES: -# if key in routes_specified: -# W.VEHICLES[key].enforce_route(routes_specified[key]) -# if key in dep_time_specified: -# W.VEHICLES[key].departure_time = int(dep_time_specified[key]/W.DELTAT) -# W.VEHICLES[key].departure_time_in_second = dep_time_specified[key] -# W.VEHICLES[key].log_t_link[0][0] = int(dep_time_specified[key]/W.DELTAT) - -# route_set = defaultdict(lambda: []) #routes[o,d] = [Route, Route, ...] -# for o,d in dict_od_to_vehid.keys(): -# for r in dict_od_to_routes[o,d]: -# route_set[o,d].append(W.defRoute(r)) - -# # simulation -# W.exec_simulation() - -# # results -# W.analyzer.print_simple_stats() -# #W.analyzer.network_average() - -# # attach route choice set to W object for later re-use at different solvers like DSO-GA -# W.dict_od_to_routes = dict_od_to_routes - -# s.W_intermid_solution = W - -# s.dfs_link.append(W.analyzer.link_to_pandas()) - -# # route swap -# routes_specified = {} -# dep_time_specified = {} -# route_actual = {} -# dep_time_actual = {} -# cost_actual = {} -# n_swap = 0 -# total_cost_gap = 0 -# potential_n_swap = 0 -# for key,veh in W.VEHICLES.items(): -# o = veh.orig.name -# d = veh.dest.name -# r, ts = veh.traveled_route() -# dep_time = veh.departure_time*W.DELTAT -# travel_time = ts[-1]-ts[0] - -# def schedule_cost(arr_t): -# if arr_t > 0: #negative is null (not arrived) -# if arr_t < s.desired_arrival_time: -# arr_time_penalty = s.beta * (s.desired_arrival_time-arr_t) -# else: -# arr_time_penalty = s.gamma * (arr_t-s.desired_arrival_time) -# else: -# arr_time_penalty = (s.beta+s.gamma)*W.TMAX + W.TMAX*s.alpha -# return arr_time_penalty - - -# route_actual[key] = [rr.name for rr in r] -# dep_time_actual[key] = veh.departure_time*W.DELTAT -# cost_actual[key] = s.alpha*travel_time + schedule_cost(veh.arrival_time*W.DELTAT) - -# veh.departure_time_choice_cost = cost_actual[key] -# veh.departure_time_choice_cost_tt = s.alpha*travel_time -# veh.departure_time_choice_cost_sc = schedule_cost(veh.arrival_time*W.DELTAT) - -# if veh.state != "end": -# continue - -# flag_changed = False -# flag_potential_change = False -# route_changed = None -# dep_t_changed = None - -# #stabilization technique: portion change -# change_possibility = random.random() < swap_prob -# #stabilization technique: travelers with earlier departure time is unlikely to change -# # dep_t_ratio = (dep_time_actual[key]-s.time_windows[0])/(s.time_windows[-1]-s.time_windows[0]) -# # if random.random() > dep_t_ratio+0.1: -# # change_possibility = False - -# actual_cost = s.alpha*r.actual_travel_time(ts[0]) + schedule_cost(dep_time_actual[key]+r.actual_travel_time(ts[0])) - -# best_cost = actual_cost #現在の選択と最適選択のコスト差cost_gap算出用.均衡に近いとコスト差が小さい - -# for j,t1 in enumerate(s.time_windows[:-1]): -# t2 = s.time_windows[j+1] #time window [t1, t2] -# alt_t_dep = W.rng.random()*(t2-t1) + t1 - -# #stabilization technique: departure time change is relatively rare. choose only 1 time slot on average -# change_possibility_time = False -# if random.random() < 1/len(s.time_windows[:-1]): -# change_possibility_time = True - -# for alt_route in route_set[o,d]: -# alt_cost = s.alpha*alt_route.actual_travel_time(alt_t_dep) + schedule_cost(alt_t_dep+alt_route.actual_travel_time(alt_t_dep)) - -# if alt_cost < best_cost: -# best_cost = alt_cost - -# # if i > 50: -# # print("actual:", dep_time_actual[key], "; cost:", actual_cost) -# # print(f"change: {alt_t_dep:.1f}, ; cost: {alt_cost:.1f}", "\t", actual_cost - alt_cost > actual_cost*s.insensitive_ratio, change_possibility, change_possibility_time) - -# if actual_cost - alt_cost > actual_cost*s.insensitive_ratio: -# #stabilization technique: bounded rational behavior. insensitve to difference smaller than x% -# flag_potential_change = 1 - -# if change_possibility and change_possibility_time: -# flag_changed = True -# route_changed = alt_route -# dep_t_changed = alt_t_dep - -# total_cost_gap += actual_cost-best_cost -# routes_specified[key] = r -# dep_time_specified[key] = dep_time -# if flag_potential_change: -# potential_n_swap += W.DELTAN -# if flag_changed: -# n_swap += W.DELTAN -# routes_specified[key] = route_changed -# dep_time_specified[key] = dep_t_changed - -# veh.departure_time_choice_changed = dep_time_specified[key] - -# cost_gap_per_vehicle = total_cost_gap/len(W.VEHICLES) -# if print_progress: -# print(f' iter {i}: cost gap: {cost_gap_per_vehicle:.1f}, potential change: {potential_n_swap}, change: {n_swap}, total travel time: {W.analyzer.total_travel_time: .1f}, delay ratio: {W.analyzer.average_delay/W.analyzer.average_travel_time: .3f}') - -# if cost_gap_per_vehicle < cost_gap_minimum: -# s.W_minimum_cost_gap = W -# cost_gap_minimum = cost_gap_per_vehicle - -# if callback: -# callback(i, W) - - -# s.route_log.append(route_actual) -# s.dep_t_log.append(dep_time_actual) -# s.cost_log.append(cost_actual) - -# s.ttts.append(int(W.analyzer.total_travel_time)) -# s.n_swaps.append(n_swap) -# s.potential_swaps.append(potential_n_swap) -# s.cost_gaps.append(cost_gap_per_vehicle) - - -# s.end_time = time.time() - -# print("DUE summary:") -# last_iters = int(max_iter/4) -# print(f" total travel time: initial {s.ttts[0]:.1f} -> average of last {last_iters} iters {np.average(s.ttts[-last_iters:]):.1f}") -# print(f" number of potential changes: initial {s.potential_swaps[0]:.1f} -> average of last {last_iters} iters {np.average(s.potential_swaps[-last_iters:]):.1f}") -# print(f" cost gap: initial {s.cost_gaps[0]:.1f} -> average of last {last_iters} iters {np.average(s.cost_gaps[-last_iters:]):.1f}") -# print(f" minimum cost gap {cost_gap_minimum:.1f}") -# print(f" computation time: {s.end_time - s.start_time:.1f} seconds") - - -# s.W_sol = W -# return s.W_sol - -# def plot_convergence(s): -# # iteration plot -# plt.figure(figsize=(6,2)) -# plt.title("total travel time") -# plt.plot(s.ttts) -# plt.xlabel("iter") - -# plt.figure(figsize=(6,2)) -# plt.title("number of route change") -# plt.plot(s.n_swaps) -# plt.ylim(0,None) -# plt.xlabel("iter") - -# plt.figure(figsize=(6,2)) -# plt.title("travel time difference between chosen route and minimum cost route") -# plt.plot(s.t_gaps) -# plt.ylim(0,None) -# plt.xlabel("iter") - -# plt.show() - -# # plt.figure() -# # plot_multiple_y(ys=[s.ttts, s.n_swaps, s.potential_swaps, s.total_t_gaps, np.array(s.total_t_gaps)/np.array(s.potential_swaps)], labels=["total travel time", "number of route change", "number of potential route change", "time gap for potential route change", "time gap per potential route change"]) -# # plt.xlabel("iter") -# # plt.show() - -# def plot_link_stats(s): -# plt.figure() -# plt.title("traffic volume") -# for i in range(len(s.dfs_link[0])): -# vols = [df["traffic_volume"][i] for df in s.dfs_link] -# plt.plot(vols, label=s.dfs_link[0]["link"][i]) -# plt.xlabel("iteration") -# plt.ylabel("volume (veh)") -# plt.legend(loc='center left', bbox_to_anchor=(1, 0.5)) - -# plt.figure() -# plt.title("average travel time") -# for i in range(len(s.dfs_link[0])): -# vols = [df["average_travel_time"][i] for df in s.dfs_link] -# plt.plot(vols, label=s.dfs_link[0]["link"][i]) -# plt.xlabel("iteration") -# plt.ylabel("time (s)") -# plt.legend(loc='center left', bbox_to_anchor=(1, 0.5)) - -# plt.show() - -# def plot_vehicle_stats(s, orig=None, dest=None): -# ave_TT = [] -# std_TT = [] -# depature_time = [] -# for vehid in s.route_log[0].keys(): -# if (s.W_sol.VEHICLES[vehid].orig.name == orig or orig == None) and (s.W_sol.VEHICLES[vehid].dest.name == dest or dest == None): -# length = len(s.route_log) -# ts = [s.cost_log[day][vehid] for day in range(int(length/2), length)] -# ave_TT.append(np.average(ts)) -# std_TT.append(np.std(ts)) -# depature_time.append(s.W_sol.VEHICLES[vehid].departure_time_in_second) - -# plt.figure() -# orig_ = orig -# if orig == None: -# orig_ = "any" -# dest_ = dest -# if dest == None: -# dest_ = "any" -# plt.title(f"orig: {orig_}, dest: {dest_}") -# plt.errorbar(x=depature_time, y=ave_TT, yerr=std_TT, -# fmt='bx', ecolor="#aaaaff", capsize=0, label="travel time (mean $pm$ std)") -# plt.xlabel("departure time of vehicle") -# plt.ylabel("travel time") -# plt.legend() -# plt.show() - diff --git a/uxsim/uxsim.py b/uxsim/uxsim.py index 3000453..0974bcc 100644 --- a/uxsim/uxsim.py +++ b/uxsim/uxsim.py @@ -632,7 +632,7 @@ def set_traveltime_instant(s): s.traveltime_instant.append(s.traveltime_instant[-1]) - def arrival_count(s, t): + def arrival_count(s, t: float) -> float: """ Get cumulative vehicle count of arrival to this link on time t. @@ -653,7 +653,7 @@ def arrival_count(s, t): return s.cum_arrival[0] return s.cum_arrival[tt] - def departure_count(s, t): + def departure_count(s, t: float) -> float: """ Get cumulative vehicle count of departure from this link on time t. @@ -673,8 +673,40 @@ def departure_count(s, t): if tt < 0: return s.cum_departure[0] return s.cum_departure[tt] + + def num_vehicles_t(s, t: float) -> float: + """ + Get number of vehicles on this link on time t. + + Parameters + ---------- + t : float + Time in seconds. - def instant_travel_time(s, t): + Returns + ------- + float + The number of vehicles. + """ + return s.arrival_count(t)-s.departure_count(t) + + def average_density(s, t: float) -> float: + """ + Get average density of this link on time t. + + Parameters + ---------- + t : float + Time in seconds. + + Returns + ------- + float + The average density. + """ + return float(s.num_vehicles_t(t)/s.length) + + def instant_travel_time(s, t: float) -> float: """ Get instantaneous travel time of this link on time t. @@ -694,8 +726,8 @@ def instant_travel_time(s, t): if tt < 0: return s.traveltime_instant[0] return s.traveltime_instant[tt] - - def actual_travel_time(s, t): + + def actual_travel_time(s, t: float) -> float: """ Get actual travel time of vehicle who enters this link on time t. Note that small error may occur due to fractional processing. @@ -716,6 +748,38 @@ def actual_travel_time(s, t): return s.traveltime_actual[0] return s.traveltime_actual[tt] + def average_speed(s, t: float) -> float: + """ + Get average speed (=inverse of instantaneous travel time) of this link on time t. + + Parameters + ---------- + t : float + Time in seconds. + + Returns + ------- + float + The instantaneous travel time. + """ + return float(s.length/s.instant_travel_time(t)) + + def average_flow(s, t: float) -> float: + """ + Get average flow of this link on time t. + + Parameters + ---------- + t : float + Time in seconds. + + Returns + ------- + float + The average flow. + """ + return float(s.average_speed(t)*s.average_density(t)) + #getter/setter @property def speed(s):