Skip to content

Commit 1d81f96

Browse files
committed
feat(core): Add create2 support
1 parent 5764da0 commit 1d81f96

File tree

3 files changed

+78
-13
lines changed

3 files changed

+78
-13
lines changed

kazoo/client.py

Lines changed: 43 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
CheckVersion,
3232
CloseInstance,
3333
Create,
34+
Create2,
3435
Delete,
3536
Exists,
3637
GetChildren,
@@ -68,8 +69,11 @@
6869
string_types = six.string_types
6970
bytes_types = (six.binary_type,)
7071

71-
CLOSED_STATES = (KeeperState.EXPIRED_SESSION, KeeperState.AUTH_FAILED,
72-
KeeperState.CLOSED)
72+
CLOSED_STATES = (
73+
KeeperState.EXPIRED_SESSION,
74+
KeeperState.AUTH_FAILED,
75+
KeeperState.CLOSED
76+
)
7377
ENVI_VERSION = re.compile(r'([\d\.]*).*', re.DOTALL)
7478
ENVI_VERSION_KEY = 'zookeeper.version'
7579
log = logging.getLogger(__name__)
@@ -856,7 +860,7 @@ def sync(self, path):
856860
return self.sync_async(path).get()
857861

858862
def create(self, path, value=b"", acl=None, ephemeral=False,
859-
sequence=False, makepath=False):
863+
sequence=False, makepath=False, include_data=False):
860864
"""Create a node with the given value as its data. Optionally
861865
set an ACL on the node.
862866
@@ -904,7 +908,13 @@ def create(self, path, value=b"", acl=None, ephemeral=False,
904908
with a unique index.
905909
:param makepath: Whether the path should be created if it
906910
doesn't exist.
907-
:returns: Real path of the new node.
911+
:param include_data:
912+
Include the :class:`~kazoo.protocol.states.ZnodeStat` of
913+
the node in addition to its real path. This option changes
914+
the return value to be a tuple of (path, stat).
915+
916+
:returns: Real path of the new node, or tuple if `include_data`
917+
is `True`
908918
:rtype: str
909919
910920
:raises:
@@ -923,21 +933,28 @@ def create(self, path, value=b"", acl=None, ephemeral=False,
923933
:exc:`~kazoo.exceptions.ZookeeperError` if the server
924934
returns a non-zero error code.
925935
936+
.. versionadded:: 1.1
937+
The `makepath` option.
938+
.. versionadded:: 2.7
939+
The `include_data` option.
926940
"""
927941
acl = acl or self.default_acl
928-
return self.create_async(path, value, acl=acl, ephemeral=ephemeral,
929-
sequence=sequence, makepath=makepath).get()
942+
return self.create_async(
943+
path, value, acl=acl, ephemeral=ephemeral,
944+
sequence=sequence, makepath=makepath, include_data=include_data
945+
).get()
930946

