High-Reliability Telemetry Bridge & Protocol Converter
This project is a robust, production-ready middleware designed to bridge the gap between modern HTTP-based IoT APIs and TCP-based tracking platforms. It represents an evolution in architecture, focusing on thread safety, clean code principles, modularity, and traceability.
While its predecessor (communication-protocol-translator) focused on direct socket translation, this middleware specializes in fetching data from external sources (HTTP/API), normalizing it, and injecting it into TCP streams as if it were a native device connection.
To understand this repository, you must visualize it as a Pipeline of Abstraction. The system moves data through three distinct layers:
- Input Layer (Source Specific): "Go get the data." (HTTP polling, WebSockets, etc.)
- Normalization Layer (The Core): "Make the data speak our language."
- Output Layer (Protocol Specific): "Send it to the server in their language." (TCP Packets)
graph TD
subgraph "Input Layer (Source)"
A[External API] -->|HTTP GET| B(Worker Loop)
B -->|Raw JSON| C(Input Processor)
C -->|Raw Data| D(Input Mapper)
end
subgraph "Middleware Layer (Core)"
D -->|Normalized Dict| E{Output Processor Singleton}
E -->|Routing| F[Output Session Manager]
end
subgraph "Output Layer (Destination)"
F -->|Select Session| G(Main Server Session)
G -->|Data + Config| H(Output Builder)
H -->|Binary Packet| G
G -->|TCP Stream| I[Tracking Platform]
end
If you are a developer trying to debug or extend this system, follow the "Life of a Packet" step-by-step:
Everything starts here. The main.py is an orchestrator. It does not contain logic; it contains configuration.
- It reads
settings.WORKERS_INPUT_SOURCE. - It dynamically imports the modules specified (e.g.,
app.src.input.mt02.worker). - It spawns Daemon Threads for each worker.
- Role: The heartbeat of the input side. It runs an infinite loop.
- Logic: It calls the
ApiClientto get data. - Intelligence: It checks Redis to see if the data is actually new (deduplication) before processing.
- Next Step: Passes data to
processor.process_location.
- Role: The Translator.
- Logic: This is where the specific external API format is converted into the Internal Standard Dictionary.
- Intelligence: It handles odometer calculations (using Haversine logic via
app/src/input/utils.pyif the API doesn't provide it) and timezone corrections. - Result: A clean Python dictionary (
lat,lon,speed,voltage, etc.) that the rest of the app understands.
- Role: The Traffic Controller.
- Key Component:
OutputProcessor(Singleton). - Logic: The input layer calls
output_processor.forward(). It doesn't know how or where the data goes, it just forwards it. - Intelligence: This component checks Redis to see which Output Protocol (e.g., GT06, Suntech) is assigned to this specific device.
- Role: The Connection Keeper. This is the most complex and robust part of the code.
- Key Class:
MainServerSession. - Robustness:
- Thread Safety: Uses
threading.RLock()to ensure that writing to the socket is safe even if multiple threads access it. - State Machine: Handles the connection lifecycle:
Connect->Login(if protocol requires) ->Send Data->Heartbeat. - Auto-Recovery: If the TCP connection drops, it handles reconnection seamlessly.
- Thread Safety: Uses
- Role: The Encoder.
- Logic: Takes the Internal Standard Dictionary and uses
struct.packto convert Python data types into Binary Byte Arrays required by protocols like GT06.
The system is designed so that main.py doesn't know which protocols exist until runtime.
- Input Sources are loaded based on
settings.py. - Output Protocols are loaded based on Redis configuration per device.
- Benefit: You can add a new API integration without touching the core logic.
- Daemon Threads: Workers run in the background.
- Locking (
threading.RLock): Critical inoutput_session.py. Since a device might send a heartbeat (timer thread) and a location packet (worker thread) simultaneously, locks prevent socket corruption.
Unlike simple scripts that open/close sockets per packet, this middleware maintains Persistent TCP Connections.
- Login Handshake: It knows that protocols like GT06 need a Login Packet before sending data. It handles this state (
self._is_gt06_login_step). - Heartbeats: It uses
threading.Timerto keep the connection alive (keep-alive) if the API stops sending data for a while.
Redis is not just a database here; it is the Shared Memory.
- Deduplication: "Did we already process this timestamp?"
- Configuration: "What output protocol does device X use?"
- Persistence: "What was the last odometer reading?"
- Create folder
app/src/input/starlink. - Create
api_client.py(Handling HTTP Auth/GET). - Create
mapper.py(Convert Starlink JSON -> Internal Dict). - Create
worker.py(The loop that calls client -> mapper ->output_processor.forward). - Register in
settings.WORKERS_INPUT_SOURCE.
- Create folder
app/src/output/coban. - Create
builder.py(Functions to build Login/Location binary packets). - Register in
app/src/output/output_mappers.py. - Add connection details in
settings.py.
Configuration is managed via pydantic in app/config/settings.py and environment variables (.env).
Key Settings:
WORKERS_INPUT_SOURCE: Dict defining which input modules to load.OUTPUT_PROTOCOL_HOST_ADRESSES: Target TCP servers (IP/Port).REDIS_*: Redis connection details.
The project uses pydantic/pydantic-settings via app/config/settings.py. Settings are loaded from a .env file by default (see Settings.model_config) and can be overridden with environment variables. Below is a concise reference for the most important settings you'll likely adjust.
-
LOG_LEVEL(str) β default:"INFO"- Controls application logging. Example:
LOG_LEVEL=DEBUG.
- Controls application logging. Example:
-
REDIS_HOST(str) β default:"localhost"- Hostname or IP for Redis. Example:
REDIS_HOST=redis.local.
- Hostname or IP for Redis. Example:
-
REDIS_PORT(int) β default:6379- Redis TCP port. Example:
REDIS_PORT=6379.
- Redis TCP port. Example:
-
REDIS_PASSWORD(str) β default: empty- Password for Redis auth (if configured). Keep secrets out of VCS.
-
REDIS_DB_MAIN(int) β default:15- Logical DB index used by the application for main storage.
-
WORKERS_INPUT_SOURCE(Dict[str, Dict[str,str]]) β default:{ "mt02": {"module_path": "app.src.input.mt02.worker", "func_name": "worker"} }-
Registers input worker modules the
main.pywill load dynamically. Each entry should includemodule_pathandfunc_name. -
To override from environment, set a JSON string. Example in
.env:WORKERS_INPUT_SOURCE='{"mt02": {"module_path": "app.src.input.mt02.worker", "func_name": "worker"}}'
-
-
MT02_API_BASE_URL(str) β default:"..."- Base URL used by the MT02 API client.
-
MT02_API_KEY(str) β default:"..."- API key/token for MT02.
-
MT02_WORKER_SLEEP_SECONDS(int) β default:30- How long the MT02 worker sleeps between polls.
-
OUTPUT_PROTOCOL_HOST_ADRESSES(Dict[str, tuple]) β default: usesos.getenv("SUNTECH_MAIN_SERVER_HOST")andos.getenv("GT06_MAIN_SERVER_HOST")-
Mapping of protocol name -> (host, port). By default this code reads
SUNTECH_MAIN_SERVER_*andGT06_MAIN_SERVER_*environment variables. If you prefer, you can setOUTPUT_PROTOCOL_HOST_ADRESSESdirectly as a JSON string in the environment, for example:OUTPUT_PROTOCOL_HOST_ADRESSES='{"gt06": ["gt06.example.com", 7011], "suntech4g": ["suntech.example.com", 7010] }'
-
-
DEFAULT_OUTPUT_PROTOCOL(str) β default:"gt06"- Protocol chosen when device-specific output mapping is not found.
-
GT06_LOCATION_PACKET_PROTOCOL_NUMBER(int) β default:0xA0- Numeric code used by GT06 builders to select packet variants.
How to inspect loaded settings at runtime
You can print or log the runtime settings object to verify loaded values. Example (useful in REPL or a small script):
from app.config.settings import settings
print(settings.model_dump()) # pydantic v2 safe dumpNotes on precedence and types
pydantic-settingsreads.env(UTF-8) by default and also honors environment variables; environment variables take precedence over values in.env.- When overriding complex objects (dicts/tuples) via environment variables, pass valid JSON (strings) and ensure proper quoting in shell or
.env. - Pydantic will coerce types where sensible (e.g.,
"6379"to int) but prefer correct types in the environment.
If you add new settings
- Add the default/value in
app/config/settings.pyand reference it in code viafrom app.config.settings import settings; settings.MY_NEW_SETTING. - For per-device protocol assignment, the middleware relies on Redis configuration β see
app/src/session/*for lookup keys and formats.
Before running the application you must define some environment variables that the project expects. You can set them directly in the system (recommended for production) or in a .env file at the repository root during development.
Expected variables (names):
MT02_API_BASE_URLβ Base URL of the MT02 API (e.g.,https://api.example.com)MT02_API_KEYβ API key/token to access the MT02 APIGT06_MAIN_SERVER_HOSTβ Host of the destination GT06 server (e.g.,gt06.your-host.com)GT06_MAIN_SERVER_PORTβ Port of the GT06 server (e.g.,7011)SUNTECH_MAIN_SERVER_HOSTβ Host of the destination Suntech 4G serverSUNTECH_MAIN_SERVER_PORTβ Port of the Suntech 4G serverREDIS_HOSTβ Redis host (e.g.,localhostorredis-service)REDIS_PORTβ Redis port (e.g.,6379)REDIS_PASSWORDβ Redis password (if applicable)
Example .env (place at the repository root; do not commit this file):
# MT02 API
MT02_API_BASE_URL="http://www.brgps.com/open"
MT02_API_KEY="<YOUR_MT02_API_KEY_HERE>"
# GT06 server
GT06_MAIN_SERVER_HOST="gt06.platform.example.com"
GT06_MAIN_SERVER_PORT="7011"
# Suntech 4G server
SUNTECH_MAIN_SERVER_HOST="suntech4g.platform.example.com"
SUNTECH_MAIN_SERVER_PORT="7010"
# Redis
REDIS_HOST="crossover.proxy.rlwy.net"
REDIS_PORT="55858"
REDIS_PASSWORD="<YOUR_REDIS_PASSWORD_HERE>"Best practices:
- Never include real keys or passwords in version control. Use placeholders in examples.
- For production, inject sensitive variables via secure mechanisms (secrets manager, CI/CD secrets, or orchestrator environment variables).
- For local development, use a
.envonly for convenience and add it to.gitignore.
- Python 3.11+
- Redis Server running
- Clone the repository.
- Create virtual env:
python -m venv venv&source venv/bin/activate. - Install dependencies:
pip install -r requirements.txt. - Copy
.env.exampleto.envand configure.
python main.pydocker build -t http2tcp-middleware .
docker run -d --env-file .env --network host http2tcp-middlewareDistributed under the Apache 2.0 License. See LICENSE for more information.