  */
 use datafusion::arrow;
 use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::error::ArrowError;
 use datafusion::arrow::ipc::writer::StreamWriter;
 use datafusion::arrow::json;
 use datafusion::arrow::json::reader::infer_json_schema;
@@ -34,9 +35,9 @@ use std::sync::RwLock;

 use crate::metadata;
 use crate::option::CONFIG;
-use crate::response;
 use crate::storage::ObjectStorage;
-use crate::Error;
+
+use self::error::EventError;

 type LocalWriter = Mutex<Option<StreamWriter<std::fs::File>>>;
 type LocalWriterGuard<'a> = MutexGuard<'a, Option<StreamWriter<std::fs::File>>>;
@@ -51,13 +52,13 @@ impl STREAM_WRITERS {
     fn append_to_local(stream: &str, record: &RecordBatch) -> Result<(), StreamWriterError> {
         let hashmap_guard = STREAM_WRITERS
             .read()
-            .map_err(|_| StreamWriterError::RwPoisioned)?;
+            .map_err(|_| StreamWriterError::RwPoisoned)?;

         match hashmap_guard.get(stream) {
             Some(localwriter) => {
                 let mut writer_guard = localwriter
                     .lock()
-                    .map_err(|_| StreamWriterError::MutexPoisioned)?;
+                    .map_err(|_| StreamWriterError::MutexPoisoned)?;

                 // if it's some writer then we write without dropping any lock
                 // hashmap cannot be brought mutably at any point until this finishes
@@ -85,7 +86,7 @@ impl STREAM_WRITERS {
     fn create_entry(stream: String, record: &RecordBatch) -> Result<(), StreamWriterError> {
         let mut hashmap_guard = STREAM_WRITERS
             .write()
-            .map_err(|_| StreamWriterError::RwPoisioned)?;
+            .map_err(|_| StreamWriterError::RwPoisoned)?;

         let file = OpenOptions::new()
             .append(true)
@@ -109,7 +110,7 @@ impl STREAM_WRITERS {
     pub fn delete_entry(stream: &str) -> Result<(), StreamWriterError> {
         let mut hashmap_guard = STREAM_WRITERS
             .write()
-            .map_err(|_| StreamWriterError::RwPoisioned)?;
+            .map_err(|_| StreamWriterError::RwPoisoned)?;

         hashmap_guard.remove(stream);

@@ -143,14 +144,14 @@ impl STREAM_WRITERS {
     pub fn unset_entry(stream: &str) -> Result<(), StreamWriterError> {
         let guard = STREAM_WRITERS
             .read()
-            .map_err(|_| StreamWriterError::RwPoisioned)?;
+            .map_err(|_| StreamWriterError::RwPoisoned)?;
         let stream_writer = match guard.get(stream) {
             Some(writer) => writer,
             None => return Ok(()),
         };
         stream_writer
             .lock()
-            .map_err(|_| StreamWriterError::MutexPoisioned)?
+            .map_err(|_| StreamWriterError::MutexPoisoned)?
             .take();

         Ok(())
@@ -163,10 +164,10 @@ pub enum StreamWriterError {
     Writer(arrow::error::ArrowError),
     #[error("Io Error when creating new file: {0}")]
     Io(std::io::Error),
-    #[error("RwLock was poisioned")]
-    RwPoisioned,
-    #[error("Mutex was poisioned")]
-    MutexPoisioned,
+    #[error("RwLock was poisoned")]
+    RwPoisoned,
+    #[error("Mutex was poisoned")]
+    MutexPoisoned,
 }

 fn data_file_path(stream_name: &str) -> String {
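Aside, not part of the patch: the map_err(|_| StreamWriterError::RwPoisoned) calls above deliberately discard the std::sync::PoisonError returned by read()/write()/lock(), because that error wraps the lock guard (which borrows the lock) and so cannot be carried in an owned error enum. A minimal standalone sketch of the same pattern; LockError and read_counter are illustrative names only, and it assumes the thiserror crate already used in this file:

use std::sync::Mutex;

#[derive(Debug, thiserror::Error)]
pub enum LockError {
    #[error("Mutex was poisoned")]
    MutexPoisoned,
}

fn read_counter(counter: &Mutex<u64>) -> Result<u64, LockError> {
    // lock() yields Err(PoisonError<MutexGuard<u64>>) if a previous holder panicked;
    // the guard inside it is dropped here and only a unit variant is propagated.
    let guard = counter.lock().map_err(|_| LockError::MutexPoisoned)?;
    Ok(*guard)
}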
@@ -189,24 +190,17 @@ pub struct Event {
 // Events holds the schema related to each event for a single log stream

 impl Event {
-    pub async fn process(
-        &self,
-        storage: &impl ObjectStorage,
-    ) -> Result<response::EventResponse, Error> {
-        let inferred_schema = self.infer_schema().map_err(|e| {
-            error!("Failed to infer schema for event. {:?}", e);
-            e
-        })?;
+    pub async fn process(&self, storage: &impl ObjectStorage) -> Result<(), EventError> {
+        let inferred_schema = self.infer_schema()?;

         let event = self.get_reader(inferred_schema.clone());

         let stream_schema = metadata::STREAM_INFO.schema(&self.stream_name)?;
-        let is_first_event = stream_schema.is_none();

         if let Some(existing_schema) = stream_schema {
             // validate schema before processing the event
             if existing_schema != inferred_schema {
-                return Err(Error::SchemaMismatch(self.stream_name.clone()));
+                return Err(EventError::SchemaMismatch(self.stream_name.clone()));
             } else {
                 self.process_event(event)?
             }
@@ -221,16 +215,7 @@ impl Event {
             error!("Error checking for alerts. {:?}", e);
         }

-        let msg = if is_first_event {
-            format!(
-                "Intial Event recieved for log stream {}, schema uploaded successfully",
-                &self.stream_name,
-            )
-        } else {
-            format!("Event recieved for log stream {}", &self.stream_name)
-        };
-
-        Ok(response::EventResponse { msg })
+        Ok(())
     }

     // This is called when the first event of a log stream is received. The first event is
@@ -241,56 +226,42 @@ impl Event {
         mut event: json::Reader<R>,
         schema: Schema,
         storage: &impl ObjectStorage,
-    ) -> Result<u64, Error> {
-        let rb = event.next()?.ok_or(Error::MissingRecord)?;
+    ) -> Result<u64, EventError> {
+        let rb = event.next()?.ok_or(EventError::MissingRecord)?;
         let stream_name = &self.stream_name;

         // Store record batch on local cache
         STREAM_WRITERS::create_entry(stream_name.clone(), &rb).unwrap();

         // Put the inferred schema to object store
-        storage
-            .put_schema(stream_name.clone(), &schema)
-            .await
-            .map_err(|e| response::EventError {
-                msg: format!(
-                    "Failed to upload schema for log stream {} due to err: {}",
-                    stream_name, e
-                ),
-            })?;
+        storage.put_schema(stream_name.clone(), &schema).await?;

         // set the schema in memory for this stream
-        metadata::STREAM_INFO
-            .set_schema(stream_name, schema)
-            .map_err(|e| response::EventError {
-                msg: format!(
-                    "Failed to set schema for log stream {} due to err: {}",
-                    stream_name, e
-                ),
-            })?;
+        metadata::STREAM_INFO.set_schema(stream_name, schema)?;

         Ok(0)
     }

     // event process all events after the 1st event. Concatenates record batches
     // and puts them in memory store for each event.
-    fn process_event<R: std::io::Read>(&self, mut event: json::Reader<R>) -> Result<u64, Error> {
-        let rb = event.next()?.ok_or(Error::MissingRecord)?;
+    fn process_event<R: std::io::Read>(
+        &self,
+        mut event: json::Reader<R>,
+    ) -> Result<u64, EventError> {
+        let rb = event.next()?.ok_or(EventError::MissingRecord)?;
         let stream_name = &self.stream_name;

-        STREAM_WRITERS::append_to_local(stream_name, &rb).unwrap();
+        STREAM_WRITERS::append_to_local(stream_name, &rb)?;

         Ok(0)
     }

     // inferSchema is a constructor to Schema
     // returns raw arrow schema type and arrow schema to string type.
-    fn infer_schema(&self) -> Result<Schema, Error> {
+    fn infer_schema(&self) -> Result<Schema, ArrowError> {
         let reader = self.body.as_bytes();
         let mut buf_reader = BufReader::new(reader);
-        let inferred_schema = infer_json_schema(&mut buf_reader, None)?;
-
-        Ok(inferred_schema)
+        infer_json_schema(&mut buf_reader, None)
     }

     fn get_reader(&self, arrow_schema: arrow::datatypes::Schema) -> json::Reader<&[u8]> {
@@ -301,3 +272,27 @@ impl Event {
         )
     }
 }
+
+pub mod error {
+    use crate::metadata::error::stream_info::MetadataError;
+    use crate::storage::ObjectStorageError;
+    use datafusion::arrow::error::ArrowError;
+
+    use super::StreamWriterError;
+
+    #[derive(Debug, thiserror::Error)]
+    pub enum EventError {
+        #[error("Missing Record from event body")]
+        MissingRecord,
+        #[error("Stream Writer Failed: {0}")]
+        StreamWriter(#[from] StreamWriterError),
+        #[error("Metadata Error: {0}")]
+        Metadata(#[from] MetadataError),
+        #[error("Arrow Error: {0}")]
+        Arrow(#[from] ArrowError),
+        #[error("Schema Mismatch: {0}")]
+        SchemaMismatch(String),
+        #[error("ObjectStorage Error: {0}")]
+        ObjectStorage(#[from] ObjectStorageError),
+    }
+}
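With this change Event::process returns Result<(), EventError> and no longer builds the client-facing message that response::EventResponse used to carry, so whichever handler calls it (not shown in this diff) now has to produce the success text and translate EventError itself. A hypothetical caller-side sketch, assuming it sits in the same module as Event; handle_ingest is an illustrative name, not part of the patch:

// Hypothetical wrapper around the new process() signature; not part of this commit.
async fn handle_ingest(event: Event, storage: &impl ObjectStorage) -> Result<String, EventError> {
    // process() now only signals success or failure...
    event.process(storage).await?;
    // ...so any human-readable response message is assembled by the caller.
    Ok(format!("Event received for log stream {}", event.stream_name))
}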