diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go index bbbdfd1eba4f..fbe18ff44049 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go @@ -183,6 +183,15 @@ func buildUrnToOpsMap(mUrn2Spec map[string]*pipepb.MonitoringInfoSpec) map[strin getMetTyp(pipepb.MonitoringInfoTypeUrns_SET_STRING_TYPE): func() metricAccumulator { return &stringSet{set: map[string]struct{}{}} }, + getMetTyp(pipepb.MonitoringInfoTypeUrns_BOUNDED_TRIE_TYPE): func() metricAccumulator { + return &boundedTrie{ + data: &boundedTrieData{ + singleton: []string{}, + root: nil, + bound: 100, // Default maximum size of the trie + }, + } + }, getMetTyp(pipepb.MonitoringInfoTypeUrns_PROGRESS_TYPE): func() metricAccumulator { return &progress{} }, } @@ -468,6 +477,273 @@ func (m *stringSet) toProto(key metricKey) *pipepb.MonitoringInfo { } } +type boundedTrie struct { + data *boundedTrieData +} + +func (b *boundedTrie) accumulate(pyld []byte) error { + bt := &pipepb.BoundedTrie{} + if err := proto.Unmarshal(pyld, bt); err != nil { + return err + } + + incoming := boundedTrieDataFromProto(bt) + if incoming.isEmpty() { + return nil + } + + if b.data.isEmpty() { + b.data = incoming + return nil + } + b.data = b.data.combine(incoming) + return nil +} + +func (b *boundedTrie) toProto(key metricKey) *pipepb.MonitoringInfo { + payload, err := proto.MarshalOptions{Deterministic: true}.Marshal(b.data.toProto()) + if err != nil { + panic(fmt.Sprintf("error encoding bounded trie metric: %v", err)) + } + return &pipepb.MonitoringInfo{ + Urn: key.Urn(), + Type: getMetTyp(pipepb.MonitoringInfoTypeUrns_BOUNDED_TRIE_TYPE), + Payload: payload, + Labels: key.Labels(), + } +} + +type boundedTrieNode struct { + size int32 + truncated bool + children map[string]*boundedTrieNode +} + +func newBoundedTrieNode() *boundedTrieNode { + return 
// defaultBoundedTrieBound is used when neither side of a combine carries a
// positive bound. It matches the default bound the accumulator factory sets.
const defaultBoundedTrieBound int32 = 100

// boundedTrieNode is one node of an in-memory trie of string path segments.
//
// size is 1 for a childless or truncated node and is otherwise maintained by
// add, merge and trim. A truncated node has no children and ignores further
// additions and merges.
type boundedTrieNode struct {
	size      int32
	truncated bool
	children  map[string]*boundedTrieNode
}

// newBoundedTrieNode returns an empty, non-truncated node of size 1.
func newBoundedTrieNode() *boundedTrieNode {
	return &boundedTrieNode{
		size:     1,
		children: map[string]*boundedTrieNode{},
	}
}

// clone returns a deep copy of n that shares no nodes or maps with it.
func (n *boundedTrieNode) clone() *boundedTrieNode {
	c := &boundedTrieNode{
		size:      n.size,
		truncated: n.truncated,
		children:  make(map[string]*boundedTrieNode, len(n.children)),
	}
	for k, child := range n.children {
		c.children[k] = child.clone()
	}
	return c
}

// add inserts the path given by segments below n and returns the change in
// n.size. Adding to a truncated node, or adding an empty path, is a no-op.
func (n *boundedTrieNode) add(segments []string) int32 {
	if n.truncated || len(segments) == 0 {
		return 0
	}
	if n.children == nil {
		// Keep a zero-value node usable; writing to a nil map would panic.
		n.children = map[string]*boundedTrieNode{}
	}

	head, tail := segments[0], segments[1:]
	var delta int32
	child, ok := n.children[head]
	if !ok {
		// The first child takes over the count the childless node already
		// had; every further new child adds one.
		if len(n.children) > 0 {
			delta = 1
		}
		child = newBoundedTrieNode()
		n.children[head] = child
	}
	if len(tail) > 0 {
		delta += child.add(tail)
	}
	n.size += delta
	return delta
}

// merge folds other into n and returns the change in n.size. other is never
// modified and no node of other is shared with n afterwards: subtrees that
// exist only in other are deep-copied, so a later trim of n cannot corrupt
// other. A nil other is treated as empty.
func (n *boundedTrieNode) merge(other *boundedTrieNode) int32 {
	if n.truncated || other == nil {
		return 0
	}
	if other.truncated {
		delta := 1 - n.size
		n.truncated = true
		n.children = map[string]*boundedTrieNode{}
		n.size = 1
		return delta
	}
	if len(other.children) == 0 {
		return 0
	}
	if n.children == nil {
		n.children = make(map[string]*boundedTrieNode, len(other.children))
	}

	if len(n.children) == 0 {
		// n was childless: adopt (a copy of) other's subtree wholesale.
		delta := other.size - n.size
		for prefix, child := range other.children {
			n.children[prefix] = child.clone()
		}
		n.size = other.size
		return delta
	}

	var delta int32
	for prefix, otherChild := range other.children {
		if selfChild, ok := n.children[prefix]; ok {
			delta += selfChild.merge(otherChild)
			continue
		}
		n.children[prefix] = otherChild.clone()
		delta += otherChild.size
	}
	n.size += delta
	return delta
}

// trim shrinks the subtree by truncating along its largest child and returns
// the (non-positive) change in n.size. When the largest child has size 1, n
// itself is truncated. Ties between equally sized children are broken by the
// smallest key so the result does not depend on map iteration order.
func (n *boundedTrieNode) trim() int32 {
	if len(n.children) == 0 {
		return 0
	}

	var (
		maxKey   string
		maxChild *boundedTrieNode
	)
	for key, child := range n.children {
		if maxChild == nil || child.size > maxChild.size ||
			(child.size == maxChild.size && key < maxKey) {
			maxKey, maxChild = key, child
		}
	}

	var delta int32
	if maxChild.size == 1 {
		delta = 1 - n.size
		n.truncated = true
		n.children = map[string]*boundedTrieNode{}
	} else {
		delta = maxChild.trim()
	}
	n.size += delta
	return delta
}

// boundedTrieData is the decoded form of a bounded-trie metric: either a
// single path (singleton), a trie (root), or neither, plus the size bound.
type boundedTrieData struct {
	singleton []string
	root      *boundedTrieNode
	bound     int32
}

// isEmpty reports whether d holds neither a trie nor a singleton path.
func (d *boundedTrieData) isEmpty() bool {
	return d.root == nil && len(d.singleton) == 0
}

// clone returns a deep copy of d.
func (d *boundedTrieData) clone() *boundedTrieData {
	c := &boundedTrieData{bound: d.bound}
	if len(d.singleton) > 0 {
		c.singleton = append(c.singleton, d.singleton...)
	}
	if d.root != nil {
		c.root = d.root.clone()
	}
	return c
}

// asTrie returns d's content as a trie root. If d already has a root it is
// returned directly (not copied); otherwise a fresh root is built from the
// singleton path, if any.
func (d *boundedTrieData) asTrie() *boundedTrieNode {
	if d.root != nil {
		return d.root
	}
	root := newBoundedTrieNode()
	if len(d.singleton) > 0 {
		root.add(d.singleton)
	}
	return root
}

// combinedBound picks the bound for a merge result: the smaller of the two
// positive bounds, the only positive one, or defaultBoundedTrieBound when
// neither is positive (e.g. the field was unset on the wire).
func combinedBound(a, b int32) int32 {
	switch {
	case a > 0 && b > 0:
		if a < b {
			return a
		}
		return b
	case a > 0:
		return a
	case b > 0:
		return b
	default:
		return defaultBoundedTrieBound
	}
}

// combine returns a new boundedTrieData holding the union of d and other,
// trimmed until its size fits the combined bound. Neither d nor other is
// modified. A nil other is treated as empty.
func (d *boundedTrieData) combine(other *boundedTrieData) *boundedTrieData {
	// The nil check must come first: the d-is-empty branch dereferences other.
	if other == nil {
		return d.clone()
	}
	if d.isEmpty() {
		return other.clone()
	}
	if other.isEmpty() {
		return d.clone()
	}

	combined := d.asTrie().clone()
	combined.merge(other.asTrie())

	bound := combinedBound(d.bound, other.bound)
	for combined.size > bound {
		// trim reports a negative delta while it makes progress; stop if it
		// cannot shrink further so a bad size can never spin forever.
		if combined.trim() >= 0 {
			break
		}
	}
	return &boundedTrieData{
		root:  combined,
		bound: bound,
	}
}
+ } + if d.root != nil { + clone.root = d.root.clone() + } + return clone +} + +func (d *boundedTrieData) asTrie() *boundedTrieNode { + if d.root != nil { + return d.root + } + + root := newBoundedTrieNode() + if len(d.singleton) > 0 { + root.add(d.singleton) + } + return root +} + +func (d *boundedTrieData) combine(other *boundedTrieData) *boundedTrieData { + if d.isEmpty() { + return other.clone() + } + if other == nil || other.isEmpty() { + return d.clone() + } + + combined := d.asTrie().clone() + combined.merge(other.asTrie()) + bound := int32(math.Min(float64(d.bound), float64(other.bound))) + for combined.size > bound { + combined.trim() + } + return &boundedTrieData{ + root: combined, + bound: bound, + } +} + +func (d *boundedTrieData) toProto() *pipepb.BoundedTrie { + protoData := &pipepb.BoundedTrie{Bound: d.bound} + if len(d.singleton) > 0 { + protoData.Singleton = append(protoData.Singleton, d.singleton...) + } + if d.root != nil { + protoData.Root = d.root.toProto() + } + return protoData +} + type durability int const ( diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics_test.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics_test.go index 339d862292fd..58f98c79f4bf 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics_test.go @@ -53,6 +53,15 @@ func makeInfoWBytes(enum pipepb.MonitoringInfoSpecs_Enum, payload []byte) *pipep return info } +func boundedTriePayload(t testing.TB, trie *pipepb.BoundedTrie) []byte { + t.Helper() + bytes, err := proto.MarshalOptions{Deterministic: true}.Marshal(trie) + if err != nil { + t.Fatalf("failed to marshal bounded trie: %v", err) + } + return bytes +} + // This test validates that multiple contributions are correctly summed up and accumulated. 
func Test_metricsStore_ContributeMetrics(t *testing.T) { @@ -166,6 +175,37 @@ func Test_metricsStore_ContributeMetrics(t *testing.T) { want: []*pipepb.MonitoringInfo{ makeInfoWBytes(pipepb.MonitoringInfoSpecs_USER_SET_STRING, []byte{0, 0, 0, 1, 1, 63}), }, + }, { + name: "boundedTrie", + input: []map[string][]byte{ + {"a": boundedTriePayload(t, &pipepb.BoundedTrie{Bound: 100, Singleton: []string{"a"}})}, + {"a": boundedTriePayload(t, &pipepb.BoundedTrie{Bound: 100, Singleton: []string{"z", "z", "z"}})}, + }, + shortIDs: map[string]*pipepb.MonitoringInfo{ + "a": makeInfo(pipepb.MonitoringInfoSpecs_USER_BOUNDED_TRIE), + }, + want: []*pipepb.MonitoringInfo{ + makeInfoWBytes( + pipepb.MonitoringInfoSpecs_USER_BOUNDED_TRIE, + boundedTriePayload(t, &pipepb.BoundedTrie{ + Bound: 100, + Root: &pipepb.BoundedTrieNode{ + Children: map[string]*pipepb.BoundedTrieNode{ + "a": {}, + "z": { + Children: map[string]*pipepb.BoundedTrieNode{ + "z": { + Children: map[string]*pipepb.BoundedTrieNode{ + "z": {}, + }, + }, + }, + }, + }, + }, + }), + ), + }, }, } diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py index a65f9a9960b4..67710eb8ace2 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py @@ -232,7 +232,7 @@ def test_custom_window_type(self): " https://github.com/apache/beam/issues/31921") def test_metrics(self): - super().test_metrics(check_bounded_trie=False) + super().test_metrics(check_bounded_trie=True) def construct_timestamped(k, t): return window.TimestampedValue((k, t), t)