Skip to content

Commit 368b3cf

Browse files
authored
query API client impl (#226)
* query API client impl * update based on comments * add unit test to cover case where history contains more events after decision task started
1 parent 70e6c21 commit 368b3cf

15 files changed

+507
-48
lines changed

client.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ import (
2929
"go.uber.org/cadence/common/metrics"
3030
)
3131

32+
// QueryTypeStackTrace is the build in query type for Client.QueryWorkflow() call. Use this query type to get the call
33+
// stack of the workflow. The result will be a string encoded in the EncodedValue.
34+
const QueryTypeStackTrace string = "__stack_trace"
35+
3236
type (
3337
// Client is the client for starting and getting information about a workflow executions as well as
3438
// completing activities asynchronously.
@@ -127,6 +131,26 @@ type (
127131
// - InternalServiceError
128132
// - EntityNotExistError
129133
ListOpenWorkflow(request *s.ListOpenWorkflowExecutionsRequest) (*s.ListOpenWorkflowExecutionsResponse, error)
134+
135+
// QueryWorkflow queries a given workflow execution and returns the query result synchronously. Parameter workflowID
136+
// and queryType are required, other parameters are optional. The workflowID and runID (optional) identify the
137+
// target workflow execution that this query will be send to. If runID is not specified (empty string), server will
138+
// use the currently running execution of that workflowID. The queryType specifies the type of query you want to
139+
// run. By default, cadence supports "__stack_trace" as a standard query type, which will return string value
140+
// representing the call stack of the target workflow. The target workflow could also setup different query handler
141+
// to handle custom query types.
142+
// See comments at cadence.SetQueryHandler(ctx Context, queryType string, handler interface{}) for more details
143+
// on how to setup query handler within the target workflow.
144+
// - workflowID is required.
145+
// - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID.
146+
// - queryType is the type of the query.
147+
// - args... are the optional query parameters.
148+
// The errors it can return:
149+
// - BadRequestError
150+
// - InternalServiceError
151+
// - EntityNotExistError
152+
// - QueryFailError
153+
QueryWorkflow(workflowID string, runID string, queryType string, args ...interface{}) (EncodedValue, error)
130154
}
131155

132156
// ClientOptions are optional parameters for Client creation.

internal_event_handlers.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ type (
8282
completeHandler completionHandler // events completion handler
8383
cancelHandler func() // A cancel handler to be invoked on a cancel notification
8484
signalHandler func(name string, input []byte) // A signal handler to be invoked on a signal event
85+
queryHandler func(queryType string, queryArgs []byte) ([]byte, error)
8586

8687
logger *zap.Logger
8788
isReplay bool // flag to indicate if workflow is in replay mode
@@ -241,6 +242,10 @@ func (wc *workflowEnvironmentImpl) RegisterSignalHandler(handler func(name strin
241242
wc.signalHandler = handler
242243
}
243244

245+
func (wc *workflowEnvironmentImpl) RegisterQueryHandler(handler func(string, []byte) ([]byte, error)) {
246+
wc.queryHandler = handler
247+
}
248+
244249
func (wc *workflowEnvironmentImpl) GetLogger() *zap.Logger {
245250
return wc.logger
246251
}
@@ -572,6 +577,13 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
572577
return weh.decisionsHelper.getDecisions(true), nil
573578
}
574579

580+
func (weh *workflowExecutionEventHandlerImpl) ProcessQuery(queryType string, queryArgs []byte) ([]byte, error) {
581+
if queryType == QueryTypeStackTrace {
582+
return getHostEnvironment().encodeArg(weh.StackTrace())
583+
}
584+
return weh.queryHandler(queryType, queryArgs)
585+
}
586+
575587
func (weh *workflowExecutionEventHandlerImpl) StackTrace() string {
576588
return weh.workflowDefinition.StackTrace()
577589
}

internal_public.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ type (
4545
ProcessWorkflowTask(
4646
task *s.PollForDecisionTaskResponse,
4747
getHistoryPage GetHistoryPage,
48-
emitStack bool) (response *s.RespondDecisionTaskCompletedRequest, stackTrace string, err error)
48+
emitStack bool) (response interface{}, stackTrace string, err error)
4949
}
5050

5151
// ActivityTaskHandler represents activity task handlers.

internal_task_handlers.go

Lines changed: 43 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ type (
5252
// Process a single event and return the assosciated decisions.
5353
// Return List of decisions made, any error.
5454
ProcessEvent(event *s.HistoryEvent, isReplay bool, isLast bool) ([]*s.Decision, error)
55+
// ProcessQuery process a query request.
56+
ProcessQuery(queryType string, queryArgs []byte) ([]byte, error)
5557
StackTrace() string
5658
// Close for cleaning up resources on this event handler
5759
Close()
@@ -319,7 +321,7 @@ func (wth *workflowTaskHandlerImpl) ProcessWorkflowTask(
319321
task *s.PollForDecisionTaskResponse,
320322
getHistoryPage GetHistoryPage,
321323
emitStack bool,
322-
) (result *s.RespondDecisionTaskCompletedRequest, stackTrace string, err error) {
324+
) (result interface{}, stackTrace string, err error) {
323325
if task == nil {
324326
return nil, "", errors.New("nil workflowtask provided")
325327
}
@@ -439,10 +441,13 @@ ProcessEvents:
439441
}
440442
}
441443
}
442-
// check if decisions from reply matches to the history events
443-
if err := matchReplayWithHistory(replayDecisions, respondEvents); err != nil {
444-
wth.logger.Error("Replay and history mismatch.", zap.Error(err))
445-
return nil, "", err
444+
445+
if task.Query == nil {
446+
// check if decisions from reply matches to the history events
447+
if err := matchReplayWithHistory(replayDecisions, respondEvents); err != nil {
448+
wth.logger.Error("Replay and history mismatch.", zap.Error(err))
449+
return nil, "", err
450+
}
446451
}
447452

448453
startEvent, err := reorderedHistory.GetWorkflowStartedEvent()
@@ -482,28 +487,44 @@ ProcessEvents:
482487
}
483488
}
484489

