Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 15 additions & 19 deletions lib/headway_analysis/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,7 @@ defmodule HeadwayAnalysis.Server do
use GenServer
require Logger

@enforce_keys [
:sign_id,
:headway_group,
:stop_ids,
:vehicles_present
]
@enforce_keys [:sign_id, :headway_group, :stop_ids, :vehicle_trips]
defstruct @enforce_keys

def start_link(config) do
Expand All @@ -28,7 +23,7 @@ defmodule HeadwayAnalysis.Server do
sign_id: config["id"],
headway_group: config["source_config"]["headway_group"],
stop_ids: Enum.map(config["source_config"]["sources"], & &1["stop_id"]),
vehicles_present: MapSet.new()
vehicle_trips: %{}
}}
end

Expand All @@ -47,22 +42,23 @@ defmodule HeadwayAnalysis.Server do

revenue_vehicles = RealtimeSigns.prediction_engine().revenue_vehicles()

new_vehicles_present =
for stop_id <- state.stop_ids,
location <- RealtimeSigns.location_engine().for_stop(stop_id),
location.status == :stopped_at,
into: MapSet.new() do
location.vehicle_id
end
new_vehicle_trips =
Enum.flat_map(state.stop_ids, &RealtimeSigns.location_engine().for_stop(&1))
|> Enum.filter(&(&1.status == :stopped_at))
|> Map.new(&{&1.vehicle_id, &1.trip_id})

if MapSet.difference(state.vehicles_present, new_vehicles_present)
|> Enum.any?(&(&1 in revenue_vehicles)) do
MapSet.difference(
MapSet.new(state.vehicle_trips, &elem(&1, 0)),
MapSet.new(new_vehicle_trips, &elem(&1, 0))
)
|> Enum.filter(&(&1 in revenue_vehicles))
|> Enum.each(fn vehicle_id ->
Logger.info(
"headway_analysis_departure: sign_id=#{state.sign_id} headway_low=#{headway_low} headway_high=#{headway_high}"
"headway_analysis_departure: sign_id=#{inspect(state.sign_id)} trip_id=#{inspect(state.vehicle_trips[vehicle_id])} headway_low=#{headway_low} headway_high=#{headway_high}"
)
end
end)

{:noreply, %{state | vehicles_present: new_vehicles_present}}
{:noreply, %{state | vehicle_trips: new_vehicle_trips}}
end

