|
| 1 | +import argparse |
| 2 | +import json |
| 3 | +import logging |
| 4 | +import copy |
| 5 | + |
| 6 | +from ..sample_files.validation_schema import connected_work_zone_feed_v10 |
| 7 | + |
| 8 | +from ..tools import date_tools, cwz_translator, uuid_tools |
| 9 | + |
| 10 | +PROGRAM_NAME = "CWZPlannedEventsTranslator" |
| 11 | +PROGRAM_VERSION = "1.0" |
| 12 | + |
| 13 | + |
| 14 | +def main(): |
| 15 | + input_file, output_file = parse_planned_events_arguments() |
| 16 | + |
| 17 | + planned_events_obj = json.loads(open(input_file, "r").read()) |
| 18 | + cwz_feed = cwz_creator(planned_events_obj) |
| 19 | + |
| 20 | + if not cwz_feed: |
| 21 | + print("Error: CWZ message generation failed, see logs for more information.") |
| 22 | + else: |
| 23 | + with open(output_file, "w") as f: |
| 24 | + f.write(json.dumps(cwz_feed, indent=2)) |
| 25 | + print( |
| 26 | + "Your connected work zone message was successfully generated and is located here: " |
| 27 | + + str(output_file) |
| 28 | + ) |
| 29 | + |
| 30 | + |
| 31 | +# parse script command line arguments |
| 32 | +def parse_planned_events_arguments() -> tuple[str, str]: |
| 33 | + """Parse command line arguments for Planned Event data translation |
| 34 | +
|
| 35 | + Returns: |
| 36 | + tuple[str, str]: Planned Event file path, output file path |
| 37 | + """ |
| 38 | + parser = argparse.ArgumentParser(description="Translate Planned Event data to CWZ") |
| 39 | + parser.add_argument( |
| 40 | + "--version", action="version", version=f"{PROGRAM_NAME} {PROGRAM_VERSION}" |
| 41 | + ) |
| 42 | + parser.add_argument("plannedEventsFile", help="planned_events file path") |
| 43 | + parser.add_argument( |
| 44 | + "--outputFile", |
| 45 | + required=False, |
| 46 | + default="planned_events_cwz_translated_output_message.geojson", |
| 47 | + help="output file path", |
| 48 | + ) |
| 49 | + |
| 50 | + args = parser.parse_args() |
| 51 | + return args.plannedEventsFile, args.outputFile |
| 52 | + |
| 53 | + |
| 54 | +def cwz_creator(message: dict, info: dict | None = None) -> dict | None: |
| 55 | + """Translate Planned Event data to CWZ |
| 56 | +
|
| 57 | + Args: |
| 58 | + message (dict): Planned Event data |
| 59 | + info (dict, optional): CWZ info object. Defaults to None. |
| 60 | +
|
| 61 | + Returns: |
| 62 | + dict: CWZ object |
| 63 | + """ |
| 64 | + if not message: |
| 65 | + return None |
| 66 | + event_type = message["event"]["type"] |
| 67 | + |
| 68 | + # verify info obj |
| 69 | + if not info: |
| 70 | + info = cwz_translator.initialize_info() |
| 71 | + if not cwz_translator.validate_info(info): |
| 72 | + return None |
| 73 | + |
| 74 | + if event_type == "work-zone": |
| 75 | + wzd = cwz_translator.initialize_feed_object(info) |
| 76 | + feature = parse_work_zone(message) |
| 77 | + else: |
| 78 | + logging.warning(f"Unrecognized event type: {message['event']['type']}") |
| 79 | + return None |
| 80 | + |
| 81 | + if feature: |
| 82 | + wzd.get("features", []).append(feature) |
| 83 | + if not wzd.get("features"): |
| 84 | + return None |
| 85 | + wzd = cwz_translator.add_ids(wzd, event_type) |
| 86 | + |
| 87 | + if not cwz_translator.validate_feed( |
| 88 | + wzd, connected_work_zone_feed_v10.connected_work_zone_feed_v10_schema_string |
| 89 | + ): |
| 90 | + logging.warning("CWZ message failed validation") |
| 91 | + return None |
| 92 | + |
| 93 | + return wzd |
| 94 | + |
| 95 | + |
| 96 | +def get_vehicle_impact(lanes: list[dict]) -> str: |
| 97 | + """Determine the impact of lane closures on vehicle traffic |
| 98 | +
|
| 99 | + Args: |
| 100 | + lanes (list[dict]): List of lane objects |
| 101 | +
|
| 102 | + Returns: |
| 103 | + str: Vehicle impact status |
| 104 | + """ |
| 105 | + num_lanes = len(lanes) |
| 106 | + num_closed_lanes = 0 |
| 107 | + for i in lanes: |
| 108 | + if i["status"] != "open": |
| 109 | + num_closed_lanes += 1 |
| 110 | + if num_closed_lanes == num_lanes: |
| 111 | + return "all-lanes-closed" |
| 112 | + elif num_closed_lanes == 0: |
| 113 | + return "all-lanes-open" |
| 114 | + else: |
| 115 | + return "some-lanes-closed" |
| 116 | + |
| 117 | + |
| 118 | +# Parse Icone Incident to CWZ |
| 119 | +def parse_work_zone(incident: dict) -> dict | None: |
| 120 | + """Translate Planned Events RTDH standard work zone to CWZ |
| 121 | +
|
| 122 | + Args: |
| 123 | + incident (dict): Planned event event data |
| 124 | +
|
| 125 | + Returns: |
| 126 | + dict: CWZ object |
| 127 | + """ |
| 128 | + if not incident or type(incident) is not dict: |
| 129 | + return None |
| 130 | + |
| 131 | + event = incident.get("event") |
| 132 | + |
| 133 | + source = event.get("source") |
| 134 | + header = event.get("header") |
| 135 | + detail = event.get("detail") |
| 136 | + additional_info = event.get("additional_info", {}) |
| 137 | + |
| 138 | + geometry = { |
| 139 | + "type": "LineString", |
| 140 | + "coordinates": event.get("geometry", []), |
| 141 | + } |
| 142 | + properties = cwz_translator.initialize_feature_properties() |
| 143 | + |
| 144 | + core_details = properties["core_details"] |
| 145 | + |
| 146 | + # Event Type ['work-zone', 'detour'] |
| 147 | + core_details["event_type"] = event.get("type") |
| 148 | + |
| 149 | + # data_source_id - Leave this empty, it will be populated by add_ids |
| 150 | + core_details["data_source_id"] = "" |
| 151 | + |
| 152 | + # road_name |
| 153 | + road_names = [detail.get("road_name")] |
| 154 | + core_details["road_names"] = road_names |
| 155 | + |
| 156 | + # direction |
| 157 | + core_details["direction"] = detail.get("direction", "unknown") |
| 158 | + |
| 159 | + # related_road_events |
| 160 | + core_details["related_road_events"] = [] |
| 161 | + |
| 162 | + # name |
| 163 | + core_details["name"] = event.get("source", {}).get("id", None) |
| 164 | + |
| 165 | + # description |
| 166 | + core_details["description"] = header.get("description") |
| 167 | + |
| 168 | + # creation_date - not available |
| 169 | + |
| 170 | + # update_date |
| 171 | + core_details["update_date"] = date_tools.get_iso_string_from_unix( |
| 172 | + source.get("last_updated_timestamp") |
| 173 | + ) |
| 174 | + |
| 175 | + # core_details |
| 176 | + properties["core_details"] = core_details |
| 177 | + |
| 178 | + start_time = date_tools.parse_datetime_from_unix(header.get("start_timestamp")) |
| 179 | + end_time = date_tools.parse_datetime_from_unix(header.get("end_timestamp")) |
| 180 | + |
| 181 | + # start_date |
| 182 | + properties["start_date"] = date_tools.get_iso_string_from_datetime(start_time) |
| 183 | + |
| 184 | + # end_date |
| 185 | + properties["end_date"] = date_tools.get_iso_string_from_datetime(end_time) |
| 186 | + |
| 187 | + # is_start_date_verified |
| 188 | + properties["is_start_date_verified"] = False |
| 189 | + |
| 190 | + # is_end_date_verified |
| 191 | + properties["is_end_date_verified"] = False |
| 192 | + |
| 193 | + # is_start_position_verified |
| 194 | + properties["is_start_position_verified"] = False |
| 195 | + |
| 196 | + # is_end_position_verified |
| 197 | + properties["is_end_position_verified"] = False |
| 198 | + |
| 199 | + # location_method |
| 200 | + properties["location_method"] = "channel-device-method" |
| 201 | + |
| 202 | + # work_zone_type |
| 203 | + properties["work_zone_type"] = event.get("work_zone_type", "static") |
| 204 | + |
| 205 | + # vehicle impact |
| 206 | + lanes = additional_info.get("lanes", []) |
| 207 | + properties["vehicle_impact"] = get_vehicle_impact(lanes) |
| 208 | + |
| 209 | + # lanes |
| 210 | + properties["lanes"] = lanes |
| 211 | + |
| 212 | + # beginning_cross_street |
| 213 | + properties["beginning_cross_street"] = additional_info.get("beginning_cross_street") |
| 214 | + |
| 215 | + # beginning_cross_street |
| 216 | + properties["ending_cross_street"] = additional_info.get("ending_cross_street") |
| 217 | + |
| 218 | + # mileposts |
| 219 | + properties["beginning_milepost"] = additional_info.get("beginning_milepost") |
| 220 | + |
| 221 | + # ending_milepost |
| 222 | + properties["ending_milepost"] = additional_info.get("ending_milepost") |
| 223 | + |
| 224 | + # type_of_work |
| 225 | + # non-encroachment, minor-road-defect-repair, roadside-work, overhead-work, below-road-work, barrier-work, surface-work, painting, roadway-relocation, roadway-creation |
| 226 | + types_of_work = event.get("types_of_work", []) |
| 227 | + for type_of_work in types_of_work: |
| 228 | + if type_of_work.get("type_name") == "maintenance": |
| 229 | + type_of_work["type_name"] = "non-encroachment" |
| 230 | + properties["types_of_work"] = types_of_work |
| 231 | + |
| 232 | + # worker_presence - not available |
| 233 | + |
| 234 | + # reduced_speed_limit_kph - not available |
| 235 | + |
| 236 | + # restrictions |
| 237 | + properties["restrictions"] = additional_info.get("restrictions", []) |
| 238 | + |
| 239 | + properties["route_details_start"] = additional_info.get("route_details_start") |
| 240 | + properties["route_details_end"] = additional_info.get("route_details_end") |
| 241 | + |
| 242 | + properties["condition_1"] = additional_info.get("condition_1", True) |
| 243 | + |
| 244 | + filtered_properties = copy.deepcopy(properties) |
| 245 | + |
| 246 | + INVALID_PROPERTIES: list = [None, "", []] |
| 247 | + |
| 248 | + for key, value in properties.items(): |
| 249 | + if value in INVALID_PROPERTIES: |
| 250 | + del filtered_properties[key] |
| 251 | + |
| 252 | + for key, value in properties["core_details"].items(): |
| 253 | + if value in INVALID_PROPERTIES and key not in ["data_source_id"]: |
| 254 | + del filtered_properties["core_details"][key] |
| 255 | + |
| 256 | + feature = {} |
| 257 | + feature["id"] = uuid_tools.named_uuid_string( |
| 258 | + event.get("source", {}).get("id", None) |
| 259 | + ) |
| 260 | + feature["type"] = "Feature" |
| 261 | + feature["properties"] = filtered_properties |
| 262 | + feature["geometry"] = geometry |
| 263 | + |
| 264 | + return feature |
| 265 | + |
| 266 | + |
| 267 | +if __name__ == "__main__": |
| 268 | + main() |
0 commit comments