@@ -12,6 +12,13 @@ class RuuviReactorImpl: RuuviReactor {
1212 private let sqliteContext : SQLiteContext
1313 private let sqlitePersistence : RuuviPersistence
1414 private let errorReporter : RuuviErrorReporter
15+ private let stateLock = NSLock ( )
16+ private let entityCombine : RuuviTagSubjectCombine
17+
18+ private var recordCombines = [ String: RuuviTagRecordSubjectCombine] ( )
19+ private var lastRecordCombines = [ String: RuuviTagLastRecordSubjectCombine] ( )
20+ private var latestRecordCombines = [ String: RuuviTagLatestRecordSubjectCombine] ( )
21+ private var sensorSettingsCombines = [ String: SensorSettingsCombine] ( )
1522
1623 init (
1724 sqliteContext: SQLiteContext ,
@@ -21,38 +28,48 @@ class RuuviReactorImpl: RuuviReactor {
2128 self . sqliteContext = sqliteContext
2229 self . sqlitePersistence = sqlitePersistence
2330 self . errorReporter = errorReporter
31+ self . entityCombine = RuuviTagSubjectCombine (
32+ sqlite: sqliteContext,
33+ errorReporter: errorReporter
34+ )
2435 }
2536
26- private lazy var entityCombine = RuuviTagSubjectCombine (
27- sqlite: sqliteContext,
28- errorReporter: errorReporter
29- )
30- private lazy var recordCombines = [ String: RuuviTagRecordSubjectCombine] ( )
31- private lazy var lastRecordCombines = [ String: RuuviTagLastRecordSubjectCombine] ( )
32- private lazy var latestRecordCombines = [ String: RuuviTagLatestRecordSubjectCombine] ( )
33- private lazy var sensorSettingsCombines = [ String: SensorSettingsCombine] ( )
37+ private func synchronized< T> ( _ block: ( ) -> T ) -> T {
38+ stateLock. lock ( )
39+ defer { stateLock. unlock ( ) }
40+ return block ( )
41+ }
3442
3543 func observe(
3644 _ luid: LocalIdentifier ,
3745 _ block: @escaping ( [ AnyRuuviTagSensorRecord ] ) -> Void
3846 ) -> RuuviReactorToken {
39- var recordCombine : RuuviTagRecordSubjectCombine
40- if let combine = recordCombines [ luid. value] {
41- recordCombine = combine
42- } else {
47+ let recordCombine : RuuviTagRecordSubjectCombine = synchronized {
48+ if let combine = recordCombines [ luid. value] {
49+ return combine
50+ }
4351 let combine = RuuviTagRecordSubjectCombine (
4452 luid: luid,
4553 macId: nil ,
4654 sqlite: sqliteContext,
4755 errorReporter: errorReporter
4856 )
4957 recordCombines [ luid. value] = combine
50- recordCombine = combine
58+ return combine
5159 }
60+
5261 let cancellable = recordCombine. subject. sink { values in
5362 block ( values)
5463 }
55- if !recordCombine. isServing {
64+
65+ let shouldStart = synchronized {
66+ if recordCombine. isServing {
67+ return false
68+ }
69+ recordCombine. isServing = true
70+ return true
71+ }
72+ if shouldStart {
5673 recordCombine. start ( )
5774 }
5875 return RuuviReactorToken {
@@ -96,23 +113,32 @@ class RuuviReactorImpl: RuuviReactor {
96113 let result = [ sqliteRecord ] . compactMap { $0? . any } . last
97114 block ( . update( result) )
98115 } )
99- var recordCombine : RuuviTagLastRecordSubjectCombine
100- if let combine = lastRecordCombines [ ruuviTag. id] {
101- recordCombine = combine
102- } else {
116+ let recordCombine : RuuviTagLastRecordSubjectCombine = synchronized {
117+ if let combine = lastRecordCombines [ ruuviTag. id] {
118+ return combine
119+ }
103120 let combine = RuuviTagLastRecordSubjectCombine (
104121 luid: ruuviTag. luid,
105122 macId: ruuviTag. macId,
106123 sqlite: sqliteContext,
107124 errorReporter: errorReporter
108125 )
109126 lastRecordCombines [ ruuviTag. id] = combine
110- recordCombine = combine
127+ return combine
111128 }
129+
112130 let cancellable = recordCombine. subject. sink { record in
113131 block ( . update( record) )
114132 }
115- if !recordCombine. isServing {
133+
134+ let shouldStart = synchronized {
135+ if recordCombine. isServing {
136+ return false
137+ }
138+ recordCombine. isServing = true
139+ return true
140+ }
141+ if shouldStart {
116142 recordCombine. start ( )
117143 }
118144 return RuuviReactorToken {
@@ -129,23 +155,32 @@ class RuuviReactorImpl: RuuviReactor {
129155 let result = [ sqliteRecord ] . compactMap { $0? . any } . last
130156 block ( . update( result) )
131157 } )
132- var recordCombine : RuuviTagLatestRecordSubjectCombine
133- if let combine = latestRecordCombines [ ruuviTag. id] {
134- recordCombine = combine
135- } else {
158+ let recordCombine : RuuviTagLatestRecordSubjectCombine = synchronized {
159+ if let combine = latestRecordCombines [ ruuviTag. id] {
160+ return combine
161+ }
136162 let combine = RuuviTagLatestRecordSubjectCombine (
137163 luid: ruuviTag. luid,
138164 macId: ruuviTag. macId,
139165 sqlite: sqliteContext,
140166 errorReporter: errorReporter
141167 )
142168 latestRecordCombines [ ruuviTag. id] = combine
143- recordCombine = combine
169+ return combine
144170 }
171+
145172 let cancellable = recordCombine. subject. sink { record in
146173 block ( . update( record) )
147174 }
148- if !recordCombine. isServing {
175+
176+ let shouldStart = synchronized {
177+ if recordCombine. isServing {
178+ return false
179+ }
180+ recordCombine. isServing = true
181+ return true
182+ }
183+ if shouldStart {
149184 recordCombine. start ( )
150185 }
151186 return RuuviReactorToken {
@@ -162,19 +197,20 @@ class RuuviReactorImpl: RuuviReactor {
162197 block ( . initial( [ sensorSettings] ) )
163198 }
164199 }
165- var sensorSettingsCombine : SensorSettingsCombine
166- if let combine = sensorSettingsCombines [ ruuviTag. id] {
167- sensorSettingsCombine = combine
168- } else {
200+ let sensorSettingsCombine : SensorSettingsCombine = synchronized {
201+ if let combine = sensorSettingsCombines [ ruuviTag. id] {
202+ return combine
203+ }
169204 let combine = SensorSettingsCombine (
170205 luid: ruuviTag. luid,
171206 macId: ruuviTag. macId,
172207 sqlite: sqliteContext,
173208 errorReporter: errorReporter
174209 )
175210 sensorSettingsCombines [ ruuviTag. id] = combine
176- sensorSettingsCombine = combine
211+ return combine
177212 }
213+
178214 let insert = sensorSettingsCombine. insertSubject. sink { value in
179215 block ( . insert( value) )
180216 }
0 commit comments