Skip to content

Commit de46aa8

Browse files
authored
Add Sometimes assertions (#8571)
## What changed? WISOTT ## Why? Sometimes assertions help our internal testing platform to identify interesting test cases. ## How did you test it? - [x] built - [ ] run locally and tested manually - [x] covered by existing tests - [ ] added new unit test(s) - [ ] added new functional test(s) ## Potential risks Performance overhead in production; but it should be minimal as the assertions are not placed on the hot path but on error cases.
1 parent 463cbf5 commit de46aa8

File tree

9 files changed

+114
-30
lines changed

9 files changed

+114
-30
lines changed

common/softassert/softassert.go

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ import (
1616
"go.temporal.io/server/common/log/tag"
1717
)
1818

19+
type sometimesLogger struct {
20+
logger log.Logger
21+
}
22+
1923
// That performs a soft assertion by logging an error if the given condition is false.
2024
// It is meant to indicate a condition is always expected to be true.
2125
// Returns true if the condition is met, otherwise false.
@@ -44,27 +48,42 @@ func Fail(logger log.Logger, staticMessage string, tags ...tag.Tag) {
4448
logger.Error("failed assertion: "+staticMessage, append([]tag.Tag{tag.FailedAssertion}, tags...)...)
4549
}
4650

47-
// ThatSometimes is used to conditionally log a debug message of a noteworthy but non-problematic event.
51+
// Sometimes is used to log a message of a noteworthy but non-problematic event.
52+
//
53+
// Example:
54+
// softassert.Sometimes(logger).Warn("termination event", tag.NewStringTag("state", object.state))
55+
func Sometimes(logger log.Logger) *sometimesLogger {
56+
return &sometimesLogger{logger: logger}
57+
}
58+
59+
// Debug logs a message at debug level.
4860
//
4961
// `staticMessage` is expected to be a static string to help with grouping and searching logs.
5062
// Dynamic information should be passed via `tags`.
63+
func (s *sometimesLogger) Debug(staticMessage string, tags ...tag.Tag) {
64+
s.logger.Debug(staticMessage, tags...)
65+
}
66+
67+
// Info logs a message at info level.
5168
//
52-
// Example:
53-
// softassert.ThatSometimes(logger, object.state == "terminated", "termination event", tag.NewStringTag("state", object.state))
54-
func ThatSometimes(logger log.Logger, condition bool, staticMessage string, tags ...tag.Tag) bool {
55-
if !condition {
56-
logger.Debug(staticMessage, tags...)
57-
}
58-
return condition
69+
// `staticMessage` is expected to be a static string to help with grouping and searching logs.
70+
// Dynamic information should be passed via `tags`.
71+
func (s *sometimesLogger) Info(staticMessage string, tags ...tag.Tag) {
72+
s.logger.Info(staticMessage, tags...)
5973
}
6074

61-
// Sometimes is used to log a debug message of a noteworthy but non-problematic event.
75+
// Warn logs a message at warn level.
6276
//
6377
// `staticMessage` is expected to be a static string to help with grouping and searching logs.
6478
// Dynamic information should be passed via `tags`.
79+
func (s *sometimesLogger) Warn(staticMessage string, tags ...tag.Tag) {
80+
s.logger.Warn(staticMessage, tags...)
81+
}
82+
83+
// Error logs a message at error level.
6584
//
66-
// Example:
67-
// softassert.Sometimes(logger, "termination event", tag.NewStringTag("state", object.state))
68-
func Sometimes(logger log.Logger, staticMessage string, tags ...tag.Tag) {
69-
logger.Debug(staticMessage, tags...)
85+
// `staticMessage` is expected to be a static string to help with grouping and searching logs.
86+
// Dynamic information should be passed via `tags`.
87+
func (s *sometimesLogger) Error(staticMessage string, tags ...tag.Tag) {
88+
s.logger.Error(staticMessage, tags...)
7089
}

service/history/api/respondworkflowtaskcompleted/api.go

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"go.temporal.io/server/common/persistence"
3131
"go.temporal.io/server/common/persistence/visibility/manager"
3232
"go.temporal.io/server/common/searchattribute"
33+
"go.temporal.io/server/common/softassert"
3334
"go.temporal.io/server/common/tasktoken"
3435
"go.temporal.io/server/common/worker_versioning"
3536
"go.temporal.io/server/service/history/api"
@@ -129,6 +130,12 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
129130
metrics.StaleMutableStateCounter.With(handler.metricsHandler).Record(
130131
1,
131132
metrics.OperationTag(metrics.HistoryRespondWorkflowTaskCompletedScope))
133+
softassert.Sometimes(handler.logger).Debug("stale mutable state detected",
134+
tag.WorkflowID(token.GetWorkflowId()),
135+
tag.WorkflowRunID(token.GetRunId()),
136+
tag.WorkflowScheduledEventID(token.GetScheduledEventId()),
137+
tag.NewInt64("mutable-state-next-event-id", mutableState.GetNextEventID()),
138+
)
132139
return false
133140
}
134141
return true
@@ -166,7 +173,7 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
166173
// This is NOT 100% bulletproof solution because this write operation may also fail.
167174
// TODO: remove this call when GetWorkflowExecutionHistory includes speculative WFT events.
168175
if clearStickyErr := handler.clearStickyTaskQueue(ctx, workflowLease.GetContext()); clearStickyErr != nil {
169-
handler.logger.Error("Failed to clear stickiness after speculative workflow task failed to complete.",
176+
softassert.Sometimes(handler.logger).Error("Failed to clear stickiness after speculative workflow task failed to complete.",
170177
tag.NewErrorTag("clear-sticky-error", clearStickyErr),
171178
tag.Error(retError),
172179
tag.WorkflowID(token.GetWorkflowId()),
@@ -225,7 +232,7 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
225232
if retError != nil {
226233
cancelled := effects.Cancel(ctx)
227234
if cancelled {
228-
handler.logger.Info("Canceled effects due to error.",
235+
softassert.Sometimes(handler.logger).Info("Canceled effects due to error",
229236
tag.Error(retError),
230237
tag.WorkflowID(token.GetWorkflowId()),
231238
tag.WorkflowRunID(token.GetRunId()),
@@ -284,6 +291,11 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
284291
metrics.NamespaceTag(nsName),
285292
)
286293
metrics.WorkflowTaskHeartbeatTimeoutCounter.With(scope).Record(1)
294+
softassert.Sometimes(handler.logger).Debug("workflow task heartbeat timed out",
295+
tag.WorkflowNamespaceID(nsName),
296+
tag.WorkflowID(token.GetWorkflowId()),
297+
tag.WorkflowRunID(token.GetRunId()),
298+
)
287299
completedEvent, err = ms.AddWorkflowTaskTimedOutEvent(currentWorkflowTask)
288300
if err != nil {
289301
return nil, err
@@ -338,6 +350,12 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
338350
// and admitted updates are lost. Uncomment this check when durable admitted is implemented
339351
// or updates stay in the registry after WFT is failed.
340352
hasBufferedEventsOrMessages := ms.HasBufferedEvents() // || updateRegistry.HasOutgoingMessages(false)
353+
if hasBufferedEventsOrMessages {
354+
softassert.Sometimes(handler.logger).Debug("workflow has buffered events/messages",
355+
tag.WorkflowID(token.GetWorkflowId()),
356+
tag.WorkflowRunID(token.GetRunId()),
357+
)
358+
}
341359
if err := namespaceEntry.VerifyBinaryChecksum(request.GetBinaryChecksum()); err != nil {
342360
wtFailedCause = newWorkflowTaskFailedCause(
343361
enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_BINARY,
@@ -447,11 +465,14 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
447465
metrics.FailureTag(wtFailedCause.failedCause.String()),
448466
metrics.FirstAttemptTag(currentWorkflowTask.Attempt),
449467
)
450-
handler.logger.Info("Failing the workflow task.",
468+
softassert.Sometimes(handler.logger).Info("Failing the workflow task.",
451469
tag.Value(wtFailedCause.Message()),
452470
tag.WorkflowID(token.GetWorkflowId()),
453471
tag.WorkflowRunID(token.GetRunId()),
454-
tag.WorkflowNamespaceID(namespaceEntry.ID().String()))
472+
tag.WorkflowNamespaceID(namespaceEntry.ID().String()),
473+
tag.Attempt(currentWorkflowTask.Attempt),
474+
tag.Cause(wtFailedCause.failedCause.String()),
475+
)
455476
if currentWorkflowTask.Attempt > 1 && wtFailedCause.failedCause != enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND {
456477
// drop this workflow task if it keeps failing. This will cause the workflow task to timeout and get retried after timeout.
457478
return nil, serviceerror.NewInvalidArgument(wtFailedCause.Message())
@@ -620,6 +641,12 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
620641
// if updateErr resulted in TransactionSizeLimitError then fail workflow
621642
switch updateErr.(type) {
622643
case *persistence.TransactionSizeLimitError:
644+
softassert.Sometimes(handler.logger).Debug("workflow terminated due to size limit",
645+
tag.WorkflowID(token.GetWorkflowId()),
646+
tag.WorkflowRunID(token.GetRunId()),
647+
tag.Error(updateErr),
648+
)
649+
623650
// must reload mutable state because the first call to updateWorkflowExecutionWithContext or continueAsNewWorkflowExecution
624651
// clears mutable state if error is returned
625652
ms, err = weContext.LoadMutableState(ctx, handler.shardContext)

service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"go.temporal.io/server/common/payloads"
3535
"go.temporal.io/server/common/protocol"
3636
"go.temporal.io/server/common/searchattribute"
37+
"go.temporal.io/server/common/softassert"
3738
"go.temporal.io/server/common/tasktoken"
3839
"go.temporal.io/server/common/worker_versioning"
3940
"go.temporal.io/server/service/history/api"
@@ -239,14 +240,15 @@ func (handler *workflowTaskCompletedHandler) rejectUnprocessedUpdates(
239240
handler.effects)
240241

241242
if len(rejectedUpdateIDs) > 0 {
242-
handler.logger.Warn(
243+
softassert.Sometimes(handler.logger).Warn(
243244
"Workflow task completed w/o processing updates.",
244245
tag.WorkflowNamespaceID(wfKey.NamespaceID),
245246
tag.WorkflowID(wfKey.WorkflowID),
246247
tag.WorkflowRunID(wfKey.RunID),
247248
tag.WorkflowEventID(workflowTaskScheduledEventID),
248249
tag.NewStringTag("worker-identity", workerIdentity),
249250
tag.NewStringsTag("update-ids", rejectedUpdateIDs),
251+
tag.NewInt("rejected-count", len(rejectedUpdateIDs)),
250252
)
251253
}
252254

@@ -372,12 +374,21 @@ func (handler *workflowTaskCompletedHandler) handleMessage(
372374
}
373375
if upd == nil {
374376
// Update was not found in the registry and can't be resurrected.
377+
softassert.Sometimes(handler.logger).Debug("update lost and cannot be resurrected",
378+
tag.WorkflowID(handler.mutableState.GetExecutionInfo().WorkflowId),
379+
tag.WorkflowRunID(handler.mutableState.GetExecutionState().RunId),
380+
tag.NewStringTag("protocol-instance-id", message.ProtocolInstanceId))
375381
return handler.failWorkflowTask(
376382
enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_UPDATE_WORKFLOW_EXECUTION_MESSAGE,
377383
serviceerror.NewNotFoundf("update %s wasn't found on the server. This is most likely a transient error which will be resolved automatically by retries", message.ProtocolInstanceId))
378384
}
379385

380386
if err := upd.OnProtocolMessage(message, workflow.WithEffects(handler.effects, handler.mutableState)); err != nil {
387+
softassert.Sometimes(handler.logger).Debug("update failed",
388+
tag.WorkflowID(handler.mutableState.GetExecutionInfo().WorkflowId),
389+
tag.WorkflowRunID(handler.mutableState.GetExecutionState().RunId),
390+
tag.NewStringTag("protocol-instance-id", message.ProtocolInstanceId),
391+
tag.NewStringTag("error", err.Error()))
381392
return handler.failWorkflowTaskOnInvalidArgument(
382393
enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_UPDATE_WORKFLOW_EXECUTION_MESSAGE, err)
383394
}

service/history/queues/dlq_writer.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"go.temporal.io/server/common/metrics"
1212
"go.temporal.io/server/common/namespace"
1313
"go.temporal.io/server/common/persistence"
14+
"go.temporal.io/server/common/softassert"
1415
"go.temporal.io/server/service/history/tasks"
1516
)
1617

@@ -118,11 +119,12 @@ func (q *DLQWriter) WriteTaskToDLQ(
118119
} else {
119120
namespaceTag = tag.WorkflowNamespace(string(ns.Name()))
120121
}
121-
q.logger.Warn("Task enqueued to DLQ",
122+
softassert.Sometimes(q.logger).Warn("Task enqueued to DLQ",
122123
tag.DLQMessageID(resp.Metadata.ID),
123124
tag.SourceCluster(sourceCluster),
124125
tag.TargetCluster(targetCluster),
125126
tag.TaskType(task.GetType()),
127+
tag.NewStringTag("task-category", task.GetCategory().Name()),
126128
namespaceTag,
127129
)
128130
return nil

service/history/queues/executable.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"go.temporal.io/server/common/log/tag"
2929
"go.temporal.io/server/common/metrics"
3030
"go.temporal.io/server/common/namespace"
31+
"go.temporal.io/server/common/softassert"
3132
ctasks "go.temporal.io/server/common/tasks"
3233
"go.temporal.io/server/common/telemetry"
3334
"go.temporal.io/server/common/util"
@@ -440,6 +441,7 @@ func (e *executableImpl) isExpectedRetryableError(err error) (isRetryable bool,
440441
if errors.As(err, &resourceExhaustedErr) {
441442
switch resourceExhaustedErr.Cause { //nolint:exhaustive
442443
case enumspb.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW:
444+
softassert.Sometimes(e.logger).Debug("task throttled due to busy workflow", tag.TaskType(e.GetType()))
443445
err = consts.ErrResourceExhaustedBusyWorkflow
444446
case enumspb.RESOURCE_EXHAUSTED_CAUSE_APS_LIMIT:
445447
err = consts.ErrResourceExhaustedAPSLimit
@@ -482,7 +484,14 @@ func (e *executableImpl) isExpectedRetryableError(err error) (isRetryable bool,
482484
func (e *executableImpl) isUnexpectedNonRetryableError(err error) bool {
483485
var terr MaybeTerminalTaskError
484486
if errors.As(err, &terr) {
485-
return terr.IsTerminalTaskError()
487+
isTerminal := terr.IsTerminalTaskError()
488+
if isTerminal {
489+
softassert.Sometimes(e.logger).Debug("terminal task error detected",
490+
tag.TaskType(e.GetType()),
491+
tag.Error(err),
492+
)
493+
}
494+
return isTerminal
486495
}
487496

488497
if _, isDataLoss := err.(*serviceerror.DataLoss); isDataLoss {
@@ -492,8 +501,13 @@ func (e *executableImpl) isUnexpectedNonRetryableError(err error) bool {
492501
isInternalError := common.IsInternalError(err)
493502
if isInternalError {
494503
metrics.TaskInternalErrorCounter.With(e.metricsHandler).Record(1)
504+
softassert.Sometimes(e.logger).Debug("internal non-retryable task processing error",
505+
tag.TaskType(e.GetType()),
506+
tag.Error(err),
507+
)
495508
// Only DQL/drop when configured to
496-
return e.dlqInternalErrors()
509+
shouldDLQ := e.dlqInternalErrors()
510+
return shouldDLQ
497511
}
498512

499513
return false
@@ -538,11 +552,12 @@ func (e *executableImpl) HandleErr(err error) (retErr error) {
538552
tag.Attempt(int32(attempt)),
539553
tag.UnexpectedErrorAttempts(int32(e.unexpectedErrorAttempts)),
540554
tag.LifeCycleProcessingFailed,
555+
tag.NewStringTag("task-category", e.GetCategory().Name()),
541556
)
542557
if attempt > taskCriticalLogMetricAttempts {
543-
logger.Error("Critical error processing task, retrying.", tag.OperationCritical)
558+
softassert.Sometimes(logger).Error("Critical error processing task, retrying.", tag.OperationCritical)
544559
} else {
545-
logger.Warn("Fail to process task")
560+
softassert.Sometimes(logger).Warn("Fail to process task")
546561
}
547562

548563
if e.isUnexpectedNonRetryableError(err) {

service/history/replication/executable_task.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ func (e *ExecutableTaskImpl) Resend(
340340
) (bool, error) {
341341
remainingAttempt--
342342
if remainingAttempt < 0 {
343-
e.Logger.Error("resend history attempts exceeded",
343+
softassert.Sometimes(e.Logger).Error("resend history attempts exceeded",
344344
tag.WorkflowNamespaceID(retryErr.NamespaceId),
345345
tag.WorkflowID(retryErr.WorkflowId),
346346
tag.WorkflowRunID(retryErr.RunId),
@@ -768,7 +768,7 @@ func (e *ExecutableTaskImpl) MarkPoisonPill() error {
768768
taskInfo := e.ReplicationTask().GetRawTaskInfo()
769769

770770
if e.markPoisonPillAttempts >= MarkPoisonPillMaxAttempts {
771-
e.Logger.Error("MarkPoisonPill reached max attempts",
771+
softassert.Sometimes(e.Logger).Error("MarkPoisonPill reached max attempts",
772772
tag.SourceCluster(e.SourceClusterName()),
773773
tag.ReplicationTask(taskInfo),
774774
)

service/history/timer_queue_active_task_executor.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"go.temporal.io/server/common/primitives/timestamp"
2525
"go.temporal.io/server/common/priorities"
2626
"go.temporal.io/server/common/resource"
27+
"go.temporal.io/server/common/softassert"
2728
"go.temporal.io/server/service/history/configs"
2829
"go.temporal.io/server/service/history/consts"
2930
"go.temporal.io/server/service/history/deletemanager"
@@ -536,7 +537,7 @@ func (t *timerQueueActiveTaskExecutor) executeActivityRetryTimerTask(
536537
}
537538

538539
if task.Attempt < activityInfo.Attempt || activityInfo.StartedEventId != common.EmptyEventID {
539-
t.logger.Info("Duplicate activity retry timer task",
540+
softassert.Sometimes(t.logger).Info("Duplicate activity retry timer task",
540541
tag.WorkflowID(mutableState.GetExecutionInfo().WorkflowId),
541542
tag.WorkflowRunID(mutableState.GetExecutionState().GetRunId()),
542543
tag.WorkflowNamespaceID(mutableState.GetExecutionInfo().NamespaceId),
@@ -909,6 +910,12 @@ func (t *timerQueueActiveTaskExecutor) emitTimeoutMetricScopeWithNamespaceTag(
909910
case enumspb.TIMEOUT_TYPE_HEARTBEAT:
910911
metrics.HeartbeatTimeoutCounter.With(metricsScope).Record(1)
911912
}
913+
914+
softassert.Sometimes(t.logger).Debug("timer queue task timed out",
915+
tag.NewStringTag("timer-type", timerType.String()),
916+
tag.Operation(operation),
917+
tag.Attempt(taskAttempt),
918+
)
912919
}
913920

914921
func (t *timerQueueActiveTaskExecutor) processActivityWorkflowRules(

service/history/workflow/update/update.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ func (u *Update) abort(
251251
return
252252
}
253253

254-
u.instrumentation.countAborted(reason)
254+
u.instrumentation.countAborted(u.id, reason)
255255
prevState := u.setState(stateProvisionallyAborted)
256256

257257
effects.OnAfterCommit(func(context.Context) {

service/history/workflow/update/util.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,13 @@ func (i *instrumentation) countTooMany() {
6161
i.oneOf(metrics.WorkflowExecutionUpdateTooMany.Name())
6262
}
6363

64-
func (i *instrumentation) countAborted(reason AbortReason) {
64+
func (i *instrumentation) countAborted(updateID string, reason AbortReason) {
6565
i.metrics.Counter(metrics.WorkflowExecutionUpdateAborted.Name()).
6666
Record(1, metrics.ReasonTag(metrics.ReasonString(reason.String())))
67+
softassert.Sometimes(i.log).Debug("update aborted",
68+
tag.NewStringTag("reason", reason.String()),
69+
tag.NewStringTag("update-id", updateID),
70+
)
6771
}
6872

6973
func (i *instrumentation) countContinueAsNewSuggestions() {
@@ -99,8 +103,7 @@ func (i *instrumentation) oneOf(counterName string) {
99103
}
100104

101105
func (i *instrumentation) stateChange(updateID string, from, to state) {
102-
softassert.Sometimes(
103-
i.log,
106+
softassert.Sometimes(i.log).Debug(
104107
"update state change",
105108
tag.ComponentWorkflowUpdate,
106109
tag.NewStringTag("update-id", updateID),

0 commit comments

Comments
 (0)