Skip to content

Commit dc486e4

Browse files
committed
Fixed blob stream deadlock
Fixed the issue in SAP#233 where streaming blobs out of the database into another table can cause a deadlock - Added a "blocked" mode to the Queue which prevents tasks from running except for the blocking task and READ_LOB tasks - Modified ExecuteTask's run to free the queue while it waits for the Writer's getParameters - The callback of getParameters will enqueue the task again to send the packet - Before the freeing of the queue to the next task, ExecuteTask will block the queue to only allow itself and READ_LOB tasks to run - This prevents issues where exec's can run at the same time which will lead to HANA disconnecting and sending invalid LOB locator id errors Implementation Details To implement the "blocked" mode, the Queue is modified to a data structure which supports 3 operations 1. Push a task (preserving the order in which tasks were pushed) 2. Pop a task (remove the task in the order they were pushed) 3. Selective pop (remove a task in the order they were pushed given that the type matches a given variety) For optimization purposes, the tasks that can block are only READ_LOB requests. As such, we only require another readLobQueue which stores READ_LOB tasks to allow those to skip the queue. - An invariant of the Queue is that the queue must store the same READ_LOB tasks as the readLobQueue and possibly more READ_LOB tasks that are already run. If in the future, it is found that other tasks can block, it is possible to maintain a map<message type, list of queue tasks> and still have these 3 operations run in constant time, because the number of message types is finite (and small).
1 parent 67090ff commit dc486e4

File tree

5 files changed

+468
-87
lines changed

5 files changed

+468
-87
lines changed

lib/protocol/Connection.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,7 @@ Connection.prototype.enqueue = function enqueue(task, cb) {
441441
if (task instanceof request.Segment) {
442442
queueable = this._queue.createTask(this.send.bind(this, task), cb);
443443
queueable.name = MessageTypeName[task.type];
444+
queueable.msgType = task.type;
444445
} else if (util.isFunction(task.run)) {
445446
queueable = task;
446447
}
@@ -732,6 +733,10 @@ Connection.prototype.isIdle = function isIdle() {
732733
return this._queue.empty && !this._queue.busy;
733734
};
734735

736+
Connection.prototype.blockQueue = function blockQueue(blockingTask) {
737+
this._queue.block(blockingTask);
738+
}
739+
735740
Connection.prototype.setAutoCommit = function setAutoCommit(autoCommit) {
736741
this._transaction.autoCommit = autoCommit;
737742
};

lib/protocol/ExecuteTask.js

Lines changed: 83 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ function ExecuteTask(connection, options, callback) {
3939
}
4040
this.callback = callback;
4141
this.reply = undefined;
42+
this.finishedError = null;
43+
this.finishedParameters = undefined;
44+
this.isExecuteParams = true;
4245
}
4346

4447
ExecuteTask.create = function createExecuteTask(connection, options, cb) {
@@ -59,60 +62,79 @@ ExecuteTask.prototype.run = function run(next) {
5962
}
6063
if (err) {
6164
return self.sendRollback(function () {
62-
// ignore roolback error
65+
// ignore rollback error
6366
done(err);
6467
});
6568
}
6669
self.sendCommit(done);
6770
}
6871

