Skip to content

Commit 731d118

Browse files
authored
Add a server to provide information about data sources and destinations (#48)
2 parents 619a3ca + 0eff11d commit 731d118

File tree

14 files changed

+187
-9
lines changed

14 files changed

+187
-9
lines changed

README.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,10 @@ data_destinations:
114114
- file_source
115115
```
116116
117+
## Projections info
118+
119+
The emulator starts a server that provides information about projections progress on the `/projections` endpoint.
120+
117121
## Running the program
118122

119123
You only need to provide the address of the configuration file and the emulator
@@ -128,6 +132,8 @@ will start streaming your data.
128132
You can see all commands available with the `--help` flag
129133

130134
``` bash
135+
$ emulator --help
136+
131137
Ambar Emulator v0.0.1 - alpha release
132138
133139
A local version of Ambar <https://ambar.cloud>
@@ -144,3 +150,29 @@ Available commands:
144150
145151
More info at <https://github.com/ambarltd/emulator>
146152
```
153+
154+
And these are the options for the `run` command.
155+
156+
```
157+
$ emulator run --help
158+
159+
Usage: emulator run [--partitions-per-topic INT] [--port INT]
160+
[--override-polling-interval SECONDS] [--data-path PATH]
161+
--config FILE [--verbose]
162+
163+
run the emulator
164+
165+
Available options:
166+
-h,--help Show this help text
167+
--partitions-per-topic INT
168+
How many partitions should newly created topics have.
169+
--port INT Port to attach projections info server to.
170+
--override-polling-interval SECONDS
171+
Override the polling interval for all polled data
172+
sources.
173+
--data-path PATH Where to put emulation data including file queues.
174+
Defaults to $XDG_DATA_HOME/ambar-emulator.
175+
--config FILE Yaml file with environment configuration. Spec at at
176+
<https://github.com/ambarltd/emulator>.
177+
--verbose Enable verbose logging.
178+
```

build/Dockerfile.dynamic

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
# Builds a dynamically linked binary.
2+
# Quicker for rebuilds. Works in OSX.
3+
4+
FROM alpine:3.19
5+
6+
RUN apk update
7+
RUN apk add libffi-dev
8+
RUN apk add ncurses-dev
9+
RUN apk add libpq-dev
10+
RUN apk add gmp-dev
11+
RUN apk add zlib-dev
12+
RUN apk add libc-dev
13+
RUN apk add musl-dev
14+
RUN apk add mariadb-connector-c-dev
15+
RUN apk add pcre-dev
16+
17+
RUN apk add git
18+
RUN apk add curl
19+
RUN apk add binutils-gold
20+
RUN apk add gcc
21+
RUN apk add g++
22+
RUN apk add make
23+
RUN apk add perl
24+
RUN apk add pkgconfig
25+
RUN apk add tar
26+
RUN apk add xz
27+
RUN apk add bash
28+
29+
RUN curl --proto '=https' --tlsv1.3 -sSf https://get-ghcup.haskell.org | \
30+
BOOTSTRAP_HASKELL_NONINTERACTIVE=1 \
31+
BOOTSTRAP_HASKELL_GHC_VERSION="9.10.1" \
32+
BOOTSTRAP_HASKELL_INSTALL_NO_STACK=1 \
33+
sh
34+
35+
ENV PATH="/root/.ghcup/bin:$PATH"
36+
ENV PATH="/root/.cabal/bin:$PATH"
37+
38+
WORKDIR /opt/emulator
39+
40+
# Add only the build files and build only the dependencies. Docker will cache
41+
# this layer, freeing us up to modify source code without re-building the
42+
# dependencies (unless the .cabal file changes)
43+
COPY ./emulator.cabal /opt/emulator
44+
COPY ./cabal.project /opt/emulator
45+
COPY ./deps /opt/emulator/deps
46+
RUN cabal build --dependencies-only +RTS -N
47+
48+
COPY ./src /opt/emulator/src
49+
COPY ./tests /opt/emulator/tests
50+
COPY ./benchmarks /opt/emulator/benchmarks
51+
RUN cabal build exe:emulator
52+
RUN cp $(cabal list-bin emulator) /opt/emulator/emulator
53+
RUN mkdir -p /opt/emulator/config
54+
55+
ENTRYPOINT ["sh", "-c", "rm -Rf /root/.local/share/ambar-emulator/queues/*.lock /root/.local/share/ambar-emulator/queues/**/*.lock && /opt/emulator/emulator $0 $@"]
56+
57+
CMD ["run", "--config", "/opt/emulator/config/config.yaml"]

emulator.cabal

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ library
5454
Ambar.Emulator.Queue.Partition
5555
Ambar.Emulator.Queue.Partition.File
5656
Ambar.Emulator.Queue.Partition.STMReader
57+
Ambar.Emulator.Server
5758
Ambar.Transport
5859
Ambar.Transport.Http
5960
Ambar.Transport.File
@@ -95,6 +96,8 @@ library
9596
, time
9697
, unordered-containers
9798
, yaml
99+
, warp
100+
, wai
98101

99102
test-suite emulator-tests
100103
import: common

src/Ambar/Emulator.hs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import Ambar.Emulator.Connector.MicrosoftSQLServer (SQLServerState)
2121
import Ambar.Emulator.Connector.MySQL (MySQLState)
2222
import Ambar.Emulator.Connector.Postgres (PostgreSQLState)
2323

24+
import qualified Ambar.Emulator.Server as Server
2425
import qualified Ambar.Emulator.Projector as Projector
2526
import Ambar.Emulator.Projector (Projection(..))
2627
import qualified Ambar.Transport.File as FileTransport
@@ -37,6 +38,7 @@ import Ambar.Emulator.Config
3738
, DataDestination(..)
3839
, Destination(..)
3940
)
41+
import Util.Async (withAsyncThrow)
4042
import Util.Delay (every, seconds)
4143
import Util.Directory (writeAtomically)
4244
import Util.Logger (SimpleLogger, annotate, logInfo, logDebugAction)
@@ -46,12 +48,18 @@ import Util.STM (atomicallyNamed)
4648
emulate :: SimpleLogger -> EmulatorConfig -> EnvironmentConfig -> IO ()
4749
emulate logger_ config env = do
4850
Queue.withQueue queuePath pcount $ \queue ->
51+
withServer queue $
4952
concurrently_ (connectAll queue) (projectAll queue)
5053
where
5154
queuePath = c_dataPath config </> "queues"
5255
statePath = c_dataPath config </> "state.json"
5356
pcount = Topic.PartitionCount $ c_partitionsPerTopic config
5457

58+
withServer queue act =
59+
case c_port config of
60+
Nothing -> act
61+
Just port -> withAsyncThrow (Server.run port queue) act
62+
5563
connectAll queue = do
5664
EmulatorState connectorStates <- load
5765
let getState source =
@@ -180,7 +188,7 @@ toConnectorConfig source sstate =
180188
"Incompatible state for source: " <> show (s_id source)
181189

182190
topicName :: Id DataSource -> TopicName
183-
topicName sid = TopicName $ "t-" <> unId sid
191+
topicName sid = TopicName $ unId sid
184192

185193
projectionId :: Id DataDestination -> Id Projection
186-
projectionId (Id dst) = Id ("p-" <> dst)
194+
projectionId (Id dst) = Id dst

src/Ambar/Emulator/Config.hs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ module Ambar.Emulator.Config
88
, Source(..)
99
, DataDestination(..)
1010
, Destination(..)
11+
, Port(..)
1112
)
1213
where
1314

@@ -28,6 +29,7 @@ import qualified Data.Map.Strict as Map
2829
import qualified Data.Yaml as Yaml
2930

3031
import Ambar.Emulator.Connector.Poll (PollingInterval(..))
32+
import Ambar.Emulator.Server (Port(..))
3133
import Util.Delay (millis)
3234
import Ambar.Emulator.Connector.MicrosoftSQLServer (SQLServer(..))
3335
import Ambar.Emulator.Connector.Postgres (PostgreSQL(..))
@@ -42,6 +44,7 @@ newtype Id a = Id { unId :: Text }
4244
-- | Configures the internals of the emulator
4345
data EmulatorConfig = EmulatorConfig
4446
{ c_partitionsPerTopic :: Int
47+
, c_port :: Maybe Port
4548
, c_dataPath :: FilePath
4649
}
4750

src/Ambar/Emulator/Queue.hs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,10 @@ import Ambar.Emulator.Queue.Topic
3131
, TopicState(..)
3232
, PartitionNumber(..)
3333
, PartitionCount(..)
34+
, ConsumerGroupName
3435
)
3536
import qualified Ambar.Emulator.Queue.Topic as T
37+
import Ambar.Emulator.Queue.Partition (Count, Offset)
3638
import qualified Ambar.Emulator.Queue.Partition.File as FilePartition
3739
import Ambar.Emulator.Queue.Partition.File (FilePartition)
3840
import Util.Async (withAsyncThrow)
@@ -203,8 +205,18 @@ inventoryRelease (Store path) = do
203205
let lock = path </> "inventory.lock"
204206
removeFile lock
205207

206-
getInfo :: Queue -> IO (HashMap TopicName PartitionCount)
208+
type QueueInfo =
209+
HashMap TopicName
210+
( HashMap PartitionNumber Count
211+
, HashMap ConsumerGroupName (HashMap PartitionNumber Offset)
212+
)
213+
214+
getInfo :: Queue -> IO QueueInfo
207215
getInfo queue = do
208216
topics <- readMVar (q_topics queue)
209-
return $ flip fmap topics $ \tdata ->
210-
PartitionCount $ HashMap.size (d_partitions tdata)
217+
forM topics $ \(TopicData topic partitions) -> do
218+
consumers <- T.s_consumers <$> T.getState topic
219+
counts <- traverse FilePartition.getRecordCount partitions
220+
return (counts, consumers)
221+
222+

src/Ambar/Emulator/Queue/Partition.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ newtype Offset = Offset { unOffset :: Int }
2525
-- The total number of records in a partition.
2626
newtype Count = Count Int
2727
deriving (Show)
28-
deriving newtype (Eq, Ord, Enum, Integral, Real, Num)
28+
deriving newtype (Eq, Ord, Enum, Integral, Real, Num, ToJSON)
2929

3030
newtype Record = Record { unRecord :: ByteString }
3131
deriving newtype (Eq, Ord, Show)

src/Ambar/Emulator/Queue/Partition/File.hs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ module Ambar.Emulator.Queue.Partition.File
1010
, openNonLockingWritableFD
1111
, closeNonLockingWritableFD
1212
, writeFD
13+
, getRecordCount
1314
) where
1415

1516
import Control.Concurrent (MVar, newMVar, withMVarMasked, modifyMVar_)
@@ -199,6 +200,11 @@ openNonLockingWritableFD path = do
199200
closeNonLockingWritableFD :: FD -> IO ()
200201
closeNonLockingWritableFD = FD.close
201202

203+
getRecordCount :: FilePartition -> IO Count
204+
getRecordCount (FilePartition{..}) = do
205+
(count, _) <- STM.readTVarIO p_info
206+
return count
207+
202208
-- A single-threaded file readed.
203209
-- If you use it from multiple threads you will have problems.
204210
data FileReader = FileReader PartitionName (TMVar ReaderInfo)

src/Ambar/Emulator/Queue/Topic.hs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -489,6 +489,7 @@ commit (Consumer cid _ var) (Meta pnumber offset) =
489489
STM.modifyTVar (r_committed r) $ \previous -> max previous (offset + 1)
490490

491491
newtype PartitionCount = PartitionCount Int
492+
deriving newtype (ToJSON)
492493

493494
partitionCount :: Topic -> PartitionCount
494495
partitionCount topic = PartitionCount $ HashMap.size (t_partitions topic)

0 commit comments

Comments
 (0)