
Understanding the AIS Data Streaming and Storage Code

Caio Abreu Ribeiro edited this page Sep 9, 2023 · 7 revisions

Introduction:

This wiki page explains pcs_db.py, a script that connects to an Automatic Identification System (AIS) data stream, processes incoming AIS messages, and stores them in a PostgreSQL database. AIS is used to track and identify ships and other vessels. The script is written in Python and relies mainly on asyncio, websockets, json, and SQLAlchemy.


Code Overview:

The provided code accomplishes the following key tasks:

  1. Importing Necessary Libraries:

The script begins by importing the standard-library modules and third-party packages (websockets and SQLAlchemy) it depends on.

import asyncio
import json
import os
from datetime import datetime

import websockets
from sqlalchemy import create_engine, Column, Integer, Float, String, DateTime
from sqlalchemy.orm import declarative_base, sessionmaker

  2. Database Configuration:

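The script configures its PostgreSQL connection through SQLAlchemy. db_url holds the connection URL with the username, password, host, and database name, and create_engine(db_url) builds the engine that the rest of the script uses. Replace the placeholder values with your own; the engine does not open a connection until it is first used.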

db_url = 'postgresql://your_username:your_password@your_host/your_database'
engine = create_engine(db_url)
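
Hard-coding credentials in db_url works for a quick test, but a password containing characters such as @ or : breaks the URL. The sketch below is one way to build the URL from environment variables while percent-encoding the credentials with urllib.parse.quote_plus, which the SQLAlchemy documentation suggests for special characters. The helper names build_db_url and db_url_from_env and the variables DB_USER, DB_PASSWORD, DB_HOST, and DB_NAME are hypothetical and not part of pcs_db.py.

```python
import os
from urllib.parse import quote_plus


def build_db_url(user: str, password: str, host: str, database: str, port: int = 5432) -> str:
    """Return a PostgreSQL URL with the username and password percent-encoded."""
    return (
        f"postgresql://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}"
    )


def db_url_from_env() -> str:
    """Build the URL from hypothetical DB_* environment variables."""
    return build_db_url(
        user=os.environ["DB_USER"],
        password=os.environ["DB_PASSWORD"],
        host=os.environ.get("DB_HOST", "localhost"),  # assumed default host
        database=os.environ["DB_NAME"],
    )


if __name__ == "__main__":
    # '@' and ':' in the password are encoded as %40 and %3A
    print(build_db_url("ais", "p@ss:word", "localhost", "aisdb"))
    # postgresql://ais:p%40ss%3Aword@localhost:5432/aisdb
```

The returned string can be passed directly to create_engine in place of the literal db_url.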

  3. SQLAlchemy Model Definition:

The code defines a SQLAlchemy model, AISData, that maps to the ais_data table. Besides an auto-incrementing primary key, it has columns for the ship ID (the vessel's MMSI), latitude, longitude, ship name, and UTC timestamp.

Base = declarative_base()

class AISData(Base):
    __tablename__ = 'ais_data'
    id = Column(Integer, primary_key=True, autoincrement=True)
    ship_id = Column(Integer)
    latitude = Column(Float)
    longitude = Column(Float)
    ship_name = Column(String)
    time_utc = Column(DateTime)

  4. Table Creation:

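Base.metadata.create_all(engine) creates the ais_data table from the model if it does not already exist. Existing tables are left untouched, so the call is safe on every start, but it will not alter an existing table to match later changes to the model.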

Base.metadata.create_all(engine)

  5. Database Session Initialization:

sessionmaker binds a session factory to the engine, and the script creates a single session that it reuses for every insert.

Session = sessionmaker(bind=engine)
session = Session()
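
To check the model, table creation, and session steps above without a PostgreSQL server, the same calls can be run against an in-memory SQLite database. This is a test sketch, not how pcs_db.py runs: the SQLite URL and the sample row values are assumptions for illustration. It imports declarative_base from sqlalchemy.orm (available since SQLAlchemy 1.4) and opens the session in a with block so it is closed automatically.

```python
from datetime import datetime

from sqlalchemy import Column, DateTime, Float, Integer, String, create_engine, inspect
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class AISData(Base):
    __tablename__ = "ais_data"
    id = Column(Integer, primary_key=True, autoincrement=True)
    ship_id = Column(Integer)
    latitude = Column(Float)
    longitude = Column(Float)
    ship_name = Column(String)
    time_utc = Column(DateTime)


# "sqlite://" with no path is an in-memory database that disappears on exit
engine = create_engine("sqlite://")

# create_all only creates missing tables, so calling it twice is harmless
Base.metadata.create_all(engine)
Base.metadata.create_all(engine)
table_names = inspect(engine).get_table_names()

Session = sessionmaker(bind=engine)
with Session() as session:
    # Hypothetical sample row for illustration only
    session.add(AISData(ship_id=123456789, latitude=12.5, longitude=100.25,
                        ship_name="EXAMPLE VESSEL",
                        time_utc=datetime(2023, 9, 9, 12, 34, 56)))
    session.commit()
    stored_name = session.query(AISData).one().ship_name
    row_count = session.query(AISData).count()

print(table_names)  # ['ais_data']
print(row_count)    # 1
```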

  6. AIS Data Stream Connection:

The code reads the API key from the API_KEY environment variable and opens a WebSocket connection to the AIS stream at "wss://stream.aisstream.io/v0/stream".

api_key = os.getenv("API_KEY")

async def connect_ais_stream():
    async with websockets.connect("wss://stream.aisstream.io/v0/stream") as websocket:
        # The subscription and receive loop described below run inside this block
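
os.getenv returns None when API_KEY is unset, and the script would then send a subscription with a null key. A small helper like the one below (hypothetical, not part of pcs_db.py) can fail fast with a clear message before any connection is opened; the variable name API_KEY comes from the code above.

```python
import os


def require_env(name: str) -> str:
    """Return the value of environment variable `name`; raise if it is unset or empty."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Environment variable {name} is not set")
    return value
```

With this helper, the module-level line would become api_key = require_env("API_KEY"), so a missing key stops the script at startup instead of surfacing later as a rejected subscription.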

  7. Subscribing to AIS Data:

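Inside the connection, the code sends a subscription message containing the API key and a list of bounding boxes that define the geographic area of interest. Each box is given as two [latitude, longitude] corners, so [[-11, 178], [30, 74]] covers latitudes -11 to 30 and longitudes 74 to 178.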

subscribe_message = {"APIKey": api_key,
                     "BoundingBoxes": [[[-11, 178], [30, 74]]]}
subscribe_message_json = json.dumps(subscribe_message)
await websocket.send(subscribe_message_json)
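
The subscription payload can be built and checked before it is sent. The sketch below wraps the dictionary from the code above in a hypothetical build_subscription function that validates each bounding box, assuming the [latitude, longitude] corner order used by aisstream.io. The optional FilterMessageTypes field, which asks the server to send only the listed message types, is based on the aisstream.io documentation and should be confirmed there.

```python
import json
from typing import Any, Dict, List, Optional, Sequence

# A bounding box is two [latitude, longitude] corners
BoundingBox = Sequence[Sequence[float]]


def build_subscription(api_key: str, boxes: List[BoundingBox],
                       message_types: Optional[List[str]] = None) -> str:
    """Return the JSON subscription message after validating every corner."""
    for box in boxes:
        if len(box) != 2:
            raise ValueError(f"bounding box needs two corners: {box!r}")
        for lat, lon in box:
            if not -90 <= lat <= 90:
                raise ValueError(f"latitude out of range: {lat}")
            if not -180 <= lon <= 180:
                raise ValueError(f"longitude out of range: {lon}")
    message: Dict[str, Any] = {"APIKey": api_key, "BoundingBoxes": boxes}
    if message_types:
        # Optional server-side filter (assumed field name; check the aisstream.io docs)
        message["FilterMessageTypes"] = message_types
    return json.dumps(message)
```

In the connection block this would be used as await websocket.send(build_subscription(api_key, [[[-11, 178], [30, 74]]], ["PositionReport"])). Filtering on the server side reduces traffic, since the receive loop discards every message that is not a PositionReport anyway.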

  8. Receiving and Processing AIS Messages:

The code then loops over incoming messages. For each "PositionReport" message it extracts the ship ID (UserID), latitude, longitude, ship name, and timestamp (MetaData time_utc); all other message types are ignored. The timestamp string is cut at the first '.' and parsed with datetime.strptime, and each row is committed to the database as soon as it is added.

async for message_json in websocket:
    message = json.loads(message_json)
    message_type = message["MessageType"]

    if message_type == "PositionReport":
        # Extract AIS data fields
        ais_message = message['Message']['PositionReport']
        ship_id = ais_message['UserID']
        latitude = ais_message['Latitude']
        longitude = ais_message['Longitude']
        # AIS ship names can carry padding spaces; strip() trims them but keeps internal spaces
        ship_name = message['MetaData']['ShipName'].strip()
        timestamp_string = message['MetaData']['time_utc']

        # Parse the timestamp; everything from the first '.' on (fractional seconds and any suffix) is dropped
        formatted_input = timestamp_string.split('.')[0].strip()
        time_utc = datetime.strptime(formatted_input, '%Y-%m-%d %H:%M:%S')

        # Create an instance of AISData and add it to the session;
        # SQLAlchemy maps the datetime object to the DateTime column
        ais_data = AISData(
            ship_id=ship_id, latitude=latitude, longitude=longitude,
            ship_name=ship_name, time_utc=time_utc)
        session.add(ais_data)
        session.commit()
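
Keeping the parsing separate from the database write makes it testable without a live stream. The function below is a hypothetical refactor, not code from pcs_db.py: it applies the same field extraction and timestamp truncation to one decoded message, trims only the padding around the ship name, and returns None for other message types. The sample message is made up and contains only the keys the loop above reads; its time_utc value assumes a "YYYY-MM-DD HH:MM:SS.fraction ..." shape, which is what the split on '.' implies.

```python
from datetime import datetime
from typing import Any, Dict, Optional


def parse_position_report(message: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Map a decoded aisstream message to AISData column values, or return None."""
    if message.get("MessageType") != "PositionReport":
        return None
    report = message["Message"]["PositionReport"]
    meta = message["MetaData"]
    # Drop the fractional seconds and everything after them
    timestamp = meta["time_utc"].split(".")[0].strip()
    return {
        "ship_id": report["UserID"],
        "latitude": report["Latitude"],
        "longitude": report["Longitude"],
        "ship_name": meta["ShipName"].strip(),
        "time_utc": datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S"),
    }


# Hypothetical sample message containing only the keys read above
sample = {
    "MessageType": "PositionReport",
    "MetaData": {"ShipName": "EXAMPLE VESSEL   ",
                 "time_utc": "2023-09-09 12:34:56.789012 +0000 UTC"},
    "Message": {"PositionReport": {"UserID": 123456789,
                                   "Latitude": 12.5, "Longitude": 100.25}},
}
row = parse_position_report(sample)
print(row["ship_name"], row["time_utc"])  # EXAMPLE VESSEL 2023-09-09 12:34:56
```

Because the dictionary keys match the AISData column names, the loop body could reduce to row = parse_position_report(json.loads(message_json)) followed by session.add(AISData(**row)) whenever row is not None.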

  9. Main Function:

The main function simply calls the connect_ais_stream function.

async def main():
    await connect_ais_stream()

  10. Execution:

The if __name__ == "__main__": guard starts the event loop with asyncio.run(main()) only when the file is executed directly, not when it is imported as a module.

if __name__ == "__main__":
    asyncio.run(main())

This script continuously receives real-time AIS data, parses it, and stores it in a PostgreSQL database for later analysis. It demonstrates how to combine a WebSocket client, JSON parsing, and a relational database in one asynchronous Python script.
