Skip to content

Commit 7601b7b

Browse files
committed
Refactor replicas for better statuses, break every other driver =)
1 parent 1e407bf commit 7601b7b

File tree

13 files changed

+199
-94
lines changed

13 files changed

+199
-94
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

c/src/server/server_replica.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
use std::{ffi::c_char, ptr::addr_of_mut};
2121

22-
use typedb_driver::{ReplicaType, ServerReplica};
22+
use typedb_driver::{ReplicaRole, ServerReplica};
2323

2424
use crate::common::{
2525
iterator::{iterator_next, CIterator},
@@ -62,8 +62,8 @@ pub extern "C" fn server_replica_get_address(replica_info: *const ServerReplica)
6262

6363
/// Returns the role (replica type) of this replica in the raft cluster: primary or one of the supporting roles.
6464
#[no_mangle]
65-
pub extern "C" fn server_replica_get_type(replica_info: *const ServerReplica) -> ReplicaType {
66-
borrow(replica_info).replica_type()
65+
pub extern "C" fn server_replica_get_type(replica_info: *const ServerReplica) -> ReplicaRole {
66+
borrow(replica_info).role().unwrap() // TODO: Return optional!
6767
}
6868

6969
/// Checks whether this is the primary replica of the raft cluster.
@@ -75,5 +75,5 @@ pub extern "C" fn server_replica_is_primary(replica_info: *const ServerReplica)
7575
/// Returns the raft protocol ‘term’ of this replica.
7676
#[no_mangle]
7777
pub extern "C" fn server_replica_get_term(replica_info: *const ServerReplica) -> i64 {
78-
borrow(replica_info).term() as i64
78+
borrow(replica_info).term().unwrap() as i64 // TODO: Return optional!
7979
}

dependencies/typedb/repositories.bzl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ def typedb_protocol():
3030
git_repository(
3131
name = "typedb_protocol",
3232
remote = "https://github.com/typedb/typedb-protocol",
33-
commit = "55ed1423598425fa5ad56c7b7ca417a05d05dac3", # sync-marker: do not remove this comment, this is used for sync-dependencies by @typedb_protocol
33+
commit = "7653c86e6ca7f19ade1d055793c1082b685cac80", # sync-marker: do not remove this comment, this is used for sync-dependencies by @typedb_protocol
3434
)
3535

3636
def typedb_behaviour():

rust/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060

6161
[dependencies.typedb-protocol]
6262
features = []
63-
rev = "55ed1423598425fa5ad56c7b7ca417a05d05dac3"
63+
rev = "7653c86e6ca7f19ade1d055793c1082b685cac80"
6464
git = "https://github.com/typedb/typedb-protocol"
6565
default-features = false
6666

rust/src/common/error.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,10 +137,12 @@ error_messages! { ConnectionError
137137
4: "Unable to connect to TypeDB server(s), received network or protocol error: \n{error}",
138138
ServerConnectionIsClosed =
139139
5: "The connection has been closed and no further operation is allowed.",
140+
ServerConnectionIsClosedUnexpectedly =
141+
6: "The connection has been closed unexpectedly and no further operation is allowed.",
140142
TransactionIsClosed =
141-
6: "The transaction is closed and no further operation is allowed.",
143+
7: "The transaction is closed and no further operation is allowed.",
142144
TransactionIsClosedWithErrors { errors: String } =
143-
7: "The transaction is closed because of the error(s):\n{errors}",
145+
8: "The transaction is closed because of the error(s):\n{errors}",
144146
MissingResponseField { field: &'static str } =
145147
9: "Missing field in message received from server: '{field}'. This is either a version compatibility issue or a bug.",
146148
UnknownRequestId { request_id: RequestID } =
@@ -331,6 +333,8 @@ impl Error {
331333
fn from_message(message: &str) -> Self {
332334
if is_rst_stream(message) || is_tcp_connect_error(message) {
333335
Self::Connection(ConnectionError::ServerConnectionFailedNetworking { error: message.to_owned() })
336+
} else if is_reading_body_from_connection_error(message) {
337+
Self::Connection(ConnectionError::ServerConnectionIsClosedUnexpectedly)
334338
} else {
335339
println!("Error Other from message: {message}");
336340
Self::Other(message.to_owned())
@@ -455,6 +459,11 @@ fn is_rst_stream(message: &str) -> bool {
455459
message.contains("Received Rst Stream")
456460
}
457461

462+
fn is_reading_body_from_connection_error(message: &str) -> bool {
463+
// This error can be returned when the server crashes
464+
message.contains("error reading a body from connection")
465+
}
466+
458467
fn is_tcp_connect_error(message: &str) -> bool {
459468
// No TCP connection
460469
message.contains("tcp connect error")

rust/src/connection/network/proto/message.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -325,12 +325,7 @@ impl TryIntoProto<authentication::token::create::Req> for Credentials {
325325

326326
impl TryFromProto<connection::open::Res> for Response {
327327
fn try_from_proto(proto: connection::open::Res) -> Result<Self> {
328-
let mut servers = Vec::new();
329-
for server_proto in
330-
proto.servers_all.ok_or(ConnectionError::MissingResponseField { field: "servers_all" })?.servers
331-
{
332-
servers.push(ServerReplica::try_from_proto(server_proto)?);
333-
}
328+
let servers = proto.servers_all.ok_or(ConnectionError::MissingResponseField { field: "servers_all" })?.servers.into_iter().map(|server_proto| ServerReplica::try_from_proto(server_proto)).try_collect()?;
334329
Ok(Self::ConnectionOpen {
335330
connection_id: Uuid::from_slice(
336331
proto
@@ -357,8 +352,7 @@ impl TryFromProto<server_manager::all::Res> for Response {
357352
impl TryFromProto<server_manager::get::Res> for Response {
358353
fn try_from_proto(proto: server_manager::get::Res) -> Result<Self> {
359354
let server_manager::get::Res { server } = proto;
360-
let server =
361-
ServerReplica::try_from_proto(server.ok_or(ConnectionError::MissingResponseField { field: "server" })?)?;
355+
let server = ServerReplica::try_from_proto(server.ok_or(ConnectionError::MissingResponseField { field: "server" })?)?;
362356
Ok(Self::ServersGet { server })
363357
}
364358
}

rust/src/connection/network/proto/server.rs

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,39 +26,41 @@ use super::TryFromProto;
2626
use crate::{
2727
common::Result,
2828
connection::{
29-
server_replica::{ReplicaStatus, ReplicaType, ServerReplica},
29+
server_replica::{ReplicaStatus, ReplicaRole, ServerReplica},
3030
server_version::ServerVersion,
3131
},
3232
error::ConnectionError,
3333
};
3434

3535
impl TryFromProto<ServerProto> for ServerReplica {
36-
fn try_from_proto(proto: ServerProto) -> Result<Self> {
37-
let address = proto.address.parse()?;
38-
let replica_status = match proto.replica_status {
39-
Some(replica_status) => ReplicaStatus::try_from_proto(replica_status)?,
40-
None => ReplicaStatus::default(),
41-
};
42-
Ok(Self::from_private(address, replica_status))
36+
fn try_from_proto(proto: ServerProto) -> Result<ServerReplica> {
37+
let replica_status = proto.replica_status.map(|status| ReplicaStatus::try_from_proto(status)).transpose()?;
38+
match proto.address {
39+
Some(address) => Ok(ServerReplica::available_from_private(address.parse()?, replica_status)),
40+
None => Ok(ServerReplica::Unavailable { replica_status }),
41+
}
4342
}
4443
}
4544

4645
impl TryFromProto<ReplicaStatusProto> for ReplicaStatus {
4746
fn try_from_proto(proto: ReplicaStatusProto) -> Result<Self> {
4847
Ok(Self {
4948
id: proto.replica_id,
50-
replica_type: ReplicaType::try_from_proto(proto.replica_type)?,
49+
role: Option::<ReplicaRole>::try_from_proto(proto.replica_type)?,
5150
term: proto.term,
5251
})
5352
}
5453
}
5554

56-
impl TryFromProto<i32> for ReplicaType {
57-
fn try_from_proto(replica_type: i32) -> Result<Self> {
55+
impl TryFromProto<Option<i32>> for Option<ReplicaRole> {
56+
fn try_from_proto(replica_type: Option<i32>) -> Result<Option<ReplicaRole>> {
57+
let Some(replica_type) = replica_type else {
58+
return Ok(None);
59+
};
5860
match replica_type {
59-
0 => Ok(Self::Primary),
60-
1 => Ok(Self::Candidate),
61-
2 => Ok(Self::Secondary),
61+
0 => Ok(Some(ReplicaRole::Primary)),
62+
1 => Ok(Some(ReplicaRole::Candidate)),
63+
2 => Ok(Some(ReplicaRole::Follower)),
6264
_ => Err(ConnectionError::UnexpectedReplicaType { replica_type }.into()),
6365
}
6466
}

rust/src/connection/server/server_manager.rs

Lines changed: 44 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,11 @@ use crate::{
4141
error::{ConnectionError, InternalError},
4242
Credentials, DriverOptions, Error, Result,
4343
};
44+
use crate::connection::server_replica::{AvailableServerReplica, Replica};
4445

4546
pub(crate) struct ServerManager {
4647
configured_addresses: Addresses,
47-
replicas: RwLock<HashSet<ServerReplica>>,
48+
replicas: RwLock<HashSet<AvailableServerReplica>>,
4849
replica_connections: RwLock<HashMap<Address, ServerConnection>>,
4950
address_translation: RwLock<AddressTranslation>,
5051

@@ -87,11 +88,10 @@ impl ServerManager {
8788
)
8889
.await?;
8990
let address_translation = addresses.address_translation();
90-
9191
println!("INIT REPLICA CONNECTIONS: {:?}", source_connections);
9292
let server_manager = Self {
9393
configured_addresses: addresses,
94-
replicas: RwLock::new(replicas),
94+
replicas: RwLock::new(Self::filter_unavailable_replicas(replicas)),
9595
replica_connections: RwLock::new(source_connections),
9696
address_translation: RwLock::new(address_translation),
9797
background_runtime,
@@ -207,10 +207,6 @@ impl ServerManager {
207207
self.replica_connections.write().expect("Expected server connections write access")
208208
}
209209

210-
fn read_replicas(&self) -> RwLockReadGuard<'_, HashSet<ServerReplica>> {
211-
self.replicas.read().expect("Expected a read replica lock")
212-
}
213-
214210
fn read_address_translation(&self) -> RwLockReadGuard<'_, AddressTranslation> {
215211
self.address_translation.read().expect("Expected address translation read access")
216212
}
@@ -219,16 +215,6 @@ impl ServerManager {
219215
self.read_replica_connections().values().map(ServerConnection::force_close).try_collect().map_err(Into::into)
220216
}
221217

222-
fn replicas(&self) -> HashSet<ServerReplica> {
223-
let res = self.read_replicas().iter().cloned().collect();
224-
println!("Current replicas: {res:?}");
225-
res
226-
}
227-
228-
pub(crate) fn primary_replica(&self) -> Option<ServerReplica> {
229-
self.read_replicas().iter().filter(|replica| replica.is_primary()).max_by_key(|replica| replica.term()).cloned()
230-
}
231-
232218
pub(crate) fn username(&self) -> Result<String> {
233219
match self.read_replica_connections().iter().next() {
234220
Some((_, replica_connection)) => Ok(replica_connection.username().to_string()),
@@ -271,9 +257,12 @@ impl ServerManager {
271257
P: Future<Output = Result<R>>,
272258
{
273259
println!("STRONG");
274-
let mut primary_replica = match self.primary_replica() {
260+
let mut primary_replica = match self.read_primary_replica() {
275261
Some(replica) => replica,
276-
None => self.seek_primary_replica_in(self.replicas()).await?,
262+
None => {
263+
let replicas: HashSet<_> = self.read_replicas().iter().cloned().collect();
264+
self.seek_primary_replica_in(replicas).await?
265+
},
277266
};
278267

279268
let retries = self.driver_options.primary_failover_retries;
@@ -283,7 +272,8 @@ impl ServerManager {
283272
let private_address = primary_replica.private_address().clone();
284273
match self.execute_on(primary_replica.address(), &private_address, &task).await {
285274
Err(Error::Connection(connection_error)) => {
286-
let replicas_without_old_primary = self.replicas().into_iter().filter(|replica| {
275+
let replicas: HashSet<_> = self.read_replicas().iter().cloned().collect();
276+
let replicas_without_old_primary = replicas.into_iter().filter(|replica| {
287277
println!("REPLICAS: Filter out? {private_address:?} vs mine {:?}", replica.private_address());
288278
replica.private_address() != &private_address
289279
});
@@ -326,12 +316,12 @@ impl ServerManager {
326316
F: Fn(ServerConnection) -> P,
327317
P: Future<Output = Result<R>>,
328318
{
329-
let replicas = self.replicas();
319+
let replicas: HashSet<_> = self.read_replicas().iter().cloned().collect();
330320
self.execute_on_any(replicas, task).await
331321
}
332322

333323
#[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
334-
async fn execute_on_any<F, P, R>(&self, replicas: impl IntoIterator<Item = ServerReplica>, task: F) -> Result<R>
324+
async fn execute_on_any<F, P, R>(&self, replicas: impl IntoIterator<Item = AvailableServerReplica>, task: F) -> Result<R>
335325
where
336326
F: Fn(ServerConnection) -> P,
337327
P: Future<Output = Result<R>>,
@@ -379,8 +369,8 @@ impl ServerManager {
379369
#[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
380370
async fn seek_primary_replica_in(
381371
&self,
382-
source_replicas: impl IntoIterator<Item = ServerReplica>,
383-
) -> Result<ServerReplica> {
372+
source_replicas: impl IntoIterator<Item = AvailableServerReplica>,
373+
) -> Result<AvailableServerReplica> {
384374
// TODO: Add retries with sleeps in between? Maybe replica_discovery_attempts should work / be used differently
385375
println!("SEEK");
386376
self.execute_on_any(source_replicas, |replica_connection| async {
@@ -390,13 +380,13 @@ impl ServerManager {
390380
}
391381

392382
#[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
393-
async fn seek_primary_replica(&self, replica_connection: ServerConnection) -> Result<ServerReplica> {
383+
async fn seek_primary_replica(&self, replica_connection: ServerConnection) -> Result<AvailableServerReplica> {
394384
let address_translation = self.read_address_translation().clone();
395385
println!("SEEK PRIMARY REPLICA");
396386
let replicas = Self::fetch_replicas_from_connection(&replica_connection, &address_translation).await?;
397387
println!("WRITE NEW PRIMARY");
398-
*self.replicas.write().expect("Expected replicas write lock") = replicas;
399-
if let Some(replica) = self.primary_replica() {
388+
*self.replicas.write().expect("Expected replicas write lock") = Self::filter_unavailable_replicas(replicas);
389+
if let Some(replica) = self.read_primary_replica() {
400390
self.update_replica_connections().await?;
401391
Ok(replica)
402392
} else {
@@ -438,7 +428,7 @@ impl ServerManager {
438428
return Ok((source_connections, translated_replicas));
439429
} else {
440430
if let Some(target_replica) =
441-
translated_replicas.into_iter().find(|replica| replica.address() == address)
431+
translated_replicas.into_iter().find(|replica| replica.address() == Some(address))
442432
{
443433
let source_connections = HashMap::from([(address.clone(), replica_connection)]);
444434
return Ok((source_connections, HashSet::from([target_replica])));
@@ -478,7 +468,7 @@ impl ServerManager {
478468
async move { Self::fetch_replicas_from_connection(&replica_connection, &address_translation).await }
479469
})
480470
.await?;
481-
*self.replicas.write().expect("Expected replicas write lock") = replicas.clone();
471+
*self.replicas.write().expect("Expected replicas write lock") = Self::filter_unavailable_replicas(replicas.clone());
482472
Ok(replicas)
483473
}
484474

@@ -497,6 +487,31 @@ impl ServerManager {
497487
replicas.into_iter().map(|replica| replica.translated(address_translation)).collect()
498488
}
499489

490+
fn filter_unavailable_replicas(replicas: impl IntoIterator<Item = ServerReplica>) -> HashSet<AvailableServerReplica> {
491+
replicas.into_iter().filter_map(|replica| match replica {
492+
ServerReplica::Available(available) => Some(available),
493+
ServerReplica::Unavailable { .. } => None,
494+
}).collect()
495+
}
496+
497+
#[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
498+
pub(crate) async fn fetch_primary_replica(&self) -> Result<Option<ServerReplica>> {
499+
let replicas = self.fetch_replicas().await?;
500+
Ok(Self::filter_primary_replica(replicas.iter()))
501+
}
502+
503+
fn read_replicas(&self) -> RwLockReadGuard<'_, HashSet<AvailableServerReplica>> {
504+
self.replicas.read().expect("Expected a read replica lock")
505+
}
506+
507+
fn read_primary_replica(&self) -> Option<AvailableServerReplica> {
508+
Self::filter_primary_replica(self.read_replicas().iter())
509+
}
510+
511+
fn filter_primary_replica<'a, R: Replica + 'a>(replicas: impl IntoIterator<Item = &'a R>) -> Option<R> {
512+
replicas.into_iter().filter(|replica| replica.is_primary()).max_by_key(|replica| replica.term().unwrap_or_default()).cloned()
513+
}
514+
500515
fn server_connection_failed_err(&self, errors: HashMap<Address, Error>) -> Error {
501516
let accessed_addresses =
502517
Addresses::from_addresses(self.read_replicas().iter().map(|replica| replica.address().clone()));

0 commit comments

Comments
 (0)