diff --git a/redisdl.py b/redisdl.py
index 4ffd360..726f2f1 100755
--- a/redisdl.py
+++ b/redisdl.py
@@ -141,18 +141,18 @@ def dumps(host='localhost', port=6379, password=None, db=0, pretty=False,
 class BytesWriteWrapper(object):
     def __init__(self, stream):
         self.stream = stream
-    
+
     def write(self, str):
         return self.stream.write(str.encode())
 
 def dump(fp, host='localhost', port=6379, password=None, db=0, pretty=False,
          unix_socket_path=None, encoding='utf-8', keys='*'):
-    
+
     try:
         fp.write('')
     except TypeError:
         fp = BytesWriteWrapper(fp)
-    
+
     if pretty:
         # hack to avoid implementing pretty printing
         fp.write(dumps(host=host, port=port, password=password, db=db,
@@ -276,28 +276,76 @@ def _read_key(key, r, pretty, encoding):
     return (type, ttl, value)
 
 def _reader(r, pretty, encoding, keys='*'):
-    for encoded_key in r.keys(keys):
-        key = encoded_key.decode(encoding)
-        handled = False
-        for i in range(10):
-            try:
-                type, ttl, value = _read_key(encoded_key, r, pretty, encoding)
-                yield key, type, ttl, value
-                handled = True
-                break
-            except KeyDeletedError:
-                # do not dump the key
-                handled = True
-                break
-            except redis.WatchError:
-                # same logic as key type changed
-                pass
-            except KeyTypeChangedError:
-                # retry reading type again
-                pass
-        if not handled:
-            # ran out of retries
-            raise ConcurrentModificationError('Key %s is being concurrently modified' % key)
+    encoded_keys = r.keys(keys)
+    i = 0
+    while i < len(encoded_keys):
+        for key, type, ttl, value in _read_keys(r, encoded_keys[i:i+10000],
+                pretty=pretty, encoding=encoding):
+            yield key, type, ttl, value
+        i += 10000
+
+def _read_keys(r, encoded_keys, pretty, encoding):
+    decoded_keys = [encoded_key.decode(encoding) for encoded_key in encoded_keys]
+    do_keys = decoded_keys
+    retries = 5
+    type_results = None
+    while len(do_keys) > 0 and retries > 0:
+        next_do_keys = []
+        next_type_results = []
+
+        if type_results is None:
+            # first pass, need to get the types.
+            # on subsequent passes we know the types
+            # because the previous pass retrieved them and
+            # found a type mismatch
+            p = r.pipeline()
+            for key in do_keys:
+                p.type(key)
+            # decode immediately so that first-pass entries (bytes from redis)
+            # and retry-pass entries (already decoded strings) are uniform
+            type_results = [result.decode('ascii') for result in p.execute()]
+
+        p = r.pipeline()
+        for i in range(len(do_keys)):
+            # index do_keys, not decoded_keys: on retry passes do_keys is
+            # the subset of keys that still need to be read
+            key = do_keys[i]
+            type = type_results[i]
+            if type == 'none':
+                # key was deleted by a concurrent operation on the data store;
+                # issue noops so that the number of results does not change
+                p.type(key)
+                p.type(key)
+                p.type(key)
+                continue
+            reader = readers.get(type)
+            if reader is None:
+                raise UnknownTypeError("Unknown key type: %s" % type)
+            reader.send_command(p, key)
+            r.pttl_or_ttl_pipeline(p, key)
+            p.type(key)
+        results = p.execute()
+
+        for i in range(len(do_keys)):
+            key = do_keys[i]
+            original_type = type_results[i]
+            if original_type == 'none':
+                # this is where we actually skip a key that was deleted
+                # by concurrent operations
+                continue
+            final_type = results[i*3+2].decode('ascii')
+            if original_type != final_type:
+                # type changed, will retry
+                next_do_keys.append(key)
+                # need to update expected type
+                next_type_results.append(final_type)
+                continue
+            reader = readers.get(original_type)
+            value = reader.handle_response(results[i*3], pretty, encoding)
+            ttl = r.decode_pttl_or_ttl_pipeline_value(results[i*3+1])
+            yield key, final_type, ttl, value
+        retries -= 1
+        do_keys = next_do_keys
+        type_results = next_type_results
+
+    if len(do_keys) > 0:
+        raise ConcurrentModificationError('Keys %s are being concurrently modified' % ', '.join(do_keys))
 
 def _empty(r):
     for key in r.keys():
@@ -372,14 +420,14 @@ def ijson_top_level_items(file, local_streaming_backend):
 class TextReadWrapper(object):
     def __init__(self, fp):
         self.fp = fp
-    
+
     def read(self, *args, **kwargs):
         return self.fp.read(*args, **kwargs).decode()
 
 class BytesReadWrapper(object):
     def __init__(self, fp):
         self.fp = fp
-    
+
     def read(self, *args, **kwargs):
         return self.fp.read(*args, **kwargs).encode('utf-8')
 
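The `fp.write('')` probe in dump() works because, on Python 3, writing a str to a binary stream raises TypeError; BytesWriteWrapper then transparently encodes each chunk before writing. A minimal sketch of that mechanism (io.BytesIO stands in for a caller-supplied binary stream and is not part of the patch):

    import io

    fp = io.BytesIO()
    try:
        fp.write('')  # str into a bytes stream raises TypeError on Python 3
    except TypeError:
        fp = BytesWriteWrapper(fp)
    fp.write('{"key": "value"}')  # now encoded to bytes transparently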
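The batching in _read_keys() depends on an invariant: every key contributes exactly three pipeline replies (value, TTL, type re-check), which is why deleted keys are padded with three no-op TYPE calls and why the replies for key i start at results[i*3]. A minimal sketch of that layout with plain redis-py (the connection, key names, and the strings-only simplification are assumptions for illustration, not part of the patch):

    import redis

    r = redis.StrictRedis()
    keys = ['k1', 'k2', 'k3']  # hypothetical keys, assumed to hold strings
    p = r.pipeline()
    for key in keys:
        p.get(key)   # reply i*3:   value (the patch dispatches per type via readers)
        p.pttl(key)  # reply i*3+1: ttl
        p.type(key)  # reply i*3+2: type, re-read to detect concurrent changes
    replies = p.execute()
    for i, key in enumerate(keys):
        value, ttl, keytype = replies[i*3:i*3+3]
        # keytype of b'none' here would mean the key vanished mid-read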