Skip to content

Commit eea1682

Browse files
ehennumShiva Verma
authored andcommitted
retry for any request without a streaming payload #550
1 parent 31c87db commit eea1682

File tree

9 files changed

+287
-159
lines changed

9 files changed

+287
-159
lines changed

lib/documents.js

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
'use strict';
1717

18-
1918
var requester = require('./requester.js');
2019
var mlutil = require('./mlutil.js');
2120
var Operation = require('./operation.js');
@@ -869,15 +868,17 @@ function writeStreamImpl(document, categories) {
869868
requestOptions.path = mlutil.databaseParam(connectionParams, endpoint, sep);
870869
mlutil.addTxidHeaders(requestOptions, txid);
871870

872-
// TODO: treat as chunked single document if no properties
873-
var requestPartList = [];
874-
addDocumentParts(requestPartList, document, true);
875-
876871
var operation = new Operation(
877872
'write document stream', this.client, requestOptions, 'chunkedMultipart', 'single'
878873
);
879-
operation.uri = document.uri;
880-
operation.requestDocument = requestPartList;
874+
operation.isReplayable = false;
875+
operation.uri = document.uri;
876+
877+
// TODO: treat as chunked single document if no properties
878+
var requestPartList = [];
879+
addDocumentParts(operation, requestPartList, document, true);
880+
operation.requestDocument = requestPartList;
881+
881882
operation.multipartBoundary = mlutil.multipartBoundary;
882883
operation.errorTransform = uriErrorTransform;
883884

@@ -1098,8 +1099,8 @@ function writeMetadata(document, categories) {
10981099
/** @ignore */
10991100
function writeContent(contentOnly, document, requestParams, categories, requestType) {
11001101
/*jshint validthis:true */
1101-
var content = document.content;
1102-
var hasContent = (content != null);
1102+
var content = document.content;
1103+
var hasContent = (content != null);
11031104

11041105
var endpoint = '/v1/documents';
11051106

@@ -1215,7 +1216,9 @@ function writeContent(contentOnly, document, requestParams, categories, requestT
12151216
operation.categories = categories;
12161217
}
12171218
if (hasContent) {
1218-
operation.requestBody = mlutil.marshal(content);
1219+
operation.requestBody = mlutil.marshal(content, operation);
1220+
} else {
1221+
operation.isReplayable = false;
12191222
}
12201223
operation.outputTransform = singleWriteOutputTransform;
12211224
operation.contentOnly = (contentOnly === true);
@@ -1229,11 +1232,6 @@ function writeDocumentList(contentOnly, documents, requestParams, categories) {
12291232
documents = documents.sort(compareDocuments);
12301233
}
12311234

1232-
var requestPartList = [];
1233-
for (var i=0; i < documents.length; i++) {
1234-
addDocumentParts(requestPartList, documents[i], false);
1235-
}
1236-
12371235
var endpoint = '/v1/documents';
12381236

12391237
var txid = getTxid(requestParams);
@@ -1266,8 +1264,15 @@ function writeDocumentList(contentOnly, documents, requestParams, categories) {
12661264
if (Array.isArray(categories) && categories.length > 0) {
12671265
operation.categories = categories;
12681266
}
1267+
12691268
operation.multipartBoundary = multipartBoundary;
1270-
operation.requestPartList = requestPartList;
1269+
1270+
var requestPartList = [];
1271+
for (var i=0; i < documents.length; i++) {
1272+
addDocumentParts(operation, requestPartList, documents[i], false);
1273+
}
1274+
operation.requestPartList = requestPartList;
1275+
12711276
operation.errorTransform = uriListErrorTransform;
12721277
if (contentOnly === true) {
12731278
operation.subdata = ['documents', 'uri'];
@@ -1425,7 +1430,7 @@ function addWriteConfig(document, hasUri, content, headers, sep) {
14251430
return writeConfig;
14261431
}
14271432
/** @ignore */
1428-
function addDocumentParts(partList, document, isContentOptional) {
1433+
function addDocumentParts(operation, partList, document, isContentOptional) {
14291434
var uri = document.uri;
14301435
var hasUri = (uri != null);
14311436

@@ -1464,7 +1469,7 @@ function addDocumentParts(partList, document, isContentOptional) {
14641469
headers['Content-Disposition'] = disposition+'; category=content';
14651470

14661471
if (hasContent) {
1467-
part.content = mlutil.marshal(content);
1472+
part.content = mlutil.marshal(content, operation);
14681473
}
14691474

14701475
partList.push(part);

lib/endpoint-proxy.js

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,16 @@ function multiAtomicRequest(client, funcdef, args) {
5656

5757
checkArgNames(funcdef, args, requestHeaders);
5858

59+
const connectionParams = client.connectionParams;
60+
const requestOptions = mlutil.copyProperties(connectionParams);
61+
requestOptions.method = 'POST';
62+
requestOptions.headers = requestHeaders;
63+
requestOptions.path = mlutil.databaseParam(connectionParams, endpoint, '?');
64+
65+
const operation = new Operation(
66+
'call to ' + endpoint, client, requestOptions, 'single', funcdef.returnKind()
67+
);
68+
5969
const requestPartList = [];
6070
for (const [paramName, paramdef] of funcdef.paramdefs()) {
6171
const paramArgs = args[paramName];
@@ -65,25 +75,15 @@ function multiAtomicRequest(client, funcdef, args) {
6575

6676
if (Array.isArray(paramArgs)) {
6777
for (const paramArg of paramArgs) {
68-
requestPartList.push(paramName + '=' + encodeURIComponent(mlutil.marshal(paramArg)));
78+
requestPartList.push(paramName + '=' + encodeURIComponent(mlutil.marshal(paramArg, operation)));
6979
}
7080
} else {
71-
requestPartList.push(paramName + '=' + encodeURIComponent(mlutil.marshal(paramArgs)));
81+
requestPartList.push(paramName + '=' + encodeURIComponent(mlutil.marshal(paramArgs, operation)));
7282
}
7383
}
84+
operation.requestBody = requestPartList.join('&');
7485

7586
const outputTransform = specifyOutputTransform(funcdef, requestHeaders);
76-
77-
const connectionParams = client.connectionParams;
78-
const requestOptions = mlutil.copyProperties(connectionParams);
79-
requestOptions.method = 'POST';
80-
requestOptions.headers = requestHeaders;
81-
requestOptions.path = mlutil.databaseParam(connectionParams, endpoint, '?');
82-
83-
const operation = new Operation(
84-
'call to ' + endpoint, client, requestOptions, 'single', funcdef.returnKind()
85-
);
86-
operation.requestBody = mlutil.marshal(requestPartList.join('&'));
8787
addOutputTransform(funcdef, operation, outputTransform);
8888

8989
return startRequest(funcdef, operation);
@@ -99,6 +99,17 @@ function multiNodeRequest(client, funcdef, args) {
9999

100100
checkArgNames(funcdef, args, requestHeaders);
101101

102+
const connectionParams = client.connectionParams;
103+
const requestOptions = mlutil.copyProperties(connectionParams);
104+
requestOptions.method = 'POST';
105+
requestOptions.headers = requestHeaders;
106+
requestOptions.path = mlutil.databaseParam(connectionParams, endpoint, '?');
107+
108+
const operation = new Operation(
109+
'call to ' + endpoint, client, requestOptions, 'multipart', funcdef.returnKind()
110+
);
111+
operation.multipartBoundary = multipartBoundary;
112+
102113
const requestPartList = [];
103114
for (const [paramName, paramdef] of funcdef.paramdefs()) {
104115
const paramArgs = args[paramName];
@@ -115,30 +126,19 @@ function multiNodeRequest(client, funcdef, args) {
115126
for (const paramArg of paramArgs) {
116127
requestPartList.push({
117128
headers: headers,
118-
content: mlutil.marshal(paramArg)
129+
content: mlutil.marshal(paramArg, operation)
119130
});
120131
}
121132
} else {
122133
requestPartList.push({
123134
headers: headers,
124-
content: mlutil.marshal(paramArgs)
135+
content: mlutil.marshal(paramArgs, operation)
125136
});
126137
}
127138
}
139+
operation.requestPartList = requestPartList;
128140

129141
const outputTransform = specifyOutputTransform(funcdef, requestHeaders);
130-
131-
const connectionParams = client.connectionParams;
132-
const requestOptions = mlutil.copyProperties(connectionParams);
133-
requestOptions.method = 'POST';
134-
requestOptions.headers = requestHeaders;
135-
requestOptions.path = mlutil.databaseParam(connectionParams, endpoint, '?');
136-
137-
const operation = new Operation(
138-
'call to ' + endpoint, client, requestOptions, 'multipart', funcdef.returnKind()
139-
);
140-
operation.multipartBoundary = multipartBoundary;
141-
operation.requestPartList = requestPartList;
142142
addOutputTransform(funcdef, operation, outputTransform);
143143

144144
return startRequest(funcdef, operation);

lib/graphs.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,9 @@ function changeGraph(action, isStreaming, args) {
391391
if (graphType === 'named') {
392392
operation.uri = uri;
393393
}
394-
if (!isStreaming) {
394+
if (isStreaming) {
395+
operation.isReplayable = false;
396+
} else {
395397
operation.requestBody = data;
396398
}
397399
operation.outputTransform = emptyOutputTransform;

lib/mlutil.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
'use strict';
1717
var util = require("util");
1818

19-
2019
var multipartBoundary = 'MLBOUND_' + Date.UTC(2014,12,31);
2120

2221
// Normalize arguments by returning them as an array.
@@ -213,7 +212,7 @@ function identify(arg, withValues) {
213212
}
214213
}
215214

216-
function marshal(data) {
215+
function marshal(data, state) {
217216
if (data == null) {
218217
return null;
219218
} else if (typeof data === 'string' || data instanceof String) {
@@ -222,6 +221,9 @@ function marshal(data) {
222221
return data;
223222
// readable stream might not inherit from ReadableStream
224223
} else if (typeof data._read === 'function') {
224+
if (state !== void 0 && state.isReplayable === true) {
225+
state.isReplayable = false;
226+
}
225227
return data;
226228
} else if (typeof data === 'object' && data !== null) {
227229
if (Array.isArray(data)) {

lib/operation.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,16 @@ function Operation(name, client, options, requestType, responseType) {
2626
this.logger = client.getLogger();
2727
this.options = options;
2828
this.requestType = requestType;
29+
this.isReplayable = true;
2930
this.responseType = responseType;
3031
this.validStatusCodes = null;
3132
this.inlineAsDocument = true;
3233
this.errorTransform = null;
3334
this.error = null;
3435
this.outputTransform = null;
3536
this.subdata = null;
37+
this.authenticator = null;
38+
this.inputSender = null;
3639
this.startedResponse = false;
3740
this.done = false;
3841
this.outputMode = 'none';
@@ -48,6 +51,8 @@ function Operation(name, client, options, requestType, responseType) {
4851
this.nextMetadataBuffer = null;
4952
this.timestamp = null;
5053
this.complexValues = null;
54+
this.retryAttempt = 0;
55+
this.retryDuration = 0;
5156
}
5257
Operation.prototype.STREAM_MODES_CHUNKED_OBJECT_SEQUENCE =
5358
{chunked: true, object: true, sequence: true};

0 commit comments

Comments
 (0)