@@ -8,7 +8,6 @@ const ElasticsearchWritableStream = require('./ElasticSearchWriteableStream')
88const logger = require ( './logger' )
99
1010let _esClient
11-
1211/*
1312This module is used for connecting to an Elasticsearch instance, writing records,
1413searching records, and managing the indexes. It looks for the ES_HOST environment
@@ -77,7 +76,6 @@ async function esClient() {
7776// Create STAC mappings
7877async function prepare ( index ) {
7978 // TODO - different mappings for collection and item
80- let ready
8179 const props = {
8280 'type' : 'object' ,
8381 properties : {
@@ -106,7 +104,6 @@ async function prepare(index) {
106104 } ]
107105 const client = await esClient ( )
108106 const indexExists = await client . indices . exists ( { index } )
109-
110107 if ( ! indexExists ) {
111108 const payload = {
112109 index,
@@ -133,49 +130,71 @@ async function prepare(index) {
133130 try {
134131 await client . indices . create ( payload )
135132 logger . info ( `Created index: ${ JSON . stringify ( payload ) } ` )
136- ready = 0
137133 } catch ( error ) {
138134 const debugMessage = `Error creating index, already created: ${ error } `
139135 logger . debug ( debugMessage )
140136 }
141137 }
142- return ready
143138}
144139
145140// Given an input stream and a transform, write records to an elasticsearch instance
146141async function _stream ( ) {
147- const toEs = through2 . obj ( { objectMode : true } , ( data , encoding , next ) => {
148- let index = ''
149- if ( data && data . hasOwnProperty ( 'extent' ) ) {
150- index = 'collections'
151- } else if ( data && data . hasOwnProperty ( 'geometry' ) ) {
152- index = 'items'
153- } else {
154- next ( )
155- return
156- }
157- // remove any hierarchy links in a non-mutating way
158- const hlinks = [ 'self' , 'root' , 'parent' , 'child' , 'collection' , 'item' ]
159- const links = data . links . filter ( ( link ) => hlinks . includes ( link ) )
160- const dataNoLinks = Object . assign ( { } , data , { links } )
161-
162- // create ES record
163- const record = {
164- index,
165- type : 'doc' ,
166- id : dataNoLinks . id ,
167- action : 'update' ,
168- _retry_on_conflict : 3 ,
169- body : {
170- doc : dataNoLinks ,
171- doc_as_upsert : true
172- }
173- }
174- next ( null , record )
175- } )
176142 let esStreams
177143 try {
144+ let collections = [ ]
178145 const client = await esClient ( )
146+ const indexExists = await client . indices . exists ( { index : 'collections' } )
147+ if ( indexExists ) {
148+ const body = { query : { match_all : { } } }
149+ const searchParams = {
150+ index : 'collections' ,
151+ body
152+ }
153+ const resultBody = await client . search ( searchParams )
154+ collections = resultBody . hits . hits . map ( ( r ) => ( r . _source ) )
155+ }
156+
157+ const toEs = through2 . obj ( { objectMode : true } , ( data , encoding , next ) => {
158+ let index = ''
159+ if ( data && data . hasOwnProperty ( 'extent' ) ) {
160+ index = 'collections'
161+ } else if ( data && data . hasOwnProperty ( 'geometry' ) ) {
162+ index = 'items'
163+ } else {
164+ next ( )
165+ return
166+ }
167+ // remove any hierarchy links in a non-mutating way
168+ const hlinks = [ 'self' , 'root' , 'parent' , 'child' , 'collection' , 'item' ]
169+ const links = data . links . filter ( ( link ) => hlinks . includes ( link ) )
170+ let esDataObject = Object . assign ( { } , data , { links } )
171+ if ( index === 'items' ) {
172+ const collectionId = data . properties . collection
173+ const itemCollection =
174+ collections . find ( ( collection ) => ( collectionId === collection . id ) )
175+ if ( itemCollection ) {
176+ const flatProperties =
177+ Object . assign ( { } , itemCollection . properties , data . properties )
178+ esDataObject = Object . assign ( { } , esDataObject , { properties : flatProperties } )
179+ } else {
180+ logger . error ( `${ data . id } has no collection` )
181+ }
182+ }
183+
184+ // create ES record
185+ const record = {
186+ index,
187+ type : 'doc' ,
188+ id : esDataObject . id ,
189+ action : 'update' ,
190+ _retry_on_conflict : 3 ,
191+ body : {
192+ doc : esDataObject ,
193+ doc_as_upsert : true
194+ }
195+ }
196+ next ( null , record )
197+ } )
179198 const esStream = new ElasticsearchWritableStream ( { client : client } , {
180199 objectMode : true ,
181200 highWaterMark : process . env . ES_BATCH_SIZE || 500
@@ -243,9 +262,11 @@ function buildDatetimeQuery(parameters) {
243262
244263function buildQuery ( parameters ) {
245264 const eq = 'eq'
246- const { query, parentCollections , intersects } = parameters
265+ const { query, intersects } = parameters
247266 let must = [ ]
248267 if ( query ) {
268+ // Using reduce rather than map as we don't currently support all
269+ // stac query operators.
249270 must = Object . keys ( query ) . reduce ( ( accumulator , property ) => {
250271 const operatorsObject = query [ property ]
251272 const operators = Object . keys ( operatorsObject )
@@ -265,6 +286,7 @@ function buildQuery(parameters) {
265286 return accumulator
266287 } , must )
267288 }
289+
268290 if ( intersects ) {
269291 const { geometry } = intersects
270292 must . push ( {
@@ -279,19 +301,7 @@ function buildQuery(parameters) {
279301 must . push ( datetimeQuery )
280302 }
281303
282- let filter
283- if ( parentCollections && parentCollections . length !== 0 ) {
284- filter = {
285- bool : {
286- should : [
287- { terms : { 'properties.collection' : parentCollections } } ,
288- { bool : { must } }
289- ]
290- }
291- }
292- } else {
293- filter = { bool : { must } }
294- }
304+ const filter = { bool : { must } }
295305 const queryBody = {
296306 constant_score : { filter }
297307 }
0 commit comments