diff --git a/oil-gas-agent-automation/oil-gas-agent-operations/README.md b/oil-gas-agent-automation/oil-gas-agent-operations/README.md new file mode 100644 index 00000000..2bb7d70a --- /dev/null +++ b/oil-gas-agent-automation/oil-gas-agent-operations/README.md @@ -0,0 +1,13 @@ +# IoT Monitoring Agent + +Ingests IoT data about wells and a change-stream of updates for the monitored assets and their maintenance records. +Analyzes and relates the information to provide comprehensive monitoring with context. + +The result is exposed via model context protocol (MCP) to be consumed by agents for automated troubleshooting. It also exposes GraphQL and REST APIs for testing and troubleshooting. + +This project template only configures a test environment. + +## Customization + +- Update the sources and processing script +- Add sources and configuration for environments other than testing. Look at the other templates for examples. \ No newline at end of file diff --git a/oil-gas-agent-automation/oil-gas-agent-operations/connectors/sources-local.sqrl b/oil-gas-agent-automation/oil-gas-agent-operations/connectors/sources-local.sqrl new file mode 100644 index 00000000..c6a62832 --- /dev/null +++ b/oil-gas-agent-automation/oil-gas-agent-operations/connectors/sources-local.sqrl @@ -0,0 +1,67 @@ +CREATE TABLE AssetUpdates ( + `asset_id` BIGINT NOT NULL, + `asset_number` STRING NOT NULL, + `asset_name` STRING NOT NULL, + `asset_category_id` BIGINT NOT NULL, + `description` STRING NOT NULL, + `date_placed_in_service` STRING NOT NULL, + `asset_cost` BIGINT NOT NULL, + `status` STRING NOT NULL, + `asset_manual` STRING NOT NULL, + `lastUpdated` TIMESTAMP_LTZ(3) NOT NULL, + PRIMARY KEY (`asset_id`, `lastUpdated`) NOT ENFORCED, + WATERMARK FOR `lastUpdated` AS `lastUpdated` +) WITH ( + 'format' = 'flexible-json', + 'path' = '${DATA_PATH}/assets.jsonl', + 'source.monitor-interval' = '10000', + 'connector' = 'filesystem' +); + +CREATE TABLE FlowRate ( + `assetId` BIGINT NOT NULL, + `flowrate` DOUBLE NOT NULL, + `event_time` TIMESTAMP_LTZ(3) NOT NULL, + PRIMARY KEY (`assetId`, `event_time`) NOT ENFORCED, + WATERMARK FOR `event_time` AS `event_time` +) WITH ( + 'format' = 'flexible-json', + 'path' = '${DATA_PATH}/flowrate.jsonl.gz', + 'source.monitor-interval' = '10000', + 'connector' = 'filesystem' +); + +CREATE TABLE MaintenanceUpdates ( + `work_order_id` BIGINT NOT NULL, + `work_order_num` STRING NOT NULL, + `asset_id` BIGINT NOT NULL, + `description` STRING NOT NULL, + `wo_type` STRING NOT NULL, + `priority` STRING NOT NULL, + `status` STRING NOT NULL, + `request_date` STRING NOT NULL, + `start_date` STRING, + `completion_date` STRING, + `lastUpdated` TIMESTAMP_LTZ(3) NOT NULL, + PRIMARY KEY (`work_order_id`, `lastUpdated`) NOT ENFORCED, + WATERMARK FOR `lastUpdated` AS `lastUpdated` +) WITH ( + 'format' = 'flexible-json', + 'path' = '${DATA_PATH}/maintenance.jsonl', + 'source.monitor-interval' = '10000', + 'connector' = 'filesystem' +); + +CREATE TABLE Measurement ( + `assetId` BIGINT NOT NULL, + `pressure_psi` DOUBLE NOT NULL, + `temperature_f` DOUBLE NOT NULL, + `timestamp` TIMESTAMP_LTZ(3) NOT NULL, + PRIMARY KEY (`assetId`, `timestamp`) NOT ENFORCED, + WATERMARK FOR `timestamp` AS `timestamp` +) WITH ( + 'format' = 'flexible-json', + 'path' = '${DATA_PATH}/measurement.jsonl.gz', + 'source.monitor-interval' = '10000', + 'connector' = 'filesystem' +); diff --git a/oil-gas-agent-automation/oil-gas-agent-operations/oilgas-testdata/assets.jsonl b/oil-gas-agent-automation/oil-gas-agent-operations/connectors/test-data/assets.jsonl similarity index 100% rename from oil-gas-agent-automation/oil-gas-agent-operations/oilgas-testdata/assets.jsonl rename to oil-gas-agent-automation/oil-gas-agent-operations/connectors/test-data/assets.jsonl diff --git a/oil-gas-agent-automation/oil-gas-agent-operations/connectors/test-data/flowrate.jsonl.gz b/oil-gas-agent-automation/oil-gas-agent-operations/connectors/test-data/flowrate.jsonl.gz new file mode 100644 index 00000000..e69de29b diff --git a/oil-gas-agent-automation/oil-gas-agent-operations/oilgas-testdata/maintenance.jsonl b/oil-gas-agent-automation/oil-gas-agent-operations/connectors/test-data/maintenance.jsonl similarity index 100% rename from oil-gas-agent-automation/oil-gas-agent-operations/oilgas-testdata/maintenance.jsonl rename to oil-gas-agent-automation/oil-gas-agent-operations/connectors/test-data/maintenance.jsonl diff --git a/oil-gas-agent-automation/oil-gas-agent-operations/connectors/test-data/measurement.jsonl.gz b/oil-gas-agent-automation/oil-gas-agent-operations/connectors/test-data/measurement.jsonl.gz new file mode 100644 index 00000000..e69de29b diff --git a/oil-gas-agent-automation/oil-gas-agent-operations/oilgas-testdata/assets.table.sql b/oil-gas-agent-automation/oil-gas-agent-operations/oilgas-testdata/assets.table.sql deleted file mode 100644 index 3c81a7bc..00000000 --- a/oil-gas-agent-automation/oil-gas-agent-operations/oilgas-testdata/assets.table.sql +++ /dev/null @@ -1,19 +0,0 @@ -CREATE TABLE Assets ( - `asset_id` BIGINT NOT NULL, - `asset_number` STRING NOT NULL, - `asset_name` STRING NOT NULL, - `asset_category_id` BIGINT NOT NULL, - `description` STRING NOT NULL, - `date_placed_in_service` STRING NOT NULL, - `asset_cost` BIGINT NOT NULL, - `status` STRING NOT NULL, - `asset_manual` STRING NOT NULL, - `lastUpdated` TIMESTAMP_LTZ(3) NOT NULL, - PRIMARY KEY (`asset_id`, `lastUpdated`) NOT ENFORCED, - WATERMARK FOR `lastUpdated` AS `lastUpdated` -) WITH ( - 'format' = 'flexible-json', - 'path' = '${DATA_PATH}/assets.jsonl', - 'source.monitor-interval' = '10000', - 'connector' = 'filesystem' -); diff --git a/oil-gas-agent-automation/oil-gas-agent-operations/oilgas-testdata/flow_rate.table.sql b/oil-gas-agent-automation/oil-gas-agent-operations/oilgas-testdata/flow_rate.table.sql deleted file mode 100644 index 64130201..00000000 --- a/oil-gas-agent-automation/oil-gas-agent-operations/oilgas-testdata/flow_rate.table.sql +++ /dev/null @@ -1,12 +0,0 @@ -CREATE TABLE FlowRate ( - `assetId` BIGINT NOT NULL, - `flowrate` DOUBLE NOT NULL, - `event_time` TIMESTAMP_LTZ(3) NOT NULL, - PRIMARY KEY (`assetId`, `event_time`) NOT ENFORCED, - WATERMARK FOR `event_time` AS `event_time` -) WITH ( - 'format' = 'flexible-json', - 'path' = '${DATA_PATH}/flowrate.jsonl.gz', - 'source.monitor-interval' = '10000', - 'connector' = 'filesystem' -); diff --git a/oil-gas-agent-automation/oil-gas-agent-operations/oilgas-testdata/flowrate.jsonl.gz b/oil-gas-agent-automation/oil-gas-agent-operations/oilgas-testdata/flowrate.jsonl.gz deleted file mode 100644 index 10f05c21..00000000 Binary files a/oil-gas-agent-automation/oil-gas-agent-operations/oilgas-testdata/flowrate.jsonl.gz and /dev/null differ diff --git a/oil-gas-agent-automation/oil-gas-agent-operations/oilgas-testdata/maintenance.table.sql b/oil-gas-agent-automation/oil-gas-agent-operations/oilgas-testdata/maintenance.table.sql deleted file mode 100644 index 335dd491..00000000 --- a/oil-gas-agent-automation/oil-gas-agent-operations/oilgas-testdata/maintenance.table.sql +++ /dev/null @@ -1,20 +0,0 @@ -CREATE TABLE Maintenance ( - `work_order_id` BIGINT NOT NULL, - `work_order_num` STRING NOT NULL, - `asset_id` BIGINT NOT NULL, - `description` STRING NOT NULL, - `wo_type` STRING NOT NULL, - `priority` STRING NOT NULL, - `status` STRING NOT NULL, - `request_date` STRING NOT NULL, - `start_date` STRING, - `completion_date` STRING, - `lastUpdated` TIMESTAMP_LTZ(3) NOT NULL, - PRIMARY KEY (`work_order_id`, `lastUpdated`) NOT ENFORCED, - WATERMARK FOR `lastUpdated` AS `lastUpdated` -) WITH ( - 'format' = 'flexible-json', - 'path' = '${DATA_PATH}/maintenance.jsonl', - 'source.monitor-interval' = '10000', - 'connector' = 'filesystem' -); diff --git a/oil-gas-agent-automation/oil-gas-agent-operations/oilgas-testdata/measurement.jsonl.gz b/oil-gas-agent-automation/oil-gas-agent-operations/oilgas-testdata/measurement.jsonl.gz deleted file mode 100644 index 8b496e5f..00000000 Binary files a/oil-gas-agent-automation/oil-gas-agent-operations/oilgas-testdata/measurement.jsonl.gz and /dev/null differ diff --git a/oil-gas-agent-automation/oil-gas-agent-operations/oilgas-testdata/measurement.table.sql b/oil-gas-agent-automation/oil-gas-agent-operations/oilgas-testdata/measurement.table.sql deleted file mode 100644 index c9db6cb7..00000000 --- a/oil-gas-agent-automation/oil-gas-agent-operations/oilgas-testdata/measurement.table.sql +++ /dev/null @@ -1,13 +0,0 @@ -CREATE TABLE Measurement ( - `assetId` BIGINT NOT NULL, - `pressure_psi` DOUBLE NOT NULL, - `temperature_f` DOUBLE NOT NULL, - `timestamp` TIMESTAMP_LTZ(3) NOT NULL, - PRIMARY KEY (`assetId`, `timestamp`) NOT ENFORCED, - WATERMARK FOR `timestamp` AS `timestamp` -) WITH ( - 'format' = 'flexible-json', - 'path' = '${DATA_PATH}/measurement.jsonl.gz', - 'source.monitor-interval' = '10000', - 'connector' = 'filesystem' -); diff --git a/oil-gas-agent-automation/oil-gas-agent-operations/operations_agent_package_test.json b/oil-gas-agent-automation/oil-gas-agent-operations/operations_agent-test-package.json similarity index 90% rename from oil-gas-agent-automation/oil-gas-agent-operations/operations_agent_package_test.json rename to oil-gas-agent-automation/oil-gas-agent-operations/operations_agent-test-package.json index f3e087d2..4a10bc42 100644 --- a/oil-gas-agent-automation/oil-gas-agent-operations/operations_agent_package_test.json +++ b/oil-gas-agent-automation/oil-gas-agent-operations/operations_agent-test-package.json @@ -4,7 +4,7 @@ "script": { "main": "operations_agent.sqrl", "config": { - "variant": "testdata" + "environment": "local" } }, "engines": { diff --git a/oil-gas-agent-automation/oil-gas-agent-operations/operations_agent.sqrl b/oil-gas-agent-automation/oil-gas-agent-operations/operations_agent.sqrl index cfff2d3d..10650ed4 100644 --- a/oil-gas-agent-automation/oil-gas-agent-operations/operations_agent.sqrl +++ b/oil-gas-agent-automation/oil-gas-agent-operations/operations_agent.sqrl @@ -1,36 +1,33 @@ -IMPORT oilgas-{{variant}}.assets AS _AssetUpdates; -IMPORT oilgas-{{variant}}.maintenance AS _MaintenanceUpdates; -IMPORT oilgas-{{variant}}.measurement AS _Measurement; +IMPORT connectors.sources-{{environment}} AS sources; -/*+no_query */ -CREATE TABLE FlowRate ( - event_id STRING NOT NULL METADATA FROM 'uuid', - assetId BIGINT NOT NULL, - flowrate FLOAT NOT NULL, - event_time TIMESTAMP_LTZ(3) NOT NULL METADATA FROM 'timestamp' -); +/* ======================================== + PART 1: Deduplication of Input CDC Streams + ======================================== */ /** Returns information about the well/asset including it's maintenance records, manual, description, and other important information. */ /*+query_by_all(asset_id) */ -Assets := DISTINCT _AssetUpdates ON asset_id ORDER BY lastUpdated DESC; +Assets := DISTINCT sources.AssetUpdates ON asset_id ORDER BY lastUpdated DESC; /*+no_query */ -Maintenance := DISTINCT _MaintenanceUpdates ON work_order_id ORDER BY lastUpdated DESC; +Maintenance := DISTINCT sources.MaintenanceUpdates ON work_order_id ORDER BY lastUpdated DESC; Assets.maintenance := SELECT * FROM Maintenance m WHERE this.asset_id = m.asset_id ORDER BY m.lastUpdated DESC; -EXPORT FlowRate TO print.Flowrate; +/* ======================================== + PART 2: IoT Event Processing + ======================================== */ +/* Enrich well flowrate events with information about the well (asset) */ /*+no_query */ EnrichedFlowRate := SELECT f.*, a.asset_number, a.asset_name, a.description - FROM FlowRate f + FROM sources.FlowRate f JOIN Assets FOR SYSTEM_TIME AS OF f.event_time a ON f.assetId = a.asset_id; -EXPORT EnrichedFlowRate TO print.EnrichedFlowRate; - LowFlowRate := SUBSCRIBE SELECT * FROM EnrichedFlowRate WHERE flowrate < 200; +EXPORT LowFlowRate TO logger.LowFlow; + /** Returns the flowrate readings for a given well/asset within the specified range order by timestamp decreasing. */ @@ -46,10 +43,16 @@ Returns recent pressure and temperature readings for a given well/asset by id. RecentPressure := SELECT assetId, window_time AS timestamp_normalized, AVG(pressure_psi) AS pressure_psi, AVG(temperature_f) AS temperature_f - FROM TABLE(TUMBLE(TABLE _Measurement, DESCRIPTOR(`timestamp`), INTERVAL '10' SECOND)) + FROM TABLE(TUMBLE(TABLE sources.Measurement, DESCRIPTOR(`timestamp`), INTERVAL '10' SECOND)) GROUP BY assetId, window_start, window_end, window_time ORDER BY window_time DESC; +/* ======================================== + PART 3: Testing + ======================================== */ -/*+ test */ +/*+test */ RecentPressureTest := SELECT * FROM RecentPressure ORDER BY timestamp_normalized DESC, assetId ASC; + +/*+test */ +LowFlowRateTest := SELECT * FROM LowFlowRate ORDER BY flowrate ASC, event_time DESC; \ No newline at end of file diff --git a/oil-gas-agent-automation/oil-gas-agent-operations/snapshots/LowFlowRateTest.snapshot b/oil-gas-agent-automation/oil-gas-agent-operations/snapshots/LowFlowRateTest.snapshot new file mode 100644 index 00000000..1cb1cd89 --- /dev/null +++ b/oil-gas-agent-automation/oil-gas-agent-operations/snapshots/LowFlowRateTest.snapshot @@ -0,0 +1,75 @@ +{ + "data" : { + "LowFlowRateTest" : [ { + "assetId" : 45555, + "flowrate" : 98.4, + "event_time" : "2025-02-11T12:21:39.000Z", + "asset_number" : "W-45555", + "asset_name" : "Well Delta 45555", + "description" : "Injection well in Field Delta" + }, { + "assetId" : 45555, + "flowrate" : 99.5, + "event_time" : "2025-02-11T12:21:38.000Z", + "asset_number" : "W-45555", + "asset_name" : "Well Delta 45555", + "description" : "Injection well in Field Delta" + }, { + "assetId" : 45555, + "flowrate" : 104.8, + "event_time" : "2025-02-11T12:21:37.000Z", + "asset_number" : "W-45555", + "asset_name" : "Well Delta 45555", + "description" : "Injection well in Field Delta" + }, { + "assetId" : 45555, + "flowrate" : 107.2, + "event_time" : "2025-02-11T12:21:36.000Z", + "asset_number" : "W-45555", + "asset_name" : "Well Delta 45555", + "description" : "Injection well in Field Delta" + }, { + "assetId" : 45555, + "flowrate" : 110.7, + "event_time" : "2025-02-11T12:21:35.000Z", + "asset_number" : "W-45555", + "asset_name" : "Well Delta 45555", + "description" : "Injection well in Field Delta" + }, { + "assetId" : 45555, + "flowrate" : 113.1, + "event_time" : "2025-02-11T12:21:34.000Z", + "asset_number" : "W-45555", + "asset_name" : "Well Delta 45555", + "description" : "Injection well in Field Delta" + }, { + "assetId" : 45555, + "flowrate" : 114.6, + "event_time" : "2025-02-11T12:21:33.000Z", + "asset_number" : "W-45555", + "asset_name" : "Well Delta 45555", + "description" : "Injection well in Field Delta" + }, { + "assetId" : 45555, + "flowrate" : 114.9, + "event_time" : "2025-02-11T12:21:32.000Z", + "asset_number" : "W-45555", + "asset_name" : "Well Delta 45555", + "description" : "Injection well in Field Delta" + }, { + "assetId" : 34443, + "flowrate" : 117.3, + "event_time" : "2025-02-11T12:21:39.000Z", + "asset_number" : "W-34443", + "asset_name" : "Well Epsilon 34443", + "description" : "Production well in Field Epsilon" + }, { + "assetId" : 45555, + "flowrate" : 117.3, + "event_time" : "2025-02-11T12:21:31.000Z", + "asset_number" : "W-45555", + "asset_name" : "Well Delta 45555", + "description" : "Injection well in Field Delta" + } ] + } +} \ No newline at end of file