Skip to content

Commit 0250711

Browse files
authored
Merge pull request #37 from ambarltd/file-connector
Real support for a File Connector
2 parents 7cec51d + 34c182d commit 0250711

File tree

16 files changed

+496
-223
lines changed

16 files changed

+496
-223
lines changed

README.md

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ data_sources:
3636
partitioningColumn: aggregate_id
3737

3838
# Connect to a MySQL database
39-
- id: postgres_source
39+
- id: mysql_source
4040
description: Main events store
4141
type: mysql
4242
host: localhost
@@ -71,6 +71,16 @@ data_sources:
7171
autoIncrementingColumn: id
7272
partitioningColumn: aggregate_id
7373

74+
# Use a plain text file as a data source.
75+
# Each line must be a valid JSON object.
76+
# Values are projected as they are added.
77+
- id: file_source
78+
description: My file JSON event store
79+
type: file
80+
path: ./path/to/source.file
81+
incrementingField: id
82+
partitioningField: aggregate_id
83+
7484
# Connections to your endpoint.
7585
# The Emulator will send data read from the databases to these endpoints.
7686
data_destinations:

emulator.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ test-suite emulator-tests
119119
Test.Config
120120
Test.Queue
121121
Test.Connector
122+
Test.Connector.File
122123
Test.Connector.MySQL
123124
Test.Connector.PostgreSQL
124125
Test.Connector.MicrosoftSQLServer

examples/config.yml

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ data_sources:
2121
partitioningColumn: aggregate_id
2222

2323
# Connect to a MySQL database
24-
- id: postgres_source
24+
- id: mysql_source
2525
description: Main events store
2626
type: mysql
2727
host: localhost
@@ -56,6 +56,16 @@ data_sources:
5656
autoIncrementingColumn: id
5757
partitioningColumn: aggregate_id
5858

59+
# Use a plain text file as a data source.
60+
# Each line must be a valid JSON object.
61+
# Values are projected as they are added.
62+
- id: file_source
63+
description: My file JSON event store
64+
type: file
65+
path: ./path/to/source.file
66+
incrementingField: id
67+
partitioningField: aggregate_id
68+
5969
# Connections to your endpoint.
6070
# The Emulator will send data read from the databases to these endpoints.
6171
data_destinations:

src/Ambar/Emulator.hs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import System.Directory (doesFileExist)
1616
import System.FilePath ((</>))
1717

1818
import Ambar.Emulator.Connector (Connector(..), connect, partitioner, encoder)
19+
import Ambar.Emulator.Connector.File (FileConnectorState, mkFileConnector)
1920
import Ambar.Emulator.Connector.MicrosoftSQLServer (SQLServerState)
2021
import Ambar.Emulator.Connector.MySQL (MySQLState)
2122
import Ambar.Emulator.Connector.Postgres (PostgreSQLState)
@@ -98,7 +99,7 @@ emulate logger_ config env = do
9899
SourcePostgreSQL _ -> StatePostgres def
99100
SourceMySQL _ -> StateMySQL def
100101
SourceSQLServer _ -> StateSQLServer def
101-
SourceFile _ -> StateFile ()
102+
SourceFile{} -> StateFile def
102103

103104
projectAll queue = forConcurrently_ (c_destinations env) (project queue)
104105

@@ -137,7 +138,7 @@ data SavedState
137138
= StatePostgres PostgreSQLState
138139
| StateMySQL MySQLState
139140
| StateSQLServer SQLServerState
140-
| StateFile ()
141+
| StateFile FileConnectorState
141142
deriving (Generic)
142143
deriving anyclass (ToJSON, FromJSON)
143144

@@ -168,10 +169,11 @@ toConnectorConfig source sstate =
168169
StateSQLServer state ->
169170
return $ ConnectorConfig source sqlserver state StateSQLServer
170171
_ -> incompatible
171-
SourceFile path ->
172+
SourceFile path partitioningField incrementingField ->
172173
case sstate of
173-
StateFile () ->
174-
return $ ConnectorConfig source path () StateFile
174+
StateFile state -> do
175+
fileconn <- mkFileConnector path partitioningField incrementingField
176+
return $ ConnectorConfig source fileconn state StateFile
175177
_ -> incompatible
176178
where
177179
incompatible = throwIO $ ErrorCall $

