1010import com .backbase .stream .investment .service .InvestmentAssetPriceService ;
1111import com .backbase .stream .investment .service .InvestmentAssetUniverseService ;
1212import com .backbase .stream .investment .service .InvestmentClientService ;
13+ import com .backbase .stream .investment .service .InvestmentIntradayAssetPriceService ;
1314import com .backbase .stream .investment .service .InvestmentPortfolioService ;
1415import com .backbase .stream .worker .StreamTaskExecutor ;
1516import com .backbase .stream .worker .model .StreamTask ;
@@ -59,32 +60,34 @@ public class InvestmentAssetUniversSaga implements StreamTaskExecutor<Investment
5960
6061 private final InvestmentAssetUniverseService assetUniverseService ;
6162 private final InvestmentAssetPriceService investmentAssetPriceService ;
63+ private final InvestmentIntradayAssetPriceService investmentIntradayAssetPriceService ;
6264 private final InvestmentIngestionConfigurationProperties coreConfigurationProperties ;
6365
6466 @ Override
6567 public Mono <InvestmentAssetsTask > executeTask (InvestmentAssetsTask streamTask ) {
6668 if (!coreConfigurationProperties .isAssetUniversEnabled ()) {
67- log .warn ("Skip investment asset univers saga execution: taskId={}, taskName={}" ,
69+ log .warn ("Skip investment asset universe saga execution: taskId={}, taskName={}" ,
6870 streamTask .getId (), streamTask .getName ());
6971 return Mono .just (streamTask );
7072 }
71- log .info ("Starting investment saga execution: taskId={}, taskName={}" ,
73+ log .info ("Starting investment asset universe saga execution: taskId={}, taskName={}" ,
7274 streamTask .getId (), streamTask .getName ());
73- return createMarkets (streamTask )
74- .flatMap (this ::createMarketSpecialDays )
75- .flatMap (this ::createAssetCategoryTypes )
76- .flatMap (this ::createAssetCategories )
75+ return upsertMarkets (streamTask )
76+ .flatMap (this ::upsertMarketSpecialDays )
77+ .flatMap (this ::upsertAssetCategoryTypes )
78+ .flatMap (this ::upsertAssetCategories )
7779 .flatMap (this ::createAssets )
7880 .flatMap (this ::upsertPrices )
81+ .flatMap (this ::upsertIntradayPrices )
7982 .doOnSuccess (completedTask -> log .info (
80- "Successfully completed investment saga: taskId={}, taskName={}, state={}" ,
83+ "Successfully completed investment asset universe saga: taskId={}, taskName={}, state={}" ,
8184 completedTask .getId (), completedTask .getName (), completedTask .getState ()))
8285 .doOnError (throwable -> {
83- log .error ("Failed to execute investment saga: taskId={}, taskName={}" ,
86+ log .error ("Failed to execute investment asset universe saga: taskId={}, taskName={}" ,
8487 streamTask .getId (), streamTask .getName (), throwable );
8588 streamTask .error (INVESTMENT , OP_UPSERT , RESULT_FAILED ,
8689 streamTask .getName (), streamTask .getId (),
87- "Investment saga failed: " + throwable .getMessage ());
90+ "Investment asset universe saga failed: " + throwable .getMessage ());
8891 streamTask .setState (State .FAILED );
8992 })
9093 .onErrorResume (throwable -> Mono .just (streamTask ));
@@ -96,6 +99,11 @@ private Mono<InvestmentAssetsTask> upsertPrices(InvestmentAssetsTask investmentT
9699 .map (investmentTask ::setPriceTasks );
97100 }
98101
102+ private Mono <InvestmentAssetsTask > upsertIntradayPrices (InvestmentAssetsTask investmentTask ) {
103+ return investmentIntradayAssetPriceService .ingestIntradayPrices ()
104+ .map (investmentTask ::setIntradayPriceTasks );
105+ }
106+
99107 /**
100108 * Rollback is not implemented for investment saga.
101109 *
@@ -112,7 +120,7 @@ public Mono<InvestmentAssetsTask> rollBack(InvestmentAssetsTask streamTask) {
112120 return Mono .empty ();
113121 }
114122
115- public Mono <InvestmentAssetsTask > createMarkets (InvestmentAssetsTask investmentTask ) {
123+ public Mono <InvestmentAssetsTask > upsertMarkets (InvestmentAssetsTask investmentTask ) {
116124 InvestmentAssetData investmentData = investmentTask .getData ();
117125 int marketCount = investmentData .getMarkets () != null ? investmentData .getMarkets ().size () : 0 ;
118126 log .info ("Starting investment market creation: taskId={}, marketCount={}" ,
@@ -161,15 +169,15 @@ public Mono<InvestmentAssetsTask> createMarkets(InvestmentAssetsTask investmentT
161169 }
162170
163171 /**
164- * Creates or upserts market special days for the investment task.
172+ * Upserts market special days for the investment task.
165173 *
166174 * <p>This method processes each market special day in the task data by invoking
167175 * the asset universe service. It updates the task state and logs progress for observability.
168176 *
169177 * @param investmentTask the investment task containing market special day data
170178 * @return Mono emitting the updated investment task with market special days set
171179 */
172- public Mono <InvestmentAssetsTask > createMarketSpecialDays (InvestmentAssetsTask investmentTask ) {
180+ public Mono <InvestmentAssetsTask > upsertMarketSpecialDays (InvestmentAssetsTask investmentTask ) {
173181 InvestmentAssetData investmentData = investmentTask .getData ();
174182 int marketSpecialDayCount =
175183 investmentData .getMarketSpecialDays () != null ? investmentData .getMarketSpecialDays ().size () : 0 ;
@@ -219,15 +227,15 @@ public Mono<InvestmentAssetsTask> createMarketSpecialDays(InvestmentAssetsTask i
219227 }
220228
221229 /**
222- * Creates or upserts asset categories for the investment task.
230+ * Upserts asset categories for the investment task.
223231 *
224232 * <p>This method processes each asset category in the task data by invoking
225233 * the asset universe service. It updates the task state and logs progress for observability.
226234 *
227235 * @param investmentTask the investment task containing asset category data
228236 * @return Mono emitting the updated investment task with asset categories set
229237 */
230- public Mono <InvestmentAssetsTask > createAssetCategories (InvestmentAssetsTask investmentTask ) {
238+ public Mono <InvestmentAssetsTask > upsertAssetCategories (InvestmentAssetsTask investmentTask ) {
231239 InvestmentAssetData investmentData = investmentTask .getData ();
232240 int categoryCount =
233241 investmentData .getAssetCategories () != null ? investmentData .getAssetCategories ().size () : 0 ;
@@ -274,7 +282,7 @@ public Mono<InvestmentAssetsTask> createAssetCategories(InvestmentAssetsTask inv
274282 });
275283 }
276284
277- public Mono <InvestmentAssetsTask > createAssetCategoryTypes (InvestmentAssetsTask investmentTask ) {
285+ public Mono <InvestmentAssetsTask > upsertAssetCategoryTypes (InvestmentAssetsTask investmentTask ) {
278286 InvestmentAssetData investmentData = investmentTask .getData ();
279287 int typeCount =
280288 investmentData .getAssetCategoryTypes () != null ? investmentData .getAssetCategoryTypes ().size () : 0 ;
0 commit comments