diff --git a/go.mod b/go.mod index b317e09bc..4d9c91a47 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/nexus-rpc/sdk-go v0.3.0 github.com/robfig/cron v1.2.0 github.com/stretchr/testify v1.10.0 - go.temporal.io/api v1.53.0 + go.temporal.io/api v1.55.1-0.20251017163919-d527807d0d4e golang.org/x/sync v0.13.0 golang.org/x/sys v0.32.0 golang.org/x/time v0.3.0 diff --git a/go.sum b/go.sum index 6b93a5410..d25fe5104 100644 --- a/go.sum +++ b/go.sum @@ -35,8 +35,8 @@ github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -go.temporal.io/api v1.53.0 h1:6vAFpXaC584AIELa6pONV56MTpkm4Ha7gPWL2acNAjo= -go.temporal.io/api v1.53.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.55.1-0.20251017163919-d527807d0d4e h1:8Y8p3tBzobAYO3KjuoiGXjDxeKlPsrEjjDRfLlB7O7c= +go.temporal.io/api v1.55.1-0.20251017163919-d527807d0d4e/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/internal/client.go b/internal/client.go index 066a071c7..5b32851c5 100644 --- a/internal/client.go +++ b/internal/client.go @@ -40,6 +40,9 @@ const ( // QueryTypeWorkflowMetadata is the query name for the workflow metadata. QueryTypeWorkflowMetadata string = "__temporal_workflow_metadata" + + // SignalTypeSuggestContinueAsNew is the signal name to set ContinueAsNewSuggested=true in the workflow env. + SignalTypeSuggestContinueAsNew = "__suggest_continue_as_new" ) type ( diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index 2bbeabc97..0dc791b1c 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -1199,6 +1199,9 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent( // Update workflow info fields weh.workflowInfo.currentHistoryLength = int(event.EventId) weh.workflowInfo.continueAsNewSuggested = event.GetWorkflowTaskStartedEventAttributes().GetSuggestContinueAsNew() + weh.workflowInfo.continueAsNewSuggestedReason = convertContinueAsNewSuggestedReason( + event.GetWorkflowTaskStartedEventAttributes().GetContinueAsNewSuggestedReason(), + ) weh.workflowInfo.currentHistorySize = int(event.GetWorkflowTaskStartedEventAttributes().GetHistorySizeBytes()) // Reset the counter on command helper used for generating ID for commands weh.commandsHelper.setCurrentWorkflowTaskStartedEventID(event.GetEventId()) @@ -1751,9 +1754,35 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessLocalActivityResult(lar *lo func (weh *workflowExecutionEventHandlerImpl) handleWorkflowExecutionSignaled( attributes *historypb.WorkflowExecutionSignaledEventAttributes, ) error { + switch attributes.GetSignalName() { + case SignalTypeSuggestContinueAsNew: + weh.workflowInfo.continueAsNewSuggested = true + var reason enumspb.ContinueAsNewSuggestedReason + err := weh.dataConverter.FromPayload(attributes.GetInput().GetPayloads()[0], &reason) + if err != nil { + return fmt.Errorf("unable to convert continue-as-new suggested reason: %w", err) + } + weh.workflowInfo.continueAsNewSuggestedReason = convertContinueAsNewSuggestedReason(reason) + } return weh.signalHandler(attributes.GetSignalName(), attributes.Input, attributes.Header) } +// TODO(carlydf): move this helper somewhere appropriate +func convertContinueAsNewSuggestedReason(reason enumspb.ContinueAsNewSuggestedReason) ContinueAsNewSuggestedReason { + switch reason { + case enumspb.CONTINUE_AS_NEW_SUGGESTED_REASON_UNSPECIFIED: + return ContinueAsNewSuggestedReasonUnspecified + case enumspb.CONTINUE_AS_NEW_SUGGESTED_REASON_UPDATE_REGISTRY_TOO_LARGE: + return ContinueAsNewSuggestedReasonUpdateRegistryTooLarge + case enumspb.CONTINUE_AS_NEW_SUGGESTED_REASON_HISTORY_SIZE_TOO_LARGE: + return ContinueAsNewSuggestedReasonHistorySizeTooLarge + case enumspb.CONTINUE_AS_NEW_SUGGESTED_REASON_WORKER_DEPLOYMENT_VERSION_CHANGED: + return ContinueAsNewSuggestedReasonWorkerDeploymentVersionChanged + default: + return ContinueAsNewSuggestedReasonUnspecified + } +} + func (weh *workflowExecutionEventHandlerImpl) handleStartChildWorkflowExecutionFailed(event *historypb.HistoryEvent) error { attributes := event.GetStartChildWorkflowExecutionFailedEventAttributes() childWorkflowID := attributes.GetWorkflowId() diff --git a/internal/workflow.go b/internal/workflow.go index 65622851a..f27bd7ff4 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -69,6 +69,39 @@ const ( VersioningBehaviorAutoUpgrade ) +// ContinueAsNewSuggestedReason specifies why ContinueAsNewSuggested is true. +// +// NOTE: Experimental +// +// Exposed as: [go.temporal.io/sdk/workflow.ContinueAsNewSuggestedReason] +type ContinueAsNewSuggestedReason int + +const ( + // ContinueAsNewSuggestedReasonUnspecified - ContinueAsNewSuggestedReason unknown. + // Reason will be Unspecified if the server is not configured to suggest continue as new, + // if the server is not configured to provide a reason for continue as new suggested, or + // if continue as new is not suggested. + // + // Exposed as: [go.temporal.io/sdk/workflow.ContinueAsNewSuggestedReasonUnspecified] + ContinueAsNewSuggestedReasonUnspecified ContinueAsNewSuggestedReason = iota + + // ContinueAsNewSuggestedReasonHistorySizeTooLarge - Workflow History size is getting too large. + // + // Exposed as: [go.temporal.io/sdk/workflow.ContinueAsNewSuggestedReasonHistorySizeTooLarge] + ContinueAsNewSuggestedReasonHistorySizeTooLarge + + // ContinueAsNewSuggestedReasonUpdateRegistryTooLarge - Workflow Update registry is getting too large. + // + // Exposed as: [go.temporal.io/sdk/workflow.ContinueAsNewSuggestedReasonUpdateRegistryTooLarge] + ContinueAsNewSuggestedReasonUpdateRegistryTooLarge + + // ContinueAsNewSuggestedReasonWorkerDeploymentVersionChanged - Workflow's Worker Deployment has a new Ramping + // Version or Current Version, and the workflow's Versioning Behavior is PinnedUntilContinueAsNew. + // + // Exposed as: [go.temporal.io/sdk/workflow.ContinueAsNewSuggestedReasonWorkerDeploymentVersionChanged] + ContinueAsNewSuggestedReasonWorkerDeploymentVersionChanged +) + // NexusOperationCancellationType specifies what action should be taken for a Nexus operation when the // caller is cancelled. // @@ -1350,9 +1383,10 @@ type WorkflowInfo struct { // this worker currentTaskBuildID string - continueAsNewSuggested bool - currentHistorySize int - currentHistoryLength int + continueAsNewSuggested bool + continueAsNewSuggestedReason ContinueAsNewSuggestedReason + currentHistorySize int + currentHistoryLength int // currentRunID is the current run ID of the workflow task, deterministic over reset currentRunID string } @@ -1404,6 +1438,12 @@ func (wInfo *WorkflowInfo) GetContinueAsNewSuggested() bool { return wInfo.continueAsNewSuggested } +// GetContinueAsNewSuggestedReason returns the reason for ContinueAsNewSuggested if one exists. +// This value may change throughout the life of the workflow. +func (wInfo *WorkflowInfo) GetContinueAsNewSuggestedReason() ContinueAsNewSuggestedReason { + return wInfo.continueAsNewSuggestedReason +} + // GetWorkflowInfo extracts info of a current workflow from a context. // // Exposed as: [go.temporal.io/sdk/workflow.GetInfo] diff --git a/workflow/deterministic_wrappers.go b/workflow/deterministic_wrappers.go index 15b690744..a880b48b5 100644 --- a/workflow/deterministic_wrappers.go +++ b/workflow/deterministic_wrappers.go @@ -53,6 +53,11 @@ type ( // // NOTE: Experimental AwaitOptions = internal.AwaitOptions + + // ContinueAsNewSuggestedReason is the reason for ContinueAsNewSuggested + // + // NOTE: Experimental + ContinueAsNewSuggestedReason = internal.ContinueAsNewSuggestedReason ) // Await blocks the calling thread until condition() returns true.