Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions oil-gas-agent-automation/oil-gas-agent-operations/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# IoT Monitoring Agent

Ingests IoT data about wells and a change-stream of updates for the monitored assets and their maintenance records.
Analyzes and relates the information to provide comprehensive monitoring with context.

The result is exposed via model context protocol (MCP) to be consumed by agents for automated troubleshooting. It also exposes GraphQL and REST APIs for testing and troubleshooting.

This project template only configures a test environment.

## Customization

- Update the sources and processing script
- Add sources and configuration for environments other than testing. Look at the other templates for examples.
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
CREATE TABLE AssetUpdates (
`asset_id` BIGINT NOT NULL,
`asset_number` STRING NOT NULL,
`asset_name` STRING NOT NULL,
`asset_category_id` BIGINT NOT NULL,
`description` STRING NOT NULL,
`date_placed_in_service` STRING NOT NULL,
`asset_cost` BIGINT NOT NULL,
`status` STRING NOT NULL,
`asset_manual` STRING NOT NULL,
`lastUpdated` TIMESTAMP_LTZ(3) NOT NULL,
PRIMARY KEY (`asset_id`, `lastUpdated`) NOT ENFORCED,
WATERMARK FOR `lastUpdated` AS `lastUpdated`
) WITH (
'format' = 'flexible-json',
'path' = '${DATA_PATH}/assets.jsonl',
'source.monitor-interval' = '10000',
'connector' = 'filesystem'
);

CREATE TABLE FlowRate (
`assetId` BIGINT NOT NULL,
`flowrate` DOUBLE NOT NULL,
`event_time` TIMESTAMP_LTZ(3) NOT NULL,
PRIMARY KEY (`assetId`, `event_time`) NOT ENFORCED,
WATERMARK FOR `event_time` AS `event_time`
) WITH (
'format' = 'flexible-json',
'path' = '${DATA_PATH}/flowrate.jsonl.gz',
'source.monitor-interval' = '10000',
'connector' = 'filesystem'
);

CREATE TABLE MaintenanceUpdates (
`work_order_id` BIGINT NOT NULL,
`work_order_num` STRING NOT NULL,
`asset_id` BIGINT NOT NULL,
`description` STRING NOT NULL,
`wo_type` STRING NOT NULL,
`priority` STRING NOT NULL,
`status` STRING NOT NULL,
`request_date` STRING NOT NULL,
`start_date` STRING,
`completion_date` STRING,
`lastUpdated` TIMESTAMP_LTZ(3) NOT NULL,
PRIMARY KEY (`work_order_id`, `lastUpdated`) NOT ENFORCED,
WATERMARK FOR `lastUpdated` AS `lastUpdated`
) WITH (
'format' = 'flexible-json',
'path' = '${DATA_PATH}/maintenance.jsonl',
'source.monitor-interval' = '10000',
'connector' = 'filesystem'
);

CREATE TABLE Measurement (
`assetId` BIGINT NOT NULL,
`pressure_psi` DOUBLE NOT NULL,
`temperature_f` DOUBLE NOT NULL,
`timestamp` TIMESTAMP_LTZ(3) NOT NULL,
PRIMARY KEY (`assetId`, `timestamp`) NOT ENFORCED,
WATERMARK FOR `timestamp` AS `timestamp`
) WITH (
'format' = 'flexible-json',
'path' = '${DATA_PATH}/measurement.jsonl.gz',
'source.monitor-interval' = '10000',
'connector' = 'filesystem'
);

This file was deleted.

This file was deleted.

Binary file not shown.

This file was deleted.

