Skip to content

Commit b7e67e1

Browse files
authored
Refactor CloudWatch related code and add logs input (#43)
* Refactor CloudWatch related code - Create a generic Kinesis transport and reader that can be used by different inputs - Create an abstract CloudWatch log data codec - Rename some flow log classes to more generic names * Add CloudWatch input and codec to read raw logs * Use custom object mapper as we did in the old code * Add payload examples and ignore unknwon properties * Make kinesis consumer less noisy * Update to the latest AWS SDK and Kinesis library * Use unique application name per Kinesis stream The Kinesis client is using a DynamoDB table under the hood to track shard offsets. When a consumer is using the same application name for two different Kinesis streams, checkpointing will fail. * Periodically checkpoint the Kinesis stream This ensures that we are getting old records after a Graylog server restart instead of starting with LATEST on each restart. * Add second constructor to AWSAuthProvider and simplify conditions * Pass auth provider into InstanceLookupTable to allow different methods Also use AmazonEC2Client builder instead of deprecated constructor. * Inject InstanceLookupTable and bind it as singleton * Switch tests to junit * Use default config in instance name lookup processor - Remove now unused "AWSPluginConfiguration#isComplete()" - Log a warning if the lookup processor is enabled but no regions are configured * Rename inputs to remove "CloudWatch" from the names
1 parent 8d57d23 commit b7e67e1

22 files changed

+647
-281
lines changed

pom.xml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@
3434
<maven.deploy.skip>true</maven.deploy.skip>
3535
<maven.site.skip>true</maven.site.skip>
3636
<graylog.version>2.3.0</graylog.version>
37-
<aws-java-sdk.version>1.11.98</aws-java-sdk.version>
37+
<aws-java-sdk.version>1.11.174</aws-java-sdk.version>
38+
<aws-kinesis-client.version>1.8.1</aws-kinesis-client.version>
3839
</properties>
3940

4041
<dependencyManagement>
@@ -114,7 +115,7 @@
114115
<dependency>
115116
<groupId>com.amazonaws</groupId>
116117
<artifactId>amazon-kinesis-client</artifactId>
117-
<version>1.7.4</version>
118+
<version>${aws-kinesis-client.version}</version>
118119
<exclusions>
119120
<exclusion>
120121
<groupId>commons-logging</groupId>
@@ -135,10 +136,9 @@
135136
<scope>provided</scope>
136137
</dependency>
137138
<dependency>
138-
<groupId>org.testng</groupId>
139-
<artifactId>testng</artifactId>
140-
<version>6.9.10</version>
141-
<scope>test</scope>
139+
<groupId>org.assertj</groupId>
140+
<artifactId>assertj-core</artifactId>
141+
<version>${assertj-core.version}</version>
142142
</dependency>
143143
</dependencies>
144144

src/main/java/org/graylog/aws/auth/AWSAuthProvider.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,24 @@
1010
import org.slf4j.Logger;
1111
import org.slf4j.LoggerFactory;
1212

13+
import javax.annotation.Nullable;
14+
15+
import static com.google.common.base.Strings.isNullOrEmpty;
16+
1317
public class AWSAuthProvider implements AWSCredentialsProvider {
1418
private static final Logger LOG = LoggerFactory.getLogger(AWSAuthProvider.class);
1519

1620
private AWSCredentialsProvider credentials;
1721

18-
public AWSAuthProvider(AWSPluginConfiguration config, String accessKey, String secretKey) {
19-
if (accessKey != null && secretKey != null
20-
&& !accessKey.isEmpty() && !secretKey.isEmpty()) {
22+
public AWSAuthProvider(AWSPluginConfiguration config) {
23+
this(config, null, null);
24+
}
25+
26+
public AWSAuthProvider(AWSPluginConfiguration config, @Nullable String accessKey, @Nullable String secretKey) {
27+
if (!isNullOrEmpty(accessKey) && !isNullOrEmpty(secretKey)) {
2128
this.credentials = new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey));
2229
LOG.debug("Using input specific config");
23-
} else if (config.accessKey() != null && config.secretKey() != null
24-
&& !config.accessKey().isEmpty() && !config.secretKey().isEmpty()) {
30+
} else if (!isNullOrEmpty(config.accessKey()) && !isNullOrEmpty(config.secretKey())) {
2531
this.credentials = new AWSStaticCredentialsProvider(new BasicAWSCredentials(config.accessKey(), config.secretKey()));
2632
LOG.debug("Using AWS Plugin config");
2733
} else {
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package org.graylog.aws.cloudwatch;
2+
3+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
4+
import com.fasterxml.jackson.annotation.JsonProperty;
5+
6+
import java.util.List;
7+
8+
/**
9+
* A collection of CloudWatch log events.
10+
* <p/>
11+
* Example payload:
12+
* <pre>
13+
* {
14+
* "messageType": "DATA_MESSAGE",
15+
* "owner": "123456789",
16+
* "logGroup": "aws-plugin-test-flows",
17+
* "logStream": "eni-aaaaaaaa-all",
18+
* "subscriptionFilters": ["match-all"],
19+
* "logEvents": [
20+
* {
21+
* "id": "33503748002479370955346306650196094071913271643270021120",
22+
* "timestamp": 1502360020000,
23+
* "message": "2 123456789 eni-aaaaaaaa 10.0.27.226 10.42.96.199 3604 17720 17 1 132 1502360020 1502360079 REJECT OK"
24+
* },
25+
* {
26+
* "id": "33503748002479370955346306650196094071913271643270021127",
27+
* "timestamp": 1502360020000,
28+
* "message": "2 123456789 eni-aaaaaaaa 10.0.34.113 10.42.96.199 53421 17720 6 1 48 1502360020 1502360079 REJECT OK"
29+
* }
30+
* ]
31+
* }
32+
* </pre>
33+
*/
34+
@JsonIgnoreProperties(ignoreUnknown = true)
35+
public class CloudWatchLogData {
36+
@JsonProperty("logEvents")
37+
public List<CloudWatchLogEvent> logEvents;
38+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package org.graylog.aws.cloudwatch;
2+
3+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
4+
import com.fasterxml.jackson.annotation.JsonProperty;
5+
import com.google.common.base.MoreObjects;
6+
7+
/**
8+
* A single CloudWatch log event.
9+
* <p/>
10+
* Example payload:
11+
* <pre>
12+
* {
13+
* "id": "33503748002479370955346306650196094071913271643270021120",
14+
* "timestamp": 1502360020000,
15+
* "message": "2 123456789 eni-aaaaaaaa 10.0.27.226 10.42.96.199 3604 17720 17 1 132 1502360020 1502360079 REJECT OK"
16+
* }
17+
* </pre>
18+
*/
19+
@JsonIgnoreProperties(ignoreUnknown = true)
20+
public class CloudWatchLogEvent {
21+
@JsonProperty("timestamp")
22+
public long timestamp;
23+
24+
@JsonProperty("message")
25+
public String message;
26+
27+
@Override
28+
public String toString() {
29+
return MoreObjects.toStringHelper(this)
30+
.add("timestamp", timestamp)
31+
.add("message", message)
32+
.toString();
33+
}
34+
}

src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogMessage.java renamed to src/main/java/org/graylog/aws/cloudwatch/FlowLogMessage.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1-
package org.graylog.aws.inputs.flowlogs;
1+
package org.graylog.aws.cloudwatch;
22

33
import org.joda.time.DateTime;
44
import org.slf4j.Logger;
55
import org.slf4j.LoggerFactory;
66

7+
import javax.annotation.Nullable;
8+
79
public class FlowLogMessage {
810
private static final Logger LOG = LoggerFactory.getLogger(FlowLogMessage.class);
911

@@ -55,26 +57,31 @@ public FlowLogMessage(DateTime timestamp,
5557
this.logStatus = logStatus;
5658
}
5759

58-
public static FlowLogMessage fromParts(String[] parts) {
59-
if(parts == null || parts.length != 15) {
60-
throw new IllegalArgumentException("Message parts were null or not length of 15");
60+
@Nullable
61+
public static FlowLogMessage fromLogEvent(final CloudWatchLogEvent logEvent) {
62+
final String[] parts = logEvent.message.split(" ");
63+
64+
if (parts.length != 14) {
65+
LOG.warn("Received FlowLog message with not exactly 14 fields. Skipping. Message was: [{}]", logEvent.message);
66+
return null;
6167
}
68+
6269
return new FlowLogMessage(
63-
new DateTime(Long.valueOf(parts[0])),
64-
safeInteger(parts[1]),
70+
new DateTime(Long.valueOf(logEvent.timestamp)),
71+
safeInteger(parts[0]),
72+
parts[1],
6573
parts[2],
6674
parts[3],
6775
parts[4],
68-
parts[5],
76+
safeInteger(parts[5]),
6977
safeInteger(parts[6]),
7078
safeInteger(parts[7]),
71-
safeInteger(parts[8]),
79+
safeLong(parts[8]),
7280
safeLong(parts[9]),
73-
safeLong(parts[10]),
81+
new DateTime(Long.valueOf(parts[10])*1000),
7482
new DateTime(Long.valueOf(parts[11])*1000),
75-
new DateTime(Long.valueOf(parts[12])*1000),
76-
parts[13],
77-
parts[14]
83+
parts[12],
84+
parts[13]
7885
);
7986
}
8087

src/main/java/org/graylog/aws/config/AWSPluginConfiguration.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,16 +64,10 @@ public static Builder builder() {
6464
return new AutoValue_AWSPluginConfiguration.Builder();
6565
}
6666

67-
@JsonIgnore
68-
public boolean isComplete() {
69-
return accessKey() != null && secretKey() != null
70-
&& !accessKey().isEmpty() && !secretKey().isEmpty();
71-
}
72-
7367
@JsonIgnore
7468
public List<Regions> getLookupRegions() {
7569
if (lookupRegions() == null || lookupRegions().isEmpty()) {
76-
return Collections.EMPTY_LIST;
70+
return Collections.emptyList();
7771
}
7872

7973
ImmutableList.Builder<Regions> builder = ImmutableList.<Regions>builder();
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package org.graylog.aws.inputs.cloudwatch;
2+
3+
import com.codahale.metrics.MetricRegistry;
4+
import com.google.inject.assistedinject.Assisted;
5+
import org.graylog.aws.inputs.codecs.CloudWatchRawLogCodec;
6+
import org.graylog.aws.inputs.transports.KinesisTransport;
7+
import org.graylog2.plugin.LocalMetricRegistry;
8+
import org.graylog2.plugin.ServerStatus;
9+
import org.graylog2.plugin.configuration.Configuration;
10+
import org.graylog2.plugin.inputs.MessageInput;
11+
import org.graylog2.plugin.inputs.annotations.ConfigClass;
12+
import org.graylog2.plugin.inputs.annotations.FactoryClass;
13+
14+
import javax.inject.Inject;
15+
16+
public class CloudWatchLogsInput extends MessageInput {
17+
private static final String NAME = "AWS Logs";
18+
19+
@Inject
20+
public CloudWatchLogsInput(@Assisted Configuration configuration,
21+
MetricRegistry metricRegistry,
22+
KinesisTransport.Factory transport,
23+
LocalMetricRegistry localRegistry,
24+
CloudWatchRawLogCodec.Factory codec,
25+
Config config,
26+
Descriptor descriptor,
27+
ServerStatus serverStatus) {
28+
super(
29+
metricRegistry,
30+
configuration,
31+
transport.create(configuration),
32+
localRegistry,
33+
codec.create(configuration),
34+
config,
35+
descriptor,
36+
serverStatus
37+
);
38+
}
39+
40+
@FactoryClass
41+
public interface Factory extends MessageInput.Factory<CloudWatchLogsInput> {
42+
@Override
43+
CloudWatchLogsInput create(Configuration configuration);
44+
45+
@Override
46+
Config getConfig();
47+
48+
@Override
49+
Descriptor getDescriptor();
50+
}
51+
52+
public static class Descriptor extends MessageInput.Descriptor {
53+
public Descriptor() {
54+
super(NAME, false, "");
55+
}
56+
}
57+
58+
@ConfigClass
59+
public static class Config extends MessageInput.Config {
60+
@Inject
61+
public Config(KinesisTransport.Factory transport, CloudWatchRawLogCodec.Factory codec) {
62+
super(transport.getConfig(), codec.getConfig());
63+
}
64+
}
65+
}

src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogCodec.java renamed to src/main/java/org/graylog/aws/inputs/codecs/CloudWatchFlowLogCodec.java

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,58 +1,48 @@
1-
package org.graylog.aws.inputs.flowlogs;
1+
package org.graylog.aws.inputs.codecs;
22

3-
import com.fasterxml.jackson.databind.ObjectMapper;
43
import com.google.inject.assistedinject.Assisted;
54
import org.graylog.aws.AWS;
5+
import org.graylog.aws.cloudwatch.CloudWatchLogEvent;
6+
import org.graylog.aws.cloudwatch.FlowLogMessage;
7+
import org.graylog.aws.inputs.flowlogs.IANAProtocolNumbers;
68
import org.graylog2.plugin.Message;
79
import org.graylog2.plugin.configuration.Configuration;
810
import org.graylog2.plugin.configuration.ConfigurationRequest;
911
import org.graylog2.plugin.inputs.annotations.ConfigClass;
1012
import org.graylog2.plugin.inputs.annotations.FactoryClass;
1113
import org.graylog2.plugin.inputs.codecs.Codec;
1214
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
13-
import org.graylog2.plugin.journal.RawMessage;
1415
import org.joda.time.Seconds;
15-
import org.slf4j.Logger;
16-
import org.slf4j.LoggerFactory;
1716

1817
import javax.annotation.Nonnull;
1918
import javax.annotation.Nullable;
2019
import javax.inject.Inject;
2120
import java.util.HashMap;
2221
import java.util.Map;
2322

24-
public class FlowLogCodec implements Codec {
25-
private static final Logger LOG = LoggerFactory.getLogger(FlowLogCodec.class);
23+
public class CloudWatchFlowLogCodec extends CloudWatchLogDataCodec {
2624
public static final String NAME = "AWSFlowLog";
2725

2826
private final Configuration configuration;
29-
private final ObjectMapper objectMapper;
30-
3127
private final IANAProtocolNumbers protocolNumbers;
3228

3329
@Inject
34-
public FlowLogCodec(@Assisted Configuration configuration, ObjectMapper objectMapper) {
30+
public CloudWatchFlowLogCodec(@Assisted Configuration configuration) {
3531
this.configuration = configuration;
36-
this.objectMapper = objectMapper;
37-
3832
this.protocolNumbers = new IANAProtocolNumbers();
3933
}
4034

4135
@Nullable
4236
@Override
43-
public Message decode(@Nonnull RawMessage rawMessage) {
37+
public Message decodeLogData(@Nonnull final CloudWatchLogEvent logEvent) {
4438
try {
45-
String rawString = new String(rawMessage.getPayload());
46-
String[] parts = rawString.split(" ");
39+
final FlowLogMessage flowLogMessage = FlowLogMessage.fromLogEvent(logEvent);
4740

48-
if (parts.length != 15) {
49-
LOG.warn("Received FlowLog message with not exactly 15 fields. Skipping. Message was: [{}]", rawString);
41+
if (flowLogMessage == null) {
5042
return null;
5143
}
5244

53-
FlowLogMessage flowLogMessage = FlowLogMessage.fromParts(parts);
54-
55-
Message result = new Message(
45+
final Message result = new Message(
5646
buildSummary(flowLogMessage),
5747
"aws-flowlogs",
5848
flowLogMessage.getTimestamp()
@@ -113,9 +103,9 @@ public String getName() {
113103
}
114104

115105
@FactoryClass
116-
public interface Factory extends Codec.Factory<FlowLogCodec> {
106+
public interface Factory extends Codec.Factory<CloudWatchFlowLogCodec> {
117107
@Override
118-
FlowLogCodec create(Configuration configuration);
108+
CloudWatchFlowLogCodec create(Configuration configuration);
119109

120110
@Override
121111
Config getConfig();

0 commit comments

Comments
 (0)