Skip to content

Commit 99849b2

Browse files
authored
Delta support (#126)
* initial delta support * added tests for delta feature * updated documentation and default * fixed type in config * fixed eslint errors * switching travis to modern node * Additional test for test coverage to pass. * Additional validateConfig() test. * fixed mislabeled variable * fixed sync issue caused by validation of deleted instances
1 parent 4cd9d01 commit 99849b2

File tree

8 files changed

+363
-38
lines changed

8 files changed

+363
-38
lines changed

.travis.yml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,9 @@ services:
66
- docker
77

88
node_js:
9-
- "0.10"
10-
- "0.12"
119
- "4"
1210
- "6"
11+
- "8"
1312

1413
script:
1514
- npm run test && npm run integration

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ option | default value | description
193193
---- | --- | ---
194194
`requestMiddleware` | noop | Custom middleware function to modify the outgoing [request](https://www.npmjs.com/package/request) to eureka
195195
`logger` | console logging | logger implementation for the client to use
196+
`shouldUseDelta` | false | Experimental mode to fetch deltas from eureka instead of full registry on update
196197
`eureka.maxRetries` | `3` | Number of times to retry all requests to eureka
197198
`eureka.requestRetryDelay` | `500` | milliseconds to wait between retries. This will be multiplied by the # of failed retries.
198199
`eureka.heartbeatInterval` | `30000` | milliseconds to wait between heartbeats

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "eureka-js-client",
3-
"version": "4.3.0",
3+
"version": "4.4.0",
44
"description": "A JavaScript implementation the Netflix OSS service registry, Eureka.",
55
"main": "lib/index.js",
66
"scripts": {

src/EurekaClient.js

Lines changed: 108 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import request from 'request';
22
import fs from 'fs';
33
import yaml from 'js-yaml';
4-
import merge from 'lodash/merge';
4+
import { merge, findIndex } from 'lodash';
5+
import { normalizeDelta, findInstance } from './deltaUtils';
56
import path from 'path';
67
import { series, waterfall } from 'async';
78
import { EventEmitter } from 'events';
@@ -69,6 +70,8 @@ export default class Eureka extends EventEmitter {
6970

7071
this.requestMiddleware = this.config.requestMiddleware;
7172

73+
this.hasFullRegistry = false;
74+
7275
if (this.amazonDataCenter) {
7376
this.metadataClient = new AwsMetadata({
7477
logger: this.logger,
@@ -328,32 +331,72 @@ export default class Eureka extends EventEmitter {
328331
}
329332

330333
/*
331-
Retrieves all applications registered with the Eureka server
334+
Orchestrates fetching registry
332335
*/
333336
fetchRegistry(callback = noop) {
337+
if (this.config.shouldUseDelta && this.hasFullRegistry) {
338+
this.fetchDelta(callback);
339+
} else {
340+
this.fetchFullRegistry(callback);
341+
}
342+
}
343+
344+
/*
345+
Retrieves all applications registered with the Eureka server
346+
*/
347+
fetchFullRegistry(callback = noop) {
334348
this.eurekaRequest({
335349
uri: '',
336350
headers: {
337351
Accept: 'application/json',
338352
},
339353
}, (error, response, body) => {
340354
if (!error && response.statusCode === 200) {
341-
this.logger.debug('retrieved registry successfully');
355+
this.logger.debug('retrieved full registry successfully');
342356
try {
343357
this.transformRegistry(JSON.parse(body));
344358
} catch (ex) {
345359
return callback(ex);
346360
}
347361
this.emit('registryUpdated');
362+
this.hasFullRegistry = true;
348363
return callback(null);
349364
} else if (error) {
350365
this.logger.warn('Error fetching registry', error);
351366
return callback(error);
352367
}
353-
callback(new Error('Unable to retrieve registry from Eureka server'));
368+
callback(new Error('Unable to retrieve full registry from Eureka server'));
354369
});
355370
}
356371

372+
/*
373+
Retrieves all applications registered with the Eureka server
374+
*/
375+
fetchDelta(callback = noop) {
376+
this.eurekaRequest({
377+
uri: 'delta',
378+
headers: {
379+
Accept: 'application/json',
380+
},
381+
}, (error, response, body) => {
382+
if (!error && response.statusCode === 200) {
383+
this.logger.debug('retrieved delta successfully');
384+
let applications;
385+
try {
386+
const jsonBody = JSON.parse(body);
387+
applications = jsonBody.applications.application;
388+
this.handleDelta(this.cache, applications);
389+
return callback(null);
390+
} catch (ex) {
391+
return callback(ex);
392+
}
393+
} else if (error) {
394+
this.logger.warn('Error fetching delta registry', error);
395+
return callback(error);
396+
}
397+
callback(new Error('Unable to retrieve delta registry from Eureka server'));
398+
});
399+
}
357400
/*
358401
Transforms the given registry and caches the registry locally
359402
*/
@@ -382,24 +425,11 @@ export default class Eureka extends EventEmitter {
382425
*/
383426
transformApp(app, cache) {
384427
if (app.instance.length) {
385-
const instances = app.instance.filter((instance) => (this.validateInstance(instance)));
386-
cache.app[app.name.toUpperCase()] = instances;
387-
instances.forEach((inst) => {
388-
const vipAddresses = this.splitVipAddress(inst.vipAddress);
389-
vipAddresses.forEach((vipAddress) => {
390-
if (!cache.vip[vipAddress]) {
391-
cache.vip[vipAddress] = [];
392-
}
393-
cache.vip[vipAddress].push(inst);
394-
});
395-
});
428+
app.instance
429+
.filter(this.validateInstance.bind(this))
430+
.forEach((inst) => this.addInstance(cache, inst));
396431
} else if (this.validateInstance(app.instance)) {
397-
const instances = [app.instance];
398-
const vipAddresses = this.splitVipAddress(app.instance.vipAddress);
399-
vipAddresses.forEach((vipAddress) => {
400-
cache.vip[vipAddress] = instances;
401-
});
402-
cache.app[app.name.toUpperCase()] = instances;
432+
this.addInstance(cache, app.instance);
403433
}
404434
}
405435

@@ -421,6 +451,63 @@ export default class Eureka extends EventEmitter {
421451
return vipAddress.split(',');
422452
}
423453

454+
handleDelta(cache, appDelta) {
455+
const delta = normalizeDelta(appDelta);
456+
delta.forEach((app) => {
457+
app.instance.forEach((instance) => {
458+
switch (instance.actionType) {
459+
case 'ADDED': this.addInstance(cache, instance); break;
460+
case 'MODIFIED': this.modifyInstance(cache, instance); break;
461+
case 'DELETED': this.deleteInstance(cache, instance); break;
462+
default: this.logger.warn('Unknown delta actionType', instance.actionType); break;
463+
}
464+
});
465+
});
466+
}
467+
468+
addInstance(cache, instance) {
469+
if (!this.validateInstance(instance)) return;
470+
const vipAddresses = this.splitVipAddress(instance.vipAddress);
471+
const appName = instance.app.toUpperCase();
472+
vipAddresses.forEach((vipAddress) => {
473+
const alreadyContains = findIndex(cache.vip[vipAddress], findInstance(instance)) > -1;
474+
if (alreadyContains) return;
475+
if (!cache.vip[vipAddress]) {
476+
cache.vip[vipAddress] = [];
477+
}
478+
cache.vip[vipAddress].push(instance);
479+
});
480+
if (!cache.app[appName]) cache.app[appName] = [];
481+
const alreadyContains = findIndex(cache.app[appName], findInstance(instance)) > -1;
482+
if (alreadyContains) return;
483+
cache.app[appName].push(instance);
484+
}
485+
486+
modifyInstance(cache, instance) {
487+
if (!this.validateInstance(instance)) return;
488+
const vipAddresses = this.splitVipAddress(instance.vipAddress);
489+
const appName = instance.app.toUpperCase();
490+
vipAddresses.forEach((vipAddress) => {
491+
const index = findIndex(cache.vip[vipAddress], findInstance(instance));
492+
if (index > -1) cache.vip[vipAddress].splice(index, 1, instance);
493+
else this.addInstance(cache, instance);
494+
});
495+
const index = findIndex(cache.app[appName], findInstance(instance));
496+
if (index > -1) cache.app[appName].splice(cache.vip[instance.vipAddress], 1, instance);
497+
else this.addInstance(cache, instance);
498+
}
499+
500+
deleteInstance(cache, instance) {
501+
const vipAddresses = this.splitVipAddress(instance.vipAddress);
502+
const appName = instance.app.toUpperCase();
503+
vipAddresses.forEach((vipAddress) => {
504+
const index = findIndex(cache.vip[vipAddress], findInstance(instance));
505+
if (index > -1) cache.vip[vipAddress].splice(index, 1);
506+
});
507+
const index = findIndex(cache.app[appName], findInstance(instance));
508+
if (index > -1) cache.app[appName].splice(cache.vip[instance.vipAddress], 1);
509+
}
510+
424511
/*
425512
Fetches the metadata using the built-in client and updates the instance
426513
configuration with the hostname and IP address. If the value of the config

src/defaultConfig.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Default configuration values:
22
export default {
33
requestMiddleware: (request, done) => done(request),
4+
shouldUseDelta: false,
45
eureka: {
56
heartbeatInterval: 30000,
67
registryFetchInterval: 30000,

src/deltaUtils.js

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
General utilities for handling processing of delta changes from eureka.
3+
*/
4+
export function arrayOrObj(mysteryValue) {
5+
return Array.isArray(mysteryValue) ? mysteryValue : [mysteryValue];
6+
}
7+
8+
export function findInstance(a) {
9+
return b => a.hostName === b.hostName && a.port.$ === b.port.$;
10+
}
11+
12+
export function normalizeDelta(appDelta) {
13+
return arrayOrObj(appDelta).map((app) => {
14+
app.instance = arrayOrObj(app.instance);
15+
return app;
16+
});
17+
}

0 commit comments

Comments
 (0)