Skip to content

Commit f4a06eb

Browse files
authored
Tracking cache invalidation for different types of workflow depends on their runtime length (#1277)
inject workflow execution timeout into getOrCreateWorkflowContext func and emit a new tag based on that timeout
1 parent 9909396 commit f4a06eb

File tree

2 files changed

+43
-21
lines changed

2 files changed

+43
-21
lines changed

internal/internal_logging_tags.go

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,25 +21,26 @@
2121
package internal
2222

2323
const (
24-
tagActivityID = "ActivityID"
25-
tagActivityType = "ActivityType"
26-
tagDomain = "Domain"
27-
tagEventID = "EventID"
28-
tagEventType = "EventType"
29-
tagRunID = "RunID"
30-
tagTaskList = "TaskList"
31-
tagTimerID = "TimerID"
32-
tagWorkflowID = "WorkflowID"
33-
tagWorkflowType = "WorkflowType"
34-
tagWorkerID = "WorkerID"
35-
tagWorkerType = "WorkerType"
36-
tagSideEffectID = "SideEffectID"
37-
tagChildWorkflowID = "ChildWorkflowID"
38-
tagLocalActivityType = "LocalActivityType"
39-
tagLocalActivityID = "LocalActivityID"
40-
tagQueryType = "QueryType"
41-
tagVisibilityQuery = "VisibilityQuery"
42-
tagPanicError = "PanicError"
43-
tagPanicStack = "PanicStack"
44-
causeTag = "pollerrorcause"
24+
tagActivityID = "ActivityID"
25+
tagActivityType = "ActivityType"
26+
tagDomain = "Domain"
27+
tagEventID = "EventID"
28+
tagEventType = "EventType"
29+
tagRunID = "RunID"
30+
tagTaskList = "TaskList"
31+
tagTimerID = "TimerID"
32+
tagWorkflowID = "WorkflowID"
33+
tagWorkflowType = "WorkflowType"
34+
tagWorkerID = "WorkerID"
35+
tagWorkerType = "WorkerType"
36+
tagSideEffectID = "SideEffectID"
37+
tagChildWorkflowID = "ChildWorkflowID"
38+
tagLocalActivityType = "LocalActivityType"
39+
tagLocalActivityID = "LocalActivityID"
40+
tagQueryType = "QueryType"
41+
tagVisibilityQuery = "VisibilityQuery"
42+
tagPanicError = "PanicError"
43+
tagPanicStack = "PanicStack"
44+
causeTag = "pollerrorcause"
45+
tagworkflowruntimelength = "workflowruntimelength"
4546
)

internal/internal_task_handlers.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@ const (
5151
defaultStickyCacheSize = 10000
5252

5353
noRetryBackoff = time.Duration(-1)
54+
55+
defaultInstantLivedWorkflowTimeoutUpperLimitInSec = 1
56+
57+
defaultShortLivedWorkflowTimeoutUpperLimitInSec = 1 * 1800
58+
59+
defaultMediumLivedWorkflowTimeoutUpperLimitInSec = 8 * 3600
5460
)
5561

5662
type (
@@ -677,6 +683,9 @@ func (wth *workflowTaskHandlerImpl) getOrCreateWorkflowContext(
677683

678684
if workflowContext != nil {
679685
workflowContext.Lock()
686+
// add new tag on metrics scope with workflow runtime length category
687+
executionRuntimeType := workflowCategorizedByTimeout(workflowContext.workflowInfo.ExecutionStartToCloseTimeoutSeconds)
688+
metricsScope = metricsScope.Tagged(map[string]string{tagworkflowruntimelength: executionRuntimeType})
680689
if task.Query != nil && !isFullHistory {
681690
// query task and we have a valid cached state
682691
metricsScope.Counter(metrics.StickyCacheHit).Inc(1)
@@ -1825,3 +1834,15 @@ func traceLog(fn func()) {
18251834
fn()
18261835
}
18271836
}
1837+
1838+
func workflowCategorizedByTimeout(executionTimeout int32) string {
1839+
if executionTimeout <= defaultInstantLivedWorkflowTimeoutUpperLimitInSec {
1840+
return "instant"
1841+
} else if executionTimeout <= defaultShortLivedWorkflowTimeoutUpperLimitInSec {
1842+
return "short"
1843+
} else if executionTimeout <= defaultMediumLivedWorkflowTimeoutUpperLimitInSec {
1844+
return "intermediate"
1845+
} else {
1846+
return "long"
1847+
}
1848+
}

0 commit comments

Comments
 (0)