Skip to content

Commit defb3f8

Browse files
Add BulkVariableInfo to dispatcher (#1813)
This PR adds BulkVariableInfo to the dispatcher and corresponding datasources It currently only supports the Spanner implementation, which calls GetProvenanceSummary This produces a BulkVariableInfoResponse Note: Not all fields are populated. This only fills a subset of fields required by new frontend design for StatVarExplorer In particular, this excludes - PlaceTypeSummary (both top level and per series) - Provenance release_frequency - In the case of the remote datasource these fields will be dropped from the merged response. As a follow up I'll deprecate them. (Eventually remote datasource will move to v2 call which will not include these) --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
1 parent 358fdbe commit defb3f8

File tree

20 files changed

+240
-30
lines changed

20 files changed

+240
-30
lines changed

internal/merger/merger.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"slices"
2525
"sort"
2626
"strconv"
27+
"strings"
2728

2829
pb "github.com/datacommonsorg/mixer/internal/proto"
2930
pbv1 "github.com/datacommonsorg/mixer/internal/proto/v1"
@@ -638,3 +639,44 @@ func MergeMultiQueryResponse(allResp []*pb.QueryResponse, orderby string, asc bo
638639

639640
return merged, nil
640641
}
642+
643+
// MergeMultiBulkVariableInfo merges multiple BulkVariableInfoResponses.
644+
func MergeMultiBulkVariableInfo(allResp []*pbv1.BulkVariableInfoResponse) *pbv1.BulkVariableInfoResponse {
645+
if len(allResp) == 0 {
646+
return &pbv1.BulkVariableInfoResponse{}
647+
}
648+
mergedSummaries := map[string]*pb.StatVarSummary{}
649+
for _, resp := range allResp {
650+
if resp == nil {
651+
continue
652+
}
653+
for _, item := range resp.GetData() {
654+
if item == nil || item.Info == nil {
655+
continue
656+
}
657+
summary, ok := mergedSummaries[item.Node]
658+
if !ok {
659+
summary = &pb.StatVarSummary{
660+
ProvenanceSummary: map[string]*pb.StatVarSummary_ProvenanceSummary{},
661+
}
662+
mergedSummaries[item.Node] = summary
663+
}
664+
for provId, provSummary := range item.Info.ProvenanceSummary {
665+
summary.ProvenanceSummary[provId] = provSummary
666+
}
667+
}
668+
}
669+
merged := &pbv1.BulkVariableInfoResponse{
670+
Data: make([]*pbv1.VariableInfoResponse, 0, len(mergedSummaries)),
671+
}
672+
for node, summary := range mergedSummaries {
673+
merged.Data = append(merged.Data, &pbv1.VariableInfoResponse{
674+
Node: node,
675+
Info: summary,
676+
})
677+
}
678+
slices.SortFunc(merged.Data, func(a, b *pbv1.VariableInfoResponse) int {
679+
return strings.Compare(a.Node, b.Node)
680+
})
681+
return merged
682+
}

internal/merger/merger_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2051,3 +2051,78 @@ func TestMergeMultiQueryResponse(t *testing.T) {
20512051
}
20522052
}
20532053
}
2054+
2055+
func TestMergeMultiBulkVariableInfo(t *testing.T) {
2056+
cmpOpts := cmp.Options{protocmp.Transform()}
2057+
for _, c := range []struct {
2058+
allResp []*pbv1.BulkVariableInfoResponse
2059+
want *pbv1.BulkVariableInfoResponse
2060+
}{
2061+
{
2062+
[]*pbv1.BulkVariableInfoResponse{
2063+
{
2064+
Data: []*pbv1.VariableInfoResponse{
2065+
{
2066+
Node: "v1",
2067+
Info: &pb.StatVarSummary{
2068+
ProvenanceSummary: map[string]*pb.StatVarSummary_ProvenanceSummary{
2069+
"prov1": {ImportName: "import1"},
2070+
"prov2": {ImportName: "import2"},
2071+
},
2072+
},
2073+
},
2074+
},
2075+
},
2076+
{
2077+
Data: []*pbv1.VariableInfoResponse{
2078+
{
2079+
Node: "v1",
2080+
Info: &pb.StatVarSummary{
2081+
ProvenanceSummary: map[string]*pb.StatVarSummary_ProvenanceSummary{
2082+
"prov3": {ImportName: "import3"},
2083+
},
2084+
},
2085+
},
2086+
{
2087+
Node: "v2",
2088+
Info: &pb.StatVarSummary{
2089+
ProvenanceSummary: map[string]*pb.StatVarSummary_ProvenanceSummary{
2090+
"prov1": {ImportName: "import1"},
2091+
"prov4": {ImportName: "import4"},
2092+
},
2093+
},
2094+
},
2095+
},
2096+
},
2097+
},
2098+
&pbv1.BulkVariableInfoResponse{
2099+
Data: []*pbv1.VariableInfoResponse{
2100+
{
2101+
Node: "v1",
2102+
Info: &pb.StatVarSummary{
2103+
ProvenanceSummary: map[string]*pb.StatVarSummary_ProvenanceSummary{
2104+
"prov1": {ImportName: "import1"},
2105+
"prov2": {ImportName: "import2"},
2106+
"prov3": {ImportName: "import3"},
2107+
},
2108+
},
2109+
},
2110+
{
2111+
Node: "v2",
2112+
Info: &pb.StatVarSummary{
2113+
ProvenanceSummary: map[string]*pb.StatVarSummary_ProvenanceSummary{
2114+
"prov1": {ImportName: "import1"},
2115+
"prov4": {ImportName: "import4"},
2116+
},
2117+
},
2118+
},
2119+
},
2120+
},
2121+
},
2122+
} {
2123+
got := MergeMultiBulkVariableInfo(c.allResp)
2124+
if diff := cmp.Diff(got, c.want, cmpOpts); diff != "" {
2125+
t.Errorf("MergeMultiBulkVariableInfo(%v) got diff: %s", c.allResp, diff)
2126+
}
2127+
}
2128+
}

