diff --git a/data-generator/src/main/java/com/datasqrl/cmd/RootGenerateCommand.java b/data-generator/src/main/java/com/datasqrl/cmd/RootGenerateCommand.java index 5a447e35..4f49d4fe 100644 --- a/data-generator/src/main/java/com/datasqrl/cmd/RootGenerateCommand.java +++ b/data-generator/src/main/java/com/datasqrl/cmd/RootGenerateCommand.java @@ -3,6 +3,7 @@ import com.datasqrl.data.GenerateClickstream; import com.datasqrl.data.GenerateCreditCards; import com.datasqrl.data.GenerateLoans; +import com.datasqrl.data.GenerateMeasurement; import com.datasqrl.data.GeneratePatients; import com.datasqrl.data.GenerateSensors; import com.datasqrl.data.GenerateWarrants; @@ -14,7 +15,7 @@ @CommandLine.Command(name = "datasqrl", mixinStandardHelpOptions = true, version = "0.1", subcommands = {GenerateClickstream.class, GenerateSensors.class, GenerateLoans.class, GenerateCreditCards.class, - GeneratePatients.class, GenerateWarrants.class}) + GeneratePatients.class, GenerateWarrants.class, GenerateMeasurement.class}) @Getter public class RootGenerateCommand implements Runnable { diff --git a/data-generator/src/main/java/com/datasqrl/data/GenerateCreditCards.java b/data-generator/src/main/java/com/datasqrl/data/GenerateCreditCards.java index a391ad43..fab37d95 100644 --- a/data-generator/src/main/java/com/datasqrl/data/GenerateCreditCards.java +++ b/data-generator/src/main/java/com/datasqrl/data/GenerateCreditCards.java @@ -231,7 +231,7 @@ public static class Reward { public static class Config implements Configuration { - public int numCustomers = 10; + public int numCustomers = 1000; public int numMerchants = 100; diff --git a/data-generator/src/main/java/com/datasqrl/data/GenerateMeasurement.java b/data-generator/src/main/java/com/datasqrl/data/GenerateMeasurement.java new file mode 100644 index 00000000..d5c3d85c --- /dev/null +++ b/data-generator/src/main/java/com/datasqrl/data/GenerateMeasurement.java @@ -0,0 +1,176 @@ +package com.datasqrl.data; + +import com.datasqrl.cmd.AbstractGenerateCommand; +import com.datasqrl.util.Configuration; +import com.datasqrl.util.RandomSampler; +import com.datasqrl.util.SerializerUtil; +import com.datasqrl.util.WriterUtil; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.Flow; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import lombok.EqualsAndHashCode; +import lombok.EqualsAndHashCode.Include; +import lombok.Value; +import org.apache.commons.lang3.StringUtils; +import picocli.CommandLine; + +@CommandLine.Command(name = "measurement", description = "Generates Measurements") +public class GenerateMeasurement extends AbstractGenerateCommand { + + + @Override + public void run() { + initialize(); + Config config = getConfiguration(new Config()); + long numMeasurements = root.getNumber(); + Instant end = Instant.now().truncatedTo(ChronoUnit.SECONDS); + DataFunction normalTemperature = new DataFunction(190, 190, 0, 1); + DataFunction normalPressure = new DataFunction(3000, 3000, 0, 4); + DataFunction normalFlow = new DataFunction(250, 250, 0, 2); + Map> assets = Map.of( + 12221, List.of(normalPressure.with(3400, -3600), normalTemperature, normalFlow.with(180, -3600)), + 21112, List.of(normalPressure.with(1800, -4000), normalTemperature, normalFlow.with(150, -1400)), + 34443, List.of(normalPressure.with(3800, -100), normalTemperature, normalFlow.with(120, -40)), + 45555, List.of(normalPressure.with(2000, -190), normalTemperature, normalFlow.with(100, -70)), + 59995, List.of(normalPressure, normalTemperature, normalFlow) + ); + System.out.println(assets.keySet()); + + List measurements = new ArrayList<>(); + List flowrates = new ArrayList<>(); + for (int i = 1; i <=numMeasurements ; i++) { + long past = i-numMeasurements; + Instant timestamp = end.plus(past, ChronoUnit.SECONDS); + assets.forEach((id, functions) -> { + measurements.add(new Measurement(//UUID.randomUUID().toString(), + id, + functions.get(0).getValue(past), + functions.get(1).getValue(past), + timestamp.toString())); + flowrates.add(new FlowRate(id, + functions.get(2).getValue(past), + timestamp.toString())); + }); + + + } + + WriterUtil.writeToFileSorted(measurements, getOutputDir().resolve("measurement.jsonl"), + Comparator.comparing(Measurement::getTimestamp), + null, null); + WriterUtil.writeToFileSorted(flowrates, getOutputDir().resolve("flowrate.jsonl"), + Comparator.comparing(FlowRate::getTimestamp), + null, null); + } + + @Value + public class DataFunction { + double startValue; + double endValue; + long startTime; + double noiseStdDev; + + public double getValue(long past) { + double value = startValue; + if (past>startTime) { + long delta = past - startTime; + value = value + (endValue - startValue) * (delta/Math.abs(startTime*1.0)); + } + return roundToOneDecimalPlace(value + sampler.nextNormal(0, noiseStdDev)); + } + + public DataFunction with(double endValue, long startTime) { + return new DataFunction(startValue, endValue, startTime, noiseStdDev); + } + + public static double roundToOneDecimalPlace(double value) { + return Math.round(value * 10.0) / 10.0; + } + + + } + + + @Value + public static class Measurement { + + int assetId; + double pressure_psi; + double temperature_f; + String timestamp; + + @Override + public String toString() { + return SerializerUtil.toJson(this); + } + + } + + @Value + public static class FlowRate { + + int assetId; + double flowrate; + String timestamp; + + @Override + public String toString() { + return SerializerUtil.toJson(this); + } + + } + + + public static class Config implements Configuration { + + public int numSensors = 10; + + public int avgSensorsPerMachine = 2; + + public int avgSensorReassignments = 3; + + public int avgSensorReassignmentsDeviation = 1; + + public int avgGroupPerDay = 5; + + public double avgGroupPerDayDeviation = 4.0; + + public int avgMachinesPerGroup = 10; + + public double avgMachinesPerGroupDeviation = 20.0; + + public double temperatureBaselineMin = 97; + + public double temperatureBaselineMax = 99; + + public double maxNoise = 0.05; + + public double maxTemperaturePeakStdDev = 2.0; + + public int minMaxRampWidth = 600; + + public int maxMaxRampWidth = 10000; + + public double errorProbability = 0.00; + + + + @Override + public void scale(long scaleFactor, long number) { + numSensors = (int)Math.min(10000000,Math.max(20, number/(86400*60))); + } + } + + +} diff --git a/data-generator/src/test/java/com/datasqrl/data/TestDataGenerators.java b/data-generator/src/test/java/com/datasqrl/data/TestDataGenerators.java index e28af10d..d4da9c17 100644 --- a/data-generator/src/test/java/com/datasqrl/data/TestDataGenerators.java +++ b/data-generator/src/test/java/com/datasqrl/data/TestDataGenerators.java @@ -29,7 +29,7 @@ public void testLoan() { @Test public void testCreditCard() { - new RootGenerateCommand().getCmd().execute(new String[]{"creditcard","-n","100","-o","data/creditcard"}); + new RootGenerateCommand().getCmd().execute(new String[]{"creditcard","-n","365","-o","data/creditcard"}); } @Test @@ -37,5 +37,11 @@ public void TestWarrants() { new RootGenerateCommand().getCmd().execute(new String[]{"warrants","-n","100","-o","data/warrants"}); } + @Test + public void testMeasurements() { + new RootGenerateCommand().getCmd().execute(new String[]{"measurement","-n","8000","-o","data/well"}); + } + + } diff --git a/oil-gas-agent-automation/README.md b/oil-gas-agent-automation/README.md new file mode 100644 index 00000000..8155859c --- /dev/null +++ b/oil-gas-agent-automation/README.md @@ -0,0 +1,41 @@ +# Automated Monitoring - Oil & Gas Use Case + +Run the data backend with the following command: + +`docker run -it -p 8888:8888 -p 8081:8081 -p 9092:9092 --rm -v $PWD:/build datasqrl/cmd:latest run -c package-local.json ` + +To publish flowrate metrics, open [GraphiQL in the browser](http://localhost:8888/graphiql/) and run the following mutation: + +```graphql +mutation { + AddFlowRate(event: {assetId: 12221, flowrate: 220.5 }) { + assetId + flowrate + } +} +``` + +This sample dataset has 5 pre-defined wells (i.e. assets) with the following ids: +`12221, 21112, 34443, 45555, 59995` + +Open GraphiQL again in another window and listen to this subscription: +```graphql +subscription { + LowFlowRate { + assetId + flowrate + } +} +``` + +Add another flowrate metric with a value below `200` and observe it through the subscription. +To retrieve recent pressure and temperature readings for a well, run this query: +```graphql +{ + RecentPressure(assetId: 12221) { + pressure_psi + temperature_f + timestamp_normalized + } +} +``` \ No newline at end of file diff --git a/oil-gas-agent-automation/monitoring-agent.sqrl b/oil-gas-agent-automation/monitoring-agent.sqrl new file mode 100644 index 00000000..4e9db8d3 --- /dev/null +++ b/oil-gas-agent-automation/monitoring-agent.sqrl @@ -0,0 +1,25 @@ +IMPORT test-data.*; +IMPORT time.endOfMinute; + +Assets := DISTINCT Assets ON asset_id ORDER BY lastUpdated DESC; +Maintenance := DISTINCT Maintenance ON work_order_id ORDER BY lastUpdated DESC; +Assets.maintenance := JOIN Maintenance m ON @.asset_id = m.asset_id ORDER BY m.lastUpdated DESC; + +FlowRateByRange(@minRate: DOUBLE, @maxRate: DOUBLE) := SELECT * FROM Flowrate + WHERE flowrate >= @minRate AND flowrate <= @maxRate ORDER BY event_time DESC LIMIT 100; + +PressureByMinute := SELECT assetId, endOfMinute(timestamp) AS timestamp_min, + AVG(pressure_psi) AS avg_pressure_psi, + MAX(pressure_psi) AS max_pressure_psi, + AVG(temperature_f) AS avg_temperature_f + FROM Measurement + GROUP BY assetId, timestamp_min ORDER BY timestamp_min DESC; + +FlowrateByMinute := SELECT assetId, endOfMinute(event_time) AS timestamp_min, + AVG(flowrate) AS avg_flowrate, + MAX(flowrate) AS max_flowrate + FROM Flowrate + GROUP BY assetId, timestamp_min ORDER BY timestamp_min DESC; + +Assets.flowrate := JOIN FlowrateByMinute f ON @.asset_id = f.assetId ORDER BY timestamp_min DESC; +Assets.pressure := JOIN PressureByMinute p ON @.asset_id = p.assetId ORDER BY timestamp_min DESC; \ No newline at end of file diff --git a/oil-gas-agent-automation/operations-agent.graphqls b/oil-gas-agent-automation/operations-agent.graphqls new file mode 100644 index 00000000..d9613fa1 --- /dev/null +++ b/oil-gas-agent-automation/operations-agent.graphqls @@ -0,0 +1,81 @@ +"An RFC-3339 compliant DateTime Scalar" +scalar DateTime + +type Query { + """ + Returns information about the well/asset including it's maintenance records, manual, + description, and other important information. + """ + Assets(asset_id: Int!, limit: Int = 20, offset: Int = 0): [Assets!] + """ + Returns the flowrate readings for a given well/asset within the specified range order by timestamp decreasing. + """ + FlowRateByRange(assetId: Int!, minRate: Float = 0, maxRate: Float = 10000): [EnrichedFlowRate!] + """ + Returns recent pressure and temperature readings for a given well/asset by id. + """ + RecentPressure(assetId: Int!, limit: Int = 25, offset: Int = 0): [RecentPressure!] +} + +type Subscription { + LowFlowRate: EnrichedFlowRate +} + +type RecentPressure { + assetId: Int! + timestamp_normalized: DateTime! + pressure_psi: Float! + temperature_f: Float! +} + +type Assets { + asset_id: Int! + asset_number: String! + asset_name: String! + asset_category_id: Int! + description: String! + date_placed_in_service: String! + asset_cost: Float! + status: String! + asset_manual: String! + lastUpdated: DateTime! + maintenance: [Maintenance!] +} + +type Maintenance { + work_order_id: Int! + work_order_num: String! + asset_id: Int! + description: String! + wo_type: String! + priority: String! + status: String! + request_date: String! + start_date: String + completion_date: String + lastUpdated: DateTime! +} + +type EnrichedFlowRate { + assetId: Int! + flowrate: Float! + event_time: DateTime! + asset_number: String! + asset_name: String! + description: String! +} + +input FlowRateInput { + assetId: Int! + flowrate: Float! +} + +type FlowRateResult { + assetId: Int! + flowrate: Float! + event_time: DateTime! +} + +type Mutation { + AddFlowRate(event: FlowRateInput!): FlowRateResult! +} \ No newline at end of file diff --git a/oil-gas-agent-automation/operations-agent.sqrl b/oil-gas-agent-automation/operations-agent.sqrl new file mode 100644 index 00000000..03996703 --- /dev/null +++ b/oil-gas-agent-automation/operations-agent.sqrl @@ -0,0 +1,29 @@ +IMPORT test-data.Assets; +IMPORT test-data.Maintenance AS MaintenanceUpdates; +IMPORT test-data.Measurement; +IMPORT operations-agent.AddFlowRate AS Flowrate; +IMPORT time.endOfSecond; + +Assets := DISTINCT Assets ON asset_id ORDER BY lastUpdated DESC; +Maintenance := DISTINCT MaintenanceUpdates ON work_order_id ORDER BY lastUpdated DESC; +Assets.maintenance := JOIN Maintenance m ON @.asset_id = m.asset_id ORDER BY m.lastUpdated DESC; + +EXPORT Flowrate TO print.Flowrate; + +EnrichedFlowRate := SELECT f.*, a.asset_number, a.asset_name, a.description + FROM Flowrate f + TEMPORAL JOIN Assets a ON f.assetId = a.asset_id; + +EXPORT EnrichedFlowRate TO print.EnrichedFlowRate; + +LowFlowRate := SELECT * FROM EnrichedFlowRate WHERE flowrate < 200; + +FlowRateByRange(@assetId: BIGINT, @minRate: DOUBLE, @maxRate: DOUBLE) := SELECT * FROM EnrichedFlowRate + WHERE assetId = @assetId AND flowrate >= @minRate AND flowrate <= @maxRate ORDER BY event_time DESC LIMIT 100; + +RecentPressure := SELECT assetId, endOfSecond(timestamp, 10) AS timestamp_normalized, + AVG(pressure_psi) AS pressure_psi, + AVG(temperature_f) AS temperature_f + FROM Measurement + GROUP BY assetId, timestamp_normalized ORDER BY timestamp_normalized DESC; + diff --git a/oil-gas-agent-automation/package-local.json b/oil-gas-agent-automation/package-local.json new file mode 100644 index 00000000..75ea1ac4 --- /dev/null +++ b/oil-gas-agent-automation/package-local.json @@ -0,0 +1,14 @@ +{ + "version": "1", + "enabled-engines": ["vertx", "postgres", "kafka", "flink"], + "profiles": ["datasqrl.profile.default"], + "script": { + "main": "operations-agent.sqrl", + "graphql": "operations-agent.graphqls" + }, + "values" : { + "flink-config": { + "table.exec.source.idle-timeout": "1000 ms" + } + } +} \ No newline at end of file diff --git a/oil-gas-agent-automation/test-data/assets.jsonl b/oil-gas-agent-automation/test-data/assets.jsonl new file mode 100644 index 00000000..fb6b2f6e --- /dev/null +++ b/oil-gas-agent-automation/test-data/assets.jsonl @@ -0,0 +1,6 @@ +{"asset_id":21112,"asset_number":"W-21112","asset_name":"Well Alpha 21112","asset_category_id":200,"description":"Production well in Field Alpha","date_placed_in_service":"2022-07-15","asset_cost":5200000,"status":"ACTIVE","asset_manual":"http://docs.example.com/manuals/WellAlpha21112.pdf", "lastUpdated": "2025-01-11T04:00:00Z"} +{"asset_id":12221,"asset_number":"W-12221","asset_name":"Well Beta 12221","asset_category_id":200,"description":"Exploratory well in Field Beta","date_placed_in_service":"2021-05-20","asset_cost":4500000,"status":"ACTIVE","asset_manual":"http://docs.example.com/manuals/WellBeta12221.pdf", "lastUpdated": "2025-01-11T04:00:00Z"} +{"asset_id":59995,"asset_number":"W-59995","asset_name":"Well Gamma 59995","asset_category_id":200,"description":"Production well in Field Gamma","date_placed_in_service":"2020-11-10","asset_cost":6000000,"status":"ACTIVE","asset_manual":"http://docs.example.com/manuals/WellGamma59995.pdf", "lastUpdated": "2025-01-11T04:00:00Z"} +{"asset_id":45555,"asset_number":"W-45555","asset_name":"Well Delta 45555","asset_category_id":200,"description":"Injection well in Field Delta","date_placed_in_service":"2023-01-05","asset_cost":5500000,"status":"ACTIVE","asset_manual":"http://docs.example.com/manuals/WellDelta45555.pdf", "lastUpdated": "2025-01-11T04:00:00Z"} +{"asset_id":34443,"asset_number":"W-34443","asset_name":"Well Epsilon 34443","asset_category_id":200,"description":"Production well in Field Epsilon","date_placed_in_service":"2019-09-30","asset_cost":4900000,"status":"ACTIVE","asset_manual":"http://docs.example.com/manuals/WellEpsilon34443.pdf", "lastUpdated": "2025-01-11T04:00:00Z"} +{"asset_id":34443,"asset_number":"W-34443","asset_name":"Well Epsilon 34443","asset_category_id":200,"description":"Production well in Field Epsilon","date_placed_in_service":"2019-09-30","asset_cost":4900000,"status":"ACTIVE","asset_manual":"http://docs.example.com/manuals/WellEpsilon34443.pdf", "lastUpdated": "2025-02-11T05:00:00Z"} \ No newline at end of file diff --git a/oil-gas-agent-automation/test-data/assets.schema.yml b/oil-gas-agent-automation/test-data/assets.schema.yml new file mode 100644 index 00000000..5fa5a6b6 --- /dev/null +++ b/oil-gas-agent-automation/test-data/assets.schema.yml @@ -0,0 +1,45 @@ +--- +name: "assets" +schema_version: "1" +partial_schema: false +columns: +- name: "asset_id" + type: "BIGINT" + tests: + - "not_null" +- name: "asset_number" + type: "STRING" + tests: + - "not_null" +- name: "asset_name" + type: "STRING" + tests: + - "not_null" +- name: "asset_category_id" + type: "BIGINT" + tests: + - "not_null" +- name: "description" + type: "STRING" + tests: + - "not_null" +- name: "date_placed_in_service" + type: "STRING" + tests: + - "not_null" +- name: "asset_cost" + type: "BIGINT" + tests: + - "not_null" +- name: "status" + type: "STRING" + tests: + - "not_null" +- name: "asset_manual" + type: "STRING" + tests: + - "not_null" +- name: "lastUpdated" + type: "TIMESTAMP" + tests: + - "not_null" diff --git a/oil-gas-agent-automation/test-data/assets.table.json b/oil-gas-agent-automation/test-data/assets.table.json new file mode 100644 index 00000000..016f9d07 --- /dev/null +++ b/oil-gas-agent-automation/test-data/assets.table.json @@ -0,0 +1,15 @@ +{ + "flink" : { + "format" : "flexible-json", + "path" : "${DATA_PATH}/assets.jsonl", + "source.monitor-interval" : 10000, + "connector" : "filesystem" + }, + "version" : 1, + "table" : { + "type" : "source", + "primary-key" : [ "asset_id", "lastUpdated" ], + "timestamp" : "lastUpdated", + "watermark-millis" : "0" + } +} \ No newline at end of file diff --git a/oil-gas-agent-automation/test-data/flowrate.jsonl.gz b/oil-gas-agent-automation/test-data/flowrate.jsonl.gz new file mode 100644 index 00000000..10f05c21 Binary files /dev/null and b/oil-gas-agent-automation/test-data/flowrate.jsonl.gz differ diff --git a/oil-gas-agent-automation/test-data/flowrate.schema.yml b/oil-gas-agent-automation/test-data/flowrate.schema.yml new file mode 100644 index 00000000..5ed9a81c --- /dev/null +++ b/oil-gas-agent-automation/test-data/flowrate.schema.yml @@ -0,0 +1,17 @@ +--- +name: "flowrate" +schema_version: "1" +partial_schema: false +columns: +- name: "assetId" + type: "BIGINT" + tests: + - "not_null" +- name: "flowrate" + type: "DOUBLE" + tests: + - "not_null" +- name: "event_time" + type: "TIMESTAMP" + tests: + - "not_null" diff --git a/oil-gas-agent-automation/test-data/flowrate.table.json b/oil-gas-agent-automation/test-data/flowrate.table.json new file mode 100644 index 00000000..5dd01c40 --- /dev/null +++ b/oil-gas-agent-automation/test-data/flowrate.table.json @@ -0,0 +1,15 @@ +{ + "flink" : { + "format" : "flexible-json", + "path" : "${DATA_PATH}/flowrate.jsonl.gz", + "source.monitor-interval" : 10000, + "connector" : "filesystem" + }, + "version" : 1, + "table" : { + "type" : "source", + "primary-key" : [ "assetId", "event_time" ], + "timestamp" : "event_time", + "watermark-millis" : "0" + } +} \ No newline at end of file diff --git a/oil-gas-agent-automation/test-data/maintenance.jsonl b/oil-gas-agent-automation/test-data/maintenance.jsonl new file mode 100644 index 00000000..5cd8e7bb --- /dev/null +++ b/oil-gas-agent-automation/test-data/maintenance.jsonl @@ -0,0 +1,8 @@ +{"work_order_id":2,"work_order_num":"WO-10457","asset_id":12221,"description":"Pump replacement and calibration","wo_type":"EQUIP_REPLACE","priority":"MEDIUM","status":"OPEN","request_date":"2024-08-01","start_date":"2024-11-05","completion_date":"2024-11-07", "lastUpdated": "2024-11-07T06:00:00Z"} +{"work_order_id":3,"work_order_num":"WO-10458","asset_id":59995,"description":"Routine inspection and valve check","wo_type":"INSPECTION","priority":"LOW","status":"CLOSED","request_date":"2024-08-10","start_date":"2024-08-12","completion_date":"2024-08-12", "lastUpdated": "2024-08-12T22:00:00Z"} +{"work_order_id":4,"work_order_num":"WO-10459","asset_id":21112,"description":"Emergency leak repair at junction","wo_type":"EMERGENCY_REPAIR","priority":"HIGH","status":"CLOSED","request_date":"2024-08-30","start_date":"2024-09-01","completion_date":"2024-09-01", "lastUpdated": "2024-09-02T02:00:00Z"} +{"work_order_id":8,"work_order_num":"WO-10463","asset_id":12221,"description":"Hot-oil treatment for wax removal","wo_type":"PIPE_CLEANING","priority":"HIGH","status":"CLOSED","request_date":"2024-09-20","start_date":"2024-09-25","completion_date":"2024-09-25", "lastUpdated": "2024-09-26T06:00:00Z"} +{"work_order_id":5,"work_order_num":"WO-10460","asset_id":45555,"description":"Acid treatment for scale removal","wo_type":"ACID_TREATMENT","priority":"MEDIUM","status":"CLOSED","request_date":"2024-10-20","start_date":"2024-10-25","completion_date":"2024-10-25", "lastUpdated": "2024-10-25T19:00:00Z"} +{"work_order_id":6,"work_order_num":"WO-10461","asset_id":45555,"description":"Compressor maintenance and oil change","wo_type":"MAINTENANCE","priority":"LOW","status":"SCHEDULED","request_date":"2024-10-20","start_date":"2024-10-22","completion_date":"2024-10-23", "lastUpdated": "2024-10-30T02:20:00Z"} +{"work_order_id":7,"work_order_num":"WO-10462","asset_id":34443,"description":"Hydraulic system pressure testing","wo_type":"TESTING","priority":"MEDIUM","status":"IN_PROGRESS","request_date":"2024-11-15","start_date":null,"completion_date":null, "lastUpdated": "2024-11-15T11:00:00Z"} +{"work_order_id":7,"work_order_num":"WO-10462","asset_id":34443,"description":"Hydraulic system pressure testing","wo_type":"TESTING","priority":"MEDIUM","status":"IN_PROGRESS","request_date":"2024-11-15","start_date":"2024-12-17","completion_date":"2025-02-10", "lastUpdated": "2025-02-12T01:00:00Z"} diff --git a/oil-gas-agent-automation/test-data/maintenance.schema.yml b/oil-gas-agent-automation/test-data/maintenance.schema.yml new file mode 100644 index 00000000..90acd6e6 --- /dev/null +++ b/oil-gas-agent-automation/test-data/maintenance.schema.yml @@ -0,0 +1,45 @@ +--- +name: "maintenance" +schema_version: "1" +partial_schema: false +columns: +- name: "work_order_id" + type: "BIGINT" + tests: + - "not_null" +- name: "work_order_num" + type: "STRING" + tests: + - "not_null" +- name: "asset_id" + type: "BIGINT" + tests: + - "not_null" +- name: "description" + type: "STRING" + tests: + - "not_null" +- name: "wo_type" + type: "STRING" + tests: + - "not_null" +- name: "priority" + type: "STRING" + tests: + - "not_null" +- name: "status" + type: "STRING" + tests: + - "not_null" +- name: "request_date" + type: "STRING" + tests: + - "not_null" +- name: "start_date" + type: "STRING" +- name: "completion_date" + type: "STRING" +- name: "lastUpdated" + type: "TIMESTAMP" + tests: + - "not_null" diff --git a/oil-gas-agent-automation/test-data/maintenance.table.json b/oil-gas-agent-automation/test-data/maintenance.table.json new file mode 100644 index 00000000..cc0d481b --- /dev/null +++ b/oil-gas-agent-automation/test-data/maintenance.table.json @@ -0,0 +1,15 @@ +{ + "flink" : { + "format" : "flexible-json", + "path" : "${DATA_PATH}/maintenance.jsonl", + "source.monitor-interval" : 10000, + "connector" : "filesystem" + }, + "version" : 1, + "table" : { + "type" : "source", + "primary-key" : [ "work_order_id", "lastUpdated" ], + "timestamp" : "lastUpdated", + "watermark-millis" : "0" + } +} \ No newline at end of file diff --git a/oil-gas-agent-automation/test-data/measurement.jsonl.gz b/oil-gas-agent-automation/test-data/measurement.jsonl.gz new file mode 100644 index 00000000..8b496e5f Binary files /dev/null and b/oil-gas-agent-automation/test-data/measurement.jsonl.gz differ diff --git a/oil-gas-agent-automation/test-data/measurement.schema.yml b/oil-gas-agent-automation/test-data/measurement.schema.yml new file mode 100644 index 00000000..8827560a --- /dev/null +++ b/oil-gas-agent-automation/test-data/measurement.schema.yml @@ -0,0 +1,21 @@ +--- +name: "measurement" +schema_version: "1" +partial_schema: false +columns: +- name: "assetId" + type: "BIGINT" + tests: + - "not_null" +- name: "pressure_psi" + type: "DOUBLE" + tests: + - "not_null" +- name: "temperature_f" + type: "DOUBLE" + tests: + - "not_null" +- name: "timestamp" + type: "TIMESTAMP" + tests: + - "not_null" diff --git a/oil-gas-agent-automation/test-data/measurement.table.json b/oil-gas-agent-automation/test-data/measurement.table.json new file mode 100644 index 00000000..8cfadc51 --- /dev/null +++ b/oil-gas-agent-automation/test-data/measurement.table.json @@ -0,0 +1,15 @@ +{ + "flink" : { + "format" : "flexible-json", + "path" : "${DATA_PATH}/measurement.jsonl.gz", + "source.monitor-interval" : 10000, + "connector" : "filesystem" + }, + "version" : 1, + "table" : { + "type" : "source", + "primary-key" : [ "assetId", "timestamp" ], + "timestamp" : "timestamp", + "watermark-millis" : "0" + } +} \ No newline at end of file