  *
  *
  */
+use actix_web::rt::spawn;
 use datafusion::arrow;
 use datafusion::arrow::datatypes::Schema;
 use datafusion::arrow::error::ArrowError;
@@ -24,17 +25,19 @@ use datafusion::arrow::json;
 use datafusion::arrow::json::reader::infer_json_schema;
 use datafusion::arrow::record_batch::RecordBatch;
 use lazy_static::lazy_static;
-use log::error;
 use std::collections::HashMap;
 use std::fs::OpenOptions;
 use std::io::BufReader;
+use std::ops::{Deref, DerefMut};
 use std::sync::Arc;
 use std::sync::Mutex;
 use std::sync::MutexGuard;
 use std::sync::RwLock;

 use crate::metadata;
+use crate::metadata::LOCK_EXPECT;
 use crate::option::CONFIG;
+use crate::s3;
 use crate::storage::ObjectStorage;

 use self::error::EventError;
@@ -190,7 +193,7 @@ pub struct Event {
 // Events holds the schema related to each event for a single log stream

 impl Event {
-    pub async fn process(&self, storage: &impl ObjectStorage) -> Result<(), EventError> {
+    pub async fn process(&self) -> Result<(), EventError> {
         let inferred_schema = self.infer_schema()?;

         let event = self.get_reader(inferred_schema.clone());
@@ -207,12 +210,11 @@ impl Event {
         } else {
             // if the stream schema is none then this is the first event;
             // process the first event and store the schema in the object store
-            self.process_first_event(event, inferred_schema, storage)
-                .await?
+            self.process_first_event::<s3::S3, _>(event, inferred_schema)?
         };

         if let Err(e) = metadata::STREAM_INFO.check_alerts(self).await {
-            error!("Error checking for alerts. {:?}", e);
+            log::error!("Error checking for alerts. {:?}", e);
         }

         Ok(())
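A note on the new call shape above: `process_first_event` is now generic over the storage backend, selected at the call site with a turbofish (`::<s3::S3, _>`), and builds its own client via `S::new()` instead of borrowing `storage` from the caller. A minimal sketch of the pattern, using a hypothetical `Backend` trait rather than the crate's `ObjectStorage`:

```rust
// Hypothetical trait standing in for ObjectStorage; only the
// associated constructor matters for this pattern.
trait Backend {
    fn new() -> Self;
    fn name(&self) -> &'static str;
}

struct S3;

impl Backend for S3 {
    fn new() -> Self {
        S3
    }
    fn name(&self) -> &'static str {
        "s3"
    }
}

// Generic over the backend: the function builds and owns its client,
// so nothing is borrowed from the caller.
fn upload<B: Backend>() {
    let storage = B::new();
    println!("uploading via {}", storage.name());
}

fn main() {
    // The call site picks the backend explicitly, mirroring
    // self.process_first_event::<s3::S3, _>(event, inferred_schema).
    upload::<S3>();
}
```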
@@ -221,25 +223,67 @@ impl Event {
     // This is called when the first event of a log stream is received. The first event is
     // special because we parse this event to generate the schema for the log stream. This
     // schema is then enforced on the rest of the events sent to this log stream.
-    async fn process_first_event<R: std::io::Read>(
+    fn process_first_event<S: ObjectStorage, R: std::io::Read>(
         &self,
         mut event: json::Reader<R>,
         schema: Schema,
-        storage: &impl ObjectStorage,
     ) -> Result<u64, EventError> {
-        let rb = event.next()?.ok_or(EventError::MissingRecord)?;
-        let stream_name = &self.stream_name;
-
-        // Store record batch on local cache
-        STREAM_WRITERS::create_entry(stream_name.clone(), &rb).unwrap();
+        // note for the functions _schema_with_map and _set_schema_with_map:
+        // these are to be called while holding a write lock specifically.
+        // this guarantees two things
+        // - no other metadata operation can happen in between
+        // - the map always has an entry for this stream

-        // Put the inferred schema to object store
-        storage.put_schema(stream_name.clone(), &schema).await?;
+        let stream_name = &self.stream_name;

-        // set the schema in memory for this stream
-        metadata::STREAM_INFO.set_schema(stream_name, schema)?;
+        let mut stream_metadata = metadata::STREAM_INFO.write().expect(LOCK_EXPECT);
+        // if the metadata is not none after acquiring the lock,
+        // then some other thread has already completed this function.
+        if _schema_with_map(stream_name, &stream_metadata).is_some() {
+            // drop the lock
+            drop(stream_metadata);
+            // try to post the event the usual way
+            log::info!("first event is redirected to process_event");
+            self.process_event(event)
+        } else {
+            // stream metadata is still none,
+            // so this execution should be considered the first event.
+
+            // Store the record batch in the local cache
+            log::info!("creating local writer for this first event");
+            let rb = event.next()?.ok_or(EventError::MissingRecord)?;
+            STREAM_WRITERS::append_to_local(stream_name, &rb)?;
+
+            log::info!("schema is set in memory map for logstream {}", stream_name);
+            _set_schema_with_map(stream_name, schema.clone(), &mut stream_metadata);
+            // drop the write guard before moving on to the async task
+            drop(stream_metadata);
+
+            log::info!(
+                "setting schema on object store for logstream {}",
+                stream_name
+            );
+            let storage = S::new();
+
+            let stream_name = stream_name.clone();
+            spawn(async move {
+                if let Err(e) = storage.put_schema(stream_name.clone(), &schema).await {
+                    // If this call fails there is currently no right way to make the local state consistent.
+                    // This needs a fix once stronger constraints and safety guarantees are provided by the local writer and s3_sync.
+                    // Reasoning:
+                    // - after dropping the lock, many events may process through
+                    // - processed events may sync before the metadata deletion
+                    log::error!(
+                        "Parseable failed to upload schema to object store due to error {}",
+                        e
+                    );
+                    log::error!("Please manually delete this logstream and create a new one.");
+                    metadata::STREAM_INFO.delete_stream(&stream_name);
+                }
+            });

-        Ok(0)
+            Ok(0)
+        }
     }

     // event processes all events after the 1st event. Concatenates record batches
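The first-event branch above is a double-checked locking pattern: acquire the write lock, re-check whether another thread has already registered the schema, and only do the one-time setup if not. A self-contained sketch of the same idea with `std::sync::RwLock` (types and names here are illustrative, not the crate's):

```rust
use std::collections::HashMap;
use std::sync::RwLock;

// Illustrative stand-in for STREAM_INFO: stream name -> optional schema.
type Metadata = RwLock<HashMap<String, Option<String>>>;

fn process_first_event(meta: &Metadata, stream: &str, inferred: String) {
    // Take the write lock up front so the check and the update are atomic.
    let mut map = meta.write().expect("metadata lock poisoned");
    if map.get(stream).and_then(|schema| schema.clone()).is_some() {
        // Another thread won the race; fall back to the regular path.
        drop(map);
        println!("schema already set, processing {stream} as a regular event");
    } else {
        // This thread is first: publish the schema while still holding the lock.
        map.insert(stream.to_string(), Some(inferred));
        // Drop the guard before any slow or async work; a std write guard
        // must not be held across an await point and is not Send.
        drop(map);
        println!("first event for {stream}, schema stored");
    }
}

fn main() {
    let meta = Metadata::default();
    process_first_event(&meta, "app-logs", "schema-v1".to_string());
    // The second call takes the already-initialized branch.
    process_first_event(&meta, "app-logs", "schema-v1".to_string());
}
```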
@@ -273,6 +317,31 @@ impl Event {
     }
 }

+// Special function which reads from the metadata map while holding the lock
+#[inline]
+pub fn _schema_with_map(
+    stream_name: &str,
+    map: &impl Deref<Target = HashMap<String, metadata::LogStreamMetadata>>,
+) -> Option<Schema> {
+    map.get(stream_name)
+        .expect("map has entry for this stream name")
+        .schema
+        .to_owned()
+}
+
+// Special function which writes to the metadata map while holding the lock
+#[inline]
+pub fn _set_schema_with_map(
+    stream_name: &str,
+    schema: Schema,
+    map: &mut impl DerefMut<Target = HashMap<String, metadata::LogStreamMetadata>>,
+) {
+    map.get_mut(stream_name)
+        .expect("map has entry for this stream name")
+        .schema
+        .replace(schema);
+}
+
 pub mod error {
     use crate::metadata::error::stream_info::MetadataError;
     use crate::storage::ObjectStorageError;
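One more note on the two helpers added above: bounding the map parameter by `impl Deref<Target = HashMap<...>>` (and `DerefMut` for writes) lets a single function accept either an `RwLockReadGuard` or an `RwLockWriteGuard`, since both deref to the underlying map. A runnable sketch of that trick with illustrative types:

```rust
use std::collections::HashMap;
use std::ops::{Deref, DerefMut};
use std::sync::RwLock;

// Reads through any guard that derefs to the map, so it works
// with both read and write guards.
fn get_with_map(key: &str, map: &impl Deref<Target = HashMap<String, u32>>) -> Option<u32> {
    map.get(key).copied()
}

// Writes through any guard that derefs mutably to the map
// (only a write guard qualifies).
fn set_with_map(key: &str, value: u32, map: &mut impl DerefMut<Target = HashMap<String, u32>>) {
    map.insert(key.to_string(), value);
}

fn main() {
    let lock = RwLock::new(HashMap::new());

    let mut write_guard = lock.write().unwrap();
    set_with_map("events", 1, &mut write_guard);
    // The read helper also accepts the write guard...
    assert_eq!(get_with_map("events", &write_guard), Some(1));
    drop(write_guard);

    // ...and a plain read guard.
    let read_guard = lock.read().unwrap();
    assert_eq!(get_with_map("events", &read_guard), Some(1));
}
```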