Skip to content

Commit 62bf9e0

Browse files
authored
added waitForItems methods so to not rely on guessing the right time (#621)
* added waitForItems methods so to not rely on guessing the right time * intellij messed up my imports...fixed, i guess * last time i mess with the imports * srsly, this is the last time
1 parent 1ec84e4 commit 62bf9e0

File tree

2 files changed

+82
-9
lines changed

2 files changed

+82
-9
lines changed

test/smoke/framework/fakeIngestion/servlet/src/main/java/com/microsoft/applicationinsights/test/fakeingestion/MockedAppInsightsIngestionServlet.java

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import com.microsoft.applicationinsights.internal.schemav2.Envelope;
1010
import com.microsoft.applicationinsights.smoketest.JsonHelper;
1111

12+
import javax.annotation.concurrent.GuardedBy;
1213
import javax.servlet.ServletConfig;
1314
import javax.servlet.ServletException;
1415
import javax.servlet.http.HttpServlet;
@@ -18,9 +19,17 @@
1819
import java.io.InputStreamReader;
1920
import java.io.StringWriter;
2021
import java.util.ArrayList;
22+
import java.util.Collection;
2123
import java.util.List;
2224
import java.util.Queue;
25+
import java.util.concurrent.Callable;
2326
import java.util.concurrent.ConcurrentLinkedDeque;
27+
import java.util.concurrent.ExecutionException;
28+
import java.util.concurrent.ExecutorService;
29+
import java.util.concurrent.Executors;
30+
import java.util.concurrent.Future;
31+
import java.util.concurrent.TimeUnit;
32+
import java.util.concurrent.TimeoutException;
2433
import java.util.zip.GZIPInputStream;
2534

2635
public class MockedAppInsightsIngestionServlet extends HttpServlet {
@@ -31,12 +40,18 @@ public class MockedAppInsightsIngestionServlet extends HttpServlet {
3140

3241
private final String appid = "DUMMYAPPID";
3342

34-
private Queue<Envelope> telemetryReceived;
35-
private ListMultimap<String, Envelope> type2envelope;
43+
44+
private final Queue<Envelope> telemetryReceived;
45+
@GuardedBy("multimapLock")
46+
private final ListMultimap<String, Envelope> type2envelope;
3647
private List<Predicate<Envelope>> filters;
3748

49+
private final Object multimapLock = new Object();
50+
3851
private MockedIngestionServletConfig config;
3952

53+
private final ExecutorService itemExecutor = Executors.newSingleThreadExecutor();
54+
4055
public static final String LOG_PAYLOADS_PARAMETER_KEY = "logPayloads";
4156
public static final String RETAIN_PAYLOADS_PARAMETER_KEY = "retainPayloads";
4257

@@ -96,7 +111,9 @@ public MockedIngestionServletConfig getIngestionConfig() {
96111
public void resetData() {
97112
logit("Clearing telemetry accumulator...");
98113
telemetryReceived.clear();
99-
type2envelope.clear();
114+
synchronized (multimapLock) {
115+
type2envelope.clear();
116+
}
100117
}
101118

102119
public boolean hasData() {
@@ -113,7 +130,33 @@ public Envelope nextItem() {
113130

114131
public List<Envelope> getItemsByType(String type) {
115132
Preconditions.checkNotNull(type, "type");
116-
return type2envelope.get(type);
133+
synchronized (multimapLock) {
134+
return type2envelope.get(type);
135+
}
136+
}
137+
138+
public List<Envelope> waitForItems(final Predicate<Envelope> condition, final int numItems, int timeout, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
139+
final Future<List<Envelope>> future = itemExecutor.submit(new Callable<List<Envelope>>() {
140+
@Override
141+
public List<Envelope> call() throws Exception {
142+
List<Envelope> targetCollection = new ArrayList<>(numItems);
143+
while(targetCollection.size() < numItems) {
144+
targetCollection.clear();
145+
final Collection<Envelope> currentValues;
146+
synchronized (multimapLock) {
147+
currentValues = new ArrayList<>(type2envelope.values());
148+
}
149+
for (Envelope val : currentValues) {
150+
if (condition.apply(val)) {
151+
targetCollection.add(val);
152+
}
153+
}
154+
TimeUnit.MILLISECONDS.sleep(75);
155+
}
156+
return targetCollection;
157+
}
158+
});
159+
return future.get(timeout, timeUnit);
117160
}
118161

119162
@Override
@@ -157,8 +200,10 @@ protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws S
157200
String baseType = envelope.getData().getBaseType();
158201
if (filtersAllowItem(envelope)) {
159202
logit("Adding telemetry item: "+baseType);
160-
type2envelope.put(baseType, envelope);
161203
telemetryReceived.offer(envelope);
204+
synchronized (multimapLock) {
205+
type2envelope.put(baseType, envelope);
206+
}
162207
} else {
163208
logit("Rejected telemetry item by filter: "+baseType);
164209
}

test/smoke/framework/fakeIngestion/standalone/src/main/java/com/microsoft/applicationinsights/test/fakeingestion/MockedAppInsightsIngestionServer.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,20 @@
11
package com.microsoft.applicationinsights.test.fakeingestion;
22

3-
import java.util.ArrayList;
4-
import java.util.List;
5-
63
import com.google.common.base.Preconditions;
74
import com.google.common.base.Predicate;
85
import com.microsoft.applicationinsights.internal.schemav2.Data;
96
import com.microsoft.applicationinsights.internal.schemav2.Domain;
107
import com.microsoft.applicationinsights.internal.schemav2.Envelope;
11-
128
import org.eclipse.jetty.server.Server;
139
import org.eclipse.jetty.servlet.ServletHandler;
1410
import org.eclipse.jetty.servlet.ServletHolder;
1511

12+
import java.util.ArrayList;
13+
import java.util.List;
14+
import java.util.concurrent.ExecutionException;
15+
import java.util.concurrent.TimeUnit;
16+
import java.util.concurrent.TimeoutException;
17+
1618
public class MockedAppInsightsIngestionServer implements AutoCloseable {
1719
public static final int DEFAULT_PORT = 60606;
1820

@@ -89,6 +91,32 @@ public <T extends Domain> T getBaseDataForType(int index, String type) {
8991
return data.getBaseData();
9092
}
9193

94+
/**
95+
* Waits the given amount of time for this mocked server to recieve one telemetry item matching the given predicate.
96+
*
97+
* @see #waitForItems(Predicate, int, int, TimeUnit)
98+
*/
99+
public Envelope waitForItem(Predicate<Envelope> condition, int timeout, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
100+
return waitForItems(condition, 1, timeout, timeUnit).get(0);
101+
}
102+
103+
104+
/**
105+
* Waits the given amount of time for this mocked server to receive a certain number of items which match the given predicate.
106+
*
107+
* @param condition condition describing what items to wait for.
108+
* @param numItems number of matching items to wait for.
109+
* @param timeout amount of time to wait
110+
* @param timeUnit the unit of time to wait
111+
* @return The items the given condition. This will be at least {@code numItems}, but could be more.
112+
* @throws InterruptedException if the thread is interrupted while waiting
113+
* @throws ExecutionException if an exception is thrown while waiting
114+
* @throws TimeoutException if the timeout is reached
115+
*/
116+
public List<Envelope> waitForItems(Predicate<Envelope> condition, int numItems, int timeout, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
117+
return this.servlet.waitForItems(condition, numItems, timeout, timeUnit);
118+
}
119+
92120
@Override
93121
public void close() throws Exception {
94122
stopServer();

0 commit comments

Comments
 (0)