diff --git a/happybase/batch.py b/happybase/batch.py index eec63e7..d7f8653 100644 --- a/happybase/batch.py +++ b/happybase/batch.py @@ -57,10 +57,10 @@ def send(self): logger.debug("Sending batch for '%s' (%d mutations on %d rows)", self._table.name, self._mutation_count, len(bms)) if self._timestamp is None: - self._table.connection.client.mutateRows(self._table.name, bms, {}) + self._table.connection.client.mutateRows(self._table.name, bms, {}, no_retry=True) else: self._table.connection.client.mutateRowsTs( - self._table.name, bms, self._timestamp, {}) + self._table.name, bms, self._timestamp, {}, no_retry=True) self._reset_mutations() diff --git a/happybase/client.py b/happybase/client.py new file mode 100644 index 0000000..3729361 --- /dev/null +++ b/happybase/client.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +from thriftpy.thrift import TClient +from thriftpy.thrift import TApplicationException +from thriftpy.transport import TTransportException +from socket import error as socket_error +from collections import deque +from time import sleep +import logging + + +logger = logging.getLogger(__name__) + + +class RecoveringClient(TClient): + def __init__(self, *args, **kwargs): + self._connection = kwargs.pop("connection", None) + self._retries = kwargs.pop("retries", (0, 5, 30)) + super(RecoveringClient, self).__init__(*args, **kwargs) + + def _req(self, _api, *args, **kwargs): + no_retry = kwargs.pop("no_retry", False) + retries = deque(self._retries) + interval = 0 + client = super(RecoveringClient, self) + while True: + try: + return client._req(_api, *args, **kwargs) + except (TApplicationException, socket_error, TTransportException) as exc: + logger.exception("Got exception") + while True: + interval = retries.popleft() if retries else interval + logger.info("Sleeping for %d seconds", interval) + sleep(interval) + logger.info("Trying to reconnect") + try: + self._connection._refresh_thrift_client() + self._connection.open() + client = super(RecoveringClient, self._connection.client) + logger.debug("New client is initialized") + except TTransportException: + logger.exception("Got exception, while trying to reconnect. Continuing") + pass + else: + break + if no_retry: + raise exc \ No newline at end of file diff --git a/happybase/connection.py b/happybase/connection.py index 36c6735..7ec5bf0 100644 --- a/happybase/connection.py +++ b/happybase/connection.py @@ -7,12 +7,12 @@ import logging import six -from thriftpy.thrift import TClient from thriftpy.transport import TBufferedTransport, TFramedTransport, TSocket from thriftpy.protocol import TBinaryProtocol, TCompactProtocol from Hbase_thrift import Hbase, ColumnDescriptor +from .client import RecoveringClient from .table import Table from .util import ensure_bytes, pep8_to_camel_case @@ -157,7 +157,7 @@ def _refresh_thrift_client(self): self.transport = self._transport_class(socket) protocol = self._protocol_class(self.transport, decode_response=False) - self.client = TClient(Hbase, protocol) + self.client = RecoveringClient(Hbase, protocol, connection=self) def _table_name(self, name): """Construct a table name by optionally adding a table name prefix.""" @@ -308,7 +308,7 @@ def create_table(self, name, families): column_descriptors.append(ColumnDescriptor(**kwargs)) - self.client.createTable(name, column_descriptors) + self.client.createTable(name, column_descriptors, no_retry=True) def delete_table(self, name, disable=False): """Delete the specified table. @@ -327,7 +327,7 @@ def delete_table(self, name, disable=False): self.disable_table(name) name = self._table_name(name) - self.client.deleteTable(name) + self.client.deleteTable(name, no_retry=True) def enable_table(self, name): """Enable the specified table. @@ -335,7 +335,7 @@ def enable_table(self, name): :param str name: The table name """ name = self._table_name(name) - self.client.enableTable(name) + self.client.enableTable(name, no_retry=True) def disable_table(self, name): """Disable the specified table. @@ -343,7 +343,7 @@ def disable_table(self, name): :param str name: The table name """ name = self._table_name(name) - self.client.disableTable(name) + self.client.disableTable(name, no_retry=True) def is_table_enabled(self, name): """Return whether the specified table is enabled. @@ -364,6 +364,6 @@ def compact_table(self, name, major=False): """ name = self._table_name(name) if major: - self.client.majorCompact(name) + self.client.majorCompact(name, no_retry=True) else: - self.client.compact(name) + self.client.compact(name, no_retry=True) diff --git a/happybase/table.py b/happybase/table.py index c982ef8..599be94 100644 --- a/happybase/table.py +++ b/happybase/table.py @@ -563,7 +563,7 @@ def counter_inc(self, row, column, value=1): :rtype: int """ return self.connection.client.atomicIncrement( - self.name, row, column, value) + self.name, row, column, value, no_retry=True) def counter_dec(self, row, column, value=1): """Atomically decrement (or increments) a counter column.