@@ -30,7 +30,7 @@ class CiscoampCollector extends PawsCollector {
3030 super ( context , creds , packageJson . version ) ;
3131 }
3232
33- pawsInitCollectionState ( event , callback ) {
33+ pawsInitCollectionState ( event ) {
3434 const startTs = process . env . paws_collection_start_ts ?
3535 process . env . paws_collection_start_ts :
3636 moment ( ) . toISOString ( ) ;
@@ -46,17 +46,17 @@ class CiscoampCollector extends PawsCollector {
4646 totalLogsCount : 0 ,
4747 poll_interval_sec : 1
4848 } ) ) ;
49- return callback ( null , initialStates , 1 ) ;
49+ return { state : initialStates , nextInvocationTimeout : 1 } ;
5050 }
5151
52- pawsGetRegisterParameters ( event , callback ) {
52+ pawsGetRegisterParameters ( event ) {
5353 const regValues = {
5454 ciscoampResourceNames : process . env . collector_streams
5555 } ;
56- callback ( null , regValues ) ;
56+ return regValues ;
5757 }
5858
59- pawsGetLogs ( state , callback ) {
59+ async pawsGetLogs ( state ) {
6060 let collector = this ;
6161 // This code can remove once exsisting code set stream and collector_streams env variable
6262 if ( ! process . env . collector_streams ) {
@@ -67,11 +67,11 @@ class CiscoampCollector extends PawsCollector {
6767 }
6868 const clientSecret = collector . secret ;
6969 if ( ! clientSecret ) {
70- return callback ( "The Client Secret was not found!" ) ;
70+ throw new Error ( "The Client Secret was not found!" ) ;
7171 }
7272 const clientId = collector . clientId ;
7373 if ( ! clientId ) {
74- return callback ( "The Client ID was not found!" ) ;
74+ throw new Error ( "The Client ID was not found!" ) ;
7575 }
7676
7777 const baseUrl = process . env . paws_endpoint . replace ( / ^ h t t p s : \/ \/ | \/ $ / g, '' ) ;
@@ -84,7 +84,7 @@ class CiscoampCollector extends PawsCollector {
8484
8585 var resourceDetails = utils . getAPIDetails ( state ) ;
8686 if ( ! resourceDetails . url ) {
87- return callback ( "The resource name was not found!" ) ;
87+ throw new Error ( "The resource name was not found!" ) ;
8888 }
8989
9090 typeIdPaths = resourceDetails . typeIdPaths ;
@@ -93,52 +93,54 @@ class CiscoampCollector extends PawsCollector {
9393 let apiUrl = state . nextPage ? state . nextPage : resourceDetails . url ;
9494
9595 AlLogger . info ( `CAMP000001 Collecting data for ${ state . stream } from ${ state . since } till ${ state . until } ` ) ;
96- utils . getAPILogs ( baseUrl , base64EncodedString , apiUrl , state , [ ] , process . env . paws_max_pages_per_invocation )
97- . then ( ( { accumulator, nextPage, newSince } ) => {
98- state . apiQuotaResetDate = null ;
99- if ( state . stream === Events && apiUrl === resourceDetails . url && newSince ) {
100- // Added 1 more secs in received date to avoid duplication of message
101- state . until = moment ( newSince ) . add ( 1 , 'seconds' ) . toISOString ( ) ;
96+ try {
97+ const { accumulator, nextPage, newSince } = await utils . getAPILogs ( baseUrl , base64EncodedString , apiUrl , state , [ ] , process . env . paws_max_pages_per_invocation ) ;
98+ state . apiQuotaResetDate = null ;
99+ if ( state . stream === Events && apiUrl === resourceDetails . url && newSince ) {
100+ // Added 1 more secs in received date to avoid duplication of message
101+ state . until = moment ( newSince ) . add ( 1 , 'seconds' ) . toISOString ( ) ;
102+ }
103+
104+ let newState ;
105+ if ( nextPage === undefined ) {
106+ newState = this . _getNextCollectionState ( state ) ;
107+ } else {
108+ newState = this . _getNextCollectionStateWithNextPage ( state , nextPage , accumulator . length ) ;
109+ }
110+ AlLogger . info ( `CAMP000004 Next collection in ${ newState . poll_interval_sec } seconds` ) ;
111+ return [ accumulator , newState , newState . poll_interval_sec ] ;
112+ } catch ( error ) {
113+
114+ const errResponse = typeof error !== 'string' ? error . response : null ;
115+ const hasApiErrors = errResponse && errResponse . data && Array . isArray ( errResponse . data . errors ) && errResponse . data . errors . length > 0 ;
116+ const apiErrorCode = hasApiErrors ? errResponse . data . errors [ 0 ] . error_code : null ;
117+ if ( apiErrorCode === API_THROTTLING_ERROR ) {
118+ const rateLimitResetSecs = parseInt ( errResponse . headers [ 'x-ratelimit-reset' ] , 10 ) ;
119+ const extraBufferSeconds = 60 ;
120+ const resetSeconds = Number . isNaN ( rateLimitResetSecs ) ? extraBufferSeconds : rateLimitResetSecs + extraBufferSeconds ;
121+ state . apiQuotaResetDate = moment ( ) . add ( resetSeconds , "seconds" ) . toISOString ( ) ;
122+ AlLogger . info ( `CAMP000003 API hourly Limit Exceeded. The quota will be reset at ${ state . apiQuotaResetDate } ` ) ;
123+ state . poll_interval_sec = resetSeconds ;
124+ // Reduce time interval to half till 60 sec and try to fetch data again.
125+ const currentInterval = moment ( state . until ) . diff ( state . since , 'seconds' ) ;
126+ if ( currentInterval > 120 ) {
127+ state . until = moment ( state . since ) . add ( Math . ceil ( currentInterval / 2 ) , 'seconds' ) . toISOString ( ) ;
102128 }
103-
104- let newState ;
105- if ( nextPage === undefined ) {
106- newState = this . _getNextCollectionState ( state ) ;
107- } else {
108- newState = this . _getNextCollectionStateWithNextPage ( state , nextPage , accumulator . length ) ;
109- }
110- AlLogger . info ( `CAMP000004 Next collection in ${ newState . poll_interval_sec } seconds` ) ;
111- return callback ( null , accumulator , newState , newState . poll_interval_sec ) ;
112- } ) . catch ( ( error ) => {
113- const errResponse = typeof error !== 'string' ? error . response : null ;
114- if ( errResponse && errResponse . data . errors [ 0 ] . error_code === API_THROTTLING_ERROR ) {
115- const rateLimitResetSecs = parseInt ( errResponse [ 'headers' ] [ 'x-ratelimit-reset' ] ) ;
116- const extraBufferSeconds = 60 ;
117- const resetSeconds = rateLimitResetSecs + extraBufferSeconds ;
118- state . apiQuotaResetDate = moment ( ) . add ( resetSeconds , "seconds" ) . toISOString ( ) ;
119- AlLogger . info ( `CAMP000003 API hourly Limit Exceeded. The quota will be reset at ${ state . apiQuotaResetDate } ` ) ;
120- state . poll_interval_sec = resetSeconds ;
121- // Reduce time interval to half till 60 sec and try to fetch data again.
122- const currentInterval = moment ( state . until ) . diff ( state . since , 'seconds' ) ;
123- if ( currentInterval > 120 ) {
124- state . until = moment ( state . since ) . add ( Math . ceil ( currentInterval / 2 ) , 'seconds' ) . toISOString ( ) ;
125- }
126- collector . reportApiThrottling ( function ( ) {
127- return callback ( null , [ ] , state , state . poll_interval_sec ) ;
128- } ) ;
129+ await collector . reportApiThrottling ( ) ;
130+ return [ [ ] , state , state . poll_interval_sec ] ;
131+ }
132+ else {
133+ // set errorCode if not available in error object to showcase client error on DDMetric
134+ if ( hasApiErrors ) {
135+ errResponse . data . errorCode = apiErrorCode ;
136+ throw errResponse . data ;
129137 }
130138 else {
131- // set errorCode if not available in error object to showcase client error on DDMetric
132- if ( errResponse && errResponse . data && errResponse . data . errors ) {
133- errResponse . data . errorCode = errResponse . data . errors [ 0 ] . error_code ;
134- return callback ( errResponse . data ) ;
135- }
136- else {
137- error . errorCode = error . response . status ;
138- return callback ( error ) ;
139- }
139+ error . errorCode = ( errResponse && errResponse . status ) ? errResponse . status : error . status ;
140+ throw error ;
140141 }
141- } ) ;
142+ }
143+ }
142144 }
143145
144146 _getNextCollectionState ( curState ) {
0 commit comments