diff --git a/happybase/__init__.py b/happybase/__init__.py
index 11e350d..91c40eb 100644
--- a/happybase/__init__.py
+++ b/happybase/__init__.py
@@ -8,6 +8,7 @@
 from .connection import DEFAULT_HOST, DEFAULT_PORT, Connection
 from .table import Table
 from .batch import Batch
+from .counter_batch import CounterBatch
 from .pool import ConnectionPool, NoConnectionsAvailable
 
 # TODO: properly handle errors defined in Thrift specification
diff --git a/happybase/counter_batch.py b/happybase/counter_batch.py
new file mode 100644
index 0000000..fca2a2d
--- /dev/null
+++ b/happybase/counter_batch.py
@@ -0,0 +1,89 @@
+"""
+HappyBase counter batch module.
+"""
+
+from collections import defaultdict
+
+from .hbase.ttypes import TIncrement
+
+
+class CounterBatch(object):
+    """Batch for atomic counter increments.
+
+    Increments for the same ``(row, column)`` pair are summed locally, so
+    each pair results in a single increment when the batch is sent.
+
+    Use :py:meth:`Table.counter_batch` to obtain an instance for a table.
+    """
+
+    def __init__(self, table, batch_size=None):
+        """Initialise a new CounterBatch instance.
+
+        :param Table table: table the counters belong to
+        :param int batch_size: number of operations after which the batch
+                               is sent automatically (optional)
+        """
+        self.table = table
+        self.batch_size = batch_size
+        self.batch = defaultdict(int)
+        self.batch_count = 0
+
+    def counter_inc(self, row, column, value=1):
+        """Add `value` to the pending increment of a counter column.
+
+        :param row: the row key
+        :param column: the column name
+        :param int value: the amount to increment by (optional)
+        """
+        self.batch[(row, column)] += value
+        # Every call counts towards batch_size, even when it only adds to
+        # a (row, column) pair that is already pending.
+        self.batch_count += 1
+        self._check_send()
+
+    def counter_dec(self, row, column, value=1):
+        """Subtract `value` from the pending increment of a counter column.
+
+        :param row: the row key
+        :param column: the column name
+        :param int value: the amount to decrement by (optional)
+        """
+        self.counter_inc(row, column, -value)
+
+    def send(self):
+        """Send the pending increments to the server and reset the batch."""
+        if not self.batch:
+            # Nothing is pending, so skip the round trip to the server.
+            return
+
+        # items() instead of the Python 2-only iteritems(), so this also
+        # runs on Python 3. NOTE(review): `ammount` (sic) is the keyword
+        # the code passes to TIncrement; presumably it mirrors the Thrift
+        # field name -- confirm before "fixing" the spelling.
+        increment_rows = [
+            TIncrement(table=self.table.name, row=row, column=column,
+                       ammount=value)
+            for (row, column), value in self.batch.items()
+        ]
+        self.table.connection.client.incrementRows(increment_rows)
+        self.batch.clear()
+        self.batch_count = 0
+
+    def _check_send(self):
+        """Send the batch once `batch_size` operations have been queued."""
+        if self.batch_size and self.batch_count >= self.batch_size:
+            self.send()
+
+    #
+    # Context manager methods
+    #
+
+    def __enter__(self):
+        """Called upon entering a ``with`` block"""
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        """Called upon exiting a ``with`` block"""
+        # TODO: Examine the exception and decide whether or not to send
+        # For now we always send
+        self.send()
diff --git a/happybase/table.py b/happybase/table.py
index 3cb26b7..55817f7 100644
--- a/happybase/table.py
+++ b/happybase/table.py
@@ -10,6 +10,7 @@
 from .hbase.ttypes import TScan
 from .util import thrift_type_to_dict, str_increment, OrderedDict
 from .batch import Batch
+from .counter_batch import CounterBatch
 
 logger = logging.getLogger(__name__)
 
@@ -215,7 +216,7 @@ def cells(self, row, column, versions=None, timestamp=None,
     def scan(self, row_start=None, row_stop=None, row_prefix=None,
              columns=None, filter=None, timestamp=None,
              include_timestamp=False, batch_size=1000, scan_batching=None,
-             limit=None, sorted_columns=False):
+             limit=None, sorted_columns=False, reversed=False):
         """Create a scanner for data in the table.
 
         This method returns an iterable that can be used for looping over the
@@ -270,6 +271,9 @@ def scan(self, row_start=None, row_stop=None, row_prefix=None,
         * The `sorted_columns` argument is only available when using
           HBase 0.96 (or up).
 
+        * The `reversed` option is only available when using HBase 0.98
+          (or up).
+
         .. versionadded:: 0.8
            `sorted_columns` argument
 
@@ -287,6 +291,7 @@ def scan(self, row_start=None, row_stop=None, row_prefix=None,
         :param bool scan_batching: server-side scan batching (optional)
         :param int limit: max number of rows to return
         :param bool sorted_columns: whether to return sorted columns
+        :param bool reversed: whether or not to reverse the row ordering
 
         :return: generator yielding the rows matching the scan
         :rtype: iterable of `(row_key, row_data)` tuples
@@ -369,6 +374,7 @@ def scan(self, row_start=None, row_stop=None, row_prefix=None,
                 filterString=filter,
                 batchSize=scan_batching,
                 sortColumns=sorted_columns,
+                reversed=reversed,
             )
             scan_id = self.connection.client.scannerOpenWithScan(
                 self.name, scan, {})
@@ -497,6 +503,23 @@ def batch(self, timestamp=None, batch_size=None, transaction=False,
         del kwargs['self']
         return Batch(table=self, **kwargs)
 
+    def counter_batch(self, batch_size=None):
+        """Create a new batch of counter operations for this table.
+
+        This method returns a new :py:class:`CounterBatch` instance that
+        can be used for mass counter manipulation.
+
+        If given, the `batch_size` argument specifies the maximum batch size
+        after which the batch should send the mutations to the server. By
+        default this is unbounded.
+
+        :param int batch_size: batch size (optional)
+
+        :return: CounterBatch instance
+        :rtype: :py:class:`CounterBatch`
+        """
+        return CounterBatch(table=self, batch_size=batch_size)
+
     #
     # Atomic counters
     #