Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions backend/bloom/infra/database/sql_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,5 @@ class Metrics(Base):
zone_id= Column("zone_id", Integer, ForeignKey("dim_zone.id"))
zone_category= Column("zone_category", String, ForeignKey("dim_zone.category"))
zone_sub_category= Column("zone_sub_category", String, ForeignKey("dim_zone.sub_category"))
zone_enable = Column("enable",Boolean(), server_default="True")


zone_enable = Column("zone_enable", Boolean(), server_default="True")
__mapper_args__ = {"eager_defaults": False}
19 changes: 13 additions & 6 deletions backend/bloom/infra/repositories/repository_excursion.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ def map_to_domain(excursion: sql_model.Excursion) -> Excursion:
excursion_duration=excursion.excursion_duration,
total_time_at_sea=excursion.total_time_at_sea,
total_time_in_amp=excursion.total_time_in_amp,
total_time_in_territorial_waters=excursion.total_time_fishing_in_territorial_waters,
total_time_in_territorial_waters=excursion.total_time_in_territorial_waters,
total_time_in_zones_with_no_fishing_rights=excursion.total_time_in_zones_with_no_fishing_rights,
total_time_fishing=excursion.total_time_fishing,
total_time_fishing_in_amp=excursion.total_time_fishing_in_amp,
Expand All @@ -199,21 +199,28 @@ def map_to_orm(excursion: Excursion) -> sql_model.Excursion:
vessel_id=excursion.vessel_id,
departure_port_id=excursion.departure_port_id,
departure_at=excursion.departure_at,
departure_position=from_shape(
excursion.departure_position) if excursion.departure_position is not None else None,
departure_position=(
from_shape(excursion.departure_position)
if excursion.departure_position is not None
else None
),
arrival_port_id=excursion.arrival_port_id,
arrival_at=excursion.arrival_at,
arrival_position=from_shape(excursion.arrival_position) if excursion.arrival_position is not None else None,
arrival_position=(
from_shape(excursion.arrival_position)
if excursion.arrival_position is not None
else None
),
excursion_duration=excursion.excursion_duration,
total_time_at_sea=excursion.total_time_at_sea,
total_time_in_amp=excursion.total_time_in_amp,
total_time_in_territorial_waters=excursion.total_time_fishing_in_territorial_waters,
total_time_in_territorial_waters=excursion.total_time_in_territorial_waters,
total_time_in_zones_with_no_fishing_rights=excursion.total_time_in_zones_with_no_fishing_rights,
total_time_fishing=excursion.total_time_fishing,
total_time_fishing_in_amp=excursion.total_time_fishing_in_amp,
total_time_fishing_in_territorial_waters=excursion.total_time_fishing_in_territorial_waters,
total_time_fishing_in_zones_with_no_fishing_rights=excursion.total_time_fishing_in_zones_with_no_fishing_rights,
total_time_default_ais=excursion.total_time_default_ais,
created_at=excursion.created_at,
updated_at=excursion.updated_at
updated_at=excursion.updated_at,
)
5 changes: 3 additions & 2 deletions backend/bloom/infra/repositories/repository_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,11 @@ def __init__(
# return df

def batch_create_metrics(
self, session: Session, metricss: list[Metrics]
self, session: Session, metrics: list[Metrics]
) -> list[Metrics]:
orm_list = [MetricsRepository.map_to_orm(metrics) for metrics in metricss]
orm_list = [MetricsRepository.map_to_orm(metrics) for metrics in metrics]
session.add_all(orm_list)
session.flush()
return [MetricsRepository.map_to_domain(orm) for orm in orm_list]


Expand Down
2 changes: 2 additions & 0 deletions backend/bloom/infra/repositories/repository_port.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ def update_port_has_excursion(self, session : Session, port_id: int ):
.values(has_excursion= True)
)
session.execute(stmt)
session.flush()


