Skip to content

Commit ac8d457

Browse files
committed
Support BoundedTrie metric for Prism Runner
1 parent fb80584 commit ac8d457

File tree

3 files changed

+352
-1
lines changed

3 files changed

+352
-1
lines changed

sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go

Lines changed: 311 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,9 @@ func buildUrnToOpsMap(mUrn2Spec map[string]*pipepb.MonitoringInfoSpec) map[strin
183183
getMetTyp(pipepb.MonitoringInfoTypeUrns_SET_STRING_TYPE): func() metricAccumulator {
184184
return &stringSet{set: map[string]struct{}{}}
185185
},
186+
getMetTyp(pipepb.MonitoringInfoTypeUrns_BOUNDED_TRIE_TYPE): func() metricAccumulator {
187+
return &boundedTrie{}
188+
},
186189
getMetTyp(pipepb.MonitoringInfoTypeUrns_PROGRESS_TYPE): func() metricAccumulator { return &progress{} },
187190
}
188191

@@ -468,6 +471,314 @@ func (m *stringSet) toProto(key metricKey) *pipepb.MonitoringInfo {
468471
}
469472
}
470473

474+
type boundedTrie struct {
475+
data *boundedTrieData
476+
}
477+
478+
func (b *boundedTrie) accumulate(pyld []byte) error {
479+
bt := &pipepb.BoundedTrie{}
480+
if err := proto.Unmarshal(pyld, bt); err != nil {
481+
return err
482+
}
483+
incoming := boundedTrieDataFromProto(bt)
484+
if incoming.isEmpty() {
485+
return nil
486+
}
487+
if b.data == nil || b.data.isEmpty() {
488+
b.data = incoming
489+
return nil
490+
}
491+
b.data = b.data.combine(incoming)
492+
return nil
493+
}
494+
495+
func (b *boundedTrie) toProto(key metricKey) *pipepb.MonitoringInfo {
496+
if b.data == nil {
497+
return &pipepb.MonitoringInfo{
498+
Urn: key.Urn(),
499+
Type: getMetTyp(pipepb.MonitoringInfoTypeUrns_BOUNDED_TRIE_TYPE),
500+
Labels: key.Labels(),
501+
}
502+
}
503+
payload, err := proto.Marshal(b.data.toProto())
504+
if err != nil {
505+
panic(fmt.Sprintf("error encoding bounded trie metric: %v", err))
506+
}
507+
return &pipepb.MonitoringInfo{
508+
Urn: key.Urn(),
509+
Type: getMetTyp(pipepb.MonitoringInfoTypeUrns_BOUNDED_TRIE_TYPE),
510+
Payload: payload,
511+
Labels: key.Labels(),
512+
}
513+
}
514+
515+
// boundedTrieNode is one node of a bounded trie of string path segments.
type boundedTrieNode struct {
	// size is the weight of the subtree rooted at this node. A node without
	// children, or a truncated node, has size 1; otherwise size tracks the
	// sum of the children's sizes.
	size int
	// truncated marks a node whose descendants were discarded by trim or
	// merge. A truncated node ignores further add and merge calls.
	truncated bool
	// children maps the next path segment to the child node for it.
	children map[string]*boundedTrieNode
}
520+
521+
func newBoundedTrieNode() *boundedTrieNode {
522+
return &boundedTrieNode{
523+
size: 1,
524+
children: map[string]*boundedTrieNode{},
525+
}
526+
}
527+
528+
func boundedTrieNodeFromProto(protoNode *pipepb.BoundedTrieNode) *boundedTrieNode {
529+
node := newBoundedTrieNode()
530+
if protoNode.GetTruncated() {
531+
node.truncated = true
532+
node.children = nil
533+
return node
534+
}
535+
children := protoNode.GetChildren()
536+
if len(children) == 0 {
537+
return node
538+
}
539+
node.children = make(map[string]*boundedTrieNode, len(children))
540+
total := 0
541+
for prefix, child := range children {
542+
c := boundedTrieNodeFromProto(child)
543+
node.children[prefix] = c
544+
total += c.size
545+
}
546+
if total > 1 {
547+
node.size = total
548+
}
549+
return node
550+
}
551+
552+
func (n *boundedTrieNode) clone() *boundedTrieNode {
553+
if n == nil {
554+
return nil
555+
}
556+
clone := &boundedTrieNode{
557+
size: n.size,
558+
truncated: n.truncated,
559+
}
560+
if n.children != nil {
561+
clone.children = make(map[string]*boundedTrieNode, len(n.children))
562+
for k, child := range n.children {
563+
clone.children[k] = child.clone()
564+
}
565+
}
566+
return clone
567+
}
568+
569+
func (n *boundedTrieNode) add(segments []string) int {
570+
if n.truncated || len(segments) == 0 {
571+
return 0
572+
}
573+
if n.children == nil {
574+
n.children = make(map[string]*boundedTrieNode)
575+
}
576+
head := segments[0]
577+
tail := segments[1:]
578+
wasEmpty := len(n.children) == 0
579+
child, ok := n.children[head]
580+
if !ok {
581+
child = newBoundedTrieNode()
582+
n.children[head] = child
583+
}
584+
delta := 0
585+
if !ok {
586+
if wasEmpty {
587+
delta = 0
588+
} else {
589+
delta = 1
590+
}
591+
}
592+
if len(tail) > 0 {
593+
delta += child.add(tail)
594+
}
595+
n.size += delta
596+
return delta
597+
}
598+
599+
func (n *boundedTrieNode) merge(other *boundedTrieNode) int {
600+
if n.truncated || other == nil {
601+
return 0
602+
}
603+
if other.truncated {
604+
delta := 1 - n.size
605+
n.truncated = true
606+
n.children = nil
607+
n.size += delta
608+
return delta
609+
}
610+
if len(other.children) == 0 {
611+
return 0
612+
}
613+
if n.children == nil || len(n.children) == 0 {
614+
if n.children == nil {
615+
n.children = make(map[string]*boundedTrieNode, len(other.children))
616+
}
617+
delta := other.size - n.size
618+
for prefix, child := range other.children {
619+
n.children[prefix] = child.clone()
620+
}
621+
n.size += delta
622+
return delta
623+
}
624+
delta := 0
625+
for prefix, otherChild := range other.children {
626+
if n.children == nil {
627+
n.children = make(map[string]*boundedTrieNode)
628+
}
629+
if selfChild, ok := n.children[prefix]; ok {
630+
delta += selfChild.merge(otherChild.clone())
631+
} else {
632+
cloned := otherChild.clone()
633+
n.children[prefix] = cloned
634+
delta += cloned.size
635+
}
636+
}
637+
n.size += delta
638+
return delta
639+
}
640+
641+
func (n *boundedTrieNode) trim() int {
642+
if n.children == nil || len(n.children) == 0 {
643+
return 0
644+
}
645+
var maxChild *boundedTrieNode
646+
for _, child := range n.children {
647+
if maxChild == nil || child.size > maxChild.size {
648+
maxChild = child
649+
}
650+
}
651+
if maxChild == nil {
652+
return 0
653+
}
654+
var delta int
655+
if maxChild.size == 1 {
656+
delta = 1 - n.size
657+
n.truncated = true
658+
n.children = nil
659+
} else {
660+
delta = maxChild.trim()
661+
}
662+
n.size += delta
663+
return delta
664+
}
665+
666+
func (n *boundedTrieNode) toProto() *pipepb.BoundedTrieNode {
667+
if n == nil {
668+
return nil
669+
}
670+
protoNode := &pipepb.BoundedTrieNode{
671+
Truncated: n.truncated,
672+
}
673+
if !n.truncated && len(n.children) > 0 {
674+
protoNode.Children = make(map[string]*pipepb.BoundedTrieNode, len(n.children))
675+
for prefix, child := range n.children {
676+
protoNode.Children[prefix] = child.toProto()
677+
}
678+
}
679+
return protoNode
680+
}
681+
682+
type boundedTrieData struct {
683+
singleton []string
684+
root *boundedTrieNode
685+
bound int32
686+
}
687+
688+
func boundedTrieDataFromProto(protoData *pipepb.BoundedTrie) *boundedTrieData {
689+
if protoData == nil {
690+
return &boundedTrieData{}
691+
}
692+
data := &boundedTrieData{
693+
bound: protoData.GetBound(),
694+
}
695+
if len(protoData.GetSingleton()) > 0 {
696+
data.singleton = append([]string(nil), protoData.GetSingleton()...)
697+
}
698+
if protoData.GetRoot() != nil {
699+
data.root = boundedTrieNodeFromProto(protoData.GetRoot())
700+
}
701+
return data
702+
}
703+
704+
func (d *boundedTrieData) isEmpty() bool {
705+
return d == nil || (d.root == nil && len(d.singleton) == 0)
706+
}
707+
708+
func (d *boundedTrieData) clone() *boundedTrieData {
709+
if d == nil {
710+
return &boundedTrieData{}
711+
}
712+
clone := &boundedTrieData{
713+
bound: d.bound,
714+
}
715+
if len(d.singleton) > 0 {
716+
clone.singleton = append([]string(nil), d.singleton...)
717+
}
718+
if d.root != nil {
719+
clone.root = d.root.clone()
720+
}
721+
return clone
722+
}
723+
724+
func (d *boundedTrieData) asTrie() *boundedTrieNode {
725+
if d.root != nil {
726+
return d.root
727+
}
728+
root := newBoundedTrieNode()
729+
if len(d.singleton) > 0 {
730+
root.add(d.singleton)
731+
}
732+
return root
733+
}
734+
735+
func (d *boundedTrieData) combine(other *boundedTrieData) *boundedTrieData {
736+
if d == nil || d.isEmpty() {
737+
return other.clone()
738+
}
739+
if other == nil || other.isEmpty() {
740+
return d.clone()
741+
}
742+
primary := d
743+
secondary := other
744+
if primary.root == nil && secondary.root != nil {
745+
primary, secondary = secondary, primary
746+
}
747+
combined := primary.asTrie().clone()
748+
if secondary.root != nil {
749+
combined.merge(secondary.root.clone())
750+
} else if len(secondary.singleton) > 0 {
751+
combined.add(secondary.singleton)
752+
}
753+
bound := primary.bound
754+
if bound == 0 || (secondary.bound > 0 && secondary.bound < bound) {
755+
bound = secondary.bound
756+
}
757+
if bound > 0 {
758+
for combined.size > int(bound) {
759+
combined.trim()
760+
}
761+
}
762+
return &boundedTrieData{
763+
root: combined,
764+
bound: bound,
765+
}
766+
}
767+
768+
func (d *boundedTrieData) toProto() *pipepb.BoundedTrie {
769+
if d == nil {
770+
return &pipepb.BoundedTrie{}
771+
}
772+
protoData := &pipepb.BoundedTrie{Bound: d.bound}
773+
if len(d.singleton) > 0 {
774+
protoData.Singleton = append([]string(nil), d.singleton...)
775+
}
776+
if d.root != nil {
777+
protoData.Root = d.root.toProto()
778+
}
779+
return protoData
780+
}
781+
471782
type durability int
472783

