Skip to content

Commit b84e919

Browse files
committed
Improves host programming introducing a pool of worker threads
Change-Id: I979693aa220e2666c13c4015435c66173624ea64
1 parent b45ffda commit b84e919

File tree

4 files changed

+111
-45
lines changed

4 files changed

+111
-45
lines changed

apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/HostHandler.java

Lines changed: 86 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.onlab.packet.IpPrefix;
2222
import org.onlab.packet.MacAddress;
2323
import org.onlab.packet.VlanId;
24+
import org.onlab.util.PredictableExecutor;
2425
import org.onosproject.net.ConnectPoint;
2526
import org.onosproject.net.DeviceId;
2627
import org.onosproject.net.Host;
@@ -40,6 +41,7 @@
4041
import java.util.stream.Collectors;
4142

4243
import static com.google.common.base.Preconditions.checkArgument;
44+
import static org.onlab.util.Tools.groupedThreads;
4345

4446
/**
4547
* Handles host-related events.
@@ -49,6 +51,9 @@ public class HostHandler {
4951

5052
protected final SegmentRoutingManager srManager;
5153
private HostService hostService;
54+
// Host workers - 0 will leverage available processors
55+
private static final int DEFAULT_THREADS = 0;
56+
protected PredictableExecutor hostWorkers;
5257

5358
/**
5459
* Constructs the HostHandler.
@@ -58,19 +63,36 @@ public class HostHandler {
5863
HostHandler(SegmentRoutingManager srManager) {
5964
this.srManager = srManager;
6065
hostService = srManager.hostService;
66+
this.hostWorkers = new PredictableExecutor(DEFAULT_THREADS,
67+
groupedThreads("onos/sr", "h-worker-%d", log));
68+
}
69+
70+
/**
71+
* Shutdowns the workers.
72+
*/
73+
void terminate() {
74+
hostWorkers.shutdown();
6175
}
6276

