diff --git a/_oasis b/_oasis index 9cab88b..53b5483 100644 --- a/_oasis +++ b/_oasis @@ -48,7 +48,8 @@ Library async cstruct.async, threads, sexplib.syntax, - sexplib + sexplib, + async_parallel InternalModules: Async_OpenFlow_Log, Async_OpenFlow_Message, diff --git a/_tags b/_tags index 980b20a..46ebc80 100644 --- a/_tags +++ b/_tags @@ -1,5 +1,5 @@ # OASIS_START -# DO NOT EDIT (digest: c549d3b3eb1ea3a63e2936cb07d78beb) +# DO NOT EDIT (digest: a559d07643364864392828b1b85386b8) # Ignore VCS directories, you can use the same kind of rule outside # OASIS_START/STOP if you want to exclude directories that contains # useless stuff for the build process @@ -27,6 +27,7 @@ true: annot, bin_annot # Library async "async/async.cmxs": use_async : package(async) +: package(async_parallel) : package(core) : package(cstruct) : package(cstruct.async) @@ -83,6 +84,7 @@ true: annot, bin_annot : use_quickcheck # Executable ping_test "ping-test/PingTest.byte": package(async) +"ping-test/PingTest.byte": package(async_parallel) "ping-test/PingTest.byte": package(core) "ping-test/PingTest.byte": package(cstruct) "ping-test/PingTest.byte": package(cstruct.async) @@ -98,6 +100,7 @@ true: annot, bin_annot "ping-test/PingTest.byte": use_async "ping-test/PingTest.byte": use_openflow : package(async) +: package(async_parallel) : package(core) : package(cstruct) : package(cstruct.async) @@ -114,6 +117,7 @@ true: annot, bin_annot : use_openflow # Executable learning_switch "examples/Learning_Switch.byte": package(async) +"examples/Learning_Switch.byte": package(async_parallel) "examples/Learning_Switch.byte": package(core) "examples/Learning_Switch.byte": package(cstruct) "examples/Learning_Switch.byte": package(cstruct.async) @@ -127,6 +131,7 @@ true: annot, bin_annot "examples/Learning_Switch.byte": use_openflow # Executable learning_switch0x04 "examples/Learning_Switch0x04.byte": package(async) +"examples/Learning_Switch0x04.byte": package(async_parallel) "examples/Learning_Switch0x04.byte": package(core) "examples/Learning_Switch0x04.byte": package(cstruct) "examples/Learning_Switch0x04.byte": package(cstruct.async) @@ -139,6 +144,7 @@ true: annot, bin_annot "examples/Learning_Switch0x04.byte": use_async "examples/Learning_Switch0x04.byte": use_openflow : package(async) +: package(async_parallel) : package(core) : package(cstruct) : package(cstruct.async) diff --git a/async/Async_OpenFlow.mli b/async/Async_OpenFlow.mli index 5f1f472..8448421 100644 --- a/async/Async_OpenFlow.mli +++ b/async/Async_OpenFlow.mli @@ -43,6 +43,7 @@ module Platform : sig -> ?log_disconnects:bool -> ?buffer_age_limit:[ `At_most of Time.Span.t | `Unlimited ] -> ?monitor_connections:bool + -> ?log_level:Async.Std.Log.Level.t -> port:int -> unit -> t Deferred.t @@ -51,7 +52,7 @@ module Platform : sig val close : t -> Client_id.t -> unit - val has_client_id : t -> Client_id.t -> bool + val has_client_id : t -> Client_id.t -> bool Deferred.t val send : t @@ -66,9 +67,9 @@ module Platform : sig val client_addr_port : t -> Client_id.t - -> (Unix.Inet_addr.t * int) option + -> (Unix.Inet_addr.t * int) option Deferred.t - val listening_port : t -> int + val listening_port : t -> int Deferred.t end @@ -178,8 +179,8 @@ module OpenFlow0x01 : sig open OpenFlow0x01_Core open OpenFlow0x01_Stats - val get_switches : t -> SDN_Types.switchId list - val get_switch_features : t -> SDN_Types.switchId -> OpenFlow0x01.SwitchFeatures.t option + val get_switches : t -> SDN_Types.switchId list Deferred.t + val get_switch_features : t -> SDN_Types.switchId -> OpenFlow0x01.SwitchFeatures.t option Deferred.t val clear_flows : ?pattern:pattern -> t -> Client_id.t @@ -267,6 +268,7 @@ module SDN : sig -> ?log_disconnects:bool (** default is [true] *) -> ?buffer_age_limit:[ `At_most of Time.Span.t | `Unlimited ] -> ?monitor_connections:bool + -> ?log_level:Async.Std.Log.Level.t -> port:int -> unit -> t Deferred.t diff --git a/async/Async_OpenFlow0x01.ml b/async/Async_OpenFlow0x01.ml index 205f82a..1235b00 100644 --- a/async/Async_OpenFlow0x01.ml +++ b/async/Async_OpenFlow0x01.ml @@ -25,9 +25,11 @@ end include Async_OpenFlow_Message.MakeSerializers (Message) -module Controller = struct +module ControllerProcess = struct open Async.Std + open Async_parallel + (* Note: because we run in a different process, the settings for Log have to be transferred explicitly (i.e. this defaults to level:`Info, even if we set it to `Debug somewhere else *) module Log = Async_OpenFlow_Log let tags = [("openflow", "openflow0x01")] @@ -87,7 +89,7 @@ module Controller = struct let has_client_id t sw_id = match client_id_of_switch t sw_id with | Some c_id -> ChunkController.has_client_id t.sub c_id - | None -> false + | None -> return false let get_switches t = SwitchMap.keys t.s2c @@ -131,28 +133,11 @@ module Controller = struct let client_addr_port t sw_id = match client_id_of_switch t sw_id with | Some c_id -> ChunkController.client_addr_port t.sub c_id - | None -> None + | None -> return None let listening_port t = ChunkController.listening_port t.sub - let create_from_chunk t = { - sub = t; - shakes = ClientSet.create (); - c2s = ClientMap.create (); - s2c = SwitchMap.create (); - switch_features = Hashtbl.Poly.create () - } - - let create ?max_pending_connections - ?verbose - ?log_disconnects - ?buffer_age_limit - ?monitor_connections ~port () = - ChunkController.create ?max_pending_connections ?verbose ?log_disconnects - ?buffer_age_limit ?monitor_connections ~port () - >>| create_from_chunk - let openflow0x01 t evt = match evt with | `Connect (c_id, version) -> @@ -228,10 +213,10 @@ module Controller = struct let open Deferred.Result in begin if clear then clear_flows t sw_id else return () end >>= fun () -> - let sends = List.map flow_mods + Deferred.(List.map ~how:`Parallel flow_mods ~f:(fun f -> send_result t sw_id (0l, M.FlowModMsg f)) - in - all_ignore sends + >>| (fun sends -> + Core.Std.Result.all_ignore sends)) let send_pkt_out (t:t) (sw_id:Client_id.t) pkt_out = send_result t sw_id (0l, M.PacketOutMsg pkt_out) @@ -279,5 +264,349 @@ module Controller = struct | _ -> assert false) | _ -> assert false) + let launch_cpu_process () = + don't_wait_for (Pipe.iter_without_pushback (Cpu_usage.samples ()) + ~f:(fun pct -> Log.printf ~tags ~level:`Debug "[remote] %s CPU usage" (Percent.to_string pct))) + + let create_from_chunk t = + { sub = t + ; shakes = ClientSet.create () + ; c2s = ClientMap.create () + ; s2c = SwitchMap.create () + ; switch_features = Hashtbl.Poly.create () + } + + let create_from_chunk_hub t h = + launch_cpu_process (); + let ctl = create_from_chunk t in + Pipe.iter (Hub.listen_simple h) ~f:(fun (id, msg) -> match msg with + | `Send (sw_id, msg) -> begin + Log.debug ~tags "[remote] send"; + send ctl sw_id msg + >>| fun resp -> Hub.send h id (`Send_resp resp) + end + | `Send_to_all msg -> + Log.debug ~tags "[remote] send_to_all"; + return (send_to_all ctl msg) + | `Send_ignore_errors (sw_id, msg) -> + Log.debug ~tags "[remote] send_ignore_errors"; + return (send_ignore_errors ctl sw_id msg) + | `Listen -> begin + Log.debug ~tags "[remote] listen"; + Intf.hub ~buffer_age_limit:`Unlimited () + >>= fun new_h -> + Deferred.don't_wait_for (Pipe.read (Hub.listen_simple new_h) + >>= function + | `Ok (id, msg) -> + (Pipe.iter_without_pushback (listen ctl) + ~f:(Hub.send new_h id))); + Hub.open_channel new_h + >>| fun chan -> Hub.send h id (`Listen_resp chan) + end + | `Individual_stats (pattern, sw_id) -> + Log.debug ~tags "[remote] individual_stats"; + (individual_stats ctl ~pattern sw_id) + >>| fun resp -> Hub.send h id (`Individual_stats_resp resp) + | `Port_stats (sw_id, pt) -> + Log.debug ~tags "[remote] port_stats"; + (port_stats ctl sw_id pt) + >>| fun resp -> Hub.send h id (`Port_stats_resp resp) + | `Barrier args -> + Log.debug ~tags "[remote] barrier"; + barrier ctl args + >>| fun resp -> Hub.send h id (`Barrier_resp resp) + | `Close sw_id -> + Log.debug ~tags "[remote] close"; + return (close ctl sw_id) + | `Has_client_id sw_id -> + Log.debug ~tags "[remote] has_client_id"; + has_client_id ctl sw_id + >>| fun resp -> Hub.send h id (`Has_client_id_resp resp) + | `Client_addr_port sw_id -> + Log.debug ~tags "[remote] client_addr_port"; + client_addr_port ctl sw_id + >>| fun resp -> Hub.send h id (`Client_addr_port_resp resp) + | `Listening_port -> + Log.debug ~tags "[remote] listening_port"; + listening_port ctl + >>| fun resp -> Hub.send h id (`Listening_port_resp resp) + | `Set_monitor_interval interval -> + Log.debug ~tags "[remote] set_monitor_interval"; + return (set_monitor_interval ctl interval) + | `Set_idle_wait interval -> + Log.debug ~tags "[remote] set_idle_wait"; + return (set_idle_wait ctl interval) + | `Set_kill_wait interval -> + Log.debug ~tags "[remote] set_kill_wait"; + return (set_kill_wait ctl interval) + | `Get_switches -> + Log.debug ~tags "[remote] get_switches"; + return (Hub.send h id (`Get_switches_resp (get_switches ctl))) + | `Get_switch_features sw_id -> + Log.debug ~tags "[remote] get_switch_features"; + return (Hub.send h id (`Get_switch_features_resp (get_switch_features ctl sw_id))) + | `Clear_flows (pattern, sw_id) -> + Log.debug ~tags "[remote] clear_flows"; + clear_flows ~pattern ctl sw_id + >>| fun resp -> Hub.send h id (`Clear_flows_resp resp) + | `Send_flow_mods (clear, sw_id, flow_mods) -> + Log.debug ~tags "[remote] send_flow_mods"; + send_flow_mods ~clear ctl sw_id flow_mods + >>| fun resp -> Hub.send h id (`Send_flow_mods_resp resp) + | `Send_pkt_out (sw_id, pkt_out) -> + Log.debug ~tags "[remote] send_pkt_out"; + send_pkt_out ctl sw_id pkt_out + >>| fun resp -> Hub.send h id (`Send_pkt_out_resp resp) + | `Aggregate_stats (pattern, sw_id) -> + Log.debug ~tags "[remote] aggregate_stats"; + aggregate_stats ~pattern ctl sw_id + >>| fun resp -> Hub.send h id (`Aggregate_stats_resp resp) + + ) + + let create ?max_pending_connections + ?verbose + ?log_disconnects + ?buffer_age_limit + ?monitor_connections + ?log_level ~port h = + ChunkController.create ?max_pending_connections ?verbose ?log_disconnects + ?buffer_age_limit ?monitor_connections ?log_level ~port () + >>= (fun t -> create_from_chunk_hub t h) + +end +module Controller = struct + open ControllerProcess + open Async.Std + open Async_parallel + + module Log = Async_OpenFlow_Log + let tags = [("openflow", "openflow0x01")] + + (* We can not call read() on the same pipe concurrently. + Somehow this is happening sometimes, so we need to + enforce this invariant locally with condition variables. *) + + let read_outstanding = ref false + + let read_finished = Condition.create () + + module Client_id = ControllerProcess.Client_id + type t = ([ `Barrier of SwitchMap.key + | `Individual_stats of + C.pattern * + SwitchMap.key + | `Port_stats of + SwitchMap.key * + OpenFlow0x01_Core.portId + | `Listen + | `Send of + SwitchMap.key * + Message.t + | `Send_to_all of + Message.t + | `Send_ignore_errors of + SwitchMap.key * + Message.t + | `Close of Client_id.t + | `Has_client_id of Client_id.t + | `Client_addr_port of Client_id.t + | `Listening_port + | `Set_monitor_interval of Core.Std.Time.Span.t + | `Set_idle_wait of Core.Std.Time.Span.t + | `Set_kill_wait of Core.Std.Time.Span.t + | `Get_switches + | `Get_switch_features of SwitchMap.key + | `Clear_flows of OpenFlow0x01_Core.pattern * Client_id.t + | `Send_flow_mods of bool * Client_id.t * OpenFlow0x01_Core.flowMod list + | `Send_pkt_out of Client_id.t * OpenFlow0x01_Core.packetOut + | `Aggregate_stats of OpenFlow0x01_Core.pattern * Client_id.t + ], + [ `Barrier_resp of (unit, exn) Result.t + | `Individual_stats_resp of + (OpenFlow0x01_Stats.individualStats list, exn) Result.t + | `Port_stats_resp of + (OpenFlow0x01_Stats.portStats, exn) Result.t + | `Listen_resp of + ([ `Ready ], + [ `Connect of + OpenFlow0x01.switchId * OpenFlow0x01.SwitchFeatures.t + | `Disconnect of SDN_Types.switchId * Core.Std.Sexp.t + | `Message of + SDN_Types.switchId * Message.t]) Channel.t + | `Send_resp of [ `Drop of exn | `Sent of Time.t ] + | `Has_client_id_resp of bool + | `Client_addr_port_resp of (Unix.Inet_addr.t * int) option + | `Listening_port_resp of int + | `Get_switches_resp of SDN_Types.switchId list + | `Get_switch_features_resp of OpenFlow0x01.SwitchFeatures.t option + | `Clear_flows_resp of (unit, exn) Result.t + | `Send_flow_mods_resp of (unit, exn) Result.t + | `Send_pkt_out_resp of (unit, exn) Result.t + | `Aggregate_stats_resp of (OpenFlow0x01_Stats.aggregateStats, exn) Result.t + ]) Channel.t + let rec clear_to_read () = if (!read_outstanding) + then Condition.wait read_finished >>= clear_to_read + else return (read_outstanding := true) + + let signal_read () = read_outstanding := false; + Condition.broadcast read_finished () + + let aggregate_stats ?(pattern=C.match_all) (t : t) sw_id = + clear_to_read () >>= fun () -> + Log.debug ~tags "[local] aggregate_stats"; + Channel.write t (`Aggregate_stats (pattern, sw_id)); + Channel.read t >>| function + | `Aggregate_stats_resp resp -> signal_read (); resp + + let send_pkt_out (t : t) (sw_id:Client_id.t) pkt_out = + clear_to_read () >>= fun () -> + Log.debug ~tags "[local] send_pkt_out"; + Channel.write t (`Send_pkt_out (sw_id, pkt_out)); + Channel.read t >>| function + | `Send_pkt_out_resp resp -> signal_read (); resp + + let send_flow_mods ?(clear=true) (t : t) (sw_id:Client_id.t) flow_mods = + clear_to_read () >>= fun () -> + Log.debug ~tags "[local] send_flow_mods"; + Channel.write t (`Send_flow_mods (clear, sw_id, flow_mods)); + Channel.read t >>| function + | `Send_flow_mods_resp resp -> signal_read (); resp + + let clear_flows ?(pattern=C.match_all) (t : t) (sw_id:Client_id.t) = + clear_to_read () >>= fun () -> + Log.debug ~tags "[local] clear_flows"; + Channel.write t (`Clear_flows (pattern, sw_id)); + Channel.read t >>| function + | `Clear_flows_resp resp -> signal_read (); resp + + let get_switches (t : t) = + clear_to_read () >>= fun () -> + Log.debug ~tags "[local] get_switches"; + Channel.write t `Get_switches; + Channel.read t >>| function + | `Get_switches_resp resp -> signal_read (); resp + + let set_kill_wait t (s:Time.Span.t) = + Log.debug ~tags "[local] set_kill_wait"; + Channel.write t (`Set_kill_wait s) + + let set_monitor_interval t (s:Time.Span.t) = + Log.debug ~tags "[local] set_monitor_interval"; + Channel.write t (`Set_monitor_interval s) + + let set_idle_wait t (s:Time.Span.t) : unit = + Log.debug ~tags "[local] set_idle_wait"; + Channel.write t (`Set_idle_wait s) + + let listening_port (t : t) = + clear_to_read () >>= fun () -> + Log.debug ~tags "[local] set_listening_port"; + Channel.write t `Listening_port; + Channel.read t >>| function + | `Listening_port_resp resp -> signal_read (); resp + + let client_addr_port (t : t) sw_id = + clear_to_read () >>= fun () -> + Log.debug ~tags "[local] client_addr_port"; + Channel.write t (`Client_addr_port sw_id); + Channel.read t >>| function + | `Client_addr_port_resp resp -> signal_read (); resp + + let send_to_all (t : t) msg = + Log.debug ~tags "[local] send_to_all"; + Channel.write t (`Send_to_all msg) + + let send_ignore_errors (t : t) sw_id msg = + Log.debug ~tags "[local] send_ignore_errors"; + Channel.write t (`Send_ignore_errors (sw_id, msg)) + + let has_client_id (t : t) sw_id = + clear_to_read () >>= fun () -> + Log.debug ~tags "[local] has_client_id"; + Channel.write t (`Has_client_id sw_id); + Channel.read t >>| function + | `Has_client_id_resp resp -> signal_read (); resp + + let close (t : t) sw_id = + Log.debug ~tags "[local] close"; + Channel.write t (`Close sw_id) + + type e = ControllerProcess.e + type m = ControllerProcess.m + type c = ControllerProcess.c + + let create_from_chunk chunk = + Log.debug ~tags "[local] create_from_chunk"; + Intf.spawn (create_from_chunk_hub chunk) >>| fun (c,_) -> + c + + let create ?max_pending_connections + ?verbose + ?log_disconnects + ?buffer_age_limit + ?monitor_connections + ?log_level + ~port () : t Deferred.t = + Log.debug ~tags "[local] create"; + Intf.spawn (create ?max_pending_connections + ?verbose + ?log_disconnects + ?buffer_age_limit + ?monitor_connections + ?log_level ~port) >>| fun (c,_) -> + c + + let send (t : t) sw_id msg = + clear_to_read () >>= fun () -> + Log.debug ~tags "[local] send"; + Channel.write t (`Send (sw_id, msg)); + Channel.read t >>| function + | `Send_resp resp -> signal_read (); resp + + let channel_transfer chan writer = + Deferred.forever () (fun _ -> Channel.read chan >>= + Pipe.write writer) + let listen (t : t) = + Log.debug ~tags "[local] listen"; + Channel.write t `Listen; + let reader,writer = Pipe.create () in + don't_wait_for ( + clear_to_read () >>= fun () -> + Log.debug ~tags "[local] About to listen for listen_resp"; + Channel.read t >>| function + | `Listen_resp chan -> Log.debug ~tags "[local] Listen channel returned"; + signal_read (); + Channel.write chan `Ready; + channel_transfer chan writer); + reader + + let barrier (t : t) sw_id = + clear_to_read () >>= fun () -> + Log.debug ~tags "[local] barrier"; + Channel.write t (`Barrier sw_id); + Channel.read t >>| function + | `Barrier_resp resp -> signal_read (); resp + + let individual_stats ?(pattern=C.match_all) (t : t) sw_id = + clear_to_read () >>= fun () -> + Log.debug ~tags "[local] individual_stats"; + Channel.write t (`Individual_stats (pattern, sw_id)); + Channel.read t >>| function + | `Individual_stats_resp resp -> signal_read (); resp + + let port_stats (t : t) sw_id pt_id = + clear_to_read () >>= fun () -> + Log.debug ~tags "[local] port_stats"; + Channel.write t (`Port_stats (sw_id, pt_id)); + Channel.read t >>| function + | `Port_stats_resp resp -> signal_read (); resp + + let get_switch_features (t : t) (switch_id : SDN_Types.switchId) = + clear_to_read () >>= fun () -> + Log.debug ~tags "[local] get_switch_features"; + Channel.write t (`Get_switch_features switch_id); + Channel.read t >>| function + | `Get_switch_features_resp resp -> signal_read (); resp end diff --git a/async/Async_OpenFlow0x04.ml b/async/Async_OpenFlow0x04.ml index a00efaa..8025fba 100644 --- a/async/Async_OpenFlow0x04.ml +++ b/async/Async_OpenFlow0x04.ml @@ -99,9 +99,10 @@ module Controller = struct ?verbose ?log_disconnects ?buffer_age_limit - ?monitor_connections ~port () = + ?monitor_connections + ?log_level ~port () = ChunkController.create ?max_pending_connections ?verbose ?log_disconnects - ?buffer_age_limit ?monitor_connections ~port () + ?buffer_age_limit ?monitor_connections ?log_level ~port () >>| create_from_chunk (* XXX(seliopou): Raises `Not_found` if the client is no longer connected. *) diff --git a/async/Async_OpenFlowChunk.ml b/async/Async_OpenFlowChunk.ml index 65807f4..fc03bd3 100644 --- a/async/Async_OpenFlowChunk.ml +++ b/async/Async_OpenFlowChunk.ml @@ -2,6 +2,7 @@ open Core.Std module Platform = Async_OpenFlow_Platform module Header = OpenFlow_Header +module Log = Async_OpenFlow_Log module Message : Platform.Message with type t = (Header.t * Cstruct.t) = struct @@ -220,7 +221,11 @@ module Controller = struct ?verbose ?log_disconnects ?buffer_age_limit - ?(monitor_connections=false) ~port () = + ?(monitor_connections=false) + ?log_level ~port () = + (match log_level with + | Some level -> Async_OpenFlow_Log.set_level level + | None -> ()); Platform.create ?max_pending_connections ?verbose ?log_disconnects ?buffer_age_limit ~monitor_connections ~port () >>| function t -> @@ -241,10 +246,13 @@ module Controller = struct Platform.close t.platform c_id let has_client_id t c_id = - Platform.has_client_id t.platform c_id && - match Client_id.Table.find t.clients c_id with + Platform.has_client_id t.platform c_id >>| function + | true -> begin + match Client_id.Table.find t.clients c_id with | Some({ Conn.version = Some(_) }) -> true | _ -> false + end + | false -> false let send t c_id m = Platform.send t.platform c_id m diff --git a/async/Async_OpenFlow_Log.ml b/async/Async_OpenFlow_Log.ml index c089bd1..1fc9d28 100644 --- a/async/Async_OpenFlow_Log.ml +++ b/async/Async_OpenFlow_Log.ml @@ -1,5 +1,7 @@ open Core.Std open Async.Std -module Log = Log.Make_global () -include Log \ No newline at end of file +(* module Log = Log.Make_global () *) +(* include Log *) +module Log = Log.Global +include Log diff --git a/async/Async_OpenFlow_Platform.ml b/async/Async_OpenFlow_Platform.ml index ca2039e..4a2ff61 100644 --- a/async/Async_OpenFlow_Platform.ml +++ b/async/Async_OpenFlow_Platform.ml @@ -29,6 +29,7 @@ module type S = sig -> ?log_disconnects:bool -> ?buffer_age_limit:[ `At_most of Time.Span.t | `Unlimited ] -> ?monitor_connections:bool + -> ?log_level:Async.Std.Log.Level.t -> port:int -> unit -> t Deferred.t @@ -37,7 +38,7 @@ module type S = sig val close : t -> Client_id.t -> unit - val has_client_id : t -> Client_id.t -> bool + val has_client_id : t -> Client_id.t -> bool Deferred.t val send : t @@ -52,9 +53,9 @@ module type S = sig val client_addr_port : t -> Client_id.t - -> (Unix.Inet_addr.t * int) option + -> (Unix.Inet_addr.t * int) option Deferred.t - val listening_port : t -> int + val listening_port : t -> int Deferred.t end @@ -114,6 +115,7 @@ module Make(Message : Message) () = struct ?log_disconnects ?buffer_age_limit ?monitor_connections + ?log_level ~port () = Impl.create ?max_pending_connections ?verbose ?log_disconnects ?buffer_age_limit ~port ~auth:(fun _ _ _ -> return `Allow) () @@ -129,7 +131,7 @@ module Make(Message : Message) () = struct let close = Impl.close - let has_client_id = Impl.has_client_id + let has_client_id a b = return (Impl.has_client_id a b) let send t c_id m = Monitor.try_with (fun () -> Impl.send t c_id m) @@ -141,8 +143,8 @@ module Make(Message : Message) () = struct let send_to_all = Impl.send_to_all - let client_addr_port = Impl.client_addr_port + let client_addr_port a b = return (Impl.client_addr_port a b) - let listening_port = Impl.port + let listening_port a = return (Impl.port a) end diff --git a/async/Async_SDN.ml b/async/Async_SDN.ml index 414f9cd..b51c8f9 100644 --- a/async/Async_SDN.ml +++ b/async/Async_SDN.ml @@ -2,7 +2,7 @@ open Core.Std open Async.Std module Chunk_Controller = Async_OpenFlowChunk.Controller -module OF0x01_Controller = Async_OpenFlow0x01.Controller +module OF0x01_Controller = Async_OpenFlow0x01.ControllerProcess module OF0x04_Controller = Async_OpenFlow0x04.Controller module SDN = SDN_Types @@ -42,15 +42,16 @@ let create ?max_pending_connections ?verbose ?log_disconnects ?buffer_age_limit - ?monitor_connections ~port () = + ?monitor_connections + ?log_level ~port () = Chunk_Controller.create ?max_pending_connections ?verbose ?log_disconnects - ?buffer_age_limit ?monitor_connections ~port () + ?buffer_age_limit ?monitor_connections ?log_level ~port () >>| function chunk -> - { sub_chunk = chunk - ; sub_0x01 = OF0x01_Controller.create_from_chunk chunk - ; sub_0x04 = OF0x04_Controller.create_from_chunk chunk - ; conns = ClientMap.create () - } + { sub_chunk = chunk + ; sub_0x01 = OF0x01_Controller.create_from_chunk chunk + ; sub_0x04 = OF0x04_Controller.create_from_chunk chunk + ; conns = ClientMap.create () + } let listen_of0x01 (t : t) : Chunk_Controller.h Pipe.Writer.t * e Pipe.Reader.t = let module OF = OpenFlow0x01 in diff --git a/lib/META b/lib/META index 020e018..f7109bc 100644 --- a/lib/META +++ b/lib/META @@ -1,5 +1,5 @@ # OASIS_START -# DO NOT EDIT (digest: 50896b7eece3c83175a07eab36bf8295) +# DO NOT EDIT (digest: 3ed4de1c9afed0b9ffdb6c045c1e9f7f) version = "0.9.1" description = "Serialization library for OpenFlow" requires = @@ -23,7 +23,8 @@ package "quickcheck" ( package "async" ( version = "0.9.1" description = "Serialization library for OpenFlow" - requires = "async openflow cstruct.async threads sexplib.syntax sexplib" + requires = + "async openflow cstruct.async threads sexplib.syntax sexplib async_parallel" archive(byte) = "async.cma" archive(byte, plugin) = "async.cma" archive(native) = "async.cmxa" diff --git a/opam b/opam index 69f7185..6b41b73 100644 --- a/opam +++ b/opam @@ -21,6 +21,7 @@ depends: [ "packet" {>= "0.3.1" } "sexplib" "async" {>= "112.17.00" } + "async_parallel" "quickcheck" "ounit" {test} "pa_ounit" {test} diff --git a/setup.ml b/setup.ml index 12f22b5..9244b7f 100644 --- a/setup.ml +++ b/setup.ml @@ -1,7 +1,7 @@ -(* setup.ml generated for the first time by OASIS v0.4.4 *) +(* setup.ml generated for the first time by OASIS v0.4.5 *) (* OASIS_START *) -(* DO NOT EDIT (digest: 9f4cf3830241729ff5c49b4e742e23e6) *) +(* DO NOT EDIT (digest: fc5394ec3af8e024ee0f348e82c7a798) *) (* Regenerated by OASIS v0.4.5 Visit http://oasis.forge.ocamlcore.org for more information and @@ -6991,7 +6991,8 @@ let setup_t = FindlibPackage ("cstruct.async", None); FindlibPackage ("threads", None); FindlibPackage ("sexplib.syntax", None); - FindlibPackage ("sexplib", None) + FindlibPackage ("sexplib", None); + FindlibPackage ("async_parallel", None) ]; bs_build_tools = [ExternalTool "ocamlbuild"]; bs_c_sources = []; @@ -7253,7 +7254,8 @@ let setup_t = }; oasis_fn = Some "_oasis"; oasis_version = "0.4.5"; - oasis_digest = Some "\255l\181d\005jR\006\236l\167\n`\t\147\180"; + oasis_digest = + Some "\206Z\165?\020\169\248\248\160\178\b\137\223P\174\148"; oasis_exec = None; oasis_setup_args = []; setup_update = false @@ -7261,6 +7263,6 @@ let setup_t = let setup () = BaseSetup.setup setup_t;; -# 7265 "setup.ml" +# 7267 "setup.ml" (* OASIS_STOP *) let () = setup ();;