Skip to content

Commit 810b373

Browse files
Add CHASM Visibility registration and task processing (#8491)
## What changed? Adds CHASM search attribute provider and mapper implementation. To use CHASM search attributes, embed your component with ComponentSearchAttributesProvider and register the search attributes to the CHASM Registry. To define a component search attribute, define a SearchAttribute as a package/global scoped variable (CHASM search attributes should reference exported variables like `SearchAttributeFieldInt01`, `SearchAttributeFieldDateTime01`: ``` var ( TestComponentStartTimeSearchAttribute = NewSearchAttributeTime(testComponentStartTimeSAKey, SearchAttributeFieldDateTime01) _ VisibilitySearchAttributesProvider = (*TestComponent)(nil) _ VisibilityMemoProvider = (*TestComponent)(nil) _ VisibilitySearchAttributesMapper = (*TestComponent)(nil) ) ``` To update SearchAttributes during an execution of a CHASM archetype, implement the provider `SearchAttributes` in the root component, and call the `NewValue` method on a SearchAttribute within the Provider method to generate a new SearchAttributeValue. On SearchAttribute updates during CHASM transactions, the framework will detect changes and submit a Visibility task for persistence. ``` type TestComponent struct { UnimplementedComponent ... Visibility Field[*Visibility] } func (tc *TestComponent) SearchAttributes(ctx Context) []SearchAttributeValue { return []SearchAttributeValue{ TestComponentStartTimeSearchAttribute.NewValue(time) } } ``` To register the search attribute with the CHASM Registry, `WithSearchAttributes` must be passed as a RegistrableComponentOption to the root component of the library. This is required to support Visibility queries. ``` return []*chasm.RegistrableComponent{ chasm.NewRegistrableComponent[*Scheduler]("scheduler", WIthSearchAttributes([]SearchAttributeDefinition{TestComponentStartTimeSearchAttribute}), chasm.NewRegistrableComponent[*Generator]("generator"), chasm.NewRegistrableComponent[*Invoker]("invoker"), chasm.NewRegistrableComponent[*Backfiller]("backfiller"), } ``` ## Why? Required to support CHASM Search Attributes. ## How did you test it? - [X] built - [X] run locally and tested manually - [X] covered by existing tests - [X] added new unit test(s) - [ ] added new functional test(s) ## Potential risks TBD --------- Co-authored-by: Rodrigo Zhou <[email protected]>
1 parent 4eafd4a commit 810b373

21 files changed

+592
-53
lines changed

chasm/lib/tests/library.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,13 @@ func (l *library) Name() string {
1616

1717
func (l *library) Components() []*chasm.RegistrableComponent {
1818
return []*chasm.RegistrableComponent{
19-
chasm.NewRegistrableComponent[*PayloadStore]("payloadStore"),
19+
chasm.NewRegistrableComponent[*PayloadStore]("payloadStore",
20+
chasm.WithSearchAttributes(
21+
PayloadTotalCountSearchAttribute,
22+
PayloadTotalSizeSearchAttribute,
23+
chasm.SearchAttributeTemporalScheduledByID,
24+
),
25+
),
2026
}
2127
}
2228

chasm/lib/tests/payload.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"go.temporal.io/server/chasm"
77
"go.temporal.io/server/chasm/lib/tests/gen/testspb/v1"
88
"go.temporal.io/server/common"
9-
"go.temporal.io/server/common/searchattribute"
109
"google.golang.org/protobuf/types/known/timestamppb"
1110
)
1211

@@ -15,15 +14,16 @@ const (
1514
TotalSizeMemoFieldName = "TotalSize"
1615
)
1716

18-
// TODO: Register proper SA for TotalCount and TotalSize
19-
// For now, CHASM framework does NOT support Per-Component SearchAttributes
20-
// so just update a random existing pre-defined SA to make sure the logic works.
2117
const (
22-
TestKeywordSAFieldName = searchattribute.TemporalScheduledById
23-
TestKeywordSAFieldValue = "test-keyword-value"
18+
TestScheduleID = "TestScheduleID"
19+
PayloadTotalCountSAAlias = "PayloadTotalCount"
20+
PayloadTotalSizeSAAlias = "PayloadTotalSize"
2421
)
2522

2623
var (
24+
PayloadTotalCountSearchAttribute = chasm.NewSearchAttributeInt(PayloadTotalCountSAAlias, chasm.SearchAttributeFieldInt01)
25+
PayloadTotalSizeSearchAttribute = chasm.NewSearchAttributeInt(PayloadTotalSizeSAAlias, chasm.SearchAttributeFieldInt02)
26+
2727
_ chasm.VisibilitySearchAttributesProvider = (*PayloadStore)(nil)
2828
_ chasm.VisibilityMemoProvider = (*PayloadStore)(nil)
2929
)
@@ -147,12 +147,12 @@ func (s *PayloadStore) LifecycleState(
147147

148148
// SearchAttributes implements chasm.VisibilitySearchAttributesProvider interface
149149
func (s *PayloadStore) SearchAttributes(
150-
_ chasm.Context,
151-
) map[string]chasm.VisibilityValue {
152-
// TODO: UpsertSearchAttribute as well when CHASM framework supports Per-Component SearchAttributes
153-
// For now, we just update a random existing pre-defined SA to make sure the logic works.
154-
return map[string]chasm.VisibilityValue{
155-
TestKeywordSAFieldName: chasm.VisibilityValueString(TestKeywordSAFieldValue),
150+
ctx chasm.Context,
151+
) []chasm.SearchAttributeKeyValue {
152+
return []chasm.SearchAttributeKeyValue{
153+
PayloadTotalCountSearchAttribute.Value(s.State.TotalCount),
154+
PayloadTotalSizeSearchAttribute.Value(s.State.TotalSize),
155+
chasm.SearchAttributeTemporalScheduledByID.Value(TestScheduleID),
156156
}
157157
}
158158

chasm/registrable_component.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package chasm
22

33
import (
4+
"fmt"
45
"reflect"
6+
7+
enumspb "go.temporal.io/api/enums/v1"
58
)
69

710
type (
@@ -13,6 +16,8 @@ type (
1316
ephemeral bool
1417
singleCluster bool
1518
shardingFn func(EntityKey) string
19+
20+
searchAttributesMapper *VisibilitySearchAttributesMapper
1621
}
1722

1823
RegistrableComponentOption func(*RegistrableComponent)
@@ -56,6 +61,40 @@ func WithShardingFn(
5661
}
5762
}
5863

64+
func WithSearchAttributes(
65+
searchAttributes ...SearchAttribute,
66+
) RegistrableComponentOption {
67+
return func(rc *RegistrableComponent) {
68+
if len(searchAttributes) == 0 {
69+
return
70+
}
71+
rc.searchAttributesMapper = &VisibilitySearchAttributesMapper{
72+
aliasToField: make(map[string]string, len(searchAttributes)),
73+
fieldToAlias: make(map[string]string, len(searchAttributes)),
74+
saTypeMap: make(map[string]enumspb.IndexedValueType, len(searchAttributes)),
75+
}
76+
77+
for _, sa := range searchAttributes {
78+
alias := sa.definition().alias
79+
field := sa.definition().field
80+
valueType := sa.definition().valueType
81+
82+
if _, ok := rc.searchAttributesMapper.aliasToField[alias]; ok {
83+
//nolint:forbidigo
84+
panic(fmt.Sprintf("registrable component validation error: search attribute alias %q is already defined", alias))
85+
}
86+
if _, ok := rc.searchAttributesMapper.fieldToAlias[field]; ok {
87+
//nolint:forbidigo
88+
panic(fmt.Sprintf("registrable component validation error: search attribute field %q is already defined", field))
89+
}
90+
91+
rc.searchAttributesMapper.aliasToField[alias] = field
92+
rc.searchAttributesMapper.fieldToAlias[field] = alias
93+
rc.searchAttributesMapper.saTypeMap[field] = valueType
94+
}
95+
}
96+
}
97+
5998
// fqType returns the fully qualified name of the component, which is a combination of
6099
// the library name and the component type. This is used to uniquely identify
61100
// the component in the registry.

0 commit comments

Comments
 (0)