66 "fmt"
77 "slices"
88 "sort"
9+ "time"
910
1011 "github.com/hookdeck/outpost/internal/redis"
1112)
@@ -26,7 +27,11 @@ type EntityStore interface {
2627}
2728
2829var (
30+ ErrTenantNotFound = errors .New ("tenant does not exist" )
31+ ErrTenantDeleted = errors .New ("tenant has been deleted" )
2932 ErrDuplicateDestination = errors .New ("destination already exists" )
33+ ErrDestinationNotFound = errors .New ("destination does not exist" )
34+ ErrDestinationDeleted = errors .New ("destination has been deleted" )
3035)
3136
3237func redisTenantID (tenantID string ) string {
@@ -89,26 +94,50 @@ func (s *entityStoreImpl) RetrieveTenant(ctx context.Context, tenantID string) (
8994}
9095
9196func (s * entityStoreImpl ) UpsertTenant (ctx context.Context , tenant Tenant ) error {
92- return s .redisClient .HSet (ctx , redisTenantID (tenant .ID ), tenant ).Err ()
97+ key := redisTenantID (tenant .ID )
98+
99+ _ , err := s .redisClient .TxPipelined (ctx , func (pipe redis.Pipeliner ) error {
100+ // Support overriding deleted resources
101+ pipe .Persist (ctx , key )
102+ pipe .HDel (ctx , key , "deleted_at" )
103+
104+ // Set tenant data
105+ pipe .HSet (ctx , key , tenant )
106+ return nil
107+ })
108+
109+ return err
93110}
94111
95112func (s * entityStoreImpl ) DeleteTenant (ctx context.Context , tenantID string ) error {
96113 maxRetries := 100
97114
115+ if exists , err := s .redisClient .Exists (ctx , redisTenantID (tenantID )).Result (); err != nil {
116+ return err
117+ } else if exists == 0 {
118+ return ErrTenantNotFound
119+ }
120+
98121 txf := func (tx * redis.Tx ) error {
99122 destinationIDs , err := s .redisClient .HKeys (ctx , redisTenantDestinationSummaryKey (tenantID )).Result ()
100123 if err != nil {
101124 return err
102125 }
103- _ , err = s .redisClient .TxPipelined (ctx , func (r redis.Pipeliner ) error {
126+ if _ , err := s .redisClient .TxPipelined (ctx , func (pipe redis.Pipeliner ) error {
127+ now := time .Now ()
104128 for _ , destinationID := range destinationIDs {
105- r . Del (ctx , redisDestinationID (destinationID , tenantID ))
129+ s . deleteDestinationOperation (ctx , pipe , redisDestinationID (destinationID , tenantID ), now )
106130 }
107- r .Del (ctx , redisTenantDestinationSummaryKey (tenantID ))
108- r .Del (ctx , redisTenantID (tenantID ))
131+ pipe .Del (ctx , redisTenantDestinationSummaryKey (tenantID ))
132+ tenantKey := redisTenantID (tenantID )
133+ pipe .Del (ctx , tenantKey )
134+ pipe .HSet (ctx , tenantKey , "deleted_at" , now )
135+ pipe .Expire (ctx , tenantKey , 7 * 24 * time .Hour )
109136 return nil
110- })
111- return err
137+ }); err != nil {
138+ return err
139+ }
140+ return nil
112141 }
113142
114143 for i := 0 ; i < maxRetries ; i ++ {
@@ -208,12 +237,13 @@ func (s *entityStoreImpl) RetrieveDestination(ctx context.Context, tenantID, des
208237
209238func (m * entityStoreImpl ) CreateDestination (ctx context.Context , destination Destination ) error {
210239 key := redisDestinationID (destination .ID , destination .TenantID )
211- destinationExists , err := m . redisClient . Exists ( ctx , key ). Result ()
212- if err != nil {
240+ // Check if destination exists
241+ if fields , err := m . redisClient . HGetAll ( ctx , key ). Result (); err != nil {
213242 return err
214- }
215- if destinationExists > 0 {
216- return ErrDuplicateDestination
243+ } else if len (fields ) > 0 {
244+ if _ , isDeleted := fields ["deleted_at" ]; ! isDeleted {
245+ return ErrDuplicateDestination
246+ }
217247 }
218248 return m .UpsertDestination (ctx , destination )
219249}
@@ -229,6 +259,10 @@ func (m *entityStoreImpl) UpsertDestination(ctx context.Context, destination Des
229259 if err != nil {
230260 return err
231261 }
262+ // Support overriding deleted resources
263+ r .Persist (ctx , key )
264+ r .HDel (ctx , key , "deleted_at" )
265+ // Set the new destination values
232266 r .HSet (ctx , key , "id" , destination .ID )
233267 r .HSet (ctx , key , "type" , destination .Type )
234268 r .HSet (ctx , key , "topics" , & destination .Topics )
@@ -247,13 +281,30 @@ func (m *entityStoreImpl) UpsertDestination(ctx context.Context, destination Des
247281}
248282
249283func (s * entityStoreImpl ) DeleteDestination (ctx context.Context , tenantID , destinationID string ) error {
250- _ , err := s .redisClient .TxPipelined (ctx , func (r redis.Pipeliner ) error {
251- if err := r .HDel (ctx , redisTenantDestinationSummaryKey (tenantID ), destinationID ).Err (); err != nil {
252- return err
253- }
254- return r .Del (ctx , redisDestinationID (destinationID , tenantID )).Err ()
255- })
256- return err
284+ key := redisDestinationID (destinationID , tenantID )
285+ summaryKey := redisTenantDestinationSummaryKey (tenantID )
286+
287+ // Check if destination exists
288+ if exists , err := s .redisClient .Exists (ctx , key ).Result (); err != nil {
289+ return err
290+ } else if exists == 0 {
291+ return ErrDestinationNotFound
292+ }
293+
294+ pipe := s .redisClient .Pipeline ()
295+ pipe .HDel (ctx , summaryKey , destinationID )
296+ s .deleteDestinationOperation (ctx , pipe , key , time .Now ())
297+ if _ , err := pipe .Exec (ctx ); err != nil {
298+ return err
299+ }
300+
301+ return nil
302+ }
303+
304+ func (s * entityStoreImpl ) deleteDestinationOperation (ctx context.Context , pipe redis.Pipeliner , key string , ts time.Time ) {
305+ pipe .Del (ctx , key )
306+ pipe .HSet (ctx , key , "deleted_at" , ts )
307+ pipe .Expire (ctx , key , 7 * 24 * time .Hour )
257308}
258309
259310func (s * entityStoreImpl ) MatchEvent (ctx context.Context , event Event ) ([]DestinationSummary , error ) {
0 commit comments