Skip to content

Commit 06d9b6a

Browse files
committed
Merging branch origin/v1.11. Following changes:
* Added `increment` methods
* New constructors for `AbstractHBDAO` that take `org.apache.hadoop.hbase.client.Connection`
1 parent 8f2a4a8 commit 06d9b6a

File tree

8 files changed: +216 −57 lines changed

README.md

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ public class Citizen implements HBRecord<String> {
3131

3232
@HBColumn(family = "optional", column = "salary")
3333
private Integer sal;
34+
35+
@HBColumn(family = "optional", column = "counter")
36+
private Long counter;
3437

3538
@HBColumn(family = "optional", column = "custom_details")
3639
private Map<String, Integer> customDetails;
@@ -195,7 +198,10 @@ Once defined, you can access, manipulate and persist a row of `citizens` HBase t
195198
Configuration configuration = getConf(); // this is org.apache.hadoop.conf.Configuration
196199

197200
// Create a data access object:
198-
CitizenDAO citizenDao = new CitizenDAO(configuration);
201+
CitizenDAO citizenDao = new CitizenDAO(configuration); // alternatively, you can pass HBase client's Connection to your constructor
202+
203+
// Create new record:
204+
String rowKey = citizenDao.persist(new Citizen("IND", 1, /* more params */)); // Here, output of 'persist' is a String, because Citizen class implements HBRecord<String>
199205

200206
// Fetch a row from "citizens" HBase table with row key "IND#1":
201207
Citizen pe = citizenDao.get("IND#1");
@@ -229,6 +235,8 @@ counterDAO.getOnGets(get1);
229235
Get get2 = citizenDao.getGet("IND#2").setTimeRange(1, 5).setMaxVersions(2); // Advanced HBase row fetch
230236
counterDAO.getOnGets(get2);
231237

238+
citizenDao.increment("IND#2", "counter", 3L); // Increment value of counter by 3
239+
232240
citizenDao.getHBaseTable() // returns HTable instance (in case you want to directly play around)
233241

234242
```
@@ -251,7 +259,7 @@ Add below entry within the `dependencies` section of your `pom.xml`:
251259
<dependency>
252260
<groupId>com.flipkart</groupId>
253261
<artifactId>hbase-object-mapper</artifactId>
254-
<version>1.10</version>
262+
<version>1.11</version>
255263
</dependency>
256264
```
257265
See artifact details: [com.flipkart:hbase-object-mapper on **Maven Central**](http://search.maven.org/#search%7Cgav%7C1%7Cg%3A%22com.flipkart%22%20AND%20a%3A%22hbase-object-mapper%22) or
@@ -260,7 +268,7 @@ See artifact details: [com.flipkart:hbase-object-mapper on **Maven Central**](ht
260268
To build this project, follow below simple steps:
261269

262270
1. Do a `git clone` of this repository
263-
2. Checkout latest stable version `git checkout v1.10`
271+
2. Checkout latest stable version `git checkout v1.11`
264272
3. Execute `mvn clean install` from shell
265273

266274
### Please note:

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
<modelVersion>4.0.0</modelVersion>
1313
<groupId>com.flipkart</groupId>
1414
<artifactId>hbase-object-mapper</artifactId>
15-
<version>1.10</version>
15+
<version>1.11</version>
1616
<url>https://flipkart-incubator.github.io/hbase-orm/</url>
1717
<scm>
1818
<url>https://github.com/flipkart-incubator/hbase-orm/</url>
@@ -64,7 +64,7 @@
6464
<dependency>
6565
<groupId>com.fasterxml.jackson.core</groupId>
6666
<artifactId>jackson-databind</artifactId>
67-
<version>2.8.7</version>
67+
<version>2.8.11.3</version>
6868
</dependency>
6969
<!-- test dependencies -->
7070
<dependency>

src/main/java/com/flipkart/hbaseobjectmapper/AbstractHBDAO.java

Lines changed: 133 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
package com.flipkart.hbaseobjectmapper;
22

33
import com.flipkart.hbaseobjectmapper.codec.Codec;
4-
import com.flipkart.hbaseobjectmapper.exceptions.FieldNotMappedToHBaseColumnException;
54
import com.google.common.reflect.TypeToken;
65
import org.apache.hadoop.conf.Configuration;
76
import org.apache.hadoop.hbase.Cell;
87
import org.apache.hadoop.hbase.CellUtil;
98
import org.apache.hadoop.hbase.TableName;
109
import org.apache.hadoop.hbase.client.*;
11-
import org.apache.hadoop.hbase.util.Bytes;
1210

1311
import java.io.Closeable;
1412
import java.io.IOException;
@@ -27,50 +25,34 @@
2725
*
2826
* @param <R> Data type of row key (must be '{@link Comparable} with itself' and must be {@link Serializable})
2927
* @param <T> Entity type that maps to an HBase row (this type must have implemented {@link HBRecord} interface)
28+
* @see <a href="https://en.wikipedia.org/wiki/Data_access_object">Data access object</a>
3029
* @see Connection#getTable(TableName)
3130
* @see Table
3231
* @see HTable
33-
* @see <a href="https://en.wikipedia.org/wiki/Data_access_object">Data access object</a>
3432
*/
3533
@SuppressWarnings("WeakerAccess")
3634
public abstract class AbstractHBDAO<R extends Serializable & Comparable<R>, T extends HBRecord<R>> implements Closeable {
3735

3836
protected final HBObjectMapper hbObjectMapper;
39-
protected final Connection connection;
4037
protected final Table table;
4138
protected final Class<R> rowKeyClass;
4239
protected final Class<T> hbRecordClass;
4340
protected final WrappedHBTable<R, T> hbTable;
4441
private final Map<String, Field> fields;
4542

4643
/**
47-
* Constructs a data access object using a custom codec. Classes extending this class <strong>must</strong> call this constructor using <code>super</code>.
48-
* <p>
49-
* <b>Note: </b>If you want to use the default codec, just use the constructor {@link #AbstractHBDAO(Configuration)}
50-
*
51-
* @param configuration Hadoop configuration
52-
* @param codec Your custom codec. If <code>null</code>, default codec is used.
53-
* @throws IOException Exceptions thrown by HBase
54-
* @throws IllegalStateException Annotation(s) on base entity may be incorrect
55-
*/
56-
@SuppressWarnings("unchecked")
57-
protected AbstractHBDAO(Configuration configuration, Codec codec) throws IOException {
58-
this(configuration, HBObjectMapperFactory.construct(codec));
59-
}
60-
61-
/**
62-
* Constructs a data access object using your custom {@link HBObjectMapper}. Classes extending this class <strong>must</strong> call this constructor using <code>super</code>.
44+
* Constructs a data access object using your custom {@link HBObjectMapper}
6345
* <p>
6446
* <br>
6547
* <b>Note: </b>If you want to use the default {@link HBObjectMapper}, just use the constructor {@link #AbstractHBDAO(Configuration)}
6648
*
67-
* @param configuration Hadoop configuration
49+
* @param connection HBase Connection
6850
* @param hbObjectMapper Your custom {@link HBObjectMapper}
6951
* @throws IOException Exceptions thrown by HBase
7052
* @throws IllegalStateException Annotation(s) on base entity may be incorrect
7153
*/
7254
@SuppressWarnings("unchecked")
73-
protected AbstractHBDAO(Configuration configuration, HBObjectMapper hbObjectMapper) throws IOException {
55+
protected AbstractHBDAO(Connection connection, HBObjectMapper hbObjectMapper) throws IOException {
7456
this.hbObjectMapper = hbObjectMapper;
7557
hbRecordClass = (Class<T>) new TypeToken<T>(getClass()) {
7658
}.getRawType();
@@ -81,14 +63,71 @@ protected AbstractHBDAO(Configuration configuration, HBObjectMapper hbObjectMapp
8163
throw new IllegalStateException(String.format("Unable to resolve HBase record/rowkey type (record class is resolving to %s and rowkey class is resolving to %s)", hbRecordClass, rowKeyClass));
8264
}
8365
hbTable = new WrappedHBTable<>(hbRecordClass);
84-
connection = ConnectionFactory.createConnection(configuration);
8566
table = connection.getTable(hbTable.getName());
8667
fields = hbObjectMapper.getHBColumnFields0(hbRecordClass);
8768
}
8869

8970

9071
/**
91-
* Constructs a data access object. Classes extending this class <strong>must</strong> call this constructor using <code>super</code>.
72+
* Constructs a data access object using your custom {@link HBObjectMapper}
73+
* <p>
74+
* <b>Note: </b>If you want to use the default {@link HBObjectMapper}, just use the constructor {@link #AbstractHBDAO(Configuration)}
75+
*
76+
* @param configuration Hadoop configuration
77+
* @param hbObjectMapper Your custom {@link HBObjectMapper}
78+
* @throws IOException Exceptions thrown by HBase
79+
* @throws IllegalStateException Annotation(s) on base entity may be incorrect
80+
*/
81+
@SuppressWarnings("unchecked")
82+
protected AbstractHBDAO(Configuration configuration, HBObjectMapper hbObjectMapper) throws IOException {
83+
this(ConnectionFactory.createConnection(configuration), hbObjectMapper);
84+
}
85+
86+
87+
/**
88+
* Constructs a data access object using your custom codec
89+
* <p>
90+
* <b>Note: </b>If you want to use the default codec, just use the constructor {@link #AbstractHBDAO(Connection)}
91+
*
92+
* @param connection HBase Connection
93+
* @param codec Your custom codec. If <code>null</code>, default codec is used.
94+
* @throws IOException Exceptions thrown by HBase
95+
* @throws IllegalStateException Annotation(s) on base entity may be incorrect
96+
*/
97+
@SuppressWarnings("unchecked")
98+
protected AbstractHBDAO(Connection connection, Codec codec) throws IOException {
99+
this(connection, HBObjectMapperFactory.construct(codec));
100+
}
101+
102+
/**
103+
* Constructs a data access object using your custom codec
104+
* <p>
105+
* <b>Note: </b>If you want to use the default codec, just use the constructor {@link #AbstractHBDAO(Configuration)}
106+
*
107+
* @param configuration Hadoop configuration
108+
* @param codec Your custom codec. If <code>null</code>, default codec is used.
109+
* @throws IOException Exceptions thrown by HBase
110+
* @throws IllegalStateException Annotation(s) on base entity may be incorrect
111+
*/
112+
@SuppressWarnings("unchecked")
113+
protected AbstractHBDAO(Configuration configuration, Codec codec) throws IOException {
114+
this(ConnectionFactory.createConnection(configuration), HBObjectMapperFactory.construct(codec));
115+
}
116+
117+
/**
118+
* Constructs a data access object
119+
*
120+
* @param connection HBase Connection
121+
* @throws IOException Exceptions thrown by HBase
122+
* @throws IllegalStateException Annotation(s) on base entity may be incorrect
123+
*/
124+
@SuppressWarnings("unchecked")
125+
protected AbstractHBDAO(Connection connection) throws IOException {
126+
this(connection, (Codec) null);
127+
}
128+
129+
/**
130+
* Constructs a data access object
92131
*
93132
* @param configuration Hadoop configuration
94133
* @throws IOException Exceptions thrown by HBase
@@ -247,6 +286,70 @@ public List<T> get(R startRowKey, R endRowKey, int numVersionsToFetch) throws IO
247286
return records;
248287
}
249288

289+
private WrappedHBColumn validateAndGetLongColumn(String fieldName) {
290+
Field field = getField(fieldName);
291+
if (!Long.class.equals(field.getType())) {
292+
throw new IllegalArgumentException(String.format("Invalid attempt to increment a non-Long field (%s.%s)", hbRecordClass.getName(), fieldName));
293+
}
294+
return new WrappedHBColumn(field, true);
295+
}
296+
297+
/**
298+
* Increments field by specified amount
299+
*
300+
* @param rowKey Row key of the record whose column needs to be incremented
301+
* @param fieldName Field that needs to be incremented (this must be of {@link Long} type)
302+
* @param amount Amount by which the HBase column needs to be incremented
303+
* @return The new value, post increment
304+
* @throws IOException When HBase call fails
305+
*/
306+
public long increment(R rowKey, String fieldName, long amount) throws IOException {
307+
WrappedHBColumn hbColumn = validateAndGetLongColumn(fieldName);
308+
return table.incrementColumnValue(toBytes(rowKey), hbColumn.familyBytes(), hbColumn.columnBytes(), amount);
309+
}
310+
311+
/**
312+
* Increments field by specified amount
313+
*
314+
* @param rowKey Row key of the record whose column needs to be incremented
315+
* @param fieldName Field that needs to be incremented (this must be of {@link Long} type)
316+
* @param amount Amount by which the HBase column needs to be incremented
317+
* @param durability The persistence guarantee for this increment (see {@link Durability})
318+
* @return The new value, post increment
319+
* @throws IOException When HBase call fails
320+
*/
321+
public long increment(R rowKey, String fieldName, long amount, Durability durability) throws IOException {
322+
WrappedHBColumn hbColumn = validateAndGetLongColumn(fieldName);
323+
return table.incrementColumnValue(toBytes(rowKey), hbColumn.familyBytes(), hbColumn.columnBytes(), amount, durability);
324+
}
325+
326+
/**
327+
* Gets (native) {@link Increment} object for given row key, to be later used in {@link #increment(Increment)} method.
328+
*
329+
* @param rowKey HBase row key
330+
* @return Increment object
331+
*/
332+
public Increment getIncrement(R rowKey) {
333+
return new Increment(toBytes(rowKey));
334+
}
335+
336+
/**
337+
* Performs HBase {@link Table#increment} on the given {@link Increment} object <br>
338+
* <br>
339+
* <b>Note</b>: <ul>
340+
* <li>You may construct {@link Increment} object using the {@link #getIncrement(Serializable) getIncrement} method</li>
341+
* <li>Unlike the {@link #increment(Serializable, String, long)} methods, this method skips some validations (hence, be cautious)</li>
342+
* </ul>
343+
*
344+
* @param increment HBase Increment object
345+
* @return <b>Partial object</b> containing (only) values that were incremented
346+
* @throws IOException When HBase call fails
347+
*/
348+
public T increment(Increment increment) throws IOException {
349+
Result result = table.increment(increment);
350+
return hbObjectMapper.readValue(result, hbRecordClass);
351+
}
352+
250353
/**
251354
* Get specified number of versions of rows from HBase table by a range of row keys (start to end)
252355
*
@@ -389,8 +492,8 @@ private void populateFieldValuesToMap(Field field, Result result, Map<R, Navigab
389492
if (result.isEmpty()) {
390493
return;
391494
}
392-
WrappedHBColumn hbColumn = new WrappedHBColumn(field);
393-
List<Cell> cells = result.getColumnCells(Bytes.toBytes(hbColumn.family()), Bytes.toBytes(hbColumn.column()));
495+
WrappedHBColumn hbColumn = new WrappedHBColumn(field, true);
496+
List<Cell> cells = result.getColumnCells(hbColumn.familyBytes(), hbColumn.columnBytes());
394497
for (Cell cell : cells) {
395498
Type fieldType = hbObjectMapper.getFieldType(field, hbColumn.isMultiVersioned());
396499
@SuppressWarnings("unchecked") final R rowKey = hbObjectMapper.bytesToRowKey(CellUtil.cloneRow(cell), hbTable.getCodecFlags(), (Class<T>) field.getDeclaringClass());
@@ -466,10 +569,9 @@ private Map<R, Object> toSingleVersioned(Map<R, NavigableMap<Long, Object>> mult
466569
*/
467570
public NavigableMap<R, NavigableMap<Long, Object>> fetchFieldValues(R startRowKey, R endRowKey, String fieldName, int numVersionsToFetch) throws IOException {
468571
Field field = getField(fieldName);
469-
WrappedHBColumn hbColumn = new WrappedHBColumn(field);
470-
validateFetchInput(field, hbColumn);
572+
WrappedHBColumn hbColumn = new WrappedHBColumn(field, true);
471573
Scan scan = new Scan(toBytes(startRowKey), toBytes(endRowKey));
472-
scan.addColumn(Bytes.toBytes(hbColumn.family()), Bytes.toBytes(hbColumn.column()));
574+
scan.addColumn(hbColumn.familyBytes(), hbColumn.columnBytes());
473575
scan.setMaxVersions(numVersionsToFetch);
474576
ResultScanner scanner = table.getScanner(scan);
475577
NavigableMap<R, NavigableMap<Long, Object>> map = new TreeMap<>();
@@ -503,13 +605,12 @@ public Map<R, Object> fetchFieldValues(R[] rowKeys, String fieldName) throws IOE
503605
*/
504606
public Map<R, NavigableMap<Long, Object>> fetchFieldValues(R[] rowKeys, String fieldName, int numVersionsToFetch) throws IOException {
505607
Field field = getField(fieldName);
506-
WrappedHBColumn hbColumn = new WrappedHBColumn(field);
507-
validateFetchInput(field, hbColumn);
608+
WrappedHBColumn hbColumn = new WrappedHBColumn(field, true);
508609
List<Get> gets = new ArrayList<>(rowKeys.length);
509610
for (R rowKey : rowKeys) {
510611
Get get = new Get(toBytes(rowKey));
511612
get.setMaxVersions(numVersionsToFetch);
512-
get.addColumn(Bytes.toBytes(hbColumn.family()), Bytes.toBytes(hbColumn.column()));
613+
get.addColumn(hbColumn.familyBytes(), hbColumn.columnBytes());
513614
gets.add(get);
514615
}
515616
Result[] results = this.table.get(gets);
@@ -524,12 +625,6 @@ private byte[] toBytes(R rowKey) {
524625
return hbObjectMapper.rowKeyToBytes(rowKey, hbTable.getCodecFlags());
525626
}
526627

527-
private void validateFetchInput(Field field, WrappedHBColumn hbColumn) {
528-
if (!hbColumn.isPresent()) {
529-
throw new FieldNotMappedToHBaseColumnException(hbRecordClass, field.getName());
530-
}
531-
}
532-
533628
@Override
534629
public void close() throws IOException {
535630
table.close();

src/main/java/com/flipkart/hbaseobjectmapper/HBObjectMapper.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,10 @@ record = clazz.newInstance();
9797
}
9898
for (Field field : fields) {
9999
WrappedHBColumn hbColumn = new WrappedHBColumn(field);
100-
NavigableMap<byte[], NavigableMap<Long, byte[]>> familyMap = map.get(Bytes.toBytes(hbColumn.family()));
100+
NavigableMap<byte[], NavigableMap<Long, byte[]>> familyMap = map.get(hbColumn.familyBytes());
101101
if (familyMap == null || familyMap.isEmpty())
102102
continue;
103-
NavigableMap<Long, byte[]> columnVersionsMap = familyMap.get(Bytes.toBytes(hbColumn.column()));
103+
NavigableMap<Long, byte[]> columnVersionsMap = familyMap.get(hbColumn.columnBytes());
104104
if (hbColumn.isSingleVersioned()) {
105105
if (columnVersionsMap == null || columnVersionsMap.isEmpty())
106106
continue;
@@ -256,7 +256,7 @@ private <R extends Serializable & Comparable<R>, T extends HBRecord<R>> Navigabl
256256
for (Field field : fields) {
257257
WrappedHBColumn hbColumn = new WrappedHBColumn(field);
258258
if (hbColumn.isSingleVersioned()) {
259-
byte[] familyName = Bytes.toBytes(hbColumn.family()), columnName = Bytes.toBytes(hbColumn.column());
259+
byte[] familyName = hbColumn.familyBytes(), columnName = hbColumn.columnBytes();
260260
if (!map.containsKey(familyName)) {
261261
map.put(familyName, new TreeMap<byte[], NavigableMap<Long, byte[]>>(Bytes.BYTES_COMPARATOR));
262262
}
@@ -273,7 +273,7 @@ private <R extends Serializable & Comparable<R>, T extends HBRecord<R>> Navigabl
273273
NavigableMap<Long, byte[]> fieldValueVersions = getFieldValuesAsNavigableMapOfBytes(record, field, hbColumn.codecFlags());
274274
if (fieldValueVersions == null)
275275
continue;
276-
byte[] familyName = Bytes.toBytes(hbColumn.family()), columnName = Bytes.toBytes(hbColumn.column());
276+
byte[] familyName = hbColumn.familyBytes(), columnName = hbColumn.columnBytes();
277277
if (!map.containsKey(familyName)) {
278278
map.put(familyName, new TreeMap<byte[], NavigableMap<Long, byte[]>>(Bytes.BYTES_COMPARATOR));
279279
}

0 commit comments

Comments (0)