Skip to content

Commit 866c7ca

Browse files
authored
Merge pull request #50 from nlnwa/concurrency
New API
2 parents cc02e5b + 0c59b27 commit 866c7ca

24 files changed

+725
-705
lines changed

pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
<maven.compiler.target>11</maven.compiler.target>
2727
<docker.tag>${project.version}</docker.tag>
2828

29-
<veidemann.api.version>1.0.0-beta23</veidemann.api.version>
29+
<veidemann.api.version>1.0.0-beta24</veidemann.api.version>
3030
<veidemann.commons.version>v0.6.0</veidemann.commons.version>
3131
<veidemann.rethinkdbadapter.version>0.8.0</veidemann.rethinkdbadapter.version>
3232

@@ -91,7 +91,7 @@
9191
<dependency>
9292
<groupId>redis.clients</groupId>
9393
<artifactId>jedis</artifactId>
94-
<version>3.6.0</version>
94+
<version>3.7.0</version>
9595
</dependency>
9696

9797
<dependency>
@@ -115,7 +115,7 @@
115115
<dependency>
116116
<groupId>com.google.guava</groupId>
117117
<artifactId>guava</artifactId>
118-
<version>30.1.1-jre</version>
118+
<version>31.0.1-jre</version>
119119
</dependency>
120120

121121
<!-- Configuration framework -->
Lines changed: 13 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,10 @@
11
package no.nb.nna.veidemann.frontier.api;
22

3-
import com.google.common.util.concurrent.ThreadFactoryBuilder;
4-
import io.grpc.stub.ServerCallStreamObserver;
53
import no.nb.nna.veidemann.frontier.db.CrawlQueueManager;
64
import no.nb.nna.veidemann.frontier.worker.Frontier;
75
import org.slf4j.Logger;
86
import org.slf4j.LoggerFactory;
97

