diff --git a/Cargo.lock b/Cargo.lock index ebdc15b..d6498fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -411,7 +411,7 @@ dependencies = [ "rocksdb 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.99 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.99 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_json 1.0.40 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.41 (registry+https://github.com/rust-lang/crates.io-index)", "slog 2.5.2 (registry+https://github.com/rust-lang/crates.io-index)", "stream-cancel 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", "tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -953,7 +953,7 @@ dependencies = [ "percent-encoding 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "regex 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.99 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_json 1.0.40 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.41 (registry+https://github.com/rust-lang/crates.io-index)", "twox-hash 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "url 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", @@ -975,7 +975,7 @@ dependencies = [ "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", "regex 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.99 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_json 1.0.40 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.41 (registry+https://github.com/rust-lang/crates.io-index)", "sha1 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", "sha2 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1002,7 +1002,7 @@ dependencies = [ "regex 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "rust_decimal 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.99 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_json 1.0.40 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.41 (registry+https://github.com/rust-lang/crates.io-index)", "sha1 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", "sha2 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1098,7 +1098,7 @@ dependencies = [ "petgraph 0.4.13 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.99 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.99 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_json 1.0.40 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.41 (registry+https://github.com/rust-lang/crates.io-index)", "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "slog 2.5.2 (registry+https://github.com/rust-lang/crates.io-index)", "slog-term 2.4.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1131,6 +1131,7 @@ dependencies = [ "noria 0.1.2 (git+https://github.com/mit-pdos/noria.git)", "noria-server 0.1.0 (git+https://github.com/mit-pdos/noria.git)", "regex 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.41 (registry+https://github.com/rust-lang/crates.io-index)", "slog 2.5.2 (registry+https://github.com/rust-lang/crates.io-index)", "slog-term 2.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1165,7 +1166,7 @@ dependencies = [ "rand 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.99 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.99 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_json 1.0.40 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.41 (registry+https://github.com/rust-lang/crates.io-index)", "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "slog 2.5.2 (registry+https://github.com/rust-lang/crates.io-index)", "slog-term 2.4.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1694,7 +1695,7 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.40" +version = "1.0.41" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "itoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2753,7 +2754,7 @@ dependencies = [ "checksum semver-parser 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" "checksum serde 1.0.99 (registry+https://github.com/rust-lang/crates.io-index)" = "fec2851eb56d010dc9a21b89ca53ee75e6528bab60c11e89d38390904982da9f" "checksum serde_derive 1.0.99 (registry+https://github.com/rust-lang/crates.io-index)" = "cb4dc18c61206b08dc98216c98faa0232f4337e1e1b8574551d5bad29ea1b425" -"checksum serde_json 1.0.40 (registry+https://github.com/rust-lang/crates.io-index)" = "051c49229f282f7c6f3813f8286cc1e3323e8051823fce42c7ea80fe13521704" +"checksum serde_json 1.0.41 (registry+https://github.com/rust-lang/crates.io-index)" = "2f72eb2a68a7dc3f9a691bfda9305a1c017a6215e5a4545c258500d2099a37c2" "checksum sha1 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2579985fda508104f7587689507983eadd6a6e84dd35d6d115361f530916fa0d" "checksum sha2 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7b4d8bfd0e469f417657573d8451fb33d16cfe0989359b93baf3a1ffc639543d" "checksum shlex 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7fdf1b9db47230893d76faad238fd6097fd6d6a9245cd7a4d90dbd639536bbd2" diff --git a/Cargo.toml b/Cargo.toml index af65452..c4fd86b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ chrono = "0.4" tokio-signal = "0.2" tracing = "0.1.3" tracing-subscriber = "0.1" +serde_json = "1.0.41" [dev-dependencies] mysql = "16" diff --git a/src/backend.rs b/src/backend.rs index 63f1f1b..649f7a2 100644 --- a/src/backend.rs +++ b/src/backend.rs @@ -1,5 +1,5 @@ use noria::{ - DataType, SyncControllerHandle, SyncTable, SyncView, TableOperation, ZookeeperAuthority, + consensus::Authority, DataType, SyncControllerHandle, SyncTable, SyncView, TableOperation, }; use failure; @@ -46,17 +46,18 @@ impl fmt::Debug for PreparedStatement { } } -struct NoriaBackendInner { - noria: SyncControllerHandle, +struct NoriaBackendInner { + noria: SyncControllerHandle, inputs: BTreeMap, outputs: BTreeMap, } -impl NoriaBackendInner +impl NoriaBackendInner where E: tokio::executor::Executor, + A: Authority + 'static, { - fn new(mut ch: SyncControllerHandle) -> Self { + fn new(mut ch: SyncControllerHandle) -> Self { NoriaBackendInner { inputs: ch .inputs() @@ -109,8 +110,8 @@ where } } -pub struct NoriaBackend { - inner: NoriaBackendInner, +pub struct NoriaBackend { + inner: NoriaBackendInner, log: slog::Logger, ops: Arc, trace_every: Option, @@ -135,12 +136,13 @@ pub struct NoriaBackend { static_responses: bool, } -impl NoriaBackend +impl NoriaBackend where E: tokio::executor::Executor, + A: Authority + 'static, { pub fn new( - ch: SyncControllerHandle, + ch: SyncControllerHandle, auto_increments: Arc>>, query_cache: Arc>>, (ops, trace_every): (Arc, Option), @@ -910,9 +912,10 @@ where } } -impl MysqlShim for &mut NoriaBackend +impl MysqlShim for &mut NoriaBackend where E: tokio::executor::Executor, + A: 'static + Authority, { type Error = io::Error; diff --git a/src/main.rs b/src/main.rs index b26a5db..a26e04c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -21,7 +21,9 @@ mod utils; use crate::backend::NoriaBackend; use msql_srv::MysqlIntermediary; use nom_sql::SelectStatement; -use noria::{SyncControllerHandle, ZookeeperAuthority}; +use noria::consensus::{Authority, LocalAuthority, ZookeeperAuthority}; +use noria::{ControllerDescriptor, SyncControllerHandle}; +use serde_json; use std::collections::HashMap; use std::io::{self, BufReader, BufWriter}; use std::sync::atomic::{self, AtomicUsize}; @@ -56,8 +58,7 @@ fn main() { Arg::with_name("zk_addr") .long("zookeeper-address") .short("z") - .default_value("127.0.0.1:2181") - .help("IP:PORT for Zookeeper."), + .help("IP:PORT for Zookeeper. Defaults to 127.0.0.1:2181 if neither this nor server-address is set."), ) .arg( Arg::with_name("port") @@ -67,6 +68,15 @@ fn main() { .takes_value(true) .help("Port to listen on."), ) + .arg( + Arg::with_name("server_addr") + .long("server-address") + .short("h") + .takes_value(true) + .required_unless("zk_addr") + .conflicts_with("zk_addr") + .help("IP:PORT for the Noria Server. Either this ore zookeeper-address is required"), + ) .arg( Arg::with_name("slowlog") .long("log-slow") @@ -103,26 +113,13 @@ fn main() { None }; let slowlog = matches.is_present("slowlog"); - let zk_addr = matches.value_of("zk_addr").unwrap().to_owned(); let sanitize = !matches.is_present("no-sanitize"); let static_responses = !matches.is_present("no-static-responses"); - let listener = tokio::net::tcp::TcpListener::bind(&std::net::SocketAddr::new( - std::net::Ipv4Addr::LOCALHOST.into(), - port, - )) - .unwrap(); - let log = logger_pls(); info!(log, "listening on port {}", port); - let auto_increments: Arc>> = Arc::default(); - let query_cache: Arc>> = Arc::default(); - - let mut zk_auth = ZookeeperAuthority::new(&format!("{}/{}", zk_addr, deployment)).unwrap(); - zk_auth.log_with(log.clone()); - debug!(log, "Connecting to Noria...",); let s = tracing_subscriber::fmt::format::Format::default() .with_timer(tracing_subscriber::fmt::time::Uptime::default()); @@ -130,10 +127,76 @@ fn main() { .on_event(s) .finish(); let tracer = tracing::Dispatch::new(s); - let mut rt = tracing::dispatcher::with_default(&tracer, tokio::runtime::Runtime::new).unwrap(); - let ch = SyncControllerHandle::new(zk_auth, rt.executor()).unwrap(); + let rt = tracing::dispatcher::with_default(&tracer, tokio::runtime::Runtime::new).unwrap(); + debug!(log, "Connected!"); + match (matches.value_of("zk_addr"), matches.value_of("server_addr")) { + (None, Some(addr)) => { + let lcl_auth = LocalAuthority::new(); + let saddr = addr.parse().unwrap(); + let cd = ControllerDescriptor { + external_addr: saddr, + worker_addr: saddr, + domain_addr: saddr, + nonce: 0, + }; + let descriptor_bytes = serde_json::to_vec(&cd).unwrap(); + lcl_auth.become_leader(descriptor_bytes).unwrap(); + let ch = SyncControllerHandle::new(lcl_auth, rt.executor()).unwrap(); + run( + rt, + ch, + log.clone(), + port, + slowlog, + static_responses, + sanitize, + trace_every, + ) + } + (maybe_addr, None) => { + let addr = maybe_addr.unwrap_or("127.0.0.1:2181"); + let mut zk_auth = ZookeeperAuthority::new(&format!("{}/{}", addr, deployment)).unwrap(); + zk_auth.log_with(log.clone()); + let ch = SyncControllerHandle::new(zk_auth, rt.executor()).unwrap(); + run( + rt, + ch, + log.clone(), + port, + slowlog, + static_responses, + sanitize, + trace_every, + ) + } + (Some(_), Some(_)) => unreachable!(), + }; +} + +fn run( + mut rt: tokio::runtime::Runtime, + ch: SyncControllerHandle, + log: slog::Logger, + port: u16, + slowlog: bool, + static_responses: bool, + sanitize: bool, + trace_every: Option, +) where + A: Authority + 'static, + E: tokio::executor::Executor + Clone + Send + 'static, +{ + let listener = tokio::net::tcp::TcpListener::bind(&std::net::SocketAddr::new( + std::net::Ipv4Addr::LOCALHOST.into(), + port, + )) + .unwrap(); + + let auto_increments: Arc>> = Arc::default(); + let query_cache: Arc>> = Arc::default(); + let ctrlc = rt.block_on(future::lazy(tokio_signal::ctrl_c)).unwrap(); let mut listener = listener.incoming().select(ctrlc.then(|r| match r { Ok(_) => Err(io::Error::new(io::ErrorKind::Interrupted, "got ctrl-c")), diff --git a/src/schema.rs b/src/schema.rs index 18504b1..fb9766a 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -21,17 +21,17 @@ pub(crate) fn convert_column(cs: &ColumnSpecification) -> msql_srv::Column { SqlType::UnsignedInt(_) => { flags |= msql_srv::ColumnFlags::UNSIGNED_FLAG; msql_srv::ColumnType::MYSQL_TYPE_LONG - }, + } SqlType::Bigint(_) => msql_srv::ColumnType::MYSQL_TYPE_LONGLONG, SqlType::UnsignedBigint(_) => { flags |= msql_srv::ColumnFlags::UNSIGNED_FLAG; msql_srv::ColumnType::MYSQL_TYPE_LONGLONG - }, + } SqlType::Tinyint(_) => msql_srv::ColumnType::MYSQL_TYPE_TINY, SqlType::UnsignedTinyint(_) => { flags |= msql_srv::ColumnFlags::UNSIGNED_FLAG; msql_srv::ColumnType::MYSQL_TYPE_TINY - }, + } SqlType::Bool => msql_srv::ColumnType::MYSQL_TYPE_BIT, SqlType::DateTime(_) => msql_srv::ColumnType::MYSQL_TYPE_DATETIME, SqlType::Float => msql_srv::ColumnType::MYSQL_TYPE_DOUBLE, diff --git a/tests/integration.rs b/tests/integration.rs index 64b0bb4..062b9ce 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -114,7 +114,17 @@ fn setup(deployment: &Deployment) -> mysql::Opts { let stats = (Arc::new(AtomicUsize::new(0)), None); let primed = Arc::new(AtomicBool::new(false)); - let mut b = NoriaBackend::new(ch, auto_increments, query_cache, stats, primed, false, true, true, logger); + let mut b = NoriaBackend::new( + ch, + auto_increments, + query_cache, + stats, + primed, + false, + true, + true, + logger, + ); MysqlIntermediary::run_on_tcp(&mut b, s).unwrap(); rt.shutdown_on_idle().wait().unwrap();