Skip to content

Commit 3c7da54

Browse files
committed
storage: infrastructure for independent source output streams
This PR puts in the basis for allowing a source implementation to produce an independent DD collection per output. The situation before this PR was that the `SourceRender` trait required a single, multiplexed DD collection to be produced, of type `(usize, D)`, where the `usize` designated the output. Since all outputs were multiplexed, a single frontier had to describe their overall progress, which described the "slowest" one. This is generally fine when all outputs more or less march forwards together but it's not ok when a new subsource is added to a source that has otherwise been running for a while. In this situation the upper frontier of the multiplexed collection would necessarily have to stay stuck until the new subsource finished its snapshot and caught up with the other ones, making the previously healthy sources unavailable for all this time. This PR fixes this by requiring a `BTreeMap<GlobalId, Collection>` output type from source implementations. This way each subsource can be driven independently and a new subsource can be added without imposing a frontier stall to the previously ingested subsources. This PR does not change any of the source implementations to take advantage of this new interface, since that would create a giant PR. Instead, this only changes the interface and the fallout of all the changes in the various generic parts of the pipeline. Follow up PRs will be done that target individual source implementation and change them to be directly produce multiple collections. Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
1 parent ff4b6fd commit 3c7da54

File tree

18 files changed

+412
-523
lines changed

18 files changed

+412
-523
lines changed

src/storage-types/src/sources.rs

Lines changed: 0 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -141,43 +141,6 @@ impl<S> IngestionDescription<S> {
141141
}
142142
}
143143

144-
impl<S: Clone> IngestionDescription<S> {
145-
pub fn indexed_source_exports(
146-
&self,
147-
primary_source_id: &GlobalId,
148-
) -> BTreeMap<GlobalId, IndexedSourceExport<S>> {
149-
let mut source_exports = BTreeMap::new();
150-
// `self.source_exports` contains all source-exports (e.g. subsources & tables) as well as
151-
// the primary source relation. It's not guaranteed that the primary source relation is
152-
// the first element in the map, however it much be set to output 0 to align with
153-
// assumptions in source implementations. This is the case even if the primary
154-
// export will not have any data output for it, since output 0 is the convention
155-
// used for errors that should halt the entire source dataflow.
156-
// TODO: See if we can simplify this to avoid needing to include the primary output
157-
// if no data will be exported to it. This requires refactoring all error output handling.
158-
let mut next_output = 1;
159-
for (id, export) in self.source_exports.iter() {
160-
let ingestion_output = if id == primary_source_id {
161-
0
162-
} else {
163-
let idx = next_output;
164-
next_output += 1;
165-
idx
166-
};
167-
168-
source_exports.insert(
169-
*id,
170-
IndexedSourceExport {
171-
ingestion_output,
172-
export: export.clone(),
173-
},
174-
);
175-
}
176-
177-
source_exports
178-
}
179-
}
180-
181144
impl<S: Debug + Eq + PartialEq + AlterCompatible> AlterCompatible for IngestionDescription<S> {
182145
fn alter_compatible(
183146
&self,
@@ -278,14 +241,6 @@ impl<R: ConnectionResolver> IntoInlineConnection<IngestionDescription, R>
278241
}
279242
}
280243

281-
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Arbitrary)]
282-
pub struct IndexedSourceExport<S = ()> {
283-
/// Which output index from the ingestion this export refers to.
284-
pub ingestion_output: usize,
285-
/// The SourceExport
286-
pub export: SourceExport<S>,
287-
}
288-
289244
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Arbitrary)]
290245
pub struct SourceExport<S = (), C: ConnectionAccess = InlinedConnection> {
291246
/// The collection metadata needed to write the exported data

src/storage/src/decode.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -571,7 +571,7 @@ pub fn render_decode_delimited<G: Scope, FromTime: Timestamp>(
571571
let health = transient_errors.map(|err: Rc<CsrConnectError>| {
572572
let halt_status = HealthStatusUpdate::halting(err.display_with_causes().to_string(), None);
573573
HealthStatusMessage {
574-
index: 0,
574+
id: None,
575575
namespace: if matches!(&*err, CsrConnectError::Ssh(_)) {
576576
StatusNamespace::Ssh
577577
} else {

0 commit comments

Comments
 (0)