Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ members = [
"macros",
"examples/paxos",
"examples/ping_pong",
"examples/read_write_server",
"examples/read_write_server", "examples/paxos",
]

exclude = [
Expand Down
2 changes: 1 addition & 1 deletion examples/paxos/paxos.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ port = 3000
acceptors = ["a1", "a2", "a3"]
# chaos.msg_loss = { start_ms = 100, probability = 0.7 }
# chaos.msg_duplication = { start_ms = 0, probability = 0.7 , factor=2 }
chaos.msg_delay = { start_ms = 0, delay_range_ms = [500, 1000] , senders = ["a1"] }
# chaos.msg_delay = { start_ms = 0, delay_range_ms = [500, 1000] , senders = ["a1"] }

# [[placement.leader]]
# nodename = "node1"
Expand Down
6 changes: 3 additions & 3 deletions examples/paxos/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod common;
mod leader;

pub use reactor_actor::setup_shared_logger_ref;
use reactor_actor::{ActorAddr, RuntimeCtx};
use reactor_actor::{ActorAddr, RuntimeCtx, actor};

use lazy_static::lazy_static;
use serde_json::Value;
Expand All @@ -15,12 +15,12 @@ const SLEEP_MS: u64 = 100;
lazy_static! {
static ref RUNTIME: tokio::runtime::Runtime = tokio::runtime::Runtime::new().unwrap();
}
#[unsafe(no_mangle)]
#[actor]
pub fn acceptor(ctx: RuntimeCtx, _payload: HashMap<String, Value>) {
RUNTIME.spawn(acceptor::acceptor(ctx));
}

#[unsafe(no_mangle)]
#[actor]
pub fn leader(ctx: RuntimeCtx, mut payload: HashMap<String, Value>) {
let acceptors = payload
.remove("acceptors")
Expand Down
4 changes: 3 additions & 1 deletion generic_nctrl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@ edition = "2024"
env_logger = "0.11.8"
tokio = { version = "1", features = ["full", "tracing"] }
reactor-node = { path = "../node" }
reactor-inst = { path = "../instrument" }
clap = { version = "4", features = ["derive"] }
serde_json = "1.0.140"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
reactor-inst = { path = "../instrument", optional=true }
log.workspace = true


[features]
default = []
swagger = ["reactor-node/swagger"]
jaeger = ["reactor-inst"]



11 changes: 11 additions & 0 deletions generic_nctrl/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,17 @@ pub struct Cli {
#[tokio::main]
async fn main() {
let cli = Cli::parse();

#[cfg(feature = "jaeger")]
let _gurad = reactor_inst::init_tracing();

#[cfg(not(feature = "jaeger"))]
{
use env_logger::Builder;
use log::LevelFilter;

Builder::new().filter_level(LevelFilter::Info).init();
}

node_controller(cli.port, cli.dir).await;
}
1 change: 0 additions & 1 deletion reactor-dashboard/ui/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ function App() {

try {
const res = await api.getStatus();
console.log(res.data.loaded_libs);

const newData: NodeData = {
actors: Array.isArray(res.data.actors) ? res.data.actors : [],
Expand Down
31 changes: 25 additions & 6 deletions reactor-dashboard/ui/src/components/deploy-form.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,19 @@ function OpToNodes(nodes: Node[]): Record<string, string[]> {
return opToNodes
}

function OpToLib(nodes: Node[]): Record<string, string> {
const opToLib: Record<string, string> = {}

for (const node of nodes) {
for (const [lib, ops] of Object.entries(node.data.loaded_libs)) {
for (const op of ops){
opToLib[op] = lib;
}
}
}
return opToLib;
}

export default function JobRunner({ nodes }: DialogDemoProps) {
const [selectedOp, setSelectedOp] = useState<string | null>(null);
const [selectedNode, setSelectedNode] = useState<string | null>(null);
Expand All @@ -157,8 +170,14 @@ export default function JobRunner({ nodes }: DialogDemoProps) {
const [isJsonValid, setIsJsonValid] = useState<boolean>(true);

const opToNodes = OpToNodes(nodes);
const opToLib = OpToLib(nodes);
const allOps = Object.keys(opToNodes);
const nodesForOp = selectedOp ? opToNodes[selectedOp] ?? [] : []
const nodesForOp = selectedOp ? opToNodes[selectedOp] ?? [] : [];

let jc = new JobController(placement);
for (const node of nodes){
jc.registerNode(node.hostname, node.hostname, node.port);
}

const handlePlace = () => {
if (!selectedOp) {
Expand Down Expand Up @@ -204,16 +223,16 @@ export default function JobRunner({ nodes }: DialogDemoProps) {
setPlacement(newPlacement);
};

const handleDeploy = () =>{
let jc = new JobController(placement);
jc.startJob([...placement.get_actors()]);
const handleDeploy = () => {
jc.startJob([...placement.get_ops(opToLib)]);
toast.success("Job Deployed sucessfully");
};

return (
<Sheet>
<Toaster position="top-center" richColors/>
<SheetTrigger asChild>
<Button onClick={handleDeploy}>Deploy Job</Button>
<Button>Deploy Job</Button>
</SheetTrigger>
<SheetContent className="w-[1000px] sm:w-[1000px]">
<SheetHeader>
Expand Down Expand Up @@ -295,7 +314,7 @@ export default function JobRunner({ nodes }: DialogDemoProps) {
<SheetFooter>
<Button variant="secondary">Load</Button>
<Button variant="secondary">Save changes</Button>
<Button type="submit">Deploy</Button>
<Button type="submit" onClick={handleDeploy}>Deploy</Button>
<SheetClose asChild>
<Button variant="outline">Close</Button>
</SheetClose>
Expand Down
10 changes: 10 additions & 0 deletions reactor-dashboard/ui/src/reactor-ctrl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ export class ManualPlacementManager implements PlacementManager {
}

place(opInfo: LogicalOp): PhysicalOp[] {
console.log(this.map);
const ops = this.map.get(opInfo.name);
if (!ops) {
throw new Error(`No physical ops found for logical op: ${opInfo.name}`);
Expand Down Expand Up @@ -93,6 +94,15 @@ export class ManualPlacementManager implements PlacementManager {
return this.actors;
}

get_ops(opToLib: Record<string, string>): Set<LogicalOp> {
const ops = Object.entries(opToLib).map(([name, libName]) => ({
name,
libName,
}));

return new Set(ops);
}

}

export class NodeHandle {
Expand Down