17
17
18
18
import * as zlib from 'zlib' ;
19
19
20
- import { Call , WriteFlags , WriteObject } from './call-stream' ;
20
+ import { Call , WriteObject , WriteFlags } from './call-stream' ;
21
21
import { Channel } from './channel' ;
22
+ import { ChannelOptions } from './channel-options' ;
23
+ import { CompressionAlgorithms } from './compression-algorithms' ;
24
+ import { LogVerbosity } from './constants' ;
22
25
import { BaseFilter , Filter , FilterFactory } from './filter' ;
26
+ import * as logging from './logging' ;
23
27
import { Metadata , MetadataValue } from './metadata' ;
24
28
29
+ const isCompressionAlgorithmKey = ( key : number ) : key is CompressionAlgorithms => {
30
+ return typeof key === 'number' && typeof CompressionAlgorithms [ key ] === 'string' ;
31
+ }
32
+
33
+ type CompressionAlgorithm = keyof typeof CompressionAlgorithms ;
34
+
35
+ type SharedCompressionFilterConfig = {
36
+ serverSupportedEncodingHeader ?: string ;
37
+ } ;
38
+
25
39
abstract class CompressionHandler {
26
40
protected abstract compressMessage ( message : Buffer ) : Promise < Buffer > ;
27
41
protected abstract decompressMessage ( data : Buffer ) : Promise < Buffer > ;
@@ -167,10 +181,45 @@ function getCompressionHandler(compressionName: string): CompressionHandler {
167
181
export class CompressionFilter extends BaseFilter implements Filter {
168
182
private sendCompression : CompressionHandler = new IdentityHandler ( ) ;
169
183
private receiveCompression : CompressionHandler = new IdentityHandler ( ) ;
184
+ private currentCompressionAlgorithm : CompressionAlgorithm = 'identity' ;
185
+
186
+ constructor ( channelOptions : ChannelOptions , private sharedFilterConfig : SharedCompressionFilterConfig ) {
187
+ super ( ) ;
188
+
189
+ const compressionAlgorithmKey = channelOptions [ 'grpc.default_compression_algorithm' ] ;
190
+ if ( compressionAlgorithmKey !== undefined ) {
191
+ if ( isCompressionAlgorithmKey ( compressionAlgorithmKey ) ) {
192
+ const clientSelectedEncoding = CompressionAlgorithms [ compressionAlgorithmKey ] as CompressionAlgorithm ;
193
+ const serverSupportedEncodings = sharedFilterConfig . serverSupportedEncodingHeader ?. split ( ',' ) ;
194
+ /**
195
+ * There are two possible situations here:
196
+ * 1) We don't have any info yet from the server about what compression it supports
197
+ * In that case we should just use what the client tells us to use
198
+ * 2) We've previously received a response from the server including a grpc-accept-encoding header
199
+ * In that case we only want to use the encoding chosen by the client if the server supports it
200
+ */
201
+ if ( ! serverSupportedEncodings || serverSupportedEncodings . includes ( clientSelectedEncoding ) ) {
202
+ this . currentCompressionAlgorithm = clientSelectedEncoding ;
203
+ this . sendCompression = getCompressionHandler ( this . currentCompressionAlgorithm ) ;
204
+ }
205
+ } else {
206
+ logging . log ( LogVerbosity . ERROR , `Invalid value provided for grpc.default_compression_algorithm option: ${ compressionAlgorithmKey } ` ) ;
207
+ }
208
+ }
209
+ }
210
+
170
211
async sendMetadata ( metadata : Promise < Metadata > ) : Promise < Metadata > {
171
212
const headers : Metadata = await metadata ;
172
213
headers . set ( 'grpc-accept-encoding' , 'identity,deflate,gzip' ) ;
173
214
headers . set ( 'accept-encoding' , 'identity' ) ;
215
+
216
+ // No need to send the header if it's "identity" - behavior is identical; save the bandwidth
217
+ if ( this . currentCompressionAlgorithm === 'identity' ) {
218
+ headers . remove ( 'grpc-encoding' ) ;
219
+ } else {
220
+ headers . set ( 'grpc-encoding' , this . currentCompressionAlgorithm ) ;
221
+ }
222
+
174
223
return headers ;
175
224
}
176
225
@@ -183,6 +232,19 @@ export class CompressionFilter extends BaseFilter implements Filter {
183
232
}
184
233
}
185
234
metadata . remove ( 'grpc-encoding' ) ;
235
+
236
+ /* Check to see if the compression we're using to send messages is supported by the server
237
+ * If not, reset the sendCompression filter and have it use the default IdentityHandler */
238
+ const serverSupportedEncodingsHeader = metadata . get ( 'grpc-accept-encoding' ) [ 0 ] as string | undefined ;
239
+ if ( serverSupportedEncodingsHeader ) {
240
+ this . sharedFilterConfig . serverSupportedEncodingHeader = serverSupportedEncodingsHeader ;
241
+ const serverSupportedEncodings = serverSupportedEncodingsHeader . split ( ',' ) ;
242
+
243
+ if ( ! serverSupportedEncodings . includes ( this . currentCompressionAlgorithm ) ) {
244
+ this . sendCompression = new IdentityHandler ( ) ;
245
+ this . currentCompressionAlgorithm = 'identity' ;
246
+ }
247
+ }
186
248
metadata . remove ( 'grpc-accept-encoding' ) ;
187
249
return metadata ;
188
250
}
@@ -192,10 +254,13 @@ export class CompressionFilter extends BaseFilter implements Filter {
192
254
* and the output is a framed and possibly compressed message. For this
193
255
* reason, this filter should be at the bottom of the filter stack */
194
256
const resolvedMessage : WriteObject = await message ;
195
- const compress =
196
- resolvedMessage . flags === undefined
197
- ? false
198
- : ( resolvedMessage . flags & WriteFlags . NoCompress ) === 0 ;
257
+ let compress : boolean ;
258
+ if ( this . sendCompression instanceof IdentityHandler ) {
259
+ compress = false ;
260
+ } else {
261
+ compress = ( ( resolvedMessage . flags ?? 0 ) & WriteFlags . NoCompress ) === 0 ;
262
+ }
263
+
199
264
return {
200
265
message : await this . sendCompression . writeMessage (
201
266
resolvedMessage . message ,
@@ -216,8 +281,9 @@ export class CompressionFilter extends BaseFilter implements Filter {
216
281
217
282
export class CompressionFilterFactory
218
283
implements FilterFactory < CompressionFilter > {
219
- constructor ( private readonly channel : Channel ) { }
284
+ private sharedFilterConfig : SharedCompressionFilterConfig = { } ;
285
+ constructor ( private readonly channel : Channel , private readonly options : ChannelOptions ) { }
220
286
createFilter ( callStream : Call ) : CompressionFilter {
221
- return new CompressionFilter ( ) ;
287
+ return new CompressionFilter ( this . options , this . sharedFilterConfig ) ;
222
288
}
223
289
}
0 commit comments