{-
  Copyright (c) Meta Platforms, Inc. and affiliates.
  All rights reserved.

  This source code is licensed under the BSD-style license found in the
  LICENSE file in the root directory of this source tree.
-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}

-- | S3 backup backend for Glean.
module Glean.Database.Backup.S3 (withS3Backups, fakeS3Backend) where

import Control.Exception.Safe (throwIO)
import Control.Monad ((<=<), (>=>))
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Builder as BB
import qualified Data.ByteString.Lazy as LBS
import Data.Default (Default (..))
import Data.Functor (void)
import qualified Data.HashMap.Strict as HashMap
import qualified Data.Map as Map
import Data.Maybe (catMaybes)
import qualified Data.Set as Set
import Data.Text (Text)
import qualified Data.Text as Text
import qualified Data.Text.Encoding as Text
import qualified Network.HTTP.Client as HTTP
import System.IO.Unsafe (unsafePerformIO)
import UnliftIO (IORef, atomicModifyIORef', newIORef, readIORef, writeIORef)
import UnliftIO.Async (forConcurrently)
import UnliftIO.Exception.Lens

import qualified Amazonka as AWS
import qualified Amazonka.S3 as S3
import Amazonka.S3.DeleteObject
import Amazonka.S3.GetObject
import Amazonka.S3.HeadObject
import Amazonka.S3.ListObjectsV2
import Amazonka.S3.PutObject
import Amazonka.S3.Types.Object
import Conduit
import Lens.Micro

import qualified Glean.Database.Config as DBConfig
import Glean.Database.Backup.Backend
import Glean.Database.Exception
import Glean.Internal.Types (Meta (..))
import Glean.Types (Repo (..))
import qualified Glean.Types as Thrift hiding (Exception)
import Glean.Util.Some
import qualified Thrift.Protocol.JSON as Thrift

withS3Backups :: DBConfig.Config -> DBConfig.Config
withS3Backups cfg@DBConfig.Config{..} =
  cfg
    { DBConfig.cfgBackupBackends =
        cfgBackupBackends <> HashMap.fromList [("s3", Some (def @S3Backend))]
    }
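
-- To enable the backend, thread the server config through 'withS3Backups'
-- when constructing the database environment. A hypothetical sketch (the
-- names around 'withS3Backups' are illustrative, not part of this module):
--
-- > withDatabases evb (withS3Backups dbConfig) cfgAPI $ \env ->
-- >   ... -- backup sites can then be addressed as "s3:<bucket>/<base-path>"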

-- | S3 backup backend, which auto-discovers its environment configuration
-- via amazonka's standard credential chain ('AWS.discover').
data S3Backend = S3Backend
  { s3AwsEnvLazy :: LazyInit AWS.Env
  , s3BucketFactory :: LazyInit AWS.Env -> Text -> Some S3BucketApi
  }

instance Default S3Backend where
  def = newS3Backend (\env name -> GenuineS3Bucket env (S3.BucketName name))

newS3Backend :: (S3BucketApi bucket) => (LazyInit AWS.Env -> Text -> bucket) -> S3Backend
newS3Backend factory =
  S3Backend
    { s3AwsEnvLazy =
        LazyInit
          { initializer = AWS.newEnv AWS.discover
          , value = unsafePerformIO (newIORef Nothing)
          }
    , s3BucketFactory = \env name -> Some (factory env name)
    }

-- | Creates a new fake in-memory S3 backend.
fakeS3Backend :: (MonadIO m) => m S3Backend
fakeS3Backend = do
  fakeFiles <- newIORef Map.empty
  pure $ newS3Backend (\_env name -> FakeS3Bucket{fakeBucketName = name, fakeFiles})
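
-- The fake lets tests exercise the full 'Site' surface in memory, along
-- these lines (illustrative):
--
-- > backend <- fakeS3Backend
-- > let Just site = fromPath backend "bucket/base/path"
-- > -- backup/restore/inspect on @site@ now round-trip without touching S3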

instance Backend S3Backend where
  fromPath S3Backend{s3AwsEnvLazy, s3BucketFactory} path = do
    let (bucketName, bucketBasePath') = Text.breakOn "/" path
    (_, bucketBasePath) <- Text.uncons bucketBasePath'
    Just . Some $
      S3Site
        { s3Client = s3BucketFactory s3AwsEnvLazy bucketName
        , bucketBasePath
        }
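  -- For example, "my-bucket/glean/backups" selects bucket "my-bucket" with
  -- base path "glean/backups". A path without a "/" leaves nothing for
  -- 'Text.uncons' to consume, so 'fromPath' returns Nothing.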

-- | File in an S3 bucket.
newtype Item = Item
  { itemPath :: Text
  }

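-- | User-defined object metadata: a flat string-to-string map, which S3
-- transmits as @x-amz-meta-*@ headers alongside the object.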
type Metadata = HashMap.HashMap Text Text

-- | A client to an S3 bucket, as a seam for unit testing purposes.
class S3BucketApi a where
  -- | Gets the name of the bucket this is a client for.
  bucketName :: a -> Text

  {- | Lists items with the given prefix.

  We don't use the delimiter functionality (which dedupes by common prefixes)
  because we need both the meta and the file itself to exist for consistency,
  so the listing has to surface both keys.

  This also doesn't concern itself with pagination, because the consuming API
  doesn't either.
  -}
  listItemsWithPrefix :: (MonadResource m, MonadUnliftIO m) => a -> Text -> m [Item]

  -- | Uploads a file on disk to the given path in the bucket.
  uploadFile :: (MonadResource m, MonadUnliftIO m) => a -> Text -> Metadata -> AWS.RequestBody -> m ()

  {- | Downloads a file from S3.

  Throws for any error other than an absent key.
  -}
  downloadFile :: (MonadResource m, MonadUnliftIO m) => a -> Text -> m (Maybe (Metadata, AWS.ResponseBody))

  {- | Checks whether a file exists on S3.

  Throws for any error other than an absent key.
  -}
  headFile :: (MonadResource m, MonadUnliftIO m) => a -> Text -> m (Maybe Metadata)

  -- | Deletes a file in the bucket.
  deleteFile :: (MonadResource m, MonadUnliftIO m) => a -> Text -> m ()

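-- Dispatch each method through the existential wrapper so callers can use a
-- 'Some S3BucketApi' wherever a concrete bucket implementation is expected.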
instance S3BucketApi (Some S3BucketApi) where
  bucketName (Some bucket) = bucketName bucket
  listItemsWithPrefix (Some bucket) = listItemsWithPrefix bucket
  uploadFile (Some bucket) = uploadFile bucket
  downloadFile (Some bucket) = downloadFile bucket
  headFile (Some bucket) = headFile bucket
  deleteFile (Some bucket) = deleteFile bucket

-- TODO(jadel): surely there's a better way
data LazyInit a = LazyInit {initializer :: IO a, value :: IORef (Maybe a)}

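-- | Returns the cached value, running the initializer on first use. Not
-- atomic: two concurrent callers may both run the initializer, which should
-- be benign here since 'AWS.newEnv' merely re-discovers the same environment.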
getOrInit :: LazyInit b -> IO b
getOrInit LazyInit{initializer, value} = do
  actual <- readIORef value
  case actual of
    Just v -> pure v
    Nothing -> do
      v <- initializer
      writeIORef value (Just v)
      pure v

data GenuineS3Bucket = GenuineS3Bucket
  { awsEnvLazy :: LazyInit AWS.Env
  , s3BucketName :: S3.BucketName
  }

instance S3BucketApi GenuineS3Bucket where
  bucketName = (^. S3._BucketName) . s3BucketName

  listItemsWithPrefix GenuineS3Bucket{awsEnvLazy, s3BucketName} prefix = do
    awsEnv <- liftIO $ getOrInit awsEnvLazy
    let request = newListObjectsV2 s3BucketName & listObjectsV2_prefix ?~ prefix
    runConduit $
      AWS.paginate awsEnv request
        .| concatMapC (^. listObjectsV2Response_contents)
        .| concatC
        .| mapC (\obj -> Item{itemPath = obj ^. object_key . S3._ObjectKey})
        .| sinkList

  uploadFile GenuineS3Bucket{awsEnvLazy, s3BucketName} name metadata body = do
    awsEnv <- liftIO $ getOrInit awsEnvLazy
    let req =
          newPutObject s3BucketName (S3.ObjectKey name) body
            & putObject_metadata .~ metadata
    void . AWS.send awsEnv $ req

  headFile GenuineS3Bucket{awsEnvLazy, s3BucketName} name = do
    awsEnv <- liftIO $ getOrInit awsEnvLazy
    let req = newHeadObject s3BucketName (S3.ObjectKey name)
    handling_ S3._NoSuchKey (pure Nothing) $
      Just . (^. headObjectResponse_metadata) <$> AWS.send awsEnv req

  downloadFile GenuineS3Bucket{awsEnvLazy, s3BucketName} name = do
    awsEnv <- liftIO $ getOrInit awsEnvLazy
    let req = newGetObject s3BucketName (S3.ObjectKey name)
    handling_ S3._NoSuchKey (pure Nothing) $
      Just . (\r -> (r ^. getObjectResponse_metadata, r ^. getObjectResponse_body))
        <$> AWS.send awsEnv req

  deleteFile GenuineS3Bucket{awsEnvLazy, s3BucketName} name = do
    awsEnv <- liftIO $ getOrInit awsEnvLazy
    let req = newDeleteObject s3BucketName (S3.ObjectKey name)
    void $ AWS.send awsEnv req

data FakeS3Bucket = FakeS3Bucket
  { fakeFiles :: IORef (Map.Map Text (Metadata, ByteString))
  , fakeBucketName :: Text
  }

-- FIXME(jadel): upstream this to http-client
materializeRequestBody :: HTTP.RequestBody -> IO LBS.ByteString
materializeRequestBody = \case
  HTTP.RequestBodyLBS lbs -> pure lbs
  HTTP.RequestBodyBS bs -> pure (BS.fromStrict bs)
  HTTP.RequestBodyBuilder _ b -> pure (BB.toLazyByteString b)
  HTTP.RequestBodyStream _ g -> withPopper g
  HTTP.RequestBodyStreamChunked g -> withPopper g
  HTTP.RequestBodyIO io -> materializeRequestBody =<< io
  where
    withPopper :: HTTP.GivesPopper () -> IO LBS.ByteString
    withPopper giver = do
      r <- newIORef []
      giver (writeIORef r <=< getChunks)
      LBS.fromChunks <$> readIORef r

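    -- An http-client popper yields successive chunks and signals
    -- end-of-stream by returning the empty ByteString, hence the "" base
    -- case below.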
    getChunks :: IO ByteString -> IO [ByteString]
    getChunks io =
      io >>= \case
        "" -> pure []
        chunk -> (chunk :) <$> getChunks io

instance S3BucketApi FakeS3Bucket where
  bucketName = fakeBucketName

  listItemsWithPrefix FakeS3Bucket{fakeFiles} prefix =
    fmap Item . filter (prefix `Text.isPrefixOf`) . Map.keys <$> readIORef fakeFiles

  uploadFile FakeS3Bucket{fakeFiles} name metadata body = do
    bytes <- liftIO . materializeRequestBody $ AWS.toRequestBody body
    -- Map.insert rather than (<>): Map union is left-biased, so
    -- @files <> Map.singleton ...@ would keep the *old* object on re-upload.
    atomicModifyIORef' fakeFiles $ \files ->
      (Map.insert name (metadata, BS.toStrict bytes) files, ())

  headFile FakeS3Bucket{fakeFiles} name =
    fmap fst . Map.lookup name <$> readIORef fakeFiles

  downloadFile FakeS3Bucket{fakeFiles} name =
    fmap (\(meta, body) -> (meta, AWS.ResponseBody (yield body))) . Map.lookup name
      <$> readIORef fakeFiles

  deleteFile FakeS3Bucket{fakeFiles} name =
    atomicModifyIORef' fakeFiles ((,()) . (`Map.withoutKeys` Set.singleton name))

data S3Site = S3Site
  { s3Client :: Some S3BucketApi
  , bucketBasePath :: Text
  }

makeRepoPath :: Text -> Repo -> Text
makeRepoPath bucketBasePath Repo{repo_name, repo_hash} =
  Text.intercalate "/" [bucketBasePath, repo_name, repo_hash]

dbPath :: Text -> Text
dbPath = (<> ".tar.gz")
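
-- For example, @dbPath (makeRepoPath "glean/backups" (Repo "myrepo" "123"))@
-- is @"glean/backups/myrepo/123.tar.gz"@, which is exactly the shape that
-- 'enumerate' parses back into a 'Repo'.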

metadataKey :: Text
metadataKey = "glean-metadata"

parseMeta :: (MonadIO m, MonadThrow m) => Repo -> Text -> m Meta
parseMeta repo header =
  either (dbError repo) pure . Thrift.deserializeJSON . Text.encodeUtf8 $ header

instance Site S3Site where
  backup S3Site{s3Client, bucketBasePath} repo meta _ttl sourcePath = runResourceT $ do
    let repoPath = makeRepoPath bucketBasePath repo
    body <- AWS.chunkedFile AWS.defaultChunkSize sourcePath

    -- https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingMetadata.html#UserMetadata
    -- Metadata values may contain only printable Unicode, so any binary
    -- encoding would force us to base64 it, which seems like a hassle and
    -- maybe not even more compact.
    let meta' = HashMap.singleton metadataKey (Text.decodeUtf8 $ Thrift.serializeJSON meta)
    uploadFile s3Client (dbPath repoPath) meta' body
    pure $ Data (fromIntegral $ AWS.contentLength body)

  delete S3Site{s3Client, bucketBasePath} repo = runResourceT $ do
    let repoPath = makeRepoPath bucketBasePath repo
    deleteFile s3Client (dbPath repoPath)

  restore S3Site{s3Client, bucketBasePath} repo intoPath = runResourceT $ do
    let repoPath = makeRepoPath bucketBasePath repo
    res <- downloadFile s3Client (dbPath repoPath)
    case res of
      Just (objMeta, repoStream)
        | Just metaJson <- HashMap.lookup metadataKey objMeta -> do
            meta <- parseMeta repo metaJson
            runConduit $ AWS.sinkBody repoStream (sinkFile intoPath)
            pure meta
      _ ->
        throwIO . Thrift.InvalidLocator $
          "locator is missing either metadata or db.tar.gz: " <> (Text.pack . show) repo

  inspect S3Site{s3Client, bucketBasePath} repo = runResourceT $ do
    let repoPath = makeRepoPath bucketBasePath repo
    res <- headFile s3Client (dbPath repoPath)

    case res of
      Just objMeta
        | Just metaJson <- HashMap.lookup metadataKey objMeta ->
            parseMeta repo metaJson
      _ ->
        throwIO . Thrift.InvalidLocator $
          "locator is missing either metadata or db.tar.gz: " <> (Text.pack . show) repo

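  -- 'inspect' only issues a HEAD request, so 'enumerate' can afford to call
  -- it once per discovered database, concurrently.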
  enumerate site@S3Site{s3Client, bucketBasePath} = runResourceT $ do
    items <- listItemsWithPrefix s3Client bucketBasePath
    let parsed = catMaybes $ map parseItemFilename items
    forConcurrently parsed $ \repo -> (repo,) <$> liftIO (inspect site repo)
    where
      parseItemFilename =
        (Text.stripPrefix (bucketBasePath <> "/") . itemPath)
          >=> Text.stripSuffix ".tar.gz"
          >=> splitFilename
      -- >>> splitFilename "myrepo/123"
      -- Just (Repo "myrepo" "123")
      splitFilename name
        | let (withTrailingSlash, repo_hash) = Text.breakOnEnd "/" name
        , Just (repo_name, _slash) <- Text.unsnoc withTrailingSlash =
            Just Repo{repo_name, repo_hash}
      splitFilename _name = Nothing

  toPath S3Site{s3Client, bucketBasePath} = bucketName s3Client <> "/" <> bucketBasePath