@@ -505,7 +505,28 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
505505 withLog " setQueueService" st $ \ sl -> logQueueService sl rId party serviceId
506506
507507 setQueueServices _ _ _ [] = pure $ Right M. empty
508- setQueueServices _ _ _ _ = pure $ Right M. empty -- TODO batch implementation
508+ setQueueServices st party serviceId qs = E. uninterruptibleMask_ $ runExceptT $ do
509+ updated <- S. fromList <$> withDB' " setQueueServices" st (\ db ->
510+ map fromOnly <$> DB. query db updateQuery (serviceId, In (map recipientId qs)))
511+ forM_ qs $ \ sq -> when (S. member (recipientId sq) updated) $ do
512+ ExceptT $ readQueueRecIO (queueRec sq) >>= \ case
513+ Left e -> pure $ Left e
514+ Right q -> runExceptT $ do
515+ let q' = updateRec q
516+ atomically $ writeTVar (queueRec sq) $ Just q'
517+ withLog " setQueueServices" st $ \ sl -> logQueueService sl (recipientId sq) party serviceId
518+ pure $ M. fromList [(recipientId sq, if S. member (recipientId sq) updated then Right () else Left AUTH ) | sq <- qs]
519+ where
520+ updateQuery = case party of
521+ SRecipientService ->
522+ " UPDATE msg_queues SET rcv_service_id = ? WHERE recipient_id IN ? AND deleted_at IS NULL RETURNING recipient_id"
523+ SNotifierService ->
524+ " UPDATE msg_queues SET ntf_service_id = ? WHERE recipient_id IN ? AND notifier_id IS NOT NULL AND deleted_at IS NULL RETURNING recipient_id"
525+ updateRec q = case party of
526+ SRecipientService -> q {rcvServiceId = serviceId}
527+ SNotifierService -> case notifier q of
528+ Just nc -> q {notifier = Just nc {ntfServiceId = serviceId}}
529+ Nothing -> q
509530
510531 getQueueNtfServices :: PostgresQueueStore q -> [(NotifierId , a )] -> IO (Either ErrorType ([(Maybe ServiceId , [(NotifierId , a )])], [(NotifierId , a )]))
511532 getQueueNtfServices st ntfs = E. uninterruptibleMask_ $ runExceptT $ do
0 commit comments