Skip to content

Commit 2a686fd

Browse files
joeyjacksoncwbriones
authored andcommitted
Stamping source (#161)
* kafka source init * kafka consumer wrapper and listener * unit test and checkstyle * exception handling * unit tests for kafka source * interfaces and wrappers for kafka source * removed kafka dep * integration tests for kafka source * config deserialization * Fixed deserializer * Fixed deserializer * styling * kafka docker * pipeline config example * style * error checking * error checking * integration test kafka source from config * style * added parser to kafka source * example pipeline * Fail integration test on send fail to kafka server * requested changes * requested changes * configurable backoff time for kafka source * fixed conf deserializer * concurrent parsing workers * multi worker unit test * queue holds record values instead of records * style * instrument init * todo * mock observer for multithreaded testing * configurable buffer queue size * moved fill queue integration test to unit test * style * ensure queue fills in queue filled test * refactor kafka source constructors * style * fix injector in integration tests * instrumentation testing init * unit tests for instrumentation counter * unit test gauge metric * more instrumentation metrics * remove prinln * new metric names * metrics unit tests * requested changes * nonnull annotate * Time restamping source * Time restamping source * requested changes * checkstyle * Time restamping source * requested changes * checkstyle
1 parent 840ad11 commit 2a686fd

File tree

2 files changed

+249
-0
lines changed

2 files changed

+249
-0
lines changed
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Copyright 2019 Dropbox.com
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.arpnetworking.metrics.mad.sources;
17+
18+
import com.arpnetworking.commons.builder.ThreadLocalBuilder;
19+
import com.arpnetworking.commons.observer.Observable;
20+
import com.arpnetworking.commons.observer.Observer;
21+
import com.arpnetworking.logback.annotations.LogValue;
22+
import com.arpnetworking.metrics.common.sources.BaseSource;
23+
import com.arpnetworking.metrics.common.sources.Source;
24+
import com.arpnetworking.metrics.mad.model.DefaultRecord;
25+
import com.arpnetworking.metrics.mad.model.Record;
26+
import com.arpnetworking.steno.LogValueMapFactory;
27+
import com.arpnetworking.steno.Logger;
28+
import com.arpnetworking.steno.LoggerFactory;
29+
import net.sf.oval.constraint.NotNull;
30+
31+
import java.time.Instant;
32+
import java.time.ZoneId;
33+
import java.time.ZonedDateTime;
34+
35+
/**
36+
* Implementation of {@link Source} which wraps another {@link Source} and sets
37+
* the time stamp of the {@link Record}s received from the wrapped {@link Source}
38+
* to the current time.
39+
*
40+
* @author Joey Jackson (jjackson at dropbox dot com)
41+
*/
42+
public final class TimeStampingSource extends BaseSource {
43+
44+
private static final Logger LOGGER = LoggerFactory.getLogger(MappingSource.class);
45+
46+
private final Source _source;
47+
48+
@Override
49+
public void start() {
50+
_source.start();
51+
}
52+
53+
@Override
54+
public void stop() {
55+
_source.stop();
56+
}
57+
58+
/**
59+
* Generate a Steno log compatible representation.
60+
*
61+
* @return Steno log compatible representation.
62+
*/
63+
@LogValue
64+
public Object toLogValue() {
65+
return LogValueMapFactory.builder(this)
66+
.put("source", _source)
67+
.build();
68+
}
69+
70+
@Override
71+
public String toString() {
72+
return toLogValue().toString();
73+
}
74+
75+
private TimeStampingSource(final Builder builder) {
76+
super(builder);
77+
_source = builder._source;
78+
_source.attach(new TimeStampingObserver(this));
79+
}
80+
81+
82+
private static final class TimeStampingObserver implements Observer {
83+
private final TimeStampingSource _source;
84+
85+
TimeStampingObserver(final TimeStampingSource source) {
86+
_source = source;
87+
}
88+
89+
@Override
90+
public void notify(final Observable observable, final Object event) {
91+
if (!(event instanceof Record)) {
92+
LOGGER.error()
93+
.setMessage("Observed unsupported event")
94+
.addData("event", event)
95+
.log();
96+
return;
97+
}
98+
99+
final Record record = (Record) event;
100+
_source.notify(
101+
ThreadLocalBuilder.build(
102+
DefaultRecord.Builder.class,
103+
b1 -> b1.setMetrics(record.getMetrics())
104+
.setId(record.getId())
105+
.setTime(ZonedDateTime.ofInstant(Instant.now(), ZoneId.of("UTC")))
106+
.setAnnotations(record.getAnnotations())
107+
.setDimensions(record.getDimensions())));
108+
}
109+
}
110+
111+
/**
112+
* Builder pattern for {@link TimeStampingSource}.
113+
*
114+
* @author Joey Jackson (jjackson at dropbox dot com)
115+
*/
116+
public static final class Builder extends BaseSource.Builder<Builder, TimeStampingSource> {
117+
118+
/**
119+
* Public constructor.
120+
*/
121+
public Builder() {
122+
super(TimeStampingSource::new);
123+
}
124+
125+
/**
126+
* Sets the wrapped {@link Source}. Cannot be null.
127+
*
128+
* @param value The wrapped source
129+
* @return This instance of {@link TimeStampingSource.Builder}
130+
*/
131+
public Builder setSource(final Source value) {
132+
_source = value;
133+
return this;
134+
}
135+
136+
@Override
137+
protected Builder self() {
138+
return this;
139+
}
140+
141+
@NotNull
142+
private Source _source;
143+
}
144+
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright 2019 Dropbox.com
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.arpnetworking.metrics.mad.sources;
17+
18+
import com.arpnetworking.commons.observer.Observable;
19+
import com.arpnetworking.commons.observer.Observer;
20+
import com.arpnetworking.metrics.common.sources.Source;
21+
import com.arpnetworking.metrics.mad.model.Record;
22+
import com.arpnetworking.test.TestBeanFactory;
23+
import org.junit.Assert;
24+
import org.junit.Before;
25+
import org.junit.Test;
26+
import org.mockito.ArgumentCaptor;
27+
import org.mockito.Mockito;
28+
29+
import java.time.ZoneId;
30+
import java.time.ZonedDateTime;
31+
32+
/**
33+
* Unit tests for the {@link TimeStampingSource}.
34+
*
35+
* @author Joey Jackson (jjackson at dropbox dot com)
36+
*/
37+
public class TimeStampingSourceTest {
38+
private TimeStampingSource _source;
39+
private Source _mockSource;
40+
private Observer _mockObserver;
41+
private ArgumentCaptor<Record> _captor = ArgumentCaptor.forClass(Record.class);
42+
43+
@Before
44+
public void setUp() {
45+
_mockSource = Mockito.mock(Source.class);
46+
_source = new TimeStampingSource.Builder()
47+
.setName("TimeStampingSource")
48+
.setSource(_mockSource)
49+
.build();
50+
51+
_mockObserver = Mockito.mock(Observer.class);
52+
_source.attach(_mockObserver);
53+
}
54+
55+
@Test
56+
public void testAttachStampingObserver() {
57+
Mockito.verify(_mockSource).attach(Mockito.any(Observer.class));
58+
}
59+
60+
@Test
61+
public void testStart() {
62+
_source.start();
63+
Mockito.verify(_mockSource).start();
64+
}
65+
66+
@Test
67+
public void testStop() {
68+
_source.stop();
69+
Mockito.verify(_mockSource).stop();
70+
}
71+
72+
@Test
73+
public void testToString() {
74+
final String asString = _source.toString();
75+
Assert.assertNotNull(asString);
76+
Assert.assertFalse(asString.isEmpty());
77+
}
78+
79+
@Test
80+
public void testStampSuccess() {
81+
final ZonedDateTime before = ZonedDateTime.now();
82+
83+
final Record record = TestBeanFactory.createRecordBuilder()
84+
.setTime(ZonedDateTime.of(2019, 1, 1, 0, 0, 0, 0, ZoneId.of("UTC")))
85+
.build();
86+
notify(_mockSource, record);
87+
88+
Mockito.verify(_mockObserver).notify(Mockito.same(_source), _captor.capture());
89+
final Record received = _captor.getValue();
90+
91+
final ZonedDateTime after = ZonedDateTime.now();
92+
93+
Assert.assertTrue(String.format("Timestamp should have been between %s and %s but was %s", before.toString(),
94+
after.toString(), received.getTime().toString()),
95+
!received.getTime().isBefore(before) && !received.getTime().isAfter(after));
96+
}
97+
98+
private static void notify(final Observable observable, final Object event) {
99+
final ArgumentCaptor<Observer> argument = ArgumentCaptor.forClass(Observer.class);
100+
Mockito.verify(observable).attach(argument.capture());
101+
for (final Observer observer : argument.getAllValues()) {
102+
observer.notify(observable, event);
103+
}
104+
}
105+
}

0 commit comments

Comments
 (0)