Skip to content

Commit c468d77

Browse files
authored
Restrict ClickHouse pipeline to log events that are within a 72-hour window (#2986)
* restrict clickhouse pipeline to log events that are within a 72-hour window
* add logging to indicate number of events dropped due to old timestamps
1 parent 4b7bb09 commit c468d77

File tree

2 files changed

+60
-1
lines changed

2 files changed

+60
-1
lines changed

lib/logflare/backends/adaptor/clickhouse_adaptor/pipeline.ex

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ defmodule Logflare.Backends.Adaptor.ClickHouseAdaptor.Pipeline do
2424
@batch_size 1_500
2525
@max_retries 1
2626

27+
# 72 hour max event age, based on timestamp
28+
@max_event_age_us 72 * 3_600 * 1_000_000
29+
2730
@doc false
2831
def max_retries, do: @max_retries
2932

@@ -91,6 +94,8 @@ defmodule Logflare.Backends.Adaptor.ClickHouseAdaptor.Pipeline do
9194
}
9295
)
9396

97+
message_count = length(messages)
98+
9499
result =
95100
OpenTelemetry.Tracer.with_span :clickhouse_pipeline, %{
96101
attributes: %{
@@ -103,7 +108,22 @@ defmodule Logflare.Backends.Adaptor.ClickHouseAdaptor.Pipeline do
103108
} do
104109
source = Sources.Cache.get_by_id(source_id)
105110
backend = Backends.Cache.get_backend(backend_id)
106-
events = for %{data: le} <- messages, do: le
111+
cutoff_us = System.system_time(:microsecond) - @max_event_age_us
112+
113+
events =
114+
for %{data: le} <- messages,
115+
le.body["timestamp"] >= cutoff_us,
116+
do: le
117+
118+
event_count = length(events)
119+
120+
if event_count < message_count do
121+
Logger.warning(
122+
"Dropping #{message_count - event_count} of #{message_count} ClickHouse event(s) older than 72 hours",
123+
source_token: source_token,
124+
backend_id: backend_id
125+
)
126+
end
107127

108128
ClickHouseAdaptor.insert_log_events({source, backend}, events)
109129
end

test/logflare/backends/adaptor/clickhouse_adaptor/pipeline_test.exs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,45 @@ defmodule Logflare.Backends.Adaptor.ClickHouseAdaptor.PipelineTest do
115115
assert result == []
116116
end
117117

118+
# Sends one 73-hour-old event and one 1-hour-old event through
# Pipeline.handle_batch/4 and checks that only the recent one is found in the
# source's ClickHouse ingest table afterwards.
test "filters out events older than 72 hours", %{
119+
context: context,
120+
source: source,
121+
backend: backend
122+
} do
123+
# Timestamps are in microseconds: 73 hours ago falls outside a 72-hour
# window, 1 hour ago falls well inside it.
old_timestamp = System.system_time(:microsecond) - 73 * 3_600 * 1_000_000
124+
recent_timestamp = System.system_time(:microsecond) - 1 * 3_600 * 1_000_000
125+
126+
old_event =
127+
build(:log_event, source: source, message: "Old event", timestamp: old_timestamp)
128+
129+
recent_event =
130+
build(:log_event, source: source, message: "Recent event", timestamp: recent_timestamp)
131+
132+
messages = [
133+
%Message{data: old_event, acknowledger: {Pipeline, :ack_id, :ack_data}},
134+
%Message{data: recent_event, acknowledger: {Pipeline, :ack_id, :ack_data}}
135+
]
136+
137+
result = Pipeline.handle_batch(:ch, messages, %{size: 2, trigger: :flush}, context)
138+
139+
# The full message list is expected back, including the message whose event
# is too old to be inserted.
assert result == messages
140+
141+
# NOTE(review): presumably gives the insert time to become queryable before
# the SELECT below; confirm whether a fixed 200 ms wait is reliable in CI.
Process.sleep(200)
142+
143+
table_name = ClickHouseAdaptor.clickhouse_ingest_table_name(source)
144+
145+
{:ok, query_result} =
146+
ClickHouseAdaptor.execute_ch_query(
147+
backend,
148+
"SELECT body FROM #{table_name}"
149+
)
150+
151+
# Exactly one row should be present, and its JSON body should carry the
# recent event's message.
assert length(query_result) == 1
152+
[row] = query_result
153+
body = Jason.decode!(row["body"])
154+
assert body["event_message"] == "Recent event"
155+
end
156+
118157
test "handles log events with different field types", %{
119158
context: context,
120159
source: source,

0 commit comments

Comments
 (0)