Skip to content

Commit 052939a

Browse files
nyk0322
and authored
added new config to allow kafka record timestamp to set as published_date (#47)
* added new config to allow kafka record timestamp to set as published_date event attribute --------- Co-authored-by: nyk0322 <[email protected]>
1 parent 8ad004d commit 052939a

File tree

4 files changed

+47
-13
lines changed

4 files changed

+47
-13
lines changed

README.md

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -95,17 +95,18 @@ A REST call can be executed against one of the cluster instances, and the config
9595
| `topics` | Comma separated list of Kafka topics for Datadog to consume. `prod-topic1,prod-topic2,prod-topic3`||
9696
| `datadog.api_key` | The API key of your Datadog platform.||
9797
#### General Optional Parameters
98-
| Name | Description | Default Value |
99-
|-------- |----------------------------|-----------------------|
100-
| `datadog.site` | The site of the Datadog intake to send logs to (for example 'datadoghq.eu' to send data to the EU site) | `datadoghq.com` |
98+
| Name | Description | Default Value |
99+
|-------- |------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------|
100+
| `datadog.site` | The site of the Datadog intake to send logs to (for example 'datadoghq.eu' to send data to the EU site) | `datadoghq.com` |
101101
| `datadog.url` | Custom Datadog URL endpoint where your logs will be sent. `datadog.url` takes precedence over `datadog.site`. Example: `http-intake.logs.datadoghq.com:443` ||
102-
| `datadog.tags` | Tags associated with your logs in a comma separated tag:value format.||
103-
| `datadog.service` | The name of the application or service generating the log events.||
104-
| `datadog.hostname` | The name of the originating host of the log.||
105-
| `datadog.proxy.url` | Proxy endpoint when logs are not directly forwarded to Datadog.||
106-
| `datadog.proxy.port` | Proxy port when logs are not directly forwarded to Datadog.||
107-
| `datadog.retry.max` | The number of retries before the output plugin stops.| `5` ||
108-
| `datadog.retry.backoff_ms` | The time in milliseconds to wait following an error before a retry attempt is made.| `3000` ||
102+
| `datadog.tags` | Tags associated with your logs in a comma separated tag:value format. ||
103+
| `datadog.service` | The name of the application or service generating the log events. ||
104+
| `datadog.hostname` | The name of the originating host of the log. ||
105+
| `datadog.proxy.url` | Proxy endpoint when logs are not directly forwarded to Datadog. ||
106+
| `datadog.proxy.port` | Proxy port when logs are not directly forwarded to Datadog. ||
107+
| `datadog.retry.max` | The number of retries before the output plugin stops. | `5` ||
108+
| `datadog.retry.backoff_ms` | The time in milliseconds to wait following an error before a retry attempt is made. | `3000` ||
109+
| `datadog.add_published_date` | Valid settings are true or false. When set to `true`, the timestamp is retrieved from the Kafka record and passed to Datadog as `published_date`. ||
109110

110111
### Troubleshooting performance
111112

src/main/java/com/datadoghq/connect/logs/sink/DatadogLogsApiWriter.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ private JsonArray formatBatch(String topic) {
9797
}
9898

9999
JsonElement recordJSON = recordToJSON(record);
100-
JsonObject message = populateMetadata(topic, recordJSON);
100+
JsonObject message = populateMetadata(topic, recordJSON, record.timestamp());
101101
batchRecords.add(message);
102102
}
103103

@@ -110,11 +110,14 @@ private JsonElement recordToJSON(SinkRecord record) {
110110
return new Gson().fromJson(jsonPayload, JsonElement.class);
111111
}
112112