485-
// Fill the response.
486-
taskCompletionRequest := &s.RespondDecisionTaskCompletedRequest{
487-
TaskToken: task.TaskToken,
488-
Decisions: decisions,
489-
Identity: common.StringPtr(wth.identity),
490-
// ExecutionContext:
491-
}
492-
493-
traceLog(func() {
494-
var buf bytes.Buffer
495-
for i, d := range decisions {
496-
buf.WriteString(fmt.Sprintf("%v: %v\n", i, util.DecisionToString(d)))
490+
var completeRequest interface{}
491+
if task.Query != nil {
492+
// for query task
493+
result, err := eventHandler.ProcessQuery(task.Query.GetQueryType(), task.Query.GetQueryArgs_())
494+
queryTaskCompleteRequest := &s.RespondQueryTaskCompletedRequest{
495+
TaskToken: task.TaskToken,
497496
}
498-
wth.logger.Debug("new_decisions",
499-
zap.Int("DecisionCount", len(decisions)),
500-
zap.String("Decisions", buf.String()))
501-
})
497+
if err != nil {
498+
queryTaskCompleteRequest.CompletedType = s.QueryTaskCompletedTypePtr(s.QueryTaskCompletedType_FAILED)
499+
queryTaskCompleteRequest.ErrorMessage = common.StringPtr(err.Error())
500+
} else {
501+
queryTaskCompleteRequest.CompletedType = s.QueryTaskCompletedTypePtr(s.QueryTaskCompletedType_COMPLETED)
502+
queryTaskCompleteRequest.QueryResult_ = result
503+
}
504+
completeRequest = queryTaskCompleteRequest
505+
} else {
506+
// Fill the response.
507+
completeRequest = &s.RespondDecisionTaskCompletedRequest{
508+
TaskToken: task.TaskToken,
509+
Decisions: decisions,
510+
Identity: common.StringPtr(wth.identity),
511+
// ExecutionContext:
512+
}
513+
traceLog(func() {
514+
var buf bytes.Buffer
515+
for i, d := range decisions {
516+
buf.WriteString(fmt.Sprintf("%v: %v\n", i, util.DecisionToString(d)))
517+
}
518+
wth.logger.Debug("new_decisions",
519+
zap.Int("DecisionCount", len(decisions)),
520+
zap.String("Decisions", buf.String()))
521+
})
522+
}
502523

503524
if emitStack {
504525
stackTrace = eventHandler.StackTrace()
505526
}
506-
return taskCompletionRequest, stackTrace, nil
527+
return completeRequest, stackTrace, nil
507528
}
508529

