@@ -6,6 +6,7 @@ import { pipeline, finished } from 'stream/promises';
 import YadamuLibrary from '../lib/yadamuLibrary.js'
 import YadamuConstants from '../lib/yadamuConstants.js'
 import YadamuWriter from '../dbi/base/yadamuWriter.js'
+import DBIConstants from '../dbi/base/dbiConstants.js'
 
 import Yadamu from './yadamu.js'
 import { YadamuError, ExportError, DatabaseError, IterativeInsertError, InputStreamError } from './yadamuException.js'
@@ -116,13 +117,6 @@ class DBReader extends Readable {
     })
   }
 
-  setReader(pipeline) {
-    const writer = pipeline.find((s) => { return s instanceof YadamuWriter })
-    if (writer) {
-      writer.setReader(pipeline[0])
-    }
-  }
-
   async pipelineTable(task, readerDBI, writerDBI, retryOnError) {
 
     retryOnError = retryOnError && (readerDBI.ON_ERROR === 'RETRY')
@@ -131,17 +125,15 @@ class DBReader extends Readable {
 
     const yadamuPipeline = []
     const activeStreams = []
-    let pipelineMetrics
+    const pipelineState = DBIConstants.PIPELINE_STATE
 
     try {
       queryInfo = readerDBI.generateSQLQuery(task)
       queryInfo.TARGET_DATA_TYPES = writerDBI.metadata?.[queryInfo.TABLE_NAME]?.dataTypes ?? []
       queryInfo.TARGET_COLUMN_NAMES = writerDBI.metadata?.[queryInfo.TABLE_NAME]?.columnNames ?? []
-      const inputStreams = await readerDBI.getInputStreams(queryInfo, writerDBI.PARSE_DELAY)
-      pipelineMetrics = inputStreams[0].COPY_METRICS
-      yadamuPipeline.push(...inputStreams)
-      const outputStreams = await writerDBI.getOutputStreams(queryInfo.MAPPED_TABLE_NAME, pipelineMetrics)
-      yadamuPipeline.push(...outputStreams)
+      const inputStreams = await readerDBI.getInputStreams(queryInfo, pipelineState)
+      const outputStreams = await writerDBI.getOutputStreams(queryInfo.MAPPED_TABLE_NAME, pipelineState)
+      yadamuPipeline.push(...inputStreams, ...outputStreams)
       activeStreams.push(...yadamuPipeline.map((s) => { return finished(s) }))
     } catch (e) {
       this.yadamuLogger.handleException(['PIPELINE', 'STREAM INITIALIZATION', readerDBI.DATABASE_VENDOR, writerDBI.DATABASE_VENDOR, task.TABLE_NAME], e)
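
Note: the shape of `DBIConstants.PIPELINE_STATE` is not visible in this commit. Judging from how it is consumed below (`pipelineState.startTime` / `endTime`, `pipelineState.read` / `committed`, and `pipelineState[DBIConstants.INPUT_STREAM_ID].error`), a plausible sketch is the following; the stream-id string values are hypothetical placeholders, not the real constants.

```js
// Hypothetical sketch only -- the real definition lives in dbi/base/dbiConstants.js.
// Field names are inferred from the usages visible in this diff.
const INPUT_STREAM_ID          = 'reader'       // assumed values; the actual
const PARSER_STREAM_ID         = 'parser'       // constants are defined by
const TRANSFORMATION_STREAM_ID = 'transformer'  // DBIConstants
const OUTPUT_STREAM_ID         = 'writer'

const PIPELINE_STATE = {
  startTime : undefined,   // set immediately before pipeline() is invoked
  endTime   : undefined,   // set on completion, or in the catch block on failure
  read      : 0,           // rows read from the source table
  committed : 0,           // rows committed to the target table
  // one sub-state per pipeline component, keyed by stream id
  [INPUT_STREAM_ID]          : { error: undefined },
  [PARSER_STREAM_ID]         : { error: undefined },
  [TRANSFORMATION_STREAM_ID] : { error: undefined },
  [OUTPUT_STREAM_ID]         : { error: undefined }
}
```
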
@@ -156,18 +148,21 @@ class DBReader extends Readable {
       })
       this.activeReaders.add(activeReader)
       // Pass the Reader to the YadamuWriter instance so it can calculate lost rows correctly in the event of an error
-      this.setReader(yadamuPipeline)
+
       // this.yadamuLogger.trace([this.constructor.name,'PIPELINE',readerDBI.DATABASE_VENDOR,writerDBI.DATABASE_VENDOR,queryInfo.MAPPED_TABLE_NAME],`${yadamuPipeline.map((s) => { return s.constructor.name }).join(' => ')}`)
-
       // this.traceStreamEvents(yadamuPipeline,queryInfo.TABLE_NAME)
 
-      pipelineMetrics.pipeStartTime = performance.now();
+      pipelineState.startTime = performance.now();
       await pipeline(...yadamuPipeline)
-      pipelineMetrics.pipeEndTime = performance.now();
+      pipelineState.endTime = performance.now();
+
+      // Report Pipeline state
+      // console.log(pipelineState)
+      writerDBI.reportPipelineStatus(pipelineState)
+
       // this.yadamuLogger.trace([this.constructor.name,'PIPELINE',readerDBI.DATABASE_VENDOR,writerDBI.DATABASE_VENDOR,queryInfo.MAPPED_TABLE_NAME],`${yadamuPipeline.map((s) => { return `${s.constructor.name}:${s.destroyed}` }).join(' => ') }`)
-
-
     } catch (err) {
+      pipelineState.endTime = performance.now();
 
       // this.yadamuLogger.trace([this.constructor.name,'PIPELINE',readerDBI.DATABASE_VENDOR,writerDBI.DATABASE_VENDOR,queryInfo.MAPPED_TABLE_NAME,readerDBI.ON_ERROR,'FAILED'],`${err.constructor.name},${err.message}`)
 
@@ -181,18 +176,18 @@ class DBReader extends Readable {
         throw err
       }
 
-      // Wait for all components of the pipeline to finish before closing connections
-
+      // Wait for all components of the pipeline to finish before closing connections
       // this.yadamuLogger.trace([this.constructor.name,'PIPELINE','FAILED','STREAMS_COMPLETE',readerDBI.DATABASE_VENDOR,writerDBI.DATABASE_VENDOR,readerDBI.ON_ERROR,readerDBI.getWorkerNumber(),task.TABLE_NAME,`${yadamuPipeline.map((s) => { return `${s.constructor.name}`}).join(' => ') }`],'WAITING')
       await Promise.allSettled(activeStreams)
       // this.yadamuLogger.trace([this.constructor.name,'PIPELINE','FAILED','STREAMS_COMPLETE',readerDBI.DATABASE_VENDOR,writerDBI.DATABASE_VENDOR,readerDBI.ON_ERROR,readerDBI.getWorkerNumber(),task.TABLE_NAME,`${yadamuPipeline.map((s) => { return `${s.constructor.name}`}).join(' => ') }`],'COMPLETED')
-
-      // console.log(pipelineMetrics)
-      // this.yadamuLogger.trace([this.constructor.name,'PIPELINE',readerDBI.DATABASE_VENDOR,writerDBI.DATABASE_VENDOR,queryInfo.MAPPED_TABLE_NAME],`${yadamuPipeline.map((s) => { return `${s.constructor.name}:[${s.readableLength},${s.writableLength}]` }).join(',') }`)
 
-      // Determine the underlying cause of the error.
-      const cause = pipelineMetrics.readerError || pipelineMetrics.parserError || yadamuPipeline.find((s) => { return s.underlyingError instanceof Error })?.underlyingError || err
-
+
+      // Report Pipeline state and determine the underlying cause of the error.
+      // console.log(pipelineState)
+      let cause = writerDBI.reportPipelineStatus(pipelineState, err)
+
+      // this.yadamuLogger.trace([this.constructor.name,'PIPELINE',readerDBI.DATABASE_VENDOR,writerDBI.DATABASE_VENDOR,queryInfo.MAPPED_TABLE_NAME],`${yadamuPipeline.map((s) => { return `${s.constructor.name}:[${s.readableLength},${s.writableLength}]` }).join(',') }`)
+
       // Verify all components of the pipeline have been destroyed.
       // this.yadamuLogger.trace([this.constructor.name,'PIPELINE',readerDBI.DATABASE_VENDOR,writerDBI.DATABASE_VENDOR,queryInfo.MAPPED_TABLE_NAME],`${yadamuPipeline.map((s) => { return `${s.constructor.name}:${s.destroyed}` }).join(' => ') }`)
       // yadamuPipeline.map((s) => { if (!s.destroyed){ s.destroy(cause)})
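
Note: the inline cause-resolution removed in the hunk above (reader error, then parser error, then any stream's `underlyingError`, then the error raised by `pipeline()` itself) now sits behind `writerDBI.reportPipelineStatus(pipelineState, err)`. A minimal sketch of that resolution step, assuming the new method preserves the old precedence (this is not the actual implementation):

```js
// Hypothetical sketch of the cause resolution reportPipelineStatus() is
// expected to perform, assuming it keeps the precedence of the deleted code:
// first stream error in pipeline order, falling back to the pipeline() error.
function resolvePipelineCause(pipelineState, streamIds, err) {
  const firstStreamError = streamIds
    .map((id) => pipelineState[id].error)
    .find((e) => e instanceof Error)
  return firstStreamError || err
}
```
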
@@ -202,56 +197,61 @@ class DBReader extends Readable {
         throw cause;
       }
 
-      if (YadamuError.lostConnection(pipelineMetrics.readerError) || YadamuError.lostConnection(pipelineMetrics.parserError)) {
+      if (YadamuError.lostConnection(pipelineState[DBIConstants.INPUT_STREAM_ID].error) || YadamuError.lostConnection(pipelineState[DBIConstants.PARSER_STREAM_ID].error)) {
         // If the reader or parser failed with a lost connection error re-establish the input stream connection
         await readerDBI.reconnect(cause, 'READER')
 
       }
       else {
         this.yadamuLogger.handleException(['PIPELINE', readerDBI.DATABASE_VENDOR, writerDBI.DATABASE_VENDOR, queryInfo.MAPPED_TABLE_NAME], cause)
       }
-
+
       if (retryOnError) {
-        await this.retryPipelineTable(task, readerDBI, writerDBI, pipelineMetrics)
+        await this.retryPipelineTable(task, readerDBI, writerDBI, pipelineState)
       }
 
-      this.dbi.resetExceptionTracking()
     }
 
-    return pipelineMetrics
+    return pipelineState
 
   }
 
-  pipelineSucceeded(metrics) {
-    return (metrics.read === metrics.committed)
+  pipelineSucceeded(pipelineState) {
+    return (pipelineState.read === pipelineState.committed)
   }
 
-  duplicateCause(originalMetrics, latestMetrics) {
-    return (originalMetrics.readerError === latestMetrics.readerError)
-        && (originalMetrics.parserError === latestMetrics.parserError)
-        && (originalMetrics.managerError === latestMetrics.managerError)
-        && (originalMetrics.writerError === latestMetrics.writerError)
+  isDuplicateException(a, b) {
+
+    return ((a instanceof Error) &&
+            (b instanceof Error) &&
+            (a.message === b.message))
+
+  }
+
+  duplicateCause(previousState, currentState) {
+    return this.isDuplicateException(previousState[DBIConstants.INPUT_STREAM_ID].error, currentState[DBIConstants.INPUT_STREAM_ID].error)
+        || this.isDuplicateException(previousState[DBIConstants.PARSER_STREAM_ID].error, currentState[DBIConstants.PARSER_STREAM_ID].error)
+        || this.isDuplicateException(previousState[DBIConstants.TRANSFORMATION_STREAM_ID].error, currentState[DBIConstants.TRANSFORMATION_STREAM_ID].error)
+        || this.isDuplicateException(previousState[DBIConstants.OUTPUT_STREAM_ID].error, currentState[DBIConstants.OUTPUT_STREAM_ID].error)
   }
 
-  async retryPipelineTable(task, readerDBI, writerDBI, originalMetrics) {
+  async retryPipelineTable(task, readerDBI, writerDBI, previousState) {
 
     /*
     **
     ** Retry the operation up to RETRY_COUNT times. If a retry fails with the same exception as the previous attempt, ABORT.
     **
     */
-
     let retryCount = 0
-    let retryMetrics = originalMetrics
+    let currentState = previousState
     do {
-      const errorType = readerDBI.raisedError(retryMetrics.readerError) ? 'READER ERROR' : writerDBI.raisedError(retryMetrics.readerError) ? 'WRITER ERROR' : 'TRANSFORM ERROR'
+      const errorType = readerDBI.raisedError(currentState[DBIConstants.INPUT_STREAM_ID].error) ? 'READER ERROR' : writerDBI.raisedError(currentState[DBIConstants.OUTPUT_STREAM_ID].error) ? 'WRITER ERROR' : 'TRANSFORM ERROR'
       this.yadamuLogger.info(['PIPELINE', errorType, readerDBI.DATABASE_VENDOR, writerDBI.DATABASE_VENDOR, task.TABLE_NAME, readerDBI.ON_ERROR], `Retrying Operation.`)
       await writerDBI.truncateTable(writerDBI.CURRENT_SCHEMA, task.TABLE_NAME)
       readerDBI.adjustQuery(task)
       retryCount++
-      retryMetrics = await this.pipelineTable(task, readerDBI, writerDBI, false)
-    } while (!this.pipelineSucceeded(retryMetrics) && !this.duplicateCause(originalMetrics, retryMetrics) && (retryCount < 6))
-
+      currentState = await this.pipelineTable(task, readerDBI, writerDBI, false)
+    } while (!this.pipelineSucceeded(currentState) && !this.duplicateCause(previousState, currentState) && (retryCount < 6))
   }
 
   async pipelineTables(taskList, readerDBI, writerDBI) {
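
Note: the old `duplicateCause()` compared error fields with `===`, which can never match two retries that fail for the same reason (each attempt surfaces a fresh `Error` instance) and matches spuriously when both fields are `undefined`. The new `isDuplicateException()` requires both values to be `Error`s and compares their messages. A standalone illustration (the error text is example data):

```js
// Mirrors the isDuplicateException() logic added above.
const isDuplicateException = (a, b) =>
  (a instanceof Error) && (b instanceof Error) && (a.message === b.message)

const firstAttempt  = new Error('ORA-00001: unique constraint violated')
const secondAttempt = new Error('ORA-00001: unique constraint violated')

console.log(firstAttempt === secondAttempt)                    // false: distinct objects
console.log(isDuplicateException(firstAttempt, secondAttempt)) // true:  same message
console.log(isDuplicateException(undefined, undefined))        // false: not Errors
```
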
@@ -303,7 +303,7 @@ class DBReader extends Readable {
       return [this]
     }
     else {
-      return this.dbi.getInputStreams()
+      return this.dbi.getInputStreams(DBIConstants.PIPELINE_STATE)
     }
 
   }
@@ -373,10 +373,7 @@ class DBReader extends Readable {
   async doDestroy(err) {
     // this.yadamuLogger.trace([this.constructor.name,`destroy`,this.dbi.DATABASE_VENDOR],``)
     try {
-      // Try to clean up the DBI gracefully
-      // this.yadamuLogger.trace([this.constructor.name,this.nextPhase],'DDL COMPLETE: WAITING')
-      // await this.dbWriter.dbi.ddlComplete
-      // this.yadamuLogger.trace([this.constructor.name,this.nextPhase],'DDL COMPLETE: PROCESSING')
+      await this.dbi.finalizeRead()
       await this.dbi.finalizeExport();
       await this.dbi.final()
     }