
Opendatahub Timeseries Writer


The Open Data Hub Timeseries writer (formerly known as "Big Data Platform" or BDP) is part of the Open Data Hub project. It serves as the main API to write various timeseries data into a uniform data model, which can be accessed via the Timeseries API.

For a detailed introduction, see our Big Data Platform Introduction.


Inbound API (writer)

The core of the platform contains the business logic of an INBOUND API, which handles connections to the database and provides an API for data collectors (see writer), in the form of a REST interface and a Java SDK (see client).

Finally, dto is a library containing all Data Transfer Objects used by the writer and client to exchange data in a standardized format.

The OUTBOUND API is called Timeseries API.

The writer is a REST API that takes JSON DTOs, deserializes and validates them, and finally stores them in the database. Additionally, it sets stations to active/inactive according to their presence in the provided data. The writer itself implements the methods to write data and is therefore the endpoint for all data collectors. It uses the persistence unit of the DAL, which has full permissions on the database.

The full API description can be found inside JsonController.java.

Getting started with Docker

If you want to run the application using Docker, the environment is already set up with all dependencies for you. This is the recommended way to test and develop data collectors for this writer API. You only have to install Docker and Docker Compose and follow the instructions below.

In the root folder of this repository:

  1. Copy .env.example to .env
  2. Run docker-compose up -d
  3. You can follow logs with docker-compose logs -f

Now you have a Postgres instance running on port 5555 and the API on port 8999.

Let's test Postgres first:

  1. Login to the DB

    a) with Docker, do:

    $ docker-compose exec db bash
    bash-5.1# psql -U bdp bdp
    

    b) natively, do:

    PGPASSWORD=password psql -h localhost -p 5555 -U bdp bdp
    
  2. Test the installation as follows:

bdp=# set search_path to intimev2;
bdp=# \dt

                  List of relations
  Schema  |           Name           | Type  | Owner
----------+--------------------------+-------+-------
 intimev2 | edge                     | table | bdp
 intimev2 | event                    | table | bdp
 intimev2 | flyway_schema_history    | table | bdp
 intimev2 | location                 | table | bdp
 intimev2 | measurement              | table | bdp
 intimev2 | measurementhistory       | table | bdp
 intimev2 | measurementjson          | table | bdp
 intimev2 | measurementjsonhistory   | table | bdp
 intimev2 | measurementstring        | table | bdp
 intimev2 | measurementstringhistory | table | bdp
 intimev2 | metadata                 | table | bdp
 intimev2 | provenance               | table | bdp
 intimev2 | station                  | table | bdp
 intimev2 | type                     | table | bdp
 intimev2 | type_metadata            | table | bdp
(15 rows)

... if you see a similar output as above, then you are set!

Please use the curl commands inside the chapter Authentication to test the writer API.

Getting started natively

If you do not want to use Docker, you can also start this application manually. You need Java 17, Maven, and a PostgreSQL database. PostgreSQL can optionally be started with our Docker setup: just call docker-compose up -d db; it runs on port 5555. Alternatively, install and start your own PostgreSQL instance.

The database, schema, and privileged user must already exist; if that is not the case, create them:

-- These values are already set inside the application.properties file, so you do
-- not need to configure anything except the port if you keep them like this!
create database bdp;
create user bdp with login password 'password';
create schema if not exists intimev2;
grant all on schema intimev2 to bdp;

To start the writer, do the following:

  1. Open writer/src/main/resources/application.properties and configure it. This step can be omitted if you use our dockerized PostgreSQL. For your own PostgreSQL instance, just change the port to 5432 and make sure you use the same names as shown above; otherwise, configure those parameters as well (see the sketch after this list).
  2. Start the Java application with mvn spring-boot:run
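
Here is a hedged sketch of the kind of entries you would check, assuming standard Spring Boot datasource keys and the values from the dockerized setup above; the shipped application.properties may use different key names, so adjust the existing entries rather than adding new ones:

# Hypothetical excerpt -- verify the actual key names in
# writer/src/main/resources/application.properties; values match the Docker setup.
spring.datasource.url=jdbc:postgresql://localhost:5555/bdp?currentSchema=intimev2
spring.datasource.username=bdp
spring.datasource.password=password
# set to false if you prefer to run the SQL migrations manually
spring.flyway.enabled=true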

The application itself will create tables and other database objects for you. If you prefer to do that manually, set spring.flyway.enabled=false and execute the SQL files inside writer/src/main/resources/db/migration yourself. Replace ${default_schema} with your default schema, most probably intimev2.

Please use the curl commands inside the chapter Authentication to test the writer API.

Authentication

We use Keycloak to authenticate. That service provides an access_token that can be used to send POST requests to the writer. See the Open Data Hub Authentication / Quick Howto for further details.

curl -X POST -L "https://auth.opendatahub.testingmachine.eu/auth/realms/noi/protocol/openid-connect/token" \
    --header 'Content-Type: application/x-www-form-urlencoded' \
    --data-urlencode 'grant_type=client_credentials' \
    --data-urlencode 'client_id=odh-mobility-datacollector-development' \
    --data-urlencode 'client_secret=7bd46f8f-c296-416d-a13d-dc81e68d0830'

With this call you get an access_token that can then be used as follows in all writer API methods. Here is an example that retrieves all stations:

curl -X GET "http://localhost:8999/json/stations" \
    --header 'Content-Type: application/json' \
    --header 'Authorization: bearer your-access-token'

You should get an empty JSON list as result.
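
If you prefer to request the token from Java instead of curl, here is a hedged sketch using only the JDK's java.net.http client (plain HTTP, not part of the writer SDK); the endpoint, client_id, and client_secret are the development values shown above:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TokenExample {
    public static void main(String[] args) throws Exception {
        HttpClient http = HttpClient.newHttpClient();

        // OAuth2 client-credentials flow against the Keycloak token endpoint
        String form = "grant_type=client_credentials"
                + "&client_id=odh-mobility-datacollector-development"
                + "&client_secret=7bd46f8f-c296-416d-a13d-dc81e68d0830";
        HttpRequest tokenRequest = HttpRequest.newBuilder()
                .uri(URI.create("https://auth.opendatahub.testingmachine.eu/auth/realms/noi/protocol/openid-connect/token"))
                .header("Content-Type", "application/x-www-form-urlencoded")
                .POST(HttpRequest.BodyPublishers.ofString(form))
                .build();
        String tokenJson = http.send(tokenRequest, HttpResponse.BodyHandlers.ofString()).body();
        System.out.println(tokenJson); // contains the "access_token" field

        // Parse the access_token from tokenJson with your JSON library of choice,
        // then pass it as a bearer token to any writer API method:
        String accessToken = "your-access-token";
        HttpRequest stations = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8999/json/stations"))
                .header("Authorization", "bearer " + accessToken)
                .GET()
                .build();
        System.out.println(http.send(stations, HttpResponse.BodyHandlers.ofString()).body());
    }
}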

Write an email to help@opendatahub.com if you want to get the client_secret and an Open Data Hub OAuth2 account for a non-development setup.

DAL

DAL is the Data Access Layer used by the writer modules to communicate with the underlying database. The communication is handled through the ORM Hibernate and its spatial component for geometries. The module was developed against PostgreSQL with the PostGIS extension.

Connection pooling is handled by HikariCP for high speed connections to the DB.

In some cases, geometry transformations and elaborations need to be executed at the application level; therefore, GeoTools was added as a dependency.

Configuration of the database connection

To configure the DAL module to communicate with your database you need to provide configuration and credentials inside

writer/src/main/resources/application.properties

Defaults can be found at:

writer/src/main/resources/META-INF/persistence.xml

Please note that values inside the application.properties file override values inside persistence.xml.

Hibernate, our object-relational mapping (ORM) framework, handles schema validation only (for security reasons). Usually, we set hibernate.hbm2ddl.auto = validate during development and hibernate.hbm2ddl.auto = none at runtime, to avoid the validation overhead at startup.

Entities

This chapter describes the most important DAL entities:

  • station
  • datatype
  • timeseries
  • record
  • edge

Station

The station represents the origin of the data. It needs an identifier, a name, a coordinate, and a so-called stationtype. It should also contain the origin of the data, the current active state (whether it is actively used or not), and, if present, a parent station, used to model a hierarchical station structure. For all remaining data that enriches the station, we created the field metadata, which can hold any kind of meta information in the form of a JSON object. To understand the functionality and the main job of this entity, check the source code Station.java.

Example:

A station can be of stationtype MeteoStation, have the identifier 89935GW, and a position of "latitude": 46.24339407235059, "longitude": 11.199431152658656. It can have additional information like address, municipality, opening times, etc., which would be modelled as metadata entries.

DataType

The data type represents the typology of the data in the form of a unique name and a unit. A description and the metric of the measurements can also be provided.

Example:

A temperature can have the unit °C and can be an average value over the last 300 seconds (called the period).

TimeSeries

A timeseries represents the unique set of records that all share the same data type, station, and period.

Record

A record represents a single measurement containing a value, a timestamp, a timeseries, and a provenance. The provenance indicates which data collector, in which version, collected the data. It is needed to implement traceability between collectors and inserted data, for example to identify data for cleansing or bug fixes.

Example:

On Dec 16 2025 at 10:47:33 we measure, for the data type temperature (see the data type example), a value of 20.4 at a meteo station called 89935GW (see the station example).

Edge

An edge represents the spatial geometry between two stations. We model this internally as a station triple (origin, edge, destination), because currently only stations can be exposed through our API, and we add a line geometry to that triple to describe the entity geographically. Origin and destination are two stations of any type that represent two points on the map. The edge itself is also a station, of type LinkStation, which has no coordinates and serves as the description of the edge.

Example:

A street between two stations, where the measured data could be the number of cars that passed it.

If you need more information about specific entities or classes, consult the Javadoc or the source code inside DAL.

DTO

Data transfer objects (DTOs) are used to define the structure of the data exchange between the data provider and the data persister (writer). They consist of fields that are all primitives and easily serializable. The DTO module is a Java library included in all modules of the big data platform, simply because it defines the communication structure between them.

The following chapters describe the most used DTOs.

StationDto

Describes a place where measurements get collected. It is the origin of the data. We define the structure inside StationDto.java.

DataTypeDto

Describes a specific type of data. We define the structure inside DataTypeDto.java.

SimpleRecordDto

Describes the measured value. We define the structure inside SimpleRecordDto.java.
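
As an illustration, here is a hedged sketch of how these DTOs are typically constructed in a data collector; the constructors and setters shown are assumptions based on common usage, so check the DTO sources referenced above for the exact signatures:

// Hedged sketch -- constructor and setter names are assumptions; import StationDto,
// DataTypeDto and SimpleRecordDto from the dto module (package path depends on the version).
import java.util.HashMap;
import java.util.Map;

public class DtoExample {
    public static void main(String[] args) {
        // A station: identifier, name, latitude, longitude
        StationDto station = new StationDto("89935GW", "My Meteo Station",
                46.24339407235059, 11.199431152658656);
        station.setOrigin("my-data-provider");

        // Any extra information goes into the metadata JSON object
        Map<String, Object> metadata = new HashMap<>();
        metadata.put("municipality", "Bolzano");
        station.setMetaData(metadata);

        // A data type: unique name, unit, description, metric
        DataTypeDto temperature = new DataTypeDto("air-temperature", "°C",
                "Average air temperature", "mean");

        // A single measurement: timestamp in milliseconds, value, period in seconds
        SimpleRecordDto record = new SimpleRecordDto(1734346053000L, 20.4, 300);
    }
}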

client (deprecated)

The client is a Java SDK to communicate with the BDP writer.

Warning

Since we are implementing all transformers and elaborations in Go (or Python), this client library is considered deprecated.
You can find the Golang SDK here, with the module bdplib being the client library for this API.

Just include the client maven dependency in your project and use the existing JSON client implementation.

The API contains several methods. We describe the most important ones here; for the rest, see the JSONPusher.java implementation. A combined usage sketch follows the pushData description below.

Object getDateOfLastRecord(String stationCode,String dataType,Integer period)

This method gets the date of the last valid record.

Object syncStations(List<StationDto> data)

This method is used to create, update, and deactivate stations of a specific typology; data must be a list of StationDto objects.

Object syncDataTypes(List<DataTypeDto> data)

This method is used to create and update (i.e., upsert) data types; data must be a list of DataTypeDto objects.

Object pushData(DataMapDto<? extends RecordDtoImpl> dto)

This is where you put all the data you want to pass to the writer. The data is stored in the form of a tree.

Each branch can have multiple child branches, but can also hold data itself, which means the tree can have arbitrary depth. Right now, by our internal conventions, we store everything on the second level, like this:

+- Station
   |
   +- DataType
      |
      `-Data

As values you put a list of SimpleRecordDto, which contains all the data points for a specific station and a specific type. Each point is represented as a timestamp and a value. To better understand the structure, see the DataMapDto.java source.
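
Putting it together, here is a hedged sketch of the typical collector flow: sync stations and data types first, then push the records. It assumes an already configured JSONPusher instance and reuses the DTOs from the sketch in the DTO chapter; the upsertBranch/getData calls reflect how DataMapDto is commonly used, so check JSONPusher.java and DataMapDto.java for the exact API.

// Hedged sketch, assuming a configured JSONPusher; method names reflect common
// usage in Open Data Hub data collectors, not a verified API reference.
import java.util.Collections;

public class PushExample {
    public static void push(JSONPusher pusher, StationDto station,
            DataTypeDto type, SimpleRecordDto record) {
        // 1. Upsert stations and data types first
        pusher.syncStations(Collections.singletonList(station));
        pusher.syncDataTypes(Collections.singletonList(type));

        // 2. Build the two-level tree: station -> data type -> records
        DataMapDto<RecordDtoImpl> root = new DataMapDto<>();
        root.upsertBranch(station.getId())   // first level: station code
            .upsertBranch(type.getName())    // second level: data type name
            .getData()
            .add(record);                    // leaf: the measurement(s)

        // 3. Send everything to the writer in one call
        pusher.pushData(root);
    }
}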

Flight rules

Database Partitioning

To manage large data volumes, the API supports table partitioning of the history tables. Partitioning is done on the timeseries level using timeseries.partition_id as key.

By default only one partition (with ID 1) exists and all new timeseries are put there.

The table partition represents a partition, together with some additional descriptions.

At the time of writing, the process is not yet automated. See create_partition.sql for an example of how to create partitions and move existing data.

Designing the partitioning scheme

Partitions should be designed to reflect query or delete/update patterns, e.g. grouping data that is requested together (such as all timeseries of a stationtype + origin) or an elaborated data type that has to be deleted often.

It is possible to further sub-partition partitions; e.g., if you have a large partition of traffic data, you can sub-partition it by timestamp date ranges.

Creating a new partition

To use additional partitions, first create a record in the table partition and the corresponding partition of the measurement*history table, using the id of the partition record as partition key.

e.g.

CREATE TABLE IF NOT EXISTS measurementhistory_2 PARTITION OF measurementhistory FOR VALUES IN (2)

Define partition rules

Rules on which timeseries goes into which partition are declared in partition_def.

Currently, you can decide to assign a partition by using origin, stationtype, type_id and period as criteria.

The matching of a timeseries to a partition is done by specificity: Whichever partition matches the highest number of these fields (with null being ignored) will be selected.

The code for this can be found in PartitionDef.java.
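
To illustrate the idea, here is a simplified sketch of specificity-based matching (not the actual PartitionDef.java code): a rule field that is null acts as a wildcard, a rule whose non-null fields do not all match is discarded, and among the remaining rules the one with the most non-null fields wins.

// Simplified illustration of specificity-based matching -- not the actual
// PartitionDef.java implementation.
import java.util.List;

class PartitionMatcher {
    record PartitionRule(long partitionId, String origin, String stationtype,
            String typeId, Integer period) {}

    static long selectPartition(List<PartitionRule> rules, String origin,
            String stationtype, String typeId, int period) {
        long best = 1;      // default partition (ID 1)
        int bestScore = 0;
        for (PartitionRule r : rules) {
            int score = 0;
            if (r.origin() != null)      { if (!r.origin().equals(origin)) continue;           score++; }
            if (r.stationtype() != null) { if (!r.stationtype().equals(stationtype)) continue; score++; }
            if (r.typeId() != null)      { if (!r.typeId().equals(typeId)) continue;           score++; }
            if (r.period() != null)      { if (r.period() != period) continue;                 score++; }
            if (score > bestScore) {
                bestScore = score;
                best = r.partitionId();
            }
        }
        return best;
    }
}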

Note that the rules only apply to newly created timeseries records and don't affect existing ones. If you want a certain dataset to be in one partition, you have to migrate existing records manually.

Migrating existing timeseries

You can migrate existing data to this partition by updating the partition_id of both the timeseries and measurement*history records to the target partition id.

Note that this process can be quite slow due to index updates, WAL writes, transactionality, etc., and you will likely have to batch it in some way.

Since new data is written to whatever partition is referenced on timeseries, it's a good idea to first update timeseries, commit, and then do the records. This way you ensure consistency between the timeseries and history. If the two are not aligned, the outbound API might not be able to find the history records. Be aware that this also applies to the time window between updating timeseries and the history.

At any rate it is good practice to ensure consistency via checks and align possible orphan measurements with their parent timeseries.

I want to use client in my Java Maven project (deprecated)

Include the following snippet in your pom.xml file:

	<repositories>
		<repository>
			<id>maven-repo.opendatahub.com</id>
			<url>https://maven-repo.opendatahub.com/snapshot</url>
		</repository>
	</repositories>

Include the dependency client for data collectors:

<dependency>
  <groupId>com.opendatahub.timeseries.bdp</groupId>
  <artifactId>client</artifactId>
  <version>7.3.0</version>
</dependency>

You can also use a version-range, like [7.3.0,8.0.0). Find the latest version in our release channel on GitHub.

I want to publish a new client sdk on our maven repository (deprecated)

This chapter is for the NOI team only. It describes how to publish a new client manually or via the Github Action workflow on our maven repo. Either as "release" or "snapshot" version...

Automatically via Github Actions

SNAPSHOT RELEASES: If you push code to the main branch that changes either dto or client, the GitHub Action workflow deploys a new snapshot version of those libraries. The version is the latest version tag on the prod branch with a -SNAPSHOT postfix. For example, if the version tag is v7.4.0, then the new snapshot version string is 7.4.0-SNAPSHOT (the initial v is removed).

PRODUCTION RELEASES: Push your code to the prod branch and tag it with a semantic versioning tag prefixed with v. As you might notice, in the past we had version tags without that prefix, but the new GitHub Action workflow requires it, so in the future please always include it. For example, v7.5.0.

Manually from your machine

Create a file ~/.m2/settings.xml, and copy/paste the following code:

<settings>
    <servers>
        <server>
            <id>maven-repo.opendatahub.com-release</id>
            <username>your-remote-repos-username</username>
            <password>your-remote-repos-password</password>
        </server>
        <server>
            <id>maven-repo.opendatahub.com-snapshot</id>
            <username>your-remote-repos-username</username>
            <password>your-remote-repos-password</password>
        </server>
    </servers>
</settings>

Replace your-remote-repos-username and your-remote-repos-password with your Maven repo credentials. We have a group on our AWS/IAM called s3-odh-maven-repo that grants permission to push to the Maven repo. Assign that role to your user if needed, or search for s3-odh-maven-repo on our password server for credentials.

Update all pom.xml files with the correct version. Here is an example that creates a snapshot release with version 8.0.1 (do not put a v prefix): ./infrastructure/utils/quickrelease.sh snapshot 8.0.1

Use ./infrastructure/utils/quickrelease.sh release 8.0.1 for a production release.

Call mvn --projects dto --projects client --also-make clean install deploy

I want to get started with a new data-collector

Refer to the data collection monorepo.

Information

Support

For support, please contact help@opendatahub.com.

Contributing

If you'd like to contribute, please follow our Getting Started instructions.

Documentation

More documentation can be found at https://docs.opendatahub.com.

License

The code in this project is licensed under the GNU General Public License Version 3. See the LICENSE file for more information.

REUSE

This project is REUSE compliant, more information about the usage of REUSE in NOI Techpark repositories can be found here.

Since the CI for this project checks for REUSE compliance you might find it useful to use a pre-commit hook checking for REUSE compliance locally. The pre-commit-config file in the repository root is already configured to check for REUSE compliance with help of the pre-commit tool.

Install the tool by running:

pip install pre-commit

Then install the pre-commit hook via the config file by running:

pre-commit install
