Skip to content

Commit 2c0e554

Browse files
committed
playing with channel semantics
1 parent e9b2b2e commit 2c0e554

File tree

1 file changed

+37
-18
lines changed

1 file changed

+37
-18
lines changed

async/Async_OpenFlow0x01.ml

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,15 @@ module ControllerProcess = struct
3232
module Log = Async_OpenFlow_Log
3333
let tags = [("openflow", "openflow0x01")]
3434

35+
let output_log = ref None
36+
37+
let initialize_output () =
38+
Writer.open_file "/home/mark/updates-experiments/scripts/controller.remote"
39+
>>| fun log -> output_log := Some log
40+
41+
let get_output () = match !output_log with
42+
| Some log -> log
43+
3544
module ChunkController = Async_OpenFlowChunk.Controller
3645
module Client_id = struct
3746
module T = struct
@@ -246,24 +255,34 @@ module ControllerProcess = struct
246255
}
247256

248257
let create_from_chunk_hub t h =
249-
let ctl = create_from_chunk t in
258+
let ctl = create_from_chunk t in
259+
initialize_output ()
260+
>>= fun () ->
250261
Pipe.iter (Hub.listen_simple h) ~f:(fun (id, msg) -> match msg with
251262
| `Send (sw_id, msg) -> begin
252-
Log.debug ~tags "send (remote)";
263+
Print.fprintf (get_output ()) "[remote] send\n";
264+
Writer.fsync (get_output ())
265+
>>= fun () ->
253266
send ctl sw_id msg
254267
>>| fun resp -> Hub.send h id (`Send_resp resp)
255268
end
256269
| `Send_to_all msg ->
257-
Log.debug ~tags "send_to_all (remote)";
270+
Print.fprintf (get_output ()) "[remote] send_to_all\n";
271+
Writer.fsync (get_output ())
272+
>>= fun () ->
258273
return (send_to_all ctl msg)
259274
| `Send_ignore_errors (sw_id, msg) ->
260275
return (send_ignore_errors ctl sw_id msg)
261276
| `Listen -> begin
262-
Intf.hub ()
263-
>>=
264-
Hub.open_channel
265-
>>| fun chan -> Deferred.don't_wait_for (Pipe.iter_without_pushback (listen ctl) ~f:(fun elm -> Channel.write chan elm));
266-
Hub.send h id (`Listen_resp chan)
277+
Intf.hub ~buffer_age_limit:`Unlimited ()
278+
>>= fun new_h ->
279+
Deferred.don't_wait_for (Pipe.read (Hub.listen_simple new_h)
280+
>>= function
281+
| `Ok (id, msg) ->
282+
(Pipe.iter_without_pushback (listen ctl)
283+
~f:(Hub.send new_h id)));
284+
Hub.open_channel new_h
285+
>>| fun chan -> Hub.send h id (`Listen_resp chan)
267286
end
268287
| `Individual_stats (pattern, sw_id) -> (individual_stats ctl ~pattern sw_id)
269288
>>| fun resp -> Hub.send h id (`Individual_stats_resp resp)
@@ -280,7 +299,10 @@ module ControllerProcess = struct
280299
| `Set_idle_wait interval -> return (set_idle_wait ctl interval)
281300
| `Set_kill_wait interval -> return (set_kill_wait ctl interval)
282301
| `Get_switches ->
302+
Print.fprintf (get_output ()) "[remote] get_switches\n";
283303
Log.debug ~tags "get_switches (remote)";
304+
Writer.fsync (get_output ())
305+
>>= fun () ->
284306
return (Hub.send h id (`Get_switches_resp (get_switches ctl)))
285307
| `Clear_flows (pattern, sw_id) -> clear_flows ~pattern ctl sw_id
286308
>>| fun resp -> Hub.send h id (`Clear_flows_resp resp)
@@ -343,16 +365,12 @@ module Controller = struct
343365
| `Individual_stats_resp of
344366
(OpenFlow0x01_Stats.individualStats list, exn) Result.t
345367
| `Listen_resp of
346-
([ `Connect of
347-
OpenFlow0x01.switchId * OpenFlow0x01.SwitchFeatures.t
348-
| `Disconnect of SDN_Types.switchId * Core.Std.Sexp.t
349-
| `Message of
350-
SDN_Types.switchId * Message.t ],
368+
([ `Ready ],
351369
[ `Connect of
352370
OpenFlow0x01.switchId * OpenFlow0x01.SwitchFeatures.t
353371
| `Disconnect of SDN_Types.switchId * Core.Std.Sexp.t
354372
| `Message of
355-
SDN_Types.switchId * Message.t ]) Channel.t
373+
SDN_Types.switchId * Message.t]) Channel.t
356374
| `Send_resp of [ `Drop of exn | `Sent of Time.t ]
357375
| `Has_client_id_resp of bool
358376
| `Client_addr_port_resp of (Unix.Inet_addr.t * int) option
@@ -472,10 +490,11 @@ module Controller = struct
472490
Channel.write t `Listen;
473491
let reader,writer = Pipe.create () in
474492
don't_wait_for (
475-
Channel.read t >>= function
476-
| `Listen_resp resp -> Log.debug ~tags "Listen channel returned (local)";
477-
Log.flushed () >>|
478-
fun () -> channel_transfer resp writer);
493+
Log.debug ~tags "About to listen for listen_resp";
494+
Channel.read t >>| function
495+
| `Listen_resp chan -> Log.debug ~tags "Listen channel returned (local)";
496+
Channel.write chan `Ready;
497+
channel_transfer chan writer);
479498
reader
480499

481500
let barrier (t : t) sw_id =

0 commit comments

Comments
 (0)