Skip to content

Commit 776f4d9

Browse files
fixup for arsenal sdk migration
1 parent ee10eb3 commit 776f4d9

File tree

4 files changed

+73
-156
lines changed

4 files changed

+73
-156
lines changed

lib/storage/data/external/AwsClient.js

Lines changed: 37 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ const { S3Client,
1818
NotFound } = require('@aws-sdk/client-s3');
1919
const { Upload } = require('@aws-sdk/lib-storage');
2020
const werelogs = require('werelogs');
21-
const { Readable } = require('stream');
2221
const errors = require('../../../errors').default;
2322
const errorInstances = require('../../../errors').errorInstances;
2423
const MD5Sum = require('../../../s3middleware/MD5Sum').default;
@@ -121,62 +120,6 @@ class AwsClient {
121120
};
122121
}
123122

124-
_normalizeSdkStream(body) {
125-
if (!body) {
126-
return Readable.from([]);
127-
}
128-
129-
const isReadable = Readable.isReadable
130-
? Readable.isReadable(body)
131-
: body instanceof Readable || typeof body?.pipe === 'function';
132-
133-
if (isReadable) {
134-
return body;
135-
}
136-
137-
if (body.source) {
138-
const source = body.source;
139-
const sourceIsReadable = Readable.isReadable
140-
? Readable.isReadable(source)
141-
: source instanceof Readable || typeof source?.pipe === 'function';
142-
if (sourceIsReadable) {
143-
return source;
144-
}
145-
}
146-
147-
const fromWeb = Readable.fromWeb ? Readable.fromWeb.bind(Readable) : null;
148-
149-
if (typeof body.transformToWebStream === 'function' && fromWeb) {
150-
return fromWeb(body.transformToWebStream());
151-
}
152-
153-
if (typeof body.getReader === 'function' && fromWeb) {
154-
return fromWeb(body);
155-
}
156-
157-
if (typeof body.stream === 'function' && fromWeb) {
158-
return fromWeb(body.stream());
159-
}
160-
161-
if (body instanceof Uint8Array || ArrayBuffer.isView(body)) {
162-
return Readable.from(body);
163-
}
164-
165-
if (body instanceof ArrayBuffer) {
166-
return Readable.from(Buffer.from(body));
167-
}
168-
169-
if (typeof body === 'string') {
170-
return Readable.from([body]);
171-
}
172-
173-
if (typeof body?.[Symbol.asyncIterator] === 'function') {
174-
return Readable.from(body);
175-
}
176-
177-
return undefined;
178-
}
179-
180123
put(stream, size, keyContext, reqUids, callback) {
181124
const awsKey = this._createAwsKey(keyContext.bucketName,
182125
keyContext.objectKey, this._bucketMatch);
@@ -295,70 +238,47 @@ class AwsClient {
295238
const abortController = new AbortController();
296239
this._client.send(command, { abortSignal: abortController.signal })
297240
.then(data => {
298-
// const rawBody = data.Body;
299-
// const stream = this._normalizeSdkStream(rawBody);
300-
// if (!stream || typeof stream.on !== 'function') {
301-
// const bodyType = rawBody?.constructor?.name || typeof rawBody;
302-
// log.error('unsupported body type from AWS SDK getObject response', {
303-
// method: 'AwsClient.get',
304-
// backendType: this.clientType,
305-
// dataStoreName: this._dataStoreName,
306-
// bodyType,
307-
// });
308-
// return cbOnce(errorInstances.InternalError
309-
// .customizeDescription('Unsupported response body type returned from AWS SDK'));
310-
// }
311-
312-
// if (data.$metadata?.httpHeaders) {
313-
// log.trace(`${this.type} GET request response headers`, {
314-
// responseHeaders: data.$metadata.httpHeaders,
315-
// backendType: this.clientType,
316-
// });
317-
// }
318-
319-
// let finished = false;
320-
// const originalDestroy = stream.destroy.bind(stream);
241+
const stream = data.Body;
242+
if (!stream) {
243+
const bodyType = stream?.constructor?.name || typeof stream;
244+
log.error('unsupported body type from AWS SDK getObject response', {
245+
method: 'AwsClient.get',
246+
backendType: this.clientType,
247+
dataStoreName: this._dataStoreName,
248+
bodyType,
249+
});
250+
return cbOnce(errorInstances.InternalError
251+
.customizeDescription('Unsupported response body type returned from AWS SDK'));
252+
}
321253

322-
// const abortRequest = err => {
323-
// if (finished) {
324-
// return stream;
325-
// }
326-
// finished = true;
327-
// log.debug('aborting GET request in progress', { objectGetInfo });
328-
// abortController.abort(err);
329-
// if (rawBody && rawBody !== stream) {
330-
// rawBody.destroy?.(err);
331-
// }
332-
// try {
333-
// originalDestroy(err);
334-
// } catch (destroyErr) {
335-
// log.debug('error while destroying aws sdk stream', {
336-
// method: 'AwsClient.get',
337-
// backendType: this.clientType,
338-
// dataStoreName: this._dataStoreName,
339-
// error: destroyErr,
340-
// });
341-
// }
342-
// return stream;
343-
// };
254+
if (data.$metadata?.httpHeaders) {
255+
log.trace(`${this.type} GET request response headers`, {
256+
responseHeaders: data.$metadata.httpHeaders,
257+
backendType: this.clientType,
258+
});
259+
}
344260

345-
// stream.abort = abortRequest;
346-
// stream.destroy = err => abortRequest(err);
347-
// stream.createReadStream = () => stream;
261+
// Override destroy to also abort the HTTP request
262+
const originalDestroy = stream.destroy.bind(stream);
263+
stream.destroy = err => {
264+
log.debug('aborting GET request in progress', { objectGetInfo });
265+
abortController.abort(err);
266+
return originalDestroy(err);
267+
};
348268

349-
// stream.on('error', err => {
350-
// const logLevel = err?.code === 'NotFound' ? 'info' : 'error';
351-
// logHelper(
352-
// log,
353-
// logLevel,
354-
// `error streaming data from ${this.type}`,
355-
// err,
356-
// this._dataStoreName,
357-
// this.clientType
358-
// );
359-
// });
269+
stream.on('error', err => {
270+
const logLevel = err?.code === 'NotFound' ? 'info' : 'error';
271+
logHelper(
272+
log,
273+
logLevel,
274+
`error streaming data from ${this.type}`,
275+
err,
276+
this._dataStoreName,
277+
this.clientType
278+
);
279+
});
360280

361-
return process.nextTick(() => cbOnce(null, null));
281+
return process.nextTick(() => cbOnce(null, stream));
362282
})
363283
.catch(err => {
364284
if (err instanceof NoSuchKey || err instanceof NotFound) {

lib/storage/data/external/GCP/GcpService.js

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,9 @@ class GcpClient extends S3Client {
107107
if (metaVersionId) {
108108
result.MetaVersionId = metaVersionId;
109109
}
110-
return callback && callback(null, result);
110+
return callback(null, result);
111111
})
112-
.catch(err => callback?.(err));
112+
.catch(err => callback(err));
113113
}
114114

115115
deleteBucket(params, callback) {
@@ -354,9 +354,11 @@ class GcpClient extends S3Client {
354354
if (objectGeneration) {
355355
result.VersionId = objectGeneration;
356356
}
357-
return callback && callback(null, result);
357+
return callback(null, result);
358358
})
359-
.catch(err => callback?.(err));
359+
.catch(err => {
360+
return callback(err);
361+
});
360362
}
361363

362364
/**
@@ -369,8 +371,8 @@ class GcpClient extends S3Client {
369371
delete deleteParams.VersionId;
370372
}
371373
return this.send(new DeleteObjectCommand(deleteParams))
372-
.then(data => callback?.(null, data))
373-
.catch(err => callback?.(err));
374+
.then(data => callback(null, data))
375+
.catch(err => callback(err));
374376
}
375377

376378
/**
@@ -402,10 +404,10 @@ class GcpClient extends S3Client {
402404
});
403405
return this.send(command)
404406
.then(data => {
405-
return callback && callback(null, data);
407+
return callback(null, data);
406408
})
407409
.catch(err => {
408-
return callback?.(err);
410+
return callback(err);
409411
});
410412
}
411413

tests/unit/s3routes/routesUtils/responseStreamData.spec.js

Lines changed: 23 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ const logger = new werelogs.Logger('test:routesUtils.responseStreamData');
88
const { responseStreamData } = require('../../../../lib/s3routes/routesUtils');
99
const AwsClient = require('../../../../lib/storage/data/external/AwsClient');
1010
const DummyObjectStream = require('../../storage/data/DummyObjectStream');
11-
const { once } = require('../../../../lib/jsutil');
1211

1312
werelogs.configure({
1413
level: 'debug',
@@ -177,32 +176,28 @@ describe('routesUtils.responseStreamData', () => {
177176

178177
it('should not leak socket if client closes the connection before ' +
179178
'data backend starts streaming', done => {
180-
const doneOnce = once(done);
181-
try {
182-
responseStreamData(undefined, {}, {}, [{
183-
key: 'foo',
184-
size: 10000000,
185-
}], {
186-
client: awsClient,
187-
implName: 'impl',
188-
config: {},
189-
locStorageCheckFn: () => {},
190-
}, {
191-
setHeader: () => {},
192-
writeHead: () => {},
193-
on: () => {},
194-
once: () => {},
195-
emit: () => {},
196-
write: () => {},
197-
end: () => setTimeout(() => {
198-
const nOpenSockets = Object.keys(awsAgent.sockets).length;
199-
assert.strictEqual(nOpenSockets, 0);
200-
doneOnce();
201-
}, 1000),
202-
isclosed: true,
203-
}, undefined, logger.newRequestLogger());
204-
} catch (err) {
205-
doneOnce(err);
206-
}
179+
responseStreamData(undefined, {}, {}, [{
180+
key: 'foo',
181+
size: 10000000,
182+
}], {
183+
client: awsClient,
184+
implName: 'impl',
185+
config: {},
186+
locStorageCheckFn: () => {},
187+
}, {
188+
setHeader: () => {},
189+
writeHead: () => {},
190+
on: () => {},
191+
once: () => {},
192+
emit: () => {},
193+
write: () => {},
194+
end: () => setTimeout(() => {
195+
const nOpenSockets = Object.keys(awsAgent.sockets).length;
196+
assert.strictEqual(nOpenSockets, 0);
197+
done();
198+
}, 1000),
199+
// fake a connection close from the S3 client by setting the "isclosed" flag
200+
isclosed: true,
201+
}, undefined, logger.newRequestLogger());
207202
});
208203
});

tests/unit/storage/data/DummyService.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -287,11 +287,11 @@ class DummyService {
287287

288288
_handleGetObject(command) {
289289
const stream = new DummyObjectStream(0, 10000000);
290-
stream.abort = () => {};
290+
// stream.abort = () => {};
291291
const response = {
292-
createReadStream: () => stream,
292+
// createReadStream: () => stream,
293293
Body: stream,
294-
abort: () => {},
294+
// abort: () => {},
295295
$metadata: {
296296
httpStatusCode: 200,
297297
requestId: 'mock-request-id',

0 commit comments

Comments
 (0)