11const cds = require ( '@sap/cds' )
22const log = cds . log ( 'cds.data' )
33
4- if ( cds . requires . data_federation ) cds . on ( 'loaded' , csn => {
5-
6- const federated = e => e . is_entity && e [ '@federated' ]
7- const todos = cds . linked ( csn ) . collect ( federated , entity => {
8- let source = entity . source // the external source entity
9- let srv = source . service // the external service
10- let conf = cds . requires [ srv ?. name ] // the service's binding point
11- if ( ! conf ?. credentials ) return // not bound to remote, skipping
12- entity [ '@cds.persistence.table' ] = true
13- entity [ '@cds.persistence.skip' ] = false
14- return { entity : entity . name , from : srv . name }
15- } )
16- if ( ! todos . length ) return
174
5+ cds . on ( 'loaded' , csn => {
6+
7+ const todos = [ ]
8+ for ( let each of cds . linked ( csn ) . definitions ) {
9+
10+ // only entities tagged with @federated :true or @federated:'initial-load'
11+ let fed = each . is_entity && each [ '@federated' ]
12+ if ( ! fed || fed !== 'initial-load' && fed !== true ) continue
13+
14+ // only bound services
15+ let srv = each . source ?. service
16+ let conf = cds . requires [ srv ?. name ]
17+ if ( ! conf ?. credentials ?. url ) continue
18+
19+ // only if default federation config matches
20+ if ( fed === true && ( conf . federation ?. [ each . name ] || conf . federation ?. default ) !== 'initial-load' ) continue
21+
22+ // use consumption view as table for replication
23+ each [ '@cds.persistence.table' ] = true
24+ todos . push ( { entity : each . name , from : srv . name } )
25+ }
26+ if ( ! todos . length ) return // nothing to do
27+
1828 cds . once ( 'served' , async ( ) => {
1929 const dfs = await cds . connect . to ( DataFederationService )
2030 return Promise . all ( todos . map (
@@ -35,8 +45,8 @@ class DataFederationService extends cds.Service { init() {
3545 const { entity, from : remote } = req . data
3646 const srv = await cds . connect . to ( remote )
3747 const rows = await srv . read ( entity ) // @Bob : can we stream this?
38- await INSERT . into ( entity ) . entries ( rows )
39- log . info ( 'initially loaded:' , { entity, from : remote , via : srv . kind } )
48+ await cds . insert ( rows ) . into ( entity )
49+ log . info ( 'initially loaded' , rows . length , 'rows for :', { entity, from : remote , via : srv . kind } )
4050 } )
4151
4252} }
0 commit comments