Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.datasqrl.data.GenerateClickstream;
import com.datasqrl.data.GenerateCreditCards;
import com.datasqrl.data.GenerateLoans;
import com.datasqrl.data.GenerateMeasurement;
import com.datasqrl.data.GeneratePatients;
import com.datasqrl.data.GenerateSensors;
import com.datasqrl.data.GenerateWarrants;
Expand All @@ -14,7 +15,7 @@

@CommandLine.Command(name = "datasqrl", mixinStandardHelpOptions = true, version = "0.1",
subcommands = {GenerateClickstream.class, GenerateSensors.class, GenerateLoans.class, GenerateCreditCards.class,
GeneratePatients.class, GenerateWarrants.class})
GeneratePatients.class, GenerateWarrants.class, GenerateMeasurement.class})
@Getter
public class RootGenerateCommand implements Runnable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public static class Reward {

public static class Config implements Configuration {

public int numCustomers = 10;
public int numCustomers = 1000;

public int numMerchants = 100;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package com.datasqrl.data;

import com.datasqrl.cmd.AbstractGenerateCommand;
import com.datasqrl.util.Configuration;
import com.datasqrl.util.RandomSampler;
import com.datasqrl.util.SerializerUtil;
import com.datasqrl.util.WriterUtil;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.Flow;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import lombok.EqualsAndHashCode;
import lombok.EqualsAndHashCode.Include;
import lombok.Value;
import org.apache.commons.lang3.StringUtils;
import picocli.CommandLine;

@CommandLine.Command(name = "measurement", description = "Generates Measurements")
public class GenerateMeasurement extends AbstractGenerateCommand {


@Override
public void run() {
initialize();
Config config = getConfiguration(new Config());
long numMeasurements = root.getNumber();
Instant end = Instant.now().truncatedTo(ChronoUnit.SECONDS);
DataFunction normalTemperature = new DataFunction(190, 190, 0, 1);
DataFunction normalPressure = new DataFunction(3000, 3000, 0, 4);
DataFunction normalFlow = new DataFunction(250, 250, 0, 2);
Map<Integer, List<DataFunction>> assets = Map.of(
12221, List.of(normalPressure.with(3400, -3600), normalTemperature, normalFlow.with(180, -3600)),
21112, List.of(normalPressure.with(1800, -4000), normalTemperature, normalFlow.with(150, -1400)),
34443, List.of(normalPressure.with(3800, -100), normalTemperature, normalFlow.with(120, -40)),
45555, List.of(normalPressure.with(2000, -190), normalTemperature, normalFlow.with(100, -70)),
59995, List.of(normalPressure, normalTemperature, normalFlow)
);
System.out.println(assets.keySet());

List<Measurement> measurements = new ArrayList<>();
List<FlowRate> flowrates = new ArrayList<>();
for (int i = 1; i <=numMeasurements ; i++) {
long past = i-numMeasurements;
Instant timestamp = end.plus(past, ChronoUnit.SECONDS);
assets.forEach((id, functions) -> {
measurements.add(new Measurement(//UUID.randomUUID().toString(),
id,
functions.get(0).getValue(past),
functions.get(1).getValue(past),
timestamp.toString()));
flowrates.add(new FlowRate(id,
functions.get(2).getValue(past),
timestamp.toString()));
});


}

WriterUtil.writeToFileSorted(measurements, getOutputDir().resolve("measurement.jsonl"),
Comparator.comparing(Measurement::getTimestamp),
null, null);
WriterUtil.writeToFileSorted(flowrates, getOutputDir().resolve("flowrate.jsonl"),
Comparator.comparing(FlowRate::getTimestamp),
null, null);
}

@Value
public class DataFunction {
double startValue;
double endValue;
long startTime;
double noiseStdDev;

public double getValue(long past) {
double value = startValue;
if (past>startTime) {
long delta = past - startTime;
value = value + (endValue - startValue) * (delta/Math.abs(startTime*1.0));
}
return roundToOneDecimalPlace(value + sampler.nextNormal(0, noiseStdDev));
}

public DataFunction with(double endValue, long startTime) {
return new DataFunction(startValue, endValue, startTime, noiseStdDev);
}

public static double roundToOneDecimalPlace(double value) {
return Math.round(value * 10.0) / 10.0;
}


}


@Value
public static class Measurement {

int assetId;
double pressure_psi;
double temperature_f;
String timestamp;

@Override
public String toString() {
return SerializerUtil.toJson(this);
}

}

@Value
public static class FlowRate {

int assetId;
double flowrate;
String timestamp;

@Override
public String toString() {
return SerializerUtil.toJson(this);
}

}


public static class Config implements Configuration {

public int numSensors = 10;

public int avgSensorsPerMachine = 2;

public int avgSensorReassignments = 3;

public int avgSensorReassignmentsDeviation = 1;

public int avgGroupPerDay = 5;

public double avgGroupPerDayDeviation = 4.0;

public int avgMachinesPerGroup = 10;

public double avgMachinesPerGroupDeviation = 20.0;

public double temperatureBaselineMin = 97;

public double temperatureBaselineMax = 99;

public double maxNoise = 0.05;

public double maxTemperaturePeakStdDev = 2.0;

public int minMaxRampWidth = 600;

public int maxMaxRampWidth = 10000;

public double errorProbability = 0.00;



@Override
public void scale(long scaleFactor, long number) {
numSensors = (int)Math.min(10000000,Math.max(20, number/(86400*60)));
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,19 @@ public void testLoan() {

@Test
public void testCreditCard() {
new RootGenerateCommand().getCmd().execute(new String[]{"creditcard","-n","100","-o","data/creditcard"});
new RootGenerateCommand().getCmd().execute(new String[]{"creditcard","-n","365","-o","data/creditcard"});
}

@Test
public void TestWarrants() {
new RootGenerateCommand().getCmd().execute(new String[]{"warrants","-n","100","-o","data/warrants"});
}

@Test
public void testMeasurements() {
new RootGenerateCommand().getCmd().execute(new String[]{"measurement","-n","8000","-o","data/well"});
}



}
41 changes: 41 additions & 0 deletions oil-gas-agent-automation/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Automated Monitoring - Oil & Gas Use Case

Run the data backend with the following command:

`docker run -it -p 8888:8888 -p 8081:8081 -p 9092:9092 --rm -v $PWD:/build datasqrl/cmd:latest run -c package-local.json `

To publish flowrate metrics, open [GraphiQL in the browser](http://localhost:8888/graphiql/) and run the following mutation:

```graphql
mutation {
AddFlowRate(event: {assetId: 12221, flowrate: 220.5 }) {
assetId
flowrate
}
}
```

This sample dataset has 5 pre-defined wells (i.e. assets) with the following ids:
`12221, 21112, 34443, 45555, 59995`

Open GraphiQL again in another window and listen to this subscription:
```graphql
subscription {
LowFlowRate {
assetId
flowrate
}
}
```

Add another flowrate metric with a value below `200` and observe it through the subscription.
To retrieve recent pressure and temperature readings for a well, run this query:
```graphql
{
RecentPressure(assetId: 12221) {
pressure_psi
temperature_f
timestamp_normalized
}
}
```
25 changes: 25 additions & 0 deletions oil-gas-agent-automation/monitoring-agent.sqrl
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
IMPORT test-data.*;
IMPORT time.endOfMinute;

Assets := DISTINCT Assets ON asset_id ORDER BY lastUpdated DESC;
Maintenance := DISTINCT Maintenance ON work_order_id ORDER BY lastUpdated DESC;
Assets.maintenance := JOIN Maintenance m ON @.asset_id = m.asset_id ORDER BY m.lastUpdated DESC;

FlowRateByRange(@minRate: DOUBLE, @maxRate: DOUBLE) := SELECT * FROM Flowrate
WHERE flowrate >= @minRate AND flowrate <= @maxRate ORDER BY event_time DESC LIMIT 100;

PressureByMinute := SELECT assetId, endOfMinute(timestamp) AS timestamp_min,
AVG(pressure_psi) AS avg_pressure_psi,
MAX(pressure_psi) AS max_pressure_psi,
AVG(temperature_f) AS avg_temperature_f
FROM Measurement
GROUP BY assetId, timestamp_min ORDER BY timestamp_min DESC;

FlowrateByMinute := SELECT assetId, endOfMinute(event_time) AS timestamp_min,
AVG(flowrate) AS avg_flowrate,
MAX(flowrate) AS max_flowrate
FROM Flowrate
GROUP BY assetId, timestamp_min ORDER BY timestamp_min DESC;

Assets.flowrate := JOIN FlowrateByMinute f ON @.asset_id = f.assetId ORDER BY timestamp_min DESC;
Assets.pressure := JOIN PressureByMinute p ON @.asset_id = p.assetId ORDER BY timestamp_min DESC;
81 changes: 81 additions & 0 deletions oil-gas-agent-automation/operations-agent.graphqls
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"An RFC-3339 compliant DateTime Scalar"
scalar DateTime

type Query {
"""
Returns information about the well/asset including it's maintenance records, manual,
description, and other important information.
"""
Assets(asset_id: Int!, limit: Int = 20, offset: Int = 0): [Assets!]
"""
Returns the flowrate readings for a given well/asset within the specified range order by timestamp decreasing.
"""
FlowRateByRange(assetId: Int!, minRate: Float = 0, maxRate: Float = 10000): [EnrichedFlowRate!]
"""
Returns recent pressure and temperature readings for a given well/asset by id.
"""
RecentPressure(assetId: Int!, limit: Int = 25, offset: Int = 0): [RecentPressure!]
}

type Subscription {
LowFlowRate: EnrichedFlowRate
}

type RecentPressure {
assetId: Int!
timestamp_normalized: DateTime!
pressure_psi: Float!
temperature_f: Float!
}

type Assets {
asset_id: Int!
asset_number: String!
asset_name: String!
asset_category_id: Int!
description: String!
date_placed_in_service: String!
asset_cost: Float!
status: String!
asset_manual: String!
lastUpdated: DateTime!
maintenance: [Maintenance!]
}

type Maintenance {
work_order_id: Int!
work_order_num: String!
asset_id: Int!
description: String!
wo_type: String!
priority: String!
status: String!
request_date: String!
start_date: String
completion_date: String
lastUpdated: DateTime!
}

type EnrichedFlowRate {
assetId: Int!
flowrate: Float!
event_time: DateTime!
asset_number: String!
asset_name: String!
description: String!
}

input FlowRateInput {
assetId: Int!
flowrate: Float!
}

type FlowRateResult {
assetId: Int!
flowrate: Float!
event_time: DateTime!
}

type Mutation {
AddFlowRate(event: FlowRateInput!): FlowRateResult!
}
Loading