storage: allow singleton sources on multi-replica clusters #31890
Conversation
Force-pushed from 5f9de72 to 804de79
This hides the implementation detail and will make it easier to change how we track the data inside Instance, which I'll do in one of the next commits. We have to use a `Box<dyn ...>` because the trait needs to be object safe, that is, dyn-compatible.
Force-pushed from 804de79 to 9413c53
@bkirwi & @petrosagg If we're happy with this approach, we can also use it for sinks on multi-replica clusters.
```rust
#[derive(Debug)]
struct ActiveIngestion {
    /// Whether the ingestion prefers running on a single replica.
    prefers_single_replica: bool,
```
Is this a property of an active ingestion? I would expect this information to be found in the collection state for the given GlobalId. Having it here implies that it can potentially disagree with the ingestion, right?
It's a property of the `IngestionDescription`, and yeah, this is a cache of that, so that I don't have to re-derive it from the history. If you prefer, I can change it so we get it from the history instead. I don't think this is performance-sensitive code.
```rust
/// Replays commands to the specified replica.
pub fn replay_commands(&mut self, replica_id: ReplicaId) {
    let commands = self.history.iter().cloned().collect::<Vec<_>>();
```
Haven't tried it but it feels like the collect and the into_iter below can be deleted.
```rust
let filtered_commands = commands
    .into_iter()
    .map(|command| match command.clone() {
```
Aren't the commands already owned since you called cloned above?
```rust
    .map(|command| match command.clone() {
        StorageCommand::RunIngestions(mut cmds) => {
            cmds.retain(|cmd| self.is_active_replica(&cmd.id, &replica_id));
            StorageCommand::RunIngestions(cmds)
```
This may end up with no commands. You could switch the iterator combinator to `filter_map` and return `None` in that case, to avoid sending the command at all.
```rust
        }
        command => command,
    })
    .collect::<Vec<_>>();
```
This also seems like it can be deleted (also didn't try it).
This one is getting around a borrowing problem: the iterator wants to access `self`, and the call for getting the replica does as well.
```rust
    .description
    .desc
    .connection
    .prefers_single_replica();
```
I see, so this method already exists there. Is there a reason we can't use this instead of de-normalizing it in the ActiveIngestion struct?
By "already exist", you mean it's added in this PR? 😅
But yes, see my answer up here: https://github.com/MaterializeInc/materialize/pull/31890/files/9413c5366af22285e88e7c7ad279fcfcf1a20d65#r1999229940
```rust
/// we never change the scheduling decision for single-replica ingestions
/// unless we have to, that is unless the replica that they are running on
/// goes away. We do this, so that we don't send a mix of "run"/"allow
/// compaction"/"run" messages to replicas, which wouldn't deal well with
```
How is this avoided? I would expect that if I create a replica A and then create and drop replica B then replica A will receive run, allow compaction, run, allow compaction for that ingestion while we switch back and forth.
In that case replica A would receive a RunIngestion and then nothing more. From above:
Crucially, we never change the scheduling decision for single-replica ingestions unless we have to, that is unless the replica that they are running on goes away. We do this, so that we don't send a mix of "run"/"allow compaction"/"run" messages to replicas
I can highlight more that this is the important property we uphold.
Does that mean that if I create a replica and then drop it my pg sources will never get scheduled again?
No, in that case it would be scheduled on another replica.
Here's the docstring, but slightly massaged to highlight the important property. I'll change the code to that.
```rust
/// An important property of this scheduling algorithm is that we never
/// change the scheduling decision for single-replica ingestions unless we
/// have to, that is unless the replica that they are running on goes away.
/// We do this, so that we don't send a mix of "run"/"allow
/// compaction"/"run" messages to replicas, which wouldn't deal well with
/// this. When we _do_ have to make a scheduling decision we schedule a
/// single-replica ingestion on the last replica.
```
Does this help?
I want to change the last part (along with the implementation) to this, though:
```rust
/// this. When we _do_ have to make a scheduling decision we schedule a
/// single-replica ingestion on the first replica, according to the sort
/// order of `ReplicaId`. We do this latter so that the scheduling decision
/// is stable across restarts of `environmentd`/the controller.
```
```rust
pub fn get_ingestion_description(
    &self,
    id: &GlobalId,
) -> Option<mz_storage_types::sources::IngestionDescription<CollectionMetadata>> {
```
nit: maybe import `IngestionDescription` at the top to avoid the long path
```rust
if let Some(ingestion_id) = self.ingestion_exports.get(id) {
    // Right now, only ingestions can have per-replica scheduling
    // decisions.
    if let Some(ingestion) = self.active_ingestions.get(ingestion_id) {
```
nit: feels cleaner to make this a `match` instead of an `if let` with an early return, to avoid falling through
In general, the more similar the replicas of a particular cluster are, the happier I am. For the Kafka source, where we only filter at the end, things feel very similar and I am very happy. For these other sources, where we end up sending different command streams to different replicas, I am more nervous! I think it probably doesn't make sense to revisit the design here, especially given the timing of everything, but I'd be interested in exploring a different approach for the sink. (e.g. having the controller pick a leader, but only doing the filtering of the Kafka writes "at the last minute" in the sink.)
Force-pushed from 9413c53 to 25feff9
Agreed on all those accounts! This design was chosen because it was determined to be quite hard for the sources to share their replication slot. I'd be very happy if we can come up with a design that allows us to run the same dataflows on all replicas. Also, though, this thing here is not at all a trap door decision, we can easily change things in future releases.
Force-pushed from 25feff9 to 731e1e3
Force-pushed from 731e1e3 to 689fdd8
@petrosagg I addressed your easy comments, and the question around scheduling is now resolved. Could you take another look? If you like, I'll also remove the cached
(Just TBC - once Petros is satisfied I am also satisfied!)
petrosagg left a comment: Looks great! Thanks for explaining the scheduling thing in the call
This PR extends the approach of MaterializeInc#31890 to the case of sinks. The handling is identical to that of single replica sources. Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
Implements https://github.com/MaterializeInc/database-issues/issues/9079
Per the design in https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/20250127_multi_replica_scheduling_singleton_sources.md
There's now an issue around tracking "active replicas" in the storage controller, for the purposes of tracking `DroppedId` messages. I have a good idea how to fix it but wanted to get this out for review now.