internal/server/datasource/datasource.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"context"
1919

2020
pb "github.com/datacommonsorg/mixer/internal/proto"
21+
pbv1 "github.com/datacommonsorg/mixer/internal/proto/v1"
2122
pbv2 "github.com/datacommonsorg/mixer/internal/proto/v2"
2223
)
2324

@@ -41,4 +42,5 @@ type DataSource interface {
4142
Resolve(context.Context, *pbv2.ResolveRequest) (*pbv2.ResolveResponse, error)
4243
Sparql(context.Context, *pb.SparqlRequest) (*pb.QueryResponse, error)
4344
Event(context.Context, *pbv2.EventRequest) (*pbv2.EventResponse, error)
45+
BulkVariableInfo(context.Context, *pbv1.BulkVariableInfoRequest) (*pbv1.BulkVariableInfoResponse, error)
4446
}

internal/server/datasources/datasources.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919

2020
"github.com/datacommonsorg/mixer/internal/merger"
2121
pb "github.com/datacommonsorg/mixer/internal/proto"
22+
pbv1 "github.com/datacommonsorg/mixer/internal/proto/v1"
2223
pbv2 "github.com/datacommonsorg/mixer/internal/proto/v2"
2324
"github.com/datacommonsorg/mixer/internal/server/datasource"
2425
"github.com/datacommonsorg/mixer/internal/translator/sparql"
@@ -145,3 +146,14 @@ func (ds *DataSources) Event(ctx context.Context, in *pbv2.EventRequest) (*pbv2.
145146
},
146147
)
147148
}
149+
150+
func (ds *DataSources) BulkVariableInfo(ctx context.Context, in *pbv1.BulkVariableInfoRequest) (*pbv1.BulkVariableInfoResponse, error) {
151+
return fetchAndMerge(ctx, ds.sources, in,
152+
func(c context.Context, s datasource.DataSource, r *pbv1.BulkVariableInfoRequest) (*pbv1.BulkVariableInfoResponse, error) {
153+
return s.BulkVariableInfo(c, r)
154+
},
155+
func(all []*pbv1.BulkVariableInfoResponse) (*pbv1.BulkVariableInfoResponse, error) {
156+
return merger.MergeMultiBulkVariableInfo(all), nil
157+
},
158+
)
159+
}

internal/server/dispatcher/dispatcher.go

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,21 @@ import (
2222
"google.golang.org/protobuf/proto"
2323

2424
pb "github.com/datacommonsorg/mixer/internal/proto"
25+
pbv1 "github.com/datacommonsorg/mixer/internal/proto/v1"
2526
pbv2 "github.com/datacommonsorg/mixer/internal/proto/v2"
2627
)
2728

2829
// RequestType represents the type of request.
2930
type RequestType string
3031

3132
const (
32-
TypeNode RequestType = "Node"
33-
TypeNodeSearch RequestType = "NodeSearch"
34-
TypeObservation RequestType = "Observation"
35-
TypeResolve RequestType = "Resolve"
36-
TypeSparql RequestType = "Sparql"
37-
TypeEvent RequestType = "Event"
33+
TypeNode RequestType = "Node"
34+
TypeNodeSearch RequestType = "NodeSearch"
35+
TypeObservation RequestType = "Observation"
36+
TypeResolve RequestType = "Resolve"
37+
TypeSparql RequestType = "Sparql"
38+
TypeEvent RequestType = "Event"
39+
TypeBulkVariableInfo RequestType = "BulkVariableInfo"
3840
)
3941

4042
// RequestContext holds the context for a given request.
@@ -210,6 +212,19 @@ func (dispatcher *Dispatcher) Event(ctx context.Context, in *pbv2.EventRequest)
210212
return response.(*pbv2.EventResponse), nil
211213
}
212214