69-
function execute() {
72+
function getExecuteRequest() {
7073
if (!self.parameterValues.length && !self.writer.hasParameters) {
7174
return finalize();
7275
}
73-
self.sendExecute(function receive(err, reply) {
74-
if (err) {
75-
return finalize(err);
76-
}
77-
if (!self.writer.finished && reply.rowsAffected == -1) {
78-
reply.rowsAffected = undefined;
79-
}
80-
self.pushReply(reply);
81-
if (!self.writer.finished && reply.writeLobReply) {
82-
self.writer.update(reply.writeLobReply);
83-
}
84-
writeLob();
76+
self.finishedParameters = undefined;
77+
var availableSize = self.connection.getAvailableSize(false) - STATEMENT_ID_PART_LENGTH;
78+
var availableSizeForLOBs = self.connection.getAvailableSize(true) - STATEMENT_ID_PART_LENGTH;
79+
80+
// Block the queue to this task and read lob requests
81+
self.connection.blockQueue(self);
82+
83+
self.getParameters(availableSize, availableSizeForLOBs, function send(err, parameters) {
84+
// Enqueue itself to wait for when the task becomes the one actively running in the queue
85+
// and the connection is avaliable to send the packet
86+
self.finishedError = err;
87+
self.finishedParameters = parameters;
88+
self.isExecuteParams = true;
89+
self.connection.enqueue(self);
8590
});
91+
92+
// Yield to only read lob tasks in the queue, the callback will enqueue this task
93+
// again once the parameters are ready
94+
next();
8695
}
8796

88-
function writeLob() {
97+
function getWriteLobRequest() {
8998
if (self.writer.finished || self.writer.hasParameters) {
90-
return execute();
99+
return getExecuteRequest();
91100
}
92-
self.sendWriteLobRequest(function receive(err, reply) {
93-
/* jshint unused:false */
94-
if (err) {
95-
return finalize(err);
96-
}
97-
self.pushReply(reply);
98-
writeLob();
101+
self.finishedParameters = undefined;
102+
var availableSize = self.connection.getAvailableSize(true);
103+
self.connection.blockQueue(self);
104+
self.writer.getWriteLobRequest(availableSize, function (err, buffer) {
105+
self.finishedError = err;
106+
self.finishedParameters = buffer;
107+
self.isExecuteParams = false;
108+
self.connection.enqueue(self);
99109
});
110+
111+
next();
100112
}
101113

102-
// validate function code
103-
if (self.parameterValues.length > 1) {
104-
switch (self.functionCode) {
105-
case FunctionCode.DDL:
106-
case FunctionCode.INSERT:
107-
case FunctionCode.UPDATE:
108-
case FunctionCode.DELETE:
109-
break;
110-
default:
111-
return done(createInvalidFunctionCodeError());
114+
if (this.finishedError) {
115+
finalize(this.finishedError);
116+
} else if (this.finishedParameters) {
117+
if (this.isExecuteParams) {
118+
self.sendExecute(this.finishedParameters, finalize, getWriteLobRequest);
119+
} else {
120+
self.sendWriteLobRequest(this.finishedParameters, finalize, getWriteLobRequest);
121+
}
122+
} else { // No stored error or parameters, so get initial execute data
123+
// validate function code
124+
if (self.parameterValues.length > 1) {
125+
switch (self.functionCode) {
126+
case FunctionCode.DDL:
127+
case FunctionCode.INSERT:
128+
case FunctionCode.UPDATE:
129+
case FunctionCode.DELETE:
130+
break;
131+
default:
132+
return done(createInvalidFunctionCodeError());
133+
}
112134
}
113-
}
114135

115-
execute();
136+
getExecuteRequest();
137+
}
116138
};
117139

118140
ExecuteTask.prototype.end = function end(err) {
@@ -195,35 +217,40 @@ ExecuteTask.prototype.getParameters = function getParameters(availableSize, avai
195217
next();
196218
};
197219

198-
ExecuteTask.prototype.sendExecute = function sendExecute(cb) {
220+
ExecuteTask.prototype.sendExecute = function sendExecute(parameters, finalize, cb) {
199221
var self = this;
200-
var availableSize = self.connection.getAvailableSize(false) - STATEMENT_ID_PART_LENGTH;
201-
var availableSizeForLOBs = self.connection.getAvailableSize(true) - STATEMENT_ID_PART_LENGTH;
202-
self.getParameters(availableSize, availableSizeForLOBs, function send(err, parameters) {
222+
self.connection.send(request.execute({
223+
autoCommit: self.autoCommit,
224+
holdCursorsOverCommit: self.holdCursorsOverCommit,
225+
scrollableCursor: self.scrollableCursor,
226+
statementId: self.statementId,
227+
parameters: parameters,
228+
useCesu8: self.connection.useCesu8
229+
}), function (err, reply) {
203230
if (err) {
204-
return cb(err);
231+
return finalize(err);
205232
}
206-
self.connection.send(request.execute({
207-
autoCommit: self.autoCommit,
208-
holdCursorsOverCommit: self.holdCursorsOverCommit,
209-
scrollableCursor: self.scrollableCursor,
210-
statementId: self.statementId,
211-
parameters: parameters,
212-
useCesu8: self.connection.useCesu8
213-
}), cb);
233+
if (!self.writer.finished && reply.rowsAffected == -1) {
234+
reply.rowsAffected = undefined;
235+
}
236+
self.pushReply(reply);
237+
if (!self.writer.finished && reply.writeLobReply) {
238+
self.writer.update(reply.writeLobReply);
239+
}
240+
cb();
214241
});
215-
};
242+
}
216243

217-
ExecuteTask.prototype.sendWriteLobRequest = function sendWriteLobRequest(cb) {
244+
ExecuteTask.prototype.sendWriteLobRequest = function sendWriteLobRequest(buffer, finalize, cb) {
218245
var self = this;
219-
var availableSize = self.connection.getAvailableSize(true);
220-
self.writer.getWriteLobRequest(availableSize, function send(err, buffer) {
246+
self.connection.send(request.writeLob({
247+
writeLobRequest: buffer
248+
}), function (err, reply) {
221249
if (err) {
222-
return cb(err);
250+
return finalize(err);
223251
}
224-
self.connection.send(request.writeLob({
225-
writeLobRequest: buffer
226-
}), cb);
252+
self.pushReply(reply);
253+
cb();
227254
});
228255
};
229256

lib/util/Queue.js

Lines changed: 94 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
var util = require('util');
1717
var EventEmitter = require('events').EventEmitter;
18+
var MessageType = require('../protocol/common/MessageType');
1819

1920
module.exports = Queue;
2021

@@ -26,6 +27,12 @@ function Queue(immediate) {
2627
this.queue = [];
2728
this.busy = false;
2829
this.running = !!immediate;
30+
// Records read lob tasks which can be called out of position when
31+
// the queue is blocked. If other tasks need to be called out of position
32+
// this can be changed to a Map with the message type as keys.
33+
this.readLobQueue = [];
34+
this.blocked = false;
35+
this.blockingTask = undefined;
2936
}
3037

3138
Object.defineProperty(Queue.prototype, 'empty', {
@@ -36,14 +43,25 @@ Object.defineProperty(Queue.prototype, 'empty', {
3643

3744
Queue.prototype.unshift = function unshift(task) {
3845
this.queue.unshift(task);
39-
if (this.running) {
46+
if (task.msgType === MessageType.READ_LOB) {
47+
this.readLobQueue.unshift(task);
48+
}
49+
if (this.blocked && this._isBlockingTask(task)) {
50+
this.emit('unblock', task);
51+
} else if (this.running) {
4052
this.dequeue();
4153
}
4254
return this;
4355
};
4456

4557
Queue.prototype.push = function push(task) {
58+
if (this.blocked && this._isBlockingTask(task)) {
59+
return this.unshift(task);
60+
}
4661
this.queue.push(task);
62+
if (task.msgType === MessageType.READ_LOB) {
63+
this.readLobQueue.push(task);
64+
}
4765
if (this.running) {
4866
this.dequeue();
4967
}
@@ -72,14 +90,28 @@ Queue.prototype.abort = function abort(err) {
7290
return this;
7391
};
7492

75-
Queue.prototype.createTask = function createTask(send, receive, name) {
76-
return new Task(send, receive, name);
93+
Queue.prototype.createTask = function createTask(send, receive, name, msgType) {
94+
return new Task(send, receive, name, msgType);
7795
};
7896

97+
Queue.prototype.block = function block(blockingTask) {
98+
this.blocked = true;
99+
this.blockingTask = blockingTask;
100+
}
101+
102+
Queue.prototype.unblock = function unblock() {
103+
this.blocked = false;
104+
this.blockingTask = undefined;
105+
}
106+
107+
Queue.prototype._isBlockingTask = function _isBlockingTask(task) {
108+
return task === this.blockingTask || task.msgType === MessageType.READ_LOB;
109+
}
110+
79111
Queue.prototype.dequeue = function dequeue() {
80112
var self = this;
81113

82-
function next(err, name) {
114+
function runNext() {
83115
/* jshint unused:false */
84116
self.busy = false;
85117
if (self.queue.length) {
@@ -89,21 +121,77 @@ Queue.prototype.dequeue = function dequeue() {
89121
}
90122
}
91123

124+
function runReadLob() {
125+
if (self.readLobQueue.length) {
126+
self.busy = false;
127+
if (self.running && !self.busy) {
128+
self.busy = true;
129+
var task = self.readLobQueue.shift();
130+
// Mark the task as ran so it will be skipped in the queue
131+
task.ran = true;
132+
// Optimization: When blocked, often read lobs are the most recently
133+
// added at the beginning or end of the queue so they can be removed from there
134+
// Note that the queue is not empty since it always has at least as many elements
135+
// as the readLobQueue
136+
if (self.queue[0] === task) {
137+
self.queue.shift();
138+
} else if (self.queue[self.queue.length - 1] === task) {
139+
self.queue.pop();
140+
}
141+
task.run(next);
142+
}
143+
} else {
144+
runNext();
145+
}
146+
}
147+
148+
function next(err, name) {
149+
if (self.blocked) {
150+
// Check if there exists a task that can be run
151+
if (self.queue.length && self.blockingTask === self.queue[0]) {
152+
self.unblock();
153+
runNext();
154+
} else if (self.readLobQueue.length) {
155+
runReadLob();
156+
} else {
157+
self.once('unblock', function runTask (task) {
158+
if (task === self.blockingTask) {
159+
self.unblock();
160+
runNext();
161+
} else {
162+
runReadLob();
163+
}
164+
});
165+
}
166+
} else {
167+
runNext();
168+
}
169+
}
170+
92171
function run() {
93172
if (self.running && !self.busy) {
94173
// Queue is running and not busy
95174
self.busy = true;
96175
var task = self.queue.shift();
97-
task.run(next);
176+
if (task.ran) {
177+
next(null, task.name);
178+
} else {
179+
if (task.msgType === MessageType.READ_LOB) {
180+
self.readLobQueue.shift();
181+
}
182+
task.run(next);
183+
}
98184
}
99185
}
100186
run();
101187
};
102188

103-
function Task(send, receive, name) {
189+
function Task(send, receive, name, msgType) {
104190
this.send = send;
105191
this.receive = receive;
106192
this.name = name;
193+
this.msgType = msgType;
194+
this.ran = false;
107195
}
108196

109197
Task.prototype.run = function run(next) {

0 commit comments

Comments
 (0)