Skip to content

Commit 5a37841

Browse files
authored
Merge pull request #151 from Patowhiz/pwa
Implements importing of large files and adds gaps analysis on data availability
2 parents 13d4414 + 4a637aa commit 5a37841

File tree

48 files changed

+741
-579
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+741
-579
lines changed

back-end/api/package-lock.json

Lines changed: 24 additions & 20 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

back-end/api/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "api",
3-
"version": "preview-2.0.1",
3+
"version": "preview-2.0.3",
44
"description": "Climsoft API",
55
"author": "Climsoft Foundation",
66
"private": true,
@@ -32,7 +32,7 @@
3232
"connect-pg-simple": "^10.0.0",
3333
"duckdb-async": "^1.1.0",
3434
"express-session": "^1.18.1",
35-
"mariadb": "^3.4.0",
35+
"mariadb": "^3.4.5",
3636
"pg": "^8.15.6",
3737
"reflect-metadata": "^0.1.13",
3838
"rxjs": "^7.8.1",

back-end/api/src/migrations/migrations.service.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import { SqlScriptsLoaderService } from 'src/sql-scripts/sql-scripts-loader.serv
1414

1515
@Injectable()
1616
export class MigrationsService {
17-
private readonly SUPPORTED_DB_VERSION: string = "0.0.2"; // TODO. Should come from a versioning file.
17+
private readonly SUPPORTED_DB_VERSION: string = "0.0.3"; // TODO. Should come from a versioning file.
1818
private readonly logger = new Logger(MigrationsService.name);
1919

2020
constructor(
@@ -84,6 +84,7 @@ export class MigrationsService {
8484
await this.sqlScriptsService.addEntryDatetimeTriggerToDB();
8585
await this.sqlScriptsService.addLogsTriggersToDB();
8686
await this.sqlScriptsService.addQCTestsFunctionsToDB();
87+
await this.sqlScriptsService.addDataAvailabilityFunctionsToDB();
8788
}
8889

8990
private async seedFirstUser() {

back-end/api/src/observation/services/climsoft-v4-web-sync-set-up.service.ts

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,16 @@ export interface V4ElementModel {
3434
export interface V4StationModel {
3535
stationId: string;
3636
stationName: string;
37-
wmoid: string;
38-
icaoid: string;
39-
wsi: string;
37+
wmoid: string | null;
38+
icaoid: string | null;
39+
wsi: string | null;
4040
longitude: number;
4141
latitude: number;
4242
elevation: string;
43-
qualifier: string;
43+
qualifier: string | null;
4444
stationOperational: boolean;
45-
openingDatetime: string;
46-
closingDatetime: string;
47-
authority: string;
45+
openingDatetime: string | null;
46+
closingDatetime: string | null;
4847
}
4948

5049
@Injectable()
@@ -120,7 +119,8 @@ export class ClimsoftV4WebSyncSetUpService {
120119
password: AppConfig.v4DbCredentials.password,
121120
database: AppConfig.v4DbCredentials.databaseName,
122121
port: AppConfig.v4DbCredentials.port,
123-
dateStrings: true,
122+
dateStrings: true,
123+
charset: 'utf8mb4',
124124
});
125125

126126
// Clear any previous conflicts
@@ -298,7 +298,6 @@ export class ClimsoftV4WebSyncSetUpService {
298298
try {
299299
conn = await this.v4DBPool.getConnection();
300300
const rows: V4StationModel[] = await conn.query("SELECT stationId as stationId, stationName as stationName, wmoid as wmoid, icaoid as icaoid, wsi as wsi, longitude as longitude, latitude as latitude, elevation as elevation, qualifier as qualifier, stationOperational as stationOperational, openingDatetime as openingDatetime, closingDatetime as closingDatetime, authority as authority FROM station");
301-
//console.log('station rows: ', rows[0]);
302301
return rows;
303302
} catch (error) {
304303
console.error('Setting up V4 stations failed: ', error);
@@ -330,6 +329,23 @@ export class ClimsoftV4WebSyncSetUpService {
330329
v4Station.stationName = `${v4Station.stationName}_${(i + 1)}`;
331330
}
332331

332+
// Some climsoft version 4 installations have the below columns storing null bytes instead of nulls
333+
// So ignore such null bytes
334+
//----------------------------------------------
335+
if(v4Station.wmoid !== null && v4Station.wmoid.startsWith('\x00')){
336+
v4Station.wmoid = null;
337+
}
338+
339+
if(v4Station.wsi !== null && v4Station.wsi.startsWith('\x00')){
340+
v4Station.wsi = null;
341+
}
342+
343+
if(v4Station.icaoid !== null && v4Station.icaoid.startsWith('\x00')){
344+
v4Station.icaoid = null;
345+
}
346+
347+
//----------------------------------------------
348+
333349
// Make sure the wmo id is unique. V5 doesn't accept duplicates like v4 model
334350
if (v4Station.wmoid !== null && v5Dtos.find(item => item.wmoId === v4Station.wmoid)) {
335351
v4Station.wmoid = `${v4Station.wmoid}_${(i + 1)}`;
@@ -370,7 +386,18 @@ export class ClimsoftV4WebSyncSetUpService {
370386
v5Dtos.push(dto);
371387
}
372388

373-
await this.stationsService.bulkPut(v5Dtos, userId);
389+
// Save in batches of a thousand just in case there are many stations
390+
const batchSize: number = 1000;
391+
for (let i = 0; i < v5Dtos.length; i += batchSize) {
392+
const batch = v5Dtos.slice(i, i + batchSize);
393+
try {
394+
await this.stationsService.bulkPut(batch, userId);
395+
} catch (error) {
396+
//console.error(error);
397+
console.log('Error saving stations: ', batch);
398+
return false;
399+
}
400+
}
374401

375402
// Important to do this just in case observations were not being saved to v4 database due to lack of stations or changes in v4 configuration
376403
this.setupV4StationsChecking();

back-end/api/src/observation/services/observation-import.service.ts

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { BadRequestException, Injectable } from '@nestjs/common';
1+
import { BadRequestException, Injectable, Logger } from '@nestjs/common';
22
import { CreateObservationDto } from '../dtos/create-observation.dto';
33
import { ObservationsService } from './observations.service';
44
import { SourceTemplatesService } from 'src/metadata/source-templates/services/source-templates.service';
@@ -17,6 +17,7 @@ import { SourceTypeEnum } from 'src/metadata/source-templates/enums/source-type.
1717

1818
@Injectable()
1919
export class ObservationImportService {
20+
private readonly logger = new Logger(ObservationImportService.name);
2021
// Enforce these fields to always match CreateObservationDto properties naming. Important to ensure objects returned by duckdb matches the dto structure.
2122
private readonly STATION_ID_PROPERTY_NAME: keyof CreateObservationDto = "stationId";
2223
private readonly ELEMENT_ID_PROPERTY_NAME: keyof CreateObservationDto = "elementId";
@@ -59,7 +60,7 @@ export class ObservationImportService {
5960
}
6061

6162
} catch (error) {
62-
console.error("File Import Failed: " , error)
63+
console.error("File Import Failed: ", error)
6364
throw new BadRequestException("Error: File Import Failed: " + error);
6465
} finally {
6566
this.fileIOService.deleteFile(tmpFilePathName);
@@ -100,29 +101,31 @@ export class ObservationImportService {
100101

101102
//console.log("alterSQLs: ", alterSQLs);
102103

103-
let startTime = new Date().getTime();
104104
// Execute the duckdb DDL SQL commands
105+
let startTime = new Date().getTime();
105106
await this.fileIOService.duckDb.exec(alterSQLs);
106-
console.log("DuckDB alters took: ", new Date().getTime() - startTime);
107+
this.logger.log(`DuckDB alters took ${new Date().getTime() - startTime} milliseconds`);
107108

108109
if (sourceDef.scaleValues) {
109110
startTime = new Date().getTime();
110111
// Scale values if indicated, execute the scale values SQL
111112
await this.fileIOService.duckDb.exec(await this.getScaleValueSQL(tmpObsTableName));
112-
console.log("DuckDB scaling took: ", new Date().getTime() - startTime);
113+
this.logger.log(`DuckDB scaling took ${new Date().getTime() - startTime} milliseconds`);
113114
}
114115

115-
startTime = new Date().getTime();
116116
// Get the rows of the columns that match the dto properties
117+
startTime = new Date().getTime();
117118
const rows = await this.fileIOService.duckDb.all(`SELECT ${this.STATION_ID_PROPERTY_NAME}, ${this.ELEMENT_ID_PROPERTY_NAME}, ${this.SOURCE_ID_PROPERTY_NAME}, ${this.level}, ${this.DATE_TIME_PROPERTY_NAME}, ${this.INTERVAL_PROPERTY_NAME}, ${this.VALUE_PROPERTY_NAME}, ${this.FLAG_PROPERTY_NAME}, ${this.COMMENT_PROPERTY_NAME} FROM ${tmpObsTableName};`);
118-
119-
console.log("DuckDB fetch rows took: ", new Date().getTime() - startTime);
119+
this.logger.log(`DuckDB fetch rows took ${new Date().getTime() - startTime} milliseconds`);
120120

121121
// Delete the table
122+
startTime = new Date().getTime();
122123
await this.fileIOService.duckDb.run(`DROP TABLE ${tmpObsTableName};`);
124+
this.logger.log(`DuckDB drop table took ${new Date().getTime() - startTime} milliseconds`);
123125

124126
// Save the rows into the database
125-
await this.observationsService.bulkPut(rows as CreateObservationDto[], userId);
127+
// TODO. Note, no need to await. All currently active ingestion processes will be tagged and shown on the ingestion monitoring page
128+
this.observationsService.bulkPut(rows as CreateObservationDto[], userId);
126129
}
127130

128131
private getAlterStationColumnSQL(source: CreateImportTabularSourceDTO, tableName: string, stationId?: string): string {

back-end/api/src/observation/services/observations.service.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,7 @@ export class ObservationsService {
254254
public async bulkPut(createObservationDtos: CreateObservationDto[], userId: number, qcStatus = QCStatusEnum.NONE, ignoreV4Saving: boolean = false): Promise<void> {
255255
let startTime = new Date().getTime();
256256

257+
// Transform dtos to entities
257258
const obsEntities: ObservationEntity[] = [];
258259
for (const dto of createObservationDtos) {
259260
const entity: ObservationEntity = this.observationRepo.create({
@@ -274,18 +275,18 @@ export class ObservationsService {
274275

275276
obsEntities.push(entity);
276277
}
277-
278-
279278
this.logger.log(`DTO transformation took: ${(new Date().getTime() - startTime)} milliseconds`);
280279

280+
// Save in batches of 1000 to minimise excess payload errors when saving to postgres
281+
this.logger.log(`Saving ${obsEntities.length} entities from user - ${userId}`);
281282
startTime = new Date().getTime();
282-
283283
const batchSize = 1000; // batch size of 1000 seems to be safer (incase there are comments) and faster.
284284
for (let i = 0; i < obsEntities.length; i += batchSize) {
285285
const batch = obsEntities.slice(i, i + batchSize);
286286
await this.insertOrUpdateObsValues(this.observationRepo, batch);
287+
this.logger.log(`${batch.length} entities from user - ${userId} successfully saved!`);
287288
}
288-
this.logger.log(`Saving entities took: ${(new Date().getTime() - startTime)} milliseconds`);
289+
this.logger.log(`Saving entities from user - ${userId}, took: ${(new Date().getTime() - startTime)} milliseconds`);
289290

290291
if (!ignoreV4Saving) {
291292
// Initiate saving to version 4 database as well
@@ -543,7 +544,7 @@ export class ObservationsService {
543544
filter.toDate ?? null // p_to_date timestamptz
544545
];
545546

546-
const sql = `SELECT * FROM func_data_availaibility_details($1, $2, $3, $4, $5, $6)`;
547+
const sql = `SELECT * FROM func_data_availaibility_details($1, $2, $3, $4, $5, $6)`;
547548

548549
const rows = await this.dataSource.query(sql, params);
549550

back-end/api/src/shared/services/file-io.service.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { Injectable, StreamableFile } from '@nestjs/common';
2+
import os from "os";
23
import * as fs from 'node:fs';
34
import * as path from 'node:path';
45
import { Database } from "duckdb-async";

back-end/api/src/sql-scripts/data-availability/data-availiability-details.sql renamed to back-end/api/src/sql-scripts/data-availability/data-availiability-details-function.sql

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,7 @@ BEGIN
6161
o.station_id,
6262
o.element_id,
6363
o.level,
64-
o."interval",
65-
MIN(o.date_time) AS from_date,
66-
MAX(o.date_time) AS to_date
64+
o."interval"
6765
FROM observations o
6866
WHERE %s
6967
GROUP BY o.station_id, o.element_id, o.level, o."interval"
@@ -74,7 +72,7 @@ BEGIN
7472
og.element_id,
7573
og.level, og.
7674
"interval",
77-
generate_series(og.from_date, og.to_date, (og."interval" || ' minutes')::interval) AS date_time
75+
generate_series($5, $6, (og."interval" || ' minutes')::interval) AS date_time
7876
FROM observation_groups og
7977
),
8078
infilled_data AS (

back-end/api/src/sql-scripts/sql-scripts-loader.service.ts

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ export class SqlScriptsLoaderService {
3030
await this.dataSource.query(sql);
3131
this.logger.log('default entry date time triggers added');
3232
} catch (error) {
33-
this.logger.error(`Developer error in adding entry date time triggers: ${error}`);
33+
this.logger.error(`Developer error in adding entry date time triggers`);
3434
throw new Error(error);
3535
}
3636
}
@@ -75,11 +75,29 @@ export class SqlScriptsLoaderService {
7575
await this.dataSource.query(sql);
7676
this.logger.log('qc tests functions added');
7777
} catch (error) {
78-
this.logger.error(`Developer error in adding qc tests functions: ${error}`);
78+
this.logger.error(`Developer error in adding qc tests functions`);
7979
throw new Error(error);
8080
}
8181
}
8282

83+
/**
84+
* Used by the migrations service
85+
*/
86+
public async addDataAvailabilityFunctionsToDB() {
87+
try {
88+
// Get the script directory from absolute path of this service file
89+
// For windows platform, replace the backslashes with forward slashes.
90+
const scriptsDirPath: string = this.getScriptsDirectoryPath().replaceAll("\\", "\/");
91+
const entryDatetimeScriptsDirPath: string = `${scriptsDirPath}/data-availability/data-availiability-details-function.sql`
92+
const sql: string = await this.fileIOService.readFile(entryDatetimeScriptsDirPath, 'utf8');
93+
//console.log('ENTRY DATE TIME SQL:', sql);
94+
await this.dataSource.query(sql);
95+
this.logger.log('data availability functions added');
96+
} catch (error) {
97+
this.logger.error(`Developer error in adding data availability functions`);
98+
throw new Error(error);
99+
}
100+
}
83101

84102

85103

0 commit comments

Comments
 (0)