diff --git a/src/jpa-engine/core/src/main/java/com/impetus/kundera/persistence/AbstractEntityReader.java b/src/jpa-engine/core/src/main/java/com/impetus/kundera/persistence/AbstractEntityReader.java index c280ec67a..6d8e6434d 100644 --- a/src/jpa-engine/core/src/main/java/com/impetus/kundera/persistence/AbstractEntityReader.java +++ b/src/jpa-engine/core/src/main/java/com/impetus/kundera/persistence/AbstractEntityReader.java @@ -101,7 +101,6 @@ protected EnhanceEntity findById(Object primaryKey, EntityMetadata m, Client cli * * @param entity * @param relationsMap - * @param client * @param m * @param pd * @param relationStack @@ -193,13 +192,9 @@ private void onRelation(final Object entity, final Map relations * * @param entity * relation owning entity. - * @param entityId - * entity id of relation owning entity. * @param relationsMap * contains relation name and it's value. * @param relationStack - * @param m - * entity metadata. */ private void onRelation(Object entity, Map relationsMap, final Relation relation, final EntityMetadata metadata, final PersistenceDelegator pd, boolean lazilyloaded, @@ -510,7 +505,6 @@ else if (!relation.isUnary()) * * @param entity * @param relationsMap - * @param client * @param m * @param pd * @return diff --git a/src/jpa-engine/core/src/main/java/com/impetus/kundera/persistence/EntityReader.java b/src/jpa-engine/core/src/main/java/com/impetus/kundera/persistence/EntityReader.java index af1e84b93..adbf1e586 100644 --- a/src/jpa-engine/core/src/main/java/com/impetus/kundera/persistence/EntityReader.java +++ b/src/jpa-engine/core/src/main/java/com/impetus/kundera/persistence/EntityReader.java @@ -39,10 +39,6 @@ public interface EntityReader * * @param m * entity meta data - * @param relationNames - * relation names - * @param isParent - * if entity is not holding any relation. * @param client * client instance * @return list of wrapped enhance entities. @@ -52,20 +48,9 @@ public interface EntityReader /** * Returns populated entity along with all relational value. * - * @param e - * enhance entity - * @param graphs - * entity graph - * @param collectionHolder - * collection holder. - * @param client - * client * @param m * entity meta data * @param relationStack - * @param persistenceDelegeator - * persistence delegator. - * @param lazily loaded. true if invoked over lazily fetched object. * @return populate entity. * @throws Exception * the exception @@ -81,8 +66,6 @@ Object recursivelyFindEntities(Object entity, Map relationsMap, * the primary key * @param m * the m - * @param relationNames - * the relation names * @param client * the client * @return the enhance entity diff --git a/src/kundera-hbase/kundera-hbase/pom.xml b/src/kundera-hbase/kundera-hbase/pom.xml index ecc0116a4..40555920e 100644 --- a/src/kundera-hbase/kundera-hbase/pom.xml +++ b/src/kundera-hbase/kundera-hbase/pom.xml @@ -14,16 +14,12 @@ http://maven.apache.org - 2.2.0 - 0.96.1.1-hadoop2 + 2.5.1 + 1.1.2 0.9.0 - - - - org.apache.hbase hbase-client @@ -36,6 +32,20 @@ + org.apache.hbase + hbase-hadoop-compat + ${hbase.version} + test + test-jar + + + org.apache.hbase + hbase-hadoop2-compat + ${hbase.version} + test + test-jar + + org.apache.hbase hbase-server ${hbase.version} @@ -81,8 +91,11 @@ hadoop-common ${hadoop.version} - - + + org.apache.hadoop + hadoop-minicluster + ${hadoop.version} + diff --git a/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/AdminRequest.java b/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/AdminRequest.java new file mode 100644 index 000000000..fc454d8b5 --- /dev/null +++ b/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/AdminRequest.java @@ -0,0 +1,16 @@ +package com.impetus.client.hbase; + +import java.io.IOException; + +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; + +public abstract class AdminRequest implements HBaseRequest { + public Void execute(Connection connection) throws IOException { + try (Admin admin = connection.getAdmin()) { + execute(admin); + return null; + } + } + protected abstract void execute(Admin admin) throws IOException; +} diff --git a/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/BatchPutRequest.java b/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/BatchPutRequest.java new file mode 100644 index 000000000..19fd6ee42 --- /dev/null +++ b/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/BatchPutRequest.java @@ -0,0 +1,22 @@ +package com.impetus.client.hbase; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; + +public class BatchPutRequest extends TableRequest { + private final List puts; + + public BatchPutRequest(TableName tableName, List puts) { + super(tableName); + this.puts = puts; + } + + protected Void execute(Table table) throws IOException { + table.put(puts); + return null; + } +} diff --git a/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/HBaseClient.java b/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/HBaseClient.java index eeefdf843..907234df8 100644 --- a/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/HBaseClient.java +++ b/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/HBaseClient.java @@ -31,8 +31,9 @@ import javax.persistence.metamodel.EntityType; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.HTablePool; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; @@ -86,6 +87,8 @@ public class HBaseClient extends ClientBase implements Client, Batch /** The handler. */ DataHandler handler; + private RequestExecutor executor; + /** The reader. */ private EntityReader reader; @@ -102,8 +105,6 @@ public class HBaseClient extends ClientBase implements Client, Batch * the index manager * @param conf * the conf - * @param hTablePool - * the h table pool * @param reader * the reader * @param persistenceUnit @@ -115,15 +116,16 @@ public class HBaseClient extends ClientBase implements Client, Batch * @param kunderaMetadata * the kundera metadata */ - public HBaseClient(IndexManager indexManager, Configuration conf, HTablePool hTablePool, EntityReader reader, - + public HBaseClient(IndexManager indexManager, Configuration conf, + RequestExecutor requestExecutor, EntityReader reader, String persistenceUnit, Map externalProperties, ClientMetadata clientMetadata, final KunderaMetadata kunderaMetadata) { super(kunderaMetadata, externalProperties, persistenceUnit); this.indexManager = indexManager; - this.handler = new HBaseDataHandler(kunderaMetadata, conf, hTablePool); + this.executor = requestExecutor; + this.handler = new HBaseDataHandler(kunderaMetadata, conf, executor); this.reader = reader; this.clientMetadata = clientMetadata; getBatchSize(persistenceUnit, this.externalProperties); @@ -159,9 +161,7 @@ public Object find(Class entityClass, Object rowId) { rowId = KunderaCoreUtils.prepareCompositeKey(entityMetadata, rowId); } - - results = fetchEntity(entityClass, rowId, entityMetadata, relationNames, tableName, results, null, null); - + results = fetchEntity(entityClass, rowId, entityMetadata, relationNames, tableName, null, null); if (results != null && !results.isEmpty()) { enhancedEntity = results.get(0); @@ -182,9 +182,9 @@ public Object find(Class entityClass, Object rowId) * java.lang.Object[]) */ @Override - public List findAll(Class entityClass, String[] columnsToSelect, Object... rowIds) + public List findAll(Class entityClass, String[] columnsToSelect, final Object... rowIds) { - EntityMetadata entityMetadata = KunderaMetadataManager.getEntityMetadata(kunderaMetadata, entityClass); + final EntityMetadata entityMetadata = KunderaMetadataManager.getEntityMetadata(kunderaMetadata, entityClass); if (rowIds == null) { return null; @@ -203,10 +203,15 @@ public List findAll(Class entityClass, String[] columnsToSelect, Objec { for (AbstractManagedType subEntity : subManagedType) { - EntityMetadata subEntityMetadata = KunderaMetadataManager.getEntityMetadata(kunderaMetadata, + final EntityMetadata subEntityMetadata = KunderaMetadataManager.getEntityMetadata(kunderaMetadata, subEntity.getJavaType()); - results = handler.readAll(subEntityMetadata.getSchema(), subEntityMetadata.getEntityClazz(), - subEntityMetadata, Arrays.asList(rowIds), subEntityMetadata.getRelationNames()); + results = executor.execute(new TableRequest(subEntityMetadata.getSchema()) { + protected List execute(Table table) throws IOException { + return handler.readAll(table, subEntityMetadata.getEntityClazz(), + subEntityMetadata, Arrays.asList(rowIds), + subEntityMetadata.getRelationNames()); + } + }); // Result will not be empty for match sub entity. if (!results.isEmpty()) { @@ -216,9 +221,13 @@ public List findAll(Class entityClass, String[] columnsToSelect, Objec } else { - - results = handler.readAll(entityMetadata.getSchema(), entityMetadata.getEntityClazz(), entityMetadata, - Arrays.asList(rowIds), entityMetadata.getRelationNames()); + results = executor.execute(new TableRequest(entityMetadata.getSchema()) { + protected List execute(Table table) throws IOException { + return handler.readAll(table, entityMetadata.getEntityClazz(), + entityMetadata, Arrays.asList(rowIds), + entityMetadata.getRelationNames()); + } + }); } } catch (IOException ioex) @@ -259,9 +268,8 @@ public List find(Class entityClass, Map col) E e = null; try { - List results = new ArrayList(); - fetchEntity(entityClass, entityId, entityMetadata, entityMetadata.getRelationNames(), - entityMetadata.getSchema(), results, null, null); + List results = fetchEntity(entityClass, entityId, entityMetadata, + entityMetadata.getRelationNames(), entityMetadata.getSchema(), null, null); if (results != null) { e = (E) results.get(0); @@ -330,7 +338,7 @@ public List findByQuery(Class entityClass, EntityMetadata metadata, Fi try { - results = fetchEntity(entityClass, null, entityMetadata, relationNames, tableName, results, filter, + results = fetchEntity(entityClass, null, entityMetadata, relationNames, tableName, filter, filterClausequeue, columns); } catch (IOException ioex) @@ -362,7 +370,8 @@ public List findByQuery(Class entityClass, EntityMetadata metadata, Fi * the filter clausequeue * @return collection holding results. */ - public List findByRange(Class entityClass, EntityMetadata metadata, byte[] startRow, byte[] endRow, + public List findByRange(final Class entityClass,final EntityMetadata metadata, + final byte[] startRow, final byte[] endRow, String[] columns, Filter f, Queue filterClausequeue) { EntityMetadata entityMetadata = KunderaMetadataManager.getEntityMetadata(kunderaMetadata, entityClass); @@ -371,7 +380,7 @@ public List findByRange(Class entityClass, EntityMetadata metadata, by String tableName = entityMetadata.getSchema(); List results = new ArrayList(); - FilterList filter = new FilterList(); + final FilterList filter = new FilterList(); if (f != null) { filter.addFilter(f); @@ -388,23 +397,32 @@ public List findByRange(Class entityClass, EntityMetadata metadata, by entityMetadata.getPersistenceUnit()); EntityType entityType = metaModel.entity(entityClass); - + final String[] finalColumns = columns; List subManagedType = ((AbstractManagedType) entityType).getSubManagedType(); if (!subManagedType.isEmpty()) { - for (AbstractManagedType subEntity : subManagedType) + for (final AbstractManagedType subEntity : subManagedType) { - EntityMetadata subEntityMetadata = KunderaMetadataManager.getEntityMetadata(kunderaMetadata, + final EntityMetadata subEntityMetadata = KunderaMetadataManager.getEntityMetadata(kunderaMetadata, subEntity.getJavaType()); - List found = handler.readDataByRange(tableName, subEntityMetadata.getEntityClazz(), - subEntityMetadata, startRow, endRow, columns, filter); + List found = executor.execute(new TableRequest(tableName) { + protected List execute(Table table) throws IOException { + return handler.readDataByRange(table, subEntityMetadata.getEntityClazz(), + subEntityMetadata, startRow, endRow, finalColumns, filter); + } + }); results.addAll(found); } } else { - results = handler.readDataByRange(tableName, entityClass, metadata, startRow, endRow, columns, filter); + results = executor.execute(new TableRequest(tableName) { + protected List execute(Table table) throws IOException { + return handler.readDataByRange(table, entityClass, metadata, startRow, endRow, + finalColumns, filter); + } + }); } if (showQuery && filterClausequeue.size() > 0) { @@ -525,14 +543,18 @@ public void setFetchSize(int fetchSize) * the relations */ @Override - protected void onPersist(EntityMetadata entityMetadata, Object entity, Object id, List relations) + protected void onPersist(final EntityMetadata entityMetadata, + final Object entity, final Object id, final List relations) { - String tableName = entityMetadata.getSchema(); - try { - // Write data to HBase - handler.writeData(tableName, entityMetadata, entity, id, relations, showQuery); + executor.execute(new TableRequest(entityMetadata.getSchema()) { + @Override + protected Void execute(Table table) throws IOException { + handler.writeData(table, entityMetadata, entity, id, relations, showQuery); + return null; + } + }); } catch (IOException e) { @@ -550,16 +572,16 @@ protected void onPersist(EntityMetadata entityMetadata, Object entity, Object id @Override public void persistJoinTable(JoinTableData joinTableData) { - String joinTableName = joinTableData.getJoinTableName(); + final String joinTableName = joinTableData.getJoinTableName(); String invJoinColumnName = joinTableData.getInverseJoinColumnName(); Map> joinTableRecords = joinTableData.getJoinTableRecords(); for (Object key : joinTableRecords.keySet()) { Set values = joinTableRecords.get(key); - Object joinColumnValue = key; + final Object joinColumnValue = key; - Map columns = new HashMap(); + final Map columns = new HashMap<>(); for (Object childValue : values) { Object invJoinColumnValue = childValue; @@ -570,8 +592,14 @@ public void persistJoinTable(JoinTableData joinTableData) { try { - handler.createTableIfDoesNotExist(joinTableData.getSchemaName(), joinTableName); - handler.writeJoinTableData(joinTableData.getSchemaName(), joinColumnValue, columns, joinTableName); + handler.createTableIfDoesNotExist(TableName.valueOf(joinTableData.getSchemaName()), + joinTableName); + executor.execute(new TableRequest(joinTableData.getSchemaName()) { + protected Object execute(Table table) throws IOException { + handler.writeJoinTableData(table, joinColumnValue, columns, joinTableName); + return null; + } + }); KunderaCoreUtils.printQuery("Persist Join Table:" + joinTableName, showQuery); } catch (IOException e) @@ -604,12 +632,19 @@ public List getColumnsById(String schemaName, String joinTableName, Strin * @see com.impetus.kundera.client.Client#deleteByColumn(java.lang.String, * java.lang.String, java.lang.String, java.lang.Object) */ - public void deleteByColumn(String schemaName, String tableName, String columnName, Object columnValue) + public void deleteByColumn(final String schemaName, final String tableName, + final String columnName, final Object columnValue) { try { - handler.deleteRow(columnValue, schemaName, tableName); - KunderaCoreUtils.printQuery("Delete data from " + tableName + " for PK " + columnValue, showQuery); + executor.execute(new TableRequest(schemaName) { + protected Void execute(Table table) throws IOException { + handler.deleteRow(columnValue, table, tableName); + KunderaCoreUtils.printQuery("Delete data from " + tableName + + " for PK " + columnValue, showQuery); + return null; + } + }); } catch (IOException ioex) { @@ -654,14 +689,14 @@ public void delete(Object entity, Object pKey) * java.lang.Object, java.lang.Class) */ @Override - public List findByRelation(String colName, Object colValue, Class entityClazz) + public List findByRelation(final String colName, Object colValue, final Class entityClazz) { - EntityMetadata m = KunderaMetadataManager.getEntityMetadata(kunderaMetadata, entityClazz); + final EntityMetadata m = KunderaMetadataManager.getEntityMetadata(kunderaMetadata, entityClazz); - String columnFamilyName = m.getTableName(); + final String columnFamilyName = m.getTableName(); byte[] valueInBytes = HBaseUtils.getBytes(colValue); - SingleColumnValueFilter f = new SingleColumnValueFilter(Bytes.toBytes(columnFamilyName), + final SingleColumnValueFilter f = new SingleColumnValueFilter(Bytes.toBytes(columnFamilyName), Bytes.toBytes(colName), CompareOp.EQUAL, valueInBytes); List output = new ArrayList(); @@ -673,20 +708,25 @@ public List findByRelation(String colName, Object colValue, Class entity { for (AbstractManagedType subEntity : subManagedType) { - EntityMetadata subEntityMetadata = KunderaMetadataManager.getEntityMetadata(kunderaMetadata, + final EntityMetadata subEntityMetadata = KunderaMetadataManager.getEntityMetadata(kunderaMetadata, subEntity.getJavaType()); - List results = ((HBaseDataHandler) handler).scanData(f, subEntityMetadata.getSchema(), - subEntityMetadata.getEntityClazz(), subEntityMetadata, columnFamilyName, colName); - if (!results.isEmpty()) - { - output.addAll(results); - } + output.addAll(executor.execute(new TableRequest(subEntityMetadata.getSchema()) { + protected Collection execute(Table table) throws IOException { + return ((HBaseDataHandler) handler).scanData(f, table, + subEntityMetadata.getEntityClazz(), subEntityMetadata, + columnFamilyName, colName); + } + })); } } else { - return ((HBaseDataHandler) handler).scanData(f, m.getSchema(), entityClazz, m, columnFamilyName, - colName); + return executor.execute(new TableRequest(m.getSchema()) { + protected List execute(Table table) throws IOException { + return ((HBaseDataHandler) handler).scanData(f, table, entityClazz, m, columnFamilyName, + colName); + } + }); } } catch (IOException ioe) @@ -694,17 +734,6 @@ public List findByRelation(String colName, Object colValue, Class entity log.error("Error during find By Relation, Caused by: .", ioe); throw new KunderaException(ioe); } - catch (InstantiationException ie) - { - log.error("Error during find By Relation, Caused by: .", ie); - throw new KunderaException(ie); - } - catch (IllegalAccessException iae) - { - log.error("Error during find By Relation, Caused by: .", iae); - throw new KunderaException(iae); - } - return output; } @@ -737,20 +766,25 @@ public Class getQueryImplementor() * java.lang.String, java.lang.String, java.lang.Object, java.lang.Class) */ @Override - public Object[] findIdsByColumn(String schemaName, String tableName, String pKeyName, String columnName, - Object columnValue, Class entityClazz) + public Object[] findIdsByColumn(final String schemaName, final String tableName, + String pKeyName, final String columnName, + final Object columnValue, Class entityClazz) { - EntityMetadata m = KunderaMetadataManager.getEntityMetadata(kunderaMetadata, entityClazz); + final EntityMetadata m = KunderaMetadataManager.getEntityMetadata(kunderaMetadata, entityClazz); byte[] valueInBytes = HBaseUtils.getBytes(columnValue); Filter f = new SingleColumnValueFilter(Bytes.toBytes(tableName), Bytes.toBytes(columnName), CompareOp.EQUAL, valueInBytes); KeyOnlyFilter keyFilter = new KeyOnlyFilter(); - FilterList filterList = new FilterList(f, keyFilter); + final FilterList filterList = new FilterList(f, keyFilter); try { - return handler.scanRowyKeys(filterList, schemaName, tableName, columnName + "_" + columnValue, m - .getIdAttribute().getBindableJavaType()); + return executor.execute(new TableRequest(schemaName) { + protected Object[] execute(Table table) throws IOException { + return handler.scanRowyKeys(filterList, table, tableName, columnName + "_" + columnValue, + m.getIdAttribute().getBindableJavaType()); + } + }); } catch (IOException e) { @@ -809,7 +843,7 @@ public void clear() @Override public int executeBatch() { - Map> data = new HashMap>(); + Map> data = new HashMap<>(); try { @@ -818,7 +852,6 @@ public int executeBatch() if (node.isDirty()) { node.handlePreEvent(); - HTableInterface hTable = null; Object rowKey = node.getEntityId(); Object entity = node.getData(); if (node.isInState(RemovedState.class)) @@ -830,32 +863,31 @@ public int executeBatch() EntityMetadata metadata = KunderaMetadataManager.getEntityMetadata(kunderaMetadata, node.getDataClass()); - HBaseDataWrapper columnWrapper = new HBaseDataHandler.HBaseDataWrapper(rowKey, - new java.util.HashMap(), entity, metadata.getTableName()); + HBaseDataWrapper columnWrapper = new HBaseDataWrapper(rowKey, + new HashMap(), entity, metadata.getTableName()); MetamodelImpl metaModel = (MetamodelImpl) kunderaMetadata.getApplicationMetadata() .getMetamodel(metadata.getPersistenceUnit()); EntityType entityType = metaModel.entity(node.getDataClass()); - List embeddableData = new ArrayList(); + List embeddableData = new ArrayList<>(); - hTable = ((HBaseDataHandler) handler).gethTable(metadata.getSchema()); - ((HBaseDataHandler) handler).preparePersistentData(metadata.getSchema(), + TableName tableName = TableName.valueOf(metadata.getSchema()); + ((HBaseDataHandler) handler).preparePersistentData( metadata.getTableName(), entity, rowKey, metaModel, entityType.getAttributes(), columnWrapper, embeddableData, showQuery); - - List dataSet = null; - if (data.containsKey(hTable)) + List dataSet; + if (data.containsKey(tableName)) { dataSet = data.get(metadata.getTableName()); addRecords(columnWrapper, embeddableData, dataSet); } else { - dataSet = new ArrayList(); + dataSet = new ArrayList<>(); addRecords(columnWrapper, embeddableData, dataSet); - data.put(hTable, dataSet); + data.put(tableName, dataSet); } } node.handlePostEvent(); @@ -973,10 +1005,10 @@ public void reset() * the m * @return the object */ - public Object next(EntityMetadata m) - { - return ((HBaseDataHandler) handler).next(m); - } +// public Object next(EntityMetadata m) +// { +// return ((HBaseDataHandler) handler).next(m, gethTable(m.getSchema())); +// } /** * Checks for next. @@ -1011,8 +1043,6 @@ public HBaseDataHandler getHandle() * the relation names * @param tableName * the table name - * @param results - * the results * @param filter * the filter * @param filterClausequeue @@ -1023,31 +1053,39 @@ public HBaseDataHandler getHandle() * @throws IOException * Signals that an I/O exception has occurred. */ - private List fetchEntity(Class entityClass, Object rowId, EntityMetadata entityMetadata, - List relationNames, String tableName, List results, FilterList filter, Queue filterClausequeue, - String... columns) throws IOException + private List fetchEntity(Class entityClass, final Object rowId, final EntityMetadata entityMetadata, + final List relationNames, String tableName, final FilterList filter, Queue filterClausequeue, + final String... columns) throws IOException { - results = new ArrayList(); - List subManagedType = getSubManagedType(entityClass, entityMetadata); - + List results; + final List subManagedType = getSubManagedType(entityClass, entityMetadata); if (!subManagedType.isEmpty()) { - for (AbstractManagedType subEntity : subManagedType) - { - EntityMetadata subEntityMetadata = KunderaMetadataManager.getEntityMetadata(kunderaMetadata, - subEntity.getJavaType()); - List tempResults = handler.readData(tableName, subEntityMetadata.getEntityClazz(), subEntityMetadata, - rowId, subEntityMetadata.getRelationNames(), filter, columns); - if (tempResults != null && !tempResults.isEmpty()) - { - results.addAll(tempResults); + results = executor.execute(new TableRequest(tableName) { + protected List execute(Table table) throws IOException { + List results = new ArrayList(); + for (AbstractManagedType subEntity : subManagedType) { + EntityMetadata subEntityMetadata = KunderaMetadataManager.getEntityMetadata( + kunderaMetadata, subEntity.getJavaType()); + List tempResults = handler.readData(table, subEntityMetadata.getEntityClazz(), + subEntityMetadata, rowId, subEntityMetadata.getRelationNames(), + filter, columns); + if (tempResults != null && !tempResults.isEmpty()) { + results.addAll(tempResults); + } + } + return results; } - } + }); } else { - results = handler.readData(tableName, entityMetadata.getEntityClazz(), entityMetadata, rowId, - relationNames, filter, columns); + results = executor.execute(new TableRequest(tableName) { + protected List execute(Table table) throws IOException { + return handler.readData(table, entityMetadata.getEntityClazz(), + entityMetadata, rowId, relationNames, filter, columns); + } + }); } if (rowId != null) { @@ -1093,4 +1131,7 @@ public Generator getIdGenerator() return (Generator) KunderaCoreUtils.createNewInstance(HBaseIdGenerator.class); } + public RequestExecutor getRequestExecutor() { + return executor; + } } \ No newline at end of file diff --git a/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/HBaseClientFactory.java b/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/HBaseClientFactory.java index bef917ab3..abec1cb5b 100644 --- a/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/HBaseClientFactory.java +++ b/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/HBaseClientFactory.java @@ -15,12 +15,13 @@ ******************************************************************************/ package com.impetus.client.hbase; +import java.io.IOException; import java.util.Map; -import org.apache.commons.lang.StringUtils; +import com.impetus.kundera.KunderaException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.client.HTablePool; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,17 +47,10 @@ public class HBaseClientFactory extends GenericClientFactory /** The conf. */ private Configuration conf; - /** The h table pool. */ - private HTablePool hTablePool; - - /** The Constant DEFAULT_POOL_SIZE. */ - private static final int DEFAULT_POOL_SIZE = 100; + private RequestExecutor requestExecutor; private static final String DEFAULT_ZOOKEEPER_PORT = "2181"; - /** The pool size. */ - private int poolSize; - @Override public void initialize(Map externalProperty) { @@ -67,12 +61,10 @@ public void initialize(Map externalProperty) String node = null; String port = null; - String poolSize = null; if (externalProperty != null) { node = (String) externalProperty.get(PersistenceProperties.KUNDERA_NODES); port = (String) externalProperty.get(PersistenceProperties.KUNDERA_PORT); - poolSize = (String) externalProperty.get(PersistenceProperties.KUNDERA_POOL_SIZE_MAX_ACTIVE); } if (node == null) { @@ -82,19 +74,6 @@ public void initialize(Map externalProperty) { port = puMetadata.getProperties().getProperty(PersistenceProperties.KUNDERA_PORT); } - if (poolSize == null) - { - poolSize = puMetadata.getProperties().getProperty(PersistenceProperties.KUNDERA_POOL_SIZE_MAX_ACTIVE); - } - - if (StringUtils.isEmpty(poolSize)) - { - this.poolSize = DEFAULT_POOL_SIZE; - } - else - { - this.poolSize = Integer.parseInt(poolSize); - } onValidation(node, port); @@ -123,16 +102,20 @@ public void initialize(Map externalProperty) } @Override - protected Object createPoolOrConnection() - { - hTablePool = new HTablePool(conf, poolSize); - return hTablePool; + protected Object createPoolOrConnection() { + try { + return requestExecutor = new SingleConnectionRequestExecutor( + ConnectionFactory.createConnection(conf)); + } catch (IOException e) { + throw new KunderaException(e); + } } @Override protected Client instantiateClient(String persistenceUnit) { - return new HBaseClient(indexManager, conf, hTablePool, reader, persistenceUnit, externalProperties, clientMetadata, kunderaMetadata); + return new HBaseClient(indexManager, conf, requestExecutor, reader, persistenceUnit, + externalProperties, clientMetadata, kunderaMetadata); } @Override diff --git a/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/HBaseIdGenerator.java b/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/HBaseIdGenerator.java index 0dd50a59d..cc86cef6a 100644 --- a/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/HBaseIdGenerator.java +++ b/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/HBaseIdGenerator.java @@ -17,7 +17,7 @@ import java.io.IOException; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Table; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,14 +46,19 @@ public class HBaseIdGenerator implements TableGenerator * com.impetus.kundera.client.ClientBase, java.lang.Object) */ @Override - public Object generate(TableGeneratorDiscriptor discriptor, ClientBase client, String dataType) + public Object generate(final TableGeneratorDiscriptor discriptor, + ClientBase client, String dataType) { try { - HTableInterface hTable = ((HBaseDataHandler) ((HBaseClient) client).handler).gethTable(discriptor - .getSchema()); - Long latestCount = hTable.incrementColumnValue(discriptor.getPkColumnValue().getBytes(), discriptor - .getTable().getBytes(), discriptor.getValueColumnName().getBytes(), 1); + Long latestCount = ((HBaseClient) client).getRequestExecutor().execute( + new TableRequest(discriptor.getSchema()) { + protected Long execute(Table table) throws IOException { + return table.incrementColumnValue(discriptor.getPkColumnValue().getBytes(), + discriptor.getTable().getBytes(), + discriptor.getValueColumnName().getBytes(), 1); + } + }); if (latestCount == 1) { return (long) discriptor.getInitialValue(); diff --git a/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/HBaseRequest.java b/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/HBaseRequest.java new file mode 100644 index 000000000..f7c9ace53 --- /dev/null +++ b/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/HBaseRequest.java @@ -0,0 +1,9 @@ +package com.impetus.client.hbase; + +import java.io.IOException; + +import org.apache.hadoop.hbase.client.Connection; + +public interface HBaseRequest { + T execute(Connection connection) throws IOException; +} diff --git a/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/Reader.java b/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/Reader.java index d4e882272..05df82a28 100644 --- a/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/Reader.java +++ b/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/Reader.java @@ -18,7 +18,7 @@ import java.io.IOException; import java.util.List; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.Filter; // TODO: Auto-generated Javadoc @@ -43,7 +43,7 @@ public interface Reader * @throws IOException * Signals that an I/O exception has occurred. */ - List LoadData(HTableInterface hTable, String columnFamily, Object rowKey, Filter filter, + List LoadData(Table hTable, String columnFamily, Object rowKey, Filter filter, String... columns) throws IOException; /** @@ -59,7 +59,7 @@ List LoadData(HTableInterface hTable, String columnFamily, Object row * @throws IOException * Signals that an I/O exception has occurred. */ - List LoadData(HTableInterface hTable, Object rowKey, Filter filter, String... columns) + List LoadData(Table hTable, Object rowKey, Filter filter, String... columns) throws IOException; /** @@ -78,7 +78,7 @@ List LoadData(HTableInterface hTable, Object rowKey, Filter filter, S * @throws IOException * Signals that an I/O exception has occurred. */ - List loadAll(HTableInterface hTable, Filter filter, byte[] startRow, byte[] endRow, String columnFamily, + List loadAll(Table hTable, Filter filter, byte[] startRow, byte[] endRow, String columnFamily, String qualifier, String[] columns) throws IOException; /** @@ -96,6 +96,6 @@ List loadAll(HTableInterface hTable, Filter filter, byte[] startRow, * @throws IOException * Signals that an I/O exception has occurred. */ - Object[] scanRowKeys(final HTableInterface hTable, final Filter filter, final String columnFamilyName, + Object[] scanRowKeys(final Table hTable, final Filter filter, final String columnFamilyName, final String columnName, final Class rowKeyClazz) throws IOException; } diff --git a/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/RequestExecutor.java b/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/RequestExecutor.java new file mode 100644 index 000000000..d7cea641a --- /dev/null +++ b/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/RequestExecutor.java @@ -0,0 +1,8 @@ +package com.impetus.client.hbase; + +import java.io.Closeable; +import java.io.IOException; + +public interface RequestExecutor extends Closeable { + T execute(HBaseRequest request) throws IOException; +} diff --git a/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/SingleConnectionRequestExecutor.java b/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/SingleConnectionRequestExecutor.java new file mode 100644 index 000000000..7ce4dbf05 --- /dev/null +++ b/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/SingleConnectionRequestExecutor.java @@ -0,0 +1,23 @@ +package com.impetus.client.hbase; + +import java.io.IOException; + +import org.apache.hadoop.hbase.client.Connection; + +public class SingleConnectionRequestExecutor implements RequestExecutor { + private final Connection connection; + + public SingleConnectionRequestExecutor(Connection connection) { + this.connection = connection; + } + + @Override + public T execute(HBaseRequest request) throws IOException { + return request.execute(connection); + } + + @Override + public void close() throws IOException { + connection.close(); + } +} diff --git a/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/TableRequest.java b/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/TableRequest.java new file mode 100644 index 000000000..99def270c --- /dev/null +++ b/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/TableRequest.java @@ -0,0 +1,28 @@ +package com.impetus.client.hbase; + +import java.io.IOException; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Table; + +public abstract class TableRequest implements HBaseRequest { + private final TableName tableName; + + public TableRequest(TableName tableName) { + this.tableName = tableName; + } + + public TableRequest(String tableName) { + this(TableName.valueOf(tableName)); + } + + @Override + public T execute(Connection connection) throws IOException { + try(Table table = connection.getTable(tableName)) { + return execute(table); + } + } + + protected abstract T execute(Table table) throws IOException; +} diff --git a/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/Writer.java b/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/Writer.java index 325c1f3d1..a19bf8682 100644 --- a/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/Writer.java +++ b/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/Writer.java @@ -22,7 +22,8 @@ import javax.persistence.metamodel.Attribute; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; import com.impetus.client.hbase.admin.HBaseDataHandler.HBaseDataWrapper; import com.impetus.kundera.db.RelationHolder; @@ -38,8 +39,8 @@ public interface Writer /** * Write column. * - * @param htable - * the htable + * @param table + * the table * @param columnFamily * the column family * @param rowKey @@ -51,7 +52,7 @@ public interface Writer * @throws IOException * Signals that an I/O exception has occurred. */ - void writeColumn(HTableInterface htable, String columnFamily, Object rowKey, Attribute column, Object columnObj) + void writeColumn(Table table, String columnFamily, Object rowKey, Attribute column, Object columnObj) throws IOException; /** @@ -72,15 +73,15 @@ void writeColumn(HTableInterface htable, String columnFamily, Object rowKey, Att * @throws IOException * Signals that an I/O exception has occurred. */ - void writeColumns(HTableInterface htable, String columnFamily, Object rowKey, Map columns, + void writeColumns(Table htable, String columnFamily, Object rowKey, Map columns, Map values, Object columnFamilyObj) throws IOException; /** * Writes Columns columns into a given table. Each columns is * written in their own column family(name same as column name) * - * @param htable - * the htable + * @param table + * the table * @param rowKey * the row key * @param columns @@ -90,14 +91,14 @@ void writeColumns(HTableInterface htable, String columnFamily, Object rowKey, Ma * @throws IOException * Signals that an I/O exception has occurred. */ - void writeColumns(HTableInterface htable, Object rowKey, Map columns, Object entity, + void writeColumns(Table table, Object rowKey, Map columns, Object entity, String columnFamilyName) throws IOException; /** * Write relations. * - * @param htable - * the htable + * @param table + * the table * @param rowKey * the row key * @param containsEmbeddedObjectsOnly @@ -107,7 +108,7 @@ void writeColumns(HTableInterface htable, Object rowKey, Map * @throws IOException * Signals that an I/O exception has occurred. */ - void writeRelations(HTableInterface htable, Object rowKey, boolean containsEmbeddedObjectsOnly, + void writeRelations(Table table, Object rowKey, boolean containsEmbeddedObjectsOnly, List relations, String columnFamilyName) throws IOException; /** @@ -115,8 +116,8 @@ void writeRelations(HTableInterface htable, Object rowKey, boolean containsEmbed * column family named FKey-TO. Each column corresponds to foreign key field * name and values are actual foreign keys (separated by ~ if applicable) * - * @param hTable - * the h table + * @param table + * the table * @param rowKey * the row key * @param foreignKeyMap @@ -125,15 +126,15 @@ void writeRelations(HTableInterface htable, Object rowKey, boolean containsEmbed * Signals that an I/O exception has occurred. * @deprecated */ - public void writeForeignKeys(HTableInterface hTable, String rowKey, Map> foreignKeyMap) + public void writeForeignKeys(Table table, String rowKey, Map> foreignKeyMap) throws IOException; /** * Writes columns data to HBase table, supplied as a map in Key/ value pair; * key and value representing column name and value respectively. * - * @param htable - * the htable + * @param table + * the table * @param rowKey * the row key * @param columns @@ -141,20 +142,20 @@ public void writeForeignKeys(HTableInterface hTable, String rowKey, Map columns, String columnFamilyName) + void writeColumns(Table table, Object rowKey, Map columns, String columnFamilyName) throws IOException; /** * Delete. * - * @param hTable - * the h table + * @param table + * the table * @param rowKey * the row key * @param columnFamily * the column family */ - void delete(HTableInterface hTable, Object rowKey, String columnFamily); + void delete(Table table, Object rowKey, String columnFamily); /** * method to perform batch insert/update. @@ -164,5 +165,5 @@ void writeColumns(HTableInterface htable, Object rowKey, Map col * @throws IOException * throws io exception. */ - void persistRows(Map> rows) throws IOException; + void persistRows(Map> rows) throws IOException; } diff --git a/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/admin/DataHandler.java b/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/admin/DataHandler.java index 1049a9444..eb4ebc2b8 100644 --- a/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/admin/DataHandler.java +++ b/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/admin/DataHandler.java @@ -19,6 +19,8 @@ import java.util.List; import java.util.Map; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.FilterList; import com.impetus.kundera.db.RelationHolder; @@ -43,13 +45,12 @@ public interface DataHandler * @throws IOException * Signals that an I/O exception has occurred. */ - void createTableIfDoesNotExist(String tableName, String... colFamily) throws IOException; + void createTableIfDoesNotExist(TableName tableName, String... colFamily) throws IOException; /** * Populates data for give column family, column name, and HBase table name. * - * @param tableName - * the table name + * @param table * @param clazz * the clazz * @param m @@ -63,19 +64,18 @@ public interface DataHandler * @throws IOException * Signals that an I/O exception has occurred. */ - List readData(String tableName, Class clazz, EntityMetadata m, Object rowKey, List relatationNames, + List readData(Table table, Class clazz, EntityMetadata m, Object rowKey, List relationNames, FilterList f, String... columns) throws IOException; /** * Populates data for give column family, column name, and HBase table name. * - * @param tableName - * the table name + * @param table * @param clazz * the clazz * @param m * the m - * @param rowKey + * @param rowKeys * the row key * @param relationNames * the relation names @@ -83,28 +83,25 @@ List readData(String tableName, Class clazz, EntityMetadata m, Object rowKey, Li * @throws IOException * Signals that an I/O exception has occurred. */ - List readAll(String tableName, Class clazz, EntityMetadata m, List rowKeys, List relatationNames, + List readAll(Table table, Class clazz, EntityMetadata m, List rowKeys, List relationNames, String... columns) throws IOException; /** - * @param tableName * @param clazz * @param m - * @param relationNames * @param startRow * @param endRow * @param columns * @param f * @return */ - List readDataByRange(String tableName, Class clazz, EntityMetadata m, byte[] startRow, byte[] endRow, + List readDataByRange(Table table, Class clazz, EntityMetadata m, byte[] startRow, byte[] endRow, String[] columns, FilterList f) throws IOException; /** * Write data. * - * @param tableName - * the table name + * @param table * @param m * the m * @param entity @@ -116,14 +113,13 @@ List readDataByRange(String tableName, Class clazz, EntityMetadata m, byte[] sta * @throws IOException * Signals that an I/O exception has occurred. */ - void writeData(String tableName, EntityMetadata m, Object entity, Object rowId, List relations, + void writeData(Table table, EntityMetadata m, Object entity, Object rowId, List relations, boolean showQuery) throws IOException; /** * Writes data into Join Table. * - * @param tableName - * the table name + * @param table * @param rowId * the row id * @param columns @@ -131,7 +127,7 @@ void writeData(String tableName, EntityMetadata m, Object entity, Object rowId, * @throws IOException * Signals that an I/O exception has occurred. */ - void writeJoinTableData(String tableName, Object rowId, Map columns, String columnFamilyName) + void writeJoinTableData(Table table, Object rowId, Map columns, String columnFamilyName) throws IOException; /** @@ -174,13 +170,13 @@ List findParentEntityFromJoinTable(EntityMetadata parentMetadata, String * * @param rowKey * the row key - * @param tableName + * @param table * the table name * @throws IOException * Signals that an I/O exception has occurred. */ - void deleteRow(Object rowKey, String tableName, String columnFamilyName) throws IOException; + void deleteRow(Object rowKey, Table table, String columnFamilyName) throws IOException; - Object[] scanRowyKeys(FilterList filterList, String tableName, String columnFamilyName, String columnName, + Object[] scanRowyKeys(FilterList filterList, Table table, String columnFamilyName, String columnName, Class rowKeyClazz) throws IOException; } diff --git a/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/admin/HBaseDataHandler.java b/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/admin/HBaseDataHandler.java index f8fd40860..3a4a93453 100644 --- a/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/admin/HBaseDataHandler.java +++ b/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/admin/HBaseDataHandler.java @@ -34,13 +34,15 @@ import javax.persistence.metamodel.EmbeddableType; import javax.persistence.metamodel.EntityType; +import com.impetus.client.hbase.AdminRequest; +import com.impetus.client.hbase.RequestExecutor; +import com.impetus.client.hbase.TableRequest; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MasterNotRunningException; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.HTablePool; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.util.Bytes; @@ -81,20 +83,15 @@ public class HBaseDataHandler implements DataHandler /** the log used by this class. */ private static Logger log = LoggerFactory.getLogger(HBaseDataHandler.class); - /** The admin. */ - private HBaseAdmin admin; - /** The conf. */ private Configuration conf; - /** The h table pool. */ - private HTablePool hTablePool; - /** The hbase reader. */ private Reader hbaseReader = new HBaseReader(); + private RequestExecutor executor; /** The hbase writer. */ - private Writer hbaseWriter = new HBaseWriter(); + private Writer hbaseWriter; private FilterList filter = null; @@ -107,17 +104,16 @@ public class HBaseDataHandler implements DataHandler * * @param conf * the conf - * @param hTablePool - * the h table pool */ - public HBaseDataHandler(final KunderaMetadata kunderaMetadata, Configuration conf, HTablePool hTablePool) + public HBaseDataHandler(final KunderaMetadata kunderaMetadata, Configuration conf, + RequestExecutor executor) { try { this.kunderaMetadata = kunderaMetadata; this.conf = conf; - this.hTablePool = hTablePool; - this.admin = new HBaseAdmin(conf); + this.executor = executor; + hbaseWriter = new HBaseWriter(executor); } catch (Exception e) { @@ -134,78 +130,22 @@ public HBaseDataHandler(final KunderaMetadata kunderaMetadata, Configuration con * java.lang.String, java.lang.String[]) */ @Override - public void createTableIfDoesNotExist(final String tableName, final String... colFamily) - throws MasterNotRunningException, IOException - { - if (!admin.tableExists(Bytes.toBytes(tableName))) - { - HTableDescriptor htDescriptor = new HTableDescriptor(tableName); - for (String columnFamily : colFamily) - { - HColumnDescriptor familyMetadata = new HColumnDescriptor(columnFamily); - htDescriptor.addFamily(familyMetadata); - } - admin.createTable(htDescriptor); - } - } - - /** - * Adds the column family to table. - * - * @param tableName - * the table name - * @param columnFamilyName - * the column family name - * @throws IOException - * Signals that an I/O exception has occurred. - */ - private void addColumnFamilyToTable(String tableName, String columnFamilyName) throws IOException + public void createTableIfDoesNotExist(final TableName tableName, final String... colFamily) + throws IOException { - HColumnDescriptor cfDesciptor = new HColumnDescriptor(columnFamilyName); - - try - { - if (admin.tableExists(tableName)) - { - - // Before any modification to table schema, it's necessary to - // disable it - if (!admin.isTableEnabled(tableName)) - { - admin.enableTable(tableName); + executor.execute(new AdminRequest() { + protected void execute(Admin admin) throws IOException { + if (admin.tableExists(tableName)) { + return; } - HTableDescriptor descriptor = admin.getTableDescriptor(tableName.getBytes()); - boolean found = false; - for (HColumnDescriptor hColumnDescriptor : descriptor.getColumnFamilies()) - { - if (hColumnDescriptor.getNameAsString().equalsIgnoreCase(columnFamilyName)) - found = true; - } - if (!found) - { - - if (admin.isTableEnabled(tableName)) - { - admin.disableTable(tableName); - } - - admin.addColumn(tableName, cfDesciptor); - - // Enable table once done - admin.enableTable(tableName); + HTableDescriptor htDescriptor = new HTableDescriptor(tableName); + for (String columnFamily : colFamily) { + HColumnDescriptor familyMetadata = new HColumnDescriptor(columnFamily); + htDescriptor.addFamily(familyMetadata); } + admin.createTable(htDescriptor); } - else - { - log.warn("Table {} doesn't exist, so no question of adding column family {} to it!", tableName, - columnFamilyName); - } - } - catch (IOException e) - { - log.error("Error while adding column family {}, to table{} . ", columnFamilyName, tableName); - throw e; - } + }); } /* @@ -217,18 +157,10 @@ private void addColumnFamilyToTable(String tableName, String columnFamilyName) t * java.lang.String, java.util.List) */ @Override - public List readData(final String tableName, Class clazz, EntityMetadata m, final Object rowKey, + public List readData(final Table table, Class clazz, EntityMetadata m, final Object rowKey, List relationNames, FilterList f, String... columns) throws IOException { - List output = null; - - Object entity = null; - - HTableInterface hTable = null; - - hTable = gethTable(tableName); - if (getFilter(m.getTableName()) != null) { if (f == null) @@ -248,13 +180,13 @@ public List readData(final String tableName, Class clazz, EntityMetadata m, fina .getSecondaryTablesName(); secondaryTables.add(m.getTableName()); Collections.shuffle(secondaryTables); - List results = new ArrayList(); + List results = new ArrayList<>(); for (String colTableName : secondaryTables) { - results.addAll(hbaseReader.LoadData(hTable, colTableName, rowKey, f, columns)); + results.addAll(hbaseReader.LoadData(table, colTableName, rowKey, f, columns)); } - output = onRead(tableName, clazz, m, output, hTable, entity, relationNames, results); + output = onRead(clazz, m, output, relationNames, results); return output; } @@ -267,18 +199,10 @@ public List readData(final String tableName, Class clazz, EntityMetadata m, fina * java.lang.String, java.util.List) */ @Override - public List readAll(final String tableName, Class clazz, EntityMetadata m, final List rowKey, + public List readAll(final Table hTable, Class clazz, EntityMetadata m, final List rowKey, List relationNames, String... columns) throws IOException { - List output = null; - - Object entity = null; - - HTableInterface hTable = null; - - hTable = gethTable(tableName); - // Load raw data from HBase MetamodelImpl metaModel = (MetamodelImpl) kunderaMetadata.getApplicationMetadata().getMetamodel( m.getPersistenceUnit()); @@ -300,7 +224,7 @@ public List readAll(final String tableName, Class clazz, EntityMetadata m, final } } - output = onRead(tableName, clazz, m, output, hTable, entity, relationNames, results); + output = onRead(clazz, m, output, relationNames, results); return output; } @@ -314,12 +238,10 @@ public List readAll(final String tableName, Class clazz, EntityMetadata m, final * byte[], byte[]) */ @Override - public List readDataByRange(String tableName, Class clazz, EntityMetadata m, byte[] startRow, byte[] endRow, + public List readDataByRange(Table table, Class clazz, EntityMetadata m, byte[] startRow, byte[] endRow, String[] columns, FilterList f) throws IOException { List output = new ArrayList(); - HTableInterface hTable = null; - Object entity = null; List relationNames = m.getRelationNames(); Filter filter = getFilter(m.getTableName()); if (filter != null) @@ -331,10 +253,8 @@ public List readDataByRange(String tableName, Class clazz, EntityMetadata m, byt f.addFilter(filter); } // Load raw data from HBase - hTable = gethTable(tableName); - List results = hbaseReader.loadAll(hTable, f, startRow, endRow, m.getTableName(), null, columns); - output = onRead(tableName, clazz, m, output, hTable, entity, relationNames, results); - + List results = hbaseReader.loadAll(table, f, startRow, endRow, m.getTableName(), null, columns); + output = onRead(clazz, m, output, relationNames, results); return output; } @@ -347,11 +267,9 @@ public List readDataByRange(String tableName, Class clazz, EntityMetadata m, byt * java.lang.String, java.util.List) */ @Override - public void writeData(String tableName, EntityMetadata m, Object entity, Object rowId, + public void writeData(Table table, EntityMetadata m, Object entity, Object rowId, List relations, boolean showQuery) throws IOException { - HTableInterface hTable = gethTable(tableName); - MetamodelImpl metaModel = (MetamodelImpl) kunderaMetadata.getApplicationMetadata().getMetamodel( m.getPersistenceUnit()); @@ -366,16 +284,16 @@ public void writeData(String tableName, EntityMetadata m, Object entity, Object HBaseDataWrapper columnWrapper = new HBaseDataWrapper(rowId, new java.util.HashMap(), entity, null); - List persistentData = new ArrayList(attributes.size()); + List persistentData = new ArrayList<>(attributes.size()); - Map columnWrappers = preparePersistentData(tableName, m.getTableName(), entity, + Map columnWrappers = preparePersistentData(m.getTableName(), entity, rowId, metaModel, attributes, columnWrapper, persistentData, showQuery); - writeColumnData(hTable, entity, columnWrappers); + writeColumnData(table, entity, columnWrappers); for (HBaseDataWrapper wrapper : persistentData) { - hbaseWriter.writeColumns(hTable, wrapper.getColumnFamily(), wrapper.getRowKey(), wrapper.getColumns(), + hbaseWriter.writeColumns(table, wrapper.getColumnFamily(), wrapper.getRowKey(), wrapper.getColumns(), wrapper.getValues(), wrapper.getEntity()); } @@ -385,7 +303,7 @@ public void writeData(String tableName, EntityMetadata m, Object entity, Object if (relations != null && !relations.isEmpty()) { - hbaseWriter.writeRelations(hTable, rowId, containsEmbeddedObjectsOnly, relations, m.getTableName()); + hbaseWriter.writeRelations(table, rowId, containsEmbeddedObjectsOnly, relations, m.getTableName()); } // add discriminator column @@ -398,19 +316,17 @@ public void writeData(String tableName, EntityMetadata m, Object entity, Object { List discriminator = new ArrayList(1); discriminator.add(new RelationHolder(discrColumn, discrValue)); - hbaseWriter.writeRelations(hTable, rowId, containsEmbeddedObjectsOnly, discriminator, m.getTableName()); + hbaseWriter.writeRelations(table, rowId, containsEmbeddedObjectsOnly, discriminator, m.getTableName()); } - - puthTable(hTable); } - private void writeColumnData(HTableInterface hTable, Object entity, Map columnWrappers) + private void writeColumnData(Table table, Object entity, Map columnWrappers) throws IOException { for (HBaseDataWrapper wrapper : columnWrappers.values()) { - hbaseWriter.writeColumns(hTable, wrapper.getRowKey(), wrapper.getColumns(), entity, + hbaseWriter.writeColumns(table, wrapper.getRowKey(), wrapper.getColumns(), entity, wrapper.getColumnFamily()); } @@ -424,14 +340,10 @@ private void writeColumnData(HTableInterface hTable, Object entity, Map columns, String columnFamilyName) + public void writeJoinTableData(Table hTable, Object rowId, Map columns, String columnFamilyName) throws IOException { - HTableInterface hTable = gethTable(tableName); - hbaseWriter.writeColumns(hTable, rowId, columns, columnFamilyName); - - puthTable(hTable); } /* @@ -442,19 +354,18 @@ public void writeJoinTableData(String tableName, Object rowId, Map List getForeignKeysFromJoinTable(String schemaName, String joinTableName, Object rowKey, - String inverseJoinColumnName) + public List getForeignKeysFromJoinTable(String schemaName, + final String joinTableName, final Object rowKey, String inverseJoinColumnName) { List foreignKeys = new ArrayList(); - - HTableInterface hTable = null; - // Load raw data from Join Table in HBase try { - hTable = gethTable(schemaName); - - List results = hbaseReader.LoadData(hTable, joinTableName, rowKey, getFilter(joinTableName)); + List results = executor.execute(new TableRequest>(schemaName) { + protected List execute(Table table) throws IOException { + return hbaseReader.LoadData(table, joinTableName, rowKey, getFilter(joinTableName)); + } + }); // assuming rowKey is not null. if (results != null && !results.isEmpty()) @@ -482,51 +393,10 @@ public List getForeignKeysFromJoinTable(String schemaName, String joinTab } catch (IOException e) { - return foreignKeys; - } - finally - { - try - { - if (hTable != null) - { - puthTable(hTable); - } - } - catch (IOException e) - { - - // Do nothing. - } } return foreignKeys; } - /** - * Selects an HTable from the pool and returns. - * - * @param tableName - * Name of HBase table - * @return the h table - * @throws IOException - * Signals that an I/O exception has occurred. - */ - public HTableInterface gethTable(final String tableName) throws IOException - { - return hTablePool.getTable(tableName); - } - - /** - * Puts HTable back into the HBase table pool. - * - * @param hTable - * HBase Table instance - */ - private void puthTable(HTableInterface hTable) throws IOException - { - hTablePool.putTable(hTable); - } - /* * (non-Javadoc) * @@ -782,9 +652,6 @@ else if (columnValue != null && columnValue.length > 0) /** * Sets the h base data into object. * - * @param colData - * the col data - * @param columnFamilyField * the column family field * @param columnNameToFieldMap * the column name to field map @@ -820,9 +687,9 @@ private void setHBaseDataIntoObject(String columnName, byte[] columnValue, Map relationNames, List results) throws IOException + private List onRead(Class clazz, EntityMetadata m, List output, + List relationNames, List results) throws IOException { try { @@ -892,7 +754,7 @@ private List onRead(String tableName, Class clazz, EntityMetadata m, List output for (HBaseData data : results) { - entity = KunderaCoreUtils.createNewInstance(clazz); + Object entity = KunderaCoreUtils.createNewInstance(clazz); /* Set Row Key */ MetamodelImpl metaModel = (MetamodelImpl) kunderaMetadata.getApplicationMetadata().getMetamodel( @@ -931,14 +793,6 @@ private List onRead(String tableName, Class clazz, EntityMetadata m, List output log.error("Error while creating an instance of {}, Caused by: .", clazz, e); throw new PersistenceException(e); } - finally - { - if (hTable != null) - { - puthTable(hTable); - } - } - return output; } @@ -1047,8 +901,8 @@ public String getColumnFamily() } - public List scanData(Filter f, final String tableName, Class clazz, EntityMetadata m, String columnFamily, - String qualifier) throws IOException, InstantiationException, IllegalAccessException + public List scanData(Filter f, final Table table, Class clazz, EntityMetadata m, String columnFamily, + String qualifier) throws IOException { List returnedResults = new ArrayList(); MetamodelImpl metaModel = (MetamodelImpl) kunderaMetadata.getApplicationMetadata().getMetamodel( @@ -1070,28 +924,30 @@ else if (attr.isCollection()) break; } } - List results = hbaseReader.loadAll(gethTable(tableName), f, null, null, m.getTableName(), + List results = hbaseReader.loadAll(table, f, null, null, m.getTableName(), isCollection ? qualifier : null, null); if (results != null) { for (HBaseData row : results) { - Object entity = clazz.newInstance();// Entity Object - /* Set Row Key */ - PropertyAccessorHelper.setId(entity, m, HBaseUtils.fromBytes(m, metaModel, row.getRowKey())); - - returnedResults.add(populateEntityFromHbaseData(entity, row, m, row.getRowKey(), m.getRelationNames())); + try { + Object entity = clazz.newInstance(); + /* Set Row Key */ + PropertyAccessorHelper.setId(entity, m, HBaseUtils.fromBytes(m, metaModel, row.getRowKey())); + returnedResults.add(populateEntityFromHbaseData(entity, row, m, row.getRowKey(), m.getRelationNames())); + } catch (IllegalAccessException|InstantiationException e) { + log.error("Error during find By Relation, Caused by: .", e); + throw new KunderaException(e); + } } } return returnedResults; } @Override - public Object[] scanRowyKeys(FilterList filterList, String tableName, String columnFamilyName, String columnName, + public Object[] scanRowyKeys(FilterList filterList, Table hTable, String columnFamilyName, String columnName, final Class rowKeyClazz) throws IOException { - HTableInterface hTable = null; - hTable = gethTable(tableName); return hbaseReader.scanRowKeys(hTable, filterList, columnFamilyName, columnName, rowKeyClazz); } @@ -1117,7 +973,7 @@ private Object getObjectFromByteArray(EntityType entityType, byte[] value, Strin /** * - * @param tableName + * @param columnFamily * @param entity * @param rowId * @param metaModel @@ -1127,12 +983,12 @@ private Object getObjectFromByteArray(EntityType entityType, byte[] value, Strin * @return * @throws IOException */ - public Map preparePersistentData(String tableName, String columnFamily, Object entity, + public Map preparePersistentData(String columnFamily, Object entity, Object rowId, MetamodelImpl metaModel, Set attributes, HBaseDataWrapper columnWrapper, List persistentData, boolean showQuery) throws IOException { - Map persistentDataWrappers = new HashMap(); + Map persistentDataWrappers = new HashMap<>(); persistentDataWrappers.put(columnFamily, columnWrapper); StringBuilder printQuery = null; if (showQuery) @@ -1353,7 +1209,7 @@ private Map>, HBaseDataWrapper> getEmbeddableHBaseWrapperOb * @param data * @throws IOException */ - public void batch_insert(Map> data) throws IOException + public void batch_insert(Map> data) throws IOException { hbaseWriter.persistRows(data); } @@ -1365,15 +1221,13 @@ public void setFetchSize(final int fetchSize) public Object next(EntityMetadata m) { - Object entity = null; HBaseData result = ((HBaseReader) hbaseReader).next(); List results = new ArrayList(); List output = new ArrayList(); results.add(result); try { - output = onRead(m.getSchema(), m.getEntityClazz(), m, output, gethTable(m.getSchema()), entity, - m.getRelationNames(), results); + output = onRead(m.getEntityClazz(), m, output, m.getRelationNames(), results); } catch (IOException e) { @@ -1403,7 +1257,8 @@ public void resetFilter() public HBaseDataHandler getHandle() { - HBaseDataHandler handler = new HBaseDataHandler(this.kunderaMetadata, this.conf, this.hTablePool); + HBaseDataHandler handler = new HBaseDataHandler(this.kunderaMetadata, + this.conf, executor); handler.filter = this.filter; handler.filters = this.filters; return handler; diff --git a/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/query/ResultIterator.java b/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/query/ResultIterator.java index 40ff9f792..113a43c62 100644 --- a/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/query/ResultIterator.java +++ b/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/query/ResultIterator.java @@ -21,6 +21,9 @@ import java.util.List; import java.util.NoSuchElementException; +import com.impetus.client.hbase.RequestExecutor; +import com.impetus.client.hbase.TableRequest; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.KeyOnlyFilter; import org.slf4j.Logger; @@ -50,6 +53,8 @@ class ResultIterator implements IResultIterator { private HBaseClient client; + private RequestExecutor executor; + private EntityMetadata entityMetadata; private PersistenceDelegator persistenceDelegator; @@ -69,13 +74,15 @@ class ResultIterator implements IResultIterator /** the log used by this class. */ private static Logger log = LoggerFactory.getLogger(ResultIterator.class); - public ResultIterator(HBaseClient client, EntityMetadata m, PersistenceDelegator pd, int fetchSize, + public ResultIterator(HBaseClient client, + EntityMetadata m, PersistenceDelegator pd, int fetchSize, QueryTranslator translator, List columns) { this.entityMetadata = m; this.client = client; + this.executor = client.getRequestExecutor(); this.persistenceDelegator = pd; - this.handler = ((HBaseClient) client).getHandle(); + this.handler = client.getHandle(); this.handler.setFetchSize(fetchSize); this.fetchSize = fetchSize; this.translator = translator; @@ -113,7 +120,7 @@ public E next() } else { - return result = setRelationEntities(result, client, entityMetadata); + return setRelationEntities(result, client, entityMetadata); } } @@ -140,7 +147,7 @@ private E setRelationEntities(Object enhanceEntity, Client client, EntityMetadat EnhanceEntity ee = (EnhanceEntity) enhanceEntity; result = (E) client.getReader().recursivelyFindEntities(ee.getEntity(), ee.getRelations(), m, - persistenceDelegator, false, new HashMap()); + persistenceDelegator, false, new HashMap<>()); } return result; @@ -156,65 +163,59 @@ private E setRelationEntities(Object enhanceEntity, Client client, EntityMetadat * hbase client * @return list of entities. */ - private void onQuery(EntityMetadata m, Client client) + private void onQuery(final EntityMetadata m, final Client client) { try { // Called only in case of standalone entity. - FilterList filter = null; - if (translator.getFilter() != null) - { + final FilterList filter; + if (translator.getFilter() != null) { filter = new FilterList(translator.getFilter()); + } else { + filter = null; } - String[] columnAsArr = getColumnsAsArray(); + final String[] columnAsArr = getColumnsAsArray(); - if (isFindKeyOnly(m, columnAsArr)) - { + if (isFindKeyOnly(m, columnAsArr)) { this.handler.setFilter(new KeyOnlyFilter()); } + executor.execute(new TableRequest(m.getSchema()) { + protected Void execute(Table table) throws IOException { + if (filter == null && columns != null) { - if (filter == null && columns != null) - { - handler.readDataByRange(m.getSchema(), m.getEntityClazz(), m, translator.getStartRow(), - translator.getEndRow(), columnAsArr, null); - } - if (MetadataUtils.useSecondryIndex(((ClientBase) client).getClientMetadata())) - { - if (filter == null) - { - // means complete scan without where clause, scan all - // records. - // findAll. - if (translator.isRangeScan()) - { - handler.readDataByRange(m.getSchema(), m.getEntityClazz(), m, translator.getStartRow(), + handler.readDataByRange(table, m.getEntityClazz(), m, translator.getStartRow(), translator.getEndRow(), columnAsArr, null); } - else - { - handler.readDataByRange(m.getSchema(), m.getEntityClazz(), m, null, null, columnAsArr, null); + if (MetadataUtils.useSecondryIndex(((ClientBase) client).getClientMetadata())) { + if (filter == null) { + // means complete scan without where clause, scan all + // records. + // findAll. + if (translator.isRangeScan()) { + handler.readDataByRange(table, m.getEntityClazz(), m, translator.getStartRow(), + translator.getEndRow(), columnAsArr, null); + } else { + handler.readDataByRange(table, m.getEntityClazz(), m, null, null, columnAsArr, null); + } + } else { + // means WHERE clause is present. + if (translator.isRangeScan()) { + handler.readDataByRange(table, m.getEntityClazz(), m, translator.getStartRow(), + translator.getEndRow(), columnAsArr, filter); + } else { + // if range query. means query over id column. create + // range + // scan method. + + handler.readData(table, entityMetadata.getEntityClazz(), entityMetadata, null, + m.getRelationNames(), filter, columnAsArr); + } + } } + return null; } - else - { - // means WHERE clause is present. - if (translator.isRangeScan()) - { - handler.readDataByRange(m.getSchema(), m.getEntityClazz(), m, translator.getStartRow(), - translator.getEndRow(), columnAsArr, filter); - } - else - { - // if range query. means query over id column. create - // range - // scan method. - - handler.readData(m.getSchema(), entityMetadata.getEntityClazz(), entityMetadata, null, - m.getRelationNames(), filter, columnAsArr); - } - } - } + }); } catch (IOException ioex) { diff --git a/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/service/HBaseReader.java b/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/service/HBaseReader.java index 77659d1fe..605615706 100644 --- a/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/service/HBaseReader.java +++ b/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/service/HBaseReader.java @@ -22,7 +22,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; @@ -57,7 +57,7 @@ public class HBaseReader implements Reader */ @Override - public List LoadData(HTableInterface hTable, String columnFamily, Object rowKey, Filter filter, + public List LoadData(Table hTable, String columnFamily, Object rowKey, Filter filter, String... columns) throws IOException { List results = new ArrayList(); @@ -124,10 +124,10 @@ public List LoadData(HTableInterface hTable, String columnFamily, Obj * .HTable, java.lang.String) */ @Override - public List LoadData(HTableInterface hTable, Object rowKey, Filter filter, String... columns) + public List LoadData(Table hTable, Object rowKey, Filter filter, String... columns) throws IOException { - return LoadData(hTable, Bytes.toString(hTable.getTableName()), rowKey, filter, columns); + return LoadData(hTable, Bytes.toString(hTable.getName().getName()), rowKey, filter, columns); } /* @@ -138,7 +138,7 @@ public List LoadData(HTableInterface hTable, Object rowKey, Filter fi * .HTable, org.apache.hadoop.hbase.filter.Filter, byte[], byte[]) */ @Override - public List loadAll(HTableInterface hTable, Filter filter, byte[] startRow, byte[] endRow, + public List loadAll(Table hTable, Filter filter, byte[] startRow, byte[] endRow, String columnFamily, String qualifier, String[] columns) throws IOException { List results = null; @@ -214,8 +214,7 @@ else if (columnFamily != null) * column family. * @param results * results. - * @param scanner - * result scanner. + * @return collection of scanned results. * @throws IOException */ @@ -249,7 +248,7 @@ private List scanResults(final String columnFamily, List r } @Override - public Object[] scanRowKeys(final HTableInterface hTable, final Filter filter, final String columnFamilyName, + public Object[] scanRowKeys(final Table hTable, final Filter filter, final String columnFamilyName, final String columnName, final Class rowKeyClazz) throws IOException { List rowKeys = new ArrayList(); @@ -280,7 +279,7 @@ public Object[] scanRowKeys(final HTableInterface hTable, final Filter filter, f return null; } - public List loadAll(final HTableInterface hTable, final List rows, final String columnFamily, + public List loadAll(final Table hTable, final List rows, final String columnFamily, final String[] columns) throws IOException { List results = null; diff --git a/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/service/HBaseWriter.java b/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/service/HBaseWriter.java index dc09738f0..c8da72a34 100644 --- a/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/service/HBaseWriter.java +++ b/src/kundera-hbase/kundera-hbase/src/main/java/com/impetus/client/hbase/service/HBaseWriter.java @@ -26,9 +26,12 @@ import javax.persistence.metamodel.Attribute; import javax.persistence.metamodel.SingularAttribute; +import com.impetus.client.hbase.BatchPutRequest; +import com.impetus.client.hbase.RequestExecutor; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; @@ -56,6 +59,13 @@ public class HBaseWriter implements Writer /** the log used by this class. */ private static Logger log = LoggerFactory.getLogger(HBaseWriter.class); + private final RequestExecutor executor; + + public HBaseWriter(RequestExecutor executor) { + this.executor = executor; + } + + /* * (non-Javadoc) * @@ -65,13 +75,11 @@ public class HBaseWriter implements Writer * java.lang.Object) */ @Override - public void writeColumns(HTableInterface htable, String columnFamily, Object rowKey, + public void writeColumns(Table table, String columnFamily, Object rowKey, Map columns, - Map values, Object columnFamilyObj) throws IOException { - Put p = preparePut(columnFamily, rowKey, columns, values); - htable.put(p); + table.put(preparePut(columnFamily, rowKey, columns, values)); } /* @@ -83,14 +91,14 @@ public void writeColumns(HTableInterface htable, String columnFamily, Object row * javax.persistence.metamodel.Attribute, java.lang.Object) */ @Override - public void writeColumn(HTableInterface htable, String columnFamily, Object rowKey, Attribute column, + public void writeColumn(Table table, String columnFamily, Object rowKey, Attribute column, Object columnObj) throws IOException { Put p = new Put(HBaseUtils.getBytes(rowKey)); - p.add(Bytes.toBytes(columnFamily), Bytes.toBytes(((AbstractAttribute) column).getJPAColumnName()), + p.addColumn(Bytes.toBytes(columnFamily), + Bytes.toBytes(((AbstractAttribute) column).getJPAColumnName()), Bytes.toBytes(columnObj.toString())); - - htable.put(p); + table.put(p); } /* @@ -101,27 +109,23 @@ public void writeColumn(HTableInterface htable, String columnFamily, Object rowK * client.HTable, java.lang.Object, java.util.Set, java.lang.Object) */ @Override - public void writeColumns(HTableInterface htable, Object rowKey, Map columns, Object entity, + public void writeColumns(Table table, Object rowKey, Map columns, Object entity, String columnFamilyName) throws IOException { Put p = new Put(HBaseUtils.getBytes(rowKey)); - - boolean present = false; for (String columnName : columns.keySet()) { Attribute column = columns.get(columnName); if (!column.isCollection() && !((SingularAttribute) column).isId()) { - String qualifier = columnName; try { - byte[] qualValInBytes = Bytes.toBytes(qualifier); + byte[] qualValInBytes = Bytes.toBytes(columnName); Object value = PropertyAccessorHelper.getObject(entity, (Field) column.getJavaMember()); if (value != null) { - p.add(columnFamilyName.getBytes(), qualValInBytes, System.currentTimeMillis(), + p.addColumn(columnFamilyName.getBytes(), qualValInBytes, System.currentTimeMillis(), HBaseUtils.getBytes(value)); - present = true; } } catch (PropertyAccessException e1) @@ -130,10 +134,10 @@ public void writeColumns(HTableInterface htable, Object rowKey, Map columns, String columnFamilyName) + public void writeColumns(Table table, Object rowKey, Map columns, String columnFamilyName) throws IOException { - Put p = new Put(HBaseUtils.getBytes(rowKey)); - - boolean isPresent = false; for (String columnName : columns.keySet()) { - p.add(columnFamilyName.getBytes(), Bytes.toBytes(columnName), HBaseUtils.getBytes(columns.get(columnName))); - isPresent = true; + p.addColumn(columnFamilyName.getBytes(), Bytes.toBytes(columnName), HBaseUtils.getBytes(columns.get(columnName))); } - - if (isPresent) - { - htable.put(p); + if (p.isEmpty()) { + return; } + table.put(p); } /* @@ -171,35 +170,30 @@ public void writeColumns(HTableInterface htable, Object rowKey, Map relations, String columnFamilyName) throws IOException { Put p = new Put(HBaseUtils.getBytes(rowKey)); - - boolean isPresent = false; for (RelationHolder r : relations) { if (r != null) { if (containsEmbeddedObjectsOnly) { - p.add(Bytes.toBytes(r.getRelationName()), Bytes.toBytes(r.getRelationName()), + p.addColumn(Bytes.toBytes(r.getRelationName()), Bytes.toBytes(r.getRelationName()), PropertyAccessorHelper.getBytes(r.getRelationValue())); - isPresent = true; } else { - p.add(columnFamilyName.getBytes(), Bytes.toBytes(r.getRelationName()), System.currentTimeMillis(), + p.addColumn(columnFamilyName.getBytes(), Bytes.toBytes(r.getRelationName()), System.currentTimeMillis(), PropertyAccessorHelper.getBytes(r.getRelationValue())); - isPresent = true; } } } - - if (isPresent) - { - htable.put(p); + if (p.isEmpty()) { + return; } + table.put(p); } // TODO: Scope of performance improvement in this code @@ -211,17 +205,14 @@ public void writeRelations(HTableInterface htable, Object rowKey, boolean contai * .client.HTable, java.lang.String, java.util.Map) */ @Override - public void writeForeignKeys(HTableInterface hTable, String rowKey, Map> foreignKeyMap) + public void writeForeignKeys(Table table, String rowKey, Map> foreignKeyMap) throws IOException { Put p = new Put(Bytes.toBytes(rowKey)); // Checking if foreign key column family exists Get g = new Get(Bytes.toBytes(rowKey)); - Result r = hTable.get(g); - - boolean isPresent = false; - + Result r = table.get(g); for (Map.Entry> entry : foreignKeyMap.entrySet()) { String property = entry.getKey(); // Foreign key name @@ -236,28 +227,26 @@ public void writeForeignKeys(HTableInterface hTable, String rowKey, Map> rows) throws IOException + public void persistRows(Map> rows) throws IOException { - List dataSet = new ArrayList(rows.size()); - for (HTableInterface hTable : rows.keySet()) + final List dataSet = new ArrayList<>(rows.size()); + for (TableName tableName : rows.keySet()) { - List row = rows.get(hTable); + List row = rows.get(tableName); for (HBaseDataWrapper data : row) { dataSet.add(preparePut(data.getColumnFamily(), data.getRowKey(), data.getColumns(), data.getValues())); } - hTable.put(dataSet); + executor.execute(new BatchPutRequest(tableName, dataSet)); dataSet.clear(); } } @@ -318,8 +307,6 @@ public void persistRows(Map> rows) throw * @param columns * the columns * @param values TODO - * @param columnFamilyObj - * the column family obj * @return the put * @throws IOException * Signals that an I/O exception has occurred. @@ -333,14 +320,13 @@ private Put preparePut(String columnFamily, Object rowKey, Map