Skip to content

Commit fd3cbec

Browse files
authored
Merge pull request #589 from IntersectMBO/dcoutts/table-union
Table union top level construction
2 parents 2e86159 + 19bf835 commit fd3cbec

File tree

4 files changed

+250
-132
lines changed

4 files changed

+250
-132
lines changed

src/Database/LSMTree/Internal.hs

Lines changed: 165 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ module Database.LSMTree.Internal (
6767
-- * Mutiple writable tables
6868
, duplicate
6969
-- * Table union
70-
, union
7170
, unions
7271
) where
7372

@@ -78,7 +77,7 @@ import Control.Concurrent.Class.MonadSTM (MonadSTM (..))
7877
import Control.Concurrent.Class.MonadSTM.RWVar (RWVar)
7978
import qualified Control.Concurrent.Class.MonadSTM.RWVar as RW
8079
import Control.DeepSeq
81-
import Control.Monad (forM, unless, void, zipWithM_)
80+
import Control.Monad (forM, unless, void)
8281
import Control.Monad.Class.MonadST (MonadST (..))
8382
import Control.Monad.Class.MonadThrow
8483
import Control.Monad.Primitive
@@ -92,7 +91,7 @@ import Data.List.NonEmpty (NonEmpty (..))
9291
import qualified Data.List.NonEmpty as NE
9392
import Data.Map.Strict (Map)
9493
import qualified Data.Map.Strict as Map
95-
import Data.Maybe (catMaybes)
94+
import Data.Maybe (catMaybes, maybeToList)
9695
import qualified Data.Set as Set
9796
import Data.Typeable
9897
import qualified Data.Vector as V
@@ -104,12 +103,14 @@ import Database.LSMTree.Internal.Entry (Entry)
104103
import Database.LSMTree.Internal.Lookup (ByteCountDiscrepancy,
105104
ResolveSerialisedValue, lookupsIO)
106105
import Database.LSMTree.Internal.MergeSchedule
106+
import Database.LSMTree.Internal.MergingTree
107107
import Database.LSMTree.Internal.Paths (SessionRoot (..),
108108
SnapshotMetaDataChecksumFile (..),
109109
SnapshotMetaDataFile (..), SnapshotName)
110110
import qualified Database.LSMTree.Internal.Paths as Paths
111111
import Database.LSMTree.Internal.Range (Range (..))
112112
import Database.LSMTree.Internal.Run (Run)
113+
import qualified Database.LSMTree.Internal.Run as Run
113114
import Database.LSMTree.Internal.RunNumber
114115
import Database.LSMTree.Internal.RunReaders (OffsetKey (..))
115116
import qualified Database.LSMTree.Internal.RunReaders as Readers
@@ -232,9 +233,6 @@ data LSMTreeError =
232233
Int -- ^ Vector index of table @t1@ involved in the mismatch
233234
Int -- ^ Vector index of table @t2@ involved in the mismatch
234235
-- | 'unions' was called on tables that do not have the same configuration.
235-
| ErrUnionsTableConfigMismatch
236-
Int -- ^ Vector index of table @t1@ involved in the mismatch
237-
Int -- ^ Vector index of table @t2@ involved in the mismatch
238236
deriving stock (Show, Eq)
239237
deriving anyclass (Exception)
240238

@@ -697,24 +695,37 @@ new sesh conf = do
697695
withOpenSession sesh $ \seshEnv ->
698696
withActionRegistry $ \reg -> do
699697
am <- newArenaManager
700-
blobpath <- Paths.tableBlobPath (sessionRoot seshEnv) <$>
701-
incrUniqCounter (sessionUniqCounter seshEnv)
702-
tableWriteBufferBlobs
703-
<- withRollback reg
704-
(WBB.new (sessionHasFS seshEnv) blobpath)
705-
releaseRef
706-
let tableWriteBuffer = WB.empty
707-
tableLevels = V.empty
708-
tableCache <- mkLevelsCache reg tableLevels
709-
let tc = TableContent {
710-
tableWriteBuffer
711-
, tableWriteBufferBlobs
712-
, tableLevels
713-
, tableCache
714-
, tableUnionLevel = NoUnion
715-
}
698+
tc <- newEmptyTableContent seshEnv reg
716699
newWith reg sesh seshEnv conf am tc
717700

