Skip to content

Commit 138764b

Browse files
committed
Add support for multiple routing tables and new routing procedure
1 parent c33b8c1 commit 138764b

File tree

79 files changed

+855
-502
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

79 files changed

+855
-502
lines changed

src/driver.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import Session from './session'
2121
import Pool from './internal/pool'
2222
import Connection from './internal/connection'
2323
import { newError, SERVICE_UNAVAILABLE } from './error'
24-
import { DirectConnectionProvider } from './internal/connection-providers'
24+
import DirectConnectionProvider from './internal/connection-provider-direct'
2525
import Bookmark from './internal/bookmark'
2626
import ConnectivityVerifier from './internal/connectivity-verifier'
2727
import PoolConfig, {
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/**
2+
* Copyright (c) 2002-2019 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import ConnectionProvider from './connection-provider'
21+
22+
export default class DirectConnectionProvider extends ConnectionProvider {
23+
constructor (address, connectionPool, driverOnErrorCallback) {
24+
super()
25+
this._address = address
26+
this._connectionPool = connectionPool
27+
this._driverOnErrorCallback = driverOnErrorCallback
28+
}
29+
30+
acquireConnection (accessMode, database) {
31+
const connectionPromise = this._connectionPool.acquire(this._address)
32+
return this._withAdditionalOnErrorCallback(
33+
connectionPromise,
34+
this._driverOnErrorCallback
35+
)
36+
}
37+
}

src/internal/connection-providers.js renamed to src/internal/connection-provider-routing.js

Lines changed: 81 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -24,43 +24,17 @@ import RoutingTable from './routing-table'
2424
import Rediscovery from './rediscovery'
2525
import RoutingUtil from './routing-util'
2626
import { HostNameResolver } from './node'
27+
import ConnectionProvider from './connection-provider'
28+
import SingleConnectionProvider from './connection-provider-single'
29+
import { VERSION_4_0_0 } from './server-version'
2730

2831
const UNAUTHORIZED_ERROR_CODE = 'Neo.ClientError.Security.Unauthorized'
32+
const DATABASE_NOT_FOUND_ERROR_CODE =
33+
'Neo.ClientError.Database.DatabaseNotFound'
34+
const SYSTEM_DB_NAME = 'system'
35+
const DEFAULT_DB_NAME = ''
2936

30-
class ConnectionProvider {
31-
acquireConnection (accessMode, database) {
32-
throw new Error('Abstract function')
33-
}
34-
35-
_withAdditionalOnErrorCallback (connectionPromise, driverOnErrorCallback) {
36-
// install error handler from the driver on the connection promise; this callback is installed separately
37-
// so that it does not handle errors, instead it is just an additional error reporting facility.
38-
connectionPromise.catch(error => {
39-
driverOnErrorCallback(error)
40-
})
41-
// return the original connection promise
42-
return connectionPromise
43-
}
44-
}
45-
46-
export class DirectConnectionProvider extends ConnectionProvider {
47-
constructor (address, connectionPool, driverOnErrorCallback) {
48-
super()
49-
this._address = address
50-
this._connectionPool = connectionPool
51-
this._driverOnErrorCallback = driverOnErrorCallback
52-
}
53-
54-
acquireConnection (accessMode, database) {
55-
const connectionPromise = this._connectionPool.acquire(this._address)
56-
return this._withAdditionalOnErrorCallback(
57-
connectionPromise,
58-
this._driverOnErrorCallback
59-
)
60-
}
61-
}
62-
63-
export class LoadBalancer extends ConnectionProvider {
37+
export default class RoutingConnectionProvider extends ConnectionProvider {
6438
constructor (
6539
address,
6640
routingContext,
@@ -72,7 +46,7 @@ export class LoadBalancer extends ConnectionProvider {
7246
) {
7347
super()
7448
this._seedRouter = address
75-
this._routingTable = new RoutingTable()
49+
this._routingTables = {}
7650
this._rediscovery = new Rediscovery(new RoutingUtil(routingContext))
7751
this._connectionPool = connectionPool
7852
this._driverOnErrorCallback = driverOnErrorCallback
@@ -84,60 +58,64 @@ export class LoadBalancer extends ConnectionProvider {
8458
}
8559

8660
acquireConnection (accessMode, database) {
87-
const connectionPromise = this._freshRoutingTable(accessMode).then(
88-
routingTable => {
89-
if (accessMode === READ) {
90-
const address = this._loadBalancingStrategy.selectReader(
91-
routingTable.readers
92-
)
93-
return this._acquireConnectionToServer(address, 'read')
94-
} else if (accessMode === WRITE) {
95-
const address = this._loadBalancingStrategy.selectWriter(
96-
routingTable.writers
97-
)
98-
return this._acquireConnectionToServer(address, 'write')
99-
} else {
100-
throw newError('Illegal mode ' + accessMode)
101-
}
61+
const connectionPromise = this._freshRoutingTable(
62+
accessMode,
63+
database || DEFAULT_DB_NAME
64+
).then(routingTable => {
65+
if (accessMode === READ) {
66+
const address = this._loadBalancingStrategy.selectReader(
67+
routingTable.readers
68+
)
69+
return this._acquireConnectionToServer(address, 'read', routingTable)
70+
} else if (accessMode === WRITE) {
71+
const address = this._loadBalancingStrategy.selectWriter(
72+
routingTable.writers
73+
)
74+
return this._acquireConnectionToServer(address, 'write', routingTable)
75+
} else {
76+
throw newError('Illegal mode ' + accessMode)
10277
}
103-
)
78+
})
10479
return this._withAdditionalOnErrorCallback(
10580
connectionPromise,
10681
this._driverOnErrorCallback
10782
)
10883
}
10984

11085
forget (address) {
111-
this._routingTable.forget(address)
86+
Object.values(this._routingTables).forEach(routingTable =>
87+
routingTable.forget(address)
88+
)
11289
this._connectionPool.purge(address)
11390
}
11491

11592
forgetWriter (address) {
116-
this._routingTable.forgetWriter(address)
93+
Object.values(this._routingTables).forEach(routingTable =>
94+
routingTable.forgetWriter(address)
95+
)
11796
}
11897

119-
_acquireConnectionToServer (address, serverName) {
98+
_acquireConnectionToServer (address, serverName, routingTable) {
12099
if (!address) {
121100
return Promise.reject(
122101
newError(
123-
`Failed to obtain connection towards ${serverName} server. Known routing table is: ${
124-
this._routingTable
125-
}`,
102+
`Failed to obtain connection towards ${serverName} server. Known routing table is: ${routingTable}`,
126103
SESSION_EXPIRED
127104
)
128105
)
129106
}
130107
return this._connectionPool.acquire(address)
131108
}
132109

133-
_freshRoutingTable (accessMode) {
134-
const currentRoutingTable = this._routingTable
110+
_freshRoutingTable (accessMode, database) {
111+
const currentRoutingTable =
112+
this._routingTables[database] || new RoutingTable({ database })
135113

136114
if (!currentRoutingTable.isStaleFor(accessMode)) {
137115
return Promise.resolve(currentRoutingTable)
138116
}
139117
this._log.info(
140-
`Routing table is stale for ${accessMode}: ${currentRoutingTable}`
118+
`Routing table is stale for database: "${database}" and access mode: "${accessMode}": ${currentRoutingTable}`
141119
)
142120
return this._refreshRoutingTable(currentRoutingTable)
143121
}
@@ -163,7 +141,11 @@ export class LoadBalancer extends ConnectionProvider {
163141
) {
164142
// we start with seed router, no routers were probed before
165143
const seenRouters = []
166-
return this._fetchRoutingTableUsingSeedRouter(seenRouters, this._seedRouter)
144+
return this._fetchRoutingTableUsingSeedRouter(
145+
seenRouters,
146+
this._seedRouter,
147+
currentRoutingTable
148+
)
167149
.then(newRoutingTable => {
168150
if (newRoutingTable) {
169151
this._useSeedRouter = false
@@ -177,7 +159,7 @@ export class LoadBalancer extends ConnectionProvider {
177159
)
178160
})
179161
.then(newRoutingTable => {
180-
this._applyRoutingTableIfPossible(newRoutingTable)
162+
this._applyRoutingTableIfPossible(currentRoutingTable, newRoutingTable)
181163
return newRoutingTable
182164
})
183165
}
@@ -198,11 +180,12 @@ export class LoadBalancer extends ConnectionProvider {
198180
// none of the known routers returned a valid routing table - try to use seed router address for rediscovery
199181
return this._fetchRoutingTableUsingSeedRouter(
200182
knownRouters,
201-
this._seedRouter
183+
this._seedRouter,
184+
currentRoutingTable
202185
)
203186
})
204187
.then(newRoutingTable => {
205-
this._applyRoutingTableIfPossible(newRoutingTable)
188+
this._applyRoutingTableIfPossible(currentRoutingTable, newRoutingTable)
206189
return newRoutingTable
207190
})
208191
}
@@ -218,7 +201,7 @@ export class LoadBalancer extends ConnectionProvider {
218201
// returned routing table was undefined, this means a connection error happened and the last known
219202
// router did not return a valid routing table, so we need to forget it
220203
const lastRouterIndex = knownRouters.length - 1
221-
LoadBalancer._forgetRouter(
204+
RoutingConnectionProvider._forgetRouter(
222205
currentRoutingTable,
223206
knownRouters,
224207
lastRouterIndex
@@ -229,14 +212,14 @@ export class LoadBalancer extends ConnectionProvider {
229212
)
230213
}
231214

232-
_fetchRoutingTableUsingSeedRouter (seenRouters, seedRouter) {
233-
const resolvedAddresses = this._hostNameResolver.resolve(seedRouter)
215+
_fetchRoutingTableUsingSeedRouter (seenRouters, seedRouter, routingTable) {
216+
const resolvedAddresses = this._resolveSeedRouter(seedRouter)
234217
return resolvedAddresses.then(resolvedRouterAddresses => {
235218
// filter out all addresses that we've already tried
236219
const newAddresses = resolvedRouterAddresses.filter(
237220
address => seenRouters.indexOf(address) < 0
238221
)
239-
return this._fetchRoutingTable(newAddresses, null)
222+
return this._fetchRoutingTable(newAddresses, routingTable)
240223
})
241224
}
242225

@@ -265,7 +248,7 @@ export class LoadBalancer extends ConnectionProvider {
265248
// returned routing table was undefined, this means a connection error happened and we need to forget the
266249
// previous router and try the next one
267250
const previousRouterIndex = currentIndex - 1
268-
LoadBalancer._forgetRouter(
251+
RoutingConnectionProvider._forgetRouter(
269252
routingTable,
270253
routerAddresses,
271254
previousRouterIndex
@@ -277,8 +260,16 @@ export class LoadBalancer extends ConnectionProvider {
277260
session => {
278261
if (session) {
279262
return this._rediscovery
280-
.lookupRoutingTableOnRouter(session, currentRouter)
263+
.lookupRoutingTableOnRouter(
264+
session,
265+
routingTable.database,
266+
currentRouter
267+
)
281268
.catch(error => {
269+
if (error && error.code === DATABASE_NOT_FOUND_ERROR_CODE) {
270+
// not finding the target database is a sign of a configuration issue
271+
throw error
272+
}
282273
this._log.warn(
283274
`unable to fetch routing table because of an error ${error}`
284275
)
@@ -302,25 +293,37 @@ export class LoadBalancer extends ConnectionProvider {
302293
.acquire(routerAddress)
303294
.then(connection => {
304295
const connectionProvider = new SingleConnectionProvider(connection)
305-
return new Session({ mode: READ, connectionProvider })
296+
297+
if (connection.version().compareTo(VERSION_4_0_0) < 0) {
298+
return new Session({ mode: READ, connectionProvider })
299+
}
300+
301+
return new Session({
302+
mode: READ,
303+
database: SYSTEM_DB_NAME,
304+
connectionProvider
305+
})
306306
})
307307
.catch(error => {
308308
// unable to acquire connection towards the given router
309-
if (error && error.code === UNAUTHORIZED_ERROR_CODE) {
310-
// auth error is a sign of a configuration issue, rediscovery should not proceed
309+
if (
310+
error &&
311+
(error.code === UNAUTHORIZED_ERROR_CODE ||
312+
error.code === DATABASE_NOT_FOUND_ERROR_CODE)
313+
) {
314+
// auth error and not finding system database is a sign of a configuration issue
315+
// discovery should not proceed
311316
throw error
312317
}
313318
return null
314319
})
315320
}
316321

317-
_applyRoutingTableIfPossible (newRoutingTable) {
322+
_applyRoutingTableIfPossible (currentRoutingTable, newRoutingTable) {
318323
if (!newRoutingTable) {
319324
// none of routing servers returned valid routing table, throw exception
320325
throw newError(
321-
`Could not perform discovery. No routing servers available. Known routing table: ${
322-
this._routingTable
323-
}`,
326+
`Could not perform discovery. No routing servers available. Known routing table: ${currentRoutingTable}`,
324327
SERVICE_UNAVAILABLE
325328
)
326329
}
@@ -339,7 +342,7 @@ export class LoadBalancer extends ConnectionProvider {
339342
this._connectionPool.keepAll(newRoutingTable.allServers())
340343

341344
// make this driver instance aware of the new table
342-
this._routingTable = newRoutingTable
345+
this._routingTables[newRoutingTable.database] = newRoutingTable
343346
this._log.info(`Updated routing table ${newRoutingTable}`)
344347
}
345348

@@ -350,16 +353,3 @@ export class LoadBalancer extends ConnectionProvider {
350353
}
351354
}
352355
}
353-
354-
export class SingleConnectionProvider extends ConnectionProvider {
355-
constructor (connection) {
356-
super()
357-
this._connection = connection
358-
}
359-
360-
acquireConnection (mode, database) {
361-
const connection = this._connection
362-
this._connection = null
363-
return Promise.resolve(connection)
364-
}
365-
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/**
2+
* Copyright (c) 2002-2019 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import ConnectionProvider from './connection-provider'
21+
22+
export default class SingleConnectionProvider extends ConnectionProvider {
23+
constructor (connection) {
24+
super()
25+
this._connection = connection
26+
}
27+
28+
acquireConnection (mode, database) {
29+
const connection = this._connection
30+
this._connection = null
31+
return Promise.resolve(connection)
32+
}
33+
}

0 commit comments

Comments
 (0)