Skip to content

Commit 137ca72

Browse files
Consistent query test (#916)
1 parent 442cb70 commit 137ca72

File tree

5 files changed

+65
-1
lines changed

5 files changed

+65
-1
lines changed

internal/internal_task_pollers.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,8 @@ func (wtp *workflowTaskPoller) RespondTaskCompleted(completedRequest interface{}
368368
WorkerTaskList: &s.TaskList{Name: common.StringPtr(getWorkerTaskList(wtp.stickyUUID))},
369369
ScheduleToStartTimeoutSeconds: common.Int32Ptr(common.Int32Ceil(wtp.StickyScheduleToStartTimeout.Seconds())),
370370
}
371+
} else {
372+
request.ReturnNewDecisionTask = common.BoolPtr(false)
371373
}
372374
response, err1 = wtp.service.RespondDecisionTaskCompleted(tchCtx, request, opt...)
373375
if err1 != nil {

internal/version.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,4 @@ const LibraryVersion = "0.10.1"
4242
// This can be used for client capability check, on
4343
// Cadence server, for backward compatibility
4444
// Format: MAJOR.MINOR.PATCH
45-
const FeatureVersion = "1.4.0"
45+
const FeatureVersion = "1.5.0"

test/activity_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ func (a *Activities) Sleep(ctx context.Context, delay time.Duration) error {
4848
return nil
4949
}
5050

51+
func LocalSleep(ctx context.Context, delay time.Duration) error {
52+
time.Sleep(delay)
53+
return nil
54+
}
55+
5156
func (a *Activities) HeartbeatAndSleep(ctx context.Context, seq int, delay time.Duration) (int, error) {
5257
a.append("heartbeatAndSleep")
5358
if activity.HasHeartbeatDetails(ctx) {

test/integration_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,39 @@ func (ts *IntegrationTestSuite) TestStackTraceQuery() {
239239
ts.True(strings.Contains(trace, "go.uber.org/cadence/test.(*Workflows).Basic"))
240240
}
241241

242+
func (ts *IntegrationTestSuite) TestConsistentQuery() {
243+
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
244+
defer cancel()
245+
// this workflow will start a local activity which blocks for long enough
246+
// to ensure that consistent query must wait in order to satisfy consistency
247+
wfOpts := ts.startWorkflowOptions("test-consistent-query")
248+
wfOpts.DecisionTaskStartToCloseTimeout = 5 *time.Second
249+
run, err := ts.libClient.ExecuteWorkflow(ctx, wfOpts, ts.workflows.ConsistentQueryWorkflow, 3*time.Second)
250+
ts.Nil(err)
251+
// Wait for a second to ensure that first decision task gets started and completed before we send signal.
252+
// Query cannot be run until first decision task has been completed.
253+
// If signal occurs right after workflow start then WorkflowStarted and Signal events will both be part of the same
254+
// decision task. So query will be blocked waiting for signal to complete, this is not what we want because it
255+
// will not exercise the consistent query code path.
256+
<-time.After(time.Second)
257+
err = ts.libClient.SignalWorkflow(ctx, "test-consistent-query", run.GetRunID(), consistentQuerySignalCh, "signal-input")
258+
ts.NoError(err)
259+
260+
value, err := ts.libClient.QueryWorkflowWithOptions(ctx, &client.QueryWorkflowWithOptionsRequest{
261+
WorkflowID: "test-consistent-query",
262+
RunID: run.GetRunID(),
263+
QueryType: "consistent_query",
264+
QueryConsistencyLevel: shared.QueryConsistencyLevelStrong.Ptr(),
265+
})
266+
ts.Nil(err)
267+
ts.NotNil(value)
268+
ts.NotNil(value.QueryResult)
269+
ts.Nil(value.QueryRejected)
270+
var queryResult string
271+
ts.Nil(value.QueryResult.Get(&queryResult))
272+
ts.Equal("signal-input", queryResult)
273+
}
274+
242275
func (ts *IntegrationTestSuite) TestWorkflowIDReuseRejectDuplicate() {
243276
var result string
244277
err := ts.executeWorkflow(

test/workflow_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ import (
3333
"go.uber.org/cadence/workflow"
3434
)
3535

36+
const (
37+
consistentQuerySignalCh = "consistent-query-signal-chan"
38+
)
39+
3640
type Workflows struct{}
3741

3842
func (w *Workflows) Basic(ctx workflow.Context) ([]string, error) {
@@ -409,6 +413,25 @@ func (w *Workflows) LargeQueryResultWorkflow(ctx workflow.Context) (string, erro
409413
return "hello", nil
410414
}
411415

416+
func (w *Workflows) ConsistentQueryWorkflow(ctx workflow.Context, delay time.Duration) error {
417+
queryResult := "starting-value"
418+
err := workflow.SetQueryHandler(ctx, "consistent_query", func() (string, error) {
419+
return queryResult, nil
420+
})
421+
if err != nil {
422+
return errors.New("failed to register query handler")
423+
}
424+
ch := workflow.GetSignalChannel(ctx, consistentQuerySignalCh)
425+
var signalData string
426+
ch.Receive(ctx, &signalData)
427+
laCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
428+
ScheduleToCloseTimeout: 5 * time.Second,
429+
})
430+
workflow.ExecuteLocalActivity(laCtx, LocalSleep, delay).Get(laCtx, nil)
431+
queryResult = signalData
432+
return nil
433+
}
434+
412435
func (w *Workflows) RetryTimeoutStableErrorWorkflow(ctx workflow.Context) ([]string, error) {
413436
ao := workflow.ActivityOptions{
414437
ScheduleToStartTimeout: time.Second * 2,
@@ -486,6 +509,7 @@ func (w *Workflows) register() {
486509
workflow.Register(w.SimplestWorkflow)
487510
workflow.Register(w.LargeQueryResultWorkflow)
488511
workflow.Register(w.RetryTimeoutStableErrorWorkflow)
512+
workflow.Register(w.ConsistentQueryWorkflow)
489513
}
490514

491515
func (w *Workflows) defaultActivityOptions() workflow.ActivityOptions {

0 commit comments

Comments
 (0)