Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mongo/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>com.redhat.lightblue.mongo</groupId>
<artifactId>lightblue-mongo-pom</artifactId>
<version>1.28.0-SNAPSHOT</version>
<version>1.29.0-SNAPSHOT</version>
</parent>
<groupId>com.redhat.lightblue.mongo</groupId>
<artifactId>lightblue-mongo</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,14 @@ public void retryConcurrentUpdateErrorsIfNeeded(Map<Integer,Error> results) {
} else {
break;
}

if (nRetries == 0) {
// retried failureRetryCount and still not able to update, the error will reach the client
LOGGER.error("Retried docs.id in {} {} times, all times failed", failedDocs, cfg.getFailureRetryCount());
}
}


}


Expand Down Expand Up @@ -289,10 +296,16 @@ private List<Integer> retryFailedDocs(List<Integer> failedDocs,Map<Integer,Error
nestedBwo.find(replaceQuery).replaceOne(newDoc);
try {
if(nestedBwo.execute().getMatchedCount()==1) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Successfully retried to update a doc: replaceQuery={} newDoc={}", replaceQuery, newDoc);
}
// Successful update
results.remove(index);
}
} catch(Exception e) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Failed retrying to update a doc: replaceQuery={} newDoc={} error={}", replaceQuery, newDoc, e.toString());
}
newFailedDocs.add(index);
}
} else {
Expand All @@ -301,6 +314,9 @@ private List<Integer> retryFailedDocs(List<Integer> failedDocs,Map<Integer,Error
}
} else {
// Doc no longer exists
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Removing doc id={} from retry queue, because it does not exist or match anymore", index);
}
results.remove(index);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
*/
package com.redhat.lightblue.mongo.crud;

