Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions Queries/Query1-csv.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
query: |
SELECT start,
end,
COUNT(time_utc) AS cnt
FROM sncb_stream
WHERE edwithin_tgeo_geo(gps_lon,
gps_lat,
time_utc,
'SRID=4326;POINT(4.3658 50.6456)',
FLOAT64(2.0)) = INT32(1)
WINDOW TUMBLING(time_utc, SIZE 10 SEC)
INTO file_sink;

sinks:
- name: FILE_SINK
type: File
schema:
- { name: SNCB_STREAM$START, type: UINT64 }
- { name: SNCB_STREAM$END, type: UINT64 }
- { name: SNCB_STREAM$CNT, type: UINT64 }
config:
file_path: "/workspace/Output/output_query1.csv"
input_format: CSV

logical:
- name: SNCB_STREAM
schema:
- { name: TIME_UTC, type: UINT64 }
- { name: DEVICE_ID, type: UINT64 }
- { name: VBAT, type: FLOAT64 }
- { name: PCFA_MBAR, type: FLOAT64 }
- { name: PCFF_MBAR, type: FLOAT64 }
- { name: PCF1_MBAR, type: FLOAT64 }
- { name: PCF2_MBAR, type: FLOAT64 }
- { name: T1_MBAR, type: FLOAT64 }
- { name: T2_MBAR, type: FLOAT64 }
- { name: CODE1, type: FLOAT64 }
- { name: CODE2, type: FLOAT64 }
- { name: GPS_SPEED, type: FLOAT64 }
- { name: GPS_LAT, type: FLOAT64 }
- { name: GPS_LON, type: FLOAT64 }

physical:
- logical: SNCB_STREAM
type: File
parser_config:
type: CSV
field_delimiter: ","
tuple_delimiter: "\n"
source_config:
file_path: "/workspace/Input/input_sncb.csv"
55 changes: 55 additions & 0 deletions Queries/Query1-tcp-file.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
query: |
SELECT start,
end,
COUNT(time_utc) AS cnt
FROM sncb_stream
WHERE edwithin_tgeo_geo(gps_lon,
gps_lat,
time_utc,
'SRID=4326;POINT(4.3658 50.6456)',
FLOAT64(2.0)) = INT32(1)
WINDOW TUMBLING(time_utc, SIZE 10 SEC)
INTO file_sink;

sinks:
- name: FILE_SINK
type: File
schema:
- { name: SNCB_STREAM$START, type: UINT64 }
- { name: SNCB_STREAM$END, type: UINT64 }
- { name: SNCB_STREAM$CNT, type: UINT64 }
config:
file_path: "/workspace/Output/output_query1.csv"
input_format: CSV

logical:
- name: SNCB_STREAM
schema:
- { name: TIME_UTC, type: UINT64 }
- { name: DEVICE_ID, type: UINT64 }
- { name: VBAT, type: FLOAT64 }
- { name: PCFA_MBAR, type: FLOAT64 }
- { name: PCFF_MBAR, type: FLOAT64 }
- { name: PCF1_MBAR, type: FLOAT64 }
- { name: PCF2_MBAR, type: FLOAT64 }
- { name: T1_MBAR, type: FLOAT64 }
- { name: T2_MBAR, type: FLOAT64 }
- { name: CODE1, type: FLOAT64 }
- { name: CODE2, type: FLOAT64 }
- { name: GPS_SPEED, type: FLOAT64 }
- { name: GPS_LAT, type: FLOAT64 }
- { name: GPS_LON, type: FLOAT64 }

physical:
- logical: SNCB_STREAM
type: TCP
parser_config:
type: CSV
field_delimiter: ","
tuple_delimiter: "\n"
source_config:
socket_host: "host.docker.internal"
socket_port: "32323"
socket_type: "SOCK_STREAM"
socket_domain: "AF_INET"
connect_timeout_seconds: "120"
85 changes: 39 additions & 46 deletions Queries/Query1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,56 +8,49 @@ query: |
time_utc,
'SRID=4326;POINT(4.3658 50.6456)',
FLOAT64(2.0)) = INT32(1)
WINDOW TUMBLING(time_utc, SIZE 5 SEC)
INTO file_sink;
WINDOW TUMBLING(time_utc, SIZE 10 SEC)
INTO BRAKE_ALERTS_STREAM;

