1
- import { pipeline , Writable } from 'stream'
1
+ import { pipeline , Transform , Writable } from 'stream'
2
2
import tmp from 'tmp'
3
3
import fs from 'fs'
4
4
import http from 'http'
5
5
import https from 'https'
6
6
import { doesHaveValue } from '../value_checker'
7
7
8
- // https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods
9
- type HttpMethod =
10
- | 'GET'
11
- | 'HEAD'
12
- | 'POST'
13
- | 'PUT'
14
- | 'DELETE'
15
- | 'CONNECT'
16
- | 'OPTIONS'
17
- | 'TRACE'
18
- | 'PATCH'
8
+ type HttpMethod = 'GET' | 'POST' | 'PUT'
19
9
20
10
/**
21
11
* This Writable writes data to a HTTP/HTTPS URL.
@@ -27,18 +17,18 @@ type HttpMethod =
27
17
*
28
18
* 3xx redirects are not currently followed.
29
19
*/
30
- export default class HttpStream extends Writable {
20
+ export default class HttpStream extends Transform {
31
21
private tempFilePath : string
32
22
private tempFile : Writable
33
- private responseBodyFromGet : string | null = null
34
23
35
24
constructor (
36
25
private readonly url : string ,
37
26
private readonly method : HttpMethod ,
38
- private readonly headers : { [ name : string ] : string } ,
39
- private readonly reportLocation : ( content : string ) => void
27
+ private readonly headers : http . OutgoingHttpHeaders
40
28
) {
41
- super ( )
29
+ super ( {
30
+ readableObjectMode : true ,
31
+ } )
42
32
}
43
33
44
34
_write (
@@ -49,7 +39,6 @@ export default class HttpStream extends Writable {
49
39
if ( this . tempFile === undefined ) {
50
40
tmp . file ( ( err , name , fd ) => {
51
41
if ( doesHaveValue ( err ) ) return callback ( err )
52
-
53
42
this . tempFilePath = name
54
43
this . tempFile = fs . createWriteStream ( name , { fd } )
55
44
this . tempFile . write ( chunk , encoding , callback )
@@ -61,79 +50,97 @@ export default class HttpStream extends Writable {
61
50
62
51
_final ( callback : ( error ?: Error | null ) => void ) : void {
63
52
this . tempFile . end ( ( ) => {
64
- this . sendRequest (
53
+ this . sendHttpRequest (
65
54
this . url ,
66
55
this . method ,
67
- ( err : Error | null | undefined ) => {
68
- if ( doesHaveValue ( err ) ) return callback ( err )
69
- this . reportLocation ( this . responseBodyFromGet )
70
- callback ( null )
56
+ this . headers ,
57
+ ( err1 , res1 ) => {
58
+ if ( doesHaveValue ( err1 ) ) return callback ( err1 )
59
+ this . pushResponseBody ( res1 , ( ) => {
60
+ this . emitErrorUnlessHttp2xx ( res1 , this . url , this . method )
61
+ if (
62
+ res1 . statusCode === 202 &&
63
+ res1 . headers . location !== undefined
64
+ ) {
65
+ this . sendHttpRequest (
66
+ res1 . headers . location ,
67
+ 'PUT' ,
68
+ { } ,
69
+ ( err2 , res2 ) => {
70
+ if ( doesHaveValue ( err2 ) ) return callback ( err2 )
71
+ this . emitErrorUnlessHttp2xx ( res2 , this . url , this . method )
72
+ callback ( )
73
+ }
74
+ )
75
+ } else {
76
+ callback ( )
77
+ }
78
+ } )
71
79
}
72
80
)
73
81
} )
74
82
}
75
83
76
- private sendRequest (
84
+ private pushResponseBody ( res : http . IncomingMessage , done : ( ) => void ) : void {
85
+ let body = Buffer . alloc ( 0 )
86
+ res . on ( 'data' , ( chunk ) => {
87
+ body = Buffer . concat ( [ body , chunk ] )
88
+ } )
89
+ res . on ( 'end' , ( ) => {
90
+ this . push ( body . toString ( 'utf-8' ) )
91
+ done ( )
92
+ } )
93
+ }
94
+
95
+ private emitErrorUnlessHttp2xx (
96
+ res : http . IncomingMessage ,
97
+ url : string ,
98
+ method : string
99
+ ) : void {
100
+ if ( res . statusCode >= 300 )
101
+ this . emit (
102
+ 'error' ,
103
+ new Error (
104
+ `Unexpected http status ${ res . statusCode } from ${ method } ${ url } `
105
+ )
106
+ )
107
+ }
108
+
109
+ private sendHttpRequest (
77
110
url : string ,
78
111
method : HttpMethod ,
79
- callback : ( err : Error | null | undefined , url ?: string ) => void
112
+ headers : http . OutgoingHttpHeaders ,
113
+ callback : ( err ?: Error | null , res ?: http . IncomingMessage ) => void
80
114
) : void {
81
115
const httpx = doesHaveValue ( url . match ( / ^ h t t p s : / ) ) ? https : http
116
+ const additionalHttpHeaders : http . OutgoingHttpHeaders = { }
82
117
83
- if ( method === 'GET' ) {
84
- httpx . get ( url , { headers : this . headers } , ( res ) => {
85
- if ( res . statusCode >= 400 ) {
86
- return callback (
87
- new Error ( `${ method } ${ url } returned status ${ res . statusCode } ` )
88
- )
89
- }
90
-
91
- if ( res . statusCode !== 202 || res . headers . location === undefined ) {
92
- callback ( null , url )
93
- } else {
94
- let body = Buffer . alloc ( 0 )
95
- res . on ( 'data' , ( chunk ) => {
96
- body = Buffer . concat ( [ body , chunk ] )
97
- } )
98
- res . on ( 'end' , ( ) => {
99
- this . responseBodyFromGet = body . toString ( 'utf-8' )
100
- this . sendRequest ( res . headers . location , 'PUT' , callback )
101
- } )
102
- }
103
- } )
104
- } else {
105
- const contentLength = fs . statSync ( this . tempFilePath ) . size
106
- const req = httpx . request ( url , {
107
- method,
108
- headers : {
109
- 'Content-Length' : contentLength ,
110
- } ,
111
- } )
118
+ const upload = method === 'PUT' || method === 'POST'
119
+ if ( upload ) {
120
+ additionalHttpHeaders [ 'Content-Length' ] = fs . statSync (
121
+ this . tempFilePath
122
+ ) . size
123
+ }
112
124
113
- req . on ( 'response' , ( res ) => {
114
- if ( res . statusCode >= 400 ) {
115
- let body = Buffer . alloc ( 0 )
116
- res . on ( 'data' , ( chunk ) => {
117
- body = Buffer . concat ( [ body , chunk ] )
118
- } )
119
- res . on ( 'end' , ( ) => {
120
- callback (
121
- new Error (
122
- `${ method } ${ url } returned status ${
123
- res . statusCode
124
- } :\n${ body . toString ( 'utf-8' ) } `
125
- )
126
- )
127
- } )
128
- res . on ( 'error' , callback )
129
- } else {
130
- callback ( null , url )
131
- }
132
- } )
125
+ const allHeaders = { ...headers , ...additionalHttpHeaders }
126
+ const req = httpx . request ( url , {
127
+ method,
128
+ headers : allHeaders ,
129
+ } )
130
+ req . on ( 'error' , ( err ) => this . emit ( 'error' , err ) )
131
+ req . on ( 'response' , ( res ) => {
132
+ res . on ( 'error' , ( err ) => this . emit ( 'error' , err ) )
133
+ callback ( null , res )
134
+ } )
133
135
136
+ if ( upload ) {
134
137
pipeline ( fs . createReadStream ( this . tempFilePath ) , req , ( err ) => {
135
- if ( doesHaveValue ( err ) ) callback ( err )
138
+ if ( doesHaveValue ( err ) ) {
139
+ this . emit ( 'error' , err )
140
+ }
136
141
} )
142
+ } else {
143
+ req . end ( )
137
144
}
138
145
}
139
146
}
0 commit comments