@@ -101,13 +101,35 @@ writeToSinkSTM qTVar disconnectedSize connectedSize wasUsedTVar traceObject = do
101101 (q', res) <- ((,) <$> isFullTBQueue q <*> isEmptyTBQueue q) >>= \ case
102102 -- The `TBQueue` is full.
103103 (True , _) -> do
104- flushQueue q
104+ -- Only get the length when you are sure it's needed.
105+ qLen <- lengthTBQueue q
106+ if fromIntegral qLen == connectedSize
107+ then do
108+ -- The queue is full, but if it's a small queue, we
109+ -- can switch it to a big one and give a chance not
110+ -- to flush items to stdout yet.
111+ q' <- growQueue q
112+ pure (q', [] )
113+ else do
114+ -- The big queue is full, will flush it to stdout.
115+ res <- flushTBQueue q
116+ pure (q, res)
105117 -- The `TBQueue` is not full and the `TBQueue` is empty.
106118 (_, True ) -> do
107119 wasUsed <- readTVar wasUsedTVar
108120 q' <- if wasUsed
109- then shrinkQueue q
110- else pure q
121+ then do
122+ -- Get the length when sure it's needed.
123+ qLen <- lengthTBQueue q
124+ if fromIntegral qLen == disconnectedSize
125+ -- If the sink was used, now we can shrink it.
126+ then switchQueue connectedSize
127+ -- Queue is already of the desired capacity.
128+ else pure q
129+ else do
130+ -- The sink was not used, nothing to do here.
131+ pure q
132+ -- This case never flushes.
111133 pure (q', [] )
112134 -- The `TBQueue` is not full and the `TBQueue` is not empty.
113135 (_, _) -> do
@@ -118,32 +140,14 @@ writeToSinkSTM qTVar disconnectedSize connectedSize wasUsedTVar traceObject = do
118140
119141 where
120142
121- -- The queue is full, but if it's a small queue, we can switch it
122- -- to a big one and give a chance not to flush items to stdout yet.
123- flushQueue q = do
124- qLen <- lengthTBQueue q
125- if fromIntegral qLen == connectedSize
126- then do
127- -- The small queue is full, so we have to switch to a big one and
128- -- then flush collected items from the small queue and store them in
129- -- a big one.
130-
131- acceptedItems <- -- trace ("growQueue disconnected" ++ show disconnectedSize) $
132- flushTBQueue q
133- bigQ <- switchQueue disconnectedSize
134- mapM_ (writeTBQueue bigQ) acceptedItems
135- pure (bigQ, [] )
136- else do
137- -- The big queue is full, we have to flush it to stdout.
138- res <- flushTBQueue q
139- pure (q, res)
140-
141- -- if the sink was used and it
142- shrinkQueue q = do
143- qLen <- lengthTBQueue q
144- if fromIntegral qLen == disconnectedSize
145- then switchQueue connectedSize
146- else pure q
143+ -- The small queue is full, so we have to switch to a big one and
144+ -- then flush collected items from the small queue and store them in
145+ -- a big one.
146+ growQueue q = do
147+ acceptedItems <- flushTBQueue q
148+ bigQ <- switchQueue disconnectedSize
149+ mapM_ (writeTBQueue bigQ) acceptedItems
150+ pure bigQ
147151
148152 switchQueue size = do
149153 q' <- newTBQueue (fromIntegral size)
0 commit comments