Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ chrono = "0.4"
tokio-signal = "0.2"
tracing = "0.1.3"
tracing-subscriber = "0.1"
serde_json = "1.0.41"

[dev-dependencies]
mysql = "16"
Expand Down
23 changes: 13 additions & 10 deletions src/backend.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use noria::{
DataType, SyncControllerHandle, SyncTable, SyncView, TableOperation, ZookeeperAuthority,
consensus::Authority, DataType, SyncControllerHandle, SyncTable, SyncView, TableOperation,
};

use failure;
Expand Down Expand Up @@ -46,17 +46,18 @@ impl fmt::Debug for PreparedStatement {
}
}

struct NoriaBackendInner<E> {
noria: SyncControllerHandle<ZookeeperAuthority, E>,
struct NoriaBackendInner<E, A: Authority + 'static> {
noria: SyncControllerHandle<A, E>,
inputs: BTreeMap<String, SyncTable>,
outputs: BTreeMap<String, SyncView>,
}

impl<E> NoriaBackendInner<E>
impl<E, A> NoriaBackendInner<E, A>
where
E: tokio::executor::Executor,
A: Authority + 'static,
{
fn new(mut ch: SyncControllerHandle<ZookeeperAuthority, E>) -> Self {
fn new(mut ch: SyncControllerHandle<A, E>) -> Self {
NoriaBackendInner {
inputs: ch
.inputs()
Expand Down Expand Up @@ -109,8 +110,8 @@ where
}
}

pub struct NoriaBackend<E> {
inner: NoriaBackendInner<E>,
pub struct NoriaBackend<E, A: Authority + 'static> {
inner: NoriaBackendInner<E, A>,
log: slog::Logger,
ops: Arc<atomic::AtomicUsize>,
trace_every: Option<usize>,
Expand All @@ -135,12 +136,13 @@ pub struct NoriaBackend<E> {
static_responses: bool,
}

impl<E> NoriaBackend<E>
impl<E, A> NoriaBackend<E, A>
where
E: tokio::executor::Executor,
A: Authority + 'static,
{
pub fn new(
ch: SyncControllerHandle<ZookeeperAuthority, E>,
ch: SyncControllerHandle<A, E>,
auto_increments: Arc<RwLock<HashMap<String, atomic::AtomicUsize>>>,
query_cache: Arc<RwLock<HashMap<SelectStatement, String>>>,
(ops, trace_every): (Arc<atomic::AtomicUsize>, Option<usize>),
Expand Down Expand Up @@ -910,9 +912,10 @@ where
}
}

impl<W: io::Write, E> MysqlShim<W> for &mut NoriaBackend<E>
impl<W: io::Write, E, A> MysqlShim<W> for &mut NoriaBackend<E, A>
where
E: tokio::executor::Executor,
A: 'static + Authority,
{
type Error = io::Error;

Expand Down
99 changes: 81 additions & 18 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ mod utils;
use crate::backend::NoriaBackend;
use msql_srv::MysqlIntermediary;
use nom_sql::SelectStatement;
use noria::{SyncControllerHandle, ZookeeperAuthority};
use noria::consensus::{Authority, LocalAuthority, ZookeeperAuthority};
use noria::{ControllerDescriptor, SyncControllerHandle};
use serde_json;
use std::collections::HashMap;
use std::io::{self, BufReader, BufWriter};
use std::sync::atomic::{self, AtomicUsize};
Expand Down Expand Up @@ -56,8 +58,7 @@ fn main() {
Arg::with_name("zk_addr")
.long("zookeeper-address")
.short("z")
.default_value("127.0.0.1:2181")
.help("IP:PORT for Zookeeper."),
.help("IP:PORT for Zookeeper. Defaults to 127.0.0.1:2181 if neither this nor server-address is set."),
)
.arg(
Arg::with_name("port")
Expand All @@ -67,6 +68,15 @@ fn main() {
.takes_value(true)
.help("Port to listen on."),
)
.arg(
Arg::with_name("server_addr")
.long("server-address")
.short("h")
.takes_value(true)
.required_unless("zk_addr")
.conflicts_with("zk_addr")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These should probably also be added to the zk_addr argument

.help("IP:PORT for the Noria Server. Either this ore zookeeper-address is required"),
)
.arg(
Arg::with_name("slowlog")
.long("log-slow")
Expand Down Expand Up @@ -103,37 +113,90 @@ fn main() {
None
};
let slowlog = matches.is_present("slowlog");
let zk_addr = matches.value_of("zk_addr").unwrap().to_owned();
let sanitize = !matches.is_present("no-sanitize");
let static_responses = !matches.is_present("no-static-responses");

let listener = tokio::net::tcp::TcpListener::bind(&std::net::SocketAddr::new(
std::net::Ipv4Addr::LOCALHOST.into(),
port,
))
.unwrap();

let log = logger_pls();

info!(log, "listening on port {}", port);

let auto_increments: Arc<RwLock<HashMap<String, AtomicUsize>>> = Arc::default();
let query_cache: Arc<RwLock<HashMap<SelectStatement, String>>> = Arc::default();

let mut zk_auth = ZookeeperAuthority::new(&format!("{}/{}", zk_addr, deployment)).unwrap();
zk_auth.log_with(log.clone());

debug!(log, "Connecting to Noria...",);
let s = tracing_subscriber::fmt::format::Format::default()
.with_timer(tracing_subscriber::fmt::time::Uptime::default());
let s = tracing_subscriber::FmtSubscriber::builder()
.on_event(s)
.finish();
let tracer = tracing::Dispatch::new(s);
let mut rt = tracing::dispatcher::with_default(&tracer, tokio::runtime::Runtime::new).unwrap();
let ch = SyncControllerHandle::new(zk_auth, rt.executor()).unwrap();
let rt = tracing::dispatcher::with_default(&tracer, tokio::runtime::Runtime::new).unwrap();

debug!(log, "Connected!");

match (matches.value_of("zk_addr"), matches.value_of("server_addr")) {
(None, Some(addr)) => {
let lcl_auth = LocalAuthority::new();
let saddr = addr.parse().unwrap();
let cd = ControllerDescriptor {
external_addr: saddr,
worker_addr: saddr,
domain_addr: saddr,
nonce: 0,
};
let descriptor_bytes = serde_json::to_vec(&cd).unwrap();
lcl_auth.become_leader(descriptor_bytes).unwrap();
let ch = SyncControllerHandle::new(lcl_auth, rt.executor()).unwrap();
run(
rt,
ch,
log.clone(),
port,
slowlog,
static_responses,
sanitize,
trace_every,
)
}
(maybe_addr, None) => {
let addr = maybe_addr.unwrap_or("127.0.0.1:2181");
let mut zk_auth = ZookeeperAuthority::new(&format!("{}/{}", addr, deployment)).unwrap();
zk_auth.log_with(log.clone());
let ch = SyncControllerHandle::new(zk_auth, rt.executor()).unwrap();
run(
rt,
ch,
log.clone(),
port,
slowlog,
static_responses,
sanitize,
trace_every,
)
}
(Some(_), Some(_)) => unreachable!(),
};
}

fn run<A, E>(
mut rt: tokio::runtime::Runtime,
ch: SyncControllerHandle<A, E>,
log: slog::Logger,
port: u16,
slowlog: bool,
static_responses: bool,
sanitize: bool,
trace_every: Option<usize>,
) where
A: Authority + 'static,
E: tokio::executor::Executor + Clone + Send + 'static,
{
let listener = tokio::net::tcp::TcpListener::bind(&std::net::SocketAddr::new(
std::net::Ipv4Addr::LOCALHOST.into(),
port,
))
.unwrap();

let auto_increments: Arc<RwLock<HashMap<String, AtomicUsize>>> = Arc::default();
let query_cache: Arc<RwLock<HashMap<SelectStatement, String>>> = Arc::default();

let ctrlc = rt.block_on(future::lazy(tokio_signal::ctrl_c)).unwrap();
let mut listener = listener.incoming().select(ctrlc.then(|r| match r {
Ok(_) => Err(io::Error::new(io::ErrorKind::Interrupted, "got ctrl-c")),
Expand Down
6 changes: 3 additions & 3 deletions src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@ pub(crate) fn convert_column(cs: &ColumnSpecification) -> msql_srv::Column {
SqlType::UnsignedInt(_) => {
flags |= msql_srv::ColumnFlags::UNSIGNED_FLAG;
msql_srv::ColumnType::MYSQL_TYPE_LONG
},
}
SqlType::Bigint(_) => msql_srv::ColumnType::MYSQL_TYPE_LONGLONG,
SqlType::UnsignedBigint(_) => {
flags |= msql_srv::ColumnFlags::UNSIGNED_FLAG;
msql_srv::ColumnType::MYSQL_TYPE_LONGLONG
},
}
SqlType::Tinyint(_) => msql_srv::ColumnType::MYSQL_TYPE_TINY,
SqlType::UnsignedTinyint(_) => {
flags |= msql_srv::ColumnFlags::UNSIGNED_FLAG;
msql_srv::ColumnType::MYSQL_TYPE_TINY
},
}
SqlType::Bool => msql_srv::ColumnType::MYSQL_TYPE_BIT,
SqlType::DateTime(_) => msql_srv::ColumnType::MYSQL_TYPE_DATETIME,
SqlType::Float => msql_srv::ColumnType::MYSQL_TYPE_DOUBLE,
Expand Down
12 changes: 11 additions & 1 deletion tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,17 @@ fn setup(deployment: &Deployment) -> mysql::Opts {

let stats = (Arc::new(AtomicUsize::new(0)), None);
let primed = Arc::new(AtomicBool::new(false));
let mut b = NoriaBackend::new(ch, auto_increments, query_cache, stats, primed, false, true, true, logger);
let mut b = NoriaBackend::new(
ch,
auto_increments,
query_cache,
stats,
primed,
false,
true,
true,
logger,
);

MysqlIntermediary::run_on_tcp(&mut b, s).unwrap();
rt.shutdown_on_idle().wait().unwrap();
Expand Down