10-
import java.util.concurrent.Executors;
11-
import java.util.concurrent.ScheduledExecutorService;
12-
import java.util.concurrent.ScheduledFuture;
138
import java.util.concurrent.TimeUnit;
149
import java.util.concurrent.atomic.AtomicBoolean;
1510
import java.util.concurrent.atomic.AtomicInteger;
@@ -21,30 +16,17 @@ public class Context {
2116
private static final Logger LOG = LoggerFactory.getLogger(Context.class);
2217
private final AtomicBoolean isShutdown;
2318
private final AtomicInteger amountOfActiveObserversCounter;
24-
private final AtomicInteger amountOfActivePageFetchesCounter;
2519
private final Frontier frontier;
2620

2721
static final Lock lock = new ReentrantLock();
2822
static final Condition notTerminated = lock.newCondition();
2923

30-
static final ScheduledExecutorService timoutThread = Executors.newScheduledThreadPool(
31-
1, new ThreadFactoryBuilder().setNameFormat("fetch-timeout-%d").build());
32-
3324
public Context(Frontier frontier) {
3425
isShutdown = new AtomicBoolean(false);
3526
amountOfActiveObserversCounter = new AtomicInteger(0);
36-
amountOfActivePageFetchesCounter = new AtomicInteger(0);
3727
this.frontier = frontier;
3828
}
3929

40-
public RequestContext newRequestContext(ServerCallStreamObserver responseObserver) {
41-
return new RequestContext(frontier, responseObserver);
42-
}
43-
44-
public boolean isCancelled() {
45-
return isShutdown.get();
46-
}
47-
4830
public void shutdown() {
4931
isShutdown.set(true);
5032
if (amountOfActiveObserversCounter.get() <= 0) {
@@ -95,66 +77,25 @@ public Frontier getFrontier() {
9577
return frontier;
9678
}
9779

98-
public int getActivePageFetchCount() {
99-
return amountOfActivePageFetchesCounter.get();
100-
}
101-
10280
public CrawlQueueManager getCrawlQueueManager() {
10381
return frontier.getCrawlQueueManager();
10482
}
10583

106-
public class RequestContext extends Context {
107-
private final ServerCallStreamObserver responseObserver;
108-
private final AtomicBoolean observerCompleted = new AtomicBoolean(false);
109-
private final AtomicBoolean pageFetchStarted = new AtomicBoolean(false);
110-
private final AtomicBoolean fetchReturned = new AtomicBoolean(false);
111-
private ScheduledFuture<Void> timeout;
112-
113-
private RequestContext(Frontier frontier, ServerCallStreamObserver responseObserver) {
114-
super(frontier);
115-
this.responseObserver = responseObserver;
116-
amountOfActiveObserversCounter.incrementAndGet();
117-
LOG.trace("Client connected. Currently active clients: {}", amountOfActiveObserversCounter.get());
118-
}
119-
120-
public boolean isCancelled() {
121-
return isShutdown.get() || responseObserver.isCancelled();
122-
}
123-
124-
public ServerCallStreamObserver getResponseObserver() {
125-
return responseObserver;
126-
}
127-
128-
public void startPageFetch() {
129-
if (pageFetchStarted.compareAndSet(false, true)) {
130-
amountOfActivePageFetchesCounter.incrementAndGet();
131-
LOG.trace("Page fetch started. Currently active page fetches: {}", amountOfActivePageFetchesCounter.get());
132-
}
133-
}
134-
135-
public void setObserverCompleted() {
136-
if (observerCompleted.compareAndSet(false, true)) {
137-
if (pageFetchStarted.get()) {
138-
amountOfActivePageFetchesCounter.decrementAndGet();
139-
}
140-
if (amountOfActiveObserversCounter.decrementAndGet() <= 0 && isShutdown.get()) {
141-
lock.lock();
142-
try {
143-
notTerminated.signalAll();
144-
} finally {
145-
lock.unlock();
146-
}
147-
}
148-
LOG.trace("Client disconnected. Currently active clients: {}. Currently active page fetches: {}",
149-
amountOfActiveObserversCounter.get(), amountOfActivePageFetchesCounter.get());
150-
}
151-
}
84+
public void startPageComplete() {
85+
amountOfActiveObserversCounter.incrementAndGet();
86+
LOG.trace("Client connected. Currently active clients: {}", amountOfActiveObserversCounter.get());
87+
}
15288

153-
public void setFetchCompleted() {
154-
if (timeout != null) {
155-
// Stop the timeout cancel handler
156-
timeout.cancel(false);
89+
public void setObserverCompleted() {
90+
if (amountOfActiveObserversCounter.decrementAndGet() <= 0 && isShutdown.get()) {
91+
lock.lock();
92+
try {
93+
notTerminated.signalAll();
94+
} finally {
95+
lock.unlock();
15796
}
15897
}
98+
LOG.trace("Client disconnected. Currently active clients: {}.",
99+
amountOfActiveObserversCounter.get());
159100
}
160101
}

src/main/java/no/nb/nna/veidemann/frontier/api/FrontierApiServer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,9 @@ public FrontierApiServer(ServerBuilder<?> serverBuilder, Frontier frontier) {
7373
ConcurrencyLimitServerInterceptor.newBuilder(
7474
new GrpcServerLimiterBuilder()
7575
.partitionByMethod()
76-
.partition(FrontierGrpc.getCrawlSeedMethod().getFullMethodName(), 0.38)
77-
.partition(FrontierGrpc.getGetNextPageMethod().getFullMethodName(), 0.02)
76+
.partition(FrontierGrpc.getCrawlSeedMethod().getFullMethodName(), 0.30)
77+
.partition(FrontierGrpc.getGetNextPageMethod().getFullMethodName(), 0.0001)
78+
.partition(FrontierGrpc.getPageCompletedMethod().getFullMethodName(), 0.0999)
7879
.partition(FrontierGrpc.getBusyCrawlHostGroupCountMethod().getFullMethodName(), 0.15)
7980
.partition(FrontierGrpc.getQueueCountForCrawlExecutionMethod().getFullMethodName(), 0.15)
8081
.partition(FrontierGrpc.getQueueCountForCrawlHostGroupMethod().getFullMethodName(), 0.15)

src/main/java/no/nb/nna/veidemann/frontier/api/FrontierService.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,13 @@ public void crawlSeed(CrawlSeedRequest request, StreamObserver<CrawlExecutionId>
8080
}
8181

8282
@Override
83-
public StreamObserver<PageHarvest> getNextPage(StreamObserver<PageHarvestSpec> responseObserver) {
84-
return new GetNextPageHandler(ctx.newRequestContext((ServerCallStreamObserver) responseObserver));
83+
public void getNextPage(Empty request, StreamObserver<PageHarvestSpec> responseObserver) {
84+
GetNextPageHandler.onNext(ctx, responseObserver);
85+
}
86+
87+
@Override
88+
public StreamObserver<PageHarvest> pageCompleted(StreamObserver<Empty> responseObserver) {
89+
return new PageCompletedHandler(ctx, (ServerCallStreamObserver) responseObserver);
8590
}
8691

8792
@Override

0 commit comments

Comments
 (0)