src/Ambar/Emulator/Config.hs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import qualified Data.Yaml as Yaml
3030
import Ambar.Emulator.Connector.MicrosoftSQLServer (SQLServer(..))
3131
import Ambar.Emulator.Connector.Postgres (PostgreSQL(..))
3232
import Ambar.Emulator.Connector.MySQL (MySQL(..))
33-
import Ambar.Emulator.Connector.File (FileConnector(..))
3433
import Ambar.Transport (SubmissionError)
3534
import Ambar.Transport.Http (Endpoint, User, Password)
3635

@@ -58,7 +57,7 @@ data DataSource = DataSource
5857
}
5958

6059
data Source
61-
= SourceFile FileConnector
60+
= SourceFile { sf_path :: FilePath, sf_partitioningField :: Text, sf_incrementingField :: Text }
6261
| SourcePostgreSQL PostgreSQL
6362
| SourceMySQL MySQL
6463
| SourceSQLServer SQLServer
@@ -150,7 +149,11 @@ instance FromJSON DataSource where
150149
c_incrementingColumn <- o .: "autoIncrementingColumn"
151150
return $ SourceSQLServer SQLServer{..}
152151

153-
parseFile o = SourceFile . FileConnector <$> (o .: "path")
152+
parseFile o = do
153+
sf_path <- o .: "path"
154+
sf_partitioningField <- o .: "partitioningField"
155+
sf_incrementingField <- o .: "incrementingField"
156+
return $ SourceFile{..}
154157

155158
parseDataDestination
156159
:: Map (Id DataSource) DataSource
Lines changed: 215 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,239 @@
11
module Ambar.Emulator.Connector.File
2-
( FileConnector(..)
3-
) where
2+
( FileConnector
3+
, FileConnectorState
4+
, FileRecord
5+
, mkFileConnector
6+
, write
7+
, c_path
8+
) where
49

510
{-| File connector.
611
Read JSON values from a file.
712
One value per line.
813
-}
914

15+
import Control.Concurrent (MVar, newMVar, withMVar)
16+
import Control.Concurrent.STM
17+
( STM
18+
, TMVar
19+
, TVar
20+
, newTVarIO
21+
, readTVar
22+
, atomically
23+
, writeTVar
24+
, newTMVarIO
25+
, modifyTVar
26+
, retry
27+
, takeTMVar
28+
, putTMVar
29+
)
30+
import Control.Exception (bracket)
31+
import Control.Monad (forever, when)
1032
import qualified Data.Aeson as Json
11-
import Control.Monad (forM_)
12-
import qualified Data.ByteString.Lazy.Char8 as Char8
33+
import qualified Data.Aeson.KeyMap as KeyMap
34+
import qualified Data.ByteString as BS
35+
import qualified Data.ByteString.Char8 as Char8
1336
import qualified Data.ByteString.Lazy as LB
14-
import qualified Data.Text.Lazy as Text
15-
import qualified Data.Text.Lazy.Encoding as Text
37+
import Data.Default (Default)
38+
import Data.Maybe (fromMaybe)
39+
import Data.String (IsString(fromString))
40+
import Data.Text (Text)
41+
import qualified Data.Text.Lazy as LText
42+
import qualified Data.Text.Lazy.Encoding as LText
43+
import qualified Data.Text as Text
44+
import qualified Data.Text.Encoding as Text
45+
import GHC.Generics (Generic)
46+
import GHC.IO.FD (FD)
47+
import System.Directory (getFileSize)
48+
import System.IO
49+
( Handle
50+
, hSeek
51+
, openFile
52+
, hSeek
53+
, hIsEOF
54+
, hClose
55+
, IOMode(..)
56+
, SeekMode(..)
57+
)
58+
import Prettyprinter ((<+>))
1659

