Skip to content

Commit 18f995e

Browse files
authored
feat: add lf-endpoint-redact-pii (#2756)
* feat: add lf-endpoint-redact-pii * chore: version bump * chore: formatting * fix: correctly forward :redact_pii option * chore: formatting * chore: fix failing tests * chore: more test fixes
1 parent 0db6b72 commit 18f995e

File tree

11 files changed

+88
-30
lines changed

11 files changed

+88
-30
lines changed

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.21.1
1+
1.22.0

docs/docs.logflare.com/docs/concepts/endpoints.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ Endpoints are unsecure by default. However, you can generate [access tokens](/co
176176

177177
Logflare endpoints support automatic redaction of personally identifiable information (PII) from query results to help protect sensitive data. When enabled, the PII redaction feature will automatically replace IP addresses in query result values with "REDACTED".
178178

179-
PII redaction can be enabled when checking the "Redact PII from query results" checkbox when configuring the endpoint.
179+
PII redaction can be enabled when checking the "Redact PII from query results" checkbox when configuring the endpoint. Override per request with `LF-ENDPOINT-REDACT-PII: true|false`; if omitted, the endpoint setting is used.
180180

181181
Currently, PII redaction targets:
182182

lib/logflare/endpoints.ex

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -344,14 +344,15 @@ defmodule Logflare.Endpoints do
344344
Runs a cached query.
345345
"""
346346
@spec run_cached_query(query :: Query.t(), params :: map()) :: run_query_return()
347-
def run_cached_query(%Query{} = query, params \\ %{}) when is_map(params) do
347+
def run_cached_query(%Query{} = query, params \\ %{}, opts \\ [])
348+
when is_map(params) and is_list(opts) do
348349
if query.cache_duration_seconds > 0 do
349350
query
350-
|> Resolver.resolve(params)
351+
|> Resolver.resolve(params, opts)
351352
|> ResultsCache.query()
352353
else
353354
# execute the query directly
354-
run_query(query, params)
355+
run_query(query, params, opts)
355356
end
356357
end
357358

@@ -422,14 +423,16 @@ defmodule Logflare.Endpoints do
422423
{final_query, declared_params, input_params, endpoint_query}
423424
end
424425

426+
redact_pii = Keyword.get(opts, :redact_pii, endpoint_query.redact_pii)
427+
425428
case adaptor.execute_query(backend, query_args, opts) do
426429
{:ok, rows} when is_list(rows) ->
427-
redacted_rows = PiiRedactor.redact_query_result(rows, endpoint_query.redact_pii)
430+
redacted_rows = PiiRedactor.redact_query_result(rows, redact_pii)
428431
{:ok, %{rows: redacted_rows}}
429432

430433
{:ok, %{rows: rows} = result} ->
431434
# Pass through the full result map with all metadata, but redact PII in rows
432-
redacted_rows = PiiRedactor.redact_query_result(rows, endpoint_query.redact_pii)
435+
redacted_rows = PiiRedactor.redact_query_result(rows, redact_pii)
433436
{:ok, %{result | rows: redacted_rows}}
434437

435438
{:error, error} ->

lib/logflare/endpoints/resolver.ex

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ defmodule Logflare.Endpoints.Resolver do
2121
Starts up or performs a lookup for an Endpoint.Cache process.
2222
Returns the resolved pid.
2323
"""
24-
def resolve(%Logflare.Endpoints.Query{id: id} = query, params) do
24+
def resolve(%Logflare.Endpoints.Query{id: id} = query, params, opts) do
2525
attributes = %{
2626
"endpoint.id" => id,
2727
"endpoint.token" => query.token,
@@ -40,12 +40,12 @@ defmodule Logflare.Endpoints.Resolver do
4040
OpenTelemetry.Tracer.with_span "logflare.endpoints.results_cache.create", %{
4141
attributes: attributes
4242
} do
43-
spec = {ResultsCache, {query, params}}
43+
spec = {ResultsCache, {query, params, opts}}
4444
Logger.debug("Starting up Endpoint.Cache for Endpoint.Query id=#{id}", endpoint_id: id)
4545

4646
via =
4747
{:via, PartitionSupervisor,
48-
{Logflare.Endpoints.ResultsCache.PartitionSupervisor, {id, params}}}
48+
{Logflare.Endpoints.ResultsCache.PartitionSupervisor, {id, params, opts}}}
4949

5050
case DynamicSupervisor.start_child(via, spec) do
5151
{:ok, pid} ->

lib/logflare/endpoints/results_cache.ex

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ defmodule Logflare.Endpoints.ResultsCache do
1212

1313
defstruct query_tasks: [],
1414
params: %{},
15+
opts: [],
1516
cached_result: nil,
1617
shutdown_timer: nil,
1718
refresh_timer: nil,
@@ -24,15 +25,16 @@ defmodule Logflare.Endpoints.ResultsCache do
2425
endpoint_query_token: String.t(),
2526
query_tasks: list(%Task{}),
2627
params: map(),
28+
opts: list(),
2729
cached_result: binary(),
2830
shutdown_timer: reference(),
2931
refresh_timer: reference(),
3032
parsed_labels: map()
3133
}
3234

33-
def start_link({query, params}) do
35+
def start_link({query, params, opts}) do
3436
name = name(query.id, params)
35-
GenServer.start_link(__MODULE__, {query, params}, name: name, hibernate_after: 5_000)
37+
GenServer.start_link(__MODULE__, {query, params, opts}, name: name, hibernate_after: 5_000)
3638
end
3739

3840
@doc """
@@ -86,7 +88,7 @@ defmodule Logflare.Endpoints.ResultsCache do
8688
GenServer.call(cache, :invalidate)
8789
end
8890

89-
def init({query, params}) do
91+
def init({query, params, opts}) do
9092
endpoints = endpoints_part(query.id)
9193
:syn.join(endpoints, query.id, self())
9294

@@ -97,6 +99,7 @@ defmodule Logflare.Endpoints.ResultsCache do
9799
endpoint_query_id: query.id,
98100
endpoint_query_token: query.token,
99101
params: params,
102+
opts: opts,
100103
shutdown_timer: timer,
101104
parsed_labels: query.parsed_labels
102105
}
@@ -188,7 +191,7 @@ defmodule Logflare.Endpoints.ResultsCache do
188191
Endpoints.Cache.get_mapped_query_by_token(state.endpoint_query_token)
189192
|> Map.put(:parsed_labels, state.parsed_labels)
190193

191-
Logflare.Endpoints.run_query(query, state.params)
194+
Logflare.Endpoints.run_query(query, state.params, state.opts)
192195
|> Tuple.append(query)
193196
end
194197

lib/logflare_web/controllers/endpoints_controller.ex

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ defmodule LogflareWeb.EndpointsController do
2828
"Content-Length",
2929
"X-Requested-With",
3030
"X-API-Key",
31-
"LF-ENDPOINT-LABELS"
31+
"LF-ENDPOINT-LABELS",
32+
"LF-ENDPOINT-REDACT-PII"
3233
],
3334
methods: ["GET", "POST", "OPTIONS"],
3435
send_preflight_response?: true
@@ -66,8 +67,16 @@ defmodule LogflareWeb.EndpointsController do
6667
end
6768

6869
parsed_labels = Endpoints.parse_labels(endpoint_query.labels, header_str, params)
70+
override = get_req_header(conn, "lf-endpoint-redact-pii") |> List.first()
6971

70-
case Endpoints.run_cached_query(%{endpoint_query | parsed_labels: parsed_labels}, params) do
72+
redact_pii =
73+
if override, do: String.downcase(override) == "true", else: endpoint_query.redact_pii
74+
75+
case Endpoints.run_cached_query(
76+
%{endpoint_query | parsed_labels: parsed_labels},
77+
params,
78+
redact_pii: redact_pii
79+
) do
7180
{:ok, result} ->
7281
Logger.debug("Endpoint cache result, #{inspect(result, pretty: true)}")
7382
render(conn, "query.json", result: result.rows)

lib/logflare_web/live/endpoints/actions/show.html.heex

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,5 +101,14 @@
101101
-H 'Content-Type: application/json; charset=utf-8' <span :if={length(@declared_params) > 0}>\</span>
102102
<span :if={length(@declared_params) > 0}>-G <%= Enum.map(@declared_params, fn p -> "-d \"#{p}=VALUE\"" end) |> Enum.join(" ") %></span>
103103
</code>
104+
105+
<code class="tw-whitespace-pre-line">
106+
# With per-request PII redaction
107+
curl "<%= "https://api.logflare.app" <> ~p"/api/endpoints/query/#{@show_endpoint.token}" %>" \
108+
-H 'X-API-KEY: YOUR-ACCESS-TOKEN' \
109+
-H 'LF-ENDPOINT-REDACT-PII: true' \
110+
-H 'Content-Type: application/json; charset=utf-8' <span :if={length(@declared_params) > 0}>\</span>
111+
<span :if={length(@declared_params) > 0}>-G <%= Enum.map(@declared_params, fn p -> "-d \"#{p}=VALUE\"" end) |> Enum.join(" ") %></span>
112+
</code>
104113
</div>
105114
</section>

test/logflare/endpoints/cache_test.exs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ defmodule Logflare.Endpoints.CacheTest do
3838
end)
3939

4040
# Start cache process
41-
{:ok, cache_pid} = start_supervised({Logflare.Endpoints.ResultsCache, {endpoint, %{}}})
41+
{:ok, cache_pid} = start_supervised({Logflare.Endpoints.ResultsCache, {endpoint, %{}, []}})
4242
assert Process.alive?(cache_pid)
4343

4444
# First query should hit backend
@@ -54,7 +54,7 @@ defmodule Logflare.Endpoints.CacheTest do
5454
{:error, :timeout}
5555
end)
5656

57-
{:ok, cache_pid} = start_supervised({Logflare.Endpoints.ResultsCache, {endpoint, %{}}})
57+
{:ok, cache_pid} = start_supervised({Logflare.Endpoints.ResultsCache, {endpoint, %{}, []}})
5858
assert Process.alive?(cache_pid)
5959

6060
assert {:error, %{"message" => :timeout}} = Endpoints.run_cached_query(endpoint)
@@ -70,7 +70,7 @@ defmodule Logflare.Endpoints.CacheTest do
7070
{:ok, TestUtils.gen_bq_response(test_response)}
7171
end)
7272

73-
{:ok, cache_pid} = start_supervised({Logflare.Endpoints.ResultsCache, {endpoint, %{}}})
73+
{:ok, cache_pid} = start_supervised({Logflare.Endpoints.ResultsCache, {endpoint, %{}, []}})
7474
assert Process.alive?(cache_pid)
7575

7676
# First query should succeed
@@ -94,7 +94,7 @@ defmodule Logflare.Endpoints.CacheTest do
9494
{:error, TestUtils.gen_bq_error("BQ Error")}
9595
end)
9696

97-
{:ok, cache_pid} = start_supervised({Logflare.Endpoints.ResultsCache, {endpoint, %{}}})
97+
{:ok, cache_pid} = start_supervised({Logflare.Endpoints.ResultsCache, {endpoint, %{}, []}})
9898
assert Process.alive?(cache_pid)
9999

100100
assert {:error, %{"message" => "BQ Error"}} = Endpoints.run_cached_query(endpoint)
@@ -113,7 +113,7 @@ defmodule Logflare.Endpoints.CacheTest do
113113
{:ok, TestUtils.gen_bq_response(test_response)}
114114
end)
115115

116-
{:ok, cache_pid} = start_supervised({Logflare.Endpoints.ResultsCache, {endpoint, %{}}})
116+
{:ok, cache_pid} = start_supervised({Logflare.Endpoints.ResultsCache, {endpoint, %{}, []}})
117117
assert Process.alive?(cache_pid)
118118

119119
# First query should succeed
@@ -136,7 +136,7 @@ defmodule Logflare.Endpoints.CacheTest do
136136
{:ok, TestUtils.gen_bq_response(test_response)}
137137
end)
138138

139-
{:ok, cache_pid} = start_supervised({Logflare.Endpoints.ResultsCache, {endpoint, %{}}})
139+
{:ok, cache_pid} = start_supervised({Logflare.Endpoints.ResultsCache, {endpoint, %{}, []}})
140140
assert Process.alive?(cache_pid)
141141

142142
# First query should succeed
@@ -161,7 +161,7 @@ defmodule Logflare.Endpoints.CacheTest do
161161
{:ok, TestUtils.gen_bq_response(test_response)}
162162
end)
163163

164-
{:ok, cache_pid} = start_supervised({Logflare.Endpoints.ResultsCache, {endpoint, %{}}})
164+
{:ok, cache_pid} = start_supervised({Logflare.Endpoints.ResultsCache, {endpoint, %{}, []}})
165165
assert Process.alive?(cache_pid)
166166

167167
# First query should return first test response
@@ -202,7 +202,7 @@ defmodule Logflare.Endpoints.CacheTest do
202202
{:ok, TestUtils.gen_bq_response()}
203203
end)
204204

205-
{:ok, cache_pid} = start_supervised({Logflare.Endpoints.ResultsCache, {endpoint, %{}}})
205+
{:ok, cache_pid} = start_supervised({Logflare.Endpoints.ResultsCache, {endpoint, %{}, []}})
206206
assert Process.alive?(cache_pid)
207207
assert {:ok, %{rows: [_]}} = Endpoints.run_cached_query(endpoint)
208208

@@ -222,7 +222,7 @@ defmodule Logflare.Endpoints.CacheTest do
222222
{:ok, TestUtils.gen_bq_response(test_response)}
223223
end)
224224

225-
{:ok, cache_pid} = start_supervised({Logflare.Endpoints.ResultsCache, {endpoint, %{}}})
225+
{:ok, cache_pid} = start_supervised({Logflare.Endpoints.ResultsCache, {endpoint, %{}, []}})
226226
assert Process.alive?(cache_pid)
227227

228228
# First query should succeed

test/logflare/endpoints_test.exs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ defmodule Logflare.EndpointsTest do
314314
cache_duration_seconds: 4
315315
)
316316

317-
_pid = start_supervised!({Logflare.Endpoints.ResultsCache, {endpoint, %{}}})
317+
_pid = start_supervised!({Logflare.Endpoints.ResultsCache, {endpoint, %{}, []}})
318318
assert {:ok, %{rows: [%{"testing" => _}]}} = Endpoints.run_cached_query(endpoint)
319319
# 2nd query should hit local cache
320320
assert {:ok, %{rows: [%{"testing" => _}]}} = Endpoints.run_cached_query(endpoint)
@@ -334,7 +334,7 @@ defmodule Logflare.EndpointsTest do
334334
cache_duration_seconds: 1
335335
)
336336

337-
_pid = start_supervised!({Logflare.Endpoints.ResultsCache, {endpoint, %{}}})
337+
_pid = start_supervised!({Logflare.Endpoints.ResultsCache, {endpoint, %{}, []}})
338338
assert {:ok, %{rows: [%{"testing" => _}]}} = Endpoints.run_cached_query(endpoint)
339339
# 2nd query should hit local cache
340340
assert {:ok, %{rows: [%{"testing" => _}]}} = Endpoints.run_cached_query(endpoint)
@@ -375,7 +375,7 @@ defmodule Logflare.EndpointsTest do
375375

376376
user = insert(:user)
377377
endpoint = insert(:endpoint, user: user, query: "select current_datetime() as testing")
378-
cache_pid = start_supervised!({Logflare.Endpoints.ResultsCache, {endpoint, %{}}})
378+
cache_pid = start_supervised!({Logflare.Endpoints.ResultsCache, {endpoint, %{}, []}})
379379
assert {:ok, %{rows: [%{"testing" => _}]}} = Endpoints.run_cached_query(endpoint)
380380

381381
params =
@@ -454,7 +454,7 @@ defmodule Logflare.EndpointsTest do
454454
}
455455
} = Endpoints.calculate_endpoint_metrics(endpoint)
456456

457-
_pid = start_supervised!({Logflare.Endpoints.ResultsCache, {endpoint, %{}}})
457+
_pid = start_supervised!({Logflare.Endpoints.ResultsCache, {endpoint, %{}, []}})
458458

459459
assert %_{
460460
metrics: %Query.Metrics{

test/logflare_web/controllers/endpoints_controller_test.exs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -576,6 +576,40 @@ defmodule LogflareWeb.EndpointsControllerTest do
576576
refute conn.halted
577577
end
578578

579+
test "redacts IP addresses when header is set", %{conn: init_conn, user: user} do
580+
endpoint = insert(:endpoint, user: user, enable_auth: false, redact_pii: false)
581+
582+
GoogleApi.BigQuery.V2.Api.Jobs
583+
|> expect(:bigquery_jobs_query, fn _conn, _proj_id, _opts ->
584+
bq_response =
585+
TestUtils.gen_bq_response([
586+
%{"ip_address" => "192.168.1.1", "event_message" => "User 10.0.0.1 connected ::1"}
587+
])
588+
589+
{:ok, bq_response}
590+
end)
591+
592+
conn =
593+
init_conn
594+
|> put_req_header("lf-endpoint-redact-pii", "true")
595+
|> get(~p"/endpoints/query/#{endpoint.token}")
596+
597+
response =
598+
conn
599+
|> json_response(200)
600+
|> assert_schema("EndpointQuery")
601+
602+
assert [
603+
%{
604+
"ip_address" => "REDACTED",
605+
"event_message" => "User REDACTED connected REDACTED"
606+
}
607+
] = response.result
608+
609+
refute response.error
610+
refute conn.halted
611+
end
612+
579613
test "does not redact IP addresses when redact_pii is disabled", %{
580614
conn: init_conn,
581615
user: user

0 commit comments

Comments
 (0)