|
| 1 | +# /// script |
| 2 | +# requires-python = ">=3.11" |
| 3 | +# dependencies = [ |
| 4 | +# "click", |
| 5 | +# "shapely", |
| 6 | +# "pyproj", |
| 7 | +# ] |
| 8 | +# /// |
| 9 | + |
| 10 | +""" |
| 11 | +KWIVER Dump to JSON Converter |
| 12 | +Uses the kitware/kwiver docker file to run dump-klv to create |
| 13 | +a CSV file with the output data (CSV is chosen becase the json file is a bit obtuse). |
| 14 | +The CSV file is converted to a JSON file for easer processing and from there it creates 3 files: |
| 15 | +output_metadata.geojson - geojson file with the camera footprint plus the camera positioning indexed by frame number |
| 16 | +ground_frame_mapping.json - dictionary where the keys are video frames and the values are a bounding box representing the video location |
| 17 | +bbox_frame.geojson - A geojson file of all of the ground mapped bounding boxes where each has a property of the frame. |
| 18 | +
|
| 19 | +Usage: |
| 20 | + uv kwiver-klv.py path/to/video.mpg ./output |
| 21 | +""" |
| 22 | + |
| 23 | +import csv |
| 24 | +import json |
| 25 | +import subprocess |
| 26 | +from pathlib import Path |
| 27 | +import click |
| 28 | +from shapely.geometry import Point, Polygon, mapping |
| 29 | +from shapely.ops import unary_union |
| 30 | +import pyproj |
| 31 | + |
| 32 | +@click.command() |
| 33 | +@click.argument('video', type=click.Path(exists=True), required=True) |
| 34 | +@click.argument('output_dir', type=click.Path(), default='./output') |
| 35 | +def main(video, output_dir): |
| 36 | + video_path = Path(video).resolve() |
| 37 | + output_dir = Path(output_dir).resolve() |
| 38 | + |
| 39 | + output_dir.mkdir(parents=True, exist_ok=True) |
| 40 | + |
| 41 | + output_csv = output_dir / 'output.csv' |
| 42 | + output_json = output_dir / 'output.json' |
| 43 | + geojson_out = output_dir / 'output_metadata.geojson' |
| 44 | + bbox_out = output_dir / 'ground_frame_mapping.json' |
| 45 | + bbox_geojson_out = output_dir / 'bbox_frame.geojson' |
| 46 | + click.echo(f'Processing video: {video_path}') |
| 47 | + click.echo('Running KWIVER via Docker to extract metadata...') |
| 48 | + |
| 49 | + output_csv_file = Path(video_path.parent / 'output.csv') |
| 50 | + cmd = [ |
| 51 | + 'docker', 'run', '--rm', |
| 52 | + '-v', f'{video_path.parent}:/data', |
| 53 | + 'kitware/kwiver:latest', |
| 54 | + 'dump-klv', f'/data/{video_path.name}', |
| 55 | + '-l', f'/data/output.csv', |
| 56 | + '-e', 'csv' |
| 57 | + ] |
| 58 | + |
| 59 | + try: |
| 60 | + subprocess.run(cmd, check=True) |
| 61 | + except subprocess.CalledProcessError as e: |
| 62 | + click.secho(f'Error running KWIVER: {e}', fg='red') |
| 63 | + return |
| 64 | + |
| 65 | + if not output_csv_file.exists(): |
| 66 | + click.secho('KWIVER did not generate the expected CSV output.', fg='red') |
| 67 | + return |
| 68 | + |
| 69 | + click.echo(f'KWIVER output saved to {output_csv}') |
| 70 | + |
| 71 | + click.echo(f'Reading {output_csv_file} and converting to JSON...') |
| 72 | + with open(output_csv_file, newline='') as csvfile: |
| 73 | + reader = csv.DictReader(csvfile) |
| 74 | + frames = [row for row in reader] |
| 75 | + |
| 76 | + with open(output_json, 'w') as f: |
| 77 | + json.dump(frames, f, indent=2) |
| 78 | + |
| 79 | + click.secho(f'JSON output written to {output_json}', fg='green') |
| 80 | + |
| 81 | + create_geojson_and_bbox(frames, geojson_out, bbox_out, bbox_geojson_out) |
| 82 | + click.secho(f'GeoJSON written to {geojson_out}', fg='cyan') |
| 83 | + click.secho(f'Bounding box mapping written to {bbox_out}', fg='cyan') |
| 84 | + |
| 85 | + |
| 86 | +def create_geojson_and_bbox(frames, geojson_out, bbox_out, bbox_geojson_out): |
| 87 | + geod = pyproj.Geod(ellps='WGS84') |
| 88 | + features = [] |
| 89 | + polygons = [] |
| 90 | + frame_polygons = [] |
| 91 | + frame_to_bbox = {} |
| 92 | + total = len(frames) |
| 93 | + |
| 94 | + for frame in frames: |
| 95 | + try: |
| 96 | + frame_id = frame.get("Frame ID", None) |
| 97 | + if frame_id is None: |
| 98 | + continue |
| 99 | + |
| 100 | + # Sensor location |
| 101 | + sensor_lat = float(frame["Sensor Geodetic Latitude (EPSG:4326)"]) |
| 102 | + sensor_lon = float(frame["Sensor Geodetic Longitude (EPSG:4326)"]) |
| 103 | + |
| 104 | + # Frame center and bounding |
| 105 | + center_lat = float(frame["Geodetic Frame Center Latitude (EPSG:4326)"]) |
| 106 | + center_lon = float(frame["Geodetic Frame Center Longitude (EPSG:4326)"]) |
| 107 | + width = float(frame["Target Width (meters)"]) |
| 108 | + |
| 109 | + # Compute bounding box corners around center |
| 110 | + corners = [] |
| 111 | + for az in (0, 90, 180, 270): |
| 112 | + lon, lat, _ = geod.fwd(center_lon, center_lat, az, width / 2) |
| 113 | + corners.append((lon, lat)) |
| 114 | + corners.append(corners[0]) # close the polygon |
| 115 | + |
| 116 | + polygon = Polygon(corners) |
| 117 | + polygons.append(polygon) |
| 118 | + frame_polygons.append((frame_id, polygon)) |
| 119 | + frame_to_bbox[frame_id] = corners |
| 120 | + |
| 121 | + # Point feature at sensor location |
| 122 | + point = Point(sensor_lon, sensor_lat) |
| 123 | + feature = { |
| 124 | + "type": "Feature", |
| 125 | + "geometry": mapping(point), |
| 126 | + "properties": { |
| 127 | + "Frame ID": frame_id, |
| 128 | + "Platform Ground Speed": frame.get("Platform Ground Speed (m/s)"), |
| 129 | + "Platform Vertical Speed": frame.get("Platform Vertical Speed (m/s)") |
| 130 | + } |
| 131 | + } |
| 132 | + features.append(feature) |
| 133 | + |
| 134 | + except (KeyError, ValueError) as e: |
| 135 | + print(f"Skipping frame due to error: {e}") |
| 136 | + continue |
| 137 | + |
| 138 | + # Add unioned polygon |
| 139 | + if polygons: |
| 140 | + merged = unary_union(polygons) |
| 141 | + features.append({ |
| 142 | + "type": "Feature", |
| 143 | + "geometry": mapping(merged), |
| 144 | + "properties": { |
| 145 | + "type": "Unioned Bounding Box" |
| 146 | + } |
| 147 | + }) |
| 148 | + |
| 149 | + # Save GeoJSON |
| 150 | + geojson = { |
| 151 | + "type": "FeatureCollection", |
| 152 | + "features": features |
| 153 | + } |
| 154 | + |
| 155 | + with open(geojson_out, 'w') as f: |
| 156 | + json.dump(geojson, f, indent=2) |
| 157 | + |
| 158 | + # Save bbox mapping |
| 159 | + with open(bbox_out, 'w') as f: |
| 160 | + json.dump(frame_to_bbox, f, indent=2) |
| 161 | + |
| 162 | + def get_gradient_color(idx, total): |
| 163 | + r = int(255 * (idx / (total - 1))) |
| 164 | + b = int(255 * (1 - idx / (total - 1))) |
| 165 | + return f"#{r:02x}00{b:02x}" |
| 166 | + |
| 167 | + # Individual frame bbox polygons with styling |
| 168 | + bbox_features = [] |
| 169 | + for idx, (frame_id, poly) in enumerate(frame_polygons): |
| 170 | + color = get_gradient_color(idx, total) |
| 171 | + feature = { |
| 172 | + "type": "Feature", |
| 173 | + "geometry": mapping(poly), |
| 174 | + "properties": { |
| 175 | + "frame_id": frame_id, |
| 176 | + "type": "Unioned Bounding Box", |
| 177 | + "stroke": color, |
| 178 | + "stroke-width": 2, |
| 179 | + "stroke-opacity": 1, |
| 180 | + "fill": "#ff0000", |
| 181 | + "fill-opacity": 0.5 |
| 182 | + } |
| 183 | + } |
| 184 | + bbox_features.append(feature) |
| 185 | + |
| 186 | + bbox_geojson = { |
| 187 | + "type": "FeatureCollection", |
| 188 | + "features": bbox_features |
| 189 | + } |
| 190 | + |
| 191 | + with open(bbox_geojson_out, 'w') as f: |
| 192 | + json.dump(bbox_geojson, f, indent=2) |
| 193 | + |
| 194 | +if __name__ == '__main__': |
| 195 | + main() |
0 commit comments