Skip to content

Commit 48af1a6

Browse files
committed
Add functional options to 'DeleteStreamed' and adjust for 'UpsertStreamed'
1 parent 2d47d95 commit 48af1a6

File tree

1 file changed

+89
-6
lines changed

1 file changed

+89
-6
lines changed

database/db.go

Lines changed: 89 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -691,25 +691,67 @@ func (db *DB) CreateIgnoreStreamed(
691691
)
692692
}
693693

694+
func WithOnSuccessUpsert(onSuccess ...OnSuccess[Entity]) ExecOption {
695+
return func(options *ExecOptions) {
696+
options.onSuccess = onSuccess
697+
}
698+
}
699+
700+
func WithStatement(stmt string, placeholders int) ExecOption {
701+
return func(options *ExecOptions) {
702+
options.stmt = stmt
703+
options.placeholders = placeholders
704+
}
705+
}
706+
707+
type ExecOption func(options *ExecOptions)
708+
709+
type ExecOptions struct {
710+
onSuccess []OnSuccess[Entity]
711+
stmt string
712+
placeholders int
713+
}
714+
715+
func NewExecOptions(execOpts ...ExecOption) *ExecOptions {
716+
execOptions := &ExecOptions{}
717+
718+
for _, option := range execOpts {
719+
option(execOptions)
720+
}
721+
722+
return execOptions
723+
}
724+
694725
// UpsertStreamed bulk upserts the specified entities via NamedBulkExec.
695726
// The upsert statement is created using BuildUpsertStmt with the first entity from the entities stream.
696727
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
697728
// concurrency is controlled via Options.MaxConnectionsPerTable.
698729
// Entities for which the query ran successfully will be passed to onSuccess.
699730
func (db *DB) UpsertStreamed(
700-
ctx context.Context, entities <-chan Entity, onSuccess ...OnSuccess[Entity],
731+
ctx context.Context, entities <-chan Entity, execOpts ...ExecOption,
701732
) error {
733+
734+
execOptions := NewExecOptions(execOpts...)
735+
702736
first, forward, err := com.CopyFirst(ctx, entities)
703737
if err != nil {
704738
return errors.Wrap(err, "can't copy first entity")
705739
}
706740

707741
sem := db.GetSemaphoreForTable(TableName(first))
708-
stmt, placeholders := db.BuildUpsertStmt(first)
742+
var stmt string
743+
var placeholders int
744+
745+
if execOptions.stmt != "" {
746+
stmt = execOptions.stmt
747+
placeholders = execOptions.placeholders
748+
} else {
749+
stmt, placeholders = db.BuildUpsertStmt(first)
750+
}
709751

710752
return db.NamedBulkExec(
711753
ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem,
712-
forward, SplitOnDupId[Entity], onSuccess...,
754+
forward, SplitOnDupId[Entity], execOptions.onSuccess...,
713755
)
714756
}
715757

@@ -728,17 +770,58 @@ func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan Entity) error
728770
return db.NamedBulkExecTx(ctx, stmt, db.Options.MaxRowsPerTransaction, sem, forward)
729771
}
730772

773+
func WithOnSuccessDelete(onSuccess ...OnSuccess[any]) DeleteOption {
774+
return func(options *DeleteOptions) {
775+
options.onSuccess = onSuccess
776+
}
777+
}
778+
779+
func ByColumn(column string) DeleteOption {
780+
return func(options *DeleteOptions) {
781+
options.column = column
782+
}
783+
}
784+
785+
type DeleteOption func(options *DeleteOptions)
786+
787+
type DeleteOptions struct {
788+
onSuccess []OnSuccess[any]
789+
column string
790+
}
791+
792+
func NewDeleteOptions(execOpts ...DeleteOption) *DeleteOptions {
793+
deleteOptions := &DeleteOptions{}
794+
795+
for _, option := range execOpts {
796+
option(deleteOptions)
797+
}
798+
799+
return deleteOptions
800+
}
801+
731802
// DeleteStreamed bulk deletes the specified ids via BulkExec.
732803
// The delete statement is created using BuildDeleteStmt with the passed entityType.
733804
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
734805
// concurrency is controlled via Options.MaxConnectionsPerTable.
735806
// IDs for which the query ran successfully will be passed to onSuccess.
736807
func (db *DB) DeleteStreamed(
737-
ctx context.Context, entityType Entity, ids <-chan interface{}, onSuccess ...OnSuccess[any],
808+
ctx context.Context, entityType Entity, ids <-chan interface{}, deleteOpts ...DeleteOption,
738809
) error {
810+
811+
deleteOptions := NewDeleteOptions(deleteOpts...)
812+
739813
sem := db.GetSemaphoreForTable(TableName(entityType))
814+
815+
var stmt string
816+
817+
if deleteOptions.column != "" {
818+
stmt = fmt.Sprintf("DELETE FROM %s WHERE %s IN (?)", TableName(entityType), deleteOptions.column)
819+
} else {
820+
stmt = db.BuildDeleteStmt(entityType)
821+
}
822+
740823
return db.BulkExec(
741-
ctx, db.BuildDeleteStmt(entityType), db.Options.MaxPlaceholdersPerStatement, sem, ids, onSuccess...,
824+
ctx, stmt, db.Options.MaxPlaceholdersPerStatement, sem, ids, deleteOptions.onSuccess...,
742825
)
743826
}
744827

@@ -754,7 +837,7 @@ func (db *DB) Delete(
754837
}
755838
close(idsCh)
756839

757-
return db.DeleteStreamed(ctx, entityType, idsCh, onSuccess...)
840+
return db.DeleteStreamed(ctx, entityType, idsCh, WithOnSuccessDelete(onSuccess...))
758841
}
759842

760843
func (db *DB) GetSemaphoreForTable(table string) *semaphore.Weighted {

0 commit comments

Comments
 (0)