|
| 1 | +package operations |
| 2 | + |
| 3 | +import ( |
| 4 | + "errors" |
| 5 | + "fmt" |
| 6 | + |
| 7 | + "github.com/avast/retry-go/v4" |
| 8 | +) |
| 9 | + |
| 10 | +var ErrNotSerializable = errors.New("data cannot be safely written to disk without data lost, " + |
| 11 | + "avoid type that can't be serialized") |
| 12 | + |
| 13 | +// ExecuteConfig is the configuration for the ExecuteOperation function. |
| 14 | +type ExecuteConfig[IN, DEP any] struct { |
| 15 | + retryConfig RetryConfig[IN, DEP] |
| 16 | +} |
| 17 | + |
| 18 | +type ExecuteOption[IN, DEP any] func(*ExecuteConfig[IN, DEP]) |
| 19 | + |
| 20 | +type RetryConfig[IN, DEP any] struct { |
| 21 | + // DisableRetry disables the retry mechanism if set to true. |
| 22 | + DisableRetry bool |
| 23 | + // InputHook is a function that returns an updated input before retrying the operation. |
| 24 | + // The operation when retried will use the input returned by this function. |
| 25 | + // This is useful for scenarios like updating the gas limit. |
| 26 | + // This will be ignored if DisableRetry is set to true. |
| 27 | + InputHook func(input IN, deps DEP) IN |
| 28 | +} |
| 29 | + |
| 30 | +// WithRetryConfig is an ExecuteOption that sets the retry configuration. |
| 31 | +func WithRetryConfig[IN, DEP any](config RetryConfig[IN, DEP]) ExecuteOption[IN, DEP] { |
| 32 | + return func(c *ExecuteConfig[IN, DEP]) { |
| 33 | + c.retryConfig = config |
| 34 | + } |
| 35 | +} |
| 36 | + |
| 37 | +// ExecuteOperation executes an operation with the given input and dependencies. |
| 38 | +// Execution will return the previous successful execution result and skip execution if there was a |
| 39 | +// previous successful run found in the Reports. |
| 40 | +// If previous unsuccessful execution was found, the execution will not be skipped. |
| 41 | +// |
| 42 | +// Note: |
| 43 | +// Operations that were skipped will not be added to the reporter. |
| 44 | +// |
| 45 | +// Retry: |
| 46 | +// By default, it retries the operation up to 10 times with exponential backoff if it fails. |
| 47 | +// Use WithRetryConfig to customize the retry behavior. |
| 48 | +// To cancel the retry early, return an error with NewUnrecoverableError. |
| 49 | +// |
| 50 | +// Input & Output: |
| 51 | +// The input and output must be JSON serializable. If the input is not serializable, it will return an error. |
| 52 | +// To be serializable, the input and output must be json.marshalable, or it must implement json.Marshaler and json.Unmarshaler. |
| 53 | +// IsSerializable can be used to check if the input or output is serializable. |
| 54 | +func ExecuteOperation[IN, OUT, DEP any]( |
| 55 | + b Bundle, |
| 56 | + operation *Operation[IN, OUT, DEP], |
| 57 | + deps DEP, |
| 58 | + input IN, |
| 59 | + opts ...ExecuteOption[IN, DEP], |
| 60 | +) (Report[IN, OUT], error) { |
| 61 | + if !IsSerializable(b.Logger, input) { |
| 62 | + return Report[IN, OUT]{}, fmt.Errorf("operation %s input: %w", operation.def.ID, ErrNotSerializable) |
| 63 | + } |
| 64 | + |
| 65 | + if previousReport, found := loadPreviousSuccessfulReport[IN, OUT](b, operation.def, input); found { |
| 66 | + b.Logger.Infow("Operation already executed. Returning previous result", "id", operation.def.ID, |
| 67 | + "version", operation.def.Version, "description", operation.def.Description) |
| 68 | + return previousReport, nil |
| 69 | + } |
| 70 | + |
| 71 | + executeConfig := &ExecuteConfig[IN, DEP]{retryConfig: RetryConfig[IN, DEP]{}} |
| 72 | + for _, opt := range opts { |
| 73 | + opt(executeConfig) |
| 74 | + } |
| 75 | + |
| 76 | + var output OUT |
| 77 | + var err error |
| 78 | + |
| 79 | + if executeConfig.retryConfig.DisableRetry { |
| 80 | + output, err = operation.execute(b, deps, input) |
| 81 | + } else { |
| 82 | + var inputTemp = input |
| 83 | + output, err = retry.DoWithData(func() (OUT, error) { |
| 84 | + return operation.execute(b, deps, inputTemp) |
| 85 | + }, retry.OnRetry(func(attempt uint, err error) { |
| 86 | + b.Logger.Infow("Operation failed. Retrying...", |
| 87 | + "operation", operation.def.ID, "attempt", attempt, "error", err) |
| 88 | + |
| 89 | + if executeConfig.retryConfig.InputHook != nil { |
| 90 | + inputTemp = executeConfig.retryConfig.InputHook(inputTemp, deps) |
| 91 | + } |
| 92 | + })) |
| 93 | + } |
| 94 | + |
| 95 | + if err == nil && !IsSerializable(b.Logger, output) { |
| 96 | + return Report[IN, OUT]{}, fmt.Errorf("operation %s output: %w", operation.def.ID, ErrNotSerializable) |
| 97 | + } |
| 98 | + |
| 99 | + report := NewReport(operation.def, input, output, err) |
| 100 | + err = b.reporter.AddReport(genericReport(report)) |
| 101 | + if err != nil { |
| 102 | + return Report[IN, OUT]{}, err |
| 103 | + } |
| 104 | + if report.Err != nil { |
| 105 | + return report, report.Err |
| 106 | + } |
| 107 | + return report, nil |
| 108 | +} |
| 109 | + |
| 110 | +// ExecuteSequence executes a Sequence and returns a SequenceReport. |
| 111 | +// The SequenceReport contains a report for the Sequence and also the execution reports which are all |
| 112 | +// the operations that were executed as part of this sequence. |
| 113 | +// The latter is useful when we want to return all the executed reports to the changeset output. |
| 114 | +// Execution will return the previous successful execution result and skip execution if there was a |
| 115 | +// previous successful run found in the Reports. |
| 116 | +// If previous unsuccessful execution was found, the execution will not be skipped. |
| 117 | +// |
| 118 | +// Note: |
| 119 | +// Sequences or Operations that were skipped will not be added to the reporter. |
| 120 | +// The ExecutionReports do not include Sequences or Operations that were skipped. |
| 121 | +// |
| 122 | +// Input & Output: |
| 123 | +// The input and output must be JSON serializable. If the input is not serializable, it will return an error. |
| 124 | +// To be serializable, the input and output must be json.marshalable, or it must implement json.Marshaler and json.Unmarshaler. |
| 125 | +// IsSerializable can be used to check if the input or output is serializable. |
| 126 | +func ExecuteSequence[IN, OUT, DEP any]( |
| 127 | + b Bundle, sequence *Sequence[IN, OUT, DEP], deps DEP, input IN, |
| 128 | +) (SequenceReport[IN, OUT], error) { |
| 129 | + if !IsSerializable(b.Logger, input) { |
| 130 | + return SequenceReport[IN, OUT]{}, fmt.Errorf("sequence %s input: %w", sequence.def.ID, ErrNotSerializable) |
| 131 | + } |
| 132 | + |
| 133 | + if previousReport, found := loadPreviousSuccessfulReport[IN, OUT](b, sequence.def, input); found { |
| 134 | + executionReports, err := b.reporter.GetExecutionReports(previousReport.ID) |
| 135 | + if err != nil { |
| 136 | + return SequenceReport[IN, OUT]{}, err |
| 137 | + } |
| 138 | + b.Logger.Infow("Sequence already executed. Returning previous result", "id", sequence.def.ID, |
| 139 | + "version", sequence.def.Version, "description", sequence.def.Description) |
| 140 | + return SequenceReport[IN, OUT]{previousReport, executionReports}, nil |
| 141 | + } |
| 142 | + |
| 143 | + b.Logger.Infow("Executing sequence", "id", sequence.def.ID, |
| 144 | + "version", sequence.def.Version, "description", sequence.def.Description) |
| 145 | + recentReporter := NewRecentMemoryReporter(b.reporter) |
| 146 | + newBundle := Bundle{ |
| 147 | + Logger: b.Logger, |
| 148 | + GetContext: b.GetContext, |
| 149 | + reporter: recentReporter, |
| 150 | + reportHashCache: b.reportHashCache, |
| 151 | + } |
| 152 | + ret, err := sequence.handler(newBundle, deps, input) |
| 153 | + if errors.Is(err, ErrNotSerializable) { |
| 154 | + return SequenceReport[IN, OUT]{}, err |
| 155 | + } |
| 156 | + |
| 157 | + if err == nil && !IsSerializable(b.Logger, ret) { |
| 158 | + return SequenceReport[IN, OUT]{}, fmt.Errorf("sequence %s output: %w", sequence.def.ID, ErrNotSerializable) |
| 159 | + } |
| 160 | + |
| 161 | + recentReports := recentReporter.GetRecentReports() |
| 162 | + childReports := make([]string, 0, len(recentReports)) |
| 163 | + for _, rep := range recentReports { |
| 164 | + childReports = append(childReports, rep.ID) |
| 165 | + } |
| 166 | + |
| 167 | + report := NewReport( |
| 168 | + sequence.def, |
| 169 | + input, |
| 170 | + ret, |
| 171 | + err, |
| 172 | + childReports..., |
| 173 | + ) |
| 174 | + |
| 175 | + err = b.reporter.AddReport(genericReport(report)) |
| 176 | + if err != nil { |
| 177 | + return SequenceReport[IN, OUT]{}, err |
| 178 | + } |
| 179 | + executionReports, err := b.reporter.GetExecutionReports(report.ID) |
| 180 | + if err != nil { |
| 181 | + return SequenceReport[IN, OUT]{}, err |
| 182 | + } |
| 183 | + if report.Err != nil { |
| 184 | + return SequenceReport[IN, OUT]{report, executionReports}, report.Err |
| 185 | + } |
| 186 | + return SequenceReport[IN, OUT]{report, executionReports}, nil |
| 187 | +} |
| 188 | + |
| 189 | +// NewUnrecoverableError creates an error that indicates an unrecoverable error. |
| 190 | +// If this error is returned inside an operation, the operation will no longer retry. |
| 191 | +// This allows the operation to fail fast if it encounters an unrecoverable error. |
| 192 | +func NewUnrecoverableError(err error) error { |
| 193 | + return retry.Unrecoverable(err) |
| 194 | +} |
| 195 | + |
| 196 | +func loadPreviousSuccessfulReport[IN, OUT any]( |
| 197 | + b Bundle, def Definition, input IN, |
| 198 | +) (Report[IN, OUT], bool) { |
| 199 | + prevReports, err := b.reporter.GetReports() |
| 200 | + if err != nil { |
| 201 | + b.Logger.Errorw("Failed to get reports", "error", err) |
| 202 | + return Report[IN, OUT]{}, false |
| 203 | + } |
| 204 | + currentHash, err := constructUniqueHashFrom(b.reportHashCache, def, input) |
| 205 | + if err != nil { |
| 206 | + b.Logger.Errorw("Failed to construct unique hash", "error", err) |
| 207 | + return Report[IN, OUT]{}, false |
| 208 | + } |
| 209 | + |
| 210 | + for _, report := range prevReports { |
| 211 | + // Check if operation/sequence was run previously and return the report if successful |
| 212 | + reportHash, err := constructUniqueHashFrom(b.reportHashCache, report.Def, report.Input) |
| 213 | + if err != nil { |
| 214 | + b.Logger.Errorw("Failed to construct unique hash for previous report", "error", err) |
| 215 | + continue |
| 216 | + } |
| 217 | + if reportHash == currentHash && report.Err == nil { |
| 218 | + typedReport, ok := typeReport[IN, OUT](report) |
| 219 | + if !ok { |
| 220 | + b.Logger.Debugw(fmt.Sprintf("Previous %s execution found but couldn't find its matching Report", def.ID), "report_id", report.ID) |
| 221 | + continue |
| 222 | + } |
| 223 | + b.Logger.Debugw(fmt.Sprintf("Previous %s execution found. Returning its result from Report storage", def.ID), "report_id", report.ID) |
| 224 | + return typedReport, true |
| 225 | + } |
| 226 | + } |
| 227 | + // No previous execution was found |
| 228 | + return Report[IN, OUT]{}, false |
| 229 | +} |
0 commit comments