Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
501 changes: 230 additions & 271 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion hrctl/app/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func schedule(c *cli.Context) error {
return errors.Wrap(err, "unable to get job from file")
}

conn, err := connect(c.String("manager"))
conn, err := connect(c.GlobalString("manager"))
if err != nil {
return errors.Wrap(err, "unable to connect to manager")
}
Expand Down
8 changes: 4 additions & 4 deletions manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ clap = "2.26"
config = "0.8.0"
failure = "0.1.1"
fern = "0.5.3"
futures = "0.1.18"
grpc = "0.2.1"
futures = "0.1.20"
grpc = "0.3.0"
heracles-proto = { path = "../proto"}
lapin-futures = "0.10.0"
lapin-futures = "0.11.1"
lazy_static = "1.0"
log = "0.4"
protobuf = "1.4"
rayon = "1.0.0"
tokio = "0.1.4"
tokio = "0.1.5"
uuid = { version = "0.6", features = ["v4"] }
95 changes: 70 additions & 25 deletions manager/src/broker/amqp.rs
Original file line number Diff line number Diff line change
@@ -1,68 +1,113 @@
use std::net::SocketAddr;
use std::sync::Arc;
use std::thread;

use lapin::channel::{BasicProperties, BasicPublishOptions, Channel, QueueDeclareOptions};
use lapin::client::{Client, ConnectionOptions};
use lapin::types::FieldTable;
use protobuf::Message;
use tokio::net::TcpStream;
use tokio::prelude::*;
use tokio;

use super::*;
use settings::SETTINGS;

pub struct AMQPBrokerConnection {
channel: Channel<TcpStream>,
queue_name: String,
channel: Arc<Channel<TcpStream>>,
}

impl BrokerConnection for AMQPBrokerConnection {
/// Sends a `Task` to the broker.
///
/// The `Option<bool>` returned represents whether the message was acked (`Some(true)`), nacked
/// (`Some(false)`), or the queue is not a confirm queue (`None`).
fn send<'a>(&'a self, task: &'a Task) -> Box<Future<Item = Option<bool>, Error = Error> + 'a> {
fn send(&self, task: Task) -> Box<Future<Item = Option<bool>, Error = Error> + Send + 'static> {
let task_id = task.get_id().to_string();
let ch = self.channel.clone();
let queue_name: String = SETTINGS.read().unwrap().get("broker.queue_name").unwrap();

let ret = future::lazy(move || future::done(task.write_to_bytes()))
.map_err(|e| e.context(BrokerError::TaskSerialisationFailure { task_id }))
.from_err()
.and_then(move |bytes| {
self.channel
.basic_publish(
"",
&self.queue_name,
&bytes,
&BasicPublishOptions::default(),
BasicProperties::default(),
)
.from_err()
info!("publishing task");

ch.basic_publish(
"",
&queue_name.as_str(),
&bytes,
&BasicPublishOptions::default(),
BasicProperties::default(),
).from_err()
});
Box::new(ret)
}
}

pub fn connect(addr: SocketAddr) -> impl Future<Item = AMQPBrokerConnection, Error = Error> {
let queue_name = SETTINGS.read().unwrap().get("broker_queue_name").unwrap();
// pub fn connect(addr: SocketAddr) -> Result<AMQPBrokerConnection, Error> {
// let queue_name: String = SETTINGS.read().unwrap().get("broker.queue_name").unwrap();
// let queue_options = QueueDeclareOptions {
// durable: true,
// ..Default::default()
// };

// tokio::run(TcpStream::connect(&addr)
// .and_then(|stream| Client::connect(stream, &ConnectionOptions::default()))
// .and_then(|(client, _)| client.create_channel())
// .and_then(move |channel| {
// channel
// .queue_declare(&queue_name.as_str(), &queue_options, &FieldTable::new())
// .and_then(move |_| {
// info!("AMQP queue `{}` successfully declared.", queue_name);
// future::ok(channel)
// })
// })
// .map_err(|e| e.context(BrokerError::ConnectionFailed).into())
// .and_then(move |channel| {
// AMQPBrokerConnection {
// channel: Arc::new(channel),
// }
// }))
// }


pub fn connect(addr: SocketAddr) -> Result<AMQPBrokerConnection, Error> {
let queue_name: String = SETTINGS.read().unwrap().get("broker.queue_name").unwrap();
let queue_options = QueueDeclareOptions {
durable: true,
..Default::default()
};

TcpStream::connect(&addr)
// I assume this can't actually be here.
// let mut bc = AMQPBrokerConnection{
// channel: Arc::new(),
// };

let broker_conn = TcpStream::connect(&addr)
.and_then(|stream| Client::connect(stream, &ConnectionOptions::default()))
.and_then(|(client, _)| client.create_channel())
.and_then(move |channel| {
channel
.queue_declare(queue_name, &queue_options, &FieldTable::new())
.queue_declare(&queue_name.as_str(), &queue_options, &FieldTable::new())
.and_then(move |_| {
info!("AMQP queue `{}` successfully declared.", queue_name);
future::ok(channel)
})
})
.map_err(|e| e.context(BrokerError::ConnectionFailed).into())
.and_then(move |channel| {
future::ok(AMQPBrokerConnection {
channel,
queue_name: queue_name.to_string(),
})
})
}
// .map_err(|e| e.context(BrokerError::ConnectionFailed).into())
.map_err(|e| error!("{}", e))
.and_then(|channel| {
// Add the channel to the broker connection
// bc.channel = Arc::new(channel);
future::ok(())
});

// This is definatelly bad, but we can't just do run because it will block
thread::spawn(move || {
tokio::run(broker_conn);
});

Ok(AMQPBrokerConnection{
channel: Arc::new(broker_conn),
})
}
4 changes: 2 additions & 2 deletions manager/src/broker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use futures::Future;

