2222package com .microsoft .applicationinsights .channel .concrete .inprocess ;
2323
2424import java .io .IOException ;
25+ import java .io .PrintWriter ;
2526import java .io .StringWriter ;
2627import java .net .URI ;
2728import java .util .Map ;
2829import java .util .concurrent .TimeUnit ;
30+ import java .util .concurrent .atomic .AtomicLong ;
2931
3032import com .microsoft .applicationinsights .internal .channel .TelemetriesTransmitter ;
3133import com .microsoft .applicationinsights .channel .TelemetrySampler ;
4244
4345import com .google .common .base .Strings ;
4446import com .google .common .base .Preconditions ;
47+ import org .apache .commons .lang3 .exception .ExceptionUtils ;
4548
4649/**
4750 * An implementation of {@link com.microsoft.applicationinsights.channel.TelemetryChannel}
48- *
51+ * <p>
4952 * The channel holds two main entities:
50- *
53+ * <p>
5154 * A buffer for incoming {@link com.microsoft.applicationinsights.telemetry.Telemetry} instances
5255 * A transmitter
53- *
56+ * <p>
5457 * The buffer is stores incoming telemetry instances. Every new buffer starts a timer.
5558 * When the timer expires, or when the buffer is 'full' (whichever happens first), the
5659 * transmitter will pick up that buffer and will handle its sending to the server. For example,
5760 * a transmitter will be responsible for compressing, sending and activate a policy in case of failures.
58- *
61+ * <p>
5962 * The model here is:
60- *
63+ * <p>
6164 * Use application threads to populate the buffer
6265 * Use channel's threads to send buffers to the server
63- *
66+ * <p>
6467 * Created by gupele on 12/17/2014.
6568 */
6669public final class InProcessTelemetryChannel implements TelemetryChannel {
@@ -90,6 +93,8 @@ public final class InProcessTelemetryChannel implements TelemetryChannel {
9093 private TelemetryBuffer telemetryBuffer ;
9194 private TelemetrySampler telemetrySampler ;
9295
96+ private static AtomicLong itemsSent = new AtomicLong (0 );
97+
9398 public InProcessTelemetryChannel () {
9499 boolean developerMode = false ;
95100 try {
@@ -99,6 +104,8 @@ public InProcessTelemetryChannel() {
99104 }
100105 } catch (Throwable t ) {
101106 developerMode = false ;
107+ InternalLogger .INSTANCE .trace ("%s generated exception in parsing," +
108+ "stack trace is %s" , DEVELOPER_MODE_SYSTEM_PROPRETY_NAME , ExceptionUtils .getStackTrace (t ));
102109 }
103110 initialize (null ,
104111 null ,
@@ -110,25 +117,27 @@ public InProcessTelemetryChannel() {
110117
111118 /**
112119 * Ctor
113- * @param endpointAddress Must be empty string or a valid uri, else an exception will be thrown
114- * @param developerMode True will behave in a 'non-production' mode to ease the debugging
120+ *
121+ * @param endpointAddress Must be empty string or a valid uri, else an exception will be thrown
122+ * @param developerMode True will behave in a 'non-production' mode to ease the debugging
115123 * @param maxTelemetryBufferCapacity Max number of Telemetries we keep in the buffer, when reached we will send the buffer
116- * Note, value should be between TRANSMIT_BUFFER_MIN_TIMEOUT_IN_MILLIS and TRANSMIT_BUFFER_MAX_TIMEOUT_IN_MILLIS inclusive
117- * @param sendIntervalInMillis The maximum number of milliseconds to wait before we send the buffer
118- * Note, value should be between MIN_MAX_TELEMETRY_BUFFER_CAPACITY and MAX_MAX_TELEMETRY_BUFFER_CAPACITY inclusive
124+ * Note, value should be between TRANSMIT_BUFFER_MIN_TIMEOUT_IN_MILLIS and TRANSMIT_BUFFER_MAX_TIMEOUT_IN_MILLIS inclusive
125+ * @param sendIntervalInMillis The maximum number of milliseconds to wait before we send the buffer
126+ * Note, value should be between MIN_MAX_TELEMETRY_BUFFER_CAPACITY and MAX_MAX_TELEMETRY_BUFFER_CAPACITY inclusive
119127 */
120128 public InProcessTelemetryChannel (String endpointAddress , boolean developerMode , int maxTelemetryBufferCapacity , int sendIntervalInMillis ) {
121129 initialize (endpointAddress ,
122- null ,
123- developerMode ,
124- createDefaultMaxTelemetryBufferCapacityEnforcer (maxTelemetryBufferCapacity ),
125- createDefaultSendIntervalInSecondsEnforcer (sendIntervalInMillis ),
126- true );
130+ null ,
131+ developerMode ,
132+ createDefaultMaxTelemetryBufferCapacityEnforcer (maxTelemetryBufferCapacity ),
133+ createDefaultSendIntervalInSecondsEnforcer (sendIntervalInMillis ),
134+ true );
127135 }
128136
129137 /**
130138 * This Ctor will query the 'namesAndValues' map for data to initialize itself
131139 * It will ignore data that is not of its interest, this Ctor is useful for building an instance from configuration
140+ *
132141 * @param namesAndValues - The data passed as name and value pairs
133142 */
134143 public InProcessTelemetryChannel (Map <String , String > namesAndValues ) {
@@ -157,7 +166,7 @@ public InProcessTelemetryChannel(Map<String, String> namesAndValues) {
157166 }
158167
159168 /**
160- * Gets value indicating whether this channel is in developer mode.
169+ * Gets value indicating whether this channel is in developer mode.
161170 */
162171 @ Override
163172 public boolean isDeveloperMode () {
@@ -166,6 +175,7 @@ public boolean isDeveloperMode() {
166175
167176 /**
168177 * Sets value indicating whether this channel is in developer mode.
178+ *
169179 * @param developerMode True or false
170180 */
171181 @ Override
@@ -179,7 +189,7 @@ public void setDeveloperMode(boolean developerMode) {
179189 }
180190
181191 /**
182- * Sends a Telemetry instance through the channel.
192+ * Sends a Telemetry instance through the channel.
183193 */
184194 @ Override
185195 public void send (Telemetry telemetry ) {
@@ -204,8 +214,13 @@ public void send(Telemetry telemetry) {
204214 String asJson = writer .toString ();
205215 telemetryBuffer .add (asJson );
206216 telemetry .reset ();
217+ if (itemsSent .incrementAndGet () % 10000 == 0 ) {
218+ InternalLogger .INSTANCE .info ("items sent till now %d" , itemsSent .get ());
219+ }
220+
207221 } catch (IOException e ) {
208222 InternalLogger .INSTANCE .error ("Failed to serialize Telemetry" );
223+ InternalLogger .INSTANCE .trace ("Stack trace is %s" , ExceptionUtils .getStackTrace (e ));
209224 return ;
210225 }
211226
@@ -227,6 +242,8 @@ public synchronized void stop(long timeout, TimeUnit timeUnit) {
227242 telemetriesTransmitter .stop (timeout , timeUnit );
228243 stopped = true ;
229244 } catch (Throwable t ) {
245+ InternalLogger .INSTANCE .error ("Exception generated while stopping telemetry transmitter" );
246+ InternalLogger .INSTANCE .trace ("Stack trace generated is %s" , ExceptionUtils .getStackTrace (t ));
230247 }
231248 }
232249
@@ -241,6 +258,7 @@ public void flush() {
241258 /**
242259 * Sets an optional Sampler that can sample out telemetries
243260 * Currently, we don't allow to replace a valid telemtry sampler.
261+ *
244262 * @param telemetrySampler - The sampler
245263 */
246264 @ Override
@@ -252,6 +270,7 @@ public void setSampler(TelemetrySampler telemetrySampler) {
252270
253271 /**
254272 * Sets the buffer size
273+ *
255274 * @param maxTelemetriesInBatch should be between MIN_MAX_TELEMETRY_BUFFER_CAPACITY
256275 * and MAX_MAX_TELEMETRY_BUFFER_CAPACITY inclusive
257276 * if the number is lower than the minimum then the minimum will be used
@@ -263,6 +282,7 @@ public void setMaxTelemetriesInBatch(int maxTelemetriesInBatch) {
263282
264283 /**
265284 * Sets the time tow wait before flushing the internal buffer
285+ *
266286 * @param transmitBufferTimeoutInSeconds should be between MIN_FLUSH_BUFFER_TIMEOUT_IN_SECONDS
267287 * and MAX_FLUSH_BUFFER_TIMEOUT_IN_SECONDS inclusive
268288 * if the number is lower than the minimum then the minimum will be used
@@ -297,6 +317,7 @@ private synchronized void initialize(String endpointAddress,
297317 /**
298318 * The method will throw IllegalArgumentException if the endpointAddress is not a valid uri
299319 * Please note that a null or empty string is valid as far as the class is concerned and thus considered valid
320+ *
300321 * @param endpointAddress
301322 */
302323 private void makeSureEndpointAddressIsValid (String endpointAddress ) {
0 commit comments