def has_excursion_for_port(self, session: Session, port_id: int) -> bool:
stmt = select(sql_model.Excursion).where(
Expand Down
67 changes: 52 additions & 15 deletions backend/bloom/infra/repositories/repository_segment.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ def get_last_vessel_id_segments(self, session: Session) -> pd.DataFrame:
df["end_position"] = df["end_position"].astype(str).apply(wkb.loads)
return df


def get_vessel_attribute_by_segment(self, session: Session, segment_id: int) -> str:
stmt = select(
sql_model.Vessel.country_iso3
Expand All @@ -176,11 +175,10 @@ def get_vessel_attribute_by_segment(self, session: Session, segment_id: int) ->
).filter(
sql_model.Segment.id == segment_id
)

result = session.execute(stmt).scalar()

return result


def get_vessel_attribute_by_segment_created_updated_after(self, session: Session, segment_id: int, created_updated_after: datetime) -> Vessel:
stmt = (
Expand Down Expand Up @@ -217,14 +215,15 @@ def get_vessel_attribute_by_segment_created_updated_after(self, session: Session
return None
else:
return VesselRepository.map_to_domain(vessel)
#df = pd.DataFrame(result, columns=["vessel_id", "vessel_mmsi", "ship_name", "vessel_country_iso3","vessel_imo"])
#return df
# df = pd.DataFrame(result, columns=["vessel_id", "vessel_mmsi", "ship_name", "vessel_country_iso3","vessel_imo"])
# return df

def batch_create_segment(
self, session: Session, segments: list[Segment]
) -> list[Segment]:
orm_list = [SegmentRepository.map_to_orm(segment) for segment in segments]
session.add_all(orm_list)
session.flush()
return [SegmentRepository.map_to_domain(orm) for orm in orm_list]

def get_segments_created_updated_after(self, session: Session, created_updated_after: datetime) -> list[Segment]:
Expand All @@ -235,7 +234,6 @@ def get_segments_created_updated_after(self, session: Session, created_updated_a
result = session.execute(stmt).scalars()
return [SegmentRepository.map_to_domain(orm) for orm in result]


def find_segments_in_zones(self, session: Session) -> dict[
Segment, list[Zone]]:
stmt = select(sql_model.Segment, sql_model.Zone).outerjoin(sql_model.Zone, and_(
Expand All @@ -252,7 +250,6 @@ def find_segments_in_zones(self, session: Session) -> dict[
if zone:
dict[segment].append(zone)
return dict


def find_segments_in_zones_created_updated_after(self, session: Session, created_after: datetime) -> dict[
Segment, list[Zone]]:
Expand All @@ -272,6 +269,35 @@ def find_segments_in_zones_created_updated_after(self, session: Session, created
dict[segment].append(zone)
return dict

def find_segments_in_zones_by_ids(self, session: Session, segments_ids: list[int]) -> dict[
Segment, list[Zone]]:

segments_ids_tuple = tuple(segments_ids)

stmt = (
select(sql_model.Segment, sql_model.Zone)
.where(sql_model.Segment.id.in_(segments_ids_tuple))
.outerjoin(
sql_model.Zone,
and_(
ST_Within(
sql_model.Segment.start_position, sql_model.Zone.geometry
),
ST_Within(sql_model.Segment.end_position, sql_model.Zone.geometry),
),
)
.order_by(sql_model.Segment.created_at.asc())
)
result = session.execute(stmt)
dict = {}
for (segment_orm, zone_orm) in result:
segment = SegmentRepository.map_to_domain(segment_orm)
dict.setdefault(segment, [])
zone = ZoneRepository.map_to_domain(zone_orm) if zone_orm else None
if zone:
dict[segment].append(zone)
return dict

def batch_update_segment(self, session: Session, segments: list[Segment]) -> list[Segment]:
updated_segments = []
for segment in segments:
Expand All @@ -285,13 +311,24 @@ def batch_update_segment(self, session: Session, segments: list[Segment]) -> lis
# passe à False de la colonne last_vessel_segment pour tous les segments des excursions transmises
# passe à True la colonne last_vessel_segment pour tous les Id de segments les plus récent de chaque excursion
def update_last_segments(self, session: Session, vessel_ids: list[int]) -> int:
for v_id in vessel_ids:
upd1 = (update(sql_model.Segment).
where(
sql_model.Segment.id.in_(select(sql_model.Segment.id).join(sql_model.Excursion).where(sql_model.Excursion.vessel_id == v_id))).
values(last_vessel_segment=False))
session.execute(upd1)
session.flush()

# Convertir vessel_ids en tuple (évite des erreurs avec IN ())
vessel_ids_tuple = tuple(vessel_ids)

# Mettre tous les segments des navires concernés à False
upd1 = (
update(sql_model.Segment)
.where(
sql_model.Segment.id.in_(
select(sql_model.Segment.id)
.join(sql_model.Excursion)
.where(sql_model.Excursion.vessel_id.in_(vessel_ids_tuple))
)
)
.values(last_vessel_segment=False)
)
session.execute(upd1)

last_segments = session.execute(text("""SELECT DISTINCT ON (vessel_id) s.id FROM fct_segment s
JOIN fct_excursion e ON e.id = s.excursion_id
WHERE vessel_id in :vessel_ids
Expand Down Expand Up @@ -351,4 +388,4 @@ def map_to_orm(segment: Segment) -> sql_model.Segment:
last_vessel_segment=segment.last_vessel_segment,
created_at=segment.created_at,
updated_at=segment.updated_at
)
)
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ def set_duration(session: Session, task_name: str, pit: datetime,duration:timede
stmt = (update(sql_model.TaskExecution)
.where(sql_model.TaskExecution.task_name==task_name)
.where(sql_model.TaskExecution.point_in_time==pit)
.values(duration=duration)
.values(delta=duration)
)
session.execute(stmt)

def set_position_count(session: Session, task_name: str, pit: datetime,count:int)->None:
stmt = (update(sql_model.TaskExecution)
.where(sql_model.TaskExecution.task_name==task_name)
Expand Down
86 changes: 69 additions & 17 deletions backend/bloom/infra/repositories/repository_vessel_position.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from datetime import datetime
from datetime import datetime, timedelta
from typing import Any, List, Union

import pandas as pd
from dependency_injector.providers import Callable
from geoalchemy2.shape import from_shape, to_shape
from sqlalchemy import select
from sqlalchemy import and_, select, tuple_, func
from sqlalchemy.orm import Session

from bloom.config import settings
Expand All @@ -30,12 +30,12 @@ def batch_create_vessel_position(self, session: Session, vessel_positions: list[
return [VesselPositionRepository.map_to_domain(orm) for orm in orm_list]

def get_all_vessel_last_positions(self, session: Session) -> List[VesselPosition]:

stmt=select(sql_model.VesselPosition)\
.order_by(sql_model.VesselPosition.timestamp.desc())\
.group_by(sql_model.VesselPosition.vessel_id)
result = session.execute(stmt).scalars()
#logger.info(type(result))
# logger.info(type(result))
if result is not None :
return [VesselPositionRepository.map_to_domain(record) for record in result]
else:
Expand All @@ -44,27 +44,79 @@ def get_all_vessel_last_positions(self, session: Session) -> List[VesselPosition
def get_vessel_positions(self, session: Session, vessel_id:int,
start:datetime=datetime.now(),
end:datetime=None) -> List[VesselPosition]:

stmt=select(sql_model.VesselPosition).filter_by(vessel_id=vessel_id).order_by(sql_model.VesselPosition.timestamp.desc())
result = session.execute(stmt).scalars()
#logger.info(type(result))
# logger.info(type(result))
if result is not None :
return [VesselPositionRepository.map_to_domain(record) for record in result]
else:
return []

def get_positions_with_vessel_created_updated_after(self, session: Session,
created_updated_after: datetime) -> pd.DataFrame:
stmt = select(sql_model.VesselPosition.id, sql_model.VesselPosition.timestamp,
sql_model.VesselPosition.accuracy, sql_model.VesselPosition.collection_type,
sql_model.VesselPosition.course, sql_model.VesselPosition.heading,
sql_model.VesselPosition.position, sql_model.VesselPosition.longitude,
sql_model.VesselPosition.latitude, sql_model.VesselPosition.rot,
sql_model.VesselPosition.speed, sql_model.VesselPosition.created_at,
sql_model.Vessel.id, sql_model.Vessel.mmsi).where(
sql_model.VesselPosition.created_at > created_updated_after
).join(sql_model.Vessel, sql_model.VesselPosition.vessel_id == sql_model.Vessel.id).order_by(
sql_model.VesselPosition.created_at.asc())
created_updated_after: datetime,
vessel_ids: List[int]) -> pd.DataFrame:
stmt = (
select(
sql_model.VesselPosition.id,
sql_model.VesselPosition.timestamp,
sql_model.VesselPosition.accuracy,
sql_model.VesselPosition.collection_type,
sql_model.VesselPosition.course,
sql_model.VesselPosition.heading,
sql_model.VesselPosition.position,
sql_model.VesselPosition.longitude,
sql_model.VesselPosition.latitude,
sql_model.VesselPosition.rot,
sql_model.VesselPosition.speed,
sql_model.VesselPosition.created_at,
sql_model.Vessel.id,
sql_model.Vessel.mmsi,
)
.where(
and_(
sql_model.VesselPosition.created_at > created_updated_after,
sql_model.VesselPosition.vessel_id.in_(tuple(vessel_ids))
)
)
.join(
sql_model.Vessel,
sql_model.VesselPosition.vessel_id == sql_model.Vessel.id,
)
.order_by(sql_model.VesselPosition.vessel_id.asc())
)

result = session.execute(stmt)
df = pd.DataFrame(result,
columns=["id", "timestamp", "accuracy", "collection_type", "course", "heading", "position",
"longitude", "latitude", "rot", "speed", "created_at", "vessel_id", "mmsi"])
df["position"] = df["position"].apply(to_shape)
return df

def get_positions_with_vessel(self, session: Session) -> pd.DataFrame:
stmt = (
select(
sql_model.VesselPosition.id,
sql_model.VesselPosition.timestamp,
sql_model.VesselPosition.accuracy,
sql_model.VesselPosition.collection_type,
sql_model.VesselPosition.course,
sql_model.VesselPosition.heading,
sql_model.VesselPosition.position,
sql_model.VesselPosition.longitude,
sql_model.VesselPosition.latitude,
sql_model.VesselPosition.rot,
sql_model.VesselPosition.speed,
sql_model.VesselPosition.created_at,
sql_model.Vessel.id,
sql_model.Vessel.mmsi,
)
.join(
sql_model.Vessel,
sql_model.VesselPosition.vessel_id == sql_model.Vessel.id,
)
.order_by(sql_model.VesselPosition.vessel_id.asc())
)

result = session.execute(stmt)
df = pd.DataFrame(result,
Expand Down
Loading