@@ -105,7 +105,7 @@ func ExecuteOperation[IN, OUT, DEP any](
105105 return Report [IN , OUT ]{}, fmt .Errorf ("operation %s input: %w" , operation .def .ID , ErrNotSerializable )
106106 }
107107
108- if previousReport , found := loadPreviousSuccessfulReport [IN , OUT ](b , operation .def , input ); found {
108+ if previousReport , ok := loadPreviousSuccessfulReport [IN , OUT ](b , operation .def , input ); ok {
109109 b .Logger .Infow ("Operation already executed. Returning previous result" , "id" , operation .def .ID ,
110110 "version" , operation .def .Version , "description" , operation .def .Description )
111111
@@ -123,29 +123,7 @@ func ExecuteOperation[IN, OUT, DEP any](
123123 var err error
124124
125125 if executeConfig .retryConfig .Enabled {
126- var inputTemp = input
127-
128- // Generate the configurable options for the retry
129- retryOpts := executeConfig .retryConfig .Policy .options ()
130- // Use the operation context in the retry
131- retryOpts = append (retryOpts , retry .Context (b .GetContext ()))
132- // Append the retry logic which will log the retry and attempt to transform the input
133- // if the user provided a custom input hook.
134- retryOpts = append (retryOpts , retry .OnRetry (func (attempt uint , err error ) {
135- b .Logger .Infow ("Operation failed. Retrying..." ,
136- "operation" , operation .def .ID , "attempt" , attempt , "error" , err )
137-
138- if executeConfig .retryConfig .InputHook != nil {
139- inputTemp = executeConfig .retryConfig .InputHook (attempt , err , inputTemp , deps )
140- }
141- }))
142-
143- output , err = retry .DoWithData (
144- func () (OUT , error ) {
145- return operation .execute (b , deps , inputTemp )
146- },
147- retryOpts ... ,
148- )
126+ output , err = executeWithRetry (b , operation , deps , input , executeConfig )
149127 } else {
150128 output , err = operation .execute (b , deps , input )
151129 }
@@ -166,6 +144,113 @@ func ExecuteOperation[IN, OUT, DEP any](
166144 return report , nil
167145}
168146
147+ // ExecuteOperationN executes the given operation multiple n times with the given input and dependencies.
148+ // Execution will return the previous successful execution results and skip execution if there were
149+ // previous successful runs found in the Reports.
150+ // executionSeriesID is used to identify the multiple executions as a single unit.
151+ // It is important to use a unique executionSeriesID for different sets of multiple executions.
152+ func ExecuteOperationN [IN , OUT , DEP any ](
153+ b Bundle , operation * Operation [IN , OUT , DEP ], deps DEP , input IN , seriesID string , n uint ,
154+ opts ... ExecuteOption [IN , DEP ],
155+ ) ([]Report [IN , OUT ], error ) {
156+ if ! IsSerializable (b .Logger , input ) {
157+ return []Report [IN , OUT ]{}, fmt .Errorf ("operation %s input: %w" , operation .def .ID , ErrNotSerializable )
158+ }
159+
160+ results , ok := loadSuccessfulExecutionSeriesReports [IN , OUT ](b , operation .def , input , seriesID )
161+ resultsLen := uint (len (results ))
162+ if ok {
163+ // if there are more reports than n, we return only the first n reports
164+ if resultsLen >= n {
165+ b .Logger .Infow ("Operations already executed in an execution series. Returning previous results" , "id" , operation .def .ID ,
166+ "version" , operation .def .Version , "description" , operation .def .Description , "executionSeriesID" , seriesID )
167+
168+ return results [:n ], nil
169+ }
170+ }
171+ remainingTimesToRun := n - resultsLen
172+
173+ b .Logger .Infow ("Executing operation multiple times" ,
174+ "executionSeriesID" , seriesID ,
175+ "n" , n ,
176+ "remainingTimesToRun" , remainingTimesToRun )
177+
178+ executeConfig := & ExecuteConfig [IN , DEP ]{
179+ retryConfig : newDisabledRetryConfig [IN , DEP ](),
180+ }
181+ for _ , opt := range opts {
182+ opt (executeConfig )
183+ }
184+
185+ order := resultsLen
186+ for range remainingTimesToRun {
187+ var output OUT
188+ var err error
189+
190+ if executeConfig .retryConfig .Enabled {
191+ output , err = executeWithRetry (b , operation , deps , input , executeConfig )
192+ } else {
193+ output , err = operation .execute (b , deps , input )
194+ }
195+
196+ if err == nil && ! IsSerializable (b .Logger , output ) {
197+ return []Report [IN , OUT ]{}, fmt .Errorf ("operation %s output: %w" , operation .def .ID , ErrNotSerializable )
198+ }
199+
200+ report := NewReport (operation .def , input , output , err )
201+ report .ExecutionSeries = & ExecutionSeries {
202+ ID : seriesID ,
203+ Order : order ,
204+ }
205+ order ++
206+ if err = b .reporter .AddReport (genericReport (report )); err != nil {
207+ return []Report [IN , OUT ]{}, err
208+ }
209+
210+ if report .Err != nil {
211+ return []Report [IN , OUT ]{}, report .Err
212+ }
213+
214+ results = append (results , report )
215+ }
216+
217+ return results , nil
218+ }
219+
220+ func executeWithRetry [IN , OUT , DEP any ](
221+ b Bundle ,
222+ operation * Operation [IN , OUT , DEP ],
223+ deps DEP ,
224+ input IN ,
225+ executeConfig * ExecuteConfig [IN , DEP ],
226+ ) (OUT , error ) {
227+ var inputTemp = input
228+
229+ // Generate the configurable options for the retry
230+ retryOpts := executeConfig .retryConfig .Policy .options ()
231+ // Use the operation context in the retry
232+ retryOpts = append (retryOpts , retry .Context (b .GetContext ()))
233+ // Append the retry logic which will log the retry and attempt to transform the input
234+ // if the user provided a custom input hook.
235+ retryOpts = append (retryOpts , retry .OnRetry (func (attempt uint , err error ) {
236+ b .Logger .Infow ("Operation failed. Retrying..." ,
237+ "operation" , operation .def .ID , "attempt" , attempt , "error" , err )
238+
239+ if executeConfig .retryConfig .InputHook != nil {
240+ inputTemp = executeConfig .retryConfig .InputHook (attempt , err , inputTemp , deps )
241+ }
242+ }))
243+
244+ output , err := retry .DoWithData (
245+ func () (OUT , error ) {
246+ return operation .execute (b , deps , inputTemp )
247+ },
248+ retryOpts ... ,
249+ )
250+
251+ return output , err
252+ }
253+
169254// ExecuteSequence executes a Sequence and returns a SequenceReport.
170255// The SequenceReport contains a report for the Sequence and also the execution reports which are all
171256// the operations that were executed as part of this sequence.
@@ -189,7 +274,7 @@ func ExecuteSequence[IN, OUT, DEP any](
189274 return SequenceReport [IN , OUT ]{}, fmt .Errorf ("sequence %s input: %w" , sequence .def .ID , ErrNotSerializable )
190275 }
191276
192- if previousReport , found := loadPreviousSuccessfulReport [IN , OUT ](b , sequence .def , input ); found {
277+ if previousReport , ok := loadPreviousSuccessfulReport [IN , OUT ](b , sequence .def , input ); ok {
193278 executionReports , err := b .reporter .GetExecutionReports (previousReport .ID )
194279 if err != nil {
195280 return SequenceReport [IN , OUT ]{}, err
@@ -291,3 +376,56 @@ func loadPreviousSuccessfulReport[IN, OUT any](
291376 // No previous execution was found
292377 return Report [IN , OUT ]{}, false
293378}
379+
380+ // loadSuccessfulExecutionSeriesReports loads all successful reports for an operation in an execution series.
381+ func loadSuccessfulExecutionSeriesReports [IN , OUT any ](
382+ b Bundle , def Definition , input IN , seriesID string ,
383+ ) ([]Report [IN , OUT ], bool ) {
384+ prevReports , err := b .reporter .GetReports ()
385+ if err != nil {
386+ b .Logger .Errorw ("Failed to get reports" , "error" , err )
387+ return []Report [IN , OUT ]{}, false
388+ }
389+ currentHash , err := constructUniqueHashFrom (b .reportHashCache , def , input )
390+ if err != nil {
391+ b .Logger .Errorw ("Failed to construct unique hash" , "error" , err )
392+ return []Report [IN , OUT ]{}, false
393+ }
394+
395+ var foundReports []Report [IN , OUT ]
396+ for _ , report := range prevReports {
397+ // if the report is not part of the same execution series, skip it
398+ if report .ExecutionSeries == nil || report .ExecutionSeries .ID != seriesID {
399+ continue
400+ }
401+ reportHash , err := constructUniqueHashFrom (b .reportHashCache , report .Def , report .Input )
402+ if err != nil {
403+ b .Logger .Errorw ("Failed to construct unique hash for previous report" , "error" , err )
404+ continue
405+ }
406+ if reportHash == currentHash && report .Err == nil {
407+ typedReport , ok := typeReport [IN , OUT ](report )
408+ if ! ok {
409+ b .Logger .Debugw (fmt .Sprintf ("Previous %s execution found but couldn't find its matching Report" , def .ID ), "report_id" , report .ID )
410+ continue
411+ }
412+
413+ b .Logger .Debugw (fmt .Sprintf ("Previous %s execution found" , def .ID ), "report_id" , report .ID )
414+
415+ foundReports = append (foundReports , typedReport )
416+ }
417+ }
418+
419+ b .Logger .Infof ("Found %d reports for ExecutionSeriesID %q" , len (foundReports ), seriesID )
420+
421+ if len (foundReports ) == 0 {
422+ return []Report [IN , OUT ]{}, false
423+ }
424+
425+ results := make ([]Report [IN , OUT ], len (foundReports ))
426+ for _ , foundReport := range foundReports {
427+ results [foundReport .ExecutionSeries .Order ] = foundReport
428+ }
429+
430+ return results , true
431+ }
0 commit comments