4242import org .onlab .packet .ndp .NeighborSolicitation ;
4343import org .onlab .packet .ndp .RouterAdvertisement ;
4444import org .onlab .packet .ndp .RouterSolicitation ;
45+ import org .onlab .util .PredictableExecutor ;
4546import org .onlab .util .Tools ;
4647import org .onosproject .cfg .ComponentConfigService ;
4748import org .onosproject .core .ApplicationId ;
@@ -185,7 +186,9 @@ public class HostLocationProvider extends AbstractProvider implements HostProvid
185186
186187 ExecutorService deviceEventHandler ;
187188 private ExecutorService probeEventHandler ;
188- private ExecutorService packetHandler ;
189+ // Packet workers - 0 will leverage available processors
190+ private static final int DEFAULT_THREADS = 0 ;
191+ private PredictableExecutor packetWorkers ;
189192
190193 @ Reference (cardinality = ReferenceCardinality .MANDATORY )
191194 protected NetworkConfigService netcfgService ;
@@ -215,8 +218,8 @@ public void activate(ComponentContext context) {
215218 "device-event-handler" , log ));
216219 probeEventHandler = newSingleThreadScheduledExecutor (groupedThreads ("onos/host-loc-provider" ,
217220 "probe-event-handler" , log ));
218- packetHandler = newSingleThreadScheduledExecutor ( groupedThreads ("onos/host-loc-provider" ,
219- "packet-handler " , log ));
221+ packetWorkers = new PredictableExecutor ( DEFAULT_THREADS , groupedThreads ("onos/host-loc-provider" ,
222+ "packet-worker-%d " , log ));
220223 providerService = providerRegistry .register (this );
221224 packetService .addProcessor (processor , PacketProcessor .advisor (1 ));
222225 deviceService .addListener (deviceListener );
@@ -238,7 +241,7 @@ public void deactivate() {
238241 deviceService .removeListener (deviceListener );
239242 deviceEventHandler .shutdown ();
240243 probeEventHandler .shutdown ();
241- packetHandler .shutdown ();
244+ packetWorkers .shutdown ();
242245 providerService = null ;
243246 registry .unregisterConfigFactory (hostLearningConfig );
244247 netcfgService .removeListener (cfgListener );
@@ -525,18 +528,22 @@ private void updateHostIp(HostId hid, IpAddress ip) {
525528
526529 @ Override
527530 public void process (PacketContext context ) {
528- packetHandler .execute (() -> processPacketInternal (context ));
529- }
530-
531- private void processPacketInternal (PacketContext context ) {
531+ // Verify valid context
532532 if (context == null ) {
533533 return ;
534534 }
535-
535+ // Verify valid Ethernet packet
536536 Ethernet eth = context .inPacket ().parsed ();
537537 if (eth == null ) {
538538 return ;
539539 }
540+ // Dispatch to a worker thread
541+ HostId hostId = HostId .hostId (eth .getSourceMAC (), VlanId .vlanId (eth .getVlanID ()));
542+ packetWorkers .execute (() -> processPacketInternal (context ), hostId .hashCode ());
543+ }
544+
545+ private void processPacketInternal (PacketContext context ) {
546+ Ethernet eth = context .inPacket ().parsed ();
540547
541548 MacAddress srcMac = eth .getSourceMAC ();
542549 if (srcMac .isBroadcast () || srcMac .isMulticast ()) {
0 commit comments