Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 78 additions & 48 deletions pre_editor.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import argparse
import gpxpy
import json
import overpy
import pickle

from geopy.distance import geodesic
from typing import Tuple

import gpxpy
import overpy
from geopy.distance import geodesic, distance

# Manejar entradas
parser = argparse.ArgumentParser(
prog='Pre-editor OSM',
Expand All @@ -19,14 +19,19 @@
parser.add_argument('--rango', '-r', metavar='', required=False,
default=20, type=int,
help='radio en metros de la circunferencia donde' +
'se descargarán los elementos de OSM')
parser.add_argument('--debug', '-d', metavar='', required=False, default=False,
type=bool, help='mostrar mensajes de depuración')
'se descargarán los elementos de OSM')
parser.add_argument('--debug', '-d', required=False, action='store_true',
help='mostrar mensajes de depuración')

args = parser.parse_args()

# ruta de nodos constante
RUTA_ARCHIVO_NODOS = "datos/nodos.pickle"

# índices constantes para el cuadro delimitador
LATITUD_INFERIOR, LONGITUD_INFERIOR = 0, 1
LATITUD_SUPERIOR, LONGITUD_SUPERIOR = 2, 3


def parsear_esquema() -> dict:
"""
Expand All @@ -35,31 +40,28 @@ def parsear_esquema() -> dict:

Devuelve un diccionario correspondiente al esquema de mapeo.json
"""
with open(args.esquema) as archivo_esquema:
with open(args.esquema, 'r') as archivo_esquema:
esquema_parseado = json.load(archivo_esquema)

return esquema_parseado


def obtener_cuadro_delimitador_de_gpx(gpx: gpxpy.gpx.GPX) -> \
Tuple[float, float, float, float]:
def obtener_cuadro_delimitador_de_gpx(gpx: gpxpy.gpx.GPX) \
-> Tuple[float, float, float, float]:
"""
Calcula las coordenadas del cuadro delimitador (`bbox`) más pequeño capaz
de contener los puntos de referencia de la traza (`waypoints`). El orden de
las coordenadas del cuadro delimitador es el utilizado por Overpass,
el cual es:
(latitud inferior, longitud inferior, latitud superior, longitud superior)
"""
# índices constantes
LATITUD_INFERIOR, LONGITUD_INFERIOR = 0, 1
LATITUD_SUPERIOR, LONGITUD_SUPERIOR = 2, 3

lat, lon = gpx.waypoints[0].latitude, gpx.waypoints[0].longitude
# [latitud inferior, longitud inferior,
# latitud superior, longitud superior]
cuadro = [lat, lon, lat, lon]

# revisa los puntos de cada punto de refrencia
# revisa los puntos de cada punto de referencia
for waypoint in gpx.waypoints[1:]:
if waypoint.latitude < cuadro[LATITUD_INFERIOR]:
cuadro[LATITUD_INFERIOR] = waypoint.latitude
Expand All @@ -74,11 +76,40 @@ def obtener_cuadro_delimitador_de_gpx(gpx: gpxpy.gpx.GPX) -> \
if debug:
print("El cuadro es: ", cuadro)

return sumar_rango_cuadro_delimitador(tuple(cuadro))


def sumar_rango_cuadro_delimitador(cuadro: Tuple[float, float, float, float]) \
-> Tuple[float, float, float, float]:
"""
Dado un cuadro delimitador (`bbox`) más pequeño capaz de contener
los puntos de referencia de la traza (`waypoints`). Incrementa
las coordenadas en n metros según el rango dado por
parámetro en la terminal.

El resultado es una tupla de la siguiente forma:
(latitud inferior + args.rango hacia el Noreste,
longitud inferior + args.rango hacia el Noreste,
latitud superior + args.rango hacia el Noroeste,
longitud superior + args.rango hacia el Noroeste)
"""

coordenada_superior = cuadro[LATITUD_SUPERIOR], cuadro[LONGITUD_SUPERIOR]
distance(meters=args.rango).destination(coordenada_superior, bearing=-45)

coordenada_inferior = cuadro[LATITUD_INFERIOR], cuadro[LONGITUD_INFERIOR]
distance(meters=args.rango).destination(coordenada_inferior, bearing=45)

cuadro_aumentado = [0.0, 0.0, 0.0, 0.0]

cuadro_aumentado[LATITUD_INFERIOR], cuadro_aumentado[LONGITUD_INFERIOR] = coordenada_superior
cuadro_aumentado[LONGITUD_INFERIOR], cuadro_aumentado[LONGITUD_INFERIOR] = coordenada_inferior

