
Commit 3bd7c98

CSV log processing support with fluentbit integration
1 parent: 3ed16b3

File tree

5 files changed: +142 / -2 lines

.gitignore
config/docker-config/Dockerfile.fluentbit
config/docker-config/fluentbit-csv-dump.sh
config/fluentbit_server/fluent-bit.conf
config/fluentbit_server/logs-csv.conf

.gitignore

Lines changed: 1 addition & 0 deletions

@@ -14,6 +14,7 @@
 /data/log_ingest_data/evtx/*
 /data/log_ingest_data/auditd/*
 /data/log_ingest_data/json/*
+/data/log_ingest_data/csv/*
 /data/mysql_data/*
 /data/yara_triage_data/*
 /docs/graphs/*.bkp

config/docker-config/Dockerfile.fluentbit

Lines changed: 4 additions & 2 deletions
@@ -3,10 +3,12 @@ FROM ubuntu:24.04
 RUN mkdir -p fluent-bit /fluent-bit/etc /fluent-bit/scripts /fluent-bit/logs /fluent-bit/database
 WORKDIR /fluent-bit
 COPY config/docker-config/fluentbit-evtx-dump.sh /fluent-bit/scripts/fluentbit-evtx-dump.sh
-RUN chmod +x /fluent-bit/scripts/fluentbit-evtx-dump.sh
+COPY config/docker-config/fluentbit-csv-dump.sh /fluent-bit/scripts/fluentbit-csv-dump.sh
+RUN chmod +x /fluent-bit/scripts/fluentbit-evtx-dump.sh && \
+    chmod +x /fluent-bit/scripts/fluentbit-csv-dump.sh
 
 RUN apt update && \
-    apt install -y curl ca-certificates dpkg gnupg dpkg-dev && \
+    apt install -y curl ca-certificates dpkg gnupg dpkg-dev python3 && \
     curl -L -o /tmp/fluentbit.key https://packages.fluentbit.io/fluentbit.key && \
     gpg --dearmor < /tmp/fluentbit.key > /usr/share/keyrings/fluentbit-keyring.gpg && \
     rm /tmp/fluentbit.key && \
config/docker-config/fluentbit-csv-dump.sh

Lines changed: 90 additions & 0 deletions

@@ -0,0 +1,90 @@
+#!/bin/bash
+
+CSV_DIR="/fluent-bit/logs/csv"
+JSON_DIR="/fluent-bit/logs/csv"
+PROCESSED_DIR="/fluent-bit/logs/csv/processed"
+
+mkdir -p "$PROCESSED_DIR" "$JSON_DIR"
+find "$CSV_DIR" -maxdepth 1 -name "*.csv" -print0 | while IFS= read -r -d $'\0' csv_file; do
+    filename=$(basename "$csv_file" .csv)
+    jsonl_file="${JSON_DIR}/${filename}.jsonl"
+    echo "Converting $csv_file to $jsonl_file..."
+
+    if [ ! -s "$csv_file" ]; then
+        echo "Warning: $csv_file is empty, skipping..."
+        continue
+    fi
+
+    first_line=$(head -1 "$csv_file")
+    if echo "$first_line" | grep -q ","; then
+        sep=","
+    elif echo "$first_line" | grep -q ";"; then
+        sep=";"
+    elif echo "$first_line" | grep -q $'\t'; then
+        sep=$'\t'
+    elif echo "$first_line" | grep -q "|"; then
+        sep="|"
+    else
+        sep=","
+    fi
+
+    echo "Using separator: '$sep'"
+
+    if echo "$first_line" | grep -q "[a-zA-Z]"; then
+        echo "CSV file has headers, using column names"
+        python3 -c "
+import csv
+import json
+import sys
+
+try:
+    with open('$csv_file', 'r', encoding='utf-8') as f:
+        sample = f.read(1024)
+        f.seek(0)
+        sniffer = csv.Sniffer()
+        dialect = sniffer.sniff(sample, delimiters=',$sep')
+
+        reader = csv.DictReader(f, dialect=dialect)
+        with open('$jsonl_file', 'w', encoding='utf-8') as out_f:
+            for row in reader:
+                json.dump(row, out_f, ensure_ascii=False)
+                out_f.write('\n')
+    print('Conversion successful with Python CSV parser')
+except Exception as e:
+    print(f'Python conversion failed: {e}', file=sys.stderr)
+    sys.exit(1)
+"
+    else
+        echo "CSV file without headers, using column_0, column_1, etc."
+        python3 -c "
+import csv
+import json
+import sys
+
+try:
+    with open('$csv_file', 'r', encoding='utf-8') as f:
+        sample = f.read(1024)
+        f.seek(0)
+        sniffer = csv.Sniffer()
+        dialect = sniffer.sniff(sample, delimiters=',$sep')
+
+        reader = csv.reader(f, dialect=dialect)
+        with open('$jsonl_file', 'w', encoding='utf-8') as out_f:
+            for row in reader:
+                row_dict = {f'column_{i}': value for i, value in enumerate(row)}
+                json.dump(row_dict, out_f, ensure_ascii=False)
+                out_f.write('\n')
+    print('Conversion successful with Python CSV parser (no headers)')
+except Exception as e:
+    print(f'Python conversion failed: {e}', file=sys.stderr)
+    sys.exit(1)
+"
+    fi
+
+    if [ $? -eq 0 ]; then
+        echo "Conversion successful. Moving original..."
+        mv "$csv_file" "$PROCESSED_DIR/"
+    else
+        echo "Error occurred during csv to json conversion for $csv_file." >&2
+    fi
+done
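For reference outside the diff: the core of what the two embedded python3 -c blocks do can be distilled into a standalone sketch. This is an illustration, not part of the commit; the sample.csv / sample.jsonl names are hypothetical, and it mirrors the headered branch, where csv.Sniffer guesses the dialect from a 1 KiB sample and csv.DictReader streams each row out as one JSON object per line:

    #!/usr/bin/env python3
    # Minimal sketch of the headered-CSV conversion above (hypothetical
    # file names): sniff the delimiter from a sample, then write one
    # JSON object per row (JSONL).
    import csv
    import json

    csv_path = "sample.csv"      # hypothetical input
    jsonl_path = "sample.jsonl"  # hypothetical output

    with open(csv_path, "r", encoding="utf-8", newline="") as f:
        dialect = csv.Sniffer().sniff(f.read(1024), delimiters=",;\t|")
        f.seek(0)
        with open(jsonl_path, "w", encoding="utf-8") as out:
            for row in csv.DictReader(f, dialect=dialect):
                out.write(json.dumps(row, ensure_ascii=False) + "\n")

One design note on the script itself: JSON_DIR points at the same directory as CSV_DIR, so the .jsonl output lands where the tail input in logs-csv.conf (below) watches for it, and processed originals are moved into a subdirectory that the find -maxdepth 1 pass skips, so they are not reconverted on the next run.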

config/fluentbit_server/fluent-bit.conf

Lines changed: 1 addition & 0 deletions
@@ -13,4 +13,5 @@
 @include /fluent-bit/etc/logs-evtx.conf
 @include /fluent-bit/etc/logs-auditd.conf
 @include /fluent-bit/etc/logs-json.conf
+@include /fluent-bit/etc/logs-csv.conf
 @include /fluent-bit/etc/logs-http.conf
config/fluentbit_server/logs-csv.conf

Lines changed: 46 additions & 0 deletions

@@ -0,0 +1,46 @@
+[INPUT]
+    Name              exec
+    Tag               csv.converter.run
+    Interval_Sec      30
+    Interval_NSec     0
+    Buf_Size          128mb
+    Command           /fluent-bit/scripts/fluentbit-csv-dump.sh
+    Oneshot           false
+    Threaded          true
+
+[INPUT]
+    Name              tail
+    Path              /fluent-bit/logs/csv/*.jsonl
+    Tag               csv.logs
+    DB                /fluent-bit/database/csv_jsonl_db.db
+    Parser            json_parser
+    Read_from_Head    true
+    Buffer_Chunk_Size 1M
+    Buffer_Max_Size   5M
+    Path_Key          source_file
+
+
+[FILTER]
+    Name   lua
+    Match  csv.logs
+    Script /fluent-bit/etc/add_timestamp.lua
+    Call   add_timestamp
+
+[OUTPUT]
+    Name                es
+    Match               csv.logs
+    Host                sentinel-kit-db-elasticsearch-es01
+    Port                9200
+    Buffer_Size         5M
+    Logstash_Format     On
+    Logstash_Prefix     ingest-csv
+    Logstash_DateFormat %Y.%m.%d
+    Type                _doc
+    Time_Key            @timestamp
+    Replace_Dots        On
+    Suppress_Type_Name  On
+    Retry_Limit         False
+    TLS                 On
+    TLS.Verify          Off
+    HTTP_User           elastic
+    HTTP_Passwd         ${ELASTIC_PASSWORD}
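End to end, then: the exec input runs the dump script every 30 seconds, the tail input picks up the resulting .jsonl files (tracking read offsets in csv_jsonl_db.db so lines are not re-ingested), the Lua filter stamps each record via add_timestamp, and the es output writes to a daily index named from Logstash_Prefix and Logstash_DateFormat. A small sketch of the resulting record and index name, using a made-up CSV row:

    # Illustration only: the row values are invented, and the index-name
    # construction just mimics Logstash_Prefix + Logstash_DateFormat.
    import json
    from datetime import date

    row = {"host": "ws01", "event": "logon", "user": "alice"}  # hypothetical CSV row
    jsonl_line = json.dumps(row, ensure_ascii=False)  # what the tail input reads

    index_name = "ingest-csv-" + date.today().strftime("%Y.%m.%d")
    print(index_name)   # e.g. ingest-csv-2025.01.31
    print(jsonl_line)   # {"host": "ws01", "event": "logon", "user": "alice"}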
