Skip to content

Commit 74bed71

Browse files
committed
refactored OpenFlow0x01 controller into new process
1 parent 3255fd7 commit 74bed71

File tree

10 files changed

+265
-43
lines changed

10 files changed

+265
-43
lines changed

_oasis

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ Library async
4848
cstruct.async,
4949
threads,
5050
sexplib.syntax,
51-
sexplib
51+
sexplib,
52+
async_parallel
5253
InternalModules:
5354
Async_OpenFlow_Log,
5455
Async_OpenFlow_Message,

_tags

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# OASIS_START
2-
# DO NOT EDIT (digest: c549d3b3eb1ea3a63e2936cb07d78beb)
2+
# DO NOT EDIT (digest: a559d07643364864392828b1b85386b8)
33
# Ignore VCS directories, you can use the same kind of rule outside
44
# OASIS_START/STOP if you want to exclude directories that contains
55
# useless stuff for the build process
@@ -27,6 +27,7 @@ true: annot, bin_annot
2727
# Library async
2828
"async/async.cmxs": use_async
2929
<async/*.ml{,i,y}>: package(async)
30+
<async/*.ml{,i,y}>: package(async_parallel)
3031
<async/*.ml{,i,y}>: package(core)
3132
<async/*.ml{,i,y}>: package(cstruct)
3233
<async/*.ml{,i,y}>: package(cstruct.async)
@@ -83,6 +84,7 @@ true: annot, bin_annot
8384
<test/*.ml{,i,y}>: use_quickcheck
8485
# Executable ping_test
8586
"ping-test/PingTest.byte": package(async)
87+
"ping-test/PingTest.byte": package(async_parallel)
8688
"ping-test/PingTest.byte": package(core)
8789
"ping-test/PingTest.byte": package(cstruct)
8890
"ping-test/PingTest.byte": package(cstruct.async)
@@ -98,6 +100,7 @@ true: annot, bin_annot
98100
"ping-test/PingTest.byte": use_async
99101
"ping-test/PingTest.byte": use_openflow
100102
<ping-test/*.ml{,i,y}>: package(async)
103+
<ping-test/*.ml{,i,y}>: package(async_parallel)
101104
<ping-test/*.ml{,i,y}>: package(core)
102105
<ping-test/*.ml{,i,y}>: package(cstruct)
103106
<ping-test/*.ml{,i,y}>: package(cstruct.async)
@@ -114,6 +117,7 @@ true: annot, bin_annot
114117
<ping-test/*.ml{,i,y}>: use_openflow
115118
# Executable learning_switch
116119
"examples/Learning_Switch.byte": package(async)
120+
"examples/Learning_Switch.byte": package(async_parallel)
117121
"examples/Learning_Switch.byte": package(core)
118122
"examples/Learning_Switch.byte": package(cstruct)
119123
"examples/Learning_Switch.byte": package(cstruct.async)
@@ -127,6 +131,7 @@ true: annot, bin_annot
127131
"examples/Learning_Switch.byte": use_openflow
128132
# Executable learning_switch0x04
129133
"examples/Learning_Switch0x04.byte": package(async)
134+
"examples/Learning_Switch0x04.byte": package(async_parallel)
130135
"examples/Learning_Switch0x04.byte": package(core)
131136
"examples/Learning_Switch0x04.byte": package(cstruct)
132137
"examples/Learning_Switch0x04.byte": package(cstruct.async)
@@ -139,6 +144,7 @@ true: annot, bin_annot
139144
"examples/Learning_Switch0x04.byte": use_async
140145
"examples/Learning_Switch0x04.byte": use_openflow
141146
<examples/*.ml{,i,y}>: package(async)
147+
<examples/*.ml{,i,y}>: package(async_parallel)
142148
<examples/*.ml{,i,y}>: package(core)
143149
<examples/*.ml{,i,y}>: package(cstruct)
144150
<examples/*.ml{,i,y}>: package(cstruct.async)

async/Async_OpenFlow.mli

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ module Platform : sig
5151

5252
val close : t -> Client_id.t -> unit
5353

54-
val has_client_id : t -> Client_id.t -> bool
54+
val has_client_id : t -> Client_id.t -> bool Deferred.t
5555

5656
val send
5757
: t
@@ -66,9 +66,9 @@ module Platform : sig
6666
val client_addr_port
6767
: t
6868
-> Client_id.t
69-
-> (Unix.Inet_addr.t * int) option
69+
-> (Unix.Inet_addr.t * int) option Deferred.t
7070

71-
val listening_port : t -> int
71+
val listening_port : t -> int Deferred.t
7272

7373
end
7474

@@ -178,7 +178,7 @@ module OpenFlow0x01 : sig
178178
open OpenFlow0x01_Core
179179
open OpenFlow0x01_Stats
180180

181-
val get_switches : t -> SDN_Types.switchId list
181+
val get_switches : t -> SDN_Types.switchId list Deferred.t
182182

183183
val clear_flows
184184
: ?pattern:pattern -> t -> Client_id.t

async/Async_OpenFlow0x01.ml

Lines changed: 226 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,9 @@ end
2525

2626
include Async_OpenFlow_Message.MakeSerializers (Message)
2727

28-
module Controller = struct
28+
module ControllerProcess = struct
2929
open Async.Std
30+
open Async_parallel
3031

3132
module Log = Async_OpenFlow_Log
3233
let tags = [("openflow", "openflow0x01")]
@@ -123,22 +124,6 @@ module Controller = struct
123124
let listening_port t =
124125
ChunkController.listening_port t.sub
125126

126-
let create_from_chunk t =
127-
{ sub = t
128-
; shakes = ClientSet.create ()
129-
; c2s = ClientMap.create ()
130-
; s2c = SwitchMap.create ()
131-
}
132-
133-
let create ?max_pending_connections
134-
?verbose
135-
?log_disconnects
136-
?buffer_age_limit
137-
?monitor_connections ~port () =
138-
ChunkController.create ?max_pending_connections ?verbose ?log_disconnects
139-
?buffer_age_limit ?monitor_connections ~port ()
140-
>>| create_from_chunk
141-
142127
let openflow0x01 t evt =
143128
match evt with
144129
| `Connect (c_id, version) ->
@@ -252,4 +237,228 @@ module Controller = struct
252237
| (_, M.StatsReplyMsg (IndividualFlowRep r)) -> Result.Ok r
253238
| _ -> assert false)
254239
| _ -> assert false)
240+
241+
let create_from_chunk t =
242+
{ sub = t
243+
; shakes = ClientSet.create ()
244+
; c2s = ClientMap.create ()
245+
; s2c = SwitchMap.create ()
246+
}
247+
248+
let create_from_chunk_hub t h =
249+
let ctl = create_from_chunk t in
250+
Pipe.iter (Hub.listen_simple h) ~f:(fun (id, msg) -> match msg with
251+
| `Send (sw_id, msg) -> begin
252+
send ctl sw_id msg
253+
>>| fun resp -> Hub.send h id (`Send_resp resp)
254+
end
255+
| `Send_to_all msg ->
256+
return (send_to_all ctl msg)
257+
| `Send_ignore_errors (sw_id, msg) ->
258+
return (send_ignore_errors ctl sw_id msg)
259+
| `Listen -> begin
260+
Intf.hub ()
261+
>>=
262+
Hub.open_channel
263+
>>| fun chan -> Deferred.don't_wait_for (Pipe.iter_without_pushback (listen ctl) ~f:(fun elm -> Channel.write chan elm));
264+
Hub.send h id (`Listen_resp chan)
265+
end
266+
| `Individual_stats (pattern, sw_id) -> (individual_stats ctl ~pattern sw_id)
267+
>>| fun resp -> Hub.send h id (`Individual_stats_resp resp)
268+
| `Barrier args -> barrier ctl args
269+
>>| fun resp -> Hub.send h id (`Barrier_resp resp)
270+
| `Close sw_id -> return (close ctl sw_id)
271+
| `Has_client_id sw_id -> has_client_id ctl sw_id
272+
>>| fun resp -> Hub.send h id (`Has_client_id_resp resp)
273+
| `Client_addr_port sw_id -> client_addr_port ctl sw_id
274+
>>| fun resp -> Hub.send h id (`Client_addr_port_resp resp)
275+
| `Listening_port -> listening_port ctl
276+
>>| fun resp -> Hub.send h id (`Listening_port_resp resp)
277+
| `Set_monitor_interval interval -> return (set_monitor_interval ctl interval)
278+
| `Set_idle_wait interval -> return (set_idle_wait ctl interval)
279+
| `Set_kill_wait interval -> return (set_kill_wait ctl interval)
280+
| `Get_switches ->
281+
return (Hub.send h id (`Get_switches_resp (get_switches ctl)))
282+
| `Clear_flows (pattern, sw_id) -> clear_flows ~pattern ctl sw_id
283+
>>| fun resp -> Hub.send h id (`Clear_flows_resp resp)
284+
| `Send_flow_mods (clear, sw_id, flow_mods) -> send_flow_mods ~clear ctl sw_id flow_mods
285+
>>| fun resp -> Hub.send h id (`Send_flow_mods_resp resp)
286+
| `Send_pkt_out (sw_id, pkt_out) -> send_pkt_out ctl sw_id pkt_out
287+
>>| fun resp -> Hub.send h id (`Send_pkt_out_resp resp)
288+
| `Aggregate_stats (pattern, sw_id) -> aggregate_stats ~pattern ctl sw_id
289+
>>| fun resp -> Hub.send h id (`Aggregate_stats_resp resp)
290+
291+
)
292+
293+
let create ?max_pending_connections
294+
?verbose
295+
?log_disconnects
296+
?buffer_age_limit
297+
?monitor_connections ~port h =
298+
ChunkController.create ?max_pending_connections ?verbose ?log_disconnects
299+
?buffer_age_limit ?monitor_connections ~port ()
300+
>>= (fun t -> create_from_chunk_hub t h)
301+
302+
end
303+
304+
module Controller = struct
305+
open ControllerProcess
306+
open Async.Std
307+
open Async_parallel
308+
309+
module Client_id = ControllerProcess.Client_id
310+
type t = ([ `Barrier of SwitchMap.key
311+
| `Individual_stats of
312+
C.pattern *
313+
SwitchMap.key
314+
| `Listen
315+
| `Send of
316+
SwitchMap.key *
317+
Message.t
318+
| `Send_to_all of
319+
Message.t
320+
| `Send_ignore_errors of
321+
SwitchMap.key *
322+
Message.t
323+
| `Close of Client_id.t
324+
| `Has_client_id of Client_id.t
325+
| `Client_addr_port of Client_id.t
326+
| `Listening_port
327+
| `Set_monitor_interval of Core.Std.Time.Span.t
328+
| `Set_idle_wait of Core.Std.Time.Span.t
329+
| `Set_kill_wait of Core.Std.Time.Span.t
330+
| `Get_switches
331+
| `Clear_flows of OpenFlow0x01_Core.pattern * Client_id.t
332+
| `Send_flow_mods of bool * Client_id.t * OpenFlow0x01_Core.flowMod list
333+
| `Send_pkt_out of Client_id.t * OpenFlow0x01_Core.packetOut
334+
| `Aggregate_stats of OpenFlow0x01_Core.pattern * Client_id.t
335+
],
336+
[ `Barrier_resp of (unit, exn) Result.t
337+
| `Individual_stats_resp of
338+
(OpenFlow0x01_Stats.individualStats list, exn) Result.t
339+
| `Listen_resp of
340+
([ `Connect of
341+
OpenFlow0x01.switchId * OpenFlow0x01.SwitchFeatures.t
342+
| `Disconnect of SDN_Types.switchId * Core.Std.Sexp.t
343+
| `Message of
344+
SDN_Types.switchId * Message.t ],
345+
[ `Connect of
346+
OpenFlow0x01.switchId * OpenFlow0x01.SwitchFeatures.t
347+
| `Disconnect of SDN_Types.switchId * Core.Std.Sexp.t
348+
| `Message of
349+
SDN_Types.switchId * Message.t ]) Channel.t
350+
| `Send_resp of [ `Drop of exn | `Sent of Time.t ]
351+
| `Has_client_id_resp of bool
352+
| `Client_addr_port_resp of (Unix.Inet_addr.t * int) option
353+
| `Listening_port_resp of int
354+
| `Get_switches_resp of SDN_Types.switchId list
355+
| `Clear_flows_resp of (unit, exn) Result.t
356+
| `Send_flow_mods_resp of (unit, exn) Result.t
357+
| `Send_pkt_out_resp of (unit, exn) Result.t
358+
| `Aggregate_stats_resp of (OpenFlow0x01_Stats.aggregateStats, exn) Result.t
359+
]) Channel.t
360+
361+
let aggregate_stats ?(pattern=C.match_all) (t : t) sw_id =
362+
Channel.write t (`Aggregate_stats (pattern, sw_id));
363+
Channel.read t >>| function
364+
| `Aggregate_stats_resp resp -> resp
365+
366+
let send_pkt_out (t : t) (sw_id:Client_id.t) pkt_out =
367+
Channel.write t (`Send_pkt_out (sw_id, pkt_out));
368+
Channel.read t >>| function
369+
| `Send_pkt_out_resp resp -> resp
370+
371+
let send_flow_mods ?(clear=true) (t : t) (sw_id:Client_id.t) flow_mods =
372+
Channel.write t (`Send_flow_mods (clear, sw_id, flow_mods));
373+
Channel.read t >>| function
374+
| `Send_flow_mods_resp resp -> resp
375+
376+
let clear_flows ?(pattern=C.match_all) (t : t) (sw_id:Client_id.t) =
377+
Channel.write t (`Clear_flows (pattern, sw_id));
378+
Channel.read t >>| function
379+
| `Clear_flows_resp resp -> resp
380+
381+
let get_switches (t : t) =
382+
Channel.write t `Get_switches;
383+
Channel.read t >>| function
384+
| `Get_switches_resp resp -> resp
385+
386+
let set_kill_wait t (s:Time.Span.t) =
387+
Channel.write t (`Set_kill_wait s)
388+
389+
let set_monitor_interval t (s:Time.Span.t) =
390+
Channel.write t (`Set_monitor_interval s)
391+
392+
let set_idle_wait t (s:Time.Span.t) : unit =
393+
Channel.write t (`Set_idle_wait s)
394+
395+
let listening_port (t : t) =
396+
Channel.write t `Listening_port;
397+
Channel.read t >>| function
398+
| `Listening_port_resp resp -> resp
399+
400+
let client_addr_port (t : t) sw_id =
401+
Channel.write t (`Client_addr_port sw_id);
402+
Channel.read t >>| function
403+
| `Client_addr_port_resp resp -> resp
404+
405+
let send_to_all (t : t) msg =
406+
Channel.write t (`Send_to_all msg)
407+
408+
let send_ignore_errors (t : t) sw_id msg =
409+
Channel.write t (`Send_ignore_errors (sw_id, msg))
410+
411+
let has_client_id (t : t) sw_id =
412+
Channel.write t (`Has_client_id sw_id);
413+
Channel.read t >>| function
414+
| `Has_client_id_resp resp -> resp
415+
416+
let close (t : t) sw_id =
417+
Channel.write t (`Close sw_id)
418+
419+
type e = ControllerProcess.e
420+
type m = ControllerProcess.m
421+
type c = ControllerProcess.c
422+
423+
let create_from_chunk chunk =
424+
Intf.spawn (create_from_chunk_hub chunk) >>| fun (c,_) ->
425+
c
426+
427+
let create ?max_pending_connections
428+
?verbose
429+
?log_disconnects
430+
?buffer_age_limit
431+
?monitor_connections ~port () : t Deferred.t =
432+
Intf.spawn (create ?max_pending_connections
433+
?verbose
434+
?log_disconnects
435+
?buffer_age_limit
436+
?monitor_connections ~port) >>| fun (c,_) ->
437+
c
438+
439+
let send (t : t) sw_id msg =
440+
Channel.write t (`Send (sw_id, msg));
441+
Channel.read t >>| function
442+
| `Send_resp resp -> resp
443+
444+
let channel_transfer chan writer =
445+
Deferred.forever () (fun _ -> Channel.read chan >>=
446+
Pipe.write writer)
447+
let listen (t : t) =
448+
Channel.write t `Listen;
449+
let reader,writer = Pipe.create () in
450+
don't_wait_for (
451+
Channel.read t >>| function
452+
| `Listen_resp resp -> channel_transfer resp writer);
453+
reader
454+
455+
let barrier (t : t) sw_id =
456+
Channel.write t (`Barrier sw_id);
457+
Channel.read t >>| function
458+
| `Barrier_resp resp -> resp
459+
460+
let individual_stats ?(pattern=C.match_all) (t : t) sw_id =
461+
Channel.write t (`Individual_stats (pattern, sw_id));
462+
Channel.read t >>| function
463+
| `Individual_stats_resp resp -> resp
255464
end

async/Async_OpenFlowChunk.ml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -241,10 +241,13 @@ module Controller = struct
241241
Platform.close t.platform c_id
242242

243243
let has_client_id t c_id =
244-
Platform.has_client_id t.platform c_id &&
245-
match Client_id.Table.find t.clients c_id with
244+
Platform.has_client_id t.platform c_id >>| function
245+
| true -> begin
246+
match Client_id.Table.find t.clients c_id with
246247
| Some({ Conn.version = Some(_) }) -> true
247248
| _ -> false
249+
end
250+
| false -> false
248251

249252
let send t c_id m =
250253
Platform.send t.platform c_id m

0 commit comments

Comments
 (0)