1760
import qualified Ambar.Emulator.Connector as C
61+
import Ambar.Emulator.Queue.Partition.File
62+
( openNonLockingWritableFD
63+
, writeFD
64+
)
1865
import Ambar.Emulator.Queue.Topic (modPartitioner)
66+
import Ambar.Emulator.Queue.Topic (Producer)
1967
import qualified Ambar.Emulator.Queue.Topic as Topic
2068
import Utils.Async (withAsyncThrow)
21-
import Utils.Logger (fatal, logInfo)
69+
import Utils.Logger (SimpleLogger, fatal, logInfo)
70+
import Utils.Delay (Duration, delay, millis)
71+
import Utils.Prettyprinter (prettyJSON, renderPretty, commaSeparated)
2272

23-
data FileConnector = FileConnector FilePath
73+
_POLLING_INTERVAL :: Duration
74+
_POLLING_INTERVAL = millis 50
75+
76+
data FileConnector = FileConnector
77+
{ c_path :: FilePath
78+
, c_partitioningField :: Text
79+
, c_incrementingField :: Text
80+
, c_state :: TVar FileConnectorState
81+
, c_readHandle :: TMVar Handle
82+
, c_writeHandle :: MVar FD
83+
, c_getFileSize :: IO Integer
84+
}
85+
86+
-- | We don't close these file descriptors because we consider that
87+
-- this is only used during tests.
88+
mkFileConnector :: FilePath -> Text -> Text -> IO FileConnector
89+
mkFileConnector path partitioningField incrementingField = do
90+
size <- getFileSize path
91+
varState <- newTVarIO (FileConnectorState size 0)
92+
varWriteHandle <- do
93+
fd <- openNonLockingWritableFD path
94+
newMVar fd
95+
varReadHandle <- do
96+
readHandle <- openFile path ReadMode
97+
newTMVarIO readHandle
98+
return $ FileConnector
99+
path
100+
partitioningField
101+
incrementingField
102+
varState
103+
varReadHandle
104+
varWriteHandle
105+
(getFileSize path)
106+
107+
-- Does not work in the presence of external writers to the same file.
108+
write :: FileConnector -> Json.Value -> IO ()
109+
write FileConnector{..} json = do
110+
withMVar c_writeHandle $ \fd -> do
111+
let entry = LB.toStrict (Json.encode json) <> "\n"
112+
entrySize = fromIntegral (BS.length entry)
113+
writeFD fd entry
114+
atomically $ modifyTVar c_state $ \state ->
115+
state { c_fileSize = c_fileSize state + entrySize }
116+
117+
data FileConnectorState = FileConnectorState
118+
{ c_fileSize :: Integer
119+
, c_offset :: Integer
120+
}
121+
deriving (Show, Generic)
122+
deriving anyclass (Json.ToJSON, Json.FromJSON, Default)
24123

25124
newtype FileRecord = FileRecord Json.Value
26125