use heracles_proto::datatypes::Task;

pub trait BrokerConnection {
fn send<'a>(&'a self, &'a Task) -> Box<Future<Item = Option<bool>, Error = Error> + 'a>;
pub trait BrokerConnection: Send + Sync {
fn send(&self, Task) -> Box<Future<Item = Option<bool>, Error = Error> + Send + 'static>;
}

#[derive(Debug, Fail)]
Expand Down
30 changes: 26 additions & 4 deletions manager/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ use failure::*;
use tokio::prelude::*;

use heracles_manager::settings::SETTINGS;
use heracles_manager::{broker, optparse, settings};
use heracles_manager::{broker, optparse, scheduler, server, state, settings};

use std::path::PathBuf;
use std::sync::Arc;
use std::thread;

fn main() {
if let Err(err) = run() {
Expand All @@ -27,13 +31,31 @@ fn run() -> Result<(), Error> {
let arg_matches = optparse::parse_cmd_options();
settings::init(&arg_matches)?;

let broker_addr = SETTINGS.read().unwrap().get("broker_address")?;
let broker_conn = broker::amqp::connect(broker_addr);
let broker_addr = SETTINGS.read().unwrap().get("broker.address")?;
let broker_conn = Arc::new(broker::amqp::connect(broker_addr)?);

let state_location: String = SETTINGS.read().unwrap().get("state.location")?;
let store = Arc::new(state::FileStore::new(&PathBuf::from(state_location))?);

let schdlr = Arc::new(scheduler::Scheduler::new(broker_conn, store)?);

let srv = server::Server::new(schdlr.clone())?;

thread::spawn(move || {
loop {
srv.is_alive();
}
});

// thread::spawn(move || {
// tokio::spawn(broker_conn.channel.clone());
// });

info!("Starting main event loop.");
// We give this an empty future so that it will never terminate and continue driving other
// futures to completion.
tokio::run(future::empty());
tokio::run(schdlr.run());
// tokio::run(schdl.run(broker_conn))
Ok(())
}

Expand Down
10 changes: 8 additions & 2 deletions manager/src/optparse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub fn parse_cmd_options<'a>() -> ArgMatches<'a> {
.author("Heracles Authors <[email protected]>")
.about("Scheduling service for the Heracles network.")
.arg(
Arg::with_name("input_chunk_size")
Arg::with_name("scheduler.input_chunk_size")
.help("The size (in MiB) of the chunks created from the input files.")
.long("input-chunk-size")
.long_help(
Expand All @@ -17,6 +17,12 @@ Each chunk corresponds to one map task, so this can be used to scale the job.",
)
.takes_value(true),
)
.arg(
Arg::with_name("scheduler.input_queue_size")
.help("The size of the buffer to the input queue of the scheduler.")
.long("scheduler-input-queue-size")
.takes_value(true),
)
.arg(
Arg::with_name("broker.address")
.help("The address of the broker server the manager should connect to.")
Expand All @@ -30,7 +36,7 @@ Each chunk corresponds to one map task, so this can be used to scale the job.",
.takes_value(true),
)
.arg(
Arg::with_name("server_port")
Arg::with_name("server.port")
.help("Port on which the gRPC server is running")
.long("server-port")
.takes_value(true),
Expand Down
Loading