Skip to content
This repository was archived by the owner on May 31, 2024. It is now read-only.

Commit 9890113

Browse files
andyedwardsibmsimonkotwicz
authored andcommitted
Treat returned ack data as chunks of a complete ack (#2)
1 parent 0e933a6 commit 9890113

File tree

2 files changed

+72
-58
lines changed

2 files changed

+72
-58
lines changed

LogmetProducer.js

Lines changed: 67 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ var currentSequenceNumber = 0;
3636
// Logmet mandatory field for identifying the data owner
3737
var ALCHEMY_TENANT_ID_KEY = 'ALCH_TENANT_ID';
3838

39-
// Max Lumberjack sequence number before rolling over
39+
// Max Lumberjack sequence number before rolling over
4040
var MAX_SEQ_NUMBER = Number.MAX_SAFE_INTEGER - 1;
4141

4242
// Used to buffer data that needs to be sent out to Logmet
@@ -58,9 +58,9 @@ var windowFramebuffer;
5858
var initialConnectionEstablished = false;
5959

6060
// When a call to terminate() is made, how frequently we should check if all data has been flushed.
61-
var TERMINATE_POLL_INTERVAL = 300; // milliseconds
61+
var TERMINATE_POLL_INTERVAL = 300; // milliseconds
6262

63-
// How much time until we detect an unexpected network error
63+
// How much time until we detect an unexpected network error
6464
var INACTIVITY_TIMEOUT = 30000; // milliseconds
6565

6666
// How much time until we re-try when a connection drops
@@ -82,7 +82,7 @@ var retryFunction;
8282
* @param {string} logmetToken The Logmet token (API key) used for authentication
8383
* @param {boolean} isSuperTenant Flag indicating whether or not the value passed to tenantOrSupertenantId represents a supertenant
8484
* @param {object} options Additional optional parameters that override defaults. Supported overrides: bufferSize
85-
*
85+
*
8686
*/
8787
function LogmetProducer(endpoint, port, tenantOrSupertenantId, logmetToken, isSuperTenant, options) {
8888
this.endpoint = endpoint;
@@ -94,10 +94,10 @@ function LogmetProducer(endpoint, port, tenantOrSupertenantId, logmetToken, isSu
9494
if (options && options.bufferSize && parseInt(options.bufferSize, 10)) {
9595
MAX_PENDING_ELEMENTS = parseInt(options.bufferSize, 10);
9696
}
97-
97+
9898
windowFramebuffer = new Buffer(6);
99-
windowFramebuffer.write('1W', 0, 2);
100-
99+
windowFramebuffer.write('1W', 0, 2);
100+
101101
}
102102

103103
// Export the constructor
@@ -111,9 +111,9 @@ module.exports = LogmetProducer;
111111
/*
112112
* Establishes a connection with Logmet for sending data.
113113
* This function must be called once to enable the sendData function
114-
*
115-
* @param {function(error,data)} callback Callback function to be invoked in case of error or for signaling that
116-
* the handshake with Logmet has successfully completed. The returned error message is assigned to the
114+
*
115+
* @param {function(error,data)} callback Callback function to be invoked in case of error or for signaling that
116+
* the handshake with Logmet has successfully completed. The returned error message is assigned to the
117117
* first argument of callback; if any data is available, it is assigned to the second argument of callback.
118118
*/
119119

@@ -123,34 +123,34 @@ LogmetProducer.prototype.connect = function(callback) {
123123
};
124124

125125
/*
126-
* Sends data to Logmet.
127-
*
126+
* Sends data to Logmet.
127+
*
128128
* A call to the connectToMTLumberjackServer() function must be made before sendData() can be called.
129-
*
129+
*
130130
* @param {object} data The object representing the data to be sent out to Logmet
131131
* @param {string} type The type that identifies the data
132132
* @param {function} callback(error, data) A callback function that is called to notify the caller of the operation result
133-
* @param {string} tenantId The id of the tenant who owns the data
133+
* @param {string} tenantId The id of the tenant who owns the data
134134
*/
135135
LogmetProducer.prototype.sendData = function(data, type, tenantId, callback) {
136136

137-
var activeConnection = socketWrapper.state === State.CONNECTED;
137+
var activeConnection = socketWrapper.state === State.CONNECTED;
138138

139139
if (pendingDataElements.length >= MAX_PENDING_ELEMENTS) {
140140
// Our buffer is full. Apply back pressure.
141141
logger.warn('Buffer of data elements is full. Rejecting new data. Connection state: ' + socketWrapper.state);
142142
callback('ERROR: Buffer of data elements is full.', {connectionActive: activeConnection});
143143
return;
144144
}
145-
145+
146146
var augmentedData = Object.assign({}, data);
147147
augmentedData[ALCHEMY_TENANT_ID_KEY] = tenantId;
148148
augmentedData['type'] = type;
149-
149+
150150
pendingDataElements.push(augmentedData);
151-
151+
152152
logger.debug('Current size of pending data buffer: ' + pendingDataElements.length);
153-
153+
154154
callback('', {connectionActive: activeConnection});
155155
if (activeConnection) {
156156
processDataBuffer();
@@ -161,7 +161,7 @@ LogmetProducer.prototype.sendData = function(data, type, tenantId, callback) {
161161
/*
162162
* Gracefully stops the connection with Logmet's MT Lumberjack server
163163
* It will close the connection only after all locally-buffered data has been received by Logmet.
164-
*
164+
*
165165
* @param {function} callback() A callback function to notify the caller that the connection to logmet has been closed
166166
*/
167167
LogmetProducer.prototype.terminate = function(callback) {
@@ -201,7 +201,7 @@ function connectToMTLumberjackServer(endpoint, port, tenantOrSupertenantId, logm
201201
currentSequenceNumber = 0;
202202
currentAck = -1;
203203
previousAck = -1;
204-
204+
205205
var conn_options = {
206206
host: endpoint,
207207
port: port
@@ -210,7 +210,7 @@ function connectToMTLumberjackServer(endpoint, port, tenantOrSupertenantId, logm
210210
tlsSocket = tls.connect(conn_options, function() {
211211
if (tlsSocket.authorized) {
212212
logger.info('Successfully established a connection with Logmet');
213-
213+
214214
// Now that the connection has been established, let's perform the handshake with Logmet
215215
authenticate(tenantOrSupertenantId, logmetToken, isSuperTenant);
216216
} else {
@@ -229,20 +229,34 @@ function connectToMTLumberjackServer(endpoint, port, tenantOrSupertenantId, logm
229229
retryFunction = connectToMTLumberjackServer.bind(this, endpoint, port, tenantOrSupertenantId, logmetToken, isSuperTenant, callback);
230230

231231
// Define callbacks to handle the network communication with Logmet
232-
232+
233233
tlsSocket.setTimeout(INACTIVITY_TIMEOUT);
234234

235235
tlsSocket.on('timeout', socketEventHandler.bind(socketWrapper, 'timeout'));
236236
tlsSocket.on('error', socketEventHandler.bind(socketWrapper , 'error'));
237237
tlsSocket.on('disconnect', socketEventHandler.bind(socketWrapper, 'disconnect'));
238238
tlsSocket.on('end', socketEventHandler.bind(socketWrapper, 'end'));
239239
tlsSocket.on('close', socketEventHandler.bind(socketWrapper, 'close'));
240-
240+
241+
var ackBuffer = Buffer.allocUnsafe(6);
242+
var ackPosition = 0;
243+
241244
tlsSocket.on('data', function(data) {
242-
// We must have received an ACK from Logmet. Let's process it.
243-
var buffer = new Buffer(data);
244-
var version = buffer[0];
245-
var type = buffer[1];
245+
// We must have received part of an ACK from Logmet. Let's process it.
246+
var dataBuf = Buffer.from(data);
247+
dataBuf.copy(ackBuffer, ackPosition);
248+
ackPosition += dataBuf.length;
249+
250+
if (ackPosition !== 6) {
251+
// We don't have a complete ACK message yet, so return and wait for the rest
252+
return;
253+
} else {
254+
// We now have a complete ACK message, so reset the position pointer in the ackBuffer and continue
255+
ackPosition = 0;
256+
}
257+
258+
var version = ackBuffer[0];
259+
var type = ackBuffer[1];
246260
if (type != 65) {
247261
// Unknown ACK type
248262
logger.error('Received an unknown ACK type from Logmet: ' + String.fromCharCode(type));
@@ -261,23 +275,23 @@ function connectToMTLumberjackServer(endpoint, port, tenantOrSupertenantId, logm
261275
// We got a '"1A"<ack_number>'. Let's read the ACK number.
262276
logger.debug("Reading ACK number");
263277
previousAck = currentAck;
264-
currentAck = buffer.readInt32BE(2);
278+
currentAck = ackBuffer.readInt32BE(2);
265279
logger.info('Last ACK received: ' + currentAck);
266280
if (currentAck == 0) {
267281
// The connection has just been established.
268-
282+
269283
// If this is a reconnection after a failure, let's check if there is unACKED data to be sent
270284
if (unackedDataElements.length !== 0) {
271285
for (var i = 0; i < unackedDataElements.length; i++) {
272286
writeToSocket(unackedDataElements, i);
273287
}
274288
}
275-
289+
276290
logger.info('Initialized the Logmet client. The Logmet handshake is complete.');
277291

278292
// Reset the retry delay, as we have just successfully connected.
279293
RETRY_DELAY = INITIAL_RETRY_DELAY;
280-
294+
281295
if (!initialConnectionEstablished) {
282296
// Let's signal the constructor caller that the connection is established.
283297
// We only notify the constructor caller when the first connection is established.
@@ -291,7 +305,7 @@ function connectToMTLumberjackServer(endpoint, port, tenantOrSupertenantId, logm
291305
unackedDataElements.splice(0, currentAck - previousAck);
292306
}
293307
}
294-
308+
295309
socketWrapper.state = State.CONNECTED;
296310
processDataBuffer();
297311
} else {
@@ -333,64 +347,64 @@ function authenticate(tenantOrSupertenantId, logmetToken, isSuperTenant) {
333347
var idFrameTypeAndVersion = "1I";
334348
var clientIdString = "standalone_dlms_data_client_v0.0.1_" + os.hostname();
335349
logger.info('Identifying the Logmet client: ' + clientIdString);
336-
350+
337351
var idDataBuffer = new Buffer(idFrameTypeAndVersion.length + 1 + clientIdString.length);
338-
352+
339353
idDataBuffer.write(idFrameTypeAndVersion, 0 , idFrameTypeAndVersion.length);
340-
354+
341355
idDataBuffer.writeUIntBE(clientIdString.length, idFrameTypeAndVersion.length, 1);
342356
idDataBuffer.write(clientIdString, idFrameTypeAndVersion.length + 1, clientIdString.length);
343-
357+
344358
// Send the identification frame to Logmet
345359
tlsSocket.write(idDataBuffer);
346-
360+
347361
// Authentication frame:
348362
// 2 | S or T | tenant_id_size | tenant_id | token_size | token
349363
var authFrameTypeAndVersion = isSuperTenant ? '2S' : '2T';
350364
logger.info('Authenticating with Logmet with frame type: ' + authFrameTypeAndVersion[1]);
351-
365+
352366
var bufferSize = authFrameTypeAndVersion.length + tenantOrSupertenantId.length + logmetToken.length + 2;
353367
var authDataBuffer = new Buffer(bufferSize);
354-
368+
355369
authDataBuffer.write(authFrameTypeAndVersion, 0, authFrameTypeAndVersion.length);
356-
370+
357371
authDataBuffer.writeUIntBE(tenantOrSupertenantId.length, authFrameTypeAndVersion.length, 1);
358372
authDataBuffer.write(tenantOrSupertenantId, authFrameTypeAndVersion.length + 1, tenantOrSupertenantId.length);
359-
373+
360374
authDataBuffer.writeUIntBE(logmetToken.length, authFrameTypeAndVersion.length + 1 + tenantOrSupertenantId.length, 1);
361375
authDataBuffer.write(logmetToken, authFrameTypeAndVersion.length + 1 + tenantOrSupertenantId.length + 1, logmetToken.length);
362-
376+
363377
// Send the authentication frame to Logmet
364378
tlsSocket.write(authDataBuffer);
365379
}
366380

367381
/*
368382
* Converts an object into a Lumberjack data frame
369-
*
383+
*
370384
* @param {object} data The object to be converted into a Lumberjack frame
371-
*
385+
*
372386
* @return A Buffer with a Lumberjack frame representing the provided data object
373387
*/
374388
function convertDataToFrame(data, sequence) {
375389
// Data frame:
376390
// 1 | D | <sequence> | <nkeys> | <key_length_i> | <key_i> | <val_length_i> | <val_i> | ...
377-
391+
378392
var dottedNotationData = {};
379393
objectToFlatDottedNotation(data, '', dottedNotationData);
380394
logger.debug('Key-value pairs in dotted notation', dottedNotationData);
381-
395+
382396
var numberOfPairs = Object.keys(dottedNotationData).length;
383397
var bufferSize = 1 + 1 + 4 + 4 + (4 * numberOfPairs) + (4 * numberOfPairs); // "1" | "D" | <seq> | <nkeys> | 4 * <key_length> | 4 * <val_length>
384-
398+
385399
for (var k in dottedNotationData) {
386400
bufferSize += dottedNotationData[k].length + k.length;
387401
}
388-
402+
389403
var buffer = new Buffer(bufferSize);
390404
buffer.write("1D", 0, 2);
391405
buffer.writeUInt32BE(sequence, 2);
392406
buffer.writeUInt32BE(numberOfPairs, 6);
393-
407+
394408
var offset = 10;
395409
for (k in dottedNotationData) {
396410
buffer.writeUInt32BE(k.length, offset);
@@ -413,12 +427,12 @@ function objectToFlatDottedNotation(data, prefix, dottedNotationData) {
413427
var newKey;
414428
for (var k in data) {
415429
if (typeof data[k] === 'string' || typeof data[k] === 'number') {
416-
newKey = (prefix == '') ? k : prefix + '.' + k;
430+
newKey = (prefix == '') ? k : prefix + '.' + k;
417431
dottedNotationData[newKey] = data[k].toString();
418432
} else if (Array.isArray(data[k]) && (typeof data[k][0] === 'string' || typeof data[k][0] === 'number')) {
419433
newKey = (prefix == '') ? k : prefix + '.' + k;
420434
dottedNotationData[newKey] = data[k].join(',');
421-
}
435+
}
422436
else if (typeof data[k] === 'object') {
423437
var newPrefix = (prefix == '') ? k : prefix + '.' + k;
424438
objectToFlatDottedNotation(data[k], newPrefix, dottedNotationData);
@@ -463,4 +477,4 @@ function writeToSocket(unackedDataElements, dataElementIndex) {
463477
function sendWindowFrame(numberOfFrames) {
464478
windowFramebuffer.writeUInt32BE(numberOfFrames, 2);
465479
tlsSocket.write(windowFramebuffer);
466-
}
480+
}

samples/test_logmet_client_send_data.js

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
/* eslint-disable no-console */
22

3-
var LogmetProducer = require('../LogmetProducer');
3+
var LogmetProducer = require('../LogmetProducer');
44

5-
var logmetEndpoint = 'logs.opvis.bluemix.net';
5+
var logmetEndpoint = 'ingest.logging.ng.bluemix.net';
66
var logmetPort = 9091;
77

88
//********************************************************
@@ -82,7 +82,7 @@ logmetClient.connect(function(error, status) {
8282
console.log('LogmetClient returned an error: ' + error);
8383
} else if (status.handshakeCompleted) {
8484
console.log('LogmetClient is ready to send data.');
85-
85+
8686
var data = sample_data;
8787

8888
var i = 0;
@@ -100,8 +100,8 @@ logmetClient.connect(function(error, status) {
100100
});
101101
} else {
102102
clearInterval(timer);
103-
logmetClient.terminate();
103+
logmetClient.terminate(function() {});
104104
}
105105
}, timeBetweenData);
106106
}
107-
});
107+
});

0 commit comments

Comments
 (0)