Skip to content

Commit 2a133da

Browse files
jrauh01yhabteab
authored andcommitted
Structs for upsert and delete
1 parent c7dfddd commit 2a133da

File tree

1 file changed

+135
-0
lines changed

1 file changed

+135
-0
lines changed

database/optionally.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
package database
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/icinga/icinga-go-library/com"
7+
"github.com/pkg/errors"
8+
)
9+
10+
// Upsert inserts new rows into a table or updates rows of a table if the primary key already exists.
11+
type Upsert interface {
12+
// Stream bulk upserts the specified entities via NamedBulkExec.
13+
// If not explicitly specified, the upsert statement is created using
14+
// BuildUpsertStmt with the first entity from the entities stream.
15+
Stream(ctx context.Context, entities <-chan Entity) error
16+
}
17+
18+
// UpsertOption is a functional option for NewUpsert.
19+
type UpsertOption func(u *upsert)
20+
21+
// WithOnUpsert adds callback(s) to bulk upserts. Entities for which the
22+
// operation was performed successfully are passed to the callbacks.
23+
func WithOnUpsert(onUpsert ...OnSuccess[Entity]) UpsertOption {
24+
return func(u *upsert) {
25+
u.onUpsert = onUpsert
26+
}
27+
}
28+
29+
// WithStatement uses the specified statement for bulk upserts instead of automatically creating one.
30+
func WithStatement(stmt string, placeholders int) UpsertOption {
31+
return func(u *upsert) {
32+
u.stmt = stmt
33+
u.placeholders = placeholders
34+
}
35+
}
36+
37+
// NewUpsert creates a new Upsert initalized with a database.
38+
func NewUpsert(db *DB, options ...UpsertOption) Upsert {
39+
u := &upsert{db: db}
40+
41+
for _, option := range options {
42+
option(u)
43+
}
44+
45+
return u
46+
}
47+
48+
type upsert struct {
49+
db *DB
50+
onUpsert []OnSuccess[Entity]
51+
stmt string
52+
placeholders int
53+
}
54+
55+
func (u *upsert) Stream(ctx context.Context, entities <-chan Entity) error {
56+
first, forward, err := com.CopyFirst(ctx, entities)
57+
if err != nil {
58+
return errors.Wrap(err, "can't copy first entity")
59+
}
60+
61+
sem := u.db.GetSemaphoreForTable(TableName(first))
62+
var stmt string
63+
var placeholders int
64+
65+
if u.stmt != "" {
66+
stmt = u.stmt
67+
placeholders = u.placeholders
68+
} else {
69+
stmt, placeholders = u.db.BuildUpsertStmt(first)
70+
}
71+
72+
return u.db.NamedBulkExec(
73+
ctx, stmt, u.db.BatchSizeByPlaceholders(placeholders), sem,
74+
forward, SplitOnDupId[Entity], u.onUpsert...,
75+
)
76+
}
77+
78+
// Delete deletes rows of a table.
79+
type Delete interface {
80+
// Stream bulk deletes rows from the table specified in from using the given args stream via BulkExec.
81+
// Unless explicitly specified, the DELETE statement is created using BuildDeleteStmt.
82+
Stream(ctx context.Context, from any, args <-chan any) error
83+
}
84+
85+
// DeleteOption is a functional option for NewDelete.
86+
type DeleteOption func(options *delete)
87+
88+
// WithOnDelete adds callback(s) to bulk deletes. Arguments for which the
89+
// operation was performed successfully are passed to the callbacks.
90+
func WithOnDelete(onDelete ...OnSuccess[any]) DeleteOption {
91+
return func(d *delete) {
92+
d.onDelete = onDelete
93+
}
94+
}
95+
96+
// ByColumn uses the given column for the WHERE clause that the rows must
97+
// satisfy in order to be deleted, instead of automatically using ID.
98+
func ByColumn(column string) DeleteOption {
99+
return func(d *delete) {
100+
d.column = column
101+
}
102+
}
103+
104+
// NewDelete creates a new Delete initalized with a database.
105+
func NewDelete(db *DB, options ...DeleteOption) Delete {
106+
d := &delete{db: db}
107+
108+
for _, option := range options {
109+
option(d)
110+
}
111+
112+
return d
113+
}
114+
115+
type delete struct {
116+
db *DB
117+
column string
118+
onDelete []OnSuccess[any]
119+
}
120+
121+
func (d *delete) Stream(ctx context.Context, from any, args <-chan any) error {
122+
var stmt string
123+
124+
if d.column != "" {
125+
stmt = fmt.Sprintf(`DELETE FROM "%s" WHERE %s IN (?)`, TableName(from), d.column)
126+
} else {
127+
stmt = d.db.BuildDeleteStmt(from)
128+
}
129+
130+
sem := d.db.GetSemaphoreForTable(TableName(from))
131+
132+
return d.db.BulkExec(
133+
ctx, stmt, d.db.Options.MaxPlaceholdersPerStatement, sem, args, d.onDelete...,
134+
)
135+
}

0 commit comments

Comments
 (0)