113-
private JsonObject populateMetadata(String topic, JsonElement message) {
113+
private JsonObject populateMetadata(String topic, JsonElement message, Long timestamp) {
114114
JsonObject content = new JsonObject();
115115
String tags = "topic:" + topic;
116116
content.add("message", message);
117117
content.add("ddsource", new JsonPrimitive(config.ddSource));
118+
if (config.addPublishedDate && timestamp != null) {
119+
content.add("published_date", new JsonPrimitive(timestamp));
120+
}
118121

119122
if (config.ddTags != null) {
120123
tags += "," + config.ddTags;

src/main/java/com/datadoghq/connect/logs/sink/DatadogLogsSinkConnectorConfig.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public class DatadogLogsSinkConnectorConfig extends AbstractConfig {
3333
private static final String DD_URL_FORMAT_FROM_SITE = "http-intake.logs.%s:443";
3434
private static final String DEFAULT_DD_SITE = "datadoghq.com";
3535
public static final String DEFAULT_DD_URL = String.format(DD_URL_FORMAT_FROM_SITE, DEFAULT_DD_SITE);
36+
public static final String ADD_PUBLISHED_DATE = "datadog.add_published_date";
3637

3738
// Respect limit documented at https://docs.datadoghq.com/api/?lang=bash#logs
3839
public final Integer ddMaxBatchLength;
@@ -51,6 +52,7 @@ public class DatadogLogsSinkConnectorConfig extends AbstractConfig {
5152
public final Integer proxyPort;
5253
public final Integer retryMax;
5354
public final Integer retryBackoffMs;
55+
public final boolean addPublishedDate;
5456

5557
public static final ConfigDef CONFIG_DEF = baseConfigDef();
5658

@@ -72,6 +74,7 @@ public DatadogLogsSinkConnectorConfig(Boolean useSSL, Integer ddMaxBatchLength,
7274
this.ddUrl = getString(DD_URL);
7375
this.ddSite = getString(DD_SITE);
7476
this.ddMaxBatchLength = ddMaxBatchLength;
77+
this.addPublishedDate = getBoolean(ADD_PUBLISHED_DATE);
7578
validateConfig();
7679
}
7780

@@ -166,7 +169,13 @@ private static void addMetadataConfigs(ConfigDef configDef) {
166169
++orderInGroup,
167170
Width.LONG,
168171
"Datadog logs site"
169-
);
172+
).define(
173+
ADD_PUBLISHED_DATE,
174+
Type.BOOLEAN,
175+
false,
176+
null,
177+
Importance.MEDIUM,
178+
"Valid settings are true or false. When set to `true`, The timestamp is retrieved from the Kafka record and passed to Datadog as `published_date`");
170179
}
171180

172181
private static void addProxyConfigs(ConfigDef configDef) {

src/test/java/com/datadoghq/connect/logs/sink/DatadogLogsApiWriterTest.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ This product includes software developed at Datadog (https://www.datadoghq.com/)
99
import com.datadoghq.connect.logs.sink.util.RestHelper;
1010
import com.datadoghq.connect.logs.util.Project;
1111

12+
import org.apache.kafka.common.record.TimestampType;
1213
import org.apache.kafka.connect.sink.SinkRecord;
1314
import org.apache.kafka.connect.data.Decimal;
1415
import org.apache.kafka.connect.data.Schema;
@@ -173,4 +174,24 @@ public void metadata_asOneBatch_shouldPopulatePerBatch() throws IOException {
173174

174175
Assert.assertEquals("[{\"message\":\"someValue1\",\"ddsource\":\"kafka-connect\",\"ddtags\":\"topic:someTopic,team:agent-core,author:berzan\",\"hostname\":\"test-host\",\"service\":\"test-service\"},{\"message\":\"someValue2\",\"ddsource\":\"kafka-connect\",\"ddtags\":\"topic:someTopic,team:agent-core,author:berzan\",\"hostname\":\"test-host\",\"service\":\"test-service\"}]", request.getBody());
175176
}
177+
178+
@Test
179+
public void writer_withUseRecordTimeStampEnabled_shouldPopulateRecordTimestamp() throws IOException {
180+
props.put(DatadogLogsSinkConnectorConfig.ADD_PUBLISHED_DATE, "true");
181+
DatadogLogsSinkConnectorConfig config = new DatadogLogsSinkConnectorConfig(false, 2, props);
182+
DatadogLogsApiWriter writer = new DatadogLogsApiWriter(config);
183+
184+
185+
long recordTime = 1713974401224L;
186+
187+
records.add(new SinkRecord("someTopic", 0, null, "someKey", null, "someValue1", 0, recordTime, TimestampType.CREATE_TIME));
188+
records.add(new SinkRecord("someTopic", 0, null, "someKey", null, "someValue2", 0, recordTime, TimestampType.CREATE_TIME));
189+
writer.write(records);
190+
191+
Assert.assertEquals(1, restHelper.getCapturedRequests().size());
192+
193+
RequestInfo request = restHelper.getCapturedRequests().get(0);
194+
System.out.println(request.getBody());
195+
Assert.assertEquals("[{\"message\":\"someValue1\",\"ddsource\":\"kafka-connect\",\"published_date\":1713974401224,\"ddtags\":\"topic:someTopic\"},{\"message\":\"someValue2\",\"ddsource\":\"kafka-connect\",\"published_date\":1713974401224,\"ddtags\":\"topic:someTopic\"}]", request.getBody());
196+
}
176197
}

0 commit comments

Comments (0)