return tuple(cuadro)


def descargar_nodos_en_cuadro_delimitador(
cuadro: Tuple[float, float, float, float]) -> [overpy.Node]:
cuadro: Tuple[float, float, float, float]) -> [overpy.Node]:
"""
Descarga todos los nodos con al menos una etiqueta ubicados dentro de las
coordenadas del cuadro delimitador.
Expand Down Expand Up @@ -134,7 +165,7 @@ def leer_nodos_en_archivo(nombre_archivo=RUTA_ARCHIVO_NODOS) -> [overpy.Node]:
with open(nombre_archivo, 'rb') as archivo:
nodos = pickle.load(archivo)
if debug:
print(f"Los nodos del archivo {nombre_archivo} fueron leidos")
print(f"Los nodos del archivo {nombre_archivo} fueron leídos")
archivo.close()
return nodos

Expand Down Expand Up @@ -162,16 +193,16 @@ def mostrar_nodos_descargados(nodos_descargados: [overpy.Node],
print('{:>10} => {:>10}'.format(llave, valor))


def imprimir_encabezado_traza(ruta_gpx: str) -> None:
def imprimir_encabezado_traza() -> None:
"""
Muestra en pantalla un encabezado correspondiente a un
informe de una ruta de traza gpx brindada y
la ruta del esquema enviada por terminal.

