Skip to content

Commit 7271efb

Browse files
committed
kafka
1 parent ded845a commit 7271efb

File tree

19 files changed

+830
-9
lines changed

19 files changed

+830
-9
lines changed

.github/workflows/test.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,11 @@ jobs:
8585
key: ${{ runner.os }}-pip-${{ secrets.CACHE_VERSION }}-${{ hashFiles('**/requirements.txt') }}
8686
restore-keys: |
8787
${{ runner.os }}-pip-${{ secrets.CACHE_VERSION }}-
88+
-
89+
name: Install packages
90+
run: |
91+
sudo apt-get -qq update
92+
sudo apt-get install -y fping python3-pip python3-setuptools snmp sqlite3 librdkafka-dev
8893
-
8994
name: Pip install
9095
run: |

LibreNMS/Data/Store/Datastore.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public static function init($options = [])
4949
'p' => 'prometheus.enable',
5050
'g' => 'graphite.enable',
5151
'2' => 'influxdbv2.enable',
52+
'k' => 'kafka.enable',
5253
];
5354
foreach ($opts as $opt => $setting) {
5455
if (isset($options[$opt])) {

LibreNMS/Data/Store/Kafka.php

Lines changed: 323 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,323 @@
1+
<?php
2+
3+
namespace LibreNMS\Data\Store;
4+
5+
use App\Facades\DeviceCache;
6+
use App\Polling\Measure\Measurement;
7+
use Carbon\Carbon;
8+
use Illuminate\Support\Facades\Log;
9+
use LibreNMS\Config;
10+
use RdKafka\Conf;
11+
use RdKafka\FFI\Library;
12+
use RdKafka\Message;
13+
use RdKafka\Producer;
14+
15+
class Kafka extends BaseDatastore
16+
{
17+
private $client = null;
18+
private $device_id = 0;
19+
private $kafkaFlushTimeout = 50;
20+
21+
public function __construct(Producer $client)
22+
{
23+
parent::__construct();
24+
25+
$this->client = $client;
26+
27+
// Cache the flush timeout value early to avoid Config during shutdown
28+
if ($this->kafkaFlushTimeout == null) {
29+
$this->kafkaFlushTimeout = Config::get('kafka.flush.timeout', 50);
30+
}
31+
32+
// Register shutdown function
33+
register_shutdown_function(function () {
34+
$this->safeFlush();
35+
});
36+
}
37+
38+
public function __destruct()
39+
{
40+
$this->safeFlush();
41+
// Clear reference
42+
$this->client = null;
43+
}
44+
45+
public static function getClient(): Producer
46+
{
47+
$conf = new Conf();
48+
// Set the log level
49+
$conf->set('log_level', (string) LOG_DEBUG);
50+
// Set the log callback for exceptions
51+
$conf->setDrMsgCb(
52+
function (Producer $producer, Message $message): void {
53+
if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
54+
error_log($message->errstr());
55+
}
56+
}
57+
);
58+
// Set the log callback for logs
59+
$conf->setLogCb(
60+
function (Producer $producer, int $level, string $facility, string $message): void {
61+
error_log('KAFKA: ' . $message);
62+
}
63+
);
64+
65+
// Set the kafka broker servers
66+
$conf->set('bootstrap.servers', Config::get('kafka.broker.list', '127.0.2.2:9092'));
67+
// Set the idempotence
68+
$conf->set('enable.idempotence', Config::get('kafka.idempotence', false) ? 'true' : 'false');
69+
// Max queue allowed messages in poller memory
70+
$conf->set('queue.buffering.max.messages', Config::get('kafka.buffer.max.message', 100_000));
71+
// Num of messages each call to kafka
72+
$conf->set('batch.num.messages', Config::get('kafka.batch.max.message', 25));
73+
// Max wait time to acumulate before sending the batch
74+
$conf->set('linger.ms', Config::get('kafka.linger.ms', default: 500));
75+
// Change ACK
76+
$conf->set(
77+
'request.required.acks',
78+
Config::get(
79+
'kafka.request.required.acks',
80+
Config::get('kafka.idempotence', false) ? 'all' : '1'
81+
)
82+
);
83+
84+
// check if debug for ssl was set and enable it
85+
$confKafkaSSLDebug = Config::get('kafka.security.debug', null);
86+
$confKafkaSSLDebug != null || strlen($confKafkaSSLDebug) !== 0 ? $conf->set('debug', $confKafkaSSLDebug) : null;
87+
88+
// config ssl
89+
$isSslEnabled = Config::get('kafka.ssl.enable', false);
90+
if ($isSslEnabled) {
91+
$conf->set('security.protocol', Config::get('kafka.ssl.protocol', 'ssl'));
92+
$conf->set('ssl.endpoint.identification.algorithm', 'none');
93+
94+
// prepare all necessary librenms kafka config with associated rdkafka key
95+
$kafkaSSLConfigs = [
96+
'kafka.ssl.keystore.location' => 'ssl.keystore.location',
97+
'kafka.ssl.keystore.password' => 'ssl.keystore.password',
98+
'kafka.ssl.ca.location' => 'ssl.ca.location',
99+
'kafka.ssl.certificate.location' => 'ssl.certificate.location',
100+
'kafka.ssl.key.location' => 'ssl.key.location',
101+
'kafka.ssl.key.password' => 'ssl.key.password',
102+
];
103+
104+
// fetch kafka config values, if exists, associate its value to rdkafka key
105+
foreach ($kafkaSSLConfigs as $configKey => $kafkaKey) {
106+
$configValue = Config::get($configKey, null);
107+
$configValue != null || strlen($configValue) !== 0 ? $conf->set($kafkaKey, $configValue) : null;
108+
}
109+
}
110+
111+
return new Producer($conf);
112+
}
113+
114+
public function safeFlush()
115+
{
116+
// check if client instance exists
117+
if ($this->client === null) {
118+
return;
119+
}
120+
121+
try {
122+
// get total number of messages in the queue
123+
$outQLen = $this->client->getOutQLen();
124+
125+
if ($outQLen > 0) {
126+
// During shutdown, Log facades might not work properly, use d_echo as fallback
127+
error_log("KAFKA: Flushing {$outQLen} remaining messages");
128+
129+
// Use cached timeout value to avoid Config during shutdown
130+
$result = $this->client->flush($this->kafkaFlushTimeout);
131+
132+
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
133+
$error_msg = sprintf(
134+
'KAFKA: Flush failed. Error: %s, Code: %d, Device ID: %d, Remaining: %d',
135+
Library::rd_kafka_err2str($result),
136+
$result,
137+
$this->device_id,
138+
$this->client->getOutQLen()
139+
);
140+
141+
error_log($error_msg);
142+
}
143+
}
144+
} catch (\Throwable $e) {
145+
$error_msg = 'KAFKA: safeFlush failed with exception. Error: ' . $e->getMessage() . '. Trace: ' . $e->getTraceAsString();
146+
error_log($error_msg);
147+
}
148+
}
149+
150+
public function getName()
151+
{
152+
return 'Kafka';
153+
}
154+
155+
public static function isEnabled()
156+
{
157+
return Config::get('kafka.enable', false);
158+
}
159+
160+
public function getKafkaFlushTimeout()
161+
{
162+
return $this->kafkaFlushTimeout;
163+
}
164+
165+
/**
166+
* Datastore-independent function which should be used for all polled metrics.
167+
*
168+
* RRD Tags:
169+
* rrd_def RrdDefinition
170+
* rrd_name array|string: the rrd filename, will be processed with rrd_name()
171+
* rrd_oldname array|string: old rrd filename to rename, will be processed with rrd_name()
172+
* rrd_step int: rrd step, defaults to 300
173+
*
174+
* @param array $device
175+
* @param string $measurement Name of this measurement
176+
* @param array $tags tags for the data (or to control rrdtool)
177+
* @param array|mixed $fields The data to update in an associative array, the order must be consistent with rrd_def,
178+
* single values are allowed and will be paired with $measurement
179+
*/
180+
public function put($device, $measurement, $tags, $fields)
181+
{
182+
try {
183+
// get the singleton instance of the produced
184+
/** @var Producer $producer */
185+
$producer = $this->client;
186+
$this->device_id = $device['device_id'];
187+
$topic = $producer->newTopic(Kafka::getTopicName());
188+
189+
$device_data = DeviceCache::get($device['device_id']);
190+
$excluded_groups = Config::get('kafka.groups-exclude'); // comman separated string
191+
$excluded_measurement = Config::get('kafka.measurement-exclude'); // comman separated string
192+
$excluded_device_fields = Config::get('kafka.device-fields-exclude'); // comman separated string
193+
$excluded_device_fields_arr = [];
194+
195+
if ($excluded_groups != null && strlen($excluded_groups) > 0) {
196+
// convert into array
197+
$excluded_groups_arr = explode(',', strtoupper($excluded_groups));
198+
199+
$device_groups = $device_data->groups;
200+
foreach ($device_groups as $group) {
201+
// The group name will always be parsed as lowercase, even when uppercase in the GUI.
202+
if (in_array(strtoupper($group->name), $excluded_groups_arr)) {
203+
Log::debug('KAFKA: Skipped parsing to Kafka, device is in group: ' . $group->name);
204+
205+
return;
206+
}
207+
}
208+
}
209+
210+
if ($excluded_measurement != null && strlen($excluded_measurement) > 0) {
211+
// convert into array
212+
$excluded_measurement_arr = explode(',', $excluded_measurement);
213+
214+
if (in_array($measurement, $excluded_measurement_arr)) {
215+
Log::debug('KAFKA: Skipped parsing to Kafka, measurement is in measurement-excluded: ' . $measurement);
216+
217+
return;
218+
}
219+
}
220+
221+
if ($excluded_device_fields != null && strlen($excluded_device_fields) > 0) {
222+
// convert into array
223+
$excluded_device_fields_arr = explode(',', $excluded_device_fields);
224+
}
225+
226+
// start
227+
$stat = Measurement::start('write');
228+
229+
$tmp_fields = [];
230+
$tmp_tags = [];
231+
$tmp_tags['device_groups'] = implode('|', $device_data->groups->pluck('name')->toArray());
232+
233+
foreach ($tags as $k => $v) {
234+
if (empty($v)) {
235+
$v = '_blank_';
236+
}
237+
$tmp_tags[$k] = $v;
238+
}
239+
foreach ($fields as $k => $v) {
240+
if ($k == 'time') {
241+
$k = 'rtime';
242+
}
243+
244+
if (($value = $this->forceType($v)) !== null) {
245+
$tmp_fields[$k] = $value;
246+
}
247+
}
248+
249+
if (empty($tmp_fields)) {
250+
Log::warning('KAFKA: All fields empty, skipping update', [
251+
'orig_fields' => $fields,
252+
'device_id' => $this->device_id,
253+
]);
254+
255+
return;
256+
}
257+
258+
// create and organize data
259+
$filteredDeviceData = array_diff_key($device, array_flip($excluded_device_fields_arr));
260+
// add current time to the data
261+
$filteredDeviceData['current_polled_time'] = Carbon::now();
262+
263+
$resultArr = [
264+
'measurement' => $measurement,
265+
'device' => $filteredDeviceData,
266+
'fields' => $tmp_fields,
267+
'tags' => $tmp_tags,
268+
];
269+
270+
if (Config::get('kafka.debug') === true) {
271+
Log::debug('Kafka data: ', [
272+
'device_id' => $this->device_id,
273+
'measurement' => $measurement,
274+
'fields' => $tmp_fields,
275+
]);
276+
}
277+
278+
$dataArr = json_encode($resultArr);
279+
$topic->produce(RD_KAFKA_PARTITION_UA, 0, $dataArr);
280+
$producer->poll(0);
281+
282+
// end
283+
$this->recordStatistic($stat->end());
284+
} catch (\Throwable $e) {
285+
Log::error('KAFKA: Put failed with exception', [
286+
'device_id' => $this->device_id,
287+
'error' => $e->getMessage(),
288+
'trace' => $e->getTraceAsString(),
289+
]);
290+
}
291+
}
292+
293+
private function forceType($data)
294+
{
295+
/*
296+
* It is not trivial to detect if something is a float or an integer, and
297+
* therefore may cause breakages on inserts.
298+
* Just setting every number to a float gets around this, but may introduce
299+
* inefficiencies.
300+
*/
301+
302+
if (is_numeric($data)) {
303+
return floatval($data);
304+
}
305+
306+
return $data === 'U' ? null : $data;
307+
}
308+
309+
public static function getTopicName()
310+
{
311+
return Config::get('kafka.topic', 'librenms');
312+
}
313+
314+
/**
315+
* Checks if the datastore wants rrdtags to be sent when issuing put()
316+
*
317+
* @return bool
318+
*/
319+
public function wantsRrdTags()
320+
{
321+
return false;
322+
}
323+
}