6377
protected void init(DeviceId devId) {
64-
hostService.getHosts().forEach(host ->
65-
host.locations().stream()
66-
.filter(location -> location.deviceId().equals(devId) ||
67-
location.deviceId().equals(srManager.getPairDeviceId(devId).orElse(null)))
68-
.forEach(location -> processHostAddedAtLocation(host, location))
78+
// Init hosts in parallel using hostWorkers executor
79+
hostService.getHosts().forEach(
80+
host -> hostWorkers.execute(() -> initHost(host, devId), host.id().hashCode())
6981
);
7082
}
7183

84+
private void initHost(Host host, DeviceId deviceId) {
85+
host.locations().forEach(location -> {
86+
if (location.deviceId().equals(deviceId) ||
87+
location.deviceId().equals(srManager.getPairDeviceId(deviceId).orElse(null))) {
88+
processHostAddedAtLocation(host, location);
89+
}
90+
});
91+
}
92+
7293
void processHostAddedEvent(HostEvent event) {
73-
processHostAdded(event.subject());
94+
Host host = event.subject();
95+
hostWorkers.execute(() -> processHostAdded(host), host.id().hashCode());
7496
}
7597

7698
private void processHostAdded(Host host) {
@@ -141,7 +163,8 @@ void processHostAddedAtLocation(Host host, HostLocation location) {
141163
}
142164

143165
void processHostRemovedEvent(HostEvent event) {
144-
processHostRemoved(event.subject());
166+
Host host = event.subject();
167+
hostWorkers.execute(() -> processHostRemoved(host), host.id().hashCode());
145168
}
146169

147170
private void processHostRemoved(Host host) {
@@ -195,6 +218,11 @@ private void processHostRemoved(Host host) {
195218
}
196219

197220
void processHostMovedEvent(HostEvent event) {
221+
Host host = event.subject();
222+
hostWorkers.execute(() -> processHostMovedEventInternal(event), host.id().hashCode());
223+
}
224+
225+
private void processHostMovedEventInternal(HostEvent event) {
198226
Host host = event.subject();
199227
MacAddress hostMac = host.mac();
200228
VlanId hostVlanId = host.vlan();
@@ -348,6 +376,11 @@ void processHostMovedEvent(HostEvent event) {
348376
}
349377

350378
void processHostUpdatedEvent(HostEvent event) {
379+
Host host = event.subject();
380+
hostWorkers.execute(() -> processHostUpdatedEventInternal(event), host.id().hashCode());
381+
}
382+
383+
private void processHostUpdatedEventInternal(HostEvent event) {
351384
Host host = event.subject();
352385
MacAddress hostMac = host.mac();
353386
VlanId hostVlanId = host.vlan();
@@ -424,10 +457,15 @@ void processPortUp(ConnectPoint cp) {
424457
}
425458
if (srManager.activeProbing) {
426459
srManager.getPairDeviceId(cp.deviceId())
427-
.ifPresent(pairDeviceId -> srManager.hostService.getConnectedHosts(pairDeviceId).stream()
428-
.filter(host -> isHostInVlanOfPort(host, pairDeviceId, cp))
429-
.forEach(host -> srManager.probingService.probeHost(host, cp, ProbeMode.DISCOVER))
430-
);
460+
.ifPresent(pairDeviceId -> srManager.hostService.getConnectedHosts(pairDeviceId).forEach(
461+
host -> hostWorkers.execute(() -> probingIfNecessary(host, pairDeviceId, cp),
462+
host.id().hashCode())));
463+
}
464+
}
465+
466+
private void probingIfNecessary(Host host, DeviceId pairDeviceId, ConnectPoint cp) {
467+
if (isHostInVlanOfPort(host, pairDeviceId, cp)) {
468+
srManager.probingService.probeHost(host, cp, ProbeMode.DISCOVER);
431469
}
432470
}
433471

@@ -637,7 +675,7 @@ VlanId vlanForPairPort(VlanId hostVlanId, ConnectPoint location) {
637675
* @param install true to populate the objective, false to revoke
638676
*/
639677
void processIntfVlanUpdatedEvent(DeviceId deviceId, PortNumber portNum, VlanId vlanId,
640-
boolean popVlan, boolean install) {
678+
boolean popVlan, boolean install) {
641679
ConnectPoint connectPoint = new ConnectPoint(deviceId, portNum);
642680
Set<Host> hosts = hostService.getConnectedHosts(connectPoint);
643681

@@ -646,22 +684,25 @@ void processIntfVlanUpdatedEvent(DeviceId deviceId, PortNumber portNum, VlanId v
646684
return;
647685
}
648686

649-
hosts.forEach(host -> {
650-
MacAddress mac = host.mac();
651-
VlanId hostVlanId = host.vlan();
652-
653-
// Check whether the host vlan is valid for new interface configuration
654-
if ((!popVlan && hostVlanId.equals(vlanId)) ||
655-
(popVlan && hostVlanId.equals(VlanId.NONE))) {
656-
srManager.defaultRoutingHandler.updateBridging(deviceId, portNum, mac, vlanId, popVlan, install);
657-
// Update Forwarding objective and corresponding simple Next objective
658-
// for each host and IP address connected to given port
659-
host.ipAddresses().forEach(ipAddress ->
660-
srManager.defaultRoutingHandler.updateFwdObj(deviceId, portNum, ipAddress.toIpPrefix(),
661-
mac, vlanId, popVlan, install)
662-
);
663-
}
664-
});
687+
hosts.forEach(host -> hostWorkers.execute(() -> processIntfVlanUpdatedEventInternal(
688+
host, deviceId, portNum, vlanId, popVlan, install), host.id().hashCode()));
689+
}
690+
691+
private void processIntfVlanUpdatedEventInternal(Host host, DeviceId deviceId, PortNumber portNum,
692+
VlanId vlanId, boolean popVlan, boolean install) {
693+
MacAddress mac = host.mac();
694+
VlanId hostVlanId = host.vlan();
695+
696+
// Check whether the host vlan is valid for new interface configuration
697+
if ((!popVlan && hostVlanId.equals(vlanId)) ||
698+
(popVlan && hostVlanId.equals(VlanId.NONE))) {
699+
srManager.defaultRoutingHandler.updateBridging(deviceId, portNum, mac, vlanId, popVlan, install);
700+
// Update Forwarding objective and corresponding simple Next objective
701+
// for each host and IP address connected to given port
702+
host.ipAddresses().forEach(ipAddress -> srManager.defaultRoutingHandler.updateFwdObj(
703+
deviceId, portNum, ipAddress.toIpPrefix(), mac, vlanId, popVlan, install)
704+
);
705+
}
665706
}
666707

667708
/**
@@ -680,18 +721,23 @@ void processIntfIpUpdatedEvent(ConnectPoint cp, Set<IpPrefix> ipPrefixSet, boole
680721
}
681722

682723
// Check whether the host IP address is in the interface's subnet
683-
hosts.forEach(host ->
684-
host.ipAddresses().forEach(hostIpAddress -> {
685-
ipPrefixSet.forEach(ipPrefix -> {
686-
if (install && ipPrefix.contains(hostIpAddress)) {
687-
srManager.defaultRoutingHandler.populateRoute(cp.deviceId(), hostIpAddress.toIpPrefix(),
688-
host.mac(), host.vlan(), cp.port(), true);
689-
} else if (!install && ipPrefix.contains(hostIpAddress)) {
690-
srManager.defaultRoutingHandler.revokeRoute(cp.deviceId(), hostIpAddress.toIpPrefix(),
691-
host.mac(), host.vlan(), cp.port(), true);
692-
}
693-
});
694-
}));
724+
hosts.forEach(host -> hostWorkers.execute(() -> processIntfIpUpdatedEventInternal(
725+
host, cp, ipPrefixSet, install)));
726+
}
727+
728+
private void processIntfIpUpdatedEventInternal(Host host, ConnectPoint cp, Set<IpPrefix> ipPrefixSet,
729+
boolean install) {
730+
host.ipAddresses().forEach(hostIpAddress -> {
731+
ipPrefixSet.forEach(ipPrefix -> {
732+
if (install && ipPrefix.contains(hostIpAddress)) {
733+
srManager.defaultRoutingHandler.populateRoute(cp.deviceId(), hostIpAddress.toIpPrefix(),
734+
host.mac(), host.vlan(), cp.port(), true);
735+
} else if (!install && ipPrefix.contains(hostIpAddress)) {
736+
srManager.defaultRoutingHandler.revokeRoute(cp.deviceId(), hostIpAddress.toIpPrefix(),
737+
host.mac(), host.vlan(), cp.port(), true);
738+
}
739+
});
740+
});
695741
}
696742

697743
/**

apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -642,6 +642,7 @@ protected void deactivate() {
642642
policyStore.destroy();
643643

644644
mcastHandler.terminate();
645+
hostHandler.terminate();
645646
log.info("Stopped");
646647
}
647648

apps/segmentrouting/app/src/test/java/org/onosproject/segmentrouting/HostHandlerTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.onlab.packet.IpPrefix;
2929
import org.onlab.packet.MacAddress;
3030
import org.onlab.packet.VlanId;
31+
import org.onlab.util.PredictableExecutor;
3132
import org.onosproject.net.config.ConfigApplyDelegate;
3233
import org.onosproject.net.host.HostProbingService;
3334
import org.onosproject.net.host.ProbeMode;
@@ -65,8 +66,9 @@
6566
import static org.easymock.EasyMock.reset;
6667
import static org.easymock.EasyMock.verify;
6768
import static org.junit.Assert.*;
69+
import static org.onlab.util.Tools.groupedThreads;
6870

69-
/**r
71+
/**
7072
* Unit test for {@link HostHandler}.
7173
*/
7274
public class HostHandlerTest {
@@ -250,6 +252,8 @@ public void setUp() {
250252
replay(srManager.routeService);
251253

252254
hostHandler = new HostHandler(srManager);
255+
hostHandler.hostWorkers = new PredictableExecutor(
256+
0, groupedThreads("onos/sr", "h-worker-%d"), true);
253257

254258
ROUTING_TABLE.clear();
255259
BRIDGING_TABLE.clear();

utils/misc/src/main/java/org/onlab/util/PredictableExecutor.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package org.onlab.util;
1717

18+
import com.google.common.util.concurrent.MoreExecutors;
19+
1820
import static com.google.common.base.Preconditions.checkArgument;
1921
import static com.google.common.base.Preconditions.checkNotNull;
2022

@@ -67,6 +69,18 @@ public static PredictableExecutor newPredictableExecutor(int buckets, ThreadFact
6769
* @param threadFactory {@link ThreadFactory} to use to create threads
6870
*/
6971
public PredictableExecutor(int buckets, ThreadFactory threadFactory) {
72+
this(buckets, threadFactory, false);
73+
}
74+
75+
/**
76+
* Creates {@link PredictableExecutor} instance.
77+
* Meant for testing purposes.
78+
*
79+
* @param buckets number of buckets or 0 to match available processors
80+
* @param threadFactory {@link ThreadFactory} to use to create threads
81+
* @param directExec direct executors
82+
*/
83+
public PredictableExecutor(int buckets, ThreadFactory threadFactory, boolean directExec) {
7084
checkArgument(buckets >= 0, "number of buckets must be non zero");
7185
checkNotNull(threadFactory);
7286
if (buckets == 0) {
@@ -75,7 +89,7 @@ public PredictableExecutor(int buckets, ThreadFactory threadFactory) {
7589
this.backends = new ArrayList<>(buckets);
7690

7791
for (int i = 0; i < buckets; ++i) {
78-
this.backends.add(backendExecutorService(threadFactory));
92+
this.backends.add(backendExecutorService(threadFactory, directExec));
7993
}
8094
}
8195

@@ -93,10 +107,11 @@ public PredictableExecutor(ThreadFactory threadFactory) {
93107
* Creates a single thread {@link ExecutorService} to use in the backend.
94108
*
95109
* @param threadFactory {@link ThreadFactory} to use to create threads
96-
* @return single thread {@link ExecutorService}
110+
* @param direct direct executors
111+
* @return single thread {@link ExecutorService} or direct executor
97112
*/
98-
protected ExecutorService backendExecutorService(ThreadFactory threadFactory) {
99-
return Executors.newSingleThreadExecutor(threadFactory);
113+
protected ExecutorService backendExecutorService(ThreadFactory threadFactory, boolean direct) {
114+
return direct ? MoreExecutors.newDirectExecutorService() : Executors.newSingleThreadExecutor(threadFactory);
100115
}
101116

102117

0 commit comments

Comments
 (0)