Skip to content

Commit 26806fb

Browse files
committed
Add affinity support
1 parent 84dc7d1 commit 26806fb

File tree

9 files changed

+337
-47
lines changed

9 files changed

+337
-47
lines changed

ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourceBuilder.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.util.Properties;
88
import java.util.function.BooleanSupplier;
99
import java.util.function.Consumer;
10+
import java.util.function.Supplier;
1011

1112
/**
1213
* Builder for DataSourcePool.
@@ -639,6 +640,26 @@ default DataSourceBuilder customProperties(Map<String, String> customProperties)
639640
*/
640641
DataSourceBuilder addProperty(String key, int value);
641642

643+
/**
644+
* sets the affinity-size (internal hashmap of distinct affinity keys). Should be a prime number. Default: 257
645+
*/
646+
DataSourceBuilder affinitySize(int affinitySize);
647+
648+
/**
649+
* Returns the affinity size.
650+
*/
651+
int getAffinitySize();
652+
653+
/**
654+
* Sets the affinity provider. e.g. Thread::currentThread.
655+
*/
656+
DataSourceBuilder affinityProvider(Supplier<Object> affinityProvider);
657+
658+
/**
659+
* Returns the affinity provider.
660+
*/
661+
Supplier<Object> getAffinityProvider();
662+
642663
/**
643664
* Set the database owner username (used to create connection for use with InitDatabase).
644665
*/

ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourceConfig.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.util.Map;
1010
import java.util.Properties;
1111
import java.util.function.Consumer;
12+
import java.util.function.Supplier;
1213