sink:
name: file_sink
type: FILE
config:
filePath: "/workspace/Output/output_query1.csv"
inputFormat: CSV
sinks:
- name: BRAKE_ALERTS_STREAM
type: MQTT
schema:
- { name: SNCB_STREAM$START, type: UINT64 }
- { name: SNCB_STREAM$END, type: UINT64 }
- { name: SNCB_STREAM$CNT, type: UINT64 }
config:
serverURI: "tcp://host.docker.internal:1884"
clientId: "brake_alerts_${START_TIME_EPOCH}"
topic: "train/brake/alerts"
qos: "0"

logical:
- name: sncb_stream
- name: SNCB_STREAM
schema:
- name: time_utc
type: UINT64
- name: device_id
type: UINT64
- name: Vbat
type: FLOAT64
- name: PCFA_mbar
type: FLOAT64
- name: PCFF_mbar
type: FLOAT64
- name: PCF1_mbar
type: FLOAT64
- name: PCF2_mbar
type: FLOAT64
- name: T1_mbar
type: FLOAT64
- name: T2_mbar
type: FLOAT64
- name: Code1
type: FLOAT64
- name: Code2
type: FLOAT64
- name: gps_speed
type: FLOAT64
- name: gps_lat
type: FLOAT64
- name: gps_lon
type: FLOAT64
- { name: TIME_UTC, type: UINT64 }
- { name: DEVICE_ID, type: UINT64 }
- { name: VBAT, type: FLOAT64 }
- { name: PCFA_MBAR, type: FLOAT64 }
- { name: PCFF_MBAR, type: FLOAT64 }
- { name: PCF1_MBAR, type: FLOAT64 }
- { name: PCF2_MBAR, type: FLOAT64 }
- { name: T1_MBAR, type: FLOAT64 }
- { name: T2_MBAR, type: FLOAT64 }
- { name: CODE1, type: FLOAT64 }
- { name: CODE2, type: FLOAT64 }
- { name: GPS_SPEED, type: FLOAT64 }
- { name: GPS_LAT, type: FLOAT64 }
- { name: GPS_LON, type: FLOAT64 }

physical:
- logical: sncb_stream
sourceConfig:
type: TCP
socketHost: "host.docker.internal"
socketPort: 32323
socketType: "SOCK_STREAM"
socketDomain: "AF_INET"
parserConfig:
- logical: SNCB_STREAM
type: TCP
parser_config:
type: CSV
delimiter: ","
field_delimiter: ","
tuple_delimiter: "\n"
source_config:
socket_host: "host.docker.internal"
socket_port: "32323"
socket_type: "SOCK_STREAM"
socket_domain: "AF_INET"
63 changes: 63 additions & 0 deletions Queries/Query2-csv.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
query: |
SELECT start,
end,
varBP,
varFF,
cnt
FROM (
SELECT start,
end,
VAR(PCFA_mbar) AS varBP,
VAR(PCFF_mbar) AS varFF,
COUNT(time_utc) AS cnt
FROM sncb_stream
WHERE temporal_eintersects_geometry(gps_lon,
gps_lat,
time_utc,
'SRID=4326;POLYGON((4.0 50.0, 4.0 50.8, 4.6 50.8, 4.6 50.0, 4.0 50.0))') = INT32(0)
WINDOW SLIDING(time_utc, SIZE 10 SEC, ADVANCE BY 10 MS)
)
WHERE (varBP > FLOAT64(0.1))
AND (varFF <= FLOAT64(10))
INTO file_sink;

sinks:
- name: FILE_SINK
type: File
schema:
- { name: SNCB_STREAM$START, type: UINT64 }
- { name: SNCB_STREAM$END, type: UINT64 }
- { name: SNCB_STREAM$VARBP, type: FLOAT64 }
- { name: SNCB_STREAM$VARFF, type: FLOAT64 }
- { name: SNCB_STREAM$CNT, type: UINT64 }
config:
file_path: "/workspace/Output/output_query2.csv"
input_format: CSV