931947
def create_async(self, path, value=b"", acl=None, ephemeral=False,
932-
sequence=False, makepath=False):
948+
sequence=False, makepath=False, include_data=False):
933949
"""Asynchronously create a ZNode. Takes the same arguments as
934950
:meth:`create`.
935951
936952
:rtype: :class:`~kazoo.interfaces.IAsyncResult`
937953
938954
.. versionadded:: 1.1
939955
The makepath option.
940-
956+
.. versionadded:: 2.7
957+
The `include_data` option.
941958
"""
942959
if acl is None and self.default_acl:
943960
acl = self.default_acl
@@ -956,6 +973,8 @@ def create_async(self, path, value=b"", acl=None, ephemeral=False,
956973
raise TypeError("Invalid type for 'sequence' (bool expected)")
957974
if not isinstance(makepath, bool):
958975
raise TypeError("Invalid type for 'makepath' (bool expected)")
976+
if not isinstance(include_data, bool):
977+
raise TypeError("Invalid type for 'include_data' (bool expected)")
959978

960979
flags = 0
961980
if ephemeral:
@@ -970,7 +989,9 @@ def create_async(self, path, value=b"", acl=None, ephemeral=False,
970989
@capture_exceptions(async_result)
971990
def do_create():
972991
result = self._create_async_inner(
973-
path, value, acl, flags, trailing=sequence)
992+
path, value, acl, flags,
993+
trailing=sequence, include_data=include_data
994+
)
974995
result.rawlink(create_completion)
975996

976997
@capture_exceptions(async_result)
@@ -981,7 +1002,11 @@ def retry_completion(result):
9811002
@wrap(async_result)
9821003
def create_completion(result):
9831004
try:
984-
return self.unchroot(result.get())
1005+
if include_data:
1006+
new_path, stat = result.get()
1007+
return self.unchroot(new_path), stat
1008+
else:
1009+
return self.unchroot(result.get())
9851010
except NoNodeError:
9861011
if not makepath:
9871012
raise
@@ -994,10 +1019,16 @@ def create_completion(result):
9941019
do_create()
9951020
return async_result
9961021

997-
def _create_async_inner(self, path, value, acl, flags, trailing=False):
1022+
def _create_async_inner(self, path, value, acl, flags,
1023+
trailing=False, include_data=False):
9981024
async_result = self.handler.async_result()
1025+
if include_data:
1026+
opcode = Create2
1027+
else:
1028+
opcode = Create
1029+
9991030
call_result = self._call(
1000-
Create(_prefix_root(self.chroot, path, trailing=trailing),
1031+
opcode(_prefix_root(self.chroot, path, trailing=trailing),
10011032
value, acl, flags), async_result)
10021033
if call_result is False:
10031034
# We hit a short-circuit exit on the _call. Because we are

kazoo/protocol/serialization.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,27 @@ def unchroot(client, response):
351351
return resp
352352

353353

354+
class Create2(namedtuple('Create2', 'path data acl flags')):
355+
type = 15
356+
357+
def serialize(self):
358+
b = bytearray()
359+
b.extend(write_string(self.path))
360+
b.extend(write_buffer(self.data))
361+
b.extend(int_struct.pack(len(self.acl)))
362+
for acl in self.acl:
363+
b.extend(int_struct.pack(acl.perms) +
364+
write_string(acl.id.scheme) + write_string(acl.id.id))
365+
b.extend(int_struct.pack(self.flags))
366+
return b
367+
368+
@classmethod
369+
def deserialize(cls, bytes, offset):
370+
path, offset = read_string(bytes, offset)
371+
stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
372+
return path, stat
373+
374+
354375
class Reconfig(namedtuple('Reconfig',
355376
'joining leaving new_members config_id')):
356377
type = 16

kazoo/tests/test_client.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -775,6 +775,19 @@ def test_create_exists(self):
775775
path = client.create("/1")
776776
self.assertRaises(NodeExistsError, client.create, path)
777777

778+
def test_create_stat(self):
779+
if TRAVIS_ZK_VERSION:
780+
version = TRAVIS_ZK_VERSION
781+
else:
782+
version = self.client.server_version()
783+
if not version or version < (3, 5):
784+
raise SkipTest("Must use Zookeeper 3.5 or above")
785+
client = self.client
786+
path, stat1 = client.create("/1", b"bytes", include_data=True)
787+
data, stat2 = client.get("/1")
788+
eq_(data, b"bytes")
789+
eq_(stat1, stat2)
790+
778791
def test_create_get_set(self):
779792
nodepath = "/" + uuid.uuid4().hex
780793

@@ -1099,8 +1112,8 @@ def test_basic_create(self):
10991112
t.create('/fred', ephemeral=True)
11001113
t.create('/smith', sequence=True)
11011114
results = t.commit()
1102-
eq_(results[0], '/freddy')
11031115
eq_(len(results), 3)
1116+
eq_(results[0], '/freddy')
11041117
self.assertTrue(results[2].startswith('/smith0'))
11051118

11061119
def test_bad_creates(self):

0 commit comments

Comments
 (0)