defp schedule_update(pid) do
Expand Down
19 changes: 2 additions & 17 deletions lib/headway_analysis/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,14 @@ defmodule HeadwayAnalysis.Supervisor do
"""
use Supervisor

@sign_ids ~w(
harvard_southbound harvard_northbound
fields_corner_southbound fields_corner_northbound
wollaston_southbound wollaston_northbound
jackson_square_southbound jackson_square_northbound
wellington_southbound wellington_northbound
beachmont_westbound beachmont_eastbound
aquarium_westbound aquarium_eastbound
babcock_st_westbound babcock_st_eastbound
prudential_westbound prudential_eastbound
newton_centre_westbound newton_centre_eastbound
magoun_square_westbound magoun_square_eastbound
boylston_westbound boylston_eastbound
lechmere_green_line_westbound lechmere_green_line_eastbound
)

def start_link(arg) do
Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
end

@impl true
def init([]) do
for config <- Signs.Utilities.SignsConfig.children_config(), config["id"] in @sign_ids do
for %{"type" => "realtime", "source_config" => %{}} = config <-
Signs.Utilities.SignsConfig.children_config() do
Supervisor.child_spec({HeadwayAnalysis.Server, config},
id: {HeadwayAnalysis.Server, config["id"]}
)
Expand Down
48 changes: 41 additions & 7 deletions lib/signs/realtime.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ defmodule Signs.Realtime do
:current_content_bottom,
:last_update,
:tick_read,
:read_period_seconds
:read_period_seconds,
:last_message_log_time
]

defstruct @enforce_keys ++
Expand Down Expand Up @@ -70,12 +71,18 @@ defmodule Signs.Realtime do
announced_alert: boolean(),
prev_predictions: [Predictions.Prediction.t()],
uses_shuttles: boolean(),
pa_message_plays: %{integer() => DateTime.t()}
pa_message_plays: %{integer() => DateTime.t()},
last_message_log_time: DateTime.t()
}

def start_link(%{"type" => "realtime"} = config) do
source_config = config |> Map.fetch!("source_config") |> SourceConfig.parse!()

current_time_fn = fn ->
time_zone = Application.fetch_env!(:realtime_signs, :time_zone)
DateTime.utc_now() |> DateTime.shift_zone!(time_zone)
end

sign = %__MODULE__{
id: Map.fetch!(config, "id"),
pa_ess_loc: Map.fetch!(config, "pa_ess_loc"),
Expand All @@ -87,16 +94,14 @@ defmodule Signs.Realtime do
config |> Map.get("default_mode") |> then(&if(&1 == "auto", do: :auto, else: :off)),
current_content_top: "",
current_content_bottom: "",
current_time_fn: fn ->
time_zone = Application.get_env(:realtime_signs, :time_zone)
DateTime.utc_now() |> DateTime.shift_zone!(time_zone)
end,
current_time_fn: current_time_fn,
last_update: nil,
tick_read: 240 + Map.fetch!(config, "read_loop_offset"),
read_period_seconds: 240,
headway_stop_id: Map.get(config, "headway_stop_id"),
uses_shuttles: Map.get(config, "uses_shuttles", true),
pa_message_plays: %{}
pa_message_plays: %{},
last_message_log_time: current_time_fn.()
}

GenServer.start_link(__MODULE__, sign, name: :"Signs/#{sign.id}")
Expand Down Expand Up @@ -191,6 +196,7 @@ defmodule Signs.Realtime do
|> Utilities.Reader.do_announcements(messages)
|> Utilities.Reader.read_sign(messages)
|> decrement_ticks()
|> log_sign_messages(messages)
|> Map.put(:prev_predictions, all_predictions)

Process.send_after(self(), :run_loop, 1000)
Expand Down Expand Up @@ -280,6 +286,34 @@ defmodule Signs.Realtime do
end)
end

# This is some temporary logging to assist with headway accuracy analysis
defp log_sign_messages(%{source_config: {_, _}} = sign, _messages), do: sign

defp log_sign_messages(sign, messages) do
now = sign.current_time_fn.()

if DateTime.after?(now, DateTime.shift(sign.last_message_log_time, minute: 1)) do
Enum.each(messages, fn
%Message.Headway{} = message ->
Logger.info(
"sign_headway_message: sign_id=#{inspect(sign.id)} destination=#{message.destination |> to_string() |> inspect()} route=#{inspect(message.route)}"
)

%Message.Alert{} = message ->
Logger.info(
"sign_alert_message: sign_id=#{inspect(sign.id)} destination=#{message.destination |> to_string() |> inspect()} route=#{inspect(message.route)}"
)

_ ->
nil
end)

%{sign | last_message_log_time: now}
else
sign
end
end

@spec decrement_ticks(Signs.Realtime.t()) :: Signs.Realtime.t()
def decrement_ticks(sign) do
%{sign | tick_read: sign.tick_read - 1}
Expand Down
5 changes: 3 additions & 2 deletions test/headway_analysis/server_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule HeadwayAnalysys.ServerTest do
sign_id: "test_sign",
headway_group: "test_headway_group",
stop_ids: ["1"],
vehicles_present: MapSet.new(["v-1"])
vehicle_trips: %{"v-1" => "trip-1"}
}

setup :verify_on_exit!
Expand All @@ -23,7 +23,8 @@ defmodule HeadwayAnalysys.ServerTest do

assert capture_log([level: :info], fn ->
HeadwayAnalysis.Server.handle_info(:update, @state)
end) =~ "headway_analysis_departure: sign_id=test_sign headway_low=3 headway_high=6"
end) =~
"headway_analysis_departure: sign_id=\"test_sign\" trip_id=\"trip-1\" headway_low=3 headway_high=6"
end

test "does not log non-revenue departures" do
Expand Down
3 changes: 2 additions & 1 deletion test/signs/realtime_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ defmodule Signs.RealtimeTest do
last_update: @fake_time,
tick_read: 1,
read_period_seconds: 100,
pa_message_plays: %{}
pa_message_plays: %{},
last_message_log_time: @fake_time
}

@mezzanine_sign %{
Expand Down