27126
instance C.Connector FileConnector where
28-
type ConnectorState FileConnector = ()
127+
type ConnectorState FileConnector = FileConnectorState
29128
type ConnectorRecord FileConnector = FileRecord
30129
partitioner = modPartitioner (const 1)
31130
encoder (FileRecord value) = LB.toStrict $ Json.encode value
32-
connect (FileConnector path) logger () producer f =
33-
withAsyncThrow worker $ f (return ())
131+
connect = connect
132+
133+
connect
134+
:: FileConnector
135+
-> SimpleLogger
136+
-> FileConnectorState
137+
-> Producer (FileRecord)
138+
-> (STM FileConnectorState -> IO a)
139+
-> IO a
140+
connect conn@(FileConnector {..}) logger initState producer f = do
141+
h <- atomically $ do
142+
writeTVar c_state initState
143+
takeTMVar c_readHandle
144+
hSeek h AbsoluteSeek (c_offset initState)
145+
atomically $ putTMVar c_readHandle h
146+
withAsyncThrow updateFileSize $
147+
withAsyncThrow worker $
148+
f (readTVar c_state)
149+
where
150+
updateFileSize = forever $ do
151+
newSize <- c_getFileSize
152+
delay _POLLING_INTERVAL -- also serves to wait until any writing finishes
153+
atomically $ do
154+
FileConnectorState fsize offset <- readTVar c_state
155+
when (fsize < newSize) $
156+
writeTVar c_state $ FileConnectorState newSize offset
157+
158+
worker = forever $ do
159+
value <- readNext
160+
let record = FileRecord value
161+
Topic.write producer record
162+
logResult record
163+
164+
logResult record =
165+
logInfo logger $ renderPretty $
166+
"ingested." <+> commaSeparated
167+
[ "incrementing_value:" <+> prettyJSON (incrementingValue conn record)
168+
, "partitioning_value:" <+> prettyJSON (partitioningValue conn record)
169+
]
170+
171+
-- | Blocks until there is something to read.
172+
readNext :: IO Json.Value
173+
readNext =
174+
withReadLock $ \readHandle -> do
175+
bs <- Char8.hGetLine readHandle
176+
value <- case Json.eitherDecode $ LB.fromStrict bs of
177+
Left e -> fatal logger $ unlines
178+
[ "Unable to decode value from source:"
179+
, show e
180+
, Text.unpack $ Text.decodeUtf8 bs
181+
]
182+
Right v -> return v
183+
let entrySize = fromIntegral $ BS.length bs + BS.length "\n"
184+
atomically $ modifyTVar c_state $ \state ->
185+
state { c_offset = c_offset state + entrySize }
186+
return value
187+
188+
withReadLock :: (Handle -> IO a) -> IO a
189+
withReadLock = bracket acquire release
34190
where
35-
worker = do
36-
bs <- Char8.readFile path
37-
forM_ (Char8.lines bs) $ \line -> do
38-
value <- case Json.eitherDecode line of
39-
Left e -> fatal logger $ unlines
40-
[ "Unable to decode value from source:"
41-
, show e
42-
, Text.unpack $ Text.decodeUtf8 bs
43-
]
44-
Right v -> return v
45-
Topic.write producer (FileRecord value)
46-
logInfo logger $ "ingested. " <> Text.decodeUtf8 line
191+
acquire = do
192+
-- wait till there is data to read and take the lock.
193+
(h, offset) <- atomically $ do
194+
FileConnectorState fsize offset <- readTVar c_state
195+
when (fsize <= offset) retry
196+
h <- takeTMVar c_readHandle
197+
return (h, offset)
198+
199+
-- For some reason, if the file we are reading is updated by an external
200+
-- program (like the user manually adding an entry) the file reading library
201+
-- don't detect that EOF has moved. In this case we have to close this handle
202+
-- and open a new one.
203+
eof <- hIsEOF h
204+
if not eof
205+
then return h
206+
else do
207+
hClose h
208+
h' <- openFile c_path ReadMode
209+
hSeek h' AbsoluteSeek offset
210+
return h'
211+
212+
release readHandle = atomically $
213+
putTMVar c_readHandle readHandle
214+
215+
216+
partitioningValue :: FileConnector -> FileRecord -> Json.Value
217+
partitioningValue FileConnector{..} r = getField c_partitioningField r
218+
219+
incrementingValue :: FileConnector -> FileRecord -> Json.Value
220+
incrementingValue FileConnector{..} r = getField c_incrementingField r
221+
222+
getField :: Text -> FileRecord -> Json.Value
223+
getField field (FileRecord json) =
224+
fromMaybe err $ do
225+
o <- getObject json
226+
let key = fromString $ Text.unpack field
227+
v <- KeyMap.lookup key o
228+
return $ v
229+
where
230+
err = error $ Text.unpack $ "invalid serial value in :" <> jsonToTxt json
231+
232+
jsonToTxt :: Json.Value -> Text
233+
jsonToTxt = LText.toStrict . LText.decodeUtf8 . Json.encode
234+
235+
getObject :: Json.Value -> Maybe Json.Object
236+
getObject = \case
237+
Json.Object o -> Just o
238+
_ -> Nothing
47239

0 commit comments

Comments
 (0)