Skip to content

Commit 743645a

Browse files
committed
Address overlapping sections and add test
1 parent bfe3fef commit 743645a

File tree

2 files changed

+34
-10
lines changed

2 files changed

+34
-10
lines changed

src/Ambar/Emulator/Connector/Postgres.hs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -132,13 +132,12 @@ data Section = Section
132132
-- as splitting the boundaries into sections and querying for each section.
133133
splitBoundaries :: Boundaries -> [Section]
134134
splitBoundaries (Boundaries []) = [Section Nothing Nothing []]
135-
splitBoundaries (Boundaries bs) = zipWith3 Section starts ends batches
135+
splitBoundaries (Boundaries bs) = zipWith3 Section starts ends chunks
136136
where
137-
highest = fmap snd . listToMaybe . reverse
138-
lowest = fmap fst . listToMaybe
139-
batches = chunksOf _MAX_BOUNDARY_BATCH_SIZE bs
140-
starts = Nothing : fmap highest batches
141-
ends = drop 1 (fmap lowest batches) <> [Nothing]
137+
chunks = chunksOf _MAX_BOUNDARY_BATCH_SIZE bs
138+
lowest = fmap (fmap fst . listToMaybe) chunks
139+
starts = Nothing : drop 1 lowest
140+
ends = drop 1 lowest <> [Nothing]
142141

143142
connect
144143
:: PostgreSQL

tests/Test/Connector/PostgreSQL.hs

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ module Test.Connector.PostgreSQL
1414

1515
import Control.Concurrent (MVar, newMVar, modifyMVar)
1616
import Control.Exception (bracket, throwIO, ErrorCall(..), fromException)
17-
import Control.Monad (void, forM_)
17+
import Control.Monad (void, forM_, forM)
1818
import qualified Data.Aeson as Aeson
1919
import Data.Aeson (FromJSON)
2020
import Data.List (isInfixOf)
@@ -60,8 +60,8 @@ testPostgreSQL p = do
6060
void $ P.execute_ conn $ fromString $
6161
"ALTER SEQUENCE " <> tableName table <> "_id_seq INCREMENT BY 2"
6262

63-
let n = 50000
64-
connected $ Topic.withConsumer topic group $ \consumer -> deadline (seconds 10) $ do
63+
let n = 10000
64+
connected $ Topic.withConsumer topic group $ \consumer -> deadline (seconds 5) $ do
6565
-- pre-fill the table
6666
insert conn table (take n $ head $ mocks table)
6767
-- advance connector.
@@ -72,6 +72,31 @@ testPostgreSQL p = do
7272
insert conn table [head (mocks table !! 1)]
7373
void $ readEntry @Event consumer
7474

75+
it "splitBoundaries does not produce overlapping sections" $ do
76+
with (PartitionCount 1) $ \conn table topic connected -> do
77+
let odds = [1, 3..]
78+
evens = [2,4..]
79+
n = 2000
80+
-- cheat a little bit by specifying the ids.
81+
insertV :: Int -> IO ()
82+
insertV x = void $ P.execute_ conn $ fromString $
83+
"INSERT INTO " <> tableName table
84+
<> " (id, aggregate_id, sequence_number) VALUES ("
85+
<> show x <> ","<> show x <> ", "<> show x <>
86+
")"
87+
88+
-- leave lots of gaps
89+
forM_ (take n odds) insertV
90+
connected $ Topic.withConsumer topic group $ \consumer -> deadline (seconds 1) $ do
91+
oddEntries <- forM [1..n] $ \_ -> readEntry @Event consumer
92+
93+
-- fill all the gaps
94+
forM_ (take n evens) insertV
95+
evenEntries <- forM [1..n] $ \_ -> readEntry @Event consumer
96+
97+
let ids = fmap (e_id . fst) $ oddEntries <> evenEntries
98+
ids `shouldBe` (take n odds <> take n evens)
99+
75100
-- Test that column types are supported/unsupported by
76101
-- creating database entries with the value and reporting
77102
-- on the emulator's behaviour when trying to decode them.
@@ -331,7 +356,7 @@ instance Table (EventsTable PostgreSQL) where
331356
tableName (EventsTable name) = name
332357
tableCols _ = ["id", "aggregate_id", "sequence_number"]
333358
mocks _ =
334-
-- the aggregate_id is given when the records are inserted into the database
359+
-- the event id is given when the records are inserted into the database
335360
[ [ Event err agg_id seq_id | seq_id <- [0..] ]
336361
| agg_id <- [0..]
337362
]

0 commit comments

Comments
 (0)