|
22 | 22 | import java.util.function.Function; |
23 | 23 | import java.util.function.IntFunction; |
24 | 24 | import java.util.logging.Logger; |
25 | | -import java.util.stream.Collectors; |
26 | 25 |
|
27 | 26 | final class TaskOrchestrationExecutor { |
28 | 27 |
|
@@ -511,10 +510,13 @@ private void handleTaskCompleted(HistoryEvent e) { |
511 | 510 | rawResult != null ? rawResult : "(null)")); |
512 | 511 |
|
513 | 512 | } |
514 | | - |
515 | | - Object result = this.dataConverter.deserialize(rawResult, record.getDataType()); |
516 | 513 | CompletableTask task = record.getTask(); |
517 | | - task.complete(result); |
| 514 | + try { |
| 515 | + Object result = this.dataConverter.deserialize(rawResult, record.getDataType()); |
| 516 | + task.complete(result); |
| 517 | + } catch (Exception ex) { |
| 518 | + task.completeExceptionally(ex); |
| 519 | + } |
518 | 520 | } |
519 | 521 |
|
520 | 522 | private void handleTaskFailed(HistoryEvent e) { |
@@ -558,11 +560,15 @@ private void handleEventRaised(HistoryEvent e) { |
558 | 560 | this.outstandingEvents.remove(eventName); |
559 | 561 | } |
560 | 562 | String rawResult = eventRaised.getInput().getValue(); |
561 | | - Object result = this.dataConverter.deserialize( |
562 | | - rawResult, |
563 | | - matchingTaskRecord.getDataType()); |
564 | 563 | CompletableTask task = matchingTaskRecord.getTask(); |
565 | | - task.complete(result); |
| 564 | + try { |
| 565 | + Object result = this.dataConverter.deserialize( |
| 566 | + rawResult, |
| 567 | + matchingTaskRecord.getDataType()); |
| 568 | + task.complete(result); |
| 569 | + } catch (Exception ex) { |
| 570 | + task.completeExceptionally(ex); |
| 571 | + } |
566 | 572 | } |
567 | 573 |
|
568 | 574 | private void handleEventWhileSuspended (HistoryEvent historyEvent){ |
@@ -694,10 +700,13 @@ private void handleSubOrchestrationCompleted(HistoryEvent e) { |
694 | 700 | rawResult != null ? rawResult : "(null)")); |
695 | 701 |
|
696 | 702 | } |
697 | | - |
698 | | - Object result = this.dataConverter.deserialize(rawResult, record.getDataType()); |
699 | 703 | CompletableTask task = record.getTask(); |
700 | | - task.complete(result); |
| 704 | + try { |
| 705 | + Object result = this.dataConverter.deserialize(rawResult, record.getDataType()); |
| 706 | + task.complete(result); |
| 707 | + } catch (Exception ex) { |
| 708 | + task.completeExceptionally(ex); |
| 709 | + } |
701 | 710 | } |
702 | 711 |
|
703 | 712 | private void handleSubOrchestrationFailed(HistoryEvent e){ |
@@ -787,12 +796,17 @@ private void addCarryoverEvents(CompleteOrchestrationAction.Builder builder) { |
787 | 796 | // We don't check the event in the pass event list to avoid duplicated events. |
788 | 797 | Set<HistoryEvent> externalEvents = new HashSet<>(this.unprocessedEvents); |
789 | 798 | List<HistoryEvent> newEvents = this.historyEventPlayer.getNewEvents(); |
| 799 | + int currentHistoryIndex = this.historyEventPlayer.getCurrentHistoryIndex(); |
| 800 | + |
| 801 | + // Only add events that haven't been processed to the carryOverEvents |
| 802 | + // currentHistoryIndex will point to the first unprocessed event |
| 803 | + for (int i = currentHistoryIndex; i < newEvents.size(); i++) { |
| 804 | + HistoryEvent historyEvent = newEvents.get(i); |
| 805 | + if (historyEvent.getEventTypeCase() == HistoryEvent.EventTypeCase.EVENTRAISED) { |
| 806 | + externalEvents.add(historyEvent); |
| 807 | + } |
| 808 | + } |
790 | 809 |
|
791 | | - Set<HistoryEvent> filteredEvents = newEvents.stream() |
792 | | - .filter(e -> e.getEventTypeCase() == HistoryEvent.EventTypeCase.EVENTRAISED) |
793 | | - .collect(Collectors.toSet()); |
794 | | - |
795 | | - externalEvents.addAll(filteredEvents); |
796 | 810 | externalEvents.forEach(builder::addCarryoverEvents); |
797 | 811 | } |
798 | 812 |
|
@@ -946,7 +960,11 @@ public boolean moveNext() { |
946 | 960 | } |
947 | 961 |
|
948 | 962 | List<HistoryEvent> getNewEvents() { |
949 | | - return newEvents; |
| 963 | + return this.newEvents; |
| 964 | + } |
| 965 | + |
| 966 | + int getCurrentHistoryIndex() { |
| 967 | + return this.currentHistoryIndex; |
950 | 968 | } |
951 | 969 | } |
952 | 970 |
|
@@ -1322,6 +1340,10 @@ protected void handleException(Throwable e) { |
1322 | 1340 | throw (CompositeTaskFailedException)e; |
1323 | 1341 | } |
1324 | 1342 |
|
| 1343 | + if (e instanceof DataConverter.DataConverterException) { |
| 1344 | + throw (DataConverter.DataConverterException)e; |
| 1345 | + } |
| 1346 | + |
1325 | 1347 | throw new RuntimeException("Unexpected failure in the task execution", e); |
1326 | 1348 | } |
1327 | 1349 |
|
|
0 commit comments