LibreNMS/Util/ModuleTestHelper.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ public function __construct($modules, $os, $variant = '')
8787
Config::set('hide_rrd_disabled', true);
8888
Config::set('influxdb.enable', false);
8989
Config::set('influxdbv2.enable', false);
90+
Config::set('kafka.enable', false);
9091
Config::set('graphite.enable', false);
9192
Config::set('prometheus.enable', false);
9293
}

app/Console/Commands/DevicePoll.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public function handle(MeasurementManager $measurements): int
5151
Config::set('influxdbv2.enable', false);
5252
Config::set('prometheus.enable', false);
5353
Config::set('graphite.enable', false);
54+
Config::set('kafka.enable', false);
5455
}
5556

5657
try {

app/Providers/DatastoreServiceProvider.php

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ class DatastoreServiceProvider extends ServiceProvider
4141
'LibreNMS\Data\Store\OpenTSDB',
4242
'LibreNMS\Data\Store\Prometheus',
4343
'LibreNMS\Data\Store\Rrd',
44+
'LibreNMS\Data\Store\Kafka',
4445
];
4546

4647
public function register(): void
@@ -65,6 +66,7 @@ public function register(): void
6566

6667
// additional bindings
6768
$this->registerInflux();
69+
$this->registerKafka();
6870
}
6971

7072
public function registerInflux()
@@ -73,4 +75,11 @@ public function registerInflux()
7375
return \LibreNMS\Data\Store\InfluxDB::createFromConfig();
7476
});
7577
}
78+
79+
public function registerKafka()
80+
{
81+
$this->app->singleton('RdKafka\Producer', function ($app) {
82+
return \LibreNMS\Data\Store\Kafka::getClient();
83+
});
84+
}
7685
}

0 commit comments

Comments
 (0)