Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions crates/bifrost/src/providers/replicated_loglet/loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,7 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
async fn find_tail(&self) -> Result<TailState<LogletOffset>, OperationError> {
match self.sequencer {
SequencerAccess::Local { .. } => Ok(*self.known_global_tail.get()),
SequencerAccess::Remote { .. } => {
todo!("find_tail() is not implemented yet")
}
SequencerAccess::Remote { ref handle } => handle.find_tail().await,
}
}

Expand Down
78 changes: 75 additions & 3 deletions crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@ use tokio::sync::{mpsc, Mutex, OwnedSemaphorePermit, Semaphore};

use restate_core::{
network::{
rpc_router::{RpcRouter, RpcToken},
rpc_router::{RpcError, RpcRouter, RpcToken},
NetworkError, NetworkSendError, Networking, Outgoing, TransportConnect, WeakConnection,
},
task_center, ShutdownError, TaskKind,
};
use restate_types::{
config::Configuration,
logs::{metadata::SegmentIndex, LogId, LogletOffset, Record},
net::replicated_loglet::{Append, Appended, CommonRequestHeader, SequencerStatus},
logs::{metadata::SegmentIndex, LogId, LogletOffset, Record, SequenceNumber, TailState},
net::replicated_loglet::{
Append, Appended, CommonRequestHeader, GetSequencerInfo, SequencerStatus,
},
replicated_loglet::ReplicatedLogletParams,
GenerationalNodeId,
};
Expand Down Expand Up @@ -205,6 +207,76 @@ where

Ok(connection)
}

/// Attempts to find tail.
///
/// This first tries to find tail by synchronizing with sequencer. If this failed
/// duo to sequencer not reachable, it will immediately try to find tail by querying
/// fmajority of loglet servers
pub async fn find_tail(&self) -> Result<TailState<LogletOffset>, OperationError> {
// try to sync with sequencer
if self.sync_sequencer_tail().await.is_ok() {
return Ok(*self.known_global_tail.get());
}

// otherwise we need to try to fetch this from the log servers.
self.sync_log_servers_tail().await?;
Ok(*self.known_global_tail.get())
Comment on lines +218 to +224
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there value in racing these two variants?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure I get your question here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whether we should run both variants for obtaining the known global tail in parallel/concurrently?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for clarification. Yeah, that's definitely a good idea.

}

/// Synchronize known_global_tail with the sequencer
async fn sync_sequencer_tail(&self) -> Result<(), NetworkError> {
let result = self
.sequencers_rpc
.info
.call(
&self.networking,
self.params.sequencer,
GetSequencerInfo {
header: CommonRequestHeader {
log_id: self.log_id,
loglet_id: self.params.loglet_id,
segment_index: self.segment_index,
},
},
)
.await
Comment on lines +232 to +243
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if the request or response get lost? Would sync_sequencer_tail get stuck?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's definitely something that need to be handled. I have paused working on this draft PR in favour of #2019 and wasn't sure if I should continue on this one.

.map(|incoming| incoming.into_body());

let info = match result {
Ok(info) => info,
Err(RpcError::Shutdown(shutdown)) => return Err(NetworkError::Shutdown(shutdown)),
Err(RpcError::SendError(err)) => return Err(err.source),
};

match info.header.status {
SequencerStatus::Ok => {
// update header info
if let Some(offset) = info.header.known_global_tail {
self.known_global_tail.notify_offset_update(offset);
}
}
SequencerStatus::Sealed => {
self.known_global_tail.notify(
true,
info.header
.known_global_tail
.unwrap_or(LogletOffset::INVALID),
);
}
_ => {
unreachable!()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe state why this case is unreachable. Maybe also don't use the wildcard. That way we'll see the places where a newly SequencerStatus variant needs to be handled.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Thank you :)

}
};

Ok(())
}

/// A fallback mechanism in case sequencer is not available
/// to try and sync known_global_tail with fmajority of LogServers
async fn sync_log_servers_tail(&self) -> Result<(), OperationError> {
todo!()
}
Comment on lines +277 to +279
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's where #2019 comes into play, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I started on this before #2019 was available. I think #2019 renders this PR obsolete since fetching the tail from the sequencer is just an improvement.

}

/// RemoteSequencerConnection represents a single open connection
Expand Down
6 changes: 4 additions & 2 deletions crates/bifrost/src/providers/replicated_loglet/rpc_routers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use restate_core::network::rpc_router::RpcRouter;
use restate_core::network::MessageRouterBuilder;
use restate_types::net::log_server::{GetLogletInfo, GetRecords, Release, Seal, Store, Trim};
use restate_types::net::replicated_loglet::Append;
use restate_types::net::replicated_loglet::{Append, GetSequencerInfo};

/// Used by replicated loglets to send requests and receive responses from log-servers
/// Cloning this is cheap and all clones will share the same internal trackers.
Expand Down Expand Up @@ -56,14 +56,16 @@ impl LogServersRpc {
#[derive(Clone)]
pub struct SequencersRpc {
pub append: RpcRouter<Append>,
pub info: RpcRouter<GetSequencerInfo>,
}

impl SequencersRpc {
/// Registers all routers into the supplied message router. This ensures that
/// responses are routed correctly.
pub fn new(router_builder: &mut MessageRouterBuilder) -> Self {
let append = RpcRouter::new(router_builder);
let info = RpcRouter::new(router_builder);

Self { append }
Self { append, info }
}
}
2 changes: 2 additions & 0 deletions crates/types/protobuf/restate/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ enum TargetName {
// ReplicatedLoglet
REPLICATED_LOGLET_APPEND = 40;
REPLICATED_LOGLET_APPENDED = 41;
REPLICATED_LOGLET_GET_INFO = 42;
REPLICATED_LOGLET_INFO = 43;
}

enum NodeStatus {
Expand Down
29 changes: 28 additions & 1 deletion crates/types/src/net/replicated_loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ impl Append {
self.payloads
.iter()
.map(|p| p.estimated_encode_size())
.sum()
.sum::<usize>()
+ size_of::<CommonRequestHeader>()
}
}

Expand Down Expand Up @@ -137,3 +138,29 @@ impl Appended {
self
}
}

define_rpc! {
@request = GetSequencerInfo,
@response = SequencerInfo,
@request_target = TargetName::ReplicatedLogletGetInfo,
@response_target = TargetName::ReplicatedLogletInfo,
}

// ** APPEND
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetSequencerInfo {
#[serde(flatten)]
pub header: CommonRequestHeader,
}

impl GetSequencerInfo {
pub fn estimated_encode_size(&self) -> usize {
size_of::<CommonRequestHeader>()
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SequencerInfo {
#[serde(flatten)]
pub header: CommonResponseHeader,
}
Loading