logical:
- name: SNCB_STREAM
schema:
- { name: TIME_UTC, type: UINT64 }
- { name: DEVICE_ID, type: UINT64 }
- { name: VBAT, type: FLOAT64 }
- { name: PCFA_MBAR, type: FLOAT64 }
- { name: PCFF_MBAR, type: FLOAT64 }
- { name: PCF1_MBAR, type: FLOAT64 }
- { name: PCF2_MBAR, type: FLOAT64 }
- { name: T1_MBAR, type: FLOAT64 }
- { name: T2_MBAR, type: FLOAT64 }
- { name: CODE1, type: FLOAT64 }
- { name: CODE2, type: FLOAT64 }
- { name: GPS_SPEED, type: FLOAT64 }
- { name: GPS_LAT, type: FLOAT64 }
- { name: GPS_LON, type: FLOAT64 }

physical:
- logical: SNCB_STREAM
type: File
parser_config:
type: CSV
field_delimiter: ","
tuple_delimiter: "\n"
source_config:
file_path: "/workspace/Input/input_sncb.csv"
84 changes: 39 additions & 45 deletions Queries/Query2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,59 +15,53 @@ query: |
gps_lat,
time_utc,
'SRID=4326;POLYGON((4.0 50.0, 4.0 50.8, 4.6 50.8, 4.6 50.0, 4.0 50.0))') = INT32(0)
WINDOW SLIDING(time_utc, SIZE 10 SEC, ADVANCE BY 200 MS)
WINDOW SLIDING(time_utc, SIZE 10 SEC, ADVANCE BY 10 MS)
)
WHERE (varBP > FLOAT64(0.1))
AND (varFF <= FLOAT64(10))
INTO file_sink;

sink:
name: file_sink
type: FILE
config:
filePath: "/workspace/Output/output_query2.csv"
inputFormat: CSV
sinks:
- name: FILE_SINK
type: File
schema:
- { name: SNCB_STREAM$START, type: UINT64 }
- { name: SNCB_STREAM$END, type: UINT64 }
- { name: SNCB_STREAM$VARBP, type: FLOAT64 }
- { name: SNCB_STREAM$VARFF, type: FLOAT64 }
- { name: SNCB_STREAM$CNT, type: UINT64 }
config:
file_path: "/workspace/Output/output_query2.csv"
input_format: CSV

logical:
- name: sncb_stream
- name: SNCB_STREAM
schema:
- name: time_utc
type: UINT64
- name: device_id
type: UINT64
- name: Vbat
type: FLOAT64
- name: PCFA_mbar
type: FLOAT64
- name: PCFF_mbar
type: FLOAT64
- name: PCF1_mbar
type: FLOAT64
- name: PCF2_mbar
type: FLOAT64
- name: T1_mbar
type: FLOAT64
- name: T2_mbar
type: FLOAT64
- name: Code1
type: FLOAT64
- name: Code2
type: FLOAT64
- name: gps_speed
type: FLOAT64
- name: gps_lat
type: FLOAT64
- name: gps_lon
type: FLOAT64
- { name: TIME_UTC, type: UINT64 }
- { name: DEVICE_ID, type: UINT64 }
- { name: VBAT, type: FLOAT64 }
- { name: PCFA_MBAR, type: FLOAT64 }
- { name: PCFF_MBAR, type: FLOAT64 }
- { name: PCF1_MBAR, type: FLOAT64 }
- { name: PCF2_MBAR, type: FLOAT64 }
- { name: T1_MBAR, type: FLOAT64 }
- { name: T2_MBAR, type: FLOAT64 }
- { name: CODE1, type: FLOAT64 }
- { name: CODE2, type: FLOAT64 }
- { name: GPS_SPEED, type: FLOAT64 }
- { name: GPS_LAT, type: FLOAT64 }
- { name: GPS_LON, type: FLOAT64 }

physical:
- logical: sncb_stream
sourceConfig:
type: TCP
socketHost: "host.docker.internal"
socketPort: 32323
socketType: "SOCK_STREAM"
socketDomain: "AF_INET"
parserConfig:
- logical: SNCB_STREAM
type: TCP
parser_config:
type: CSV
delimiter: ","
field_delimiter: ","
tuple_delimiter: "\n"
source_config:
socket_host: "host.docker.internal"
socket_port: "32323"
socket_type: "SOCK_STREAM"
socket_domain: "AF_INET"
connect_timeout_seconds: "120"
Loading
Loading