Skip to content

Commit 3791d0c

Browse files
committed
Track time spent in DNS resolution by OKHTTP protocol, fixes #878
1 parent 3bf66e9 commit 3791d0c

File tree

4 files changed

+97
-6
lines changed

4 files changed

+97
-6
lines changed

core/src/main/java/com/digitalpebble/stormcrawler/bolt/FetcherBolt.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -629,6 +629,14 @@ fit.t, new Values(fit.url,
629629
long timeFetching = System.currentTimeMillis() - start;
630630

631631
final int byteLength = response.getContent().length;
632+
633+
// get any metrics from the protocol metadata
634+
// expect Longs
635+
response.getMetadata().keySet().stream()
636+
.filter(s -> s.startsWith("metrics."))
637+
.forEach(s -> averagedMetrics.scope(s.substring(8))
638+
.update(Long.parseLong(response.getMetadata()
639+
.getFirstValue(s))));
632640

633641
averagedMetrics.scope("fetch_time").update(timeFetching);
634642
averagedMetrics.scope("time_in_queues")

core/src/main/java/com/digitalpebble/stormcrawler/bolt/SimpleFetcherBolt.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,13 @@ input, new Values(urlString, metadata,
433433
long timeFetching = System.currentTimeMillis() - start;
434434

435435
final int byteLength = response.getContent().length;
436+
437+
// get any metrics from the protocol metadata
438+
response.getMetadata().keySet().stream()
439+
.filter(s -> s.startsWith("metrics."))
440+
.forEach(s -> averagedMetrics.scope(s.substring(8))
441+
.update(Long.parseLong(
442+
response.getMetadata().getFirstValue(s))));
436443

437444
averagedMetrics.scope("wait_time").update(timeWaiting);
438445
averagedMetrics.scope("fetch_time").update(timeFetching);
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/**
2+
* Licensed to DigitalPebble Ltd under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* DigitalPebble licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.digitalpebble.stormcrawler.protocol.okhttp;
19+
20+
import java.net.InetAddress;
21+
import java.util.List;
22+
import java.util.Map;
23+
24+
import org.slf4j.LoggerFactory;
25+
26+
import okhttp3.Call;
27+
import okhttp3.EventListener;
28+
29+
public class DNSResolutionListener extends EventListener {
30+
31+
private static final org.slf4j.Logger LOG = LoggerFactory
32+
.getLogger(DNSResolutionListener.class);
33+
34+
private long dnsStartMillis;
35+
36+
final Map<String, Long> times;
37+
38+
public DNSResolutionListener(final Map<String, Long> times) {
39+
this.times = times;
40+
}
41+
42+
@Override
43+
public void dnsEnd(Call call, String domainName,
44+
List<InetAddress> inetAddressList) {
45+
long timeSpent = System.currentTimeMillis() - dnsStartMillis;
46+
LOG.debug("DNS resolution for {} took {} millisecs", domainName,
47+
timeSpent);
48+
times.put(call.toString(), timeSpent);
49+
}
50+
51+
@Override
52+
public void dnsStart(Call call, String domainName) {
53+
dnsStartMillis = System.currentTimeMillis();
54+
}
55+
56+
}

core/src/main/java/com/digitalpebble/stormcrawler/protocol/okhttp/HttpProtocol.java

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,11 @@
2525
import java.security.cert.CertificateException;
2626
import java.util.ArrayList;
2727
import java.util.Base64;
28+
import java.util.HashMap;
2829
import java.util.LinkedList;
2930
import java.util.List;
3031
import java.util.Locale;
32+
import java.util.Map;
3133
import java.util.concurrent.TimeUnit;
3234

3335
import javax.net.ssl.HostnameVerifier;
@@ -54,6 +56,7 @@
5456
import okhttp3.Call;
5557
import okhttp3.Connection;
5658
import okhttp3.Credentials;
59+
import okhttp3.EventListener;
5760
import okhttp3.Headers;
5861
import okhttp3.Interceptor;
5962
import okhttp3.MediaType;
@@ -65,6 +68,7 @@
6568
import okhttp3.Response;
6669
import okhttp3.ResponseBody;
6770
import okhttp3.Route;
71+
import okhttp3.EventListener.Factory;
6872
import okio.BufferedSource;
6973

7074
public class HttpProtocol extends AbstractHttpProtocol {
@@ -86,6 +90,9 @@ public class HttpProtocol extends AbstractHttpProtocol {
8690

8791
private final List<String[]> customRequestHeaders = new LinkedList<>();
8892

93+
// track the time spent for each URL in DNS resolution
94+
private final Map<String, Long> DNStimes = new HashMap<>();
95+
8996
private static final TrustManager[] trustAllCerts = new TrustManager[] { new X509TrustManager() {
9097
@Override
9198
public void checkClientTrusted(
@@ -237,6 +244,13 @@ public boolean verify(String hostname, SSLSession session) {
237244
});
238245
}
239246

247+
builder.eventListenerFactory(new Factory() {
248+
@Override
249+
public EventListener create(Call call) {
250+
return new DNSResolutionListener(DNStimes);
251+
}
252+
});
253+
240254
client = builder.build();
241255
}
242256

@@ -332,7 +346,13 @@ public ProtocolResponse getProtocolOutput(String url, final Metadata metadata) t
332346
trimmed.getValue().toString().toLowerCase(Locale.ROOT));
333347
LOG.warn("HTTP content trimmed to {}", bytes.length);
334348
}
335-
349+
350+
Long DNSResolution = DNStimes.remove(call.toString());
351+
if (DNSResolution != null) {
352+
responsemetadata.setValue("metrics.dns.resolution.msec",
353+
DNSResolution.toString());
354+
}
355+
336356
return new ProtocolResponse(bytes, response.code(), responsemetadata);
337357
}
338358
}
@@ -358,7 +378,7 @@ private final byte[] toByteArray(final ResponseBody responseBody,
358378
int bytesRequested = 0;
359379
int bufferGrowStepBytes = 8192;
360380

361-
while (source.buffer().size() <= maxContentBytes) {
381+
while (source.getBuffer().size() <= maxContentBytes) {
362382
bytesRequested += Math.min(bufferGrowStepBytes,
363383
/*
364384
* request one byte more than required to reliably detect truncated
@@ -371,7 +391,7 @@ private final byte[] toByteArray(final ResponseBody responseBody,
371391
success = source.request(bytesRequested);
372392
} catch (IOException e) {
373393
// requesting more content failed, e.g. by a socket timeout
374-
if (partialContentAsTrimmed && source.buffer().size() > 0) {
394+
if (partialContentAsTrimmed && source.getBuffer().size() > 0) {
375395
// treat already fetched content as trimmed
376396
trimmed.setValue(TrimmedContentReason.DISCONNECT);
377397
LOG.debug("Exception while fetching {}", e);
@@ -392,17 +412,17 @@ private final byte[] toByteArray(final ResponseBody responseBody,
392412

393413
// okhttp may fetch more content than requested, quickly "increment"
394414
// bytes
395-
bytesRequested = (int) source.buffer().size();
415+
bytesRequested = (int) source.getBuffer().size();
396416
}
397-
int bytesBuffered = (int) source.buffer().size();
417+
int bytesBuffered = (int) source.getBuffer().size();
398418
int bytesToCopy = bytesBuffered;
399419
if (maxContent != -1 && bytesToCopy > maxContent) {
400420
// okhttp's internal buffer is larger than maxContent
401421
trimmed.setValue(TrimmedContentReason.LENGTH);
402422
bytesToCopy = maxContentBytes;
403423
}
404424
byte[] arr = new byte[bytesToCopy];
405-
source.buffer().readFully(arr);
425+
source.getBuffer().readFully(arr);
406426
return arr;
407427
}
408428

0 commit comments

Comments
 (0)