Skip to content

Commit d5584e3

Browse files
authored
perf: bq clustering (#2489)
* perf: partition by hour by default * perf: add in BQ clustering customization * chore: revert hour partitioning
1 parent 1668b40 commit d5584e3

File tree

5 files changed

+73
-11
lines changed

5 files changed

+73
-11
lines changed

lib/logflare/google/bigquery/bigquery.ex

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,12 @@ defmodule Logflare.Google.BigQuery do
135135
table_name = GenUtils.format_table_name(source_id)
136136
dataset_id = dataset_id || GenUtils.get_account_id(source_id) <> env_dataset_id_append()
137137

138+
{:ok, table} = get_table(source_id)
139+
timepartitioning_type = table.timePartitioning.type
140+
138141
partitioning = %Model.TimePartitioning{
139-
type: "DAY",
142+
# use the same type as the existing table, as BQ does not allow changing it
143+
type: timepartitioning_type,
140144
expirationMs: table_ttl
141145
}
142146

@@ -147,6 +151,23 @@ defmodule Logflare.Google.BigQuery do
147151
|> GenUtils.maybe_parse_google_api_result()
148152
end
149153

154+
@doc """
Patches the clustering configuration of the BigQuery table for `source_id`.

`clustering` is the list of column names sent as the table's clustering
fields. When `dataset_id` is `nil`, the dataset is derived from the account id
of the source plus the environment-specific dataset suffix.

Returns the Google API response after it has been passed through
`GenUtils.maybe_parse_google_api_result/1`.
"""
@spec patch_table_clustering(atom, list(String.t()), binary, binary) :: ok_err_tup
def patch_table_clustering(source_id, clustering, dataset_id, project_id) do
  connection = GenUtils.get_conn()
  table = GenUtils.format_table_name(source_id)
  dataset = dataset_id || GenUtils.get_account_id(source_id) <> env_dataset_id_append()

  # Only the clustering attribute is set on the patch body.
  patch_body = %Model.Table{clustering: %Model.Clustering{fields: clustering}}

  connection
  |> Api.Tables.bigquery_tables_patch(project_id, dataset, table, body: patch_body)
  |> GenUtils.maybe_parse_google_api_result()
end
170+
150171
@spec patch_table(atom, any, binary, binary) :: {:error, Tesla.Env.t()} | {:ok, Model.Table.t()}
151172
def patch_table(source_id, schema, dataset_id, project_id) do
152173
conn = GenUtils.get_conn()

lib/logflare/sources.ex

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,20 @@ defmodule Logflare.Sources do
150150
)
151151
end
152152

153+
# Keep the BigQuery table clustering in sync when the source's clustering
# setting changes. Skipped when running single-tenant on the Postgres backend.
if source.bigquery_clustering_fields != updated.bigquery_clustering_fields and
     not SingleTenant.postgres_backend?() do
  user = Users.Cache.get(updated.user_id)

  # Normalise the comma-separated user input before sending it to BigQuery:
  #   * `String.split("", ",")` returns `[""]`, so a cleared setting would
  #     otherwise produce a blank column name;
  #   * entries such as "a, b" carry surrounding whitespace;
  #   * a user-supplied "timestamp"/"id" would duplicate the defaults.
  fields =
    (updated.bigquery_clustering_fields || "")
    |> String.split(",")
    |> Enum.map(&String.trim/1)
    |> Enum.reject(&(&1 == ""))
    |> Kernel.++(["timestamp", "id"])
    |> Enum.uniq()

  # NOTE(review): the patch result is intentionally not matched here, same as
  # before; a failed BigQuery patch does not fail the source update.
  BigQuery.patch_table_clustering(
    updated.token,
    fields,
    user.bigquery_dataset_id,
    user.bigquery_project_id
  )
end
166+
153167
{:ok, updated}
154168
end
155169

