diff --git a/src/main/java/org/flowforwarding/warp/demo/api/JavaDynamicLauncher.java b/src/main/java/org/flowforwarding/warp/demo/api/JavaDynamicLauncher.java new file mode 100644 index 0000000..3140128 --- /dev/null +++ b/src/main/java/org/flowforwarding/warp/demo/api/JavaDynamicLauncher.java @@ -0,0 +1,36 @@ +package org.flowforwarding.warp.demo.api; + +import java.util.HashSet; + +import org.flowforwarding.warp.controller.Controller; +import org.flowforwarding.warp.controller.session.SessionHandlerRef; + +import org.flowforwarding.warp.controller.api.SessionHandler; +import org.flowforwarding.warp.controller.api.dynamic.DynamicMessageHandler; +import org.flowforwarding.warp.controller.api.fixed.SpecificVersionMessageHandler; + +import org.flowforwarding.warp.protocol.adapter.IOFMessageProviderFactoryAdapter; +import org.flowforwarding.warp.protocol.adapter.JDriverMessage; +import org.flowforwarding.warp.protocol.adapter.IOFMessageProviderAdapter; +import org.flowforwarding.warp.protocol.ofmessages.OFMessageProviderFactoryAvroProtocol; + +class JavaDynamicLauncher{ + public static void main(String[] args){ + IOFMessageProviderFactoryAdapter factory = new IOFMessageProviderFactoryAdapter(new OFMessageProviderFactoryAvroProtocol()); + + final SessionHandlerRef launcher = SessionHandler.makeRef( + factory, + new HashSet>() + {{ + //add(new SimpleJavaDynamicHandler()); + }}, + new HashSet>() + {{ + add(new SimpleJavaOfp13Handler()); + }}); + + Controller.launch(new HashSet() {{ + add(launcher); + }}); + } +} diff --git a/src/main/java/org/flowforwarding/warp/demo/api/SimpleJavaDynamicHandler.java b/src/main/java/org/flowforwarding/warp/demo/api/SimpleJavaDynamicHandler.java new file mode 100644 index 0000000..e3b891c --- /dev/null +++ b/src/main/java/org/flowforwarding/warp/demo/api/SimpleJavaDynamicHandler.java @@ -0,0 +1,38 @@ +/** + * В© 2013 FlowForwarding.Org + * All Rights Reserved. Use is subject to license terms. + */ +package org.flowforwarding.warp.demo.api; + +import org.flowforwarding.warp.controller.api.dynamic.DynamicMessageHandler; +import org.flowforwarding.warp.protocol.adapter.JDriverMessage; +import org.flowforwarding.warp.protocol.adapter.JDriverMessageBuilder; +import org.flowforwarding.warp.protocol.adapter.IOFMessageProviderAdapter; + +class SimpleJavaDynamicHandler extends DynamicMessageHandler { + @Override + public short[] supportedVersions(){ + return new short[] { 4 }; //1.3 only + } + + @Override + public JDriverMessage[] onDynamicMessage(IOFMessageProviderAdapter driver, long dpid, JDriverMessage msg) { + if(msg.isTypeOf("ofp_switch_features_reply")){ + System.out.println("DPID from dynamic message: " + msg.primitiveField("datapathId")); + JDriverMessageBuilder reqHeader = driver.getBuilder("ofp_header") + .setMember("xid", 0) + .setMember("length", 8 + 5); + + JDriverMessageBuilder request = driver.getBuilder("echo_request") + .setMember("header", reqHeader.build()) + .setMember("elements", new byte[]{2, 2, 2, 2, 2}); + return new JDriverMessage[] { request.build() }; + } + else if(msg.isTypeOf("echo_reply")){ + System.out.println("[OF-INFO] DPID: " + dpid + " Length of echo reply: " + msg.primitivesSequence("elements").length); + return new JDriverMessage[]{}; + } + else return new JDriverMessage[]{}; + } +} + diff --git a/src/main/java/org/flowforwarding/warp/demo/api/SimpleJavaOfp13Handler.java b/src/main/java/org/flowforwarding/warp/demo/api/SimpleJavaOfp13Handler.java new file mode 100644 index 0000000..81d0d96 --- /dev/null +++ b/src/main/java/org/flowforwarding/warp/demo/api/SimpleJavaOfp13Handler.java @@ -0,0 +1,27 @@ +package org.flowforwarding.warp.demo.api; + +import org.flowforwarding.warp.controller.api.fixed.BuilderInput; +import org.flowforwarding.warp.controller.api.fixed.v13.*; +import org.flowforwarding.warp.protocol.adapter.JDriverMessage; +import org.flowforwarding.warp.protocol.adapter.JDriverMessageBuilder; + +class SimpleJavaOfp13Handler extends Ofp13MessageHandler { + + public SimpleJavaOfp13Handler() { + super(JDriverMessage.class); + } + + @Override + public BuilderInput[] onEchoReply(long dpid, EchoReply msg) { + System.out.println("[OF-INFO] DPID: " + dpid + " Length of echo reply: " + msg.elements().length); + return new BuilderInput[0]; + } + + @Override + public BuilderInput[] onFeaturesReply(long dpid, FeaturesReply msg) { + System.out.println("DPID from dynamic message: " + msg.datapathId()); + System.out.println("Features: " + msg.capabilities()); + + return new BuilderInput[] { new EchoRequestInput(msg.header().xid(), new byte[]{2, 2, 2, 2, 2}) }; + } +} \ No newline at end of file diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf new file mode 100644 index 0000000..20949ba --- /dev/null +++ b/src/main/resources/reference.conf @@ -0,0 +1,52 @@ +ofp13 { + Header { + type_name = "ofp_header" + + xid = "xid" + length = "length" + } + + Hello { + type_name = "ofp_hello" + + header = "header" + elements = "elements" + } + + HelloElemVersionBitmap { + type_name = "ofp_hello_elem_version_bitmap" + + bitmaps = "bitmaps" + } + + FeaturesRequest { + header = "header" + type_name = "ofp_switch_features_request" + } + + FeaturesReply { + type_name = "ofp_switch_features_reply" + + header = "header" + + datapath_id = "datapathId" + n_buffers = "nBuffers" + n_tables = "nTables" + auxiliary_id = "auxiliaryId" + capabilities = "capabilities" + } + + EchoRequest { + type_name = "echo_request" + + header = "header" + elements = "elements" + } + + EchoReply { + type_name = "echo_reply" + + header = "header" + elements = "elements" + } +} \ No newline at end of file diff --git a/src/main/scala/org/flowforwarding/warp/adapter/adapter.scala b/src/main/scala/org/flowforwarding/warp/adapter/adapter.scala index e42fca2..91e82d1 100644 --- a/src/main/scala/org/flowforwarding/warp/adapter/adapter.scala +++ b/src/main/scala/org/flowforwarding/warp/adapter/adapter.scala @@ -1,41 +1,65 @@ package org.flowforwarding.warp.protocol.adapter import scala.util.Try - -import org.flowforwarding.warp.controller.session.{OFSessionHandler, MessageDriverFactory, OFMessage, MessageDriver} - +import org.flowforwarding.warp.controller.session._ +import org.flowforwarding.warp.controller.api.dynamic._ import org.flowforwarding.warp.protocol.ofmessages.{OFMessageRef, IOFMessageProviderFactory, IOFMessageProvider} -import org.flowforwarding.warp.protocol.ofmessages.OFMessageHello.OFMessageHelloRef -import org.flowforwarding.warp.protocol.ofmessages.OFMessageSwitchConfigRequest.OFMessageSwitchConfigRequestRef -import org.flowforwarding.warp.protocol.ofmessages.OFMessageSwitchFeaturesRequest.OFMessageSwitchFeaturesRequestRef -import org.flowforwarding.warp.protocol.ofmessages.OFMessageEchoReply.OFMessageEchoReplyRef -import org.flowforwarding.warp.protocol.ofmessages.OFMessageEchoRequest.OFMessageEchoRequestRef import org.flowforwarding.warp.protocol.ofmessages.OFMessageFlowMod.OFMessageFlowModRef import org.flowforwarding.warp.protocol.ofmessages.OFMessageGroupMod.OFMessageGroupModRef -import org.flowforwarding.warp.protocol.ofmessages.OFMessagePacketIn.OFMessagePacketInRef -import org.flowforwarding.warp.protocol.ofmessages.OFMessageSwitchConfig.OFMessageSwitchConfigRef -import org.flowforwarding.warp.protocol.ofmessages.OFMessageError.OFMessageErrorRef +import org.flowforwarding.warp.protocol.ofmessages.OFMessageEchoRequest.OFMessageEchoRequestRef +import org.flowforwarding.warp.protocol.ofmessages.OFMessageEchoReply.OFMessageEchoReplyRef +import org.flowforwarding.warp.protocol.ofmessages.OFMessageSwitchFeaturesRequest.OFMessageSwitchFeaturesRequestRef +import org.flowforwarding.warp.protocol.ofmessages.OFMessageSwitchConfigRequest.OFMessageSwitchConfigRequestRef +import org.flowforwarding.warp.protocol.ofmessages.OFMessageHello.OFMessageHelloRef + + +case class JDriverMessage(ref: OFMessageRef[_]) extends DynamicStructure[JDriverMessage]{ + def primitiveField(name: String): Long = ??? + + def structureField(name: String): JDriverMessage = ??? + + def primitivesSequence(name: String): Array[Long] = ??? + + def structuresSequence(name: String): Array[JDriverMessage] = ??? + + def isTypeOf(typeName: String): Boolean = ??? +} +class JDriverMessageBuilder extends DynamicStructureBuilder[JDriverMessageBuilder, JDriverMessage]{ + def setMember(memberName: String, value: Long): JDriverMessageBuilder = ??? -case class JDriverMessage(ref: OFMessageRef[_]) extends OFMessage + def setMember[T](memberName: String, values: Array[T]): JDriverMessageBuilder = ??? -case class IOFMessageProviderAdapter(provider: IOFMessageProvider) extends MessageDriver[JDriverMessage]{ + def setMember(memberName: String, value: JDriverMessage): JDriverMessageBuilder = ??? + + def build: JDriverMessage = ??? +} + +case class IOFMessageProviderAdapter(provider: IOFMessageProvider) extends DynamicDriver[JDriverMessageBuilder, JDriverMessage]{ provider.init() + def getBuilder(msgType: String): JDriverMessageBuilder = ??? + + def getHelloMessage(supportedVersions: Array[Short]): Array[Byte] = ??? + + def rejectVersionError(reason: String): Array[Byte] = ??? + + def getFeaturesRequest: Array[Byte] = ??? + def decodeMessage(in: Array[Byte]): Try[JDriverMessage] = Try { val res = if (provider.isHello(in)) - provider.parseHelloMessage(in) - else if (provider.isPacketIn(in)) - provider.parsePacketIn(in) - else if(provider.isConfig(in)) - provider.parseSwitchConfig(in) - else if (provider.isError(in)) - provider.parseError(in) - //else if (provider.isEchoRequest(in)) - // provider.parseEchoRequest(in) - //else if (provider.isSwitchFeatures(in)) - // provider.parseSwitchFeatures(in) - else throw new RuntimeException("Unrecognized message") + provider.parseHelloMessage(in) + else if (provider.isPacketIn(in)) + provider.parsePacketIn(in) + else if(provider.isConfig(in)) + provider.parseSwitchConfig(in) + else if (provider.isError(in)) + provider.parseError(in) + //else if (provider.isEchoRequest(in)) + // provider.parseEchoRequest(in) + //else if (provider.isSwitchFeatures(in)) + // provider.parseSwitchFeatures(in) + else throw new RuntimeException("Unrecognized message") JDriverMessage(res) } @@ -57,35 +81,9 @@ case class IOFMessageProviderAdapter(provider: IOFMessageProvider) extends Messa val versionCode: Short = provider.getVersion } -case class IOFMessageProviderFactoryAdapter(factory: IOFMessageProviderFactory) extends MessageDriverFactory[JDriverMessage]{ - def get(versionCode: Short): Option[MessageDriver[JDriverMessage]] = - Try(factory.getMessageProvider(versionCode)).map(IOFMessageProviderAdapter.apply).toOption -} - -abstract class OFJDriverSessionHandler(pFactory: IOFMessageProviderFactory) extends OFSessionHandler(IOFMessageProviderFactoryAdapter(pFactory)){ - - private val providers = scala.collection.mutable.Map[Short, IOFMessageProvider]() - - override def connected(versionCode: Short) { - if(!providers.contains(versionCode)) - providers(versionCode) = pFactory.getMessageProvider(versionCode) - } - - implicit def refsToMessages(refs: Seq[OFMessageRef[_]]) = refs map JDriverMessage.apply - - protected def getHandshakeMessage(version: Short, msg: JDriverMessage): Seq[JDriverMessage] = { - refsToMessages(Seq(OFMessageHelloRef.create, OFMessageSwitchFeaturesRequestRef.create)) - } - - protected def onReceivedMessage(version: Short, dpid: Long, msg: JDriverMessage): Seq[JDriverMessage] = { - msg.ref match{ - case p: OFMessagePacketInRef => packetIn(providers(version), dpid, p) - case c: OFMessageSwitchConfigRef => switchConfig(providers(version), dpid, c) - case e: OFMessageErrorRef => error(providers(version), dpid, e) - } - } +case class IOFMessageProviderFactoryAdapter(factory: IOFMessageProviderFactory) extends MessageDriverFactory[JDriverMessage, IOFMessageProviderAdapter]{ + def get(versionCode: Short): IOFMessageProviderAdapter = + IOFMessageProviderAdapter(factory.getMessageProvider(versionCode)) - def packetIn(provider: IOFMessageProvider, dpid: Long, pIn: OFMessagePacketInRef): Seq[OFMessageRef[_]] - def switchConfig(provider: IOFMessageProvider, dpid: Long, config: OFMessageSwitchConfigRef): Seq[OFMessageRef[_]] - def error(provider: IOFMessageProvider, dpid: Long, error: OFMessageErrorRef): Seq[OFMessageRef[_]] + def supportedVersions: Array[Short] = Array(4.toShort) // ??? } \ No newline at end of file diff --git a/src/main/scala/org/flowforwarding/warp/controller/Controller.scala b/src/main/scala/org/flowforwarding/warp/controller/Controller.scala index 8a213d2..831cf08 100644 --- a/src/main/scala/org/flowforwarding/warp/controller/Controller.scala +++ b/src/main/scala/org/flowforwarding/warp/controller/Controller.scala @@ -14,16 +14,29 @@ import org.flowforwarding.warp.controller.session._ case class Configuration(ip: String = "10.17.10.126", tcpPort: Int = 6633) object Controller { - def launch(protocolHandlers: Set[SessionHandlerLauncher], config: Configuration = Configuration()) + def launch(sessionHandlers: scala.collection.Set[SessionHandlerRef], config: Configuration) (implicit actorSystem: ActorSystem = ActorSystem.create("OfController")) = { val manager = Tcp.get(actorSystem).manager - actorSystem.actorOf(Props.create(classOf[Controller], manager, config, protocolHandlers), "Controller-Dispatcher") + actorSystem.actorOf(Props.create(classOf[Controller], manager, config, sessionHandlers), "Controller-Dispatcher") + } + + def launch(sessionHandlers: SessionHandlerRef*): ActorRef = { + launch(sessionHandlers.toSet, Configuration()) + } + + def launch(sessionHandlers: java.util.Set[SessionHandlerRef], config: Configuration, actorSystem: ActorSystem) { + val sh = scala.collection.JavaConversions.asScalaSet(sessionHandlers) + launch(sh, config)(actorSystem) + } + + def launch(sessionHandlers: java.util.Set[SessionHandlerRef]) { + launch(sessionHandlers, Configuration(), ActorSystem.create("OfController")) } } -private class Controller(manager: ActorRef, config: Configuration, messageHandlers: Set[SessionHandlerLauncher]) extends Actor { +private class Controller(manager: ActorRef, config: Configuration, messageHandlers: scala.collection.Set[SessionHandlerRef]) extends Actor { - var sessionHandlers: Set[ActorRef] = Set.empty + var sessionHandlers: scala.collection.Set[ActorRef] = Set.empty override def preStart() { manager ! TcpMessage.bind(self, new InetSocketAddress(config.ip, config.tcpPort), 100) @@ -35,10 +48,10 @@ private class Controller(manager: ActorRef, config: Configuration, messageHandle sessionHandlers = messageHandlers map { _.launch } case Tcp.CommandFailed => context stop self - case connected: Tcp.Connected => - manager ! connected + case c @ Tcp.Connected(remoteAddress, localAddress) => + manager ! c println("[INFO] Getting Switch connection \n") - val connectionHandler = context.actorOf(Props.create(classOf[session.SwitchNurse], sessionHandlers)) + val connectionHandler = context.actorOf(Props.create(classOf[SwitchNurse], sessionHandlers, remoteAddress, localAddress)) sender ! TcpMessage.register(connectionHandler) // TODO: handle messages from RestApi } diff --git a/src/main/scala/org/flowforwarding/warp/controller/api/SessionHandler.scala b/src/main/scala/org/flowforwarding/warp/controller/api/SessionHandler.scala new file mode 100644 index 0000000..6e40d5c --- /dev/null +++ b/src/main/scala/org/flowforwarding/warp/controller/api/SessionHandler.scala @@ -0,0 +1,64 @@ +package org.flowforwarding.warp.controller.api + +import scala.util.{Failure, Success, Try} +import scala.collection.JavaConversions + +import org.flowforwarding.warp.controller.api.fixed.SpecificVersionMessageHandler +import org.flowforwarding.warp.controller.api.dynamic.{DynamicMessageHandler, DynamicStructure, DynamicDriver} +import org.flowforwarding.warp.controller.session.{MessageDriver, SessionHandlerRef, MessageDriverFactory, LowLevelSessionHandler} + +class NoSuitableMessageHandlerException extends Exception + +private[api] class SessionHandler[DriverType <: DynamicDriver[_, StructureType], + StructureType <: DynamicStructure[StructureType]] + (driversFactory: MessageDriverFactory[StructureType, DriverType], + dynamicMessageHandlers: scala.collection.Set[DynamicMessageHandler[DriverType, StructureType]], + versionHandlers: scala.collection.Set[SpecificVersionMessageHandler[_, StructureType]]) extends LowLevelSessionHandler[StructureType, DriverType](driversFactory) { + + def definedHandlersVersions: Array[Short] = (versionHandlers.map(_.versionCode) ++ dynamicMessageHandlers.map(_.supportedVersions).flatten).toArray + + protected def onReceivedMessage(driver: DriverType, dpid: Long, msg: StructureType): Seq[StructureType] = { + val tryHandle = versionHandlers + .collectFirst { case h => h.handle(driver, dpid, msg) } + .flatten + .getOrElse { Try { dynamicMessageHandlers + .collectFirst { case h if h.supportsVersion(driver.versionCode) => h.onDynamicMessage(driver, dpid, msg) } + .getOrElse { throw new NoSuitableMessageHandlerException }}} + + tryHandle match { + case Success(result) => result + case Failure(e) => e.printStackTrace(); Seq() + } + } +} + +object SessionHandler{ + def makeRef[DriverType <: DynamicDriver[_, StructureType], + StructureType <: DynamicStructure[StructureType]] + (dynamicMessageHandler: scala.collection.Set[DynamicMessageHandler[DriverType, StructureType]] = Set.empty[DynamicMessageHandler[DriverType, StructureType]], + versionHandlers: scala.collection.Set[SpecificVersionMessageHandler[_, StructureType]] = Set.empty[SpecificVersionMessageHandler[_, StructureType]] ) + (implicit driversFactory: MessageDriverFactory[StructureType, DriverType]): SessionHandlerRef = { + + new SessionHandlerRef(classOf[SessionHandler[DriverType, StructureType]], driversFactory, dynamicMessageHandler, versionHandlers) + } + + def makeRef[DriverType <: DynamicDriver[_, StructureType], + StructureType <: DynamicStructure[StructureType]] + (driversFactory: MessageDriverFactory[StructureType, DriverType], + dynamicMessageHandlers: java.util.Set[DynamicMessageHandler[DriverType, StructureType]], + versionHandlers: java.util.Set[SpecificVersionMessageHandler[_, StructureType]]): SessionHandlerRef = { + + import JavaConversions.asScalaSet + + val x: scala.collection.Set[DynamicMessageHandler[DriverType, StructureType]] = + asScalaSet[DynamicMessageHandler[DriverType, StructureType]](dynamicMessageHandlers) + + val y: scala.collection.Set[SpecificVersionMessageHandler[_, StructureType]] = + asScalaSet[SpecificVersionMessageHandler[_, StructureType]](versionHandlers) + + makeRef(x, y)(driversFactory) + } +} + + + diff --git a/src/main/scala/org/flowforwarding/warp/controller/api/dynamic/DynamicDriver.scala b/src/main/scala/org/flowforwarding/warp/controller/api/dynamic/DynamicDriver.scala new file mode 100644 index 0000000..6027b9d --- /dev/null +++ b/src/main/scala/org/flowforwarding/warp/controller/api/dynamic/DynamicDriver.scala @@ -0,0 +1,8 @@ +package org.flowforwarding.warp.controller.api.dynamic + +import org.flowforwarding.warp.controller.session.MessageDriver + +trait DynamicDriver[BuilderType <: DynamicStructureBuilder[BuilderType, StructureType], + StructureType <: DynamicStructure[StructureType]] extends MessageDriver[StructureType]{ + def getBuilder(msgType: String): BuilderType +} \ No newline at end of file diff --git a/src/main/scala/org/flowforwarding/warp/controller/api/dynamic/DynamicMessageHandler.scala b/src/main/scala/org/flowforwarding/warp/controller/api/dynamic/DynamicMessageHandler.scala new file mode 100644 index 0000000..6d21e0b --- /dev/null +++ b/src/main/scala/org/flowforwarding/warp/controller/api/dynamic/DynamicMessageHandler.scala @@ -0,0 +1,7 @@ +package org.flowforwarding.warp.controller.api.dynamic + +abstract class DynamicMessageHandler[DriverType <: DynamicDriver[_, StructureType], StructureType <: DynamicStructure[StructureType]] { + def supportsVersion(versionCode: Short) = supportedVersions.contains(versionCode) + def supportedVersions: Array[Short] + def onDynamicMessage(driver: DriverType, dpid: Long, msg: StructureType): Array[StructureType] +} diff --git a/src/main/scala/org/flowforwarding/warp/controller/api/dynamic/dynamic.scala b/src/main/scala/org/flowforwarding/warp/controller/api/dynamic/dynamic.scala new file mode 100644 index 0000000..597c2d4 --- /dev/null +++ b/src/main/scala/org/flowforwarding/warp/controller/api/dynamic/dynamic.scala @@ -0,0 +1,24 @@ +package org.flowforwarding.warp.controller.api.dynamic + +import org.flowforwarding.warp.controller.session.OFMessage + +trait DynamicStructure[SelfType <: DynamicStructure[SelfType]] extends OFMessage{ + def primitiveField(name: String): Long + def structureField(name: String): SelfType + def primitivesSequence(name: String): Array[Long] + def structuresSequence(name: String): Array[SelfType] + def isTypeOf(typeName: String): Boolean +} + +trait DynamicStructureBuilder[SelfType <: DynamicStructureBuilder[SelfType, StructureType], + StructureType <: DynamicStructure[StructureType]]{ + def setMember(memberName: String, value: Long): SelfType + def setMember[T](memberName: String, values: Array[T]): SelfType + def setMember(memberName: String, value: StructureType): SelfType + def build: StructureType +} + + + + + diff --git a/src/main/scala/org/flowforwarding/warp/controller/api/dynamic/scala/api.scala b/src/main/scala/org/flowforwarding/warp/controller/api/dynamic/scala/api.scala new file mode 100644 index 0000000..cfd8397 --- /dev/null +++ b/src/main/scala/org/flowforwarding/warp/controller/api/dynamic/scala/api.scala @@ -0,0 +1,45 @@ +package org.flowforwarding.warp.controller.api.dynamic.scala + +import scala.language.dynamics + +import org.flowforwarding.warp.controller.api.dynamic.{DynamicDriver, DynamicStructure, DynamicStructureBuilder} + +trait DynamicStructureScalaAPI[SelfType <: DynamicStructureScalaAPI[SelfType]] extends DynamicStructure[SelfType] with Dynamic{ + def selectDynamic(name: String): SelfType = structureField(name) + + object primitives extends Dynamic{ + def selectDynamic(name: String): Long = primitiveField(name) + } + + object sequencesOfPrimitives extends Dynamic{ + def selectDynamic(name: String): Seq[Long] = primitivesSequence(name) + } + + object sequencesOfStructures extends Dynamic{ + def selectDynamic(name: String): Seq[SelfType] = structuresSequence(name) + } + + object typeOf extends Dynamic{ + def selectDynamic(name: String): Boolean = isTypeOf(name) + } +} + +trait DynamicStructureBuilderScalaAPI[SelfType <: DynamicStructureBuilderScalaAPI[SelfType, StructureType], + StructureType <: DynamicStructureScalaAPI[StructureType]] extends + DynamicStructureBuilder[SelfType, StructureType] with Dynamic{ + + def applyDynamic(name: String) = new MemberSetter(name) + + class MemberSetter(memberName: String){ + def apply(value: StructureType) = setMember(memberName, value) + def apply(value: Long) = setMember(memberName, value) + def apply[T](values: Array[T]) = setMember(memberName, values) + } +} + +trait DynamicDriverScalaAPI[BuilderType <: DynamicStructureBuilderScalaAPI[BuilderType, StructureType], + StructureType <: DynamicStructureScalaAPI[StructureType]] extends + DynamicDriver[BuilderType, StructureType] with Dynamic{ + + def selectDynamic(msgType: String): BuilderType = getBuilder(msgType) +} diff --git a/src/main/scala/org/flowforwarding/warp/controller/api/fixed/BasicStructuresDescription.scala b/src/main/scala/org/flowforwarding/warp/controller/api/fixed/BasicStructuresDescription.scala new file mode 100644 index 0000000..6598ca4 --- /dev/null +++ b/src/main/scala/org/flowforwarding/warp/controller/api/fixed/BasicStructuresDescription.scala @@ -0,0 +1,67 @@ +package org.flowforwarding.warp.controller.api.fixed + +import _root_.scala.reflect.ClassTag + +import org.flowforwarding.warp.controller.api.dynamic._ + +trait Message{ + def header: Header +} + +case class Header(length: Int, xid: Int) extends BuilderInput + +trait MessageInput extends BuilderInput{ + def header: Header = new Header(length, xid) + def length: Int + def xid: Int +} + +trait BasicStructuresDescription[BuilderType <: DynamicStructureBuilder[BuilderType, StructureType], + StructureType <: DynamicStructure[StructureType]] { + apiProvider: DriverApiHelper[BuilderType, StructureType] => + + trait OfpStructure extends ConcreteStructure{ + val namesConfig = apiProvider.namesConfig + } + + trait OfpMessage extends OfpStructure { + def header: Header = { + val h = underlyingStructure.structureField(getFieldName("header")(ClassTag(this.getClass))) + val xid = h.primitiveField(getFieldName[Header]("xid")).toInt + val length = h.primitiveField(getFieldName[Header]("length")).toInt + Header(length, xid) + } + } + + trait OfpStructureBuilder[Input <: BuilderInput] extends ConcreteStructureBuilder[Input, BuilderType, StructureType] { + protected val dynamicBuilder: BuilderType = apiProvider.getDynamicBuilder(this.getClass).get + protected val namesConfig = apiProvider.namesConfig + } + + trait OfpMessageBuilder[Input <: MessageInput] extends OfpStructureBuilder[Input]{ + // Fills the underlying builder with the specified input. + protected def applyInput(input: Input): Unit = { + // builder already must contain message type and version !!! + dynamicBuilder.setMember(getFieldName("header")(ClassTag(this.getClass)), new HeaderBuilder().build(input.header)) + } + } + + class HeaderBuilder extends OfpStructureBuilder[Header]{ + protected def applyInput(input: Header): Unit = { + dynamicBuilder.setMember(getFieldName[Header]("xid"), input.xid) + dynamicBuilder.setMember(getFieldName[Header]("length"), input.length) + } + } + + case class HeaderStructure(underlyingStructure: StructureType) extends OfpStructure + + // These lists are provided using stackable trait pattern + override def builderClasses: List[Class[_ <: OfpStructureBuilder[_]]] = List() + override def structureClasses: List[Class[_ <: OfpStructure]] = List() +} + + + + + + diff --git a/src/main/scala/org/flowforwarding/warp/controller/api/fixed/basic.scala b/src/main/scala/org/flowforwarding/warp/controller/api/fixed/basic.scala new file mode 100644 index 0000000..4309b9b --- /dev/null +++ b/src/main/scala/org/flowforwarding/warp/controller/api/fixed/basic.scala @@ -0,0 +1,118 @@ +package org.flowforwarding.warp.controller.api.fixed + +import scala.util.Try +import scala.reflect.ClassTag + +import com.typesafe.config.Config + +import org.flowforwarding.warp.controller.api.dynamic.{DynamicDriver, DynamicStructure, DynamicStructureBuilder} + + +trait BuilderInput + +object StructureName { + def unapply(c: Class[_]): Option[String] = Some(c.getSimpleName.stripSuffix("Builder").stripSuffix("Structure")) +} + +object Utils{ + implicit class ConfigExt(namesConfig: Config){ + def getTypeName[T: ClassTag]: String = { + val StructureName(name) = implicitly[ClassTag[T]].runtimeClass + namesConfig.getConfig(name).getString("type_name") + } + } + + implicit class LongExt(l: Long){ + def testBit(bit: Byte): Boolean = (l & (1 << bit)) != 0 + } +} + +import Utils._ + +trait ConfigurableStructure{ + protected val namesConfig: Config + + def getFieldName[T: ClassTag](fieldKey: String): String = { + val StructureName(name) = implicitly[ClassTag[T]].runtimeClass + namesConfig.getConfig(name).getString(fieldKey) + } +} + +trait ConcreteStructure extends ConfigurableStructure{ + val namesConfig: Config + val underlyingStructure: DynamicStructure[_ <: DynamicStructure[_]] + + def getPrimitive(fieldKey: String) = underlyingStructure.primitiveField(getFieldName(fieldKey)(ClassTag(this.getClass))) +} + +abstract class ConcreteStructureBuilder[Input <: BuilderInput, + BuilderType <: DynamicStructureBuilder[BuilderType, StructureType], + StructureType <: DynamicStructure[StructureType]] extends ConfigurableStructure{ + protected val dynamicBuilder: BuilderType + // Fills the underlying builder with the specified input. + protected def applyInput(input: Input): Unit + def build(input: Input): StructureType = { + applyInput(input) + dynamicBuilder.build + } +} + +abstract class SpecificVersionMessageHandler[StaticApiProvider: ClassTag, StructureType <: DynamicStructure[StructureType]: ClassTag]{ + + def handle(driver: DynamicDriver[_, StructureType], dpid: Long, msg: StructureType) = + driverAsApiProvider(driver) map { provider => onCommonMessage(provider, dpid, msg) } + + def versionCode: Short + + protected[api] def driverAsApiProvider(driver: DynamicDriver[_, StructureType]): Option[StaticApiProvider] = { + val apiInterface = implicitly[ClassTag[StaticApiProvider]].runtimeClass + if(driver.getClass.getInterfaces.contains(apiInterface)) + Some(driver.asInstanceOf[StaticApiProvider]) + else + None + } + + protected def onCommonMessage(apiProvider: StaticApiProvider, dpid: Long, msg: StructureType): Try[Array[StructureType]] +} + +trait DriverApiHelper[BuilderType <: DynamicStructureBuilder[BuilderType, StructureType], + StructureType <: DynamicStructure[StructureType]]{ + driver: DynamicDriver[BuilderType, StructureType] => + + protected[api] val namesConfig: Config + + private[api] val builderClasses: List[Class[_ <: ConcreteStructureBuilder[_, BuilderType, StructureType]]] = List() + private[api] val structureClasses: List[Class[_ <: ConcreteStructure]] = List() + + private[api] def getConcreteStructure(dynamic: StructureType): Try[ConcreteStructure] = Try { + structureClasses collectFirst { + case c if dynamic.isTypeOf(namesConfig.getTypeName(ClassTag(c))) => + // Every XXXStructure type is inner class, so its constructor must get reference to outer class + c.getConstructors.head.newInstance(this, dynamic).asInstanceOf[ConcreteStructure] + } match { + case Some(structure) => structure + case None => throw new RuntimeException("Undefined type of structure.") + } + } + + private[api] def getDynamicBuilder(c: Class[_]): Try[BuilderType] = Try { getBuilder(namesConfig.getTypeName(ClassTag(c))) } + + def firstGenericParameter(c: Class[_]): Option[Class[_]] = Try { + c.getGenericSuperclass + .asInstanceOf[java.lang.reflect.ParameterizedType] + .getActualTypeArguments + .head + .asInstanceOf[Class[_]] + }.toOption + + def buildInput[X <: BuilderInput](i: X): StructureType = { + builderClasses.collectFirst { case b if firstGenericParameter(b) == Some(i.getClass) => b } + .get + .getConstructors + .head + .newInstance(this) + .asInstanceOf[ConcreteStructureBuilder[X, BuilderType, StructureType]] // could be cached + .build(i) + } +} + diff --git a/src/main/scala/org/flowforwarding/warp/controller/api/fixed/v13/Ofp13DriverApi.scala b/src/main/scala/org/flowforwarding/warp/controller/api/fixed/v13/Ofp13DriverApi.scala new file mode 100644 index 0000000..d534e51 --- /dev/null +++ b/src/main/scala/org/flowforwarding/warp/controller/api/fixed/v13/Ofp13DriverApi.scala @@ -0,0 +1,47 @@ +package org.flowforwarding.warp.controller.api.fixed.v13 + +import scala.util.Try +import scala.reflect.ClassTag + +import org.flowforwarding.warp.controller.api.dynamic.{DynamicDriver, DynamicStructureBuilder, DynamicStructure} +import org.flowforwarding.warp.controller.api.fixed._ +import scala.util.Success +import scala.util.Failure + +trait Ofp13DriverApi[BuilderType <: DynamicStructureBuilder[BuilderType, StructureType], StructureType <: DynamicStructure[StructureType]] + extends DriverApiHelper [BuilderType, StructureType] with + Ofp13HelloDescription [BuilderType, StructureType] with + Ofp13FeaturesDescription [BuilderType, StructureType] with + Ofp13EchoReplyDescription [BuilderType, StructureType] with + Ofp13EchoRequestDescription[BuilderType, StructureType] +{ driver: DynamicDriver[BuilderType, StructureType] => } + +abstract class Ofp13MessageHandler[BuilderType <: DynamicStructureBuilder[BuilderType, StructureType], + StructureType <: DynamicStructure[StructureType]](structureClass: Class[StructureType]) + extends SpecificVersionMessageHandler[Ofp13DriverApi[BuilderType, StructureType], StructureType]()( + ClassTag(classOf[Ofp13DriverApi[BuilderType, StructureType]]), + ClassTag(structureClass)){ + + val versionCode: Short = 4 + + type Api = Ofp13DriverApi[BuilderType, StructureType] + + private implicit val structureTag: ClassTag[StructureType] = ClassTag(structureClass) + + protected[api] def onCommonMessage(apiProvider: Api, dpid: Long, msg: StructureType): Try[Array[StructureType]] = Try { + apiProvider.getConcreteStructure(msg) match { + case Success(m: Hello) => onHello(dpid, m) + case Success(m: FeaturesReply) => onFeaturesReply(dpid, m) + case Success(m: EchoRequest) => onEchoRequest(dpid, m) + case Success(m: EchoReply) => onEchoReply(dpid, m) + case Success(_) => ??? // etc... + case Failure(t) => throw t + } + } map { _.map(apiProvider.buildInput) } + + protected def onHello(dpid: Long, msg: Hello): Array[BuilderInput] = Array.empty[BuilderInput] + protected def onFeaturesReply(dpid: Long, msg: FeaturesReply): Array[BuilderInput] = Array.empty[BuilderInput] + protected def onEchoRequest(dpid: Long, msg: EchoRequest): Array[BuilderInput] = Array.empty[BuilderInput] + protected def onEchoReply(dpid: Long, msg: EchoReply): Array[BuilderInput] = Array.empty[BuilderInput] + // etc +} \ No newline at end of file diff --git a/src/main/scala/org/flowforwarding/warp/controller/api/fixed/v13/Ofp13EchoReplyDescription.scala b/src/main/scala/org/flowforwarding/warp/controller/api/fixed/v13/Ofp13EchoReplyDescription.scala new file mode 100644 index 0000000..34f62cf --- /dev/null +++ b/src/main/scala/org/flowforwarding/warp/controller/api/fixed/v13/Ofp13EchoReplyDescription.scala @@ -0,0 +1,33 @@ +package org.flowforwarding.warp.controller.api.fixed.v13 + +import org.flowforwarding.warp.controller.api.dynamic._ +import org.flowforwarding.warp.controller.api.fixed._ + +trait EchoReply extends Message{ + def elements: Array[Byte] +} + +case class EchoReplyInput(xid: Int, elements: Array[Byte]) extends MessageInput{ + def length: Int = 8 + elements.length +} + +trait Ofp13EchoReplyDescription[BuilderType <: DynamicStructureBuilder[BuilderType, StructureType], + StructureType <: DynamicStructure[StructureType]] extends BasicStructuresDescription[BuilderType, StructureType] { + apiProvider: DriverApiHelper[BuilderType, StructureType] => + + class EchoReplyBuilder extends OfpMessageBuilder[EchoReplyInput]{ + // Fills the underlying builder with the specified input. + override protected def applyInput(input: EchoReplyInput): Unit = { + super.applyInput(input) + dynamicBuilder.setMember(getFieldName[EchoReplyBuilder]("elements"), input.elements) + } + } + + case class EchoReplyStructure(underlyingStructure: StructureType) extends OfpMessage with EchoReply{ + def elements: Array[Byte] = underlyingStructure.primitivesSequence(getFieldName[EchoReply]("elements")) map { _.toByte } + } + + abstract override def builderClasses = classOf[EchoReplyBuilder] :: super.builderClasses + abstract override def structureClasses = classOf[EchoReplyStructure] :: super.structureClasses +} + diff --git a/src/main/scala/org/flowforwarding/warp/controller/api/fixed/v13/Ofp13EchoRequestDescription.scala b/src/main/scala/org/flowforwarding/warp/controller/api/fixed/v13/Ofp13EchoRequestDescription.scala new file mode 100644 index 0000000..5433f89 --- /dev/null +++ b/src/main/scala/org/flowforwarding/warp/controller/api/fixed/v13/Ofp13EchoRequestDescription.scala @@ -0,0 +1,33 @@ +package org.flowforwarding.warp.controller.api.fixed.v13 + +import org.flowforwarding.warp.controller.api.dynamic.{DynamicStructure, DynamicStructureBuilder} +import org.flowforwarding.warp.controller.api.fixed._ + +trait EchoRequest extends Message{ + def elements: Array[Byte] +} + +case class EchoRequestInput(xid: Int, elements: Array[Byte]) extends MessageInput{ + def length: Int = 8 + elements.length +} + +trait Ofp13EchoRequestDescription[BuilderType <: DynamicStructureBuilder[BuilderType, StructureType], + StructureType <: DynamicStructure[StructureType]] extends BasicStructuresDescription[BuilderType, StructureType] { + apiProvider: DriverApiHelper[BuilderType, StructureType] => + + class EchoRequestBuilder extends OfpMessageBuilder[EchoRequestInput]{ + // Fills the underlying builder with the specified input. + override protected def applyInput(input: EchoRequestInput): Unit = { + super.applyInput(input) + dynamicBuilder.setMember(getFieldName[EchoRequestBuilder]("elements"), input.elements) + } + } + + case class EchoRequestStructure(underlyingStructure: StructureType) extends OfpMessage with EchoRequest{ + def elements: Array[Byte] = underlyingStructure.primitivesSequence(getFieldName[EchoRequest]("elements")) map { _.toByte } + } + + abstract override val builderClasses = classOf[EchoRequestBuilder] :: super.builderClasses + abstract override val structureClasses = classOf[EchoRequestStructure] :: super.structureClasses +} + diff --git a/src/main/scala/org/flowforwarding/warp/controller/api/fixed/v13/Ofp13FeaturesDescription.scala b/src/main/scala/org/flowforwarding/warp/controller/api/fixed/v13/Ofp13FeaturesDescription.scala new file mode 100644 index 0000000..e711dee --- /dev/null +++ b/src/main/scala/org/flowforwarding/warp/controller/api/fixed/v13/Ofp13FeaturesDescription.scala @@ -0,0 +1,40 @@ +package org.flowforwarding.warp.controller.api.fixed.v13 + +import org.flowforwarding.warp.controller.api.dynamic._ +import org.flowforwarding.warp.controller.api.fixed._ +import org.flowforwarding.warp.controller.api.fixed.Utils._ + +case class SwitchCapabilities(flowStats: Boolean, tableStats: Boolean, portStats: Boolean, groupStats: Boolean, ipReasm: Boolean, queueStats: Boolean, portBlocked: Boolean) + +trait FeaturesReply extends Message{ + def datapathId: Long + def buffersCount: Int + def tablesCount: Byte + def auxiliaryId: Byte + def capabilities: SwitchCapabilities +} + +case class FeaturesRequestInput(xid: Int) extends MessageInput{ + def length: Int = 8 +} + +trait Ofp13FeaturesDescription[BuilderType <: DynamicStructureBuilder[BuilderType, StructureType], + StructureType <: DynamicStructure[StructureType]] extends BasicStructuresDescription[BuilderType, StructureType] { + apiProvider: DriverApiHelper[BuilderType, StructureType] => + + class FeaturesRequestBuilder extends OfpMessageBuilder[FeaturesRequestInput] + + case class FeaturesReplyStructure(underlyingStructure: StructureType) extends OfpMessage with FeaturesReply { + def datapathId: Long = getPrimitive("datapath_id") + def buffersCount: Int = getPrimitive("n_buffers").toInt //LONG? + def tablesCount: Byte = getPrimitive("n_tables").toByte //BYTE? + def auxiliaryId: Byte = getPrimitive("auxiliary_id").toByte //BYTE? + def capabilities: SwitchCapabilities = { + val c = getPrimitive("capabilities") + SwitchCapabilities(c.testBit(0), c.testBit(1), c.testBit(2), c.testBit(3), c.testBit(5), c.testBit(6), c.testBit(8)) + } + } + + abstract override def builderClasses = classOf[FeaturesRequestBuilder] :: super.builderClasses + abstract override def structureClasses = classOf[FeaturesReplyStructure] :: super.structureClasses +} \ No newline at end of file diff --git a/src/main/scala/org/flowforwarding/warp/controller/api/fixed/v13/Ofp13HelloDescription.scala b/src/main/scala/org/flowforwarding/warp/controller/api/fixed/v13/Ofp13HelloDescription.scala new file mode 100644 index 0000000..374daca --- /dev/null +++ b/src/main/scala/org/flowforwarding/warp/controller/api/fixed/v13/Ofp13HelloDescription.scala @@ -0,0 +1,37 @@ +package org.flowforwarding.warp.controller.api.fixed.v13 + +import org.flowforwarding.warp.controller.api.dynamic._ +import org.flowforwarding.warp.controller.api.fixed._ +import org.flowforwarding.warp.controller.api.fixed.Utils._ + +trait HelloElem { val mType: Short } + +case class HelloElemVersionBitmap(bitmaps: Array[Int]) extends HelloElem { val mType: Short = 1 } + +trait Hello extends Message{ + def elems: Array[HelloElem] +} + +case class HelloInput(xid: Int, elems: Array[HelloElem]) extends MessageInput{ + def length = ??? +} + +trait Ofp13HelloDescription[BuilderType <: DynamicStructureBuilder[BuilderType, StructureType], + StructureType <: DynamicStructure[StructureType]] extends BasicStructuresDescription[BuilderType, StructureType]{ + apiProvider: DriverApiHelper[BuilderType, StructureType] => + + class HelloBuilder extends OfpMessageBuilder[HelloInput]{ + // Fills the underlying builder with the specified input. + protected override def applyInput(input: HelloInput): Unit = ??? + } + + case class HelloStructure(underlyingStructure: StructureType) extends Hello with OfpMessage{ + def elems: Array[HelloElem] = underlyingStructure.structuresSequence(getFieldName[Hello]("elements")) collect { + case s if s.isTypeOf(namesConfig.getTypeName[HelloElemVersionBitmap]) => + HelloElemVersionBitmap(s.primitivesSequence(getFieldName[HelloElemVersionBitmap]("bitmaps")) map { _.toInt }) + } + } + + abstract override def builderClasses = classOf[HelloBuilder] :: super.builderClasses + abstract override def structureClasses = classOf[HelloStructure] :: super.structureClasses +} \ No newline at end of file diff --git a/src/main/scala/org/flowforwarding/warp/controller/session/SwitchNurse.scala b/src/main/scala/org/flowforwarding/warp/controller/session/SwitchNurse.scala index f670102..941efdb 100644 --- a/src/main/scala/org/flowforwarding/warp/controller/session/SwitchNurse.scala +++ b/src/main/scala/org/flowforwarding/warp/controller/session/SwitchNurse.scala @@ -7,53 +7,57 @@ package org.flowforwarding.warp.controller.session import akka.actor.{Actor, ActorRef} import akka.io.{Tcp, TcpMessage} import akka.util.{Timeout, ByteString} +import akka.pattern.ask +import java.net.InetSocketAddress object SwitchNurse{ case class SendToSwitch(msg: Array[Byte]) - case class AcceptVersion(handler: ActorRef, versionCode: Long, handshake: Array[Byte]) - case class RejectVersion(handler: ActorRef, versionCode: Long) + case class AcceptConnection(handler: ActorRef, versionCode: Short, handshake: Array[Byte]) + case class RejectConnection(handler: ActorRef, versionCodes: Array[Short], errorData: Array[Byte]) } -/* One handler per version of protocol */ -class SwitchNurse(ofSessionHandlers: Set[ActorRef]) extends Actor { +/* One handler - one protocol factory */ +class SwitchNurse(ofSessionHandlers: Set[ActorRef], remoteAddress: InetSocketAddress, localAddress: InetSocketAddress) extends Actor { import SwitchNurse._ - import OFSessionHandler._ + import LowLevelSessionHandler._ + + val sw = Switch(remoteAddress, localAddress) def receive = startingState orElse handleClosed def startingState: Actor.Receive = { case Tcp.Received(data) => - val sw = new Switch ofSessionHandlers foreach { _ ! InitialMessageData(sw, data.toArray) } println("[OF-INFO] Hello from switch " + sw) - context become (waitingForVersion(sw, sender) orElse handleClosed) + context become (waitingForVersion(sender) orElse handleClosed) } var rejectRepliesCount = 0 - def waitingForVersion(sw: Switch, tcpChannel: ActorRef): Actor.Receive = { - case AcceptVersion(handler, vc, data) => // handshake message to be sent to the switch + def waitingForVersion(tcpChannel: ActorRef): Actor.Receive = { + case AcceptConnection(handler, vc, data) => // handshake message to be sent to the switch // TODO: add handlers priority?? implicit val timeout = Timeout(5000) tcpChannel ! TcpMessage.write(ByteString.fromArray(data)) println("[OF-INFO] Hello to switch " + sw) - println("[OF-INFO] Code of protocol version is 0x" + vc.toHexString) - context become (waitingForDPID(sw, tcpChannel, sender, handler) orElse handleClosed) - case RejectVersion(_, vc) => + println("[OF-INFO] Code of protocol version is 0x" + vc.toLong.toHexString) + context become (waitingForPDID(tcpChannel, sender, handler) orElse handleClosed) + case RejectConnection(_, versions, errorData) => rejectRepliesCount += 1 if (rejectRepliesCount == ofSessionHandlers.size){ - println("[OF-INFO] None of specified handlers can handle OpenFlow protocol version with code 0x" + vc.toHexString) + println(versions.map(_.toLong.toHexString).mkString("[OF-INFO] None of specified handlers can handle OpenFlow protocol versions ", ", 0x", "")) + tcpChannel ! TcpMessage.write(ByteString.fromArray(errorData)) context become handleClosed } } - def waitingForDPID(sw: Switch, tcpChannel: ActorRef, requester: ActorRef, handler: ActorRef): Actor.Receive = { + def waitingForPDID(tcpChannel: ActorRef, requester: ActorRef, handler: ActorRef): Actor.Receive = { case Tcp.Received(data) => println("[OF-INFO] Handshaked with Switch " + sw) requester ! ReceivedMessage(sw, data.toArray) - context become (handshakedState(sw, tcpChannel, handler) orElse handleClosed) + context become (handshakedState(tcpChannel, handler) orElse handleClosed) } - def handshakedState(sw: Switch, tcpChannel: ActorRef, handler: ActorRef): Actor.Receive = { + def handshakedState(tcpChannel: ActorRef, handler: ActorRef): Actor.Receive = { case Tcp.Received(data) => println("[OF-INFO] Connected to Switch " + sw) handler ! ReceivedMessage(sw, data.toArray) diff --git a/src/main/scala/org/flowforwarding/warp/controller/session/driver_interface.scala b/src/main/scala/org/flowforwarding/warp/controller/session/driver_interface.scala index bc58e50..13c634b 100644 --- a/src/main/scala/org/flowforwarding/warp/controller/session/driver_interface.scala +++ b/src/main/scala/org/flowforwarding/warp/controller/session/driver_interface.scala @@ -7,14 +7,26 @@ trait OFMessage // Marker trait trait MessageDriver[T <: OFMessage]{ def getDPID(in: Array[Byte]): Try[Long] def decodeMessage(in: Array[Byte]): Try[T] - def encodeMessage(dict: T): Try[Array[Byte]] + def encodeMessage(msg: T): Try[Array[Byte]] val versionCode: Short -} -trait MessageDriverFactory[T <: OFMessage]{ - def get(versionCode: Short): Option[MessageDriver[T]] + def getHelloMessage(supportedVersions: Array[Short]): Array[Byte] + def rejectVersionError(reason: String): Array[Byte] + def suitesSwitch(hello: Array[Byte]): Boolean = (hello(0).toShort & 0xff).toShort == versionCode // implementation depends on ofp version + def getFeaturesRequest: Array[Byte] +} - def getVersion(msg: Array[Byte]): Option[Short] = Try((msg(0).toShort & 0xff).toShort).toOption - def get(msg: Array[Byte]): Option[MessageDriver[T]] = getVersion(msg).flatMap(get) - //def get(version: String): MessageDriver[T] +trait MessageDriverFactory[T <: OFMessage, +DriverType <: MessageDriver[T]]{ + def get(versionCode: Short): DriverType + def supportedVersions: Array[Short] + // accepted version or rejected versions + error data + def highestCommonVersion(helloMsg: Array[Byte]): Either[Short, (Array[Short], Array[Byte])] = { + supportedVersions.map(get) + .sortBy(_.versionCode) + .reverse + .collectFirst { case d if d.suitesSwitch(helloMsg) => d.versionCode } match { + case Some(version) => Left(version) + case None => Right(Array((helloMsg(0).toShort & 0xff).toShort), get(supportedVersions.head).rejectVersionError("No message driver.")) // TODO: improve implementation + } + } } \ No newline at end of file diff --git a/src/main/scala/org/flowforwarding/warp/controller/session/handler.scala b/src/main/scala/org/flowforwarding/warp/controller/session/handler.scala index 640b0b3..ba99a69 100644 --- a/src/main/scala/org/flowforwarding/warp/controller/session/handler.scala +++ b/src/main/scala/org/flowforwarding/warp/controller/session/handler.scala @@ -5,73 +5,84 @@ package org.flowforwarding.warp.controller.session import scala.util.Try +import scala.concurrent.ExecutionContext.Implicits.global import akka.actor.{ActorRefFactory, Props, Actor, ActorRef} -import scala.concurrent.ExecutionContext.Implicits.global import akka.pattern.ask import akka.util.Timeout +import java.net.InetSocketAddress + +case class Switch(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress) -class Switch +class SessionHandlerRef(handlerClass: Class[_], launchArgs: Array[AnyRef] = Array()) { -case class SessionHandlerLauncher(handlerClass: Class[_], launchArgs: AnyRef*) { + /* handlerClass must be a subclass of LowLevelSessionHandler */ + assert(classOf[LowLevelSessionHandler[_, _]].isAssignableFrom(handlerClass)) + + def this(handlerClass: Class[_], launchArgs: AnyRef*) = this(handlerClass, launchArgs.toArray) def launch(implicit f: ActorRefFactory) = f.actorOf(Props.create(handlerClass, launchArgs: _*)) } -object OFSessionHandler{ +object LowLevelSessionHandler{ case class InitialMessageData(sw: Switch, data: Array[Byte]) case class ReceivedMessage(sw: Switch, data: Array[Byte]) + case class ConnectionClosed(sw: Switch) } -case class SwitchInfo[T <: OFMessage](dpid: Long, tcpChannel: ActorRef, driver: MessageDriver[T]) +case class SwitchInfo[T <: OFMessage, DriverType <: MessageDriver[T]](dpid: Long, tcpChannel: ActorRef, driver: DriverType) -abstract class OFSessionHandler[T <: OFMessage](driverFactory: MessageDriverFactory[T]) extends Actor { - import OFSessionHandler._ +abstract class LowLevelSessionHandler[T <: OFMessage, DriverType <: MessageDriver[T]](driverFactory: MessageDriverFactory[T, DriverType]) extends Actor { + import LowLevelSessionHandler._ import SwitchNurse._ - private val swInfo = scala.collection.mutable.Map[Switch, SwitchInfo[T]]() + private val swInfo = scala.collection.mutable.Map[Switch, SwitchInfo[T, DriverType]]() implicit val timeout = Timeout(5000) - // TODO: add closed def handshaked(versionCode: Short, dpid: Long) { } - def connected(versionCode: Short) { } + def connected(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress) { } + def disconnected(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress, versionCode: Short, dpid: Long) { } - protected def getHandshakeMessage(versionCode: Short, msg: T): Seq[T] - protected def onReceivedMessage(versionCode: Short, dpid: Long, msg: T): Seq[T] + protected def onReceivedMessage(driver: DriverType, dpid: Long, msg: T): Seq[T] def sequence[A](s: Seq[Try[A]]): Try[Seq[A]] = Try(s map { _.get }) - def handleIncoming(driver: MessageDriver[T], msg: Array[Byte], f: T => Seq[T]): Try[Array[Byte]] = + def handleIncoming(driver: DriverType, msg: Array[Byte], f: T => Seq[T]): Try[Array[Byte]] = for{decoded <- driver.decodeMessage(msg) messages <- sequence(f(decoded) map driver.encodeMessage) reply = messages reduce { _ ++ _ } } yield reply + def definedHandlersVersions: Array[Short] + def receive = { - case InitialMessageData(sw, data) => - val version = driverFactory.getVersion(data).get - connected(version) + case InitialMessageData(sw @ Switch(remoteAddress, localAddress), helloData) => + connected(remoteAddress, localAddress) val cRef = sender val hRef = self - driverFactory.get(data) flatMap { - d => - val processHandshaked = (getHandshakeMessage _).curried(version) - handleIncoming(d, data, processHandshaked).toOption.map((d, _)) - } match { - case Some((d, hs)) => - sender ? AcceptVersion(hRef, d.versionCode, hs) map { reply => + driverFactory.highestCommonVersion(helloData) match { + case Left(version) if !definedHandlersVersions.contains(version) => + val driver = driverFactory.get(version) + val errorData = driver.rejectVersionError("No handlers defined.") + cRef ! RejectConnection(hRef, Array(version), errorData) + case Left(version) => + val driver = driverFactory.get(version) + val handshake = driver.getHelloMessage(driverFactory.supportedVersions) ++ driver.getFeaturesRequest + sender ? AcceptConnection(hRef, version, handshake) map { reply => val incoming = reply.asInstanceOf[ReceivedMessage].data // this message must contain "features_reply" structure - d.getDPID(incoming) map { dpid => - swInfo(sw) = SwitchInfo(dpid, cRef, d) - handshaked(d.versionCode, dpid) + driver.getDPID(incoming) map { dpid => + swInfo(sw) = SwitchInfo(dpid, cRef, driver) + handshaked(version, dpid) hRef ! reply // forward for further processing } } - case None => sender ! RejectVersion(hRef, version) + case Right((versions, error)) => cRef ! RejectConnection(hRef, versions, error) } case ReceivedMessage(sw, data) => val info = swInfo(sw) - val processReceived = (onReceivedMessage _).curried(info.driver.versionCode)(info.dpid) + val processReceived = (onReceivedMessage _).curried(info.driver)(info.dpid) handleIncoming(info.driver, data, processReceived) foreach { info.tcpChannel ! SendToSwitch(_) } + case ConnectionClosed(sw @ Switch(remoteAddress, localAddress)) => + disconnected(remoteAddress, localAddress, swInfo(sw).driver.versionCode, swInfo(sw).dpid) } } \ No newline at end of file diff --git a/src/main/scala/org/flowforwarding/warp/demo/demo.scala b/src/main/scala/org/flowforwarding/warp/demo/demo.scala deleted file mode 100644 index 51efc70..0000000 --- a/src/main/scala/org/flowforwarding/warp/demo/demo.scala +++ /dev/null @@ -1,52 +0,0 @@ -/** - * В© 2013 FlowForwarding.Org - * All Rights Reserved. Use is subject to license terms. - */ -package org.flowforwarding.warp.demo - -import org.flowforwarding.warp.controller.session.SessionHandlerLauncher - -import org.flowforwarding.warp.protocol.adapter.OFJDriverSessionHandler -import org.flowforwarding.warp.protocol.ofmessages.{OFMessageProviderFactoryAvroProtocol, OFMessageRef, IOFMessageProvider, IOFMessageProviderFactory} -import org.flowforwarding.warp.protocol.ofmessages.OFMessagePacketIn.OFMessagePacketInRef -import org.flowforwarding.warp.protocol.ofmessages.OFMessageSwitchConfig.OFMessageSwitchConfigRef -import org.flowforwarding.warp.protocol.ofmessages.OFMessageError.OFMessageErrorRef -import org.flowforwarding.warp.controller.Controller - -class JDriverSimpleHandler(pFactory: IOFMessageProviderFactory) extends OFJDriverSessionHandler(pFactory){ - - def packetIn(provider: IOFMessageProvider, dpid: Long, pIn: OFMessagePacketInRef): Seq[OFMessageRef[_]] = { - val flowMod = provider.buildFlowModMsg - if (pIn.existMatchInPort()) { - flowMod.addMatchInPort(pIn.getMatchInPort.getMatch) - } else if (pIn.existMatchEthDst()) { - flowMod.addMatchEthDst(pIn.getMatchEthDst.getMatch) - } else if (pIn.existMatchEthSrc()) { - flowMod.addMatchEthSrc(pIn.getMatchEthSrc.getMatch) - } - - val instruction = provider.buildInstructionApplyActions - instruction.addActionOutput("2") - flowMod.addInstruction("apply_actions", instruction) - Seq(flowMod) - } - - def switchConfig(provider: IOFMessageProvider, dpid: Long, config: OFMessageSwitchConfigRef): Seq[OFMessageRef[_]] = { - println(s"[OF-INFO] DPID: $dpid Configuration: ") - if (config.isFragDrop) println("Drop fragments") - if (config.isFragMask) println("Mask") - if (config.isFragNormal) println("Normal") - if (config.isFragReasm) println("Reassemble") - Seq() - } - - def error(provider: IOFMessageProvider, dpid: Long, error: OFMessageErrorRef): Seq[OFMessageRef[_]] = { - println(s"[OF-INFO] DPID: $dpid Error(code = ${error.getCode}, type = ${error.getType})") - Seq() - } -} - -object JavaLauncher extends App { - Controller.launch(Set(SessionHandlerLauncher(classOf[JDriverSimpleHandler], new OFMessageProviderFactoryAvroProtocol))) -} -