1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions Cargo.toml
@@ -55,6 +55,7 @@ kube = { version = "1.1", features = ["derive", "runtime"] }
log = "0.4"
rand = "0.8"
uuid = { version = "1.6", features = ["v4"] }
tower = "0.5"

object_store = { version = "0.12.0", features = [
"aws",
12 changes: 9 additions & 3 deletions src/bin/distributed-datafusion.rs
@@ -2,7 +2,7 @@ use anyhow::Result;
use clap::Parser;
use distributed_datafusion::{
friendly::new_friendly_name, proxy_service::DDProxyService, setup,
worker_service::DDWorkerService,
worker_service::DDWorkerService, worker_discovery::{EnvDiscovery, K8sDiscovery, WorkerDiscovery},
};

#[derive(Parser)]
@@ -30,10 +30,16 @@ async fn main() -> Result<()> {
setup();

let args = Args::parse();

match args.mode.as_str() {
"proxy" => {
let service = DDProxyService::new(new_friendly_name()?, args.port, None).await?;
// TODO: put the k8s-or-ENV decision behind a flag. WARNING: this kicks off discovery immediately, so workers must already be up.
let discovery: Arc<dyn WorkerDiscovery> = if std::env::var("DD_WORKER_ADDRESSES").is_ok() {
Arc::new(EnvDiscovery::new().await?)
} else {
Arc::new(K8sDiscovery::new().await?)
};
let service = DDProxyService::new(new_friendly_name()?, args.port, discovery, None).await?;
service.serve().await?;
}
"worker" => {
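For reviewers unfamiliar with the new discovery seam: a minimal sketch of what the env-backed side could look like. The `WorkerDiscovery` trait shape, the `workers()` method name, and the comma-separated `DD_WORKER_ADDRESSES` format are assumptions inferred from the call sites above; the real definitions live in `src/worker_discovery.rs`, which this diff does not show.

```rust
use anyhow::Result;
use async_trait::async_trait;

// Assumed trait shape; the actual trait in worker_discovery.rs may differ.
#[async_trait]
pub trait WorkerDiscovery: Send + Sync {
    /// Return the currently known worker addresses.
    async fn workers(&self) -> Result<Vec<String>>;
}

pub struct EnvDiscovery {
    addrs: Vec<String>,
}

impl EnvDiscovery {
    pub async fn new() -> Result<Self> {
        // Assumed format: comma-separated host:port pairs, e.g. "w0:50051,w1:50051".
        let raw = std::env::var("DD_WORKER_ADDRESSES")?;
        Ok(Self {
            addrs: raw.split(',').map(|s| s.trim().to_string()).collect(),
        })
    }
}

#[async_trait]
impl WorkerDiscovery for EnvDiscovery {
    async fn workers(&self) -> Result<Vec<String>> {
        Ok(self.addrs.clone())
    }
}
```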
3 changes: 3 additions & 0 deletions src/lib.rs
@@ -43,6 +43,9 @@ pub mod util;
pub mod vocab;
pub mod worker_discovery;
pub mod worker_service;
pub mod transport_traits;
pub mod transport;
pub mod test_worker;

#[cfg(not(target_env = "msvc"))]
use tikv_jemallocator::Jemalloc;
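The three new modules are not shown in this diff, so here is a hedged sketch of the seam they presumably define, inferred from the call sites in planning.rs (`tx.do_action(action).await` yielding a stream that is drained with `try_next`). Treat the exact signature as an assumption; the point is that a real gRPC-backed transport and an in-process `test_worker` can both implement it.

```rust
use arrow_flight::Action;
use futures::stream::BoxStream;
use tonic::Status;

/// Assumed shape of the transport trait (the real one lives in
/// transport_traits). A boxed response stream keeps the trait object-safe
/// so it can be stored as `Arc<dyn WorkerTransport>`.
#[async_trait::async_trait]
pub trait WorkerTransport: Send + Sync + std::fmt::Debug {
    /// Send a Flight DoAction to the worker; the caller drains the
    /// (possibly empty) response stream to confirm acceptance.
    async fn do_action(
        &self,
        action: Action,
    ) -> Result<BoxStream<'static, Result<bytes::Bytes, Status>>, Status>;
}
```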
195 changes: 78 additions & 117 deletions src/planning.rs
@@ -1,7 +1,5 @@
use std::{
collections::HashMap,
env,
sync::{Arc, LazyLock},
collections::HashMap, env, sync::{Arc, LazyLock}
};

use anyhow::{anyhow, Context};
@@ -30,19 +28,10 @@ use itertools::Itertools;
use prost::Message;

use crate::{
analyze::{DistributedAnalyzeExec, DistributedAnalyzeRootExec},
isolator::PartitionIsolatorExec,
logging::{debug, error, info, trace},
max_rows::MaxRowsExec,
physical::DDStageOptimizerRule,
result::{DDError, Result},
stage::DDStageExec,
stage_reader::{DDStageReaderExec, QueryId},
util::{display_plan_with_partition_counts, get_client, physical_plan_to_bytes, wait_for},
vocab::{
analyze::{DistributedAnalyzeExec, DistributedAnalyzeRootExec},
isolator::PartitionIsolatorExec,
logging::{debug, error, info, trace},
max_rows::MaxRowsExec,
physical::DDStageOptimizerRule,
result::{DDError, Result},
stage::DDStageExec,
stage_reader::{DDStageReaderExec, QueryId},
transport::WorkerTransport,
util::{display_plan_with_partition_counts, physical_plan_to_bytes, wait_for},
vocab::{
Addrs, CtxAnnotatedOutputs, CtxHost, CtxPartitionGroup, CtxStageAddrs, CtxStageId, DDTask,
Host, Hosts, PartitionAddrs, StageAddrs,
},
}
};

#[derive(Debug)]
@@ -439,109 +428,84 @@ pub fn add_distributed_analyze(
pub async fn distribute_stages(
query_id: &str,
stages: Vec<DDStage>,
worker_addrs: Vec<Host>,
workers: Vec<(Host, Arc<dyn WorkerTransport>)>,
codec: &dyn PhysicalExtensionCodec,
) -> Result<(Addrs, Vec<DDTask>)> {
// map of worker name to address
// FIXME: use types over tuples of strings, as we can accidently swap them and
// not know

// a map of worker name to host
let mut workers: HashMap<String, Host> = worker_addrs
.iter()
.map(|host| (host.name.clone(), host.clone()))
// materialise a name-keyed map so we can remove "bad" workers on each retry
let mut valid_workers: HashMap<_, _> = workers
.into_iter()
.map(|(h, tx)| (h.name.clone(), (h, tx)))
.collect();

for attempt in 0..3 {
if workers.is_empty() {
if valid_workers.is_empty() {
return Err(anyhow!("No workers available to distribute stages").into());
}

// all stages to workers
let (task_datas, final_addrs) =
assign_to_workers(query_id, &stages, workers.values().collect(), codec)?;
let current: Vec<_> = valid_workers.values().cloned().collect();
let (tasks, final_addrs, tx_host_pairs) =
assign_to_workers(query_id, &stages, current, codec)?;

match try_distribute_tasks(&tasks, &tx_host_pairs).await {
Ok(_) => return Ok((final_addrs, tasks)),

// we retry this a few times to ensure that the workers are ready
// and can accept the stages
match try_distribute_tasks(&task_datas).await {
Ok(_) => return Ok((final_addrs, task_datas)),
Err(DDError::WorkerCommunicationError(bad_worker)) => {
// remove the poisoned worker and retry on the non poisoned workers
Err(DDError::WorkerCommunicationError(bad_host)) => {
error!(
"distribute stages for query {query_id} attempt {attempt} failed removing \
worker {bad_worker}. Retrying..."
"distribute_stages: attempt {attempt} – \
worker {} failed; will retry without it",
bad_host.name
);
// if we cannot communicate with a worker, we remove it from the list of workers
workers.remove(&bad_worker.name);
valid_workers.remove(&bad_host.name);
}

// any other error is terminal
Err(e) => return Err(e),
}
if attempt == 2 {
return Err(
anyhow!("Failed to distribute query {query_id} stages after 3 attempts").into(),
);
}
}
unreachable!()

unreachable!("retry loop exits on success or early return on error");
}

/// Try to distribute the stages to the workers. If we cannot communicate
/// with a worker, return that worker as the payload of the `Err`.
async fn try_distribute_tasks(task_datas: &[DDTask]) -> Result<()> {
// we can use the stage data to distribute the stages to workers
for task_data in task_datas {
async fn try_distribute_tasks(
tasks: &[DDTask],
tx_host_pairs: &[(Arc<dyn WorkerTransport>, Host)],
) -> Result<()> {
for (task, (tx, host)) in tasks.iter().zip(tx_host_pairs) {
trace!(
"Distributing Task: stage_id {}, pg: {:?} to worker: {:?}",
task_data.stage_id,
task_data.partition_group,
task_data.assigned_host
"Sending stage {} / pg {:?} to {}",
task.stage_id,
task.partition_group,
host
);

// populate its child stages
let mut stage_data = task_data.clone();
stage_data.stage_addrs = Some(get_stage_addrs_from_tasks(
&stage_data.child_stage_ids,
task_datas,
// embed the StageAddrs of all children before shipping
let mut stage = task.clone();
stage.stage_addrs = Some(get_stage_addrs_from_tasks(
&stage.child_stage_ids,
tasks,
)?);

let host = stage_data
.assigned_host
.clone()
.context("Assigned host is missing for task data")?;

let mut client = match get_client(&host) {
Ok(client) => client,
Err(e) => {
error!("Couldn't not communicate with worker {e:#?}");
return Err(DDError::WorkerCommunicationError(
host.clone(), // here
));
}
};

let mut buf = vec![];
stage_data
.encode(&mut buf)
.context("Failed to encode stage data to buf")?;
let mut buf = Vec::new();
stage.encode(&mut buf).map_err(anyhow::Error::from)?;

let action = Action {
r#type: "add_plan".to_string(),
r#type: "add_plan".into(),
body: buf.into(),
};

let mut response = client
// gRPC call; on failure the transport poisons itself and removes the address from the registry
let mut stream = tx
.do_action(action)
.await
.context("Failed to send action to worker")?;
.map_err(|_| DDError::WorkerCommunicationError(host.clone()))?;

// consume this empty response to ensure the action was successful
while let Some(_res) = response
.try_next()
.await
.context("error consuming do_action response")?
{
// we don't care about the response, just that it was successful
}
trace!("do action success for stage_id: {}", stage_data.stage_id);
// drain the (empty) response stream to confirm the worker actually accepted the plan
while stream
    .try_next()
    .await
    .map_err(|_| DDError::WorkerCommunicationError(host.clone()))?
    .is_some()
{}

trace!("stage {} delivered to {}", stage.stage_id, host);
}
Ok(())
}
@@ -552,40 +516,35 @@ async fn try_distribute_tasks(task_datas: &[DDTask]) -> Result<()> {
fn assign_to_workers(
query_id: &str,
stages: &[DDStage],
worker_addrs: Vec<&Host>,
workers: Vec<(Host, Arc<dyn WorkerTransport>)>,
codec: &dyn PhysicalExtensionCodec,
) -> Result<(Vec<DDTask>, Addrs)> {
let mut task_datas = vec![];
let mut worker_idx = 0;
) -> Result<(Vec<DDTask>, Addrs, Vec<(Arc<dyn WorkerTransport>, Host)>)> {
let mut task_datas = Vec::new();
let mut tx_host_pairs = Vec::new();

trace!(
"assigning stages: {:?}",
stages
.iter()
.map(|s| format!("stage_id: {}, pgs:{:?}", s.stage_id, s.partition_groups))
.join(",\n")
);
// round-robin scheduler
let mut idx = 0;
let n_workers = workers.len();

// keep track of which worker has the root of the plan tree (highest stage
// number)
let mut max_stage_id = -1;
// keep track of where the root of the plan will live (highest stage id)
let mut max_stage_id: i64 = -1;
let mut final_addrs = Addrs::default();

for stage in stages {
for partition_group in stage.partition_groups.iter() {
for pg in &stage.partition_groups {
let plan_bytes = physical_plan_to_bytes(stage.plan.clone(), codec)?;

let host = worker_addrs[worker_idx].clone();
worker_idx = (worker_idx + 1) % worker_addrs.len();
// pick next worker
let (host, tx) = workers[idx].clone();
idx = (idx + 1) % n_workers;

if stage.stage_id as isize > max_stage_id {
// this wasn't the last stage
max_stage_id = stage.stage_id as isize;
// remember which host serves the final stage
if stage.stage_id as i64 > max_stage_id {
max_stage_id = stage.stage_id as i64;
final_addrs.clear();
}
if stage.stage_id as isize == max_stage_id {
for part in partition_group.iter() {
// we are the final stage, so we will be the one to serve this partition
if stage.stage_id as i64 == max_stage_id {
for part in pg {
final_addrs
.entry(stage.stage_id)
.or_default()
@@ -595,22 +554,24 @@ fn assign_to_workers(
}
}

let task_data = DDTask {
query_id: query_id.to_string(),
task_datas.push(DDTask {
query_id: query_id.to_owned(),
stage_id: stage.stage_id,
plan_bytes,
partition_group: partition_group.to_vec(),
child_stage_ids: stage.child_stage_ids().unwrap_or_default().to_vec(),
stage_addrs: None, // will be calculated and filled in later
partition_group: pg.clone(),
child_stage_ids: stage.child_stage_ids().unwrap_or_default(),
stage_addrs: None, // filled in later
num_output_partitions: stage.plan.output_partitioning().partition_count() as u64,
full_partitions: stage.full_partitions,
assigned_host: Some(host),
};
task_datas.push(task_data);
assigned_host: Some(host.clone()),
});

// keep the order exactly aligned with task_datas
tx_host_pairs.push((tx, host));
}
}

Ok((task_datas, final_addrs))
Ok((task_datas, final_addrs, tx_host_pairs))
}

fn get_stage_addrs_from_tasks(target_stage_ids: &[u64], stages: &[DDTask]) -> Result<StageAddrs> {
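The retry loop in `distribute_stages` is easier to audit in isolation. Below is a simplified, self-contained model of the same remove-and-retry pattern; the types are hypothetical stand-ins (a real `Host`/`WorkerTransport` pair is reduced to a name), and unlike the real code it treats every failure as a bad-worker failure rather than distinguishing terminal errors.

```rust
use std::collections::HashMap;

/// Try all workers up to three times; on a communication failure, drop only
/// the offending worker and retry with the survivors.
fn distribute_with_retry<W>(
    mut workers: HashMap<String, W>,
    // Err carries the name of the worker that failed.
    mut try_all: impl FnMut(&HashMap<String, W>) -> Result<(), String>,
) -> Result<(), String> {
    for attempt in 0..3 {
        if workers.is_empty() {
            return Err("no workers available to distribute stages".into());
        }
        match try_all(&workers) {
            Ok(()) => return Ok(()),
            Err(bad) => {
                eprintln!("attempt {attempt}: worker {bad} failed; retrying without it");
                workers.remove(&bad);
            }
        }
    }
    Err("failed to distribute stages after 3 attempts".into())
}
```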
14 changes: 7 additions & 7 deletions src/proxy_service.rs
@@ -50,31 +50,30 @@ use crate::{
stage_reader::DDStageReaderExec,
util::{display_plan_with_partition_counts, get_addrs, start_up},
vocab::{Addrs, Host},
worker_discovery::get_worker_addresses,
worker_discovery::WorkerDiscovery,
};

pub struct DDProxyHandler {
/// our host info, useful for logging
pub host: Host,

pub planner: QueryPlanner,
pub discovery: Arc<dyn WorkerDiscovery>,

/// Optional customizer for our context and proto serde
pub customizer: Option<Arc<dyn Customizer>>,
}

impl DDProxyHandler {
pub fn new(name: String, addr: String, customizer: Option<Arc<dyn Customizer>>) -> Self {
// call this function to bootstrap the worker discovery mechanism
get_worker_addresses().expect("Could not get worker addresses upon startup");

pub fn new(
    name: String,
    addr: String,
    discovery: Arc<dyn WorkerDiscovery>,
    customizer: Option<Arc<dyn Customizer>>,
) -> Self {
let host = Host {
name: name.clone(),
addr: addr.clone(),
};
Self {
host: host.clone(),
planner: QueryPlanner::new(customizer.clone()),
planner: QueryPlanner::new(customizer.clone(), discovery.clone()),
discovery,
customizer,
}
}
@@ -292,6 +291,7 @@ impl DDProxyService {
pub async fn new(
name: String,
port: usize,
discovery: Arc<dyn WorkerDiscovery>,
ctx_customizer: Option<Arc<dyn Customizer>>,
) -> Result<Self> {
debug!("Creating DDProxyService!");
@@ -305,7 +305,7 @@

info!("DDProxyService bound to {addr}");

let handler = Arc::new(DDProxyHandler::new(name, addr.clone(), ctx_customizer));
let handler = Arc::new(DDProxyHandler::new(name, addr.clone(), discovery, ctx_customizer));

Ok(Self {
listener,
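Putting the pieces together, proxy startup now reads roughly as below. The module paths and constructor arguments mirror the call sites in this PR; the port value and the overall `main` scaffolding are illustrative.

```rust
use std::sync::Arc;

use distributed_datafusion::{
    friendly::new_friendly_name,
    proxy_service::DDProxyService,
    worker_discovery::{EnvDiscovery, K8sDiscovery, WorkerDiscovery},
};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Explicit addresses win; otherwise fall back to k8s discovery.
    let discovery: Arc<dyn WorkerDiscovery> = if std::env::var("DD_WORKER_ADDRESSES").is_ok() {
        Arc::new(EnvDiscovery::new().await?)
    } else {
        Arc::new(K8sDiscovery::new().await?)
    };

    // Discovery is injected here instead of being resolved globally in the handler.
    let service = DDProxyService::new(new_friendly_name()?, 50051, discovery, None).await?;
    service.serve().await?;
    Ok(())
}
```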