Skip to content

Commit 2dde0c6

Browse files
committed
Sink re-work
1 parent a4e9057 commit 2dde0c6

File tree

5 files changed

+148
-205
lines changed

5 files changed

+148
-205
lines changed

server/src/main.rs

Lines changed: 74 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use getopts::Options;
1616

1717
use itertools::Itertools;
1818

19-
use timely::dataflow::channels::pact::{Exchange, Pipeline};
19+
use timely::dataflow::channels::pact::Exchange;
2020
use timely::dataflow::operators::generic::OutputHandle;
2121
use timely::dataflow::operators::{Operator, Probe};
2222
use timely::logging::TimelyEvent;
@@ -33,7 +33,8 @@ use slab::Slab;
3333
use ws::connection::{ConnEvent, Connection};
3434

3535
use declarative_dataflow::server::{Config, CreateAttribute, Request, Server, TxId};
36-
use declarative_dataflow::{Eid, Error, ImplContext, ResultDiff};
36+
use declarative_dataflow::sinks::Sinkable;
37+
use declarative_dataflow::{Eid, Error, ResultDiff};
3738

3839
/// Server timestamp type.
3940
#[cfg(not(feature = "real-time"))]
@@ -598,21 +599,25 @@ fn main() {
598599
}
599600
}
600601
Request::Interest(req) => {
602+
let interests = server.interests
603+
.entry(req.name.clone())
604+
.or_insert_with(HashSet::new);
605+
606+
// We need to check this, because we only want to setup
607+
// the dataflow on the first interest.
608+
let was_first = interests.is_empty();
609+
601610
// All workers keep track of every client's interests, s.t. they
602611
// know when to clean up unused dataflows.
612+
interests.insert(Token(client));
603613

604-
let client_token = Token(client);
605-
server.interests
606-
.entry(req.name.clone())
607-
.or_insert_with(HashSet::new)
608-
.insert(client_token);
609-
614+
// For multi-tenant flows, we need to keep track of the worker
615+
// that is managing this client's connection.
610616
if let Some(_) = req.tenant {
611617
server.tenant_owner.borrow_mut().insert(Token(client), command.owner as u64);
612618
}
613619

614-
if server.context.global_arrangement(&req.name).is_none() {
615-
620+
if was_first {
616621
let send_results_handle = send_results.clone();
617622
let send_tenant_results_handle = send_tenant_results.clone();
618623

@@ -644,112 +649,73 @@ fn main() {
644649
let mut buffer = Vec::new();
645650
let tenant_owner = server.tenant_owner.clone();
646651

647-
delayed
648-
.inner
649-
.unary(
650-
Exchange::new(move |(ref tuple, _t, _diff): &ResultDiff<T>| {
651-
let tenant: Eid = tuple[offset].clone().into();
652-
tenant_owner.borrow().get(&Token(tenant as usize)).unwrap().clone()
653-
}),
654-
"MultiTenantResults",
655-
move |_cap, _info| {
656-
move |input, _output: &mut OutputHandle<_, (), _>| {
657-
input.for_each(|_time, data| {
658-
data.swap(&mut buffer);
659-
660-
let per_tenant = buffer
661-
.drain(..)
662-
.group_by(|(tuple, _, _)| {
663-
let tenant: Eid = tuple[offset].clone().into();
664-
tenant as usize
665-
});
666-
667-
for (tenant, batch) in &per_tenant {
668-
send_tenant_results_handle
669-
.send((name.clone(), Token(tenant), batch.collect()))
670-
.unwrap();
671-
}
672-
});
673-
}
674-
})
675-
.probe_with(&mut server.probe);
652+
let pact = Exchange::new(move |(ref tuple, _t, _diff): &ResultDiff<T>| {
653+
let tenant: Eid = tuple[offset].clone().into();
654+
tenant_owner.borrow().get(&Token(tenant as usize)).unwrap().clone()
655+
});
656+
657+
let sunk = match req.sink {
658+
Some(sink) => sink.sink(&delayed.inner, pact).expect("sinking failed"),
659+
None => {
660+
delayed
661+
.inner
662+
.unary(pact, "MultiTenantResults", move |_cap, _info| {
663+
move |input, _output: &mut OutputHandle<_, ResultDiff<T>, _>| {
664+
input.for_each(|_time, data| {
665+
data.swap(&mut buffer);
666+
667+
let per_tenant = buffer
668+
.drain(..)
669+
.group_by(|(tuple, _, _)| {
670+
let tenant: Eid = tuple[offset].clone().into();
671+
tenant as usize
672+
});
673+
674+
for (tenant, batch) in &per_tenant {
675+
send_tenant_results_handle
676+
.send((name.clone(), Token(tenant), batch.collect()))
677+
.unwrap();
678+
}
679+
});
680+
}
681+
})
682+
}
683+
};
684+
685+
sunk.probe_with(&mut server.probe);
676686
} else {
677-
delayed
678-
.inner
679-
.unary_notify(
680-
Exchange::new(move |_| owner as u64),
681-
"ResultsRecv",
682-
vec![],
683-
move |input, _output: &mut OutputHandle<_, (), _>, _notificator| {
684-
685-
// due to the exchange pact, this closure is only
686-
// executed by the owning worker
687-
688-
// @TODO only forward inputs up to the frontier!
689-
690-
input.for_each(|_time, data| {
691-
send_results_handle
692-
.send((name.clone(), data.to_vec()))
693-
.unwrap();
694-
});
695-
})
696-
.probe_with(&mut server.probe);
697-
}
687+
let pact = Exchange::new(move |_| owner as u64);
688+
689+
let sunk = match req.sink {
690+
Some(sink) => sink.sink(&delayed.inner, pact).expect("sinking failed"),
691+
None => {
692+
delayed
693+
.inner
694+
.unary(pact, "ResultsRecv", move |_cap, _info| {
695+
move |input, _output: &mut OutputHandle<_, ResultDiff<T>, _>| {
696+
// due to the exchange pact, this closure is only
697+
// executed by the owning worker
698+
699+
// @TODO only forward inputs up to the frontier!
700+
701+
input.for_each(|_time, data| {
702+
send_results_handle
703+
.send((name.clone(), data.to_vec()))
704+
.unwrap();
705+
});
706+
}
707+
})
708+
}
709+
};
698710

711+
sunk.probe_with(&mut server.probe);
712+
}
699713
}
700714
}
701715
});
702716
}
703717
}
704718
Request::Uninterest(name) => server.uninterest(Token(command.client), &name),
705-
Request::Flow(source, sink) => {
706-
// @TODO?
707-
// We treat sinks as single-use right now.
708-
match server.context.internal.sinks.remove(&sink) {
709-
None => {
710-
let error = Error {
711-
category: "df.error.category/not-found",
712-
message: format!("Unknown sink {}", sink),
713-
};
714-
send_errors.send((Token(client), error, last_tx)).unwrap();
715-
}
716-
Some(mut sink_handle) => {
717-
let server_handle = &mut server;
718-
let send_errors_handle = &send_errors;
719-
720-
worker.dataflow::<T, _, _>(move |scope| {
721-
match server_handle.interest(&source, scope) {
722-
Err(error) => {
723-
send_errors_handle.send((Token(client), error, last_tx)).unwrap();
724-
}
725-
Ok(relation) => {
726-
// @TODO Ideally we only ever want to "send" references
727-
// to local trace batches.
728-
relation
729-
.inner
730-
.sink(Pipeline, "Flow", move |input| {
731-
input.for_each(|_time, data| {
732-
for (tuple, time, diff) in data.to_vec().drain(..) {
733-
sink_handle.update_at(tuple, time, diff);
734-
}
735-
});
736-
737-
let frontier = input.frontier().frontier();
738-
if frontier.is_empty() {
739-
// @TODO
740-
// sink_handle.close();
741-
sink_handle.flush();
742-
} else {
743-
sink_handle.advance_to(frontier[0]);
744-
sink_handle.flush();
745-
}
746-
});
747-
}
748-
}
749-
});
750-
}
751-
}
752-
}
753719
Request::Register(req) => {
754720
if let Err(error) = server.register(req) {
755721
send_errors.send((Token(client), error, last_tx)).unwrap();
@@ -762,13 +728,6 @@ fn main() {
762728
}
763729
});
764730
}
765-
Request::RegisterSink(req) => {
766-
worker.dataflow::<T, _, _>(|scope| {
767-
if let Err(error) = server.register_sink(req, scope) {
768-
send_errors.send((Token(client), error, last_tx)).unwrap();
769-
}
770-
});
771-
}
772731
Request::CreateAttribute(CreateAttribute { name, config }) => {
773732
worker.dataflow::<T, _, _>(|scope| {
774733
if let Err(error) = server.context.internal.create_attribute(&name, config, scope) {

src/domain/mod.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ pub struct Domain<T: Timestamp + Lattice + TotalOrder> {
2626
now_at: T,
2727
/// Input handles to attributes in this domain.
2828
input_sessions: HashMap<String, InputSession<T, (Value, Value), isize>>,
29-
/// Input handles to named sinks in this domain.
30-
pub sinks: HashMap<String, InputSession<T, Vec<Value>, isize>>,
3129
/// The probe keeping track of progress in this domain.
3230
probe: ProbeHandle<T>,
3331
/// Configurations for attributes in this domain.
@@ -51,7 +49,6 @@ where
5149
Domain {
5250
now_at: start_at,
5351
input_sessions: HashMap::new(),
54-
sinks: HashMap::new(),
5552
probe: ProbeHandle::new(),
5653
attributes: HashMap::new(),
5754
forward: HashMap::new(),

src/server/mod.rs

Lines changed: 3 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,11 @@ use timely::order::TotalOrder;
1212
use timely::progress::Timestamp;
1313

1414
use differential_dataflow::collection::Collection;
15-
use differential_dataflow::input::Input;
1615
use differential_dataflow::lattice::Lattice;
1716

1817
use crate::domain::Domain;
1918
use crate::plan::{ImplContext, Implementable};
20-
use crate::sinks::{Sink, Sinkable};
19+
use crate::sinks::Sink;
2120
use crate::sources::{Source, Sourceable};
2221
use crate::Rule;
2322
use crate::{
@@ -69,6 +68,8 @@ pub struct Interest {
6968
/// Granularity (in seconds or tx ids) at which to send
7069
/// results. None indicates no delay.
7170
pub granularity: Option<u64>,
71+
/// An optional sink configuration.
72+
pub sink: Option<Sink>,
7273
}
7374

7475
/// A request with the intent of synthesising one or more new rules
@@ -81,16 +82,6 @@ pub struct Register {
8182
pub publish: Vec<String>,
8283
}
8384

84-
/// A request with the intent of attaching an external system as a
85-
/// named sink.
86-
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
87-
pub struct RegisterSink {
88-
/// A globally unique name.
89-
pub name: String,
90-
/// A sink configuration.
91-
pub sink: Sink,
92-
}
93-
9485
/// A request with the intent of creating a new named, globally
9586
/// available input that can be transacted upon.
9687
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
@@ -113,16 +104,11 @@ pub enum Request {
113104
/// stopped. Once all interested clients have sent this, the
114105
/// dataflow can be cleaned up.
115106
Uninterest(String),
116-
/// Expresses interest in a named relation, but directing results
117-
/// to be forwarded to a sink.
118-
Flow(String, String),
119107
/// Registers one or more named relations.
120108
Register(Register),
121109
/// A request with the intent of attaching to an external data
122110
/// source that publishes one or more attributes and relations.
123111
RegisterSource(Source),
124-
/// Registers an external data sink.
125-
RegisterSink(RegisterSink),
126112
/// Creates a named input handle that can be `Transact`ed upon.
127113
CreateAttribute(CreateAttribute),
128114
/// Advances the specified domain to the specified time.
@@ -435,23 +421,6 @@ impl<Token: Hash + Eq + Copy> Server<u64, Token> {
435421

436422
Ok(())
437423
}
438-
439-
/// Handle a RegisterSink request.
440-
pub fn register_sink<S: Scope<Timestamp = u64>>(
441-
&mut self,
442-
req: RegisterSink,
443-
scope: &mut S,
444-
) -> Result<(), Error> {
445-
let RegisterSink { name, sink } = req;
446-
447-
let (input, collection) = scope.new_collection();
448-
449-
sink.sink(&collection.inner)?;
450-
451-
self.context.internal.sinks.insert(name, input);
452-
453-
Ok(())
454-
}
455424
}
456425

457426
#[cfg(feature = "real-time")]
@@ -470,21 +439,4 @@ impl<Token: Hash + Eq + Copy> Server<std::time::Duration, Token> {
470439

471440
Ok(())
472441
}
473-
474-
/// Handle a RegisterSink request.
475-
pub fn register_sink<S: Scope<Timestamp = std::time::Duration>>(
476-
&mut self,
477-
req: RegisterSink,
478-
scope: &mut S,
479-
) -> Result<(), Error> {
480-
let RegisterSink { name, sink } = req;
481-
482-
let (input, collection) = scope.new_collection();
483-
484-
sink.sink(&collection.inner)?;
485-
486-
self.context.internal.sinks.insert(name, input);
487-
488-
Ok(())
489-
}
490442
}

0 commit comments

Comments
 (0)