Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ The project dependencies are:
<dependency>
<groupId>io.appform.dropwizard.sharding</groupId>
<artifactId>db-sharding-bundle</artifactId>
<version>2.0.23-2</version>
<version>2.0.23-3</version>
</dependency>
```
# NOTE
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>io.appform.dropwizard.sharding</groupId>
<artifactId>db-sharding-bundle</artifactId>
<version>2.0.23-2</version>
<version>2.0.23-3</version>
<name>Dropwizard Database Sharding Bundle</name>
<url>https://github.com/santanusinha/dropwizard-db-sharding-bundle</url>
<description>Application layer database sharding over SQL dbs</description>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package io.appform.dropwizard.sharding;

import com.codahale.metrics.MetricRegistry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand All @@ -26,7 +27,11 @@
import io.appform.dropwizard.sharding.caching.LookupCache;
import io.appform.dropwizard.sharding.caching.RelationalCache;
import io.appform.dropwizard.sharding.config.ShardedHibernateFactory;
import io.appform.dropwizard.sharding.dao.*;
import io.appform.dropwizard.sharding.dao.CacheableLookupDao;
import io.appform.dropwizard.sharding.dao.CacheableRelationalDao;
import io.appform.dropwizard.sharding.dao.LookupDao;
import io.appform.dropwizard.sharding.dao.RelationalDao;
import io.appform.dropwizard.sharding.dao.WrapperDao;
import io.appform.dropwizard.sharding.healthcheck.HealthCheckManager;
import io.appform.dropwizard.sharding.sharding.BucketIdExtractor;
import io.appform.dropwizard.sharding.sharding.InMemoryLocalShardBlacklistingStore;
Expand Down Expand Up @@ -81,6 +86,8 @@ abstract class DBShardingBundleBase<T extends Configuration> implements Configur

private HealthCheckManager healthCheckManager;

private MetricRegistry metricRegistry;

protected DBShardingBundleBase(
String dbNamespace,
Class<?> entity,
Expand Down Expand Up @@ -134,6 +141,7 @@ public PooledDataSourceFactory getDataSourceFactory(T t) {

@Override
public void run(T configuration, Environment environment) {
metricRegistry = environment.metrics();
sessionFactories = shardBundles.stream().map(HibernateBundle::getSessionFactory).collect(Collectors.toList());
environment.admin().addTask(new BlacklistShardTask(shardManager));
environment.admin().addTask(new UnblacklistShardTask(shardManager));
Expand Down Expand Up @@ -178,42 +186,44 @@ protected ShardBlacklistingStore getBlacklistingStore() {

public <EntityType, T extends Configuration>
LookupDao<EntityType> createParentObjectDao(Class<EntityType> clazz) {
return new LookupDao<>(this.sessionFactories, clazz,
return new LookupDao<>(metricRegistry, shardInfoProvider, this.sessionFactories, clazz,
new ShardCalculator<>(this.shardManager, new ConsistentHashBucketIdExtractor<>(this.shardManager)));
}

public <EntityType, T extends Configuration>
CacheableLookupDao<EntityType> createParentObjectDao(Class<EntityType> clazz,
LookupCache<EntityType> cacheManager) {
return new CacheableLookupDao<>(this.sessionFactories, clazz,
return new CacheableLookupDao<>(metricRegistry, shardInfoProvider, this.sessionFactories, clazz,
new ShardCalculator<>(this.shardManager, new ConsistentHashBucketIdExtractor<>(this.shardManager)),
cacheManager);
}

public <EntityType, T extends Configuration>
LookupDao<EntityType> createParentObjectDao(Class<EntityType> clazz,
BucketIdExtractor<String> bucketIdExtractor) {
return new LookupDao<>(this.sessionFactories, clazz, new ShardCalculator<>(this.shardManager, bucketIdExtractor));
return new LookupDao<>(metricRegistry, shardInfoProvider, this.sessionFactories, clazz,
new ShardCalculator<>(this.shardManager, bucketIdExtractor));
}

public <EntityType, T extends Configuration>
CacheableLookupDao<EntityType> createParentObjectDao(Class<EntityType> clazz,
BucketIdExtractor<String> bucketIdExtractor,
LookupCache<EntityType> cacheManager) {
return new CacheableLookupDao<>(this.sessionFactories, clazz, new ShardCalculator<>(this.shardManager, bucketIdExtractor), cacheManager);
return new CacheableLookupDao<>(metricRegistry, shardInfoProvider, this.sessionFactories, clazz,
new ShardCalculator<>(this.shardManager, bucketIdExtractor), cacheManager);
}


public <EntityType, T extends Configuration>
RelationalDao<EntityType> createRelatedObjectDao(Class<EntityType> clazz) {
return new RelationalDao<>(this.sessionFactories, clazz,
return new RelationalDao<>(metricRegistry, shardInfoProvider, this.sessionFactories, clazz,
new ShardCalculator<>(this.shardManager, new ConsistentHashBucketIdExtractor<>(this.shardManager)));
}


public <EntityType, T extends Configuration>
CacheableRelationalDao<EntityType> createRelatedObjectDao(Class<EntityType> clazz, RelationalCache<EntityType> cacheManager) {
return new CacheableRelationalDao<>(this.sessionFactories,
return new CacheableRelationalDao<>(metricRegistry, shardInfoProvider, this.sessionFactories,
clazz,
new ShardCalculator<>(this.shardManager,
new ConsistentHashBucketIdExtractor<>(this.shardManager)),
Expand All @@ -224,14 +234,16 @@ CacheableRelationalDao<EntityType> createRelatedObjectDao(Class<EntityType> claz
public <EntityType, T extends Configuration>
RelationalDao<EntityType> createRelatedObjectDao(Class<EntityType> clazz,
BucketIdExtractor<String> bucketIdExtractor) {
return new RelationalDao<>(this.sessionFactories, clazz, new ShardCalculator<>(this.shardManager, bucketIdExtractor));
return new RelationalDao<>(metricRegistry, shardInfoProvider, this.sessionFactories, clazz,
new ShardCalculator<>(this.shardManager, bucketIdExtractor));
}

public <EntityType, T extends Configuration>
CacheableRelationalDao<EntityType> createRelatedObjectDao(Class<EntityType> clazz,
BucketIdExtractor<String> bucketIdExtractor,
RelationalCache<EntityType> cacheManager) {
return new CacheableRelationalDao<>(this.sessionFactories, clazz, new ShardCalculator<>(this.shardManager, bucketIdExtractor), cacheManager);
return new CacheableRelationalDao<>(metricRegistry, shardInfoProvider, this.sessionFactories, clazz,
new ShardCalculator<>(this.shardManager, bucketIdExtractor), cacheManager);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package io.appform.dropwizard.sharding.dao;

import com.codahale.metrics.MetricRegistry;
import io.appform.dropwizard.sharding.ShardInfoProvider;
import io.appform.dropwizard.sharding.caching.LookupCache;
import io.appform.dropwizard.sharding.exceptions.DaoFwdException;
import io.appform.dropwizard.sharding.sharding.LookupKey;
Expand All @@ -41,10 +43,12 @@ public class CacheableLookupDao<T> extends LookupDao<T> {

private LookupCache<T> cache;

public CacheableLookupDao(List<SessionFactory> sessionFactories,
public CacheableLookupDao(MetricRegistry metricRegistry,
ShardInfoProvider shardInfoProvider,
List<SessionFactory> sessionFactories,
Class<T> entityClass,
ShardCalculator<String> shardCalculator, LookupCache<T> cache) {
super(sessionFactories, entityClass, shardCalculator);
super(metricRegistry, shardInfoProvider, sessionFactories, entityClass, shardCalculator);
this.cache = cache;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package io.appform.dropwizard.sharding.dao;

import com.codahale.metrics.MetricRegistry;
import io.appform.dropwizard.sharding.ShardInfoProvider;
import io.appform.dropwizard.sharding.caching.RelationalCache;
import io.appform.dropwizard.sharding.utils.ShardCalculator;
import org.hibernate.SessionFactory;
Expand All @@ -32,10 +34,13 @@ public class CacheableRelationalDao<T> extends RelationalDao<T> {

private RelationalCache<T> cache;

public CacheableRelationalDao(List<SessionFactory> sessionFactories, Class<T> entityClass,
public CacheableRelationalDao(MetricRegistry metricRegistry,
ShardInfoProvider shardInfoProvider,
List<SessionFactory> sessionFactories,
Class<T> entityClass,
ShardCalculator<String> shardCalculator,
RelationalCache<T> cache) {
super(sessionFactories, entityClass, shardCalculator);
super(metricRegistry, shardInfoProvider, sessionFactories, entityClass, shardCalculator);
this.cache = cache;
}

Expand Down
59 changes: 44 additions & 15 deletions src/main/java/io/appform/dropwizard/sharding/dao/LookupDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

package io.appform.dropwizard.sharding.dao;

import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import io.appform.dropwizard.sharding.ShardInfoProvider;
import io.appform.dropwizard.sharding.sharding.LookupKey;
import io.appform.dropwizard.sharding.sharding.ShardManager;
import io.appform.dropwizard.sharding.utils.ShardCalculator;
Expand All @@ -38,6 +40,7 @@
import org.hibernate.query.Query;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -150,6 +153,9 @@ public int update(final UpdateOperationMeta updateOperationMeta) {
@Getter
private final ShardCalculator<String> shardCalculator;
private final Field keyField;
private final MetricRegistry metricRegistry;
private final ShardInfoProvider shardInfoProvider;


/**
* Creates a new sharded DAO. The number of managed shards and bucketing is controlled by the {@link ShardManager}.
Expand All @@ -158,12 +164,16 @@ public int update(final UpdateOperationMeta updateOperationMeta) {
* @param shardCalculator calculator for shards
*/
public LookupDao(
MetricRegistry metricRegistry,
ShardInfoProvider shardInfoProvider,
List<SessionFactory> sessionFactories,
Class<T> entityClass,
ShardCalculator<String> shardCalculator) {
this.daos = sessionFactories.stream().map(LookupDaoPriv::new).collect(Collectors.toList());
this.entityClass = entityClass;
this.shardCalculator = shardCalculator;
this.metricRegistry = metricRegistry;
this.shardInfoProvider = shardInfoProvider;

Field fields[] = FieldUtils.getFieldsWithAnnotation(entityClass, LookupKey.class);
Preconditions.checkArgument(fields.length != 0, "At least one field needs to be sharding key");
Expand All @@ -189,7 +199,7 @@ public LookupDao(
* @throws Exception if backing dao throws
*/
public Optional<T> get(String key) throws Exception {
return Optional.ofNullable(get(key, t -> t));
return Optional.ofNullable(getImpl(key, t -> t));
}

/**
Expand All @@ -203,9 +213,7 @@ public Optional<T> get(String key) throws Exception {
* @throws Exception if backing dao throws
*/
public <U> U get(String key, Function<T, U> handler) throws Exception {
int shardId = shardCalculator.shardId(key);
LookupDaoPriv dao = daos.get(shardId);
return Transactions.execute(dao.sessionFactory, true, dao::get, key, handler);
return getImpl(key, handler);
}

/**
Expand All @@ -218,6 +226,13 @@ public boolean exists(String key) throws Exception {
return get(key).isPresent();
}

private <U> U getImpl(String key, Function<T, U> handler) throws Exception {
int shardId = shardCalculator.shardId(key);
LookupDaoPriv dao = daos.get(shardId);
return executeTracked(() -> Transactions.execute(dao.sessionFactory, true, dao::get, key, handler), shardId,
"get");
}

/**
* Saves an entity on proper shard based on hash of the value in the key field in the object.
* The updated entity is returned. If Cascade is specified, this can be used
Expand Down Expand Up @@ -246,25 +261,27 @@ public <U> U save(T entity, Function<T, U> handler) throws Exception {
int shardId = shardCalculator.shardId(key);
log.debug("Saving entity of type {} with key {} to shard {}", entityClass.getSimpleName(), key, shardId);
LookupDaoPriv dao = daos.get(shardId);
return Transactions.execute(dao.sessionFactory, false, dao::save, entity, handler);
return executeTracked(() -> Transactions.execute(dao.sessionFactory, false, dao::save, entity, handler),
shardId, "save");
}

public boolean updateInLock(String id, Function<Optional<T>, T> updater) {
int shardId = shardCalculator.shardId(id);
LookupDaoPriv dao = daos.get(shardId);
return updateImpl(id, dao::getLockedForWrite, updater, dao);
return executeTracked(() -> updateImpl(id, dao::getLockedForWrite, updater, dao), shardId, "updateInLock");
}

public boolean update(String id, Function<Optional<T>, T> updater) {
int shardId = shardCalculator.shardId(id);
LookupDaoPriv dao = daos.get(shardId);
return updateImpl(id, dao::get, updater, dao);
return executeTracked(() -> updateImpl(id, dao::get, updater, dao), shardId, "update");
}

public int updateUsingQuery(String id, UpdateOperationMeta updateOperationMeta) {
int shardId = shardCalculator.shardId(id);
LookupDaoPriv dao = daos.get(shardId);
return Transactions.execute(dao.sessionFactory, false, dao::update, updateOperationMeta);
return executeTracked(() -> Transactions.execute(dao.sessionFactory, false, dao::update, updateOperationMeta),
shardId, "updateUsingQuery");
}

private boolean updateImpl(String id, Function<String, T> getter, Function<Optional<T>, T> updater, LookupDaoPriv dao) {
Expand Down Expand Up @@ -307,13 +324,17 @@ public LockedContext<T> saveAndGetExecutor(T entity) {
* @return List of elements or empty if none match
*/
public List<T> scatterGather(DetachedCriteria criteria) {
return daos.stream().map(dao -> {
List<T> results = new ArrayList<>();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as it's scatter gather, need to pass shard id (i in this case) in the metric name.

for (int i = 0; i < daos.size(); i++) {
try {
return Transactions.execute(dao.sessionFactory, true, dao::select, criteria);
LookupDaoPriv dao = daos.get(i);
results.addAll(executeTracked(() -> Transactions.execute(dao.sessionFactory, true,
dao::select, criteria), i, "scatterGather"));
} catch (Exception e) {
throw new RuntimeException(e);
}
}).flatMap(Collection::stream).collect(Collectors.toList());
}
return results;
}

/**
Expand Down Expand Up @@ -347,7 +368,8 @@ public List<T> get(List<String> keys) {
try {
DetachedCriteria criteria = DetachedCriteria.forClass(entityClass)
.add(Restrictions.in(keyField.getName(),lookupKeysGroupByShards.get(shardId)));
return Transactions.execute(daos.get(shardId).sessionFactory, true, daos.get(shardId)::select, criteria);
return executeTracked(() -> Transactions.execute(daos.get(shardId).sessionFactory, true,
daos.get(shardId)::select, criteria), shardId, "get");
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -357,12 +379,13 @@ public List<T> get(List<String> keys) {
public <U> U runInSession(String id, Function<Session, U> handler) {
int shardId = shardCalculator.shardId(id);
LookupDaoPriv dao = daos.get(shardId);
return Transactions.execute(dao.sessionFactory, handler);
return executeTracked(() -> Transactions.execute(dao.sessionFactory, handler), shardId, "runInSession");
}

public boolean delete(String id) {
int shardId = shardCalculator.shardId(id);
return Transactions.execute(daos.get(shardId).sessionFactory, false, daos.get(shardId)::delete, id);
return executeTracked(() -> Transactions.execute(daos.get(shardId).sessionFactory, false,
daos.get(shardId)::delete, id), shardId, "delete");
}

protected Field getKeyField() {
Expand Down Expand Up @@ -554,4 +577,10 @@ private T generateEntity() {
return result;
}
}
}

private <R> R executeTracked(Supplier<R> t, int shardId, String function) {
final String metricName = this.getClass().getCanonicalName() + "." + shardInfoProvider
.shardName(shardId) + ".operation." + function;
return Transactions.executeTracked(metricRegistry, t, metricName);
}
}
Loading