Skip to content

Commit 175edf1

Browse files
authored
refactor: partition tasks by range and by status in OLAP repository (#6)
* refactor: timescale partition by statuses * pub concurrency 2 -> 1
1 parent 0ca5f0e commit 175edf1

File tree

11 files changed

+395
-66
lines changed

11 files changed

+395
-66
lines changed

internal/msgqueue/mq_pub_buffer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99

1010
const PUB_FLUSH_INTERVAL = 10 * time.Millisecond
1111
const PUB_BUFFER_SIZE = 1000
12-
const PUB_MAX_CONCURRENCY = 2
12+
const PUB_MAX_CONCURRENCY = 1
1313

1414
type PubFunc func(m *Message) error
1515

pkg/repository/olap.go

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func NewOLAPEventRepository(l *zerolog.Logger) OLAPEventRepository {
9999
queries := timescalev2.New()
100100

101101
// create partitions of the events OLAP table
102-
partitionCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
102+
partitionCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
103103
defer cancel()
104104

105105
err = queries.CreateOLAPPartitions(partitionCtx, timescalePool, NUM_PARTITIONS)
@@ -108,6 +108,57 @@ func NewOLAPEventRepository(l *zerolog.Logger) OLAPEventRepository {
108108
log.Fatal(err)
109109
}
110110

111+
today := time.Now().UTC()
112+
tomorrow := today.AddDate(0, 0, 1)
113+
sevenDaysAgo := today.AddDate(0, 0, -7)
114+
115+
err = queries.CreateOLAPTaskPartition(partitionCtx, timescalePool, pgtype.Date{
116+
Time: today,
117+
Valid: true,
118+
})
119+
120+
if err != nil {
121+
log.Fatal(err)
122+
}
123+
124+
err = queries.CreateOLAPTaskPartition(partitionCtx, timescalePool, pgtype.Date{
125+
Time: tomorrow,
126+
Valid: true,
127+
})
128+
129+
if err != nil {
130+
log.Fatal(err)
131+
}
132+
133+
partitions, err := queries.ListOLAPTaskPartitionsBeforeDate(partitionCtx, timescalePool, pgtype.Date{
134+
Time: sevenDaysAgo,
135+
Valid: true,
136+
})
137+
138+
if err != nil {
139+
log.Fatal(err)
140+
}
141+
142+
for _, partition := range partitions {
143+
_, err := timescalePool.Exec(
144+
partitionCtx,
145+
fmt.Sprintf("ALTER TABLE v2_task DETACH PARTITION %s CONCURRENTLY", partition),
146+
)
147+
148+
if err != nil {
149+
log.Fatal(err)
150+
}
151+
152+
_, err = timescalePool.Exec(
153+
partitionCtx,
154+
fmt.Sprintf("DROP TABLE %s", partition),
155+
)
156+
157+
if err != nil {
158+
log.Fatal(err)
159+
}
160+
}
161+
111162
return &olapEventRepository{
112163
pool: timescalePool,
113164
l: l,

pkg/repository/v2/sqlcv2/dags.sql

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,16 @@
1+
-- name: CreateDAGPartition :exec
2+
SELECT create_v2_dag_partition(
3+
@date::date
4+
);
5+
6+
-- name: ListDAGPartitionsBeforeDate :many
7+
SELECT
8+
p::text AS partition_name
9+
FROM
10+
get_v2_dag_partitions_before(
11+
@date::date
12+
) AS p;
13+
114
-- name: GetDAGData :many
215
WITH input AS (
316
SELECT

pkg/repository/v2/sqlcv2/dags.sql.go

Lines changed: 40 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/repository/v2/sqlcv2/tasks.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
-- name: CreateTablePartition :exec
1+
-- name: CreateTaskPartition :exec
22
SELECT create_v2_task_partition(
33
@date::date
44
);
55

6-
-- name: ListTablePartitionsBeforeDate :many
6+
-- name: ListTaskPartitionsBeforeDate :many
77
SELECT
88
p::text AS partition_name
99
FROM

pkg/repository/v2/sqlcv2/tasks.sql.go

Lines changed: 40 additions & 40 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/repository/v2/task.go

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func (r *TaskRepositoryImpl) UpdateTablePartitions(ctx context.Context) error {
8181
tomorrow := today.AddDate(0, 0, 1)
8282
sevenDaysAgo := today.AddDate(0, 0, -7)
8383

84-
err := r.queries.CreateTablePartition(ctx, r.pool, pgtype.Date{
84+
err := r.queries.CreateTaskPartition(ctx, r.pool, pgtype.Date{
8585
Time: today,
8686
Valid: true,
8787
})
@@ -90,7 +90,7 @@ func (r *TaskRepositoryImpl) UpdateTablePartitions(ctx context.Context) error {
9090
return err
9191
}
9292

93-
err = r.queries.CreateTablePartition(ctx, r.pool, pgtype.Date{
93+
err = r.queries.CreateTaskPartition(ctx, r.pool, pgtype.Date{
9494
Time: tomorrow,
9595
Valid: true,
9696
})
@@ -99,7 +99,7 @@ func (r *TaskRepositoryImpl) UpdateTablePartitions(ctx context.Context) error {
9999
return err
100100
}
101101

102-
partitions, err := r.queries.ListTablePartitionsBeforeDate(ctx, r.pool, pgtype.Date{
102+
partitions, err := r.queries.ListTaskPartitionsBeforeDate(ctx, r.pool, pgtype.Date{
103103
Time: sevenDaysAgo,
104104
Valid: true,
105105
})
@@ -128,6 +128,53 @@ func (r *TaskRepositoryImpl) UpdateTablePartitions(ctx context.Context) error {
128128
}
129129
}
130130

131+
err = r.queries.CreateDAGPartition(ctx, r.pool, pgtype.Date{
132+
Time: today,
133+
Valid: true,
134+
})
135+
136+
if err != nil {
137+
return err
138+
}
139+
140+
err = r.queries.CreateDAGPartition(ctx, r.pool, pgtype.Date{
141+
Time: tomorrow,
142+
Valid: true,
143+
})
144+
145+
if err != nil {
146+
return err
147+
}
148+
149+
dagPartitions, err := r.queries.ListDAGPartitionsBeforeDate(ctx, r.pool, pgtype.Date{
150+
Time: sevenDaysAgo,
151+
Valid: true,
152+
})
153+
154+
if err != nil {
155+
return err
156+
}
157+
158+
for _, partition := range dagPartitions {
159+
_, err := r.pool.Exec(
160+
ctx,
161+
fmt.Sprintf("ALTER TABLE v2_task DETACH PARTITION %s CONCURRENTLY", partition),
162+
)
163+
164+
if err != nil {
165+
return err
166+
}
167+
168+
_, err = r.pool.Exec(
169+
ctx,
170+
fmt.Sprintf("DROP TABLE %s", partition),
171+
)
172+
173+
if err != nil {
174+
return err
175+
}
176+
}
177+
131178
return nil
132179
}
133180

pkg/repository/v2/timescalev2/models.go

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/repository/v2/timescalev2/queries.sql

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,19 @@ SELECT create_v2_task_events_partitions(
33
@partitions::int
44
);
55

6+
-- name: CreateOLAPTaskPartition :exec
7+
SELECT create_v2_tasks_olap_partition(
8+
@date::date
9+
);
10+
11+
-- name: ListOLAPTaskPartitionsBeforeDate :many
12+
SELECT
13+
p::text AS partition_name
14+
FROM
15+
get_v2_tasks_olap_partitions_before(
16+
@date::date
17+
) AS p;
18+
619
-- name: CreateTasksOLAP :copyfrom
720
INSERT INTO v2_tasks_olap (
821
tenant_id,
@@ -480,4 +493,4 @@ WITH locked_events AS (
480493
SELECT
481494
COUNT(*)
482495
FROM
483-
locked_events;
496+
locked_events;

0 commit comments

Comments
 (0)