Binary file not shown.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"script": {
"main": "operations_agent.sqrl",
"config": {
"variant": "testdata"
"environment": "local"
}
},
"engines": {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,36 +1,33 @@
IMPORT oilgas-{{variant}}.assets AS _AssetUpdates;
IMPORT oilgas-{{variant}}.maintenance AS _MaintenanceUpdates;
IMPORT oilgas-{{variant}}.measurement AS _Measurement;
IMPORT connectors.sources-{{environment}} AS sources;

/*+no_query */
CREATE TABLE FlowRate (
event_id STRING NOT NULL METADATA FROM 'uuid',
assetId BIGINT NOT NULL,
flowrate FLOAT NOT NULL,
event_time TIMESTAMP_LTZ(3) NOT NULL METADATA FROM 'timestamp'
);
/* ========================================
PART 1: Deduplication of Input CDC Streams
======================================== */

/**
Returns information about the well/asset including it's maintenance records, manual,
description, and other important information.
*/
/*+query_by_all(asset_id) */
Assets := DISTINCT _AssetUpdates ON asset_id ORDER BY lastUpdated DESC;
Assets := DISTINCT sources.AssetUpdates ON asset_id ORDER BY lastUpdated DESC;
/*+no_query */
Maintenance := DISTINCT _MaintenanceUpdates ON work_order_id ORDER BY lastUpdated DESC;
Maintenance := DISTINCT sources.MaintenanceUpdates ON work_order_id ORDER BY lastUpdated DESC;
Assets.maintenance := SELECT * FROM Maintenance m WHERE this.asset_id = m.asset_id ORDER BY m.lastUpdated DESC;

EXPORT FlowRate TO print.Flowrate;
/* ========================================
PART 2: IoT Event Processing
======================================== */

/* Enrich well flowrate events with information about the well (asset) */
/*+no_query */
EnrichedFlowRate := SELECT f.*, a.asset_number, a.asset_name, a.description
FROM FlowRate f
FROM sources.FlowRate f
JOIN Assets FOR SYSTEM_TIME AS OF f.event_time a ON f.assetId = a.asset_id;

EXPORT EnrichedFlowRate TO print.EnrichedFlowRate;

LowFlowRate := SUBSCRIBE SELECT * FROM EnrichedFlowRate WHERE flowrate < 200;

EXPORT LowFlowRate TO logger.LowFlow;

/**
Returns the flowrate readings for a given well/asset within the specified range order by timestamp decreasing.
*/
Expand All @@ -46,10 +43,16 @@ Returns recent pressure and temperature readings for a given well/asset by id.
RecentPressure := SELECT assetId, window_time AS timestamp_normalized,
AVG(pressure_psi) AS pressure_psi,
AVG(temperature_f) AS temperature_f
FROM TABLE(TUMBLE(TABLE _Measurement, DESCRIPTOR(`timestamp`), INTERVAL '10' SECOND))
FROM TABLE(TUMBLE(TABLE sources.Measurement, DESCRIPTOR(`timestamp`), INTERVAL '10' SECOND))
GROUP BY assetId, window_start, window_end, window_time
ORDER BY window_time DESC;

/* ========================================
PART 3: Testing
======================================== */

/*+ test */
/*+test */
RecentPressureTest := SELECT * FROM RecentPressure ORDER BY timestamp_normalized DESC, assetId ASC;

/*+test */
LowFlowRateTest := SELECT * FROM LowFlowRate ORDER BY flowrate ASC, event_time DESC;
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
{
"data" : {
"LowFlowRateTest" : [ {
"assetId" : 45555,
"flowrate" : 98.4,
"event_time" : "2025-02-11T12:21:39.000Z",
"asset_number" : "W-45555",
"asset_name" : "Well Delta 45555",
"description" : "Injection well in Field Delta"
}, {
"assetId" : 45555,
"flowrate" : 99.5,
"event_time" : "2025-02-11T12:21:38.000Z",
"asset_number" : "W-45555",
"asset_name" : "Well Delta 45555",
"description" : "Injection well in Field Delta"
}, {
"assetId" : 45555,
"flowrate" : 104.8,
"event_time" : "2025-02-11T12:21:37.000Z",
"asset_number" : "W-45555",
"asset_name" : "Well Delta 45555",
"description" : "Injection well in Field Delta"
}, {
"assetId" : 45555,
"flowrate" : 107.2,
"event_time" : "2025-02-11T12:21:36.000Z",
"asset_number" : "W-45555",
"asset_name" : "Well Delta 45555",
"description" : "Injection well in Field Delta"
}, {
"assetId" : 45555,
"flowrate" : 110.7,
"event_time" : "2025-02-11T12:21:35.000Z",
"asset_number" : "W-45555",
"asset_name" : "Well Delta 45555",
"description" : "Injection well in Field Delta"
}, {
"assetId" : 45555,
"flowrate" : 113.1,
"event_time" : "2025-02-11T12:21:34.000Z",
"asset_number" : "W-45555",
"asset_name" : "Well Delta 45555",
"description" : "Injection well in Field Delta"
}, {
"assetId" : 45555,
"flowrate" : 114.6,
"event_time" : "2025-02-11T12:21:33.000Z",
"asset_number" : "W-45555",
"asset_name" : "Well Delta 45555",
"description" : "Injection well in Field Delta"
}, {
"assetId" : 45555,
"flowrate" : 114.9,
"event_time" : "2025-02-11T12:21:32.000Z",
"asset_number" : "W-45555",
"asset_name" : "Well Delta 45555",
"description" : "Injection well in Field Delta"
}, {
"assetId" : 34443,
"flowrate" : 117.3,
"event_time" : "2025-02-11T12:21:39.000Z",
"asset_number" : "W-34443",
"asset_name" : "Well Epsilon 34443",
"description" : "Production well in Field Epsilon"
}, {
"assetId" : 45555,
"flowrate" : 117.3,
"event_time" : "2025-02-11T12:21:31.000Z",
"asset_number" : "W-45555",
"asset_name" : "Well Delta 45555",
"description" : "Injection well in Field Delta"
} ]
}
}
Loading