509530
func isVersionMarkerDecision(d *s.Decision) bool {

internal_task_handlers_interfaces_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ type sampleWorkflowTaskHandler struct {
7070
func (wth sampleWorkflowTaskHandler) ProcessWorkflowTask(
7171
task *m.PollForDecisionTaskResponse,
7272
getHistoryPage GetHistoryPage,
73-
emitStack bool) (*m.RespondDecisionTaskCompletedRequest, string, error) {
73+
emitStack bool) (interface{}, string, error) {
7474
return &m.RespondDecisionTaskCompletedRequest{
7575
TaskToken: task.TaskToken,
7676
}, "", nil
@@ -126,7 +126,8 @@ func (s *PollLayerInterfacesTestSuite) TestProcessWorkflowTaskInterface() {
126126

127127
// Process task and respond to the service.
128128
taskHandler := newSampleWorkflowTaskHandler()
129-
completionRequest, _, err := taskHandler.ProcessWorkflowTask(response, nil, false)
129+
request, _, err := taskHandler.ProcessWorkflowTask(response, nil, false)
130+
completionRequest := request.(*m.RespondDecisionTaskCompletedRequest)
130131
s.NoError(err)
131132

132133
err = service.RespondDecisionTaskCompleted(ctx, completionRequest)

internal_task_handlers_test.go

Lines changed: 111 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,17 @@ func createTestEventDecisionTaskStarted(eventID int64) *s.HistoryEvent {
117117
EventType: common.EventTypePtr(s.EventType_DecisionTaskStarted)}
118118
}
119119

120+
func createTestEventWorkflowExecutionSignaled(eventID int64, signalName string) *s.HistoryEvent {
121+
return &s.HistoryEvent{
122+
EventId: common.Int64Ptr(eventID),
123+
EventType: common.EventTypePtr(s.EventType_WorkflowExecutionSignaled),
124+
WorkflowExecutionSignaledEventAttributes: &s.WorkflowExecutionSignaledEventAttributes{
125+
SignalName: common.StringPtr(signalName),
126+
Identity: common.StringPtr("test-identity"),
127+
},
128+
}
129+
}
130+
120131
func createTestEventDecisionTaskCompleted(eventID int64, attr *s.DecisionTaskCompletedEventAttributes) *s.HistoryEvent {
121132
return &s.HistoryEvent{
122133
EventId: common.Int64Ptr(eventID),
@@ -140,6 +151,19 @@ func createWorkflowTask(
140151
}
141152
}
142153

154+
func createQueryTask(
155+
events []*s.HistoryEvent,
156+
previousStartEventID int64,
157+
workflowName string,
158+
queryType string,
159+
) *s.PollForDecisionTaskResponse {
160+
task := createWorkflowTask(events, previousStartEventID, workflowName)
161+
task.Query = &s.WorkflowQuery{
162+
QueryType: common.StringPtr(queryType),
163+
}
164+
return task
165+
}
166+
143167
func (t *TaskHandlersTestSuite) TestWorkflowTask_WorkflowExecutionStarted() {
144168
taskList := "tl1"
145169
testEvents := []*s.HistoryEvent{
@@ -152,7 +176,8 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_WorkflowExecutionStarted() {
152176
Logger: t.logger,
153177
}
154178
taskHandler := newWorkflowTaskHandler(testDomain, params, nil, getHostEnvironment())
155-
response, _, err := taskHandler.ProcessWorkflowTask(task, nil, false)
179+
request, _, err := taskHandler.ProcessWorkflowTask(task, nil, false)
180+
response := request.(*s.RespondDecisionTaskCompletedRequest)
156181
t.NoError(err)
157182
t.NotNil(response)
158183
t.Equal(1, len(response.GetDecisions()))
@@ -181,7 +206,8 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_ActivityTaskScheduled() {
181206
Logger: t.logger,
182207
}
183208
taskHandler := newWorkflowTaskHandler(testDomain, params, nil, getHostEnvironment())
184-
response, _, err := taskHandler.ProcessWorkflowTask(task, nil, false)
209+
request, _, err := taskHandler.ProcessWorkflowTask(task, nil, false)
210+
response := request.(*s.RespondDecisionTaskCompletedRequest)
185211

186212
t.NoError(err)
187213
t.NotNil(response)
@@ -191,14 +217,86 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_ActivityTaskScheduled() {
191217

192218
// Schedule an activity and see if we complete workflow, Having only one last decision.
193219
task = createWorkflowTask(testEvents, 2, "HelloWorld_Workflow")
194-
response, _, err = taskHandler.ProcessWorkflowTask(task, nil, false)
220+
request, _, err = taskHandler.ProcessWorkflowTask(task, nil, false)
221+
response = request.(*s.RespondDecisionTaskCompletedRequest)
195222
t.NoError(err)
196223
t.NotNil(response)
197224
t.Equal(1, len(response.GetDecisions()))
198225
t.Equal(s.DecisionType_CompleteWorkflowExecution, response.GetDecisions()[0].GetDecisionType())
199226
t.NotNil(response.GetDecisions()[0].GetCompleteWorkflowExecutionDecisionAttributes())
200227
}
201228

229+
func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow() {
230+
// Schedule an activity and see if we complete workflow.
231+
taskList := "tl1"
232+
testEvents := []*s.HistoryEvent{
233+
createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: &s.TaskList{Name: &taskList}}),
234+
createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: &s.TaskList{Name: &taskList}}),
235+
createTestEventDecisionTaskStarted(3),
236+
createTestEventDecisionTaskCompleted(4, &s.DecisionTaskCompletedEventAttributes{ScheduledEventId: common.Int64Ptr(2)}),
237+
createTestEventActivityTaskScheduled(5, &s.ActivityTaskScheduledEventAttributes{
238+
ActivityId: common.StringPtr("0"),
239+
ActivityType: &s.ActivityType{Name: common.StringPtr("Greeter_Activity")},
240+
TaskList: &s.TaskList{Name: &taskList},
241+
}),
242+
createTestEventActivityTaskStarted(6, &s.ActivityTaskStartedEventAttributes{}),
243+
createTestEventActivityTaskCompleted(7, &s.ActivityTaskCompletedEventAttributes{ScheduledEventId: common.Int64Ptr(5)}),
244+
createTestEventDecisionTaskStarted(8),
245+
createTestEventWorkflowExecutionSignaled(9, "test-signal"),
246+
}
247+
params := workerExecutionParameters{
248+
TaskList: taskList,
249+
Identity: "test-id-1",
250+
Logger: t.logger,
251+
}
252+
253+
// query after first decision task (notice the previousStartEventID is always the last eventID for query task)
254+
task := createQueryTask(testEvents[0:3], 3, "HelloWorld_Workflow", "test-query")
255+
taskHandler := newWorkflowTaskHandler(testDomain, params, nil, getHostEnvironment())
256+
response, _, _ := taskHandler.ProcessWorkflowTask(task, nil, false)
257+
t.verifyQueryResult(response, "waiting-activity-result")
258+
259+
// query after activity task complete but before second decision task started
260+
task = createQueryTask(testEvents[0:7], 7, "HelloWorld_Workflow", "test-query")
261+
taskHandler = newWorkflowTaskHandler(testDomain, params, nil, getHostEnvironment())
262+
response, _, _ = taskHandler.ProcessWorkflowTask(task, nil, false)
263+
t.verifyQueryResult(response, "waiting-activity-result")
264+
265+
// query after second decision task
266+
task = createQueryTask(testEvents[0:8], 8, "HelloWorld_Workflow", "test-query")
267+
taskHandler = newWorkflowTaskHandler(testDomain, params, nil, getHostEnvironment())
268+
response, _, _ = taskHandler.ProcessWorkflowTask(task, nil, false)
269+
t.verifyQueryResult(response, "done")
270+
271+
// query after second decision task with extra events
272+
task = createQueryTask(testEvents[0:9], 9, "HelloWorld_Workflow", "test-query")
273+
taskHandler = newWorkflowTaskHandler(testDomain, params, nil, getHostEnvironment())
274+
response, _, _ = taskHandler.ProcessWorkflowTask(task, nil, false)
275+
t.verifyQueryResult(response, "done")
276+
277+
task = createQueryTask(testEvents[0:9], 9, "HelloWorld_Workflow", "invalid-query-type")
278+
taskHandler = newWorkflowTaskHandler(testDomain, params, nil, getHostEnvironment())
279+
response, _, _ = taskHandler.ProcessWorkflowTask(task, nil, false)
280+
t.NotNil(response)
281+
queryResp, ok := response.(*s.RespondQueryTaskCompletedRequest)
282+
t.True(ok)
283+
t.NotNil(queryResp.ErrorMessage)
284+
t.Contains(*queryResp.ErrorMessage, "unkonwn queryType")
285+
}
286+
287+
func (t *TaskHandlersTestSuite) verifyQueryResult(response interface{}, expectedResult string) {
288+
t.NotNil(response)
289+
queryResp, ok := response.(*s.RespondQueryTaskCompletedRequest)
290+
t.True(ok)
291+
t.Nil(queryResp.ErrorMessage)
292+
t.NotNil(queryResp.QueryResult_)
293+
encodedValue := EncodedValue(queryResp.QueryResult_)
294+
var queryResult string
295+
err := encodedValue.Get(&queryResult)
296+
t.NoError(err)
297+
t.Equal(expectedResult, queryResult)
298+
}
299+
202300
func (t *TaskHandlersTestSuite) TestWorkflowTask_NondeterministicDetection() {
203301
taskList := "taskList"
204302
testEvents := []*s.HistoryEvent{
@@ -216,17 +314,17 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_NondeterministicDetection() {
216314
Logger: zap.NewNop(),
217315
}
218316
taskHandler := newWorkflowTaskHandler(testDomain, params, nil, getHostEnvironment())
219-
response, _, err := taskHandler.ProcessWorkflowTask(task, nil, false)
220-
317+
request, _, err := taskHandler.ProcessWorkflowTask(task, nil, false)
318+
response := request.(*s.RespondDecisionTaskCompletedRequest)
221319
// there should be no error as the history events matched the decisions.
222320
t.NoError(err)
223321
t.NotNil(response)
224322

225323
// now change the history event so it does not match to decision produced via replay
226324
testEvents[1].ActivityTaskScheduledEventAttributes.ActivityType.Name = common.StringPtr("some-other-activity")
227-
response, _, err = taskHandler.ProcessWorkflowTask(task, nil, false)
325+
request, _, err = taskHandler.ProcessWorkflowTask(task, nil, false)
228326
t.Error(err)
229-
t.Nil(response)
327+
t.Nil(request)
230328
t.Contains(err.Error(), "nondeterministic")
231329
}
232330

@@ -246,8 +344,8 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_CancelActivityBeforeSent() {
246344
Logger: t.logger,
247345
}
248346
taskHandler := newWorkflowTaskHandler(testDomain, params, nil, getHostEnvironment())
249-
response, _, err := taskHandler.ProcessWorkflowTask(task, nil, false)
250-
347+
request, _, err := taskHandler.ProcessWorkflowTask(task, nil, false)
348+
response := request.(*s.RespondDecisionTaskCompletedRequest)
251349
t.NoError(err)
252350
t.NotNil(response)
253351
//t.printAllDecisions(response.GetDecisions())
@@ -275,10 +373,9 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_PressurePoints() {
275373
Logger: t.logger,
276374
}
277375
taskHandler := newWorkflowTaskHandler(testDomain, params, ppMgr, getHostEnvironment())
278-
response, _, err := taskHandler.ProcessWorkflowTask(task, nil, false)
279-
376+
request, _, err := taskHandler.ProcessWorkflowTask(task, nil, false)
280377
t.Error(err)
281-
t.Nil(response)
378+
t.Nil(request)
282379
}
283380

284381
func (t *TaskHandlersTestSuite) TestWorkflowTask_PageToken() {
@@ -304,8 +401,8 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_PageToken() {
304401
return &s.History{nextEvents}, nil, nil
305402
}
306403
taskHandler := newWorkflowTaskHandler(testDomain, params, nil, getHostEnvironment())
307-
response, _, err := taskHandler.ProcessWorkflowTask(task, iteratorfn, false)
308-
404+
request, _, err := taskHandler.ProcessWorkflowTask(task, iteratorfn, false)
405+
response := request.(*s.RespondDecisionTaskCompletedRequest)
309406
t.NoError(err)
310407
t.NotNil(response)
311408
}

0 commit comments

Comments
 (0)