Skip to content

Commit d8c9bf3

Browse files
committed
Split TransmissionOutput into sync and async
1 parent c8fdb4e commit d8c9bf3

File tree

15 files changed

+119
-78
lines changed

15 files changed

+119
-78
lines changed

core/src/main/java/com/microsoft/applicationinsights/channel/concrete/inprocess/InProcessTelemetryTransmitterFactory.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424
import com.microsoft.applicationinsights.TelemetryConfiguration;
2525
import com.microsoft.applicationinsights.internal.channel.ConfiguredTransmitterFactory;
2626
import com.microsoft.applicationinsights.internal.channel.TelemetriesTransmitter;
27-
import com.microsoft.applicationinsights.internal.channel.TransmissionOutput;
2827
import com.microsoft.applicationinsights.internal.channel.TransmissionDispatcher;
28+
import com.microsoft.applicationinsights.internal.channel.TransmissionOutputAsync;
2929
import com.microsoft.applicationinsights.internal.channel.TransmissionsLoader;
3030
import com.microsoft.applicationinsights.internal.channel.common.ActiveTransmissionFileSystemOutput;
3131
import com.microsoft.applicationinsights.internal.channel.common.ActiveTransmissionLoader;
@@ -77,14 +77,14 @@ public TelemetriesTransmitter create(TelemetryConfiguration configuration, Strin
7777
private TelemetriesTransmitter finishTransmitterConstruction(String maxTransmissionStorageCapacity, TransmissionPolicyManager transmissionPolicyManager, TransmissionNetworkOutput actualNetworkSender) {
7878
TransmissionPolicyStateFetcher stateFetcher = transmissionPolicyManager.getTransmissionPolicyState();
7979

80-
TransmissionOutput networkSender = new ActiveTransmissionNetworkOutput(actualNetworkSender, stateFetcher);
8180

81+
TransmissionOutputAsync networkSender = new ActiveTransmissionNetworkOutput(actualNetworkSender, stateFetcher);
8282
// An active object with the file system sender
8383
TransmissionFileSystemOutput fileSystemSender = new TransmissionFileSystemOutput(null, maxTransmissionStorageCapacity);
84-
TransmissionOutput activeFileSystemOutput = new ActiveTransmissionFileSystemOutput(fileSystemSender, stateFetcher);
84+
TransmissionOutputAsync activeFileSystemOutput = new ActiveTransmissionFileSystemOutput(fileSystemSender, stateFetcher);
8585

8686
// The dispatcher works with the two active senders
87-
TransmissionDispatcher dispatcher = new NonBlockingDispatcher(new TransmissionOutput[]{networkSender, activeFileSystemOutput});
87+
TransmissionDispatcher dispatcher = new NonBlockingDispatcher(new TransmissionOutputAsync[]{networkSender, activeFileSystemOutput});
8888
actualNetworkSender.setTransmissionDispatcher(dispatcher);
8989

9090

core/src/main/java/com/microsoft/applicationinsights/internal/channel/TransmissionHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
*/
1010
public interface TransmissionHandler {
1111
/**
12-
* Called when a transmission is sent by the {@link TransmissionOutput}.
12+
* Called when a transmission is sent by the {@link TransmissionOutputSync}.
1313
* @param args The {@link TransmissionHandlerArgs} for this handler.
1414
*/
1515
void onTransmissionSent(TransmissionHandlerArgs args);
Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,20 @@
2121

2222
package com.microsoft.applicationinsights.internal.channel;
2323

24-
import com.microsoft.applicationinsights.internal.channel.common.Transmission;
25-
2624
import java.util.concurrent.TimeUnit;
2725

26+
import com.microsoft.applicationinsights.internal.channel.common.Transmission;
27+
2828
/**
29-
* Defines the interface of classes that get a {@link com.microsoft.applicationinsights.internal.channel.common.Transmission}
29+
* Defines the interface of classes that get a {@link Transmission}
3030
* and can 'send' it.
3131
*
3232
* Concrete classes can 'send' the data to remote server, to disk, database etc.
3333
*
3434
* Created by gupele on 12/18/2014.
3535
*/
36-
public interface TransmissionOutput {
37-
boolean send(Transmission transmission);
36+
public interface TransmissionOutputAsync {
37+
boolean sendAsync(Transmission transmission);
3838

3939
void stop(long timeout, TimeUnit timeUnit);
4040
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* ApplicationInsights-Java
3+
* Copyright (c) Microsoft Corporation
4+
* All rights reserved.
5+
*
6+
* MIT License
7+
* Permission is hereby granted, free of charge, to any person obtaining a copy of this
8+
* software and associated documentation files (the ""Software""), to deal in the Software
9+
* without restriction, including without limitation the rights to use, copy, modify, merge,
10+
* publish, distribute, sublicense, and/or sell copies of the Software, and to permit
11+
* persons to whom the Software is furnished to do so, subject to the following conditions:
12+
* The above copyright notice and this permission notice shall be included in all copies or
13+
* substantial portions of the Software.
14+
* THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
15+
* INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
16+
* PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE
17+
* FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
18+
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19+
* DEALINGS IN THE SOFTWARE.
20+
*/
21+
22+
package com.microsoft.applicationinsights.internal.channel;
23+
24+
import java.util.concurrent.TimeUnit;
25+
26+
import com.microsoft.applicationinsights.internal.channel.common.Transmission;
27+
28+
/**
29+
* Defines the interface of classes that get a {@link Transmission}
30+
* and can 'send' it.
31+
*
32+
* Concrete classes can 'send' the data to remote server, to disk, database etc.
33+
*
34+
* Created by gupele on 12/18/2014.
35+
*/
36+
public interface TransmissionOutputSync {
37+
boolean sendSync(Transmission transmission);
38+
39+
void stop(long timeout, TimeUnit timeUnit);
40+
}
41+

core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/ActiveTransmissionFileSystemOutput.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@
2727
import java.util.concurrent.atomic.AtomicInteger;
2828

2929
import com.google.common.base.Preconditions;
30-
import com.microsoft.applicationinsights.internal.channel.TransmissionOutput;
30+
import com.microsoft.applicationinsights.internal.channel.TransmissionOutputAsync;
31+
import com.microsoft.applicationinsights.internal.channel.TransmissionOutputSync;
3132
import com.microsoft.applicationinsights.internal.util.ThreadPoolUtils;
3233

3334
/**
@@ -37,14 +38,14 @@
3738
*
3839
* Created by gupele on 12/22/2014.
3940
*/
40-
public final class ActiveTransmissionFileSystemOutput implements TransmissionOutput {
41+
public final class ActiveTransmissionFileSystemOutput implements TransmissionOutputAsync {
4142
private static final AtomicInteger INSTANCE_ID_POOL = new AtomicInteger(1);
4243
private final ThreadPoolExecutor threadPool;
43-
private final TransmissionOutput actualOutput;
44+
private final TransmissionOutputSync actualOutput;
4445
private final TransmissionPolicyStateFetcher transmissionPolicy;
4546
private final int instanceId = INSTANCE_ID_POOL.getAndIncrement();
4647

47-
public ActiveTransmissionFileSystemOutput(TransmissionOutput actualOutput, TransmissionPolicyStateFetcher transmissionPolicy) {
48+
public ActiveTransmissionFileSystemOutput(TransmissionOutputSync actualOutput, TransmissionPolicyStateFetcher transmissionPolicy) {
4849
Preconditions.checkNotNull(transmissionPolicy, "transmissionPolicy must be a non-null value");
4950

5051
this.actualOutput = actualOutput;
@@ -56,7 +57,7 @@ public ActiveTransmissionFileSystemOutput(TransmissionOutput actualOutput, Trans
5657
}
5758

5859
@Override
59-
public boolean send(final Transmission transmission) {
60+
public boolean sendAsync(final Transmission transmission) {
6061
// TODO: check the possibility of refactoring the 'send' and possible log on errors
6162
try {
6263
if (transmissionPolicy.getCurrentState() == TransmissionPolicy.BLOCKED_AND_CANNOT_BE_PERSISTED) {
@@ -67,7 +68,7 @@ public boolean send(final Transmission transmission) {
6768
@Override
6869
public void run() {
6970
try {
70-
actualOutput.send(transmission);
71+
actualOutput.sendSync(transmission);
7172
} catch (ThreadDeath td) {
7273
throw td;
7374
} catch (Throwable throwable) {

core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/ActiveTransmissionNetworkOutput.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,14 @@
2727
import java.util.concurrent.atomic.AtomicInteger;
2828

2929
import com.google.common.base.Preconditions;
30-
import com.microsoft.applicationinsights.internal.channel.TransmissionOutput;
30+
import com.microsoft.applicationinsights.internal.channel.TransmissionOutputAsync;
31+
import com.microsoft.applicationinsights.internal.channel.TransmissionOutputSync;
3132
import com.microsoft.applicationinsights.internal.util.ThreadPoolUtils;
3233

3334
/**
3435
* Created by gupele on 12/18/2014.
3536
*/
36-
public final class ActiveTransmissionNetworkOutput implements TransmissionOutput {
37+
public final class ActiveTransmissionNetworkOutput implements TransmissionOutputAsync {
3738
private final static int DEFAULT_MAX_MESSAGES_IN_BUFFER = 128;
3839
private final static int DEFAULT_MIN_NUMBER_OF_THREADS = 7;
3940
private final static int DEFAULT_MAX_NUMBER_OF_THREADS = 7;
@@ -42,15 +43,15 @@ public final class ActiveTransmissionNetworkOutput implements TransmissionOutput
4243

4344
private final int maxThreads;
4445
private final ThreadPoolExecutor outputThreads;
45-
private final TransmissionOutput actualOutput;
46+
private final TransmissionOutputSync actualOutput;
4647
private final TransmissionPolicyStateFetcher transmissionPolicy;
4748
private final int instanceId = INTSTANCE_ID_POOL.getAndIncrement();
4849

49-
public ActiveTransmissionNetworkOutput(TransmissionOutput actualOutput, TransmissionPolicyStateFetcher transmissionPolicy) {
50+
public ActiveTransmissionNetworkOutput(TransmissionOutputSync actualOutput, TransmissionPolicyStateFetcher transmissionPolicy) {
5051
this(actualOutput, transmissionPolicy, DEFAULT_MAX_MESSAGES_IN_BUFFER);
5152
}
5253

53-
public ActiveTransmissionNetworkOutput(TransmissionOutput actualOutput, TransmissionPolicyStateFetcher transmissionPolicy, int maxMessagesInBuffer) {
54+
public ActiveTransmissionNetworkOutput(TransmissionOutputSync actualOutput, TransmissionPolicyStateFetcher transmissionPolicy, int maxMessagesInBuffer) {
5455
Preconditions.checkNotNull(transmissionPolicy, "transmissionPolicy must be a valid non-null value");
5556

5657
this.actualOutput = actualOutput;
@@ -66,7 +67,7 @@ public ActiveTransmissionNetworkOutput(TransmissionOutput actualOutput, Transmis
6667
}
6768

6869
@Override
69-
public boolean send(final Transmission transmission) {
70+
public boolean sendAsync(final Transmission transmission) {
7071
try {
7172
if (transmissionPolicy.getCurrentState() != TransmissionPolicy.UNBLOCKED) {
7273
return false;
@@ -76,7 +77,7 @@ public boolean send(final Transmission transmission) {
7677
@Override
7778
public void run() {
7879
try {
79-
actualOutput.send(transmission);
80+
actualOutput.sendSync(transmission);
8081
} catch (ThreadDeath td) {
8182
throw td;
8283
} catch (Throwable throwable) {

core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/NonBlockingDispatcher.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,24 +25,24 @@
2525

2626
import com.google.common.base.Preconditions;
2727
import com.microsoft.applicationinsights.internal.channel.TransmissionDispatcher;
28-
import com.microsoft.applicationinsights.internal.channel.TransmissionOutput;
28+
import com.microsoft.applicationinsights.internal.channel.TransmissionOutputAsync;
2929

3030
/**
3131
* The class implements {@link TransmissionDispatcher}
3232
*
33-
* Basically, the class tries to find one {@link TransmissionOutput}
33+
* Basically, the class tries to find one {@link TransmissionOutputAsync}
3434
* that will accept the incoming {@link Transmission}.
3535
*
3636
* It is a non blocking behavior in the sense that if no one can accept it will drop the data
3737
*
3838
* Created by gupele on 12/18/2014.
3939
*/
4040
public final class NonBlockingDispatcher implements TransmissionDispatcher {
41-
private final TransmissionOutput[] transmissionOutputs;
41+
private final TransmissionOutputAsync[] transmissionOutputs;
4242

43-
public NonBlockingDispatcher(TransmissionOutput[] transmissionOutputs) {
43+
public NonBlockingDispatcher(TransmissionOutputAsync[] transmissionOutputs) {
4444
Preconditions.checkNotNull(transmissionOutputs, "transmissionOutputs should be non-null value");
45-
Preconditions.checkArgument(transmissionOutputs.length > 0, "There should be at least one TransmissionOutput");
45+
Preconditions.checkArgument(transmissionOutputs.length > 0, "There should be at least one transmission output");
4646

4747
this.transmissionOutputs = transmissionOutputs;
4848
}
@@ -51,16 +51,16 @@ public NonBlockingDispatcher(TransmissionOutput[] transmissionOutputs) {
5151
public void dispatch(Transmission transmission) {
5252
Preconditions.checkNotNull(transmission, "transmission should be non-null value");
5353

54-
for (TransmissionOutput output : transmissionOutputs) {
55-
if (output.send(transmission)) {
54+
for (TransmissionOutputAsync output : transmissionOutputs) {
55+
if (output.sendAsync(transmission)) {
5656
return;
5757
}
5858
}
5959
}
6060

6161
@Override
6262
public void stop(long timeout, TimeUnit timeUnit) {
63-
for (TransmissionOutput output : transmissionOutputs) {
63+
for (TransmissionOutputAsync output : transmissionOutputs) {
6464
output.stop(timeout, timeUnit);
6565
}
6666
}

core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionFileSystemOutput.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,7 @@
4444
import java.util.concurrent.TimeUnit;
4545
import java.util.concurrent.atomic.AtomicLong;
4646

47-
import com.microsoft.applicationinsights.internal.channel.TransmissionOutput;
48-
47+
import com.microsoft.applicationinsights.internal.channel.TransmissionOutputSync;
4948
import com.microsoft.applicationinsights.internal.util.LimitsEnforcer;
5049
import com.microsoft.applicationinsights.internal.util.LocalFileSystemUtils;
5150
import org.apache.commons.io.FileUtils;
@@ -67,7 +66,7 @@
6766
*
6867
* Created by gupele on 12/18/2014.
6968
*/
70-
public final class TransmissionFileSystemOutput implements TransmissionOutput {
69+
public final class TransmissionFileSystemOutput implements TransmissionOutputSync {
7170

7271
private static final Logger logger = LoggerFactory.getLogger(TransmissionFileSystemOutput.class);
7372

@@ -137,7 +136,7 @@ public TransmissionFileSystemOutput(String folderPath) {
137136
}
138137

139138
@Override
140-
public boolean send(Transmission transmission) {
139+
public boolean sendSync(Transmission transmission) {
141140

142141
long currentSizeInBytes = size.get();
143142
if (currentSizeInBytes >= capacityInBytes) {

core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionNetworkOutput.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import com.microsoft.applicationinsights.TelemetryConfiguration;
2626
import com.microsoft.applicationinsights.internal.channel.TransmissionDispatcher;
2727
import com.microsoft.applicationinsights.internal.channel.TransmissionHandlerArgs;
28-
import com.microsoft.applicationinsights.internal.channel.TransmissionOutput;
28+
import com.microsoft.applicationinsights.internal.channel.TransmissionOutputSync;
2929
import org.apache.commons.lang3.StringUtils;
3030
import org.apache.http.Header;
3131
import org.apache.http.HttpEntity;
@@ -52,7 +52,7 @@
5252
*
5353
* Created by gupele on 12/18/2014.
5454
*/
55-
public final class TransmissionNetworkOutput implements TransmissionOutput {
55+
public final class TransmissionNetworkOutput implements TransmissionOutputSync {
5656

5757
private static final Logger logger = LoggerFactory.getLogger(TransmissionNetworkOutput.class);
5858

@@ -157,10 +157,9 @@ public synchronized void stop(long timeout, TimeUnit timeUnit) {
157157
* @return True when done.
158158
*/
159159
@Override
160-
public boolean send(Transmission transmission) {
160+
public boolean sendSync(Transmission transmission) {
161161
if (!stopped) {
162-
// If we're not stopped but in a blocked state then fail to second
163-
// TransmissionOutput
162+
// If we're not stopped but in a blocked state then fail to second transmission output
164163
if (transmissionPolicyManager.getTransmissionPolicyState().getCurrentState() != TransmissionPolicy.UNBLOCKED) {
165164
return false;
166165
}
@@ -183,7 +182,7 @@ public boolean send(Transmission transmission) {
183182
respString = EntityUtils.toString(respEntity);
184183
retryAfterHeader = response.getFirstHeader(RESPONSE_THROTTLING_HEADER);
185184

186-
// After we reach our instant retry limit we should fail to second TransmissionOutput
185+
// After we reach our instant retry limit we should fail to second transmission output
187186
if (code > HttpStatus.SC_PARTIAL_CONTENT && transmission.getNumberOfSends() > this.transmissionPolicyManager.getMaxInstantRetries()) {
188187
return false;
189188
} else if (code == HttpStatus.SC_OK) {

core/src/test/java/com/microsoft/applicationinsights/internal/channel/common/ActiveTransmissionLoaderTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ private void testFilesOnDiskAreLoaded(int amount, boolean putFilesFirst) throws
105105
}
106106

107107
for (int i = 0; i < amount; ++i) {
108-
fileSystem.send(new Transmission(new byte[2], "MockContentType", "MockEncodingType"));
108+
fileSystem.sendSync(new Transmission(new byte[2], "MockContentType", "MockEncodingType"));
109109
}
110110

111111
if (putFilesFirst) {

0 commit comments

Comments
 (0)