No devuelve ningun valor.
No devuelve ningún valor.
"""
msj = "Informe \n{0}\n\nAnalizando la ruta {1} con el esquema de mapeo {2}"
print(msj.format("="*8, ruta_gpx, args.esquema))
print(msj.format("=" * 8, args.gpx, args.esquema))


def imprimir_encabezado_waypoint(nombre: str,
Expand All @@ -181,7 +212,7 @@ def imprimir_encabezado_waypoint(nombre: str,
Muestra en pantalla un encabezado correspondiente
a un informe del análisis de un waypoint.

No devuelve ningun valor.
No devuelve ningún valor.
"""
msj = "\n\nAnalizando el waypoint con nombre '{0}' ubicado en ({1},{2})."
print(msj.format(nombre, latitud, longitud))
Expand All @@ -193,7 +224,7 @@ def imprimir_crear(latitud: float, longitud: float,
Basado en una latitud y longitud indica que se debe crear un
nodo con ciertas etiquetas en esa coordenada especifica.

No devuelve ningun valor.
No devuelve ningún valor.
"""
msj = ("[CREAR] Se debe añadir un nuevo nodo en ({0}, {1})"
" con las etiquetas:\n{2}")
Expand All @@ -204,16 +235,16 @@ def imprimir_crear(latitud: float, longitud: float,
def imprimir_info(argumentos_imprimir: (int, dict, float, float)) -> None:
"""
Con base a los argumentos de imprimir, obtiene
el indentificador de un nodo o vía en Open Street Map e
el identificador de un nodo o vía en Open Street Map e
imprime un mensaje de que el nodo se encuentra
mapeado correctamente.

No devuelve ningun valor.
No devuelve ningún valor.
"""
id = argumentos_imprimir[0]
identificador = argumentos_imprimir[0]
msj = ("[INFO] El nodo https://osm.org/node/{}"
" está correctamente mapeado.")
print(msj.format(id))
print(msj.format(identificador))


def imprimir_editar(argumentos_imprimir: (int, dict, float, float)) -> None:
Expand All @@ -224,12 +255,12 @@ def imprimir_editar(argumentos_imprimir: (int, dict, float, float)) -> None:
etiquetas de la forma {llave1:valor1...llaveN:valorN}
encontradas en el esquema de mapeo.

No devuelve ningun valor.
No devuelve ningún valor.
"""
id, etiquetas_faltantes, _, _ = argumentos_imprimir
identificador, etiquetas_faltantes, _, _ = argumentos_imprimir
msj = ("[EDITAR] El nodo https://osm.org/node/{0}"
" debe ser mejorado con las etiquetas:\n{1}")
print(msj.format(id, json.dumps(etiquetas_faltantes, indent=4)[1:-1]))
print(msj.format(identificador, json.dumps(etiquetas_faltantes, indent=4)[1:-1]))


def imprimir_revisar(argumentos_imprimir: (int, dict, float, float)) -> None:
Expand All @@ -241,34 +272,34 @@ def imprimir_revisar(argumentos_imprimir: (int, dict, float, float)) -> None:
mayor cantidad de etiquetas a las que corresponde en el
esquema de mapeo y por lo tanto debe ser revisado.

No devuelve ningun valor.
No devuelve ningún valor.
"""
id, etiquetas_sobrantes, _, _ = argumentos_imprimir
identificador, etiquetas_sobrantes, _, _ = argumentos_imprimir
msj = ("[REVISAR] El nodo https://osm.org/node/{0}"
" tiene más etiquetas que las indicadas en el"
"esquema de mapeo {1}\nLas etiquetas demás son:\n{2}")
print(msj.format(id, args.esquema, json.dumps(
print(msj.format(identificador, args.esquema, json.dumps(
etiquetas_sobrantes, indent=4)[1:-1]))


def imprimir_resultado(resultado: (callable, dict), id: int,
def imprimir_resultado(resultado: (callable, dict), identificador: int,
latitud: float, longitud: float) -> None:
"""
Recibe un resultado retornado de la forma
(funcion_imprimir, etiquetas_analizadas),
(función_imprimir, etiquetas_analizadas),
un identificador y la latitud y longitud del waypoint.

Con base a esto decide cuál funcion debe ser llamada
Con base a esto decide cuál función debe ser llamada
para imprimir el resultado.

No devuelve ningun valor.
No devuelve ningún valor.
"""
funcion_imprimir, etiquetas_analizadas = resultado
argumentos_imprimir = (id, etiquetas_analizadas, latitud, longitud)
argumentos_imprimir = (identificador, etiquetas_analizadas, latitud, longitud)
funcion_imprimir(argumentos_imprimir)


def analizar_traza(ruta_gpx: str, almacenar=False) -> None:
def analizar_traza(almacenar=False) -> None:
"""
Recibe una ruta a un archivo GPX el cual cada uno de sus
waypoints debe ser analizado.
Expand All @@ -279,10 +310,10 @@ def analizar_traza(ruta_gpx: str, almacenar=False) -> None:
No devuelve ningún valor.
"""

imprimir_encabezado_traza(ruta_gpx)
imprimir_encabezado_traza()

# Parsear el gpx
archivo_gpx = open(ruta_gpx, 'r')
archivo_gpx = open(args.gpx, 'r')
gpx = gpxpy.parse(archivo_gpx)
archivo_gpx.close()

Expand All @@ -301,7 +332,7 @@ def analizar_traza(ruta_gpx: str, almacenar=False) -> None:
longitud = waypoint.longitude
nombre = waypoint.name

# Mostrar informacion del waypoint
# Mostrar información del waypoint
nodos_cercanos = obtener_nodos_en_rango(nodos_totales, latitud,
longitud)
imprimir_encabezado_waypoint(nombre, latitud, longitud)
Expand Down Expand Up @@ -330,8 +361,8 @@ def analizar_traza(ruta_gpx: str, almacenar=False) -> None:
def analizar_etiquetas(etiquetas_osm: dict,
etiquetas_esquema: dict) -> (callable, dict):
"""
Segun las etiquetas en osm cercanas y las etiquetas en el
esquema de cierto waypoint, determina cual es la funcion
Según las etiquetas en osm cercanas y las etiquetas en el
esquema de cierto waypoint, determina cual es la función
que se debe llamar para mostrar en pantalla las etiquetas
analizadas

Expand All @@ -344,23 +375,23 @@ def analizar_etiquetas(etiquetas_osm: dict,
diccionarios pero hay algunas adicionales en OSM (REVISAR)

Si ninguno de los tres casos está presente entonces se
devuelve un resultado vacio
devuelve un resultado vacío

Devuelve una tupla de resultado con el formato:

(funcion_imprimir, etiquetas)
(función_imprimir, etiquetas)

etiquetas será una opción entre:
etiquetas_sobrantes||faltantes||necesarias||None

funcion_imprimir será una opción entre imprimir_:
función_imprimir será una opción entre imprimir_:
info||editar||revisar||None
"""

debug_preambulo = "DEBUG (analizar_etiquetas): "

if debug:
print('\n\n {0} etiquetas_osm: {1} etiquetas_esquema{2}'
print('\n\n{0} etiquetas_osm: {1} etiquetas_esquema{2}'
.format(debug_preambulo, etiquetas_osm, etiquetas_esquema))
total_osm = len(etiquetas_osm)
total_esquema = len(etiquetas_esquema)
Expand Down Expand Up @@ -410,6 +441,5 @@ def analizar_etiquetas(etiquetas_osm: dict,
esquema = parsear_esquema()

if __name__ == "__main__":
ruta_gpx = args.gpx
debug = args.debug
analizar_traza(ruta_gpx)
analizar_traza()