import java.util.Set;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -45,23 +48,49 @@ public class MongoSequenceGenerator {
public static final String VALUE = "value";

private static final Logger LOGGER = LoggerFactory.getLogger(MongoSequenceGenerator.class);
private static final ReentrantReadWriteLock rwl=new ReentrantReadWriteLock();

private final DBCollection coll;

// a set of sequence collections which were already initialized
private static Set<String> initializedCollections = new CopyOnWriteArraySet<>();
static Map<String,SequenceInfo> sequenceInfo = new HashMap<>();
static CopyOnWriteArraySet<String> initializedCollections = new CopyOnWriteArraySet<>();

/**
 * Holds a locally pre-allocated pool of ids for one sequence, together with
 * the per-sequence lock that serializes access to the pool.
 */
static class SequenceInfo {
    final String name;                              // full collection name + "." + sequence name
    final ReentrantLock lock = new ReentrantLock(); // guards pool state for this sequence

    long poolSize;     // number of ids remaining in the local pool
    long nextIdInPool; // next id to hand out from the pool
    long inc;          // step between consecutive ids

    SequenceInfo(String name) {
        this.name = name;
    }

    /**
     * Returns the next id from the local pool, or {@code null} when the pool
     * is exhausted and a fresh pool must be allocated from the database.
     */
    Long nextId() {
        if (poolSize <= 0) {
            return null;
        }
        poolSize--;
        final long current = nextIdInPool;
        nextIdInPool += inc;
        return current;
    }
}

public MongoSequenceGenerator(DBCollection coll) {
this.coll = coll;

if (!initializedCollections.contains(coll.getFullName())) {
// Here, we also make sure we have the indexes setup properly
String name=coll.getFullName();
if(!initializedCollections.contains(name)) {
initIndex();
initializedCollections.add(coll.getFullName());
LOGGER.info("Initialized sequances collection {}", coll.getFullName());
initializedCollections.add(name);
LOGGER.info("Initialized sequances collection {}", name);
}
}

private void initIndex() {
// Make sure we have a unique index on name
BasicDBObject keys = new BasicDBObject(NAME, 1);
Expand All @@ -80,48 +109,92 @@ private void initIndex() {
* @param inc The increment, Could be negative or positive. If 0, it is
* assumed to be 1. Used only if the sequence does not exist prior to this
* call
* @param poolSize If the sequence already has a pool associated
* with it, this is ignored, and the next id is used from the
* pool. Otherwise, a new pool with this size is initialized for
* the sequence
*
* If the sequence already exists, the <code>init</code> and
* <code>inc</code> are ignored.
*
* @return The value of the sequence before the call
*/
public long getNextSequenceValue(String name, long init, long inc) {
public long getNextSequenceValue(String name, long init, long inc, long poolSize) {
LOGGER.debug("getNextSequenceValue({})", name);
// Read the sequence document
BasicDBObject q = new BasicDBObject(NAME, name);
DBObject doc = coll.findOne(q,null,ReadPreference.primary());
if (doc == null) {
// Sequence document does not exist. Insert a new document using the init and inc
LOGGER.debug("inserting sequence record name={}, init={}, inc={}", name, init, inc);
if (inc == 0) {
inc = 1;
// First check if there is already a pool of ids available
String fullName=coll.getFullName()+"."+name;
rwl.readLock().lock();
SequenceInfo si=sequenceInfo.get(fullName);
rwl.readLock().unlock();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure that I understand the lock/unlock around Hash.get. What does that achieve?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Readers of the hashmap use readLock, writers use writeLock. So, many can read, only one can write. Readers will block when someone is writing, but readers won't block each other. Better than synchronized.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation.

if(si==null) {
rwl.writeLock().lock();
si=sequenceInfo.get(fullName);
if(si==null) {
si=new SequenceInfo(fullName);
sequenceInfo.put(fullName,si);
}
rwl.writeLock().unlock();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks more or less like a lazy instantiation. Would a synchronised method that returns a ReentrantReadWriteLock help clear some of this up?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is lazy initialization of a pool for the first time in the hashmap. To write into the hashmap, we need a writeLock. I don't understand your comment about the synchronized method.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Disregard then, based on other conversation thread.

}

BasicDBObject u = new BasicDBObject().
si.lock.lock();

long ret=0;

try {
// If there are ids in the pool, use one
Long next=si.nextId();
if(next!=null) {
return next;
}
// No ids in the pool

// Read the sequence document
BasicDBObject q = new BasicDBObject(NAME, name);
DBObject doc = coll.findOne(q,null,ReadPreference.primary());
if (doc == null) {
// Sequence document does not exist. Insert a new document using the init and inc
LOGGER.debug("inserting sequence record name={}, init={}, inc={}", name, init, inc);
if (inc == 0) {
inc = 1;
}

BasicDBObject u = new BasicDBObject().
append(NAME, name).
append(INIT, init).
append(INC, inc).
append(VALUE, init);
try {
coll.insert(u, WriteConcern.ACKNOWLEDGED);
} catch (Exception e) {
// Someone else might have inserted already, try to re-read
LOGGER.debug("Insertion failed with {}, trying to read", e);
try {
coll.insert(u, WriteConcern.ACKNOWLEDGED);
} catch (Exception e) {
// Someone else might have inserted already, try to re-read
LOGGER.debug("Insertion failed with {}, trying to read", e);
}
doc = coll.findOne(q,null,ReadPreference.primary());
if (doc == null) {
throw new RuntimeException("Cannot generate value for " + name);
}
}
doc = coll.findOne(q,null,ReadPreference.primary());
if (doc == null) {
throw new RuntimeException("Cannot generate value for " + name);
LOGGER.debug("Sequence doc={}", doc);
Long increment = (Long) doc.get(INC);

if(poolSize>1) {
si.inc=increment;
increment*=poolSize;
}
}
LOGGER.debug("Sequence doc={}", doc);
Long increment = (Long) doc.get(INC);
BasicDBObject u = new BasicDBObject().
BasicDBObject u = new BasicDBObject().
append("$inc", new BasicDBObject(VALUE, increment));
// This call returns the unmodified document
doc = coll.findAndModify(q, u);
Long l = (Long) doc.get(VALUE);
LOGGER.debug("{} -> {}", name, l);
return l;
// This call returns the unmodified document
doc = coll.findAndModify(q, u);
ret = (Long) doc.get(VALUE);
// Here, ret is the next id to return
if(poolSize>1) {
si.poolSize=poolSize-1;
si.nextIdInPool=ret+si.inc;
}
LOGGER.debug("{} -> {}", name, ret);
} finally {
si.lock.unlock();
}
return ret;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class MongoSequenceSupport implements ValueGeneratorSupport {
public static final String PROP_COLLECTION = "collection";
public static final String PROP_INITIAL_VALUE = "initialValue";
public static final String PROP_INCREMENT = "increment";
public static final String PROP_POOLSIZE="poolSize";

private static final ValueGenerator.ValueGeneratorType[] TYPES = {ValueGenerator.ValueGeneratorType.IntSequence};

Expand All @@ -93,23 +94,34 @@ public Object generateValue(EntityMetadata md, ValueGenerator generator) {
if (collection == null) {
collection = DEFAULT_COLLECTION_NAME;
}
String initialValueStr = p.getProperty(PROP_INITIAL_VALUE);
Object initialValueStr = p.get(PROP_INITIAL_VALUE);
long initialValue;
if (initialValueStr == null) {
initialValue = 1;
} else {
initialValue = Long.valueOf(initialValueStr).longValue();
initialValue = Long.valueOf(initialValueStr.toString()).longValue();
}
String incrementStr = p.getProperty(PROP_INCREMENT);
Object incrementStr = p.get(PROP_INCREMENT);
long increment;
if (incrementStr == null) {
increment = 1;
} else {
increment = Long.valueOf(incrementStr).longValue();
increment = Long.valueOf(incrementStr.toString()).longValue();
}
Object poolSizeStr=p.get(PROP_POOLSIZE);
long poolSize;
if(poolSizeStr==null) {
poolSize=0;
} else {
poolSize=Long.valueOf(poolSizeStr.toString()).longValue();
// A poolsize=1 is meaningless, don't pool IDs for poolsize=1
if(poolSize<=1)
poolSize=0;
}

DB db = controller.getDbResolver().get((MongoDataStore) md.getDataStore());
DBCollection coll = db.getCollection(collection);
MongoSequenceGenerator gen = new MongoSequenceGenerator(coll);
return gen.getNextSequenceValue(name, initialValue, increment);
return gen.getNextSequenceValue(name, initialValue, increment, poolSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,14 @@
import org.junit.Ignore;

import com.mongodb.DB;
import com.mongodb.DBObject;
import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;

import com.redhat.lightblue.metadata.EntityMetadata;
import com.redhat.lightblue.metadata.ValueGenerator;
import com.redhat.lightblue.metadata.SimpleField;
import com.redhat.lightblue.util.Path;
import com.redhat.lightblue.mongo.common.DBResolver;
import com.redhat.lightblue.mongo.common.MongoDataStore;
import com.redhat.lightblue.mongo.config.MongoConfiguration;
Expand Down Expand Up @@ -58,6 +63,7 @@ public MongoConfiguration getConfiguration(MongoDataStore store) {
return null;
}
});
factory.registerValueGenerators("mongo",controller);
}

@Test
Expand All @@ -72,4 +78,20 @@ public void testSeq() throws Exception {
value = ss.generateValue(md, vg);
Assert.assertEquals("2", value.toString());
}

@Test
public void poolTest() throws Exception {
    EntityMetadata md = getMd("./testMetadata-seq.json");
    ValueGeneratorSupport ss = controller.getExtensionInstance(ValueGeneratorSupport.class);
    ValueGenerator vg = ((SimpleField) md.resolve(new Path("_id"))).getValueGenerator();
    // Generate one value; with pooling enabled this should advance the stored
    // sequence counter by the whole pool (expected 150 per the metadata
    // config in testMetadata-seq.json — presumably init + pool size).
    Object value = ss.generateValue(md, vg);

    DBCollection seqCollection = db.getCollection("sequences");
    DBObject storedSeq = seqCollection.findOne(new BasicDBObject("name", "testSequence"));
    Assert.assertEquals(150l, ((Long) storedSeq.get("value")).longValue());
}
}
Loading