1314
/**
1415
* Configuration information for a DataSource.
@@ -84,6 +85,8 @@ public class DataSourceConfig implements DataSourceBuilder.Settings {
8485
private boolean shutdownOnJvmExit;
8586
private boolean validateOnHeartbeat = !System.getenv().containsKey("LAMBDA_TASK_ROOT");
8687
private boolean enforceCleanClose;
88+
private int affinitySize = 257;
89+
private Supplier<Object> affinityProvider;
8790

8891
@Override
8992
public Settings settings() {
@@ -146,6 +149,8 @@ public DataSourceConfig copy() {
146149
copy.alert = alert;
147150
copy.listener = listener;
148151
copy.enforceCleanClose = enforceCleanClose;
152+
copy.affinitySize = affinitySize;
153+
copy.affinityProvider = affinityProvider;
149154
return copy;
150155
}
151156

@@ -654,6 +659,28 @@ public DataSourceConfig addProperty(String key, int value) {
654659
return addProperty(key, Integer.toString(value));
655660
}
656661

662+
@Override
663+
public DataSourceBuilder affinitySize(int affinitySize) {
664+
this.affinitySize = affinitySize;
665+
return this;
666+
}
667+
668+
@Override
669+
public int getAffinitySize() {
670+
return affinitySize;
671+
}
672+
673+
@Override
674+
public DataSourceBuilder affinityProvider(Supplier<Object> affinityProvider) {
675+
this.affinityProvider = affinityProvider;
676+
return this;
677+
}
678+
679+
@Override
680+
public Supplier<Object> getAffinityProvider() {
681+
return affinityProvider;
682+
}
683+
657684
@Override
658685
public String getOwnerUsername() {
659686
return ownerUsername;
@@ -801,6 +828,7 @@ private void loadSettings(ConfigPropertiesHelper properties) {
801828
shutdownOnJvmExit = properties.getBoolean("shutdownOnJvmExit", shutdownOnJvmExit);
802829
validateOnHeartbeat = properties.getBoolean("validateOnHeartbeat", validateOnHeartbeat);
803830
enforceCleanClose = properties.getBoolean("enforceCleanClose", enforceCleanClose);
831+
affinitySize = properties.getInt("affinityCacheSize", affinitySize);
804832

805833

806834
String isoLevel = properties.get("isolationLevel", _isolationLevel(isolationLevel));
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package io.ebean.datasource;
2+
3+
import java.sql.Connection;
4+
5+
/**
6+
* Interface for connection objects returned from the ebean-datasource connection pool
7+
*
8+
* @author Roland Praml, Foconis Analytics GmbH
9+
*/
10+
public interface DataSourceConnection extends Connection {
11+
12+
/**
13+
* Returns the affinity-ID, this connection was assigned to.
14+
*/
15+
Object affinityId();
16+
17+
}

ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourcePool.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ public interface DataSourcePool extends DataSource {
3636
* Connection connection = pool.getConnection();
3737
*
3838
* }</pre>
39-
*
4039
*/
4140
static DataSourceBuilder builder() {
4241
return new DataSourceConfig();
@@ -81,6 +80,12 @@ static DataSourceBuilder builder() {
8180
*/
8281
void offline();
8382

83+
/**
84+
* Returns a connection for given affinity ID. It is guaranteed, that connection.affinityId in listener etc.
85+
* is the same object.
86+
*/
87+
DataSourceConnection getConnection(Object affinityId) throws SQLException;
88+
8489
/**
8590
* Shutdown the pool.
8691
* <p>

ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java

Lines changed: 157 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,107 @@
1111
* PooledConnection, so that the node object can be reused. This avoids object
1212
* creation/gc during remove operations.
1313
* <p>
14+
* The connectionbuffer iself has one linkedList from <code>free</code> to
15+
* <code>freeEnd</code>. In parallel, the elements in this list can also be part
16+
* the affinityNodes list, which implement a kind of hashmap.
17+
* <p>
18+
* So you can prefer which connection should be taken. You can use CurrentThread or
19+
* currentTenant as affinity ID. So you likely get a connection that has the right
20+
* pstatement caches or is already in the CPU cache.
21+
* <p>
22+
* Without affinityId, the first free-connection is taken.
23+
* <p>
24+
* With affinityId, the affinityNodes-list is determined by the hashCode, then the
25+
* list is searched, if there is a connection with the same affinity object.
26+
* <p>
27+
* If there is no one found, we take the LAST connection in freeList, as this is
28+
* the best candidate not to steal the affinity of a connection, that was currently
29+
* used. This ensures (or also causes) that the pool has at least that size of the
30+
* frequent used affinityIds. E.g. if the affinity id represents tenant id, and
31+
* 15 tenants are active, the pool will not shrink below 15 - on the other hand,
32+
* there is always one connection ready for each active tenant.
33+
* <p>
34+
* A free node can be member in two lists:
35+
* <ol>
36+
* <li>it is definitively member in the freeList</li>
37+
* <li>it may be member in one of the affinity-lists (mod hash)</li>
38+
* </ol>
39+
* The remove / transition from free to busy will remove the node from both lists.
40+
* <p>
41+
* Graphical exammple
42+
* <pre>
43+
* By default, the busy list is empty
44+
* busy ---------------------------------------------------> busyEnd
45+
* free --> c1 --> c2 --> c3 --> c4 --> c5 --> c6 --> c7 --> freeEnd
46+
* al1 ---------------------------------------------------> end
47+
* al2 ---------------------------------------------------> end
48+
* ...
49+
* al257---------------------------------------------------> end
50+
*
51+
* if a popFree(1) is called, we lookup in al1 and found no usable node.
52+
* in this case, we take the last node, c7 and move it to the busy list
53+
*
54+
* busy --> c7 --------------------------------------------> busyEnd
55+
* free --> c1 --> c2 --> c3 --> c4 --> c5 --> c6 ---------> freeEnd
56+
*
57+
* When we put that node back in the freelist, it becomes the first node
58+
* and it will be also linked in affinity-list1
59+
*
60+
* busy ---------------------------------------------------> busyEnd
61+
* free , ,> c1 --> c2 --> c3 --> c4 --> c5 --> c6 --> freeEnd
62+
* al1-> '> c7 '
63+
* al2-> (empty)
64+
*
65+
* subsequent popFree(1) will always return c7 as long as it is not busy.
66+
* now we call popFree(1) twice, we will get this picture
67+
*
68+
* busy --> c6 --> c7 ----------------------------------------> busyEnd
69+
* free --> c1 --> c2 --> c3 --> c4 --> c5 -------------------> freeEnd
70+
* al1-> (empty)
71+
* al2-> (empty)
72+
*
73+
* putting them back
74+
*
75+
* busy ------------------------------------------------------> busyEnd
76+
* free , ,> c1 --> c2 --> c3 --> c4 --> c5 -----> freeEnd
77+
* al1-> '> c7 -- c6 '
78+
* al2-> (empty)
79+
*
80+
* fetching and return a connection with affinity = 2:
81+
*
82+
* busy ------------------------------------------------------> busyEnd
83+
* free , ,> c1 --> c2 --> c3 --> c4 -------> freeEnd
84+
* al1-> | '> c7 -- c6 '
85+
* al2-> '> c2 '
86+
*
87+
* so we have 2 connections for affinity 1 and one connection for affinity 2
88+
* (and the rest is ordered itself in the freeList)
89+
* </pre>
90+
* <p>
1491
* All thread safety controlled externally (by PooledConnectionQueue).
1592
* </p>
1693
*/
1794
final class ConnectionBuffer {
1895

1996
private final Node free = Node.init();
97+
private final Node freeEnd = free.next;
2098
private final Node busy = Node.init();
2199

100+
private final Node[] affinityNodes;
101+
private final int hashSize;
102+
103+
ConnectionBuffer(int hashSize) {
104+
this.hashSize = hashSize;
105+
if (hashSize > 0) {
106+
affinityNodes = new Node[hashSize];
107+
for (int i = 0; i < affinityNodes.length; i++) {
108+
affinityNodes[i] = Node.init();
109+
}
110+
} else {
111+
affinityNodes = null;
112+
}
113+
}
114+
22115
int freeSize = 0;
23116
int busySize = 0;
24117

@@ -76,17 +169,36 @@ boolean moveToFreeList(PooledConnection c) {
76169
}
77170
node.remove();
78171
busySize--;
79-
node.addAfter(free);
172+
Object affinityId = c.affinityId();
173+
if (affinityId != null) {
174+
node.addAfter(free, affinityNodes[affinityId.hashCode() % hashSize]);
175+
} else {
176+
node.addAfter(free);
177+
}
80178
freeSize++;
81179
c.setBusyNode(null);
82180
return true;
83181
}
84182

85183
/**
86184
* Remove a connection from the free list. Returns <code>null</code> if there is not any.
185+
* <p>
186+
* Connections that are returend from this method must be either added to busyList with
187+
* addBusy or closed fully.
87188
*/
88-
PooledConnection popFree() {
89-
Node node = free.next;
189+
PooledConnection popFree(Object affinityId) {
190+
Node node;
191+
if (affinityId == null || affinityNodes == null) {
192+
node = free.next;
193+
} else {
194+
node = affinityNodes[affinityId.hashCode() % hashSize].find(affinityId);
195+
if (node == null) {
196+
// when we did not find a node with that affinity, we take the last (oldest one)
197+
// and reuse this with the new affinity. This avoids to "steal" the affinity
198+
// from the newest one.
199+
node = freeEnd.prev;
200+
}
201+
}
90202
if (node.isBoundaryNode()) {
91203
return null;
92204
}
@@ -115,10 +227,10 @@ int addBusy(PooledConnection c) {
115227
*/
116228
void closeAllFree(boolean logErrors) {
117229
List<PooledConnection> tempList = new ArrayList<>();
118-
PooledConnection c = popFree();
230+
PooledConnection c = popFree(null);
119231
while (c != null) {
120232
tempList.add(c);
121-
c = popFree();
233+
c = popFree(null);
122234
}
123235

124236
if (Log.isLoggable(System.Logger.Level.TRACE)) {
@@ -212,20 +324,25 @@ static final class Node {
212324

213325
private Node next;
214326
private Node prev;
327+
// Double-LL nodes for affinity management
328+
private Node afNext;
329+
private Node afPrev;
215330
final PooledConnection pc;
216331

217332
private Node(PooledConnection pc) {
218333
this.pc = pc;
219334
}
220335

221336
/**
222-
* Creates new "list" with two empty boundary nodes
337+
* Creates new "list" with two empty edge nodes
223338
*/
224339
public static Node init() {
225340
Node node1 = new Node(null);
226341
Node node2 = new Node(null);
227342
node1.next = node2;
228343
node2.prev = node1;
344+
node1.afNext = node2;
345+
node2.afPrev = node1;
229346
return node1;
230347
}
231348

@@ -240,12 +357,18 @@ private boolean isBoundaryNode() {
240357
* Removes the node from the list. The node can be re-added to an other list
241358
*/
242359
private void remove() {
243-
assert pc != null : "called remove a boundary node";
360+
assert pc != null : "called remove on an edge node";
244361
assert prev != null && next != null : "not part of a list";
245362
next.prev = prev;
246363
prev.next = next;
247364
prev = null;
248365
next = null;
366+
if (afNext != null) {
367+
afNext.afPrev = afPrev;
368+
afPrev.afNext = afNext;
369+
afPrev = null;
370+
afNext = null;
371+
}
249372
}
250373

251374
/**
@@ -255,11 +378,37 @@ private void remove() {
255378
*/
256379
public void addAfter(Node node) {
257380
assert !this.isBoundaryNode() : "this is a boundary node";
258-
assert next == null & prev == null : "Node already member of a list";
381+
assert next == null && prev == null : "Node already member of a list";
259382
next = node.next;
260383
prev = node;
261384
node.next.prev = this;
262385
node.next = this;
263386
}
387+
388+
/**
389+
* Adds <code>this</code> after <code>node</code> AND as affinity-node after <code>afNode</code>.
390+
*/
391+
public void addAfter(Node node, Node afNode) {
392+
addAfter(node);
393+
assert afNext == null && afPrev == null : "Node already member of affinity-list";
394+
afNext = afNode.afNext;
395+
afPrev = afNode;
396+
afNode.afNext.afPrev = this;
397+
afNode.afNext = this;
398+
}
399+
400+
/**
401+
* Find the connection with given affinity id in this affinity-list.
402+
*/
403+
public Node find(Object affinityId) {
404+
Node n = this.afNext;
405+
while (!n.isBoundaryNode()) {
406+
if (affinityId.equals(n.pc.affinityId())) {
407+
return n;
408+
}
409+
n = n.afNext;
410+
}
411+
return null;
412+
}
264413
}
265414
}

0 commit comments

Comments
 (0)