3
3
-- This module provides a routine for sorting events in constant-space via
4
4
-- on-disk merge sort.
5
5
module GHC.RTS.Events.Sort
6
- ( GHC.RTS.Events.Sort. sortEvents
6
+ ( sortEvents
7
+ , sortEvents'
8
+ , SortParams (.. )
9
+ , defaultSortParams
7
10
) where
8
11
9
12
import Data.Traversable
@@ -20,24 +23,11 @@ import Data.Binary.Put as P
20
23
import qualified Data.ByteString.Lazy as BSL
21
24
import qualified Data.Sequence as S
22
25
23
- import GHC.RTS.Events
26
+ import GHC.RTS.Events hiding ( sortEvents )
24
27
import GHC.RTS.Events.Binary (putEventLog )
25
28
26
29
type SortedChunk = FilePath
27
30
28
- -- | The chunk size which the input eventlog is broken into (in events). This
29
- -- determines the upper-bound on memory usage during the sorting process.
30
- --
31
- -- This value is a reasonable trade-off between memory and computation,
32
- -- requiring approximately 100MBytes while sorting a "typical" eventlog.
33
- cHUNK_SIZE :: Int
34
- cHUNK_SIZE = 500 * 1000
35
-
36
- -- | Maximum number of chunks to merge at once. Determined by the largest
37
- -- number of file descriptors we can safely open at once.
38
- fAN_IN :: Int
39
- fAN_IN = 256
40
-
41
31
newtype OnTime = OnTime Event
42
32
43
33
instance Ord OnTime where
@@ -46,39 +36,75 @@ instance Ord OnTime where
46
36
instance Eq OnTime where
47
37
(==) = coerce ((==) `on` evTime)
48
38
39
+ -- | Parameters which determine the behavior of the merge sort.
40
+ data SortParams = SortParams
41
+ { -- | The chunk size which the input eventlog is broken into (in events). This
42
+ -- determines the upper-bound on memory usage during the sorting process.
43
+ --
44
+ -- The default of 500,000 events is a reasonable trade-off between memory and computation,
45
+ -- requiring approximately 100MBytes while sorting a "typical" eventlog.
46
+ chunkSize :: ! Int
47
+
48
+ -- | Maximum number of chunks to merge at once. Determined by the largest
49
+ -- number of file descriptors we can safely open at once.
50
+ , maxFanIn :: ! Int
51
+ }
52
+
53
+ -- | A reasonable set of sorting parameters.
54
+ defaultSortParams :: SortParams
55
+ defaultSortParams =
56
+ SortParams { chunkSize = 500 * 1000
57
+ , maxFanIn = 256
58
+ }
59
+
49
60
-- | @sortEvents tmpDir outPath eventlog@ sorts @eventlog@ via on-disk merge
50
61
-- sort, using @tmpDir@ for intermediate data. The sorted eventlog is written
51
62
-- to @outPath@.
52
- sortEvents :: FilePath -- ^ temporary directory
53
- -> FilePath -- ^ output eventlog file path
54
- -> EventLog -- ^ eventlog to sort
55
- -> IO ()
56
- sortEvents _tmpDir _outPath (EventLog _ (Data [] )) = fail " sortEvents: no events"
57
- sortEvents tmpDir outPath (EventLog hdr (Data events0)) = do
63
+ sortEvents
64
+ :: FilePath -- ^ temporary directory
65
+ -> FilePath -- ^ output eventlog file path
66
+ -> EventLog -- ^ eventlog to sort
67
+ -> IO ()
68
+ sortEvents = sortEvents' defaultSortParams
69
+
70
+ -- | @sortEvents' params tmpDir outPath eventlog@ sorts
71
+ -- @eventlog@ via on-disk merge sort, using @tmpDir@ for
72
+ -- intermediate data. The sorted eventlog is written to
73
+ -- @outPath@.
74
+ sortEvents'
75
+ :: SortParams
76
+ -> FilePath -- ^ temporary directory
77
+ -> FilePath -- ^ output eventlog file path
78
+ -> EventLog -- ^ eventlog to sort
79
+ -> IO ()
80
+ sortEvents' _params _tmpDir _outPath (EventLog _ (Data [] )) = fail " sortEvents: no events"
81
+ sortEvents' params tmpDir outPath (EventLog hdr (Data events0)) = do
58
82
chunks <- toSortedChunks events0
59
83
hdl <- openBinaryFile outPath WriteMode
60
84
mergeChunks' hdl chunks
61
85
hClose hdl
62
86
return ()
63
87
where
88
+ SortParams chunkSize fanIn = params
89
+
64
90
toSortedChunks :: [Event ] -> IO (S. Seq SortedChunk )
65
91
toSortedChunks =
66
92
fmap S. fromList
67
93
. mapM (writeTempChunk . sortEventsInMem)
68
- . chunksOf cHUNK_SIZE
94
+ . chunksOf chunkSize
69
95
70
96
mergeChunks' :: Handle -> S. Seq SortedChunk -> IO ()
71
97
mergeChunks' destFile chunks
72
98
| S. null chunks =
73
99
fail " sortEvents: this can't happen"
74
- | S. length chunks <= fAN_IN = do
100
+ | S. length chunks <= fanIn = do
75
101
events <- mapM readChunk chunks
76
102
let sorted = mergeSort $ toList (coerce events :: S. Seq [OnTime ])
77
103
writeChunk destFile (coerce sorted)
78
104
mapM_ removeFile chunks
79
105
hClose destFile
80
106
| otherwise = do
81
- chunksss <- flip mapM (nChunks fAN_IN chunks) $ \ fps -> do
107
+ chunksss <- flip mapM (nChunks fanIn chunks) $ \ fps -> do
82
108
(fp, hdl) <- createTempChunk
83
109
mergeChunks' hdl fps
84
110
return fp
0 commit comments