
Commit 2d01c6f

Author: Donald Tregonning
Merge pull request splunk#125 from splunk/develop
Merge latest changes to master and tag for our release.
2 parents 0bfbba6 + b9bc8f3 commit 2d01c6f

File tree: 8 files changed (+70, -25 lines)


README.md

Lines changed: 45 additions & 15 deletions
@@ -8,7 +8,7 @@ A Kafka Connect Sink for Splunk features:
 ## Requirements
 1. Kafka version 0.10 and above.
 2. Java 8 and above.
-3. A Splunk environment of version 6.4* and above, configured with valid HTTP Event Collector (HEC) tokens. (A minimum Splunk version of 6.5 is required for event annotation)
+3. A Splunk environment of version 6.5 and above, configured with valid HTTP Event Collector (HEC) tokens.

 * HEC token settings should be the same on all Splunk Indexers and Heavy Forwarders in your environment.
 * Task configuration parameters will vary depending on acknowledgement setting (See the [Configuration](#configuration) section for details).
@@ -25,7 +25,7 @@ Note: The resulting "kafka-connect-splunk-*.tar.gz" package is self-contained. B

 ## Quick Start

-1. [Start](https://kafka.apache.org/quickstart) your Kafka Cluster and Zookeeper on your local host. Confirm both are running.
+1. [Start](https://kafka.apache.org/quickstart) your Kafka Cluster and confirm it is running.
 2. If this is a new install, create a test topic (eg: `perf`). Inject events into the topic. This can be done using [Kafka data-gen-app](https://github.com/dtregonning/kafka-data-gen) or the Kafka bundle [kafka-console-producer](https://kafka.apache.org/quickstart#quickstart_send).
 3. Untar the package created from the build script: `tar xzvf kafka-connect-splunk-*.tar.gz` (Default target location is /tmp/kafka-connect-splunk-build/kafka-connect-splunk).
 4. Navigate to kafka-connect-splunk directory `cd kafka-connect-splunk`.
@@ -57,7 +57,7 @@ Note: The resulting "kafka-connect-splunk-*.tar.gz" package is self-contained. B
     "splunk.hec.total.channels": "8",
     "splunk.hec.max.batch.size": "1000000",
     "splunk.hec.threads": "2",
-    "splunk.hec.event.timeout": "60",
+    "splunk.hec.event.timeout": "300",
     "splunk.hec.socket.timeout": "120",
     "splunk.hec.track.data": "true"
   }
@@ -95,7 +95,7 @@ Use the following connector deployment options:
 * Splunk Kafka Connector in a dedicated Kafka Connect Cluster (recommended)
 * Splunk Kafka Connector in an existing Kafka Connect Cluster

-### Connector in a dedicated Kafka Connect Cluster
+### Connector in a dedicated Kafka Connect Cluster
 Running the Splunk Kafka Connector in a dedicated Kafka Connect Cluster is recommended. Isolating the Splunk connector from other Kafka connectors results in significant performance benefits in high throughput environments.

 1. Untar the **kafka-connect-splunk-*.tar.gz** package and navigate to the **kafka-connect-splunk** directory.
@@ -143,16 +143,46 @@ Running the Splunk Kafka Connector in a dedicated Kafka Connect Cluster is recom
 > Note: The **KAFKA\_HEAP\_OPTS** environment variable controls how much memory Kafka Connect can use. Set the **KAFKA\_HEAP\_OPTS** with the recommended value stated in the example above.

 ### Connector in an existing Kafka Connect Cluster
-1. Untar the **kafka-connect-splunk-*.tar.gz** installation package and go to the **kafka-connect-splunk** directory.

-```
-tar xzvf kafka-connect-splunk-*.tar.gz
-cd kafka-connect-splunk
-```
+1. Navigate to Splunkbase and download the latest version of [Splunk Kafka Connect](https://splunkbase.splunk.com/app/3862/).
+
+2. Copy the downloaded file onto every host, into the directory that contains your other connectors, or create a folder to store them in (ex. `/opt/connectors/splunk-kafka-connect`).
+
+3. Create a properties file called `kafka-connect.properties` in the directory `$KAFKA_CONNECT_HOME/config/`.
+Copy the following contents into the file, modify <BOOTSTRAP_SERVERS> to point to one of your Kafka brokers (ex. `localhost:9092`), and
+modify <PLUGIN_PATH> to point to the top-level directory where you store your connectors (ex. `/opt/connectors`).
+
+> Note: If running Kafka version 0.10.x, PLUGIN_PATH is not a valid configuration property. To make the connector visible to
+Kafka Connect, the connectors folder must be added to the classpath (ex. `export CLASSPATH=/opt/connectors/*`).

-2. Copy the **conectors/kafka-connect-splunk-*.jar** to the plugin path specified by **plugin.path** in the existing Kafka Connect on every host.
-3. Copy **libs/commons-logging-1.2.jar** to **libs** of the existing Kafka Connect on each host.
-4. Restart the Kafka Connect cluster.
+```
+bootstrap.servers=<BOOTSTRAP_SERVERS>
+#key.converter=org.apache.kafka.connect.json.JsonConverter
+#value.converter=org.apache.kafka.connect.json.JsonConverter
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.kafka.connect.storage.StringConverter
+key.converter.schemas.enable=false
+value.converter.schemas.enable=false
+internal.key.converter=org.apache.kafka.connect.json.JsonConverter
+internal.value.converter=org.apache.kafka.connect.json.JsonConverter
+internal.key.converter.schemas.enable=false
+internal.value.converter.schemas.enable=false
+offset.flush.interval.ms=10000
+plugin.path=<PLUGIN_PATH>
+group.id=kafka-connect-splunk-hec-sink
+config.storage.topic=__kafka-connect-splunk-task-configs
+config.storage.replication.factor=3
+offset.storage.topic=__kafka-connect-splunk-offsets
+offset.storage.replication.factor=3
+offset.storage.partitions=25
+status.storage.topic=__kafka-connect-splunk-statuses
+status.storage.replication.factor=3
+status.storage.partitions=5
+```
+
+> Note: For more information on the worker parameters please refer to the Kafka Connect [documentation](https://kafka.apache.org/documentation/#connect_running).
+
+4. Run `$KAFKA_CONNECT_HOME/bin/connect-distributed.sh $KAFKA_CONNECT_HOME/config/kafka-connect.properties` to start Kafka Connect, or restart Kafka Connect with the existing configuration file.

 ## Security
 The Kafka Connect Splunk Sink supports the following security mechanisms
@@ -399,7 +429,7 @@ Use the below schema to configure Splunk Kafka Connector
 * `splunk.hec.ack.poll.interval` - This setting is only applicable when `splunk.hec.ack.enabled` is set to `true`. Internally it controls the event ACKs polling interval. By default, this setting is 10 seconds.
 * `splunk.hec.ack.poll.threads` - This setting is used for performance tuning and is only applicable when `splunk.hec.ack.enabled` is set to `true`. It controls how many threads should be spawned to poll event ACKs. By default, it is set to `1`.
 > Note: For large Splunk indexer clusters (For example, 100 indexers) you need to increase this number. Recommended increase to speed up ACK polling is 4 threads.
-* `splunk.hec.event.timeout` - This setting is applicable when `splunk.hec.ack.enabled` is set to `true`. When events are POSTed to Splunk and before they are ACKed, this setting determines how long the connector will wait before timing out and resending. By default, it is set to 120 seconds.
+* `splunk.hec.event.timeout` - This setting is applicable when `splunk.hec.ack.enabled` is set to `true`. When events are POSTed to Splunk and before they are ACKed, this setting determines how long the connector will wait before timing out and resending. By default, it is set to 300 seconds.

 #### Endpoint Parameters
 * `splunk.hec.raw` - Set to `true` in order for Splunk software to ingest data using the /raw HEC endpoint. Default is `false`, which will use the /event endpoint.
@@ -433,7 +463,7 @@ Use the below schema to configure Splunk Kafka Connector
     "splunk.hec.ack.enabled" : "true",
     "splunk.hec.ack.poll.interval" : "20",
     "splunk.hec.ack.poll.threads" : "2",
-    "splunk.hec.event.timeout" : "120",
+    "splunk.hec.event.timeout" : "300",
     "splunk.hec.raw" : "true",
     "splunk.hec.raw.line.breaker" : "#####"
   }
@@ -454,7 +484,7 @@ Use the below schema to configure Splunk Kafka Connector
     "splunk.hec.ack.enabled" : "true",
     "splunk.hec.ack.poll.interval" : "20",
     "splunk.hec.ack.poll.threads" : "2",
-    "splunk.hec.event.timeout" : "120",
+    "splunk.hec.event.timeout" : "300",
     "splunk.hec.raw" : "false",
     "splunk.hec.json.event.enrichment" : "org=fin,bu=south-east-us",
     "splunk.hec.track.data" : "true"

build.sh

Lines changed: 0 additions & 4 deletions
@@ -57,10 +57,6 @@ cp kafka_2.11-${kafkaversion}/bin/kafka-run-class.sh ${builddir}/bin
 cp kafka_2.11-${kafkaversion}/config/connect-log4j.properties ${builddir}/config
 cp kafka_2.11-${kafkaversion}/libs/*.jar ${builddir}/libs

-# Download commons-logging jar
-echo "Downloading commons-logging jar"
-wget -q http://central.maven.org/maven2/commons-logging/commons-logging/1.2/commons-logging-1.2.jar -P ${builddir}/libs/
-
 # Clean up
 echo "Clean up ..."
 /bin/rm -rf kafka_2.11-${kafkaversion}

dependency-reduced-pom.xml

Lines changed: 0 additions & 1 deletion
@@ -20,7 +20,6 @@
         <artifactSet>
           <excludes>
             <exclude>org.apache.kafka:kafka-clients</exclude>
-            <exclude>commons-logging:commons-logging</exclude>
           </excludes>
         </artifactSet>
       </configuration>

pom.xml

Lines changed: 0 additions & 1 deletion
@@ -132,7 +132,6 @@
         <artifactSet>
           <excludes>
             <exclude>org.apache.kafka:kafka-clients</exclude>
-            <exclude>commons-logging:commons-logging</exclude>
           </excludes>
         </artifactSet>
       </configuration>

src/main/java/com/splunk/hecclient/HecAckPoller.java

Lines changed: 7 additions & 2 deletions
@@ -68,7 +68,11 @@ public void start() {
         scheduler.setRemoveOnCancelPolicy(true);

         Runnable poller = () -> {
-            poll();
+            try {
+                poll();
+            } catch (HecException e) {
+                log.error("failed to poll", e);
+            }
         };
         scheduler.scheduleWithFixedDelay(poller, ackPollInterval, ackPollInterval, TimeUnit.SECONDS);

@@ -239,14 +243,15 @@ private void findAndRemoveTimedoutBatches(Map<Long, EventBatch> batches, List<Ev
         while (iterator.hasNext()) {
             EventBatch batch = iterator.next().getValue();
             if (batch.isTimedout(eventBatchTimeout)) {
+                batch.fail();
                 timeouts.add(batch);
                 iterator.remove();
             }
         }
     }

     private void handleAckPollResponse(String resp, HecChannel channel) {
-        log.debug("ackPollResponse={}", resp);
+        log.debug("ackPollResponse={}, channel={}", resp, channel);
         HecAckPollResponse ackPollResult;
         try {
             ackPollResult = jsonMapper.readValue(resp, HecAckPollResponse.class);
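
The try/catch added around `poll()` in the first hunk above is significant: `ScheduledExecutorService.scheduleWithFixedDelay` silently suppresses all further executions of a periodic task once its `Runnable` throws, so an uncaught `HecException` would stop ACK polling permanently. A minimal, self-contained sketch of that behavior (hypothetical class name and messages, not part of this repository):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FixedDelayExceptionDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

        // Unguarded task: the first exception cancels the schedule,
        // so "unguarded poll" prints exactly once.
        scheduler.scheduleWithFixedDelay(() -> {
            System.out.println("unguarded poll");
            throw new RuntimeException("simulated ack poll failure");
        }, 0, 1, TimeUnit.SECONDS);

        // Guarded task: the exception is caught inside the Runnable,
        // so "guarded poll" keeps printing every second.
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                System.out.println("guarded poll");
                throw new RuntimeException("simulated ack poll failure");
            } catch (RuntimeException e) {
                System.err.println("failed to poll: " + e.getMessage());
            }
        }, 0, 1, TimeUnit.SECONDS);

        Thread.sleep(3500);
        scheduler.shutdownNow();
    }
}
```

The committed change takes the guarded form, logging the `HecException` so the poller keeps running.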

src/main/java/com/splunk/kafka/connect/KafkaRecordTracker.java

Lines changed: 5 additions & 0 deletions
@@ -24,7 +24,11 @@
 import java.util.*;
 import java.util.concurrent.ConcurrentLinkedQueue;

+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 final class KafkaRecordTracker {
+    private static final Logger log = LoggerFactory.getLogger(SplunkSinkTask.class);
     private Map<TopicPartition, TreeMap<Long, EventBatch>> all; // TopicPartition + Long offset represents the SinkRecord
     private long total;
     private ConcurrentLinkedQueue<EventBatch> failed;
@@ -40,6 +44,7 @@ public void addFailedEventBatch(final EventBatch batch) {
             throw new RuntimeException("event batch was not failed");
         }
         failed.add(batch);
+        log.info("total failed batches {}", failed.size());
     }

     public void addEventBatch(final EventBatch batch) {

src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java

Lines changed: 1 addition & 1 deletion
@@ -156,7 +156,7 @@ public static ConfigDef conf() {
             .define(SSL_VALIDATE_CERTIFICATES_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, SSL_VALIDATE_CERTIFICATES_DOC)
             .define(SSL_TRUSTSTORE_PATH_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PATH_DOC)
             .define(SSL_TRUSTSTORE_PASSWORD_CONF, ConfigDef.Type.PASSWORD, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PASSWORD_DOC)
-            .define(EVENT_TIMEOUT_CONF, ConfigDef.Type.INT, 120, ConfigDef.Importance.MEDIUM, EVENT_TIMEOUT_DOC)
+            .define(EVENT_TIMEOUT_CONF, ConfigDef.Type.INT, 300, ConfigDef.Importance.MEDIUM, EVENT_TIMEOUT_DOC)
             .define(ACK_POLL_INTERVAL_CONF, ConfigDef.Type.INT, 10, ConfigDef.Importance.MEDIUM, ACK_POLL_INTERVAL_DOC)
             .define(ACK_POLL_THREADS_CONF, ConfigDef.Type.INT, 2, ConfigDef.Importance.MEDIUM, ACK_POLL_THREADS_DOC)
             .define(MAX_HTTP_CONNECTION_PER_CHANNEL_CONF, ConfigDef.Type.INT, 2, ConfigDef.Importance.MEDIUM, MAX_HTTP_CONNECTION_PER_CHANNEL_DOC)

src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java

Lines changed: 12 additions & 1 deletion
@@ -37,6 +37,7 @@ public final class SplunkSinkTask extends SinkTask implements PollerCallback {
     private SplunkSinkConnectorConfig connectorConfig;
     private List<SinkRecord> bufferedRecords;
     private long lastFlushed = System.currentTimeMillis();
+    private long threadId = Thread.currentThread().getId();

     @Override
     public void start(Map<String, String> taskConfig) {
@@ -52,7 +53,8 @@ public void start(Map<String, String> taskConfig) {

     @Override
     public void put(Collection<SinkRecord> records) {
-        log.debug("received {} records with total {} outstanding events tracked", records.size(), tracker.totalEvents());
+        long startTime = System.currentTimeMillis();
+        log.debug("tid={} received {} records with total {} outstanding events tracked", threadId, records.size(), tracker.totalEvents());

         handleFailedBatches();

@@ -61,12 +63,14 @@ public void put(Collection<SinkRecord> records) {
         bufferedRecords.addAll(records);
         if (bufferedRecords.size() < connectorConfig.maxBatchSize) {
             if (System.currentTimeMillis() - lastFlushed < flushWindow) {
+                logDuration(startTime);
                 // still in flush window, buffer the records and return
                 return;
             }

             if (bufferedRecords.isEmpty()) {
                 lastFlushed = System.currentTimeMillis();
+                logDuration(startTime);
                 return;
             }
         }
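
For context, the `logDuration` calls above are inserted on the early-return paths of `put()`, which buffers records until either `maxBatchSize` is reached or the flush window elapses. A simplified, hypothetical sketch of that buffering pattern (not the connector's actual class):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Simplified illustration of the buffer-until-full-or-window-expires pattern
// visible in SplunkSinkTask.put(); names and structure are hypothetical.
final class FlushWindowBuffer<T> {
    private final int maxBatchSize;
    private final long flushWindowMs;
    private final List<T> buffered = new ArrayList<>();
    private long lastFlushed = System.currentTimeMillis();

    FlushWindowBuffer(int maxBatchSize, long flushWindowMs) {
        this.maxBatchSize = maxBatchSize;
        this.flushWindowMs = flushWindowMs;
    }

    /** Returns the records to send now, or an empty list if we should keep buffering. */
    synchronized List<T> add(Collection<T> records) {
        buffered.addAll(records);
        boolean batchFull = buffered.size() >= maxBatchSize;
        boolean windowExpired = System.currentTimeMillis() - lastFlushed >= flushWindowMs;
        if (!batchFull && !windowExpired) {
            return new ArrayList<>(); // still inside the flush window: keep buffering
        }
        List<T> toSend = new ArrayList<>(buffered);
        buffered.clear();
        lastFlushed = System.currentTimeMillis();
        return toSend;
    }
}
```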
@@ -83,6 +87,12 @@ public void put(Collection<SinkRecord> records) {
             /* /event endpoint */
             handleEvent(records);
         }
+        logDuration(startTime);
+    }
+
+    private void logDuration(long startTime) {
+        long endTime = System.currentTimeMillis();
+        log.debug("tid={} cost={} ms", threadId, endTime - startTime);
     }

     // for testing hook
@@ -230,6 +240,7 @@ public void onEventCommitted(final List<EventBatch> batches) {
     }

     public void onEventFailure(final List<EventBatch> batches, Exception ex) {
+        log.info("add {} failed batches", batches.size());
         for (EventBatch batch: batches) {
             tracker.addFailedEventBatch(batch);
         }
