6868 ErrSyncAtomicWriteFailed = errors .New ("sync atomic write failed" )
6969 ErrSyncBatchWriteFailed = errors .New ("sync batch write failed" )
7070 ErrSyncFailedToGetChildMutations = errors .New ("sync failed to get child mutations" )
71+ ErrSyncFailedToGetPreDeleteHooks = errors .New ("sync failed to get pre delete hooks" )
7172)
7273
7374// Client is the client for interacting with GCP Spanner.
@@ -360,6 +361,10 @@ type readAllMapper interface {
360361 SelectAll () spanner.Statement
361362}
362363
364+ type readAllByKeysMapper [KeysContainer comparable ] interface {
365+ SelectAllByKeys (KeysContainer ) spanner.Statement
366+ }
367+
363368// mergeMapper handles the logic for updating an existing entity.
364369type mergeMapper [ExternalStruct any , SpannerStruct any ] interface {
365370 Merge (ExternalStruct , SpannerStruct ) SpannerStruct
@@ -435,9 +440,10 @@ type syncableEntityMapper[ExternalStruct any, SpannerStruct any, Key comparable]
435440 mergeAndCheckChangedMapper [ExternalStruct , SpannerStruct ]
436441 childDeleterMapper [SpannerStruct ]
437442 deleteByStructMapper [SpannerStruct ]
443+ preDeleteHookMapper [SpannerStruct ]
438444}
439445
440- type ChildDeleteKeyMutations struct {
446+ type ExtraMutationsGroup struct {
441447 tableName string
442448 mutations []* spanner.Mutation
443449}
@@ -449,7 +455,11 @@ type childDeleterMapper[SpannerStruct any] interface {
449455 ctx context.Context ,
450456 client * Client ,
451457 parentsToDelete []SpannerStruct ,
452- ) ([]ChildDeleteKeyMutations , error )
458+ ) ([]ExtraMutationsGroup , error )
459+ }
460+
461+ type preDeleteHookMapper [SpannerStruct any ] interface {
462+ PreDeleteHook (ctx context.Context , client * Client , rowsToDelete []SpannerStruct ) ([]ExtraMutationsGroup , error )
453463}
454464
455465// --- Generic Entity Components ---
@@ -860,6 +870,57 @@ func (c *entityRemover[M, ExternalStruct, SpannerStruct, Key]) removeWithTransac
860870 return nil
861871}
862872
873+ // allByKeysEntityReader handles the reading of a Spanner table with a set of key(s).
874+ type allByKeysEntityReader [
875+ M readAllByKeysMapper [KeysContainer ],
876+ KeysContainer comparable ,
877+ SpannerStruct any ] struct {
878+ * Client
879+ }
880+
881+ // newAllByKeysEntityReader creates a new reader.
882+ func newAllByKeysEntityReader [
883+ M readAllByKeysMapper [KeysContainer ],
884+ KeysContainer comparable ,
885+ SpannerStruct any ,
886+ ](
887+ c * Client ,
888+ ) * allByKeysEntityReader [M , KeysContainer , SpannerStruct ] {
889+ return & allByKeysEntityReader [M , KeysContainer , SpannerStruct ]{
890+ Client : c ,
891+ }
892+ }
893+
894+ func (r * allByKeysEntityReader [M , KeysContainer , SpannerStruct ]) readAllByKeys (
895+ ctx context.Context ,
896+ keys KeysContainer ,
897+ ) ([]SpannerStruct , error ) {
898+ var mapper M
899+ stmt := mapper .SelectAllByKeys (keys )
900+ txn := r .Single ()
901+ defer txn .Close ()
902+ it := txn .Query (ctx , stmt )
903+ defer it .Stop ()
904+
905+ var entities []SpannerStruct
906+ for {
907+ row , err := it .Next ()
908+ if errors .Is (err , iterator .Done ) {
909+ break
910+ }
911+ if err != nil {
912+ return nil , errors .Join (ErrInternalQueryFailure , err )
913+ }
914+ var entity SpannerStruct
915+ if err := row .ToStruct (& entity ); err != nil {
916+ return nil , err
917+ }
918+ entities = append (entities , entity )
919+ }
920+
921+ return entities , nil
922+ }
923+
863924// entitySynchronizer handles the synchronization of a Spanner table with a
864925// desired state provided as a slice of entities. It determines whether to
865926// use a single atomic transaction or a high-throughput batch write based on
@@ -874,6 +935,9 @@ type entitySynchronizer[
874935 // The number of mutations at which the synchronizer will switch from a
875936 // single atomic transaction to the non-atomic batch writer.
876937 batchWriteThreshold int
938+ // Mapper is the entity mapper that provides the necessary database operation logic.
939+ // This field should be configured before calling the Sync method.
940+ Mapper M
877941}
878942
879943// newEntitySynchronizer creates a new synchronizer with a default threshold.
@@ -882,12 +946,13 @@ func newEntitySynchronizer[
882946 ExternalStruct any ,
883947 SpannerStruct any ,
884948 Key comparable ,
885- ](
886- c * Client ,
887- ) * entitySynchronizer [ M , ExternalStruct , SpannerStruct , Key ] {
949+ ](c * Client ) * entitySynchronizer [ M , ExternalStruct , SpannerStruct , Key ] {
950+ var m M
951+
888952 return & entitySynchronizer [M , ExternalStruct , SpannerStruct , Key ]{
889953 Client : c ,
890954 batchWriteThreshold : defaultBatchSize ,
955+ Mapper : m ,
891956 }
892957}
893958
@@ -897,7 +962,7 @@ func (s *entitySynchronizer[M, ExternalStruct, SpannerStruct, Key]) Sync(
897962 ctx context.Context ,
898963 desiredState []ExternalStruct ,
899964) error {
900- var mapper M
965+ mapper := s . Mapper
901966 tableName := mapper .Table ()
902967
903968 // 1. READ: Fetch all existing entities from the database.
@@ -974,22 +1039,21 @@ func (s *entitySynchronizer[M, ExternalStruct, SpannerStruct, Key]) Sync(
9741039 "updates" , updates ,
9751040 "deletes" , deletes )
9761041
977- // 3. APPLY DELETES: Handle child and parent deletions first.
978- if err := s .applyDeletes (ctx , entitiesToDelete , deleteMutations , mapper ); err != nil {
979- return err
980- }
981-
982- // 4. APPLY UPSERTS: Apply all inserts and updates together.
1042+ // 3. APPLY UPSERTS: Apply all inserts and updates together.
9831043 if len (upsertMutations ) < s .batchWriteThreshold {
9841044 err = s .applyAtomic (ctx , upsertMutations , tableName )
9851045 } else {
9861046 err = s .applyNonAtomic (ctx , upsertMutations , tableName )
9871047 }
988-
9891048 if err != nil {
9901049 return err
9911050 }
9921051
1052+ // 4. APPLY DELETES: Handle child and parent deletions.
1053+ if err := s .applyDeletes (ctx , entitiesToDelete , deleteMutations , mapper ); err != nil {
1054+ return err
1055+ }
1056+
9931057 slog .InfoContext (ctx , "Sync successful" , "table" , tableName )
9941058
9951059 return nil
@@ -1009,6 +1073,20 @@ func (s *entitySynchronizer[M, ExternalStruct, SpannerStruct, Key]) applyDeletes
10091073 }
10101074 tableName := mapper .Table ()
10111075
1076+ // Handle pre delete hooks first.
1077+ mutationGroups , err := mapper .PreDeleteHook (ctx , s .Client , entitiesToDelete )
1078+ if err != nil {
1079+ return errors .Join (ErrSyncFailedToGetPreDeleteHooks , err )
1080+ }
1081+ for _ , group := range mutationGroups {
1082+ slog .InfoContext (ctx , "Applying pre delete mutations via batch writer" ,
1083+ "count" , len (group .mutations ), "table" , group .tableName )
1084+ err := s .applyNonAtomic (ctx , group .mutations , group .tableName )
1085+ if err != nil {
1086+ return err
1087+ }
1088+ }
1089+
10121090 // Handle manual child deletions first.
10131091 // The `ON DELETE CASCADE` constraint should be the default, but it can fail
10141092 // if a cascade exceeds Spanner's 80k mutation limit.
0 commit comments