Skip to content

Commit 1d3252f

Browse files
authored
Merge branch 'master' into mandychen/remove-buildkite
2 parents c73549d + 3d6e75c commit 1d3252f

27 files changed

+743
-33
lines changed

.github/workflows/ci-checks.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ jobs:
2929
docker compose -f docker/github_actions/docker-compose.yml run unit-test bash -c "make unit_test && ./scripts/gen_coverage_metadata.sh .build/metadata.txt"
3030
3131
- name: Upload coverage artifacts
32+
if: always()
3233
uses: actions/upload-artifact@v4
3334
with:
3435
name: go-unit-test-coverage
@@ -82,6 +83,7 @@ jobs:
8283
docker compose -f docker/github_actions/docker-compose.yml run integ-test bash -c "make integ_test_sticky_off"
8384
8485
- name: Upload coverage artifacts
86+
if: always()
8587
uses: actions/upload-artifact@v4
8688
with:
8789
name: go-integration-sticky-off-coverage
@@ -113,6 +115,7 @@ jobs:
113115
docker compose -f docker/github_actions/docker-compose.yml run integ-test bash -c "make integ_test_sticky_on"
114116
115117
- name: Upload coverage artifacts
118+
if: always()
116119
uses: actions/upload-artifact@v4
117120
with:
118121
name: go-integration-sticky-on-coverage
@@ -144,6 +147,7 @@ jobs:
144147
docker compose -f docker/github_actions/docker-compose.yml run integ-test-grpc bash -c "make integ_test_grpc"
145148
146149
- name: Upload coverage artifacts
150+
if: always()
147151
uses: actions/upload-artifact@v4
148152
with:
149153
name: go-integration-grpc-coverage

activity/activity.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,11 @@ func GetInfo(ctx context.Context) Info {
102102
return internal.GetActivityInfo(ctx)
103103
}
104104

