Skip to content

Commit e642cdc

Browse files
committed
Pipeline run in own thread
Refactors the pipeline to run in its own thread. This also means that steps will always run sequentially. Sources will have their sockets updated by the pipeline instead of doing so manually. Closes #190
1 parent 46d005f commit e642cdc

File tree

57 files changed

+2121
-515
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+2121
-515
lines changed

build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ allprojects {
5757
testCompile group: 'net.jodah', name: 'concurrentunit', version: '0.4.2'
5858
testCompile group: 'org.hamcrest', name: 'hamcrest-all', version: '1.3'
5959
testCompile group: 'junit', name: 'junit', version: '4.12'
60+
testCompile group: 'com.google.truth', name: 'truth', version: '0.28'
61+
testCompile group: 'com.google.guava', name: 'guava-testlib', version: '19.0'
6062
}
6163

6264
version = getVersionName()
@@ -122,6 +124,7 @@ project(":core") {
122124
}
123125

124126
dependencies {
127+
compile group: 'com.google.code.findbugs', name: 'jsr305', version: '3.0.0'
125128
compile group: 'org.bytedeco', name: 'javacv', version: '1.1'
126129
compile group: 'org.bytedeco.javacpp-presets', name: 'opencv', version: '3.0.0-1.1'
127130
compile group: 'org.bytedeco.javacpp-presets', name: 'opencv', version: '3.0.0-1.1', classifier: os

core/src/main/java/edu/wpi/grip/core/Main.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ public class Main {
2626
@Inject
2727
private Project project;
2828
@Inject
29+
private PipelineRunner pipelineRunner;
30+
@Inject
2931
private EventBus eventBus;
3032
@Inject
3133
private Logger logger;
@@ -58,6 +60,7 @@ public void start(String[] args) throws IOException, InterruptedException {
5860
// Open a project from a .grip file specified on the command line
5961
project.open(new File(projectPath));
6062

63+
pipelineRunner.startAsync();
6164

6265
// This is done in order to indicate to the user using the deployment UI that this is running
6366
logger.log(Level.INFO, "SUCCESS! The project is running in headless mode!");

core/src/main/java/edu/wpi/grip/core/Operation.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,4 +64,18 @@ default void perform(InputSocket<?>[] inputs, OutputSocket<?>[] outputs, Optiona
6464
default void perform(InputSocket<?>[] inputs, OutputSocket<?>[] outputs) {
6565
throw new UnsupportedOperationException("Perform was not overridden");
6666
}
67+
68+
/**
69+
* Allows the step to clean itself up when removed from the pipeline.
70+
* This should only be called by {@link Step#setRemoved()} to ensure correct synchronization.
71+
*
72+
* @param inputs An array obtained from {@link #createInputSockets(EventBus)}. The caller can set the value of
73+
* each socket to an actual parameter for the operation.
74+
* @param outputs An array obtained from {@link #createOutputSockets(EventBus)}. The outputs of the operation will
75+
* be stored in these sockets.
76+
* @param data Optional data to be passed to the operation
77+
*/
78+
default void cleanUp(InputSocket<?>[] inputs, OutputSocket<?>[] outputs, Optional<?> data) {
79+
/* no-op */
80+
}
6781
}

core/src/main/java/edu/wpi/grip/core/Pipeline.java

Lines changed: 133 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package edu.wpi.grip.core;
22

3+
import com.google.common.collect.ImmutableList;
34
import com.google.common.eventbus.EventBus;
45
import com.google.common.eventbus.Subscribe;
56
import com.google.inject.Singleton;
@@ -11,6 +12,11 @@
1112

1213
import javax.inject.Inject;
1314
import java.util.*;
15+
import java.util.concurrent.locks.Lock;
16+
import java.util.concurrent.locks.ReadWriteLock;
17+
import java.util.concurrent.locks.ReentrantReadWriteLock;
18+
import java.util.function.Consumer;
19+
import java.util.function.Function;
1420
import java.util.stream.Collectors;
1521

1622
import static com.google.common.base.Preconditions.checkArgument;
@@ -35,7 +41,14 @@ public class Pipeline {
3541
@XStreamOmitField
3642
private NTManager ntManager;
3743

44+
/*
45+
* We have separate locks for sources and steps because we don't want to
46+
* block access to both resources when only one is in use.
47+
*/
48+
49+
private transient final ReadWriteLock sourceLock = new ReentrantReadWriteLock();
3850
private final List<Source> sources = new ArrayList<>();
51+
private transient ReadWriteLock stepLock = new ReentrantReadWriteLock();
3952
private final List<Step> steps = new ArrayList<>();
4053
private final Set<Connection> connections = new HashSet<>();
4154
private ProjectSettings settings = new ProjectSettings();
@@ -44,27 +57,108 @@ public class Pipeline {
4457
* Remove everything in the pipeline
4558
*/
4659
public void clear() {
47-
this.steps.stream().collect(Collectors.toList()).forEach(this::removeStep);
48-
60+
getSteps().forEach(this::removeStep);
61+
// We collect the list first because the event modifies the list
4962
this.sources.stream()
5063
.map(SourceRemovedEvent::new)
5164
.collect(Collectors.toList())
5265
.forEach(this.eventBus::post);
5366
}
5467

68+
private final <R> R readSourcesSafely(Function<List<Source>, R> sourceListFunction) {
69+
return accessSafely(sourceLock.readLock(), Collections.unmodifiableList(sources), sourceListFunction);
70+
}
71+
72+
/**
73+
* Returns a snapshot of all of the sources in the pipeline.
74+
*
75+
* @return an Immutable copy of the sources at the current point in the pipeline.
76+
* @see <a href="https://youtu.be/ZeO_J2OcHYM?t=16m35s">Why we use ImmutableList return type</a>
77+
*/
78+
public final ImmutableList<Source> getSources() {
79+
return readSourcesSafely(ImmutableList::copyOf);
80+
}
81+
5582
/**
56-
* @return The unmodifiable list of sources for inputs to the algorithm
57-
* @see Source
83+
* @param stepListFunction The function to read the steps with.
84+
* @param <R> The return type of the function
85+
* @return The value returned by the function.
5886
*/
59-
public List<Source> getSources() {
60-
return Collections.unmodifiableList(this.sources);
87+
private final <R> R readStepsSafely(Function<List<Step>, R> stepListFunction) {
88+
return accessSafely(stepLock.readLock(), Collections.unmodifiableList(steps), stepListFunction);
6189
}
6290

6391
/**
64-
* @return The unmodifiable list of steps in the computer vision algorithm
92+
* Returns a snapshot of all of the steps in the pipeline.
93+
*
94+
* @return an Immutable copy of the steps at the current point in the pipeline.
95+
* @see <a href="https://youtu.be/ZeO_J2OcHYM?t=16m35s">Why we use ImmutableList return type</a>
6596
*/
66-
public List<Step> getSteps() {
67-
return Collections.unmodifiableList(this.steps);
97+
public final ImmutableList<Step> getSteps() {
98+
return readStepsSafely(ImmutableList::copyOf);
99+
}
100+
101+
/*
102+
* These methods should not be made public.
103+
* If you do so you are making a poor design decision and should move whatever you are trying to do into
104+
* this class.
105+
*/
106+
107+
/**
108+
* @param stepListWriterFunction A function that modifies the step list passed to the operation.
109+
* @param <R> The return type of the function
110+
* @return The value returned by the function.
111+
*/
112+
private <R> R writeStepsSafely(Function<List<Step>, R> stepListWriterFunction) {
113+
return accessSafely(stepLock.writeLock(), steps, stepListWriterFunction);
114+
}
115+
116+
/**
117+
* @param stepListWriterConsumer A consumer that can modify the list that is passed to it.
118+
*/
119+
private void writeStepsSafelyConsume(Consumer<List<Step>> stepListWriterConsumer) {
120+
writeStepsSafely(stepList -> {
121+
stepListWriterConsumer.accept(stepList);
122+
return null;
123+
});
124+
}
125+
126+
private <R> R writeSourcesSafely(Function<List<Source>, R> sourceListWriterFunction) {
127+
return accessSafely(sourceLock.writeLock(), sources, sourceListWriterFunction);
128+
}
129+
130+
private void writeSourcesSafelyConsume(Consumer<List<Source>> sourceListWriterFunction) {
131+
writeSourcesSafely(sources -> {
132+
sourceListWriterFunction.accept(sources);
133+
return null;
134+
});
135+
}
136+
137+
/*
138+
* End of methods that should not be made public
139+
*/
140+
141+
/**
142+
* Locks the resource with the specified lock and performs the function.
143+
* When the function is complete then the lock unlocked again.
144+
*
145+
* @param lock The lock for the given resource
146+
* @param list The list that will be accessed while the resource is locked
147+
* @param listFunction The function that either modifies or accesses the list
148+
* @param <T> The type of list
149+
* @param <R> The return value for the function
150+
* @return The value returned by the list function
151+
*/
152+
private static <T, R> R accessSafely(Lock lock, List<T> list, Function<List<T>, R> listFunction) {
153+
final R returnValue;
154+
lock.lock();
155+
try {
156+
returnValue = listFunction.apply(list);
157+
} finally {
158+
// Ensure that no matter what may get thrown while reading the steps we unlock
159+
lock.unlock();
160+
}
161+
return returnValue;
68162
}
69163

70164
/**
@@ -132,43 +226,53 @@ public boolean canConnect(Socket socket1, Socket socket2) {
132226
/**
133227
* @return true if the step1 is before step2 in the pipeline
134228
*/
135-
private synchronized boolean isBefore(Step step1, Step step2) {
136-
return this.steps.indexOf(step1) < this.steps.indexOf(step2);
229+
private boolean isBefore(Step step1, Step step2) {
230+
return readStepsSafely(steps -> steps.indexOf(step1) < steps.indexOf(step2));
137231
}
138232

139233
@Subscribe
140234
public void onSourceAdded(SourceAddedEvent event) {
141-
this.sources.add(event.getSource());
235+
writeSourcesSafelyConsume(sources -> {
236+
sources.add(event.getSource());
237+
});
142238
}
143239

144240
@Subscribe
145241
public void onSourceRemoved(SourceRemovedEvent event) {
146-
this.sources.remove(event.getSource());
242+
writeSourcesSafelyConsume(sources -> {
243+
sources.remove(event.getSource());
244+
});
147245

148246
// Sockets of deleted sources should not be previewed
149247
for (OutputSocket<?> socket : event.getSource().getOutputSockets()) {
150248
socket.setPreviewed(false);
151249
}
152250
}
153251

154-
public synchronized void addStep(int index, Step step) {
252+
public void addStep(int index, Step step) {
155253
checkNotNull(step, "The step can not be null");
156-
this.steps.add(index, step);
254+
checkArgument(!step.removed(), "The step must not have been disabled already");
255+
256+
writeStepsSafelyConsume(steps -> steps.add(index, step));
257+
157258
this.eventBus.register(step);
158259
this.eventBus.post(new StepAddedEvent(step, index));
159260
}
160261

161-
public synchronized void addStep(Step step) {
262+
public void addStep(Step step) {
162263
addStep(this.steps.size(), step);
163264
}
164265

165-
public synchronized void removeStep(Step step) {
266+
public void removeStep(Step step) {
166267
checkNotNull(step, "The step can not be null");
167-
this.steps.remove(step);
268+
269+
writeStepsSafelyConsume(steps -> steps.remove(step));
270+
168271
// Sockets of deleted steps should not be previewed
169272
for (OutputSocket<?> socket : step.getOutputSockets()) {
170273
socket.setPreviewed(false);
171274
}
275+
step.setRemoved();
172276
this.eventBus.unregister(step);
173277
this.eventBus.post(new StepRemovedEvent(step));
174278
}
@@ -177,13 +281,19 @@ public synchronized void moveStep(Step step, int delta) {
177281
checkNotNull(step, "The step can not be null");
178282
checkArgument(this.steps.contains(step), "The step must exist in the pipeline to be moved");
179283

180-
final int oldIndex = this.steps.indexOf(step);
181-
this.steps.remove(oldIndex);
284+
// We are modifying the steps array
285+
writeStepsSafelyConsume(steps -> {
286+
final int oldIndex = this.steps.indexOf(step);
287+
this.steps.remove(oldIndex);
182288

183-
// Compute the new index of the step, clamping to the beginning or end of pipeline if it goes past either end
184-
final int newIndex = Math.min(Math.max(oldIndex + delta, 0), this.steps.size());
185-
this.steps.add(newIndex, step);
289+
// Compute the new index of the step, clamping to the beginning or end of pipeline if it goes past either end
290+
final int newIndex = Math.min(Math.max(oldIndex + delta, 0), this.steps.size());
291+
this.steps.add(newIndex, step);
292+
});
293+
294+
// Do not lock while posting the event
186295
eventBus.post(new StepMovedEvent(step, delta));
296+
187297
}
188298

189299
@Subscribe

0 commit comments

Comments
 (0)