Skip to content

Commit a2a2acf

Browse files
committed
Add functional options to 'DeleteStreamed' and adjust for 'UpsertStreamed'
1 parent cf9d730 commit a2a2acf

File tree

1 file changed

+89
-6
lines changed

1 file changed

+89
-6
lines changed

database/db.go

Lines changed: 89 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -731,25 +731,67 @@ func (db *DB) CreateIgnoreStreamed(
731731
)
732732
}
733733

734+
func WithOnSuccessUpsert(onSuccess ...OnSuccess[Entity]) ExecOption {
735+
return func(options *ExecOptions) {
736+
options.onSuccess = onSuccess
737+
}
738+
}
739+
740+
func WithStatement(stmt string, placeholders int) ExecOption {
741+
return func(options *ExecOptions) {
742+
options.stmt = stmt
743+
options.placeholders = placeholders
744+
}
745+
}
746+
747+
type ExecOption func(options *ExecOptions)
748+
749+
type ExecOptions struct {
750+
onSuccess []OnSuccess[Entity]
751+
stmt string
752+
placeholders int
753+
}
754+
755+
func NewExecOptions(execOpts ...ExecOption) *ExecOptions {
756+
execOptions := &ExecOptions{}
757+
758+
for _, option := range execOpts {
759+
option(execOptions)
760+
}
761+
762+
return execOptions
763+
}
764+
734765
// UpsertStreamed bulk upserts the specified entities via NamedBulkExec.
735766
// The upsert statement is created using BuildUpsertStmt with the first entity from the entities stream.
736767
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
737768
// concurrency is controlled via Options.MaxConnectionsPerTable.
738769
// Entities for which the query ran successfully will be passed to onSuccess.
739770
func (db *DB) UpsertStreamed(
740-
ctx context.Context, entities <-chan Entity, onSuccess ...OnSuccess[Entity],
771+
ctx context.Context, entities <-chan Entity, execOpts ...ExecOption,
741772
) error {
773+
774+
execOptions := NewExecOptions(execOpts...)
775+
742776
first, forward, err := com.CopyFirst(ctx, entities)
743777
if err != nil {
744778
return errors.Wrap(err, "can't copy first entity")
745779
}
746780

747781
sem := db.GetSemaphoreForTable(TableName(first))
748-
stmt, placeholders := db.BuildUpsertStmt(first)
782+
var stmt string
783+
var placeholders int
784+
785+
if execOptions.stmt != "" {
786+
stmt = execOptions.stmt
787+
placeholders = execOptions.placeholders
788+
} else {
789+
stmt, placeholders = db.BuildUpsertStmt(first)
790+
}
749791

750792
return db.NamedBulkExec(
751793
ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem,
752-
forward, SplitOnDupId[Entity], onSuccess...,
794+
forward, SplitOnDupId[Entity], execOptions.onSuccess...,
753795
)
754796
}
755797

@@ -768,17 +810,58 @@ func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan Entity) error
768810
return db.NamedBulkExecTx(ctx, stmt, db.Options.MaxRowsPerTransaction, sem, forward)
769811
}
770812

813+
func WithOnSuccessDelete(onSuccess ...OnSuccess[any]) DeleteOption {
814+
return func(options *DeleteOptions) {
815+
options.onSuccess = onSuccess
816+
}
817+
}
818+
819+
func ByColumn(column string) DeleteOption {
820+
return func(options *DeleteOptions) {
821+
options.column = column
822+
}
823+
}
824+
825+
type DeleteOption func(options *DeleteOptions)
826+
827+
type DeleteOptions struct {
828+
onSuccess []OnSuccess[any]
829+
column string
830+
}
831+
832+
func NewDeleteOptions(execOpts ...DeleteOption) *DeleteOptions {
833+
deleteOptions := &DeleteOptions{}
834+
835+
for _, option := range execOpts {
836+
option(deleteOptions)
837+
}
838+
839+
return deleteOptions
840+
}
841+
771842
// DeleteStreamed bulk deletes the specified ids via BulkExec.
772843
// The delete statement is created using BuildDeleteStmt with the passed entityType.
773844
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
774845
// concurrency is controlled via Options.MaxConnectionsPerTable.
775846
// IDs for which the query ran successfully will be passed to onSuccess.
776847
func (db *DB) DeleteStreamed(
777-
ctx context.Context, entityType Entity, ids <-chan interface{}, onSuccess ...OnSuccess[any],
848+
ctx context.Context, entityType Entity, ids <-chan interface{}, deleteOpts ...DeleteOption,
778849
) error {
850+
851+
deleteOptions := NewDeleteOptions(deleteOpts...)
852+
779853
sem := db.GetSemaphoreForTable(TableName(entityType))
854+
855+
var stmt string
856+
857+
if deleteOptions.column != "" {
858+
stmt = fmt.Sprintf("DELETE FROM %s WHERE %s IN (?)", TableName(entityType), deleteOptions.column)
859+
} else {
860+
stmt = db.BuildDeleteStmt(entityType)
861+
}
862+
780863
return db.BulkExec(
781-
ctx, db.BuildDeleteStmt(entityType), db.Options.MaxPlaceholdersPerStatement, sem, ids, onSuccess...,
864+
ctx, stmt, db.Options.MaxPlaceholdersPerStatement, sem, ids, deleteOptions.onSuccess...,
782865
)
783866
}
784867

@@ -794,7 +877,7 @@ func (db *DB) Delete(
794877
}
795878
close(idsCh)
796879

797-
return db.DeleteStreamed(ctx, entityType, idsCh, onSuccess...)
880+
return db.DeleteStreamed(ctx, entityType, idsCh, WithOnSuccessDelete(onSuccess...))
798881
}
799882

800883
// ExecTx executes the provided function within a database transaction.

0 commit comments

Comments
 (0)