701+
{-# SPECIALISE newEmptyTableContent ::
702+
SessionEnv IO h
703+
-> ActionRegistry IO
704+
-> IO (TableContent IO h) #-}
705+
newEmptyTableContent ::
706+
(PrimMonad m, MonadMask m, MonadMVar m)
707+
=> SessionEnv m h
708+
-> ActionRegistry m
709+
-> m (TableContent m h)
710+
newEmptyTableContent seshEnv reg = do
711+
blobpath <- Paths.tableBlobPath (sessionRoot seshEnv) <$>
712+
incrUniqCounter (sessionUniqCounter seshEnv)
713+
let tableWriteBuffer = WB.empty
714+
tableWriteBufferBlobs
715+
<- withRollback reg
716+
(WBB.new (sessionHasFS seshEnv) blobpath)
717+
releaseRef
718+
let tableLevels = V.empty
719+
tableCache <- mkLevelsCache reg tableLevels
720+
pure TableContent {
721+
tableWriteBuffer
722+
, tableWriteBufferBlobs
723+
, tableLevels
724+
, tableCache
725+
, tableUnionLevel = NoUnion
726+
}
727+
728+
718729
{-# SPECIALISE newWith ::
719730
ActionRegistry IO
720731
-> Session IO h
@@ -1336,15 +1347,6 @@ duplicate t@Table{..} = do
13361347
Table union
13371348
-------------------------------------------------------------------------------}
13381349

1339-
{-# SPECIALISE union :: Table IO h -> Table IO h -> IO (Table IO h) #-}
1340-
-- | See 'Database.LSMTree.Normal.union'.
1341-
union ::
1342-
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
1343-
=> Table m h
1344-
-> Table m h
1345-
-> m (Table m h)
1346-
union t1 t2 = unions $ t1 :| [t2]
1347-
13481350
{-# SPECIALISE unions :: NonEmpty (Table IO h) -> IO (Table IO h) #-}
13491351
-- | See 'Database.LSMTree.Normal.unions'.
13501352
unions ::
@@ -1359,63 +1361,148 @@ unions ts = do
13591361

13601362
traceWith (sessionTracer sesh) $ TraceUnions (NE.map tableId ts)
13611363

1362-
-- TODO: Do we really need the configs to match exactly? It's okay as a
1363-
-- requirement for now, but we might want to revisit it. Some settings don't
1364-
-- really need to match for things to work, but of course we'd still need to
1365-
-- answer the question of which config to use for the new table, possibly
1366-
-- requiring to supply it manually? Or, we could generalise the exact match
1367-
-- to have a config compatibility test and config merge?
1368-
conf <-
1369-
case match (fmap tableConfig ts) of
1370-
Left (i, j) -> throwIO $ ErrUnionsTableConfigMismatch i j
1371-
Right conf -> pure conf
1364+
-- The TableConfig for the new table is taken from the last table in the
1365+
-- union. This corresponds to the "Data.Map.union updates baseMap" order,
1366+
-- where we take the config from the base map, not the updates.
1367+
--
1368+
-- This could be modified to take the new config as an input. It works to
1369+
-- pick any config because the new table is almost completely fresh. It
1370+
-- will have an empty write buffer and no runs in the normal levels. All
1371+
-- the existing runs get squashed down into a single run before rejoining
1372+
-- as a last level.
1373+
let conf = tableConfig (NE.last ts)
13721374

13731375
-- We acquire a read-lock on the session open-state to prevent races, see
13741376
-- 'sessionOpenTables'.
13751377
modifyWithActionRegistry
13761378
(atomically $ RW.unsafeAcquireReadAccess (sessionState sesh))
1377-
(\_ -> atomically $ RW.unsafeReleaseReadAccess (sessionState sesh)) $ \reg -> \case
1379+
(\_ -> atomically $ RW.unsafeReleaseReadAccess (sessionState sesh)) $
1380+
\reg -> \case
13781381
SessionClosed -> throwIO ErrSessionClosed
13791382
seshState@(SessionOpen seshEnv) -> do
1380-
contents <-
1381-
forM ts $ \t -> do
1382-
withOpenTable t $ \tEnv ->
1383-
-- The table contents escape the read access, but we just added references
1384-
-- to each run so it is safe.
1385-
RW.withReadAccess (tableContent tEnv) (duplicateTableContent reg)
1386-
1387-
content <-
1388-
error "unions: combine contents into merging tree" $ -- TODO
1389-
contents
1390-
1391-
t <-
1392-
newWith
1393-
reg
1394-
sesh
1395-
seshEnv
1396-
conf
1397-
(error "unions: ArenaManager") -- TODO
1398-
content
1399-
1383+
t <- unionsInOpenSession reg sesh seshEnv conf ts
14001384
pure (seshState, t)
14011385

1402-
-- | Like 'matchBy', but the match function is @(==)@.
1403-
match :: Eq a => NonEmpty a -> Either (Int, Int) a
1404-
match = matchBy (==)
1405-
1406-
-- | Check that all values in the list match. If so, return the matched value.
1407-
-- If there is a mismatch, return the list indices of the mismatching values.
1408-
matchBy :: forall a. (a -> a -> Bool) -> NonEmpty a -> Either (Int, Int) a
1409-
matchBy eq (x0 :| xs) =
1410-
case zipWithM_ (matchOne x0) [1..] xs of
1411-
Left i -> Left (0, i)
1412-
Right () -> Right x0
1386+
{-# SPECIALISE unionsInOpenSession ::
1387+
ActionRegistry IO
1388+
-> Session IO h
1389+
-> SessionEnv IO h
1390+
-> TableConfig
1391+
-> NonEmpty (Table IO h)
1392+
-> IO (Table IO h) #-}
1393+
unionsInOpenSession ::
1394+
(MonadSTM m, MonadMask m, MonadMVar m, MonadST m)
1395+
=> ActionRegistry m
1396+
-> Session m h
1397+
-> SessionEnv m h
1398+
-> TableConfig
1399+
-> NonEmpty (Table m h)
1400+
-> m (Table m h)
1401+
unionsInOpenSession reg sesh seshEnv conf ts = do
1402+
1403+
mts <- forM (NE.toList ts) $ \t ->
1404+
withOpenTable t $ \tEnv ->
1405+
RW.withReadAccess (tableContent tEnv) $ \tc ->
1406+
-- tableContentToMergingTree duplicates all runs and merges
1407+
-- so the ones from the tableContent here do not escape
1408+
-- the read access.
1409+
withRollback reg
1410+
(tableContentToMergingTree seshEnv conf tc)
1411+
releaseRef
1412+
mt <- withRollback reg (newPendingUnionMerge mts) releaseRef
1413+
1414+
-- The mts here is a temporary value, since newPendingUnionMerge
1415+
-- will make its own references, so release mts at the end of
1416+
-- the action registry bracket
1417+
forM_ mts (delayedCommit reg . releaseRef)
1418+
1419+
empty <- newEmptyTableContent seshEnv reg
1420+
let content = empty { tableUnionLevel = Union mt }
1421+
1422+
-- Pick the arena manager to optimise the case of:
1423+
-- someUpdates <> bigTableWithLotsOfLookups
1424+
-- by reusing the arena manager from the last one.
1425+
am = tableArenaManager (NE.last ts)
1426+
1427+
newWith reg sesh seshEnv conf am content
1428+
1429+
{-# SPECIALISE tableContentToMergingTree ::
1430+
SessionEnv IO h
1431+
-> TableConfig
1432+
-> TableContent IO h
1433+
-> IO (Ref (MergingTree IO h)) #-}
1434+
tableContentToMergingTree ::
1435+
forall m h.
1436+
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
1437+
=> SessionEnv m h
1438+
-> TableConfig
1439+
-> TableContent m h
1440+
-> m (Ref (MergingTree m h))
1441+
tableContentToMergingTree seshEnv conf
1442+
tc@TableContent {
1443+
tableLevels,
1444+
tableUnionLevel
1445+
} =
1446+
bracket (writeBufferToNewRun seshEnv conf tc)
1447+
(mapM_ releaseRef) $ \mwbr ->
1448+
let runs :: [PreExistingRun m h]
1449+
runs = maybeToList (fmap PreExistingRun mwbr)
1450+
++ concatMap levelToPreExistingRuns (V.toList tableLevels)
1451+
-- any pre-existing union in the input table:
1452+
unionmt = case tableUnionLevel of
1453+
NoUnion -> Nothing
1454+
Union mt -> Just mt
1455+
in newPendingLevelMerge runs unionmt
14131456
where
1414-
matchOne :: a -> Int -> a -> Either Int ()
1415-
matchOne x i y =
1416-
if (x `eq` y)
1417-
then Right ()
1418-
else Left i
1457+
levelToPreExistingRuns :: Level m h -> [PreExistingRun m h]
1458+
levelToPreExistingRuns Level{incomingRun, residentRuns} =
1459+
case incomingRun of
1460+
Single r -> PreExistingRun r
1461+
Merging _ _ _ mr -> PreExistingMergingRun mr
1462+
: map PreExistingRun (V.toList residentRuns)
1463+
1464+
--TODO: can we share this or move it to MergeSchedule?
1465+
{-# SPECIALISE writeBufferToNewRun ::
1466+
SessionEnv IO h
1467+
-> TableConfig
1468+
-> TableContent IO h
1469+
-> IO (Maybe (Ref (Run IO h))) #-}
1470+
writeBufferToNewRun ::
1471+
(MonadMask m, MonadST m, MonadSTM m)
1472+
=> SessionEnv m h
1473+
-> TableConfig
1474+
-> TableContent m h
1475+
-> m (Maybe (Ref (Run m h)))
1476+
writeBufferToNewRun SessionEnv {
1477+
sessionRoot = root,
1478+
sessionHasFS = hfs,
1479+
sessionHasBlockIO = hbio,
1480+
sessionUniqCounter = uc
1481+
}
1482+
conf@TableConfig {
1483+
confDiskCachePolicy,
1484+
confFencePointerIndex
1485+
}
1486+
TableContent{
1487+
tableWriteBuffer,
1488+
tableWriteBufferBlobs
1489+
}
1490+
| WB.null tableWriteBuffer = pure Nothing
1491+
| otherwise = Just <$> do
1492+
!n <- incrUniqCounter uc
1493+
let !ln = LevelNo 1
1494+
!cache = diskCachePolicyForLevel confDiskCachePolicy ln
1495+
!alloc = bloomFilterAllocForLevel conf ln
1496+
!indexType = indexTypeForRun confFencePointerIndex
1497+
!path = Paths.runPath (Paths.activeDir root)
1498+
(uniqueToRunNumber n)
1499+
Run.fromWriteBuffer hfs hbio
1500+
cache
1501+
alloc
1502+
indexType
1503+
path
1504+
tableWriteBuffer
1505+
tableWriteBufferBlobs
14191506

14201507
-- | Check that all tables in the session match. If so, return the matched
14211508
-- session. If there is a mismatch, return the list indices of the mismatching

0 commit comments

Comments
 (0)