Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
*/
package com.redhat.lightblue.mongo.crud;

import java.util.Set;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -45,23 +48,49 @@ public class MongoSequenceGenerator {
public static final String VALUE = "value";

private static final Logger LOGGER = LoggerFactory.getLogger(MongoSequenceGenerator.class);
private static final ReentrantReadWriteLock rwl=new ReentrantReadWriteLock();

private final DBCollection coll;

// a set of sequances collections which were already initialized
private static Set<String> initializedCollections = new CopyOnWriteArraySet<>();
static Map<String,SequenceInfo> sequenceInfo = new HashMap<>();
static CopyOnWriteArraySet<String> initializedCollections = new CopyOnWriteArraySet<>();

static class SequenceInfo {
final String name;
final ReentrantLock lock=new ReentrantLock();

long poolSize;
long nextIdInPool;
long inc;

SequenceInfo(String name) {
this.name=name;
}

Long nextId() {
if(poolSize>0) {
poolSize--;
long ret=nextIdInPool;
nextIdInPool+=inc;
return Long.valueOf(ret);
} else {
return null;
}
}
}

public MongoSequenceGenerator(DBCollection coll) {
this.coll = coll;

if (!initializedCollections.contains(coll.getFullName())) {
// Here, we also make sure we have the indexes setup properly
String name=coll.getFullName();
if(!initializedCollections.contains(name)) {
initIndex();
initializedCollections.add(coll.getFullName());
LOGGER.info("Initialized sequances collection {}", coll.getFullName());
initializedCollections.add(name);
LOGGER.info("Initialized sequances collection {}", name);
}
}

private void initIndex() {
// Make sure we have a unique index on name
BasicDBObject keys = new BasicDBObject(NAME, 1);
Expand All @@ -80,48 +109,96 @@ private void initIndex() {
* @param inc The increment, Could be negative or positive. If 0, it is
* assumed to be 1. Used only if the sequence does not exist prior to this
* call
* @param poolSize If the sequence already has a pool associated
* with it, this is ignored, and the next id is used from the
* pool. Otherwise, a new pool with this size is initialized for
* the sequence
*
* If the sequence already exists, the <code>init</code> and
* <code>inc</code> are ignored.
*
* @return The value of the sequence before the call
*/
public long getNextSequenceValue(String name, long init, long inc) {
public long getNextSequenceValue(String name, long init, long inc, long poolSize) {
LOGGER.debug("getNextSequenceValue({})", name);
// Read the sequence document
BasicDBObject q = new BasicDBObject(NAME, name);
DBObject doc = coll.findOne(q,null,ReadPreference.primary());
if (doc == null) {
// Sequence document does not exist. Insert a new document using the init and inc
LOGGER.debug("inserting sequence record name={}, init={}, inc={}", name, init, inc);
if (inc == 0) {
inc = 1;
// First check if there is already a pool of ids available
String fullName=coll.getFullName()+"."+name;
rwl.readLock().lock();
SequenceInfo si=sequenceInfo.get(fullName);
rwl.readLock().unlock();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure that I understand the lock/unlock around Hash.get. What does that achieve?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Readers of the hashmap use readLock, writers use writeLock. So, many can read, only one can write. Readers will block when someone is writing, but readers won't block each other. Better than synchronized.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation.

if(si==null) {
rwl.writeLock().lock();
si=sequenceInfo.get(fullName);
if(si==null) {
si=new SequenceInfo(fullName);
sequenceInfo.put(fullName,si);
}
rwl.writeLock().unlock();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks more or less like a lazy instantiation. Would a synchronised method that returns a ReentrantReadWriteLock help clear some of this up?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is lazy initialization of a pool for the first time in the hashmap. To write into the hashmap, we need a writeLock. I don't understand your comment about the synchronized method.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Disregard then, based on other conversation thread.

}

BasicDBObject u = new BasicDBObject().
si.lock.lock();

long ret=0;

try {
// If there are ids in the pool, use one
if(si!=null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it even possible for si to be null at this point?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is not, at this point, i'll clean that up

Long next=si.nextId();
if(next!=null) {
return next;
}
}
// No ids in the pool

// Read the sequence document
BasicDBObject q = new BasicDBObject(NAME, name);
DBObject doc = coll.findOne(q,null,ReadPreference.primary());
if (doc == null) {
// Sequence document does not exist. Insert a new document using the init and inc
LOGGER.debug("inserting sequence record name={}, init={}, inc={}", name, init, inc);
if (inc == 0) {
inc = 1;
}

BasicDBObject u = new BasicDBObject().
append(NAME, name).
append(INIT, init).
append(INC, inc).
append(VALUE, init);
try {
coll.insert(u, WriteConcern.ACKNOWLEDGED);
} catch (Exception e) {
// Someone else might have inserted already, try to re-read
LOGGER.debug("Insertion failed with {}, trying to read", e);
try {
coll.insert(u, WriteConcern.ACKNOWLEDGED);
} catch (Exception e) {
// Someone else might have inserted already, try to re-read
LOGGER.debug("Insertion failed with {}, trying to read", e);
}
doc = coll.findOne(q,null,ReadPreference.primary());
if (doc == null) {
throw new RuntimeException("Cannot generate value for " + name);
}
}
doc = coll.findOne(q,null,ReadPreference.primary());
if (doc == null) {
throw new RuntimeException("Cannot generate value for " + name);
LOGGER.debug("Sequence doc={}", doc);
Long increment = (Long) doc.get(INC);

if(poolSize>1&&si!=null) {
si.inc=increment;
increment*=poolSize;
}
}
LOGGER.debug("Sequence doc={}", doc);
Long increment = (Long) doc.get(INC);
BasicDBObject u = new BasicDBObject().
BasicDBObject u = new BasicDBObject().
append("$inc", new BasicDBObject(VALUE, increment));
// This call returns the unmodified document
doc = coll.findAndModify(q, u);
Long l = (Long) doc.get(VALUE);
LOGGER.debug("{} -> {}", name, l);
return l;
// This call returns the unmodified document
doc = coll.findAndModify(q, u);
ret = (Long) doc.get(VALUE);
// Here, ret is the next id to return
if(poolSize>1&&si!=null) {
si.poolSize=poolSize-1;
si.nextIdInPool=ret+si.inc;
}
LOGGER.debug("{} -> {}", name, ret);
} finally {
if(si!=null) {
si.lock.unlock();
}
}
return ret;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class MongoSequenceSupport implements ValueGeneratorSupport {
public static final String PROP_COLLECTION = "collection";
public static final String PROP_INITIAL_VALUE = "initialValue";
public static final String PROP_INCREMENT = "increment";
public static final String PROP_POOLSIZE="poolSize";

private static final ValueGenerator.ValueGeneratorType[] TYPES = {ValueGenerator.ValueGeneratorType.IntSequence};

Expand Down Expand Up @@ -107,9 +108,20 @@ public Object generateValue(EntityMetadata md, ValueGenerator generator) {
} else {
increment = Long.valueOf(incrementStr).longValue();
}
String poolSizeStr=p.getProperty(PROP_POOLSIZE);
long poolSize;
if(poolSizeStr==null) {
poolSize=0;
} else {
poolSize=Long.valueOf(poolSizeStr).longValue();
// A poolsize=1 is meaningless, don't pool IDs for poolsize=1
if(poolSize<=1)
poolSize=0;
}

DB db = controller.getDbResolver().get((MongoDataStore) md.getDataStore());
DBCollection coll = db.getCollection(collection);
MongoSequenceGenerator gen = new MongoSequenceGenerator(coll);
return gen.getNextSequenceValue(name, initialValue, increment);
return gen.getNextSequenceValue(name, initialValue, increment, poolSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,105 @@
package com.redhat.lightblue.mongo.crud;

import org.junit.Test;
import org.junit.Before;

import com.mongodb.DBObject;
import com.mongodb.BasicDBObject;

import com.redhat.lightblue.mongo.crud.MongoSequenceGenerator;

import org.junit.Assert;

public class MongoSequenceGeneratorTest extends AbstractMongoCrudTest {

@Before
public void init() {
coll.remove(new BasicDBObject());
MongoSequenceGenerator.sequenceInfo.clear();
}

@Test
public void zeroPoolTest() throws Exception {
MongoSequenceGenerator g = new MongoSequenceGenerator(coll);

Assert.assertEquals(1, g.getNextSequenceValue("s1", 1, 1,0));
validateId("s1",2);
Assert.assertEquals(100, g.getNextSequenceValue("s2", 100, 1,0));
validateId("s2",101);
Assert.assertEquals(-1000, g.getNextSequenceValue("s3", -1000, 10,0));
validateId("s3",-990);
Assert.assertEquals(2, g.getNextSequenceValue("s1", 123, 123,0));
validateId("s1",3);
Assert.assertEquals(3, g.getNextSequenceValue("s1", 213, 123,0));
validateId("s1",4);
Assert.assertEquals(101, g.getNextSequenceValue("s2", 1234, 123,0));
validateId("s2",102);
Assert.assertEquals(-990, g.getNextSequenceValue("s3", 123, 123,0));
validateId("s3",-980);
}

@Test
public void theTest() throws Exception {
public void onePoolTest() throws Exception {
MongoSequenceGenerator g = new MongoSequenceGenerator(coll);

Assert.assertEquals(1, g.getNextSequenceValue("s1", 1, 1));
Assert.assertEquals(100, g.getNextSequenceValue("s2", 100, 1));
Assert.assertEquals(-1000, g.getNextSequenceValue("s3", -1000, 10));
Assert.assertEquals(2, g.getNextSequenceValue("s1", 123, 123));
Assert.assertEquals(3, g.getNextSequenceValue("s1", 213, 123));
Assert.assertEquals(101, g.getNextSequenceValue("s2", 1234, 123));
Assert.assertEquals(-990, g.getNextSequenceValue("s3", 123, 123));
Assert.assertEquals(1, g.getNextSequenceValue("s1", 1, 1,1));
validateId("s1",2);
Assert.assertEquals(100, g.getNextSequenceValue("s2", 100, 1,1));
validateId("s2",101);
Assert.assertEquals(-1000, g.getNextSequenceValue("s3", -1000, 10,1));
validateId("s3",-990);
Assert.assertEquals(2, g.getNextSequenceValue("s1", 123, 123,1));
validateId("s1",3);
Assert.assertEquals(3, g.getNextSequenceValue("s1", 213, 123,1));
validateId("s1",4);
Assert.assertEquals(101, g.getNextSequenceValue("s2", 1234, 123,1));
validateId("s2",102);
Assert.assertEquals(-990, g.getNextSequenceValue("s3", 123, 123,1));
validateId("s3",-980);
}

@Test
public void bigPoolTest() throws Exception {
MongoSequenceGenerator g = new MongoSequenceGenerator(coll);

Assert.assertEquals(1, g.getNextSequenceValue("s1", 1, 1,2));
validateId("s1",3);
Assert.assertEquals(2, g.getNextSequenceValue("s1", 1, 1,2));
validateId("s1",3);
Assert.assertEquals(3, g.getNextSequenceValue("s1", 1, 1,2));
validateId("s1",5);
Assert.assertEquals(4, g.getNextSequenceValue("s1", 1, 1,2));
validateId("s1",5);


Assert.assertEquals(100, g.getNextSequenceValue("s2", 100, 1,3));
validateId("s2",103);
Assert.assertEquals(101, g.getNextSequenceValue("s2", 100, 1,3));
validateId("s2",103);
Assert.assertEquals(102, g.getNextSequenceValue("s2", 100, 1,3));
validateId("s2",103);
Assert.assertEquals(103, g.getNextSequenceValue("s2", 100, 1,3));
validateId("s2",106);
Assert.assertEquals(104, g.getNextSequenceValue("s2", 100, 1,3));
validateId("s2",106);
Assert.assertEquals(105, g.getNextSequenceValue("s2", 100, 1,3));
validateId("s2",106);
Assert.assertEquals(106, g.getNextSequenceValue("s2", 100, 1,3));
validateId("s2",109);

Assert.assertEquals(-1000, g.getNextSequenceValue("s3", -1000, 10,3));
validateId("s3",-970);
Assert.assertEquals(-990, g.getNextSequenceValue("s3", -1000, 10,3));
validateId("s3",-970);
Assert.assertEquals(-980, g.getNextSequenceValue("s3", -1000, 10,3));
validateId("s3",-970);
Assert.assertEquals(-970, g.getNextSequenceValue("s3", -1000, 10,3));
validateId("s3",-940);
}

private void validateId(String seq,long expected) throws Exception {
DBObject obj=coll.findOne(new BasicDBObject("name",seq));
Long l=(Long)obj.get("value");
Assert.assertEquals(expected, l.longValue());
}
}