|
25 | 25 | import java.nio.channels.SeekableByteChannel; |
26 | 26 | import java.util.Collection; |
27 | 27 | import java.util.HashSet; |
| 28 | +import java.util.concurrent.atomic.AtomicInteger; |
28 | 29 |
|
29 | | -//TODO: concurrency |
30 | 30 | class JobState implements AutoCloseable { |
31 | | - private int objectsRemaining; |
| 31 | + private final AtomicInteger objectsRemaining; |
32 | 32 | private final AutoCloseableCache<String, WindowedChannelFactory> channelCache; |
33 | 33 | private final JobPartTracker partTracker; |
34 | 34 |
|
35 | 35 | public JobState(final ObjectChannelBuilder channelBuilder, final Collection<Objects> filteredChunks) { |
36 | | - this.objectsRemaining = getObjectCount(filteredChunks); |
| 36 | + this.objectsRemaining = new AtomicInteger(getObjectCount(filteredChunks)); |
37 | 37 | this.channelCache = buildCache(channelBuilder); |
38 | 38 | this.partTracker = JobPartTrackerFactory |
39 | 39 | .buildPartTracker(Iterables.concat(filteredChunks)) |
40 | 40 | .attachObjectCompletedListener(new ObjectCompletedListenerImplementation()); |
41 | 41 | } |
42 | 42 |
|
43 | 43 | public boolean hasObjects() { |
44 | | - return this.objectsRemaining > 0; |
| 44 | + return this.objectsRemaining.get() > 0; |
45 | 45 | } |
46 | 46 |
|
47 | 47 | private static int getObjectCount(final Collection<Objects> chunks) { |
@@ -82,7 +82,7 @@ public JobPartTracker getPartTracker() { |
82 | 82 | private final class ObjectCompletedListenerImplementation implements ObjectCompletedListener { |
83 | 83 | @Override |
84 | 84 | public void objectCompleted(final String name) { |
85 | | - JobState.this.objectsRemaining--; |
| 85 | + JobState.this.objectsRemaining.decrementAndGet(); |
86 | 86 | try { |
87 | 87 | JobState.this.channelCache.close(name); |
88 | 88 | } catch (final Exception e) { |
|
0 commit comments