@@ -92,7 +92,7 @@ import Data.List.NonEmpty (NonEmpty (..))
9292import qualified Data.List.NonEmpty as NE
9393import Data.Map.Strict (Map )
9494import qualified Data.Map.Strict as Map
95- import Data.Maybe (catMaybes )
95+ import Data.Maybe (catMaybes , maybeToList )
9696import qualified Data.Set as Set
9797import Data.Typeable
9898import qualified Data.Vector as V
@@ -104,12 +104,14 @@ import Database.LSMTree.Internal.Entry (Entry)
104104import Database.LSMTree.Internal.Lookup (ByteCountDiscrepancy ,
105105 ResolveSerialisedValue , lookupsIO )
106106import Database.LSMTree.Internal.MergeSchedule
107+ import Database.LSMTree.Internal.MergingTree
107108import Database.LSMTree.Internal.Paths (SessionRoot (.. ),
108109 SnapshotMetaDataChecksumFile (.. ),
109110 SnapshotMetaDataFile (.. ), SnapshotName )
110111import qualified Database.LSMTree.Internal.Paths as Paths
111112import Database.LSMTree.Internal.Range (Range (.. ))
112113import Database.LSMTree.Internal.Run (Run )
114+ import qualified Database.LSMTree.Internal.Run as Run
113115import Database.LSMTree.Internal.RunNumber
114116import Database.LSMTree.Internal.RunReaders (OffsetKey (.. ))
115117import qualified Database.LSMTree.Internal.RunReaders as Readers
@@ -1369,42 +1371,149 @@ unions ts = do
13691371
13701372 traceWith (sessionTracer sesh) $ TraceUnions (NE. map tableId ts)
13711373
1372- -- The TableConfig for the new table is taken from the first / left
1373- -- table in the union. This works because the new table is almost
1374- -- completely fresh. It will have an empty write buffer and no runs
1375- -- in the normal levels. All the existing runs get squashed down into
1376- -- a single run before rejoining as a last level.
1377- let conf = tableConfig (NE. head ts)
1374+ -- The TableConfig for the new table is taken from the last table in the
1375+ -- union. This corresponds to the "Data.Map.union updates baseMap" order,
1376+ -- where we take the config from the base map, not the updates.
1377+ --
1378+ -- This could be modified to take the new config as an input. It works to
1379+ -- pick any config because the new table is almost completely fresh. It
1380+ -- will have an empty write buffer and no runs in the normal levels. All
1381+ -- the existing runs get squashed down into a single run before rejoining
1382+ -- as a last level.
1383+ let conf = tableConfig (NE. last ts)
13781384
13791385 -- We acquire a read-lock on the session open-state to prevent races, see
13801386 -- 'sessionOpenTables'.
13811387 modifyWithActionRegistry
13821388 (atomically $ RW. unsafeAcquireReadAccess (sessionState sesh))
1383- (\ _ -> atomically $ RW. unsafeReleaseReadAccess (sessionState sesh)) $ \ reg -> \ case
1389+ (\ _ -> atomically $ RW. unsafeReleaseReadAccess (sessionState sesh)) $
1390+ \ reg -> \ case
13841391 SessionClosed -> throwIO ErrSessionClosed
13851392 seshState@ (SessionOpen seshEnv) -> do
1386- contents <-
1387- forM ts $ \ t -> do
1388- withOpenTable t $ \ tEnv ->
1389- -- The table contents escape the read access, but we just added references
1390- -- to each run so it is safe.
1391- RW. withReadAccess (tableContent tEnv) (duplicateTableContent reg)
1392-
1393- content <-
1394- error " unions: combine contents into merging tree" $ -- TODO
1395- contents
1396-
1397- t <-
1398- newWith
1399- reg
1400- sesh
1401- seshEnv
1402- conf
1403- (error " unions: ArenaManager" ) -- TODO
1404- content
1405-
1393+ t <- unionsInOpenSession reg sesh seshEnv conf ts
14061394 pure (seshState, t)
14071395
1396+ {-# SPECIALISE unionsInOpenSession ::
1397+ ActionRegistry IO
1398+ -> Session IO h
1399+ -> SessionEnv IO h
1400+ -> TableConfig
1401+ -> NonEmpty (Table IO h)
1402+ -> IO (Table IO h) #-}
1403+ unionsInOpenSession ::
1404+ (MonadSTM m , MonadMask m , MonadMVar m , MonadST m )
1405+ => ActionRegistry m
1406+ -> Session m h
1407+ -> SessionEnv m h
1408+ -> TableConfig
1409+ -> NonEmpty (Table m h )
1410+ -> m (Table m h )
1411+ unionsInOpenSession reg sesh seshEnv conf ts = do
1412+
1413+ mts <- forM (NE. toList ts) $ \ t ->
1414+ withOpenTable t $ \ tEnv ->
1415+ RW. withReadAccess (tableContent tEnv) $ \ tc ->
1416+ -- tableContentToMergingTree duplicates all runs and merges
1417+ -- so the ones from the tableContent here do not escape
1418+ -- the read access.
1419+ withRollback reg
1420+ (tableContentToMergingTree seshEnv conf tc)
1421+ releaseRef
1422+ mt <- withRollback reg (newPendingUnionMerge mts) releaseRef
1423+
1424+ -- The mts here is a temporary value, since newPendingUnionMerge
1425+ -- will make its own references, so release mts at the end of
1426+ -- the action registry bracket
1427+ forM_ mts (delayedCommit reg . releaseRef)
1428+
1429+ empty <- newEmptyTableContent seshEnv reg
1430+ let content = empty { tableUnionLevel = Union mt }
1431+
1432+ -- Pick the arena manager to optimise the case of:
1433+ -- someUpdates <> bigTableWithLotsOfLookups
1434+ -- by reusing the arena manager from the last one.
1435+ am = tableArenaManager (NE. last ts)
1436+
1437+ newWith reg sesh seshEnv conf am content
1438+
1439+ {-# SPECIALISE tableContentToMergingTree ::
1440+ SessionEnv IO h
1441+ -> TableConfig
1442+ -> TableContent IO h
1443+ -> IO (Ref (MergingTree IO h)) #-}
1444+ tableContentToMergingTree ::
1445+ forall m h .
1446+ (MonadMask m , MonadMVar m , MonadST m , MonadSTM m )
1447+ => SessionEnv m h
1448+ -> TableConfig
1449+ -> TableContent m h
1450+ -> m (Ref (MergingTree m h ))
1451+ tableContentToMergingTree seshEnv conf
1452+ tc@ TableContent {
1453+ tableLevels,
1454+ tableUnionLevel
1455+ } =
1456+ bracket (writeBufferToNewRun seshEnv conf tc)
1457+ (mapM_ releaseRef) $ \ mwbr ->
1458+ let runs :: [PreExistingRun m h ]
1459+ runs = maybeToList (fmap PreExistingRun mwbr)
1460+ ++ concatMap levelToPreExistingRuns (V. toList tableLevels)
1461+ -- any pre-existing union in the input table:
1462+ unionmt = case tableUnionLevel of
1463+ NoUnion -> Nothing
1464+ Union mt -> Just mt
1465+ in newPendingLevelMerge runs unionmt
1466+ where
1467+ levelToPreExistingRuns :: Level m h -> [PreExistingRun m h ]
1468+ levelToPreExistingRuns Level {incomingRun, residentRuns} =
1469+ case incomingRun of
1470+ Single r -> PreExistingRun r
1471+ Merging _ _ _ mr -> PreExistingMergingRun mr
1472+ : map PreExistingRun (V. toList residentRuns)
1473+
1474+ -- TODO: can we share this or move it to MergeSchedule?
1475+ {-# SPECIALISE writeBufferToNewRun ::
1476+ SessionEnv IO h
1477+ -> TableConfig
1478+ -> TableContent IO h
1479+ -> IO (Maybe (Ref (Run IO h))) #-}
1480+ writeBufferToNewRun ::
1481+ (MonadMask m , MonadST m , MonadSTM m )
1482+ => SessionEnv m h
1483+ -> TableConfig
1484+ -> TableContent m h
1485+ -> m (Maybe (Ref (Run m h )))
1486+ writeBufferToNewRun SessionEnv {
1487+ sessionRoot = root,
1488+ sessionHasFS = hfs,
1489+ sessionHasBlockIO = hbio,
1490+ sessionUniqCounter = uc
1491+ }
1492+ conf@ TableConfig {
1493+ confDiskCachePolicy,
1494+ confFencePointerIndex
1495+ }
1496+ TableContent {
1497+ tableWriteBuffer,
1498+ tableWriteBufferBlobs
1499+ }
1500+ | WB. null tableWriteBuffer = pure Nothing
1501+ | otherwise = Just <$> do
1502+ ! n <- incrUniqCounter uc
1503+ let ! ln = LevelNo 1
1504+ ! cache = diskCachePolicyForLevel confDiskCachePolicy ln
1505+ ! alloc = bloomFilterAllocForLevel conf ln
1506+ ! indexType = indexTypeForRun confFencePointerIndex
1507+ ! path = Paths. runPath (Paths. activeDir root)
1508+ (uniqueToRunNumber n)
1509+ Run. fromWriteBuffer hfs hbio
1510+ cache
1511+ alloc
1512+ indexType
1513+ path
1514+ tableWriteBuffer
1515+ tableWriteBufferBlobs
1516+
14081517-- | Check that all tables in the session match. If so, return the matched
14091518-- session. If there is a mismatch, return the list indices of the mismatching
14101519-- tables.
0 commit comments