Skip to content

Commit 942c766

Browse files
authored
feat(server): GetLatestObservations multiple locs (#75)
1 parent 5ae5e6a commit 942c766

File tree

6 files changed

+138
-83
lines changed

6 files changed

+138
-83
lines changed

internal/server/dummy/dataserverimpl.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -317,19 +317,27 @@ func (d *DataPlatformDataServiceServerImpl) CreateObservations(
317317
return &pb.CreateObservationsResponse{}, nil
318318
}
319319

320-
// GetLatestObservation implements dp.DataPlatformDataServiceServer.
321-
func (s *DataPlatformDataServiceServerImpl) GetLatestObservation(
320+
// GetLatestObservations implements dp.DataPlatformDataServiceServer.
321+
func (s *DataPlatformDataServiceServerImpl) GetLatestObservations(
322322
ctx context.Context,
323-
req *pb.GetLatestObservationRequest,
324-
) (*pb.GetLatestObservationResponse, error) {
323+
req *pb.GetLatestObservationsRequest,
324+
) (*pb.GetLatestObservationsResponse, error) {
325325
if req.PivotTimestampUtc == nil {
326326
req.PivotTimestampUtc = timestamppb.New(time.Now().UTC())
327327
}
328328

329-
return &pb.GetLatestObservationResponse{
330-
TimestampUtc: req.PivotTimestampUtc,
331-
ValueFraction: 0.75,
332-
EffectiveCapacityWatts: 150e6,
329+
observations := make([]*pb.GetLatestObservationsResponse_Observation, len(req.LocationUuids))
330+
for i := range observations {
331+
observations[i] = &pb.GetLatestObservationsResponse_Observation{
332+
LocationUuid: req.LocationUuids[i],
333+
TimestampUtc: req.PivotTimestampUtc,
334+
ValueFraction: 0.75,
335+
EffectiveCapacityWatts: 150e6,
336+
}
337+
}
338+
339+
return &pb.GetLatestObservationsResponse{
340+
Observations: observations,
333341
}, nil
334342
}
335343

internal/server/postgres/dataserverimpl.go

Lines changed: 38 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -903,10 +903,10 @@ func (s *DataPlatformDataServiceServerImpl) CreateObservations(
903903
return &pb.CreateObservationsResponse{}, nil
904904
}
905905

906-
func (s *DataPlatformDataServiceServerImpl) GetLatestObservation(
906+
func (s *DataPlatformDataServiceServerImpl) GetLatestObservations(
907907
ctx context.Context,
908-
req *pb.GetLatestObservationRequest,
909-
) (*pb.GetLatestObservationResponse, error) {
908+
req *pb.GetLatestObservationsRequest,
909+
) (*pb.GetLatestObservationsResponse, error) {
910910
l := zerolog.Ctx(ctx)
911911
querier := db.New(ix.GetTxFromContext(ctx))
912912

@@ -915,31 +915,50 @@ func (s *DataPlatformDataServiceServerImpl) GetLatestObservation(
915915
req.PivotTimestampUtc = timestamppb.New(time.Now().UTC().Truncate(time.Minute))
916916
}
917917

918-
goprms := db.GetLatestObservationParams{
919-
GeometryUuid: uuid.MustParse(req.LocationUuid),
920-
SourceTypeID: int16(req.EnergySource),
921-
ObserverName: req.ObserverName,
922-
PivotTimeUtc: pgtype.Timestamp{Time: req.PivotTimestampUtc.AsTime(), Valid: true},
918+
locUuids := make([]uuid.UUID, len(req.LocationUuids))
919+
for i, locStr := range req.LocationUuids {
920+
locUuids[i] = uuid.MustParse(locStr)
921+
}
922+
923+
goprms := db.GetLatestObservationsParams{
924+
GeometryUuids: locUuids,
925+
SourceTypeID: int16(req.EnergySource),
926+
ObserverName: req.ObserverName,
927+
PivotTimeUtc: pgtype.Timestamp{Time: req.PivotTimestampUtc.AsTime(), Valid: true},
923928
}
924929

925-
dbObs, err := querier.GetLatestObservation(ctx, goprms)
930+
dbObs, err := querier.GetLatestObservations(ctx, goprms)
926931
if err != nil {
927932
l.Err(err).Msgf("querier.GetLatestObservation(%+v)", goprms)
928933

929934
return nil, status.Error(
930-
codes.NotFound,
931-
"No observations found. Ensure location and observer exist, and observations have been created.",
935+
codes.Internal,
936+
"Backend communication error. See logs for details.",
932937
)
933938
}
934939

935-
return &pb.GetLatestObservationResponse{
936-
TimestampUtc: timestamppb.New(dbObs.ObservationTimestampUtc.Time),
937-
ValueFraction: float32(dbObs.ValueSip) / 30000.0,
938-
EffectiveCapacityWatts: uint64(
939-
dbObs.CapacityIncLimit,
940-
) * uint64(
941-
math.Pow10(int(dbObs.CapacityUnitPrefixFactor)),
942-
),
940+
observations := make([]*pb.GetLatestObservationsResponse_Observation, len(dbObs))
941+
for i, obs := range dbObs {
942+
observations[i] = &pb.GetLatestObservationsResponse_Observation{
943+
LocationUuid: obs.GeometryUuid.String(),
944+
TimestampUtc: timestamppb.New(obs.ObservationTimestampUtc.Time),
945+
ValueFraction: float32(obs.ValueSip) / 30000.0,
946+
EffectiveCapacityWatts: uint64(
947+
obs.CapacityIncLimit,
948+
) * uint64(
949+
math.Pow10(int(obs.CapacityUnitPrefixFactor)),
950+
),
951+
}
952+
}
953+
954+
l.Debug().
955+
Int16("dp.source.type_id", goprms.SourceTypeID).
956+
Int("dp.geometry.count", len(req.LocationUuids)).
957+
Int("dp.observations.count", len(observations)).
958+
Msg("found observations")
959+
960+
return &pb.GetLatestObservationsResponse{
961+
Observations: observations,
943962
}, nil
944963
}
945964

internal/server/postgres/dataserverimpl_test.go

Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1012,12 +1012,12 @@ func TestGetObservationsAsTimeseries(t *testing.T) {
10121012
}
10131013
}
10141014

1015-
func TestGetLatestObservation(t *testing.T) {
1015+
func TestGetLatestObservations(t *testing.T) {
10161016
pivotTime := time.Now().Truncate(time.Minute)
10171017

10181018
// Create a site to attach the observations to
10191019
siteResp, err := dc.CreateLocation(t.Context(), &pb.CreateLocationRequest{
1020-
LocationName: "test_get_latest_observation_site",
1020+
LocationName: "test_get_latest_observations_site_1",
10211021
GeometryWkt: "POINT(-20.25 57.5)",
10221022
EffectiveCapacityWatts: 1000000,
10231023
Metadata: &structpb.Struct{},
@@ -1029,7 +1029,7 @@ func TestGetLatestObservation(t *testing.T) {
10291029

10301030
// Create an observer to make the observations
10311031
obsResp, err := dc.CreateObserver(t.Context(), &pb.CreateObserverRequest{
1032-
Name: "test_get_latest_observation_observer",
1032+
Name: "test_get_latest_observations_observer",
10331033
})
10341034
require.NoError(t, err)
10351035

@@ -1053,56 +1053,60 @@ func TestGetLatestObservation(t *testing.T) {
10531053
require.NoError(t, err)
10541054

10551055
testcases := []struct {
1056-
name string
1057-
req *pb.GetLatestObservationRequest
1058-
expectedFraction float32
1056+
name string
1057+
req *pb.GetLatestObservationsRequest
1058+
expectedFractions []float32
10591059
}{
10601060
{
1061-
name: "Should get latest observation",
1062-
req: &pb.GetLatestObservationRequest{
1063-
LocationUuid: siteResp.LocationUuid,
1064-
EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR,
1065-
ObserverName: obsResp.ObserverName,
1061+
name: "Should get latest observations",
1062+
req: &pb.GetLatestObservationsRequest{
1063+
LocationUuids: []string{siteResp.LocationUuid},
1064+
EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR,
1065+
ObserverName: obsResp.ObserverName,
10661066
},
1067-
expectedFraction: 0.5,
1067+
expectedFractions: []float32{0.5},
10681068
},
10691069
{
10701070
name: "Should get earlier observation before cutoff",
1071-
req: &pb.GetLatestObservationRequest{
1072-
LocationUuid: siteResp.LocationUuid,
1071+
req: &pb.GetLatestObservationsRequest{
1072+
LocationUuids: []string{siteResp.LocationUuid},
10731073
EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR,
10741074
ObserverName: obsResp.ObserverName,
10751075
PivotTimestampUtc: timestamppb.New(pivotTime.Add(-time.Hour * 1).Add(-time.Second)),
10761076
},
1077-
expectedFraction: 0.3,
1077+
expectedFractions: []float32{0.3},
10781078
},
10791079
{
1080-
name: "Shouldn't fetch for non-existent observer",
1081-
req: &pb.GetLatestObservationRequest{
1082-
LocationUuid: siteResp.LocationUuid,
1083-
EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR,
1084-
ObserverName: "non_existent_observer",
1080+
name: "Should fetch no rows for non-existent observer",
1081+
req: &pb.GetLatestObservationsRequest{
1082+
LocationUuids: []string{siteResp.LocationUuid},
1083+
EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR,
1084+
ObserverName: "non_existent_observer",
10851085
},
10861086
},
10871087
{
1088-
name: "Shouldn't fetch for non-existent location",
1089-
req: &pb.GetLatestObservationRequest{
1090-
LocationUuid: "non_existent_location",
1091-
EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR,
1092-
ObserverName: obsResp.ObserverName,
1088+
name: "Should fetch no rows for non-existent location",
1089+
req: &pb.GetLatestObservationsRequest{
1090+
LocationUuids: []string{uuid.New().String()},
1091+
EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR,
1092+
ObserverName: obsResp.ObserverName,
10931093
},
10941094
},
10951095
}
10961096

10971097
for _, tc := range testcases {
10981098
t.Run(tc.name, func(t *testing.T) {
1099-
resp, err := dc.GetLatestObservation(t.Context(), tc.req)
1099+
resp, err := dc.GetLatestObservations(t.Context(), tc.req)
11001100
if strings.Contains(tc.name, "Shouldn't") {
11011101
require.Error(t, err)
11021102
} else {
11031103
require.NoError(t, err)
11041104
require.NotNil(t, resp)
1105-
require.Equal(t, tc.expectedFraction, resp.ValueFraction)
1105+
1106+
for i, obs := range resp.Observations {
1107+
t.Log(obs)
1108+
require.Equal(t, tc.expectedFractions[i], obs.ValueFraction)
1109+
}
11061110
}
11071111
})
11081112
}

internal/server/postgres/sql/queries/observations.sql

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -84,28 +84,43 @@ WHERE
8484
AND og.observation_timestamp_utc BETWEEN sqlc.arg(start_time_utc)::TIMESTAMP AND sqlc.arg(end_time_utc)::TIMESTAMP
8585
AND sh.sys_period @> og.observation_timestamp_utc;
8686

87-
-- name: GetLatestObservation :one
88-
/* GetLatestObservation gets the latest observation for a given location, source type, and observer.
89-
* The value is returned as a 16-bit integer, with 0 representing 0%
87+
-- name: GetLatestObservations :many
88+
/* GetLatestObservations gets the latest observations for a given location set, source type, and
89+
* observer. The value is returned as a 16-bit integer, with 0 representing 0%
9090
* and 30000 representing 100% of capacity.
9191
*/
92+
WITH ranked_observations AS (
93+
SELECT
94+
og.geometry_uuid,
95+
og.source_type_id,
96+
og.observation_timestamp_utc,
97+
og.value_sip,
98+
sh.capacity_limit_sip,
99+
sh.capacity,
100+
sh.capacity_unit_prefix_factor,
101+
ROW_NUMBER() OVER (
102+
PARTITION BY og.geometry_uuid, og.source_type_id, o.observer_uuid
103+
ORDER BY og.observation_timestamp_utc DESC
104+
) AS rn
105+
FROM obs.observed_generation_values AS og
106+
INNER JOIN loc.sources_mv AS sh USING (geometry_uuid, source_type_id)
107+
INNER JOIN obs.observers AS o USING (observer_uuid)
108+
WHERE
109+
og.geometry_uuid = ANY(sqlc.arg(geometry_uuids)::UUID [])
110+
AND og.source_type_id = $1
111+
AND o.observer_name = LOWER(sqlc.arg(observer_name)::TEXT)
112+
AND sh.sys_period @> og.observation_timestamp_utc
113+
AND og.observation_timestamp_utc <= sqlc.arg(pivot_time_utc)::TIMESTAMP
114+
)
92115
SELECT
93-
og.geometry_uuid,
94-
og.source_type_id,
95-
og.observation_timestamp_utc,
96-
og.value_sip,
116+
geometry_uuid,
117+
source_type_id,
118+
observation_timestamp_utc,
119+
value_sip,
97120
COALESCE(
98-
sh.capacity_limit_sip::REAL * sh.capacity / 30000.0, sh.capacity::REAL
121+
capacity_limit_sip::REAL * capacity / 30000.0, capacity::REAL
99122
)::REAL AS capacity_inc_limit,
100-
sh.capacity_unit_prefix_factor
101-
FROM obs.observed_generation_values AS og
102-
INNER JOIN loc.sources_mv AS sh USING (geometry_uuid, source_type_id)
103-
INNER JOIN obs.observers AS o USING (observer_uuid)
104-
WHERE
105-
og.geometry_uuid = $1
106-
AND og.source_type_id = $2
107-
AND o.observer_name = LOWER(sqlc.arg(observer_name)::TEXT)
108-
AND sh.sys_period @> og.observation_timestamp_utc
109-
AND og.observation_timestamp_utc <= sqlc.arg(pivot_time_utc)::TIMESTAMP
110-
ORDER BY og.observation_timestamp_utc DESC
111-
LIMIT 1;
123+
capacity,
124+
capacity_unit_prefix_factor
125+
FROM ranked_observations
126+
WHERE rn = 1;

proto/ocf/dp/dp-data.messages.proto

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -443,10 +443,14 @@ message CreateObservationsRequest {
443443
message CreateObservationsResponse {}
444444

445445

446-
message GetLatestObservationRequest {
447-
string location_uuid = 1 [
448-
(buf.validate.field).required = true,
449-
(buf.validate.field).string.uuid = true
446+
message GetLatestObservationsRequest {
447+
repeated string location_uuids = 1 [
448+
(buf.validate.field).repeated.min_items = 1,
449+
(buf.validate.field).repeated.max_items = 100,
450+
(buf.validate.field).repeated.unique = true,
451+
(buf.validate.field).repeated.items = {
452+
string: {uuid: true}
453+
}
450454
];
451455
EnergySource energy_source = 2 [
452456
(buf.validate.field).required = true
@@ -463,10 +467,15 @@ message GetLatestObservationRequest {
463467
];
464468
}
465469

466-
message GetLatestObservationResponse {
467-
google.protobuf.Timestamp timestamp_utc = 1;
468-
float value_fraction = 2;
469-
uint64 effective_capacity_watts = 3;
470+
message GetLatestObservationsResponse {
471+
message Observation {
472+
string location_uuid = 1;
473+
google.protobuf.Timestamp timestamp_utc = 2;
474+
float value_fraction = 3;
475+
uint64 effective_capacity_watts = 4;
476+
}
477+
478+
repeated GetLatestObservationsResponse.Observation observations = 1;
470479
}
471480

472481

proto/ocf/dp/dp-data.service.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ service DataPlatformDataService {
4444
rpc ListObservers(ListObserversRequest) returns (ListObserversResponse) {}
4545
rpc CreateObservations(CreateObservationsRequest) returns (CreateObservationsResponse) {}
4646
/* GetLatestObservation fetches the most recent observation for a given location and observer. */
47-
rpc GetLatestObservation(GetLatestObservationRequest) returns (GetLatestObservationResponse) {}
47+
rpc GetLatestObservations(GetLatestObservationsRequest) returns (GetLatestObservationsResponse) {}
4848

4949
// --- Analysis ----------------------------------------------------------------
5050

0 commit comments

Comments
 (0)