473784
const (

sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,15 @@ func makeInfoWBytes(enum pipepb.MonitoringInfoSpecs_Enum, payload []byte) *pipep
5353
return info
5454
}
5555

56+
func boundedTriePayload(t testing.TB, trie *pipepb.BoundedTrie) []byte {
57+
t.Helper()
58+
bytes, err := proto.Marshal(trie)
59+
if err != nil {
60+
t.Fatalf("failed to marshal bounded trie: %v", err)
61+
}
62+
return bytes
63+
}
64+
5665
// This test validates that multiple contributions are correctly summed up and accumulated.
5766
func Test_metricsStore_ContributeMetrics(t *testing.T) {
5867

@@ -166,6 +175,37 @@ func Test_metricsStore_ContributeMetrics(t *testing.T) {
166175
want: []*pipepb.MonitoringInfo{
167176
makeInfoWBytes(pipepb.MonitoringInfoSpecs_USER_SET_STRING, []byte{0, 0, 0, 1, 1, 63}),
168177
},
178+
}, {
179+
name: "boundedTrie",
180+
input: []map[string][]byte{
181+
{"a": boundedTriePayload(t, &pipepb.BoundedTrie{Bound: 100, Singleton: []string{"a"}})},
182+
{"a": boundedTriePayload(t, &pipepb.BoundedTrie{Bound: 100, Singleton: []string{"z", "z", "z"}})},
183+
},
184+
shortIDs: map[string]*pipepb.MonitoringInfo{
185+
"a": makeInfo(pipepb.MonitoringInfoSpecs_USER_BOUNDED_TRIE),
186+
},
187+
want: []*pipepb.MonitoringInfo{
188+
makeInfoWBytes(
189+
pipepb.MonitoringInfoSpecs_USER_BOUNDED_TRIE,
190+
boundedTriePayload(t, &pipepb.BoundedTrie{
191+
Bound: 100,
192+
Root: &pipepb.BoundedTrieNode{
193+
Children: map[string]*pipepb.BoundedTrieNode{
194+
"a": {},
195+
"z": {
196+
Children: map[string]*pipepb.BoundedTrieNode{
197+
"z": {
198+
Children: map[string]*pipepb.BoundedTrieNode{
199+
"z": {},
200+
},
201+
},
202+
},
203+
},
204+
},
205+
},
206+
}),
207+
),
208+
},
169209
},
170210
}
171211

sdks/python/apache_beam/runners/portability/prism_runner_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ def test_custom_window_type(self):
232232
" https://github.com/apache/beam/issues/31921")
233233

234234
def test_metrics(self):
235-
super().test_metrics(check_bounded_trie=False)
235+
super().test_metrics(check_bounded_trie=True)
236236

237237
def construct_timestamped(k, t):
238238
return window.TimestampedValue((k, t), t)

0 commit comments

Comments
 (0)