1515 */
1616'use strict'
1717import { DataPoint } from './DataPoint' ;
18- import got , { Options } from 'got' ;
1918import dayjs from 'dayjs' ;
2019import utc from 'dayjs/plugin/utc' ;
21- import { URLSearchParams } from "url" ;
20+ import { URL , URLSearchParams } from "url" ;
21+ import https , { RequestOptions as httpsRequestOpts } from "https" ;
22+ import http , { RequestOptions as httpRequestOpts } from "http" ;
2223
2324dayjs . extend ( utc )
2425
@@ -28,24 +29,56 @@ dayjs.extend(utc)
2829export class Warp10 {
2930
3031 private url : string ;
31- private options : Options = { } ;
32- private timeoutOptions : any = { } ;
32+ private options : any = { } ;
33+ private timeoutOptions = 0 ;
34+ private client : any ;
35+ private endpoint : URL ;
3336
3437 /**
3538 * Create new Warp 10™ connector.
3639 *
37- * @param url Warp 10 endpoint, without '/api/v0' at the end.
38- * @param requestTimeout
39- * @param connectTimeout
40- * @param retry
40+ * @param endpoint Warp 10 endpoint, without '/api/v0' at the end.
4141 */
42- constructor ( url : string , requestTimeout ?: number , connectTimeout ?: number , retry ?: number ) {
42+ constructor ( endpoint : string ) {
4343 // remove trailing slash if any
44- this . url = url . replace ( / \/ + $ / , '' ) ;
45- this . setTimeout ( requestTimeout , connectTimeout , retry ) ;
44+ this . url = endpoint . replace ( / \/ + $ / , '' ) ;
4645 this . options . headers = { 'Content-Type' : 'text/plain; charset=UTF-8' , 'X-Warp10-Token' : '' } ;
46+ this . client = this . url . startsWith ( 'https' ) ? https : http ;
47+ this . endpoint = new URL ( endpoint ) ;
4748 }
4849
50+ private async send ( options : httpsRequestOpts | httpRequestOpts , data ?: any ) : Promise < any > {
51+ let body : string = '' ;
52+ return new Promise ( ( resolve , reject ) => {
53+ const req : any = this . client . request ( options , ( res : any ) => {
54+ res . on ( "data" , ( chunk : any ) => body += chunk ) ;
55+ res . on ( "error" , ( err : any ) => reject ( err ) ) ;
56+ res . on ( "end" , ( ) => {
57+ try {
58+ resolve ( { body, headers : res . headers } ) ;
59+ } catch ( err ) {
60+ reject ( err ) ;
61+ }
62+ } ) ;
63+ } ) ;
64+ req . on ( "error" , ( err : any ) => reject ( err ) ) ;
65+ req . on ( 'timeout' , ( err : any ) => {
66+ reject ( err ) ;
67+ req . abort ( ) ;
68+ } ) ;
69+ req . on ( 'uncaughtException' , ( err : any ) => {
70+ req . abort ( ) ;
71+ reject ( err ) ;
72+ } ) ;
73+ if ( data ) {
74+ req . write ( data ) ;
75+ }
76+ // end the request to prevent ECONNRESET and socket hung errors
77+ req . end ( ( ) => { } ) ;
78+ } ) ;
79+ }
80+
81+
4982 private formatLabels ( labels : any ) {
5083 return `{${ Object . keys ( labels ) . map ( k => `${ k } =${ encodeURIComponent ( `${ labels [ k ] } ` ) } ` ) } }`
5184 }
@@ -54,36 +87,23 @@ export class Warp10 {
5487 return ( typeof value === 'string' ) ? `'${ encodeURIComponent ( value ) } '` : value ;
5588 }
5689
57- /**
58- * Exposed for unit tests and dynamic adjustment on embedded systems
59- * @param requestTimeout from socket opened to answer request. Default is no limit.
60- * @param connectTimeout lookup + connect phase + https handshake. Default is 10 seconds.
61- * @param retry number of retry to do the request. Default is 1.
62- */
63- setTimeout ( requestTimeout ?: number , connectTimeout ?: number , retry ?: number ) {
64- this . timeoutOptions . connect = connectTimeout || 10000 ;
65- this . timeoutOptions . secureConnect = connectTimeout || 10000 ;
66- this . timeoutOptions . lookup = connectTimeout || 10000 ;
67- this . timeoutOptions . socket = connectTimeout || 10000 ;
68- this . timeoutOptions . response = requestTimeout || undefined ;
69- this . options . timeout = this . timeoutOptions ;
70- this . options . retry = retry || 1 ;
71- }
72-
7390 /**
7491 * Build got request options from defined options
75- * @param body the got request payload
92+ * @param path request path
93+ * @param method request method
7694 * @param warpToken the X-Warp10-Token, if any
7795 */
78- private getOptions ( body ? : string , warpToken ?: string ) : Options {
96+ private getOptions ( path : string , method : string = 'GET' , warpToken ?: string ) : any {
7997 return {
80- retry : this . options . retry ,
81- timeout : this . timeoutOptions ,
98+ hostname : this . endpoint . hostname ,
99+ port : this . endpoint . port ,
100+ path,
101+ method,
102+ bodyTimeout : this . timeoutOptions ,
82103 headers : {
83104 'Content-Type' : 'text/plain; charset=UTF-8' ,
84105 'X-Warp10-Token' : warpToken || ''
85- } ,
86- body
106+ }
87107 }
88108 }
89109
@@ -94,13 +114,13 @@ export class Warp10 {
94114 exec ( warpscript : string ) {
95115 return new Promise < { result : any [ ] , meta : { elapsed : number , ops : number , fetched : number } } > ( async ( resolve , reject ) => {
96116 try {
97- const response = await got . post ( ` ${ this . url } /api/v0/exec`, this . getOptions ( warpscript ) ) as any ;
117+ const { headers , body } = await this . send ( this . getOptions ( ` /api/v0/exec`, 'POST' ) , warpscript ) as any ;
98118 resolve ( {
99- result : JSON . parse ( response . body ) ,
119+ result : JSON . parse ( body ) ,
100120 meta : {
101- elapsed : parseInt ( ( response . headers [ 'x-warp10-elapsed' ] || [ '0' ] ) [ 0 ] , 10 ) ,
102- ops : parseInt ( ( response . headers [ 'x-warp10-ops' ] || [ '0' ] ) [ 0 ] , 10 ) ,
103- fetched : parseInt ( ( response . headers [ 'x-warp10-fetched' ] || [ '0' ] ) [ 0 ] , 10 )
121+ elapsed : parseInt ( ( headers [ 'x-warp10-elapsed' ] || [ '0' ] ) [ 0 ] , 10 ) ,
122+ ops : parseInt ( ( headers [ 'x-warp10-ops' ] || [ '0' ] ) [ 0 ] , 10 ) ,
123+ fetched : parseInt ( ( headers [ 'x-warp10-fetched' ] || [ '0' ] ) [ 0 ] , 10 )
104124 }
105125 } ) ;
106126 } catch ( error ) {
@@ -134,13 +154,13 @@ export class Warp10 {
134154 }
135155 return new Promise < { result : string [ ] , meta : { elapsed : number , ops : number , fetched : number } } > ( async ( resolve , reject ) => {
136156 try {
137- const response = await got . get ( ` ${ this . url } /api/v0/fetch?${ params . toString ( ) } `, this . getOptions ( undefined , readToken ) ) as any ;
157+ const { headers , body } = await this . send ( this . getOptions ( ` /api/v0/fetch?${ params . toString ( ) } `, 'GET' , readToken ) ) as any ;
138158 resolve ( {
139- result : response . body . split ( '\n' ) ,
159+ result : body . split ( '\n' ) ,
140160 meta : {
141- elapsed : parseInt ( ( response . headers [ 'x-warp10-elapsed' ] || [ '0' ] ) [ 0 ] , 10 ) ,
142- ops : parseInt ( ( response . headers [ 'x-warp10-ops' ] || [ '0' ] ) [ 0 ] , 10 ) ,
143- fetched : parseInt ( ( response . headers [ 'x-warp10-fetched' ] || [ '0' ] ) [ 0 ] , 10 )
161+ elapsed : parseInt ( ( headers [ 'x-warp10-elapsed' ] || [ '0' ] ) [ 0 ] , 10 ) ,
162+ ops : parseInt ( ( headers [ 'x-warp10-ops' ] || [ '0' ] ) [ 0 ] , 10 ) ,
163+ fetched : parseInt ( ( headers [ 'x-warp10-fetched' ] || [ '0' ] ) [ 0 ] , 10 )
144164 }
145165 } ) ;
146166 } catch ( error ) {
@@ -166,12 +186,13 @@ export class Warp10 {
166186 return `${ d . timestamp || dayjs . utc ( ) . valueOf ( ) * 1000 } /${ pos } /${ d . elev || '' } ${ d . className } ${ this . formatLabels ( d . labels ) } ${ Warp10 . formatValues ( d . value ) } ` ;
167187 }
168188 } ) ;
169- return new Promise < { response : string , count : number } > ( async ( resolve , reject ) => {
189+ return new Promise < { response : string | undefined , count : number } > ( async ( resolve , reject ) => {
170190 try {
171- const response = await got . post ( `${ this . url } /api/v0/update` , this . getOptions ( payload . join ( '\n' ) , writeToken ) ) as any ;
172- resolve ( { response : response . body , count : payload . length } ) ;
191+ const opts = this . getOptions ( `/api/v0/update` , 'POST' , writeToken ) ;
192+ const { body} = await this . send ( opts , payload . join ( '\n' ) ) as any ;
193+ resolve ( { response : body , count : payload . length } ) ;
173194 } catch ( error ) {
174- reject ( error ) ;
195+ reject ( { error, payload } ) ;
175196 }
176197 } ) ;
177198 }
@@ -202,8 +223,8 @@ export class Warp10 {
202223 }
203224 return new Promise < { result : string } > ( async ( resolve , reject ) => {
204225 try {
205- const response = await got . get ( ` ${ this . url } /api/v0/delete?${ params . toString ( ) } `, this . getOptions ( undefined , deleteToken ) ) as any ;
206- resolve ( { result : response . body } ) ;
226+ const { body } = await this . send ( this . getOptions ( ` /api/v0/delete?${ params . toString ( ) } `, 'GET' , deleteToken ) ) as any ;
227+ resolve ( { result : body } ) ;
207228 } catch ( error ) {
208229 reject ( error ) ;
209230 }
@@ -219,11 +240,15 @@ export class Warp10 {
219240 const payload = meta . map ( m => encodeURIComponent ( m . className ) + this . formatLabels ( m . labels ) + this . formatLabels ( m . attributes ) ) ;
220241 return new Promise < { response : string , count : number } > ( async ( resolve , reject ) => {
221242 try {
222- const response = await got . post ( ` ${ this . url } /api/v0/meta`, this . getOptions ( payload . join ( '\n' ) , writeToken ) ) as any ;
223- resolve ( { response : response . body , count : payload . length } ) ;
243+ const { body } = await this . send ( this . getOptions ( ` /api/v0/meta`, 'POST' , writeToken ) , payload . join ( '\n' ) ) as any ;
244+ resolve ( { response : body , count : payload . length } ) ;
224245 } catch ( error ) {
225246 reject ( error ) ;
226247 }
227248 } ) ;
228249 }
250+
251+ setTimeout ( to : number ) {
252+ this . timeoutOptions = to ;
253+ }
229254}
0 commit comments