-
Notifications
You must be signed in to change notification settings - Fork 14
Pooled sequence generator #367
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 2 commits
a1de652
89dcfb4
3b62e1a
683e379
c6835f7
dfd4ef3
5e53fae
af18861
f2d4660
29cb9d9
45da713
84961a5
8bb8f6d
6c7013a
05c6970
2dc0ddf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,8 +18,11 @@ | |
*/ | ||
package com.redhat.lightblue.mongo.crud; | ||
|
||
import java.util.Set; | ||
import java.util.Map; | ||
import java.util.HashMap; | ||
import java.util.concurrent.CopyOnWriteArraySet; | ||
import java.util.concurrent.locks.ReentrantReadWriteLock; | ||
import java.util.concurrent.locks.ReentrantLock; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
@@ -45,23 +48,49 @@ public class MongoSequenceGenerator { | |
public static final String VALUE = "value"; | ||
|
||
private static final Logger LOGGER = LoggerFactory.getLogger(MongoSequenceGenerator.class); | ||
private static final ReentrantReadWriteLock rwl=new ReentrantReadWriteLock(); | ||
|
||
private final DBCollection coll; | ||
|
||
// a set of sequances collections which were already initialized | ||
private static Set<String> initializedCollections = new CopyOnWriteArraySet<>(); | ||
static Map<String,SequenceInfo> sequenceInfo = new HashMap<>(); | ||
static CopyOnWriteArraySet<String> initializedCollections = new CopyOnWriteArraySet<>(); | ||
|
||
static class SequenceInfo { | ||
final String name; | ||
final ReentrantLock lock=new ReentrantLock(); | ||
|
||
long poolSize; | ||
long nextIdInPool; | ||
long inc; | ||
|
||
SequenceInfo(String name) { | ||
this.name=name; | ||
} | ||
|
||
Long nextId() { | ||
if(poolSize>0) { | ||
poolSize--; | ||
long ret=nextIdInPool; | ||
nextIdInPool+=inc; | ||
return Long.valueOf(ret); | ||
} else { | ||
return null; | ||
} | ||
} | ||
} | ||
|
||
public MongoSequenceGenerator(DBCollection coll) { | ||
this.coll = coll; | ||
|
||
if (!initializedCollections.contains(coll.getFullName())) { | ||
// Here, we also make sure we have the indexes setup properly | ||
String name=coll.getFullName(); | ||
if(!initializedCollections.contains(name)) { | ||
initIndex(); | ||
initializedCollections.add(coll.getFullName()); | ||
LOGGER.info("Initialized sequances collection {}", coll.getFullName()); | ||
initializedCollections.add(name); | ||
LOGGER.info("Initialized sequances collection {}", name); | ||
} | ||
} | ||
|
||
private void initIndex() { | ||
// Make sure we have a unique index on name | ||
BasicDBObject keys = new BasicDBObject(NAME, 1); | ||
|
@@ -80,48 +109,92 @@ private void initIndex() { | |
* @param inc The increment, Could be negative or positive. If 0, it is | ||
* assumed to be 1. Used only if the sequence does not exist prior to this | ||
* call | ||
* @param poolSize If the sequence already has a pool associated | ||
* with it, this is ignored, and the next id is used from the | ||
* pool. Otherwise, a new pool with this size is initialized for | ||
* the sequence | ||
* | ||
* If the sequence already exists, the <code>init</code> and | ||
* <code>inc</code> are ignored. | ||
* | ||
* @return The value of the sequence before the call | ||
*/ | ||
public long getNextSequenceValue(String name, long init, long inc) { | ||
public long getNextSequenceValue(String name, long init, long inc, long poolSize) { | ||
LOGGER.debug("getNextSequenceValue({})", name); | ||
// Read the sequence document | ||
BasicDBObject q = new BasicDBObject(NAME, name); | ||
DBObject doc = coll.findOne(q,null,ReadPreference.primary()); | ||
if (doc == null) { | ||
// Sequence document does not exist. Insert a new document using the init and inc | ||
LOGGER.debug("inserting sequence record name={}, init={}, inc={}", name, init, inc); | ||
if (inc == 0) { | ||
inc = 1; | ||
// First check if there is already a pool of ids available | ||
String fullName=coll.getFullName()+"."+name; | ||
rwl.readLock().lock(); | ||
SequenceInfo si=sequenceInfo.get(fullName); | ||
rwl.readLock().unlock(); | ||
if(si==null) { | ||
rwl.writeLock().lock(); | ||
si=sequenceInfo.get(fullName); | ||
if(si==null) { | ||
si=new SequenceInfo(fullName); | ||
sequenceInfo.put(fullName,si); | ||
} | ||
rwl.writeLock().unlock(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks more or less like a lazy instantiation. Would a synchronised method that returns a ReentrantReadWriteLock help clear some of this up? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is lazy initialization of a pool for the first time in the hashmap. To write into the hashmap, we need a writeLock. I don't understand your comment about the synchronized method. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Disregard then, based on other conversation thread. |
||
} | ||
|
||
BasicDBObject u = new BasicDBObject(). | ||
si.lock.lock(); | ||
|
||
long ret=0; | ||
|
||
try { | ||
// If there are ids in the pool, use one | ||
Long next=si.nextId(); | ||
if(next!=null) { | ||
return next; | ||
} | ||
// No ids in the pool | ||
|
||
// Read the sequence document | ||
BasicDBObject q = new BasicDBObject(NAME, name); | ||
DBObject doc = coll.findOne(q,null,ReadPreference.primary()); | ||
if (doc == null) { | ||
// Sequence document does not exist. Insert a new document using the init and inc | ||
LOGGER.debug("inserting sequence record name={}, init={}, inc={}", name, init, inc); | ||
if (inc == 0) { | ||
inc = 1; | ||
} | ||
|
||
BasicDBObject u = new BasicDBObject(). | ||
append(NAME, name). | ||
append(INIT, init). | ||
append(INC, inc). | ||
append(VALUE, init); | ||
try { | ||
coll.insert(u, WriteConcern.ACKNOWLEDGED); | ||
} catch (Exception e) { | ||
// Someone else might have inserted already, try to re-read | ||
LOGGER.debug("Insertion failed with {}, trying to read", e); | ||
try { | ||
coll.insert(u, WriteConcern.ACKNOWLEDGED); | ||
} catch (Exception e) { | ||
// Someone else might have inserted already, try to re-read | ||
LOGGER.debug("Insertion failed with {}, trying to read", e); | ||
} | ||
doc = coll.findOne(q,null,ReadPreference.primary()); | ||
if (doc == null) { | ||
throw new RuntimeException("Cannot generate value for " + name); | ||
} | ||
} | ||
doc = coll.findOne(q,null,ReadPreference.primary()); | ||
if (doc == null) { | ||
throw new RuntimeException("Cannot generate value for " + name); | ||
LOGGER.debug("Sequence doc={}", doc); | ||
Long increment = (Long) doc.get(INC); | ||
|
||
if(poolSize>1) { | ||
si.inc=increment; | ||
increment*=poolSize; | ||
} | ||
} | ||
LOGGER.debug("Sequence doc={}", doc); | ||
Long increment = (Long) doc.get(INC); | ||
BasicDBObject u = new BasicDBObject(). | ||
BasicDBObject u = new BasicDBObject(). | ||
append("$inc", new BasicDBObject(VALUE, increment)); | ||
// This call returns the unmodified document | ||
doc = coll.findAndModify(q, u); | ||
Long l = (Long) doc.get(VALUE); | ||
LOGGER.debug("{} -> {}", name, l); | ||
return l; | ||
// This call returns the unmodified document | ||
doc = coll.findAndModify(q, u); | ||
ret = (Long) doc.get(VALUE); | ||
// Here, ret is the next id to return | ||
if(poolSize>1) { | ||
si.poolSize=poolSize-1; | ||
si.nextIdInPool=ret+si.inc; | ||
} | ||
LOGGER.debug("{} -> {}", name, ret); | ||
} finally { | ||
si.lock.unlock(); | ||
} | ||
return ret; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure that I understand the lock/unlock around Hash.get. What does that achieve?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Readers of the hashmap use readLock, writers use writeLock. So, many can read, only one can write. Readers will block when someone is writing, but readers won't block each other. Better than synchronized.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the explanation.