-
Notifications
You must be signed in to change notification settings - Fork 46
Expand file tree
/
Copy pathshared.go
More file actions
177 lines (156 loc) · 4.97 KB
/
shared.go
File metadata and controls
177 lines (156 loc) · 4.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package shared holds helpers shared by the V2 observation API.
package shared
import (
"context"
"fmt"
"net/http"
pb "github.com/datacommonsorg/mixer/internal/proto"
pbv2 "github.com/datacommonsorg/mixer/internal/proto/v2"
"github.com/datacommonsorg/mixer/internal/server/placein"
"github.com/datacommonsorg/mixer/internal/server/resource"
"github.com/datacommonsorg/mixer/internal/store"
"github.com/datacommonsorg/mixer/internal/util"
"golang.org/x/sync/errgroup"
)
// LATEST is the literal string "LATEST".
// NOTE(review): presumably the date selector meaning "most recent
// observation" — confirm against callers.
const LATEST = "LATEST"
// QueryType names the kind of query that was requested, which in turn
// determines the cache that is read. It is recorded in Mixer usage logs.
type QueryType string

// The recognized QueryType values.
const (
	// QueryTypeValue fetches the values of statVars for the given entities.
	QueryTypeValue = QueryType("value")
	// QueryTypeFacet fetches information about a given facet.
	QueryTypeFacet = QueryType("facet")
	// QueryTypeExistenceByVar checks whether data exists for a given set of
	// entities AND variables.
	QueryTypeExistenceByVar = QueryType("existence-var")
	// QueryTypeExistenceByEntity checks whether ANY statVar has data for a
	// given set of entities.
	QueryTypeExistenceByEntity = QueryType("existence-entity")
	// QueryTypeDerived calculates a value from other fetches.
	QueryTypeDerived = QueryType("derived")
)
// Request-size limits.
const (
	// MaxSeries is the number of series read concurrently at a time. The cap
	// exists to prevent out-of-memory issues.
	MaxSeries = 5000
	// MaxNodes is the maximum number of nodes that may be requested.
	MaxNodes = 5000
)
// Package-level function variables, so that tests can swap in mocks.

// getPlacesIn defaults to placein.GetPlacesIn.
var getPlacesIn = placein.GetPlacesIn

// fetchRemote defaults to fetchRemoteWrapper.
var fetchRemote = fetchRemoteWrapper
// fetchRemoteWrapper calls util.FetchRemote with a freshly allocated
// NodeResponse as the output target and returns that response. If
// util.FetchRemote reports an error, the response is discarded and
// (nil, err) is returned.
func fetchRemoteWrapper(
	metadata *resource.Metadata,
	httpClient *http.Client,
	apiPath string,
	remoteReq *pbv2.NodeRequest,
) (*pbv2.NodeResponse, error) {
	out := new(pbv2.NodeResponse)
	if err := util.FetchRemote(metadata, httpClient, apiPath, remoteReq, out); err != nil {
		return nil, err
	}
	return out, nil
}
// storeFetchChildPlaces looks up, through getPlacesIn, the places of type
// childType for the single given ancestor. The result is returned exactly as
// getPlacesIn produces it.
func storeFetchChildPlaces(
	ctx context.Context,
	store *store.Store,
	ancestor, childType string,
) (map[string][]string, error) {
	// getPlacesIn takes a list of ancestors; wrap the single one.
	ancestors := []string{ancestor}
	return getPlacesIn(ctx, store, ancestors, childType)
}
// remoteMixerFetchChildPlaces sends a node request for ancestor to the
// "/v2/node" API path via fetchRemote. The request's property expression
// follows incoming containedInPlace links filtered to nodes whose typeOf is
// childType.
func remoteMixerFetchChildPlaces(
	metadata *resource.Metadata,
	httpClient *http.Client,
	ancestor, childType string,
) (*pbv2.NodeResponse, error) {
	property := fmt.Sprintf("<-containedInPlace+{typeOf:%s}", childType)
	req := &pbv2.NodeRequest{
		Nodes:    []string{ancestor},
		Property: property,
	}
	return fetchRemote(metadata, httpClient, "/v2/node", req)
}
// FetchChildPlaces returns the child places of type childType under ancestor.
//
// The local store is always queried. When remoteMixer is non-empty, the
// remote mixer is queried concurrently as well (remoteMixer is only tested
// for emptiness here). If either lookup fails, the first error reported by
// the error group is returned together with a nil slice.
//
// The result lists the locally found children first, followed by the dcid of
// every node in every arc the remote response holds for ancestor. The two
// sources are concatenated as-is; no de-duplication is performed.
func FetchChildPlaces(
	ctx context.Context,
	store *store.Store,
	metadata *resource.Metadata,
	httpClient *http.Client,
	remoteMixer, ancestor, childType string,
) ([]string, error) {
	errGroup, errCtx := errgroup.WithContext(ctx)

	// Each variable is written by at most one goroutine and only read after
	// errGroup.Wait() has returned, so no further synchronization is needed.
	var (
		localChildren map[string][]string
		remoteResp    *pbv2.NodeResponse
	)

	errGroup.Go(func() error {
		resp, err := storeFetchChildPlaces(errCtx, store, ancestor, childType)
		if err != nil {
			return err
		}
		localChildren = resp
		return nil
	})

	// Without a remote mixer, remoteResp simply stays nil.
	if remoteMixer != "" {
		errGroup.Go(func() error {
			resp, err := remoteMixerFetchChildPlaces(metadata, httpClient, ancestor, childType)
			if err != nil {
				return err
			}
			remoteResp = resp
			return nil
		})
	}

	if err := errGroup.Wait(); err != nil {
		return nil, err
	}

	childPlaces := localChildren[ancestor]

	// V2 API should always ensure data merging, so remote PlacesIn data is
	// appended to the local PlacesIn data whenever it is available.
	if remoteResp == nil {
		return childPlaces, nil
	}
	graph, found := remoteResp.Data[ancestor]
	if !found {
		return childPlaces, nil
	}
	for _, arc := range graph.Arcs {
		for _, node := range arc.Nodes {
			childPlaces = append(childPlaces, node.Dcid)
		}
	}
	return childPlaces, nil
}
// TrimObservationsResponse removes entities with no observations (no ordered
// facets) from the response.
//
// A new ObservationResponse is returned whose ByVariable and Facets maps are
// fresh, but whose values are the same pointers held by resp. The pruning is
// done in place on each variable's ByEntity map, so resp itself is modified
// as a side effect.
func TrimObservationsResponse(resp *pbv2.ObservationResponse) *pbv2.ObservationResponse {
	trimmed := &pbv2.ObservationResponse{
		ByVariable: make(map[string]*pbv2.VariableObservation),
		Facets:     make(map[string]*pb.Facet),
	}

	for name, obs := range resp.ByVariable {
		for entity, entityObs := range obs.ByEntity {
			if len(entityObs.OrderedFacets) > 0 {
				continue
			}
			// Deleting the entry currently being visited is safe during a
			// range over a map.
			delete(obs.ByEntity, entity)
		}
		trimmed.ByVariable[name] = obs
	}

	// Facets are carried over unchanged.
	for id, facet := range resp.Facets {
		trimmed.Facets[id] = facet
	}

	return trimmed
}