105+
// HasInfo returns if the context contains activity information
106+
func HasInfo(ctx context.Context) bool {
107+
return internal.HasActivityInfo(ctx)
108+
}
109+
105110
// GetLogger returns a logger that can be used in activity
106111
func GetLogger(ctx context.Context) *zap.Logger {
107112
return internal.GetActivityLogger(ctx)

client/client.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,20 @@ type (
7878
// QueryWorkflowWithOptionsResponse defines the response to QueryWorkflowWithOptions
7979
QueryWorkflowWithOptionsResponse = internal.QueryWorkflowWithOptionsResponse
8080

81+
// GetWorkflowHistoryWithOptionsRequest defines the request to GetWorkflowHistoryWithOptions
82+
GetWorkflowHistoryWithOptionsRequest = internal.GetWorkflowHistoryWithOptionsRequest
83+
84+
// DescribeWorkflowExecutionWithOptionsRequest defines the request to DescribeWorkflowExecutionWithOptions
85+
DescribeWorkflowExecutionWithOptionsRequest = internal.DescribeWorkflowExecutionWithOptionsRequest
86+
8187
// ParentClosePolicy defines the behavior performed on a child workflow when its parent is closed
8288
ParentClosePolicy = internal.ParentClosePolicy
8389

90+
// QueryConsistencyLevel defines the level of consistency the query should respond with.
91+
// It will default to the cluster's configuration if not specified.
92+
// Valid values are QueryConsistencyLevelEventual (served by the receiving cluster), and QueryConsistencyLevelStrong (redirects to the active cluster).
93+
QueryConsistencyLevel = internal.QueryConsistencyLevel
94+
8495
// CancelOption values are functional options for the CancelWorkflow method.
8596
// Supported values can be created with:
8697
// - WithCancelReason(...)
@@ -226,6 +237,26 @@ type (
226237
// }
227238
GetWorkflowHistory(ctx context.Context, workflowID string, runID string, isLongPoll bool, filterType s.HistoryEventFilterType) HistoryEventIterator
228239

240+
// GetWorkflowHistoryWithOptions gets history events of a particular workflow.
241+
// See GetWorkflowHistoryWithOptionsRequest for more information.
242+
// Returns an iterator of HistoryEvents - see shared.HistoryEvent for more details.
243+
// Example:-
244+
// To iterate all events,
245+
// iter := GetWorkflowHistory(ctx, workflowID, runID, isLongPoll, filterType)
246+
// events := []*shared.HistoryEvent{}
247+
// for iter.HasNext() {
248+
// event, err := iter.Next()
249+
// if err != nil {
250+
// return err
251+
// }
252+
// events = append(events, event)
253+
// }
254+
// Returns the following errors:
255+
// - EntityNotExistsError
256+
// - BadRequestError
257+
// - InternalServiceError
258+
GetWorkflowHistoryWithOptions(ctx context.Context, request *GetWorkflowHistoryWithOptionsRequest) HistoryEventIterator
259+
229260
// CompleteActivity reports activity completed.
230261
// activity Execute method can return activity.ErrResultPending to
231262
// indicate the activity is not completed when it's Execute method returns. In that case, this CompleteActivity() method
@@ -383,6 +414,14 @@ type (
383414
// - EntityNotExistError
384415
DescribeWorkflowExecution(ctx context.Context, workflowID, runID string) (*s.DescribeWorkflowExecutionResponse, error)
385416

417+
// DescribeWorkflowExecutionWithOptions returns information about the specified workflow execution.
418+
// See DescribeWorkflowExecutionWithOptionsRequest for more information.
419+
// The errors it can return:
420+
// - BadRequestError
421+
// - InternalServiceError
422+
// - EntityNotExistError
423+
DescribeWorkflowExecutionWithOptions(ctx context.Context, request *DescribeWorkflowExecutionWithOptionsRequest) (*s.DescribeWorkflowExecutionResponse, error)
424+
386425
// DescribeTaskList returns information about the target tasklist, right now this API returns the
387426
// pollers which polled this tasklist in last few minutes.
388427
// The errors it can return:
@@ -458,6 +497,15 @@ const (
458497
ParentClosePolicyAbandon = internal.ParentClosePolicyAbandon
459498
)
460499

500+
const (
501+
// QueryConsistencyLevelUnspecified will use the default consistency level provided by the cluster.
502+
QueryConsistencyLevelUnspecified = internal.QueryConsistencyLevelUnspecified
503+
// QueryConsistencyLevelEventual passes the request to the receiving cluster and returns eventually consistent results
504+
QueryConsistencyLevelEventual = internal.QueryConsistencyLevelEventual
505+
// QueryConsistencyLevelStrong will redirect the request to the active cluster and returns strongly consistent results
506+
QueryConsistencyLevelStrong = internal.QueryConsistencyLevelStrong
507+
)
508+
461509
// NewClient creates an instance of a workflow client
462510
func NewClient(service workflowserviceclient.Interface, domain string, options *Options) Client {
463511
return internal.NewClient(service, domain, options)

internal/activity.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,11 @@ func GetActivityInfo(ctx context.Context) ActivityInfo {
224224
}
225225
}
226226

227+
// HasActivityInfo returns if the context contains activity information
228+
func HasActivityInfo(ctx context.Context) bool {
229+
return hasActivityEnv(ctx)
230+
}
231+
227232
// HasHeartbeatDetails checks if there is heartbeat details from last attempt.
228233
func HasHeartbeatDetails(ctx context.Context) bool {
229234
env := getActivityEnv(ctx)

internal/activity_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,3 +233,25 @@ func (s *activityTestSuite) TestGetWorkerStopChannel() {
233233
channel := GetWorkerStopChannel(ctx)
234234
s.NotNil(channel)
235235
}
236+
237+
func (s *activityTestSuite) TestHasActivityInfo() {
238+
// Test context without activity info
239+
ctx := context.Background()
240+
s.False(HasActivityInfo(ctx))
241+
242+
// Test context with activity info
243+
activityEnv := &activityEnvironment{
244+
activityID: "test-activity-id",
245+
activityType: ActivityType{Name: "test-activity-type"},
246+
}
247+
ctxWithActivity := context.WithValue(ctx, activityEnvContextKey, activityEnv)
248+
s.True(HasActivityInfo(ctxWithActivity))
249+
250+
// Test context with nil activity env
251+
ctxWithNilActivity := context.WithValue(ctx, activityEnvContextKey, nil)
252+
s.False(HasActivityInfo(ctxWithNilActivity))
253+
254+
// Test context with other values in context
255+
ctxWithOtherValue := context.WithValue(ctx, activityOptionsContextKey, "other-value")
256+
s.False(HasActivityInfo(ctxWithOtherValue))
257+
}

internal/client.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,15 @@ type (
219219
// }
220220
GetWorkflowHistory(ctx context.Context, workflowID string, runID string, isLongPoll bool, filterType s.HistoryEventFilterType) HistoryEventIterator
221221

222+
// GetWorkflowHistoryWithOptions gets history events of a particular workflow.
223+
// See GetWorkflowHistoryWithOptionsRequest for more information.
224+
// Returns an iterator of HistoryEvents - see shared.HistoryEvent for more details.
225+
// The errors it can return:
226+
// - EntityNotExistsError
227+
// - BadRequestError
228+
// - InternalServiceError
229+
GetWorkflowHistoryWithOptions(ctx context.Context, request *GetWorkflowHistoryWithOptionsRequest) HistoryEventIterator
230+
222231
// CompleteActivity reports activity completed.
223232
// activity Execute method can return acitivity.activity.ErrResultPending to
224233
// indicate the activity is not completed when it's Execute method returns. In that case, this CompleteActivity() method
@@ -367,6 +376,14 @@ type (
367376
// - EntityNotExistError
368377
DescribeWorkflowExecution(ctx context.Context, workflowID, runID string) (*s.DescribeWorkflowExecutionResponse, error)
369378

379+
// DescribeWorkflowExecutionWithOptions returns information about the specified workflow execution.
380+
// See DescribeWorkflowExecutionWithOptionsRequest for more information.
381+
// The errors it can return:
382+
// - BadRequestError
383+
// - InternalServiceError
384+
// - EntityNotExistError
385+
DescribeWorkflowExecutionWithOptions(ctx context.Context, request *DescribeWorkflowExecutionWithOptionsRequest) (*s.DescribeWorkflowExecutionResponse, error)
386+
370387
// DescribeTaskList returns information about the target tasklist, right now this API returns the
371388
// pollers which polled this tasklist in last few minutes.
372389
// The errors it can return:
@@ -569,6 +586,11 @@ type (
569586
// ParentClosePolicy defines the action on children when parent is closed
570587
ParentClosePolicy int
571588

589+
// QueryConsistencyLevel defines the level of consistency the query should respond with.
590+
// It will default to the cluster's configuration if not specified.
591+
// Valid values are QueryConsistencyLevelEventual (served by the receiving cluster), and QueryConsistencyLevelStrong (redirects to the active cluster).
592+
QueryConsistencyLevel int
593+
572594
// ActiveClusterSelectionPolicy defines the policy for selecting the active cluster to start the workflow execution on for active-active domains.
573595
// Active-active domains can be configured to be active in multiple clusters (at most one in a given region).
574596
// Individual workflows can be configured to be active in one of the active clusters of the domain.
@@ -620,6 +642,15 @@ const (
620642
WorkflowIDReusePolicyTerminateIfRunning
621643
)
622644

645+
const (
646+
// QueryConsistencyLevelUnspecified will use the default consistency level provided by the cluster.
647+
QueryConsistencyLevelUnspecified QueryConsistencyLevel = iota
648+
// QueryConsistencyLevelEventual passes the request to the receiving cluster and returns eventually consistent results
649+
QueryConsistencyLevelEventual
650+
// QueryConsistencyLevelStrong will redirect the request to the active cluster and returns strongly consistent results
651+
QueryConsistencyLevelStrong
652+
)
653+
623654
func getFeatureFlags(options *ClientOptions) FeatureFlags {
624655
if options != nil {
625656
return FeatureFlags{

internal/compatibility/proto/decision.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,8 @@ func Decision(d *shared.Decision) *apiv1.Decision {
142142
Memo: Memo(attr.Memo),
143143
SearchAttributes: SearchAttributes(attr.SearchAttributes),
144144
JitterStart: secondsToDuration(attr.JitterStartSeconds),
145+
CronOverlapPolicy: CronOverlapPolicy(attr.CronOverlapPolicy),
146+
ActiveClusterSelectionPolicy: ActiveClusterSelectionPolicy(attr.ActiveClusterSelectionPolicy),
145147
},
146148
}
147149
case shared.DecisionTypeStartChildWorkflowExecution:
@@ -163,6 +165,8 @@ func Decision(d *shared.Decision) *apiv1.Decision {
163165
Header: Header(attr.Header),
164166
Memo: Memo(attr.Memo),
165167
SearchAttributes: SearchAttributes(attr.SearchAttributes),
168+
CronOverlapPolicy: CronOverlapPolicy(attr.CronOverlapPolicy),
169+
ActiveClusterSelectionPolicy: ActiveClusterSelectionPolicy(attr.ActiveClusterSelectionPolicy),
166170
},
167171
}
168172
case shared.DecisionTypeSignalExternalWorkflowExecution:

internal/compatibility/proto/request.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,7 @@ const (
519519
DomainUpdateVisibilityArchivalURIField = "visibility_archival_uri"
520520
DomainUpdateActiveClusterNameField = "active_cluster_name"
521521
DomainUpdateClustersField = "clusters"
522+
DomainUpdateActiveClustersField = "active_clusters"
522523
DomainUpdateDeleteBadBinaryField = "delete_bad_binary"
523524
DomainUpdateFailoverTimeoutField = "failover_timeout"
524525
)
@@ -583,6 +584,10 @@ func UpdateDomainRequest(t *shared.UpdateDomainRequest) *apiv1.UpdateDomainReque
583584
request.Clusters = ClusterReplicationConfigurationArray(replicationConfiguration.Clusters)
584585
fields = append(fields, DomainUpdateClustersField)
585586
}
587+
if replicationConfiguration.ActiveClusters != nil {
588+
request.ActiveClusters = ActiveClusters(replicationConfiguration.ActiveClusters)
589+
fields = append(fields, DomainUpdateActiveClustersField)
590+
}
586591
}
587592
if t.DeleteBadBinary != nil {
588593
request.DeleteBadBinary = *t.DeleteBadBinary

internal/compatibility/proto/response.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ func DescribeDomainResponse(t *shared.DescribeDomainResponse) *apiv1.DescribeDom
6262
if repl := t.ReplicationConfiguration; repl != nil {
6363
domain.ActiveClusterName = repl.GetActiveClusterName()
6464
domain.Clusters = ClusterReplicationConfigurationArray(repl.Clusters)
65+
domain.ActiveClusters = ActiveClusters(repl.ActiveClusters)
6566
}
6667
return &apiv1.DescribeDomainResponse{Domain: &domain}
6768
}
@@ -341,6 +342,7 @@ func UpdateDomainResponse(t *shared.UpdateDomainResponse) *apiv1.UpdateDomainRes
341342
if repl := t.ReplicationConfiguration; repl != nil {
342343
domain.ActiveClusterName = repl.GetActiveClusterName()
343344
domain.Clusters = ClusterReplicationConfigurationArray(repl.Clusters)
345+
domain.ActiveClusters = ActiveClusters(repl.ActiveClusters)
344346
}
345347
return &apiv1.UpdateDomainResponse{
346348
Domain: domain,

internal/compatibility/proto/types.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,7 @@ func WorkflowExecutionInfo(t *shared.WorkflowExecutionInfo) *apiv1.WorkflowExecu
417417
AutoResetPoints: ResetPoints(t.AutoResetPoints),
418418
TaskList: t.GetTaskList(),
419419
IsCron: t.GetIsCron(),
420+
CronOverlapPolicy: CronOverlapPolicy(t.CronOverlapPolicy),
420421
}
421422
}
422423

@@ -552,6 +553,7 @@ func DescribeDomainResponseDomain(t *shared.DescribeDomainResponse) *apiv1.Domai
552553
if repl := t.ReplicationConfiguration; repl != nil {
553554
domain.ActiveClusterName = repl.GetActiveClusterName()
554555
domain.Clusters = ClusterReplicationConfigurationArray(repl.Clusters)
556+
domain.ActiveClusters = ActiveClusters(repl.ActiveClusters)
555557
}
556558
return &domain
557559
}
@@ -695,3 +697,21 @@ func ActivityLocalDispatchInfoMap(t map[string]*shared.ActivityLocalDispatchInfo
695697
}
696698
return v
697699
}
700+
701+
func ActiveClusters(ac *shared.ActiveClusters) *apiv1.ActiveClusters {
702+
if ac == nil {
703+
return nil
704+
}
705+
706+
regToCl := make(map[string]*apiv1.ActiveClusterInfo)
707+
for reg, clusterInfo := range ac.ActiveClustersByRegion {
708+
regToCl[reg] = &apiv1.ActiveClusterInfo{
709+
ActiveClusterName: clusterInfo.GetActiveClusterName(),
710+
FailoverVersion: clusterInfo.GetFailoverVersion(),
711+
}
712+
}
713+
714+
return &apiv1.ActiveClusters{
715+
RegionToCluster: regToCl,
716+
}
717+
}

0 commit comments

Comments
 (0)