Skip to content

Commit b8365e8

Browse files
authored
fix: Client::create does not work on ZooKeeper 3.4.x (#19)
This commit adds client side assumed ZooKeeper version so that client can fallback to `OpCode::create` instead of `OpCode::create2` on 3.4.x. Resolves #18.
1 parent 4e52d34 commit b8365e8

File tree

3 files changed

+86
-8
lines changed

3 files changed

+86
-8
lines changed

src/client/mod.rs

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,8 @@ impl CreateSequence {
199199
/// Client encapsulates ZooKeeper session to interact with ZooKeeper cluster.
200200
///
201201
/// Besides semantic errors, node operations could also fail due to cluster availability and
202-
/// limitations, e.g. [Error::ConnectionLoss], [Error::QuotaExceeded] and so on.
202+
/// capabilities, e.g. [Error::ConnectionLoss], [Error::QuotaExceeded], [Error::Unimplemented] and
203+
/// so on.
203204
///
204205
/// All remote operations will fail after session expired, failed or closed.
205206
///
@@ -211,6 +212,7 @@ impl CreateSequence {
211212
#[derive(Clone, Debug)]
212213
pub struct Client {
213214
chroot: OwnedChroot,
215+
version: Version,
214216
session: (SessionId, Vec<u8>),
215217
session_timeout: Duration,
216218
requester: mpsc::UnboundedSender<SessionOperation>,
@@ -232,13 +234,14 @@ impl Client {
232234

233235
pub(crate) fn new(
234236
chroot: OwnedChroot,
237+
version: Version,
235238
session: (SessionId, Vec<u8>),
236239
timeout: Duration,
237240
requester: mpsc::UnboundedSender<SessionOperation>,
238241
state_receiver: watch::Receiver<SessionState>,
239242
) -> Client {
240243
let state_watcher = StateWatcher::new(state_receiver);
241-
Client { chroot, session, session_timeout: timeout, requester, state_watcher }
244+
Client { chroot, version, session, session_timeout: timeout, requester, state_watcher }
242245
}
243246

244247
fn validate_path<'a>(&'a self, path: &'a str) -> Result<ChrootPath<'a>> {
@@ -422,6 +425,12 @@ impl Client {
422425
/// * [Error::NoNode] if parent node does not exist.
423426
/// * [Error::NoChildrenForEphemerals] if parent node is ephemeral.
424427
/// * [Error::InvalidAcl] if acl is invalid or empty.
428+
///
429+
/// # Notable behaviors
430+
/// The resulting [Stat] will be [Stat::is_invalid] if assumed server version is 3.4 series or
431+
/// below. See [ClientBuilder::assume_server_version] and [ZOOKEEPER-1297][].
432+
///
433+
/// [ZOOKEEPER-1297]: https://issues.apache.org/jira/browse/ZOOKEEPER-1297
425434
pub fn create<'a: 'f, 'b: 'f, 'f>(
426435
&'a self,
427436
path: &'b str,
@@ -449,8 +458,10 @@ impl Client {
449458
OpCode::CreateTtl
450459
} else if create_mode.is_container() {
451460
OpCode::CreateContainer
452-
} else {
461+
} else if self.version >= Version(3, 5, 0) {
453462
OpCode::Create2
463+
} else {
464+
OpCode::Create
454465
};
455466
let flags = create_mode.as_flags(ttl != 0);
456467
let request = CreateRequest { path: chroot_path, data, acls: options.acls, flags, ttl };
@@ -461,7 +472,8 @@ impl Client {
461472
let server_path = record::unmarshal_entity::<&str>(&"server path", &mut buf)?;
462473
let client_path = util::strip_root_path(server_path, self.chroot.root())?;
463474
let sequence = if sequential { Self::parse_sequence(client_path, path)? } else { CreateSequence(-1) };
464-
let stat = record::unmarshal::<Stat>(&mut buf)?;
475+
let stat =
476+
if op_code == OpCode::Create { Stat::new_invalid() } else { record::unmarshal::<Stat>(&mut buf)? };
465477
Ok((stat, sequence))
466478
})
467479
}
@@ -1508,10 +1520,14 @@ impl Drop for OwnedLockClient {
15081520
}
15091521
}
15101522

1523+
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd)]
1524+
pub(crate) struct Version(u32, u32, u32);
1525+
15111526
/// Builder for [Client] with more options than [Client::connect].
15121527
#[derive(Clone, Debug)]
15131528
pub struct ClientBuilder {
15141529
authes: Vec<AuthPacket>,
1530+
version: Version,
15151531
session: Option<(SessionId, Vec<u8>)>,
15161532
readonly: bool,
15171533
detached: bool,
@@ -1523,6 +1539,7 @@ impl ClientBuilder {
15231539
fn new() -> Self {
15241540
Self {
15251541
authes: Default::default(),
1542+
version: Version(u32::MAX, u32::MAX, u32::MAX),
15261543
session: None,
15271544
readonly: false,
15281545
detached: false,
@@ -1565,6 +1582,20 @@ impl ClientBuilder {
15651582
self
15661583
}
15671584

1585+
/// Specifies client assumed server version of ZooKeeper cluster.
1586+
///
1587+
/// Client will issue server compatible protocol to avoid [Error::Unimplemented] for some
1588+
/// operations. See [Client::create] for an example.
1589+
///
1590+
/// See [ZOOKEEPER-1381][] and [ZOOKEEPER-3762][] for references.
1591+
///
1592+
/// [ZOOKEEPER-1381]: https://issues.apache.org/jira/browse/ZOOKEEPER-1381
1593+
/// [ZOOKEEPER-3762]: https://issues.apache.org/jira/browse/ZOOKEEPER-3762
1594+
pub fn assume_server_version(&mut self, major: u32, minor: u32, patch: u32) -> &mut Self {
1595+
self.version = Version(major, minor, patch);
1596+
self
1597+
}
1598+
15681599
/// Detaches creating session so it will not be closed after all client instances dropped.
15691600
pub fn detach(&mut self) -> &mut Self {
15701601
self.detached = true;
@@ -1608,7 +1639,8 @@ impl ClientBuilder {
16081639
tokio::spawn(async move {
16091640
session.serve(servers, sock, buf, connecting_depot, receiver).await;
16101641
});
1611-
let client = Client::new(chroot.to_owned(), session_info, session_timeout, sender, state_receiver);
1642+
let client =
1643+
Client::new(chroot.to_owned(), self.version, session_info, session_timeout, sender, state_receiver);
16121644
Ok(client)
16131645
}
16141646
}

src/session/types.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ pub struct WatchedEvent {
8585
///
8686
/// # Notable behaviors
8787
/// * This feature was shipped in 3.9.0, `zxid` wil be [WatchedEvent::NO_ZXID] for earlier versions. See [ZOOKEEPER-4655].
88-
/// * It is possible to receive multiple events with same `zxid` and even same `path` due to [MultiWriter]. See [ZOOKEEPER-4695].
88+
/// * It is possible to receive multiple events with same `zxid` and even same `path` due to [crate::MultiWriter]. See [ZOOKEEPER-4695].
8989
///
9090
/// [ZOOKEEPER-4655]: https://issues.apache.org/jira/browse/ZOOKEEPER-4655
9191
/// [ZOOKEEPER-4695]: https://issues.apache.org/jira/browse/ZOOKEEPER-4695

tests/zookeeper.rs

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,24 +23,32 @@ fn random_data() -> Vec<u8> {
2323
rng.sample_iter(Standard).take(32).collect()
2424
}
2525

26-
fn zookeeper_image_with_properties<'a>(mut properties: Vec<&'a str>) -> GenericImage {
26+
fn zookeeper_image_with_version_and_properties<'a>(version: &'a str, mut properties: Vec<&'a str>) -> GenericImage {
2727
properties.insert(0, "-Dzookeeper.DigestAuthenticationProvider.superDigest=super:D/InIHSb7yEEbrWz8b9l71RjZJU=");
2828
properties.insert(0, "-Dzookeeper.enableEagerACLCheck=true");
2929
let jvm_properties = properties.join(" ");
3030
let healthcheck = Healthcheck::default()
3131
.with_cmd(["./bin/zkServer.sh", "status"].iter())
3232
.with_interval(Duration::from_secs(2))
3333
.with_retries(60);
34-
GenericImage::new("zookeeper", "3.9.0")
34+
GenericImage::new("zookeeper", version)
3535
.with_env_var("SERVER_JVMFLAGS", jvm_properties)
3636
.with_healthcheck(healthcheck)
3737
.with_wait_for(WaitFor::Healthcheck)
3838
}
3939

40+
fn zookeeper_image_with_properties<'a>(properties: Vec<&'a str>) -> GenericImage {
41+
zookeeper_image_with_version_and_properties("3.9.0", properties)
42+
}
43+
4044
fn zookeeper_image() -> GenericImage {
4145
zookeeper_image_with_properties(Vec::default())
4246
}
4347

48+
fn zookeeper34_image() -> GenericImage {
49+
zookeeper_image_with_version_and_properties("3.4", Vec::default())
50+
}
51+
4452
async fn example() {
4553
let docker = DockerCli::default();
4654
let zookeeper = docker.run(zookeeper_image());
@@ -591,6 +599,44 @@ async fn test_create_container() {
591599
assert_that!(client.delete("/container", None).await.unwrap_err()).is_equal_to(zk::Error::NoNode);
592600
}
593601

602+
#[test_log::test(tokio::test)]
603+
async fn test_zookeeper34() {
604+
let docker = DockerCli::default();
605+
let zookeeper = docker.run(zookeeper34_image());
606+
let zk_port = zookeeper.get_host_port(2181);
607+
let cluster = format!("127.0.0.1:{}", zk_port);
608+
let client = zk::Client::builder().assume_server_version(3, 4, u32::MAX).connect(&cluster).await.unwrap();
609+
let (stat, _sequence) = client.create("/a", b"a1", PERSISTENT_OPEN).await.unwrap();
610+
assert_that!(stat.is_invalid()).is_true();
611+
612+
let (data, stat) = client.get_data("/a").await.unwrap();
613+
assert_eq!(data, b"a1".to_vec());
614+
assert_eq!(stat.version, 0);
615+
616+
let (data, stat, watcher) = client.get_and_watch_data("/a").await.unwrap();
617+
assert_eq!(data, b"a1".to_vec());
618+
assert_eq!(stat.version, 0);
619+
620+
let stat = client.set_data("/a", b"a2", Some(0)).await.unwrap();
621+
assert_eq!(stat.version, 1);
622+
623+
let event = watcher.changed().await;
624+
assert_eq!(event.path, "/a");
625+
assert_eq!(event.zxid, -1);
626+
assert_eq!(event.event_type, zk::EventType::NodeDataChanged);
627+
628+
let (data, stat, watcher) = client.get_and_watch_data("/a").await.unwrap();
629+
assert_eq!(data, b"a2".to_vec());
630+
assert_eq!(stat.version, 1);
631+
632+
client.delete("/a", Some(1)).await.unwrap();
633+
634+
let event = watcher.changed().await;
635+
assert_eq!(event.path, "/a");
636+
assert_eq!(event.zxid, -1);
637+
assert_eq!(event.event_type, zk::EventType::NodeDeleted);
638+
}
639+
594640
#[test_log::test(tokio::test)]
595641
async fn test_mkdir() {
596642
let docker = DockerCli::default();

0 commit comments

Comments
 (0)