lib/logflare/sources/source.ex

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ defmodule Logflare.Source do
2929
:custom_event_message_keys,
3030
:backends,
3131
:retention_days,
32-
:transform_copy_fields
32+
:transform_copy_fields,
33+
:bigquery_clustering_fields
3334
]}
3435
defp env_dataset_id_append,
3536
do: Application.get_env(:logflare, Logflare.Google)[:dataset_id_append]
@@ -134,6 +135,7 @@ defmodule Logflare.Source do
134135
field(:suggested_keys, :string, default: "")
135136
field(:retention_days, :integer, virtual: true)
136137
field(:transform_copy_fields, :string)
138+
field(:bigquery_clustering_fields, :string)
137139
# Causes a shitstorm
138140
# field :bigquery_schema, Ecto.Term
139141

@@ -171,6 +173,7 @@ defmodule Logflare.Source do
171173
:public_token,
172174
:favorite,
173175
:bigquery_table_ttl,
176+
:bigquery_clustering_fields,
174177
# users can't update their API quota currently
175178
:api_quota,
176179
:webhook_notification_url,
@@ -202,6 +205,7 @@ defmodule Logflare.Source do
202205
:public_token,
203206
:favorite,
204207
:bigquery_table_ttl,
208+
:bigquery_clustering_fields,
205209
:webhook_notification_url,
206210
:slack_hook_url,
207211
:custom_event_message_keys,

lib/logflare_web/templates/source/edit.html.eex

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -261,21 +261,35 @@
261261

262262
<%= section_header("Backend TTL") %>
263263
<%= if @conn.assigns.user.billing_enabled do %>
264-
<p>Set how long to keep data in your backend.</p>
264+
<p>Set how long to keep data in your backend.</p>
265+
<%= form_for @changeset, Routes.source_path(@conn, :update, @source), fn e -> %>
266+
<div class="form-group">
267+
<%= text_input e, :retention_days, placeholder: "3", class: "form-control form-control-margin" %>
268+
<%= error_tag e, :retention_days %>
269+
<small class="form-text text-muted">
270+
Days to keep data.
271+
</small>
272+
</div>
273+
<%= submit "Update backend settings", class: "btn btn-primary form-button" %>
274+
<% end %>
275+
<% else %>
276+
<p>Add a Google Cloud Platform project ID to get started!</p>
277+
<%= link "Setup custom backend", to: Routes.user_path(@conn, :edit) <> "#bigquery-backend", class: "btn btn-primary form-button", role: "button" %>
278+
<% end %>
279+
280+
<%= section_header("Backend Clustering") %>
281+
<p>Set additional clustering columns to improve query performance. <code>timestamp</code> and <code>id</code> are always clustered.</p>
265282
<%= form_for @changeset, Routes.source_path(@conn, :update, @source), fn e -> %>
266283
<div class="form-group">
267-
<%= text_input e, :retention_days, placeholder: "3", class: "form-control form-control-margin" %>
268-
<%= error_tag e, :retention_days %>
284+
<%= text_input e, :bigquery_clustering_fields, placeholder: "my_field,my_other_field", class: "form-control form-control-margin" %>
285+
<%= error_tag e, :bigquery_clustering_fields %>
269286
<small class="form-text text-muted">
270-
Days to keep data.
287+
Comma-separated. For example: <code>my_field,my_other_field</code>.
271288
</small>
272289
</div>
273-
<%= submit "Update backend settings", class: "btn btn-primary form-button" %>
274-
<% end %>
275-
<% else %>
276-
<p>Add a Google Cloud Platform project ID to get started!</p>
277-
<%= link "Setup custom backend", to: Routes.user_path(@conn, :edit) <> "#bigquery-backend", class: "btn btn-primary form-button", role: "button" %>
290+
<%= submit "Update clustering", class: "btn btn-primary form-button" %>
278291
<% end %>
292+
279293
<%= section_header("Suggested Search Keys") %>
280294
<p>Set suggested search keys for this source, to encourage usage of certain filtering keys when performing searches across large sources. If a declared key is missing, users will be prompted to include the key when a search is made.</p>
281295
<%= form_for @changeset, Routes.source_path(@conn, :update, @source),fn f -> %>
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
defmodule Logflare.Repo.Migrations.AddBigqueryClusteringFieldstoSourcesTable do
  @moduledoc false
  use Ecto.Migration

  # Adds the string column on `sources` that stores the source's
  # BigQuery clustering fields.
  def change do
    alter table(:sources), do: add(:bigquery_clustering_fields, :string)
  end
end

0 commit comments

Comments
 (0)