215+
func (dispatcher *Dispatcher) BulkVariableInfo(ctx context.Context, in *pbv1.BulkVariableInfoRequest) (*pbv1.BulkVariableInfoResponse, error) {
216+
requestContext := newRequestContext(ctx, in, TypeBulkVariableInfo)
217+
218+
response, err := dispatcher.handle(requestContext, func(ctx context.Context, request proto.Message) (proto.Message, error) {
219+
return dispatcher.sources.BulkVariableInfo(ctx, request.(*pbv1.BulkVariableInfoRequest))
220+
})
221+
222+
if err != nil {
223+
return nil, err
224+
}
225+
return response.(*pbv1.BulkVariableInfoResponse), nil
226+
}
227+
213228
func newRequestContext(ctx context.Context, request proto.Message, requestType RequestType) *RequestContext {
214229
return &RequestContext{
215230
Context: ctx,

internal/server/redis/processor.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"log/slog"
2121

2222
pb "github.com/datacommonsorg/mixer/internal/proto"
23+
pbv1 "github.com/datacommonsorg/mixer/internal/proto/v1"
2324
pbv2 "github.com/datacommonsorg/mixer/internal/proto/v2"
2425
"github.com/datacommonsorg/mixer/internal/server/dispatcher"
2526
"github.com/datacommonsorg/mixer/internal/util"
@@ -87,6 +88,8 @@ func newEmptyResponse(requestType dispatcher.RequestType) (proto.Message, error)
8788
return &pb.QueryResponse{}, nil
8889
case dispatcher.TypeEvent:
8990
return &pbv2.EventResponse{}, nil
91+
case dispatcher.TypeBulkVariableInfo:
92+
return &pbv1.BulkVariableInfoResponse{}, nil
9093
default:
9194
return nil, fmt.Errorf("unknown request type for caching: %v", requestType)
9295
}

internal/server/remote/client.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"net/http"
2121

2222
pb "github.com/datacommonsorg/mixer/internal/proto"
23+
pbv1 "github.com/datacommonsorg/mixer/internal/proto/v1"
2324
pbv2 "github.com/datacommonsorg/mixer/internal/proto/v2"
2425
"github.com/datacommonsorg/mixer/internal/server/resource"
2526

@@ -109,3 +110,13 @@ func (rc *RemoteClient) Event(req *pbv2.EventRequest) (*pbv2.EventResponse, erro
109110
}
110111
return resp, nil
111112
}
113+
114+
func (rc *RemoteClient) BulkVariableInfo(req *pbv1.BulkVariableInfoRequest) (*pbv1.BulkVariableInfoResponse, error) {
115+
resp := &pbv1.BulkVariableInfoResponse{}
116+
// TODO: Update the endpoint to /v2/bulk/info/variable once it's supported by the remote mixer.
117+
err := util.FetchRemote(rc.metadata, rc.httpClient, "/v1/bulk/info/variable", req, resp)
118+
if err != nil {
119+
return nil, err
120+
}
121+
return resp, nil
122+
}

internal/server/remote/datasource.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"fmt"
2020

2121
pb "github.com/datacommonsorg/mixer/internal/proto"
22+
pbv1 "github.com/datacommonsorg/mixer/internal/proto/v1"
2223
pbv2 "github.com/datacommonsorg/mixer/internal/proto/v2"
2324
"github.com/datacommonsorg/mixer/internal/server/datasource"
2425
)
@@ -66,3 +67,7 @@ func (rds *RemoteDataSource) Sparql(ctx context.Context, req *pb.SparqlRequest)
6667
func (rds *RemoteDataSource) Event(ctx context.Context, req *pbv2.EventRequest) (*pbv2.EventResponse, error) {
6768
return rds.client.Event(req)
6869
}
70+
71+
func (rds *RemoteDataSource) BulkVariableInfo(ctx context.Context, req *pbv1.BulkVariableInfoRequest) (*pbv1.BulkVariableInfoResponse, error) {
72+
return rds.client.BulkVariableInfo(req)
73+
}

internal/server/spanner/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ type SpannerClient interface {
4040
ResolveByID(ctx context.Context, nodes []string, in, out string) (map[string][]string, error)
4141
GetEventCollectionDate(ctx context.Context, placeID, eventType string) ([]string, error)
4242
Sparql(ctx context.Context, nodes []types.Node, queries []*types.Query, opts *types.QueryOptions) ([][]string, error)
43-
GetVariableMetadata(ctx context.Context, ids []string) (map[string][]*pb.StatVarSummary_ProvenanceSummary, error)
43+
GetProvenanceSummary(ctx context.Context, ids []string) (map[string]map[string]*pb.StatVarSummary_ProvenanceSummary, error)
4444
Id() string
4545
Start()
4646
Close()

internal/server/spanner/datasource.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,3 +387,11 @@ func (sds *SpannerDataSource) handleEventCollectionDate(ctx context.Context, req
387387
},
388388
}, nil
389389
}
390+
391+
func (sds *SpannerDataSource) BulkVariableInfo(ctx context.Context, req *pbv1.BulkVariableInfoRequest) (*pbv1.BulkVariableInfoResponse, error) {
392+
metadata, err := sds.client.GetProvenanceSummary(ctx, req.GetNodes())
393+
if err != nil {
394+
return nil, fmt.Errorf("error getting variable metadata from Spanner: %v", err)
395+
}
396+
return generateBulkVariableInfoResponse(metadata), nil
397+
}

0 commit comments

Comments
 (0)