Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/mix/tasks/clean_clickhouse.ex
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
defmodule Mix.Tasks.CleanClickhouse do
use Mix.Task

Expand All @@ -11,6 +11,8 @@
tables --
[
"schema_migrations",
"failed_batches",
"failed_batches_dict",
"location_data",
"location_data_dict",
"acquisition_channel_source_category",
Expand Down
23 changes: 23 additions & 0 deletions lib/plausible/data_migration/failed_batches.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
defmodule Plausible.DataMigration.FailedBatches do
  @moduledoc """
  Data migration that prepares ClickHouse for storing failed batches.

  It renders and runs two SQL templates from the `FailedBatches` directory
  against `Plausible.IngestRepo`: `create-failed-batches-table` followed by
  `create-failed-batches-dictionary`.
  """

  use Plausible.DataMigration, dir: "FailedBatches", repo: Plausible.IngestRepo

  alias Plausible.IngestRepo
  alias Plausible.MigrationUtils

  @doc """
  Runs both SQL templates in order, table first and dictionary second.

  Each step is matched against `{:ok, _}`, so any other result raises a
  `MatchError` and stops the migration. Returns the `{:ok, _}` tuple of the
  dictionary step.
  """
  def run do
    # Whether to emit cluster clauses is decided from the sessions_v2 table.
    clustered? = IngestRepo.clustered_table?("sessions_v2")

    table_assigns = [
      cluster?: clustered?,
      table_settings: MigrationUtils.table_settings_expr(:prefix)
    ]

    {:ok, _} = run_sql("create-failed-batches-table", table_assigns)

    # Connection params are only looked up once the table step has succeeded.
    dictionary_assigns = [
      cluster?: clustered?,
      dictionary_connection_params: MigrationUtils.dictionary_connection_params()
    ]

    {:ok, _} = run_sql("create-failed-batches-dictionary", dictionary_assigns)
  end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- Dictionary `failed_batches_dict`, keyed by `batch` and sourced from the `failed_batches` table.
-- EEx assigns used by this template:
--   @cluster?                      when truthy, the ON CLUSTER clause is emitted
--   @dictionary_connection_params  spliced verbatim into the CLICKHOUSE source definition
CREATE OR REPLACE DICTIONARY failed_batches_dict
<%= if @cluster? do %>ON CLUSTER '{cluster}'<% end %>
(
`batch` UInt64
)
PRIMARY KEY batch
-- invalidate_query: per ClickHouse docs, the dictionary is reloaded only when this query's result changes.
SOURCE(CLICKHOUSE(TABLE failed_batches <%= @dictionary_connection_params %> invalidate_query 'SELECT max(batch) FROM failed_batches'))
-- Update interval; ClickHouse picks a time between MIN and MAX (in seconds).
LIFETIME(MIN 300 MAX 360)
LAYOUT(hashed())
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- Table `failed_batches` with a single `batch` UInt64 column, ordered by `batch`.
-- EEx assigns used by this template:
--   @cluster?         when truthy, adds ON CLUSTER and uses ReplicatedMergeTree instead of MergeTree
--   @table_settings   appended verbatim after the ORDER BY clause
CREATE TABLE IF NOT EXISTS failed_batches <%= if @cluster? do %>ON CLUSTER '{cluster}'<% end %>
(
`batch` UInt64
)
<%= if @cluster? do %>
-- NOTE(review): the replication path hard-codes `plausible_prod`; confirm this matches the target database.
ENGINE = ReplicatedMergeTree('/clickhouse/{cluster}/tables/{shard}/plausible_prod/failed_batches', '{replica}')
<% else %>
ENGINE = MergeTree()
<% end %>
ORDER BY (batch)
<%= @table_settings %>
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
defmodule Plausible.IngestRepo.Migrations.AddBatchToSessionsAndEvents do
  @moduledoc """
  Adds a `batch` column (`UInt64`) and a `minmax_batch` data-skipping index to
  the `sessions_v2` and `events_v2` ClickHouse tables.

  Every statement carries the table's cluster clause and uses
  `IF NOT EXISTS` / `IF EXISTS`, so the migration can be re-run after a
  partial failure.
  """

  use Ecto.Migration

  @on_cluster_sessions Plausible.MigrationUtils.on_cluster_statement("sessions_v2")
  @on_cluster_events Plausible.MigrationUtils.on_cluster_statement("events_v2")

  # Tables paired with their cluster clause, in the order they are migrated.
  @targets [
    {"sessions_v2", @on_cluster_sessions},
    {"events_v2", @on_cluster_events}
  ]

  # For each table: add the column, add the index, then materialize the index.
  def up do
    Enum.each(@targets, fn {table, on_cluster} ->
      execute """
      ALTER TABLE #{table}
      #{on_cluster}
      ADD COLUMN IF NOT EXISTS batch UInt64
      """

      execute """
      ALTER TABLE #{table}
      #{on_cluster}
      ADD INDEX IF NOT EXISTS minmax_batch batch
      TYPE minmax GRANULARITY 1
      """

      # Carries the same cluster clause as the other ALTERs so the index is
      # materialized wherever it was added.
      execute """
      ALTER TABLE #{table}
      #{on_cluster}
      MATERIALIZE INDEX minmax_batch
      """
    end)
  end

  # For each table: drop the index first, then the column it is built on.
  def down do
    Enum.each(@targets, fn {table, on_cluster} ->
      execute """
      ALTER TABLE #{table}
      #{on_cluster}
      DROP INDEX IF EXISTS minmax_batch
      """

      execute """
      ALTER TABLE #{table}
      #{on_cluster}
      DROP COLUMN IF EXISTS batch
      """
    end)
  end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
defmodule Plausible.IngestRepo.Migrations.AddFailedBatchesTableDict do
  use Ecto.Migration

  alias Plausible.DataMigration.FailedBatches

  # Delegates all the work to the FailedBatches data migration.
  def up, do: FailedBatches.run()

  # No rollback is provided; attempting one raises.
  def down, do: raise("Irreversible")
end
Loading
Loading