@@ -41,7 +41,6 @@ use itertools::kmerge_by;
41
41
use lazy_static:: __Deref;
42
42
use relative_path:: RelativePath ;
43
43
use relative_path:: RelativePathBuf ;
44
- use serde:: Serialize ;
45
44
use serde_json:: Value ;
46
45
47
46
use std:: {
@@ -149,9 +148,29 @@ pub trait ObjectStorage: Sync + 'static {
149
148
& self ,
150
149
stream_name : & str ,
151
150
) -> Result < HashMap < String , Arc < Schema > > , ObjectStorageError > {
152
- let schema = self . get_object ( & schema_path ( stream_name) ) . await ?;
153
- let schema = serde_json:: from_slice ( & schema) . expect ( "schema map is valid json" ) ;
154
- Ok ( schema)
151
+ let schema_map = self . get_object ( & schema_path ( stream_name) ) . await ?;
152
+ if let Ok ( schema_map) = serde_json:: from_slice ( & schema_map) {
153
+ Ok ( schema_map)
154
+ } else {
155
+ // fix for schema metadata serialize
156
+ let mut schema_map: serde_json:: Value =
157
+ serde_json:: from_slice ( & schema_map) . expect ( "valid json" ) ;
158
+
159
+ for schema in schema_map
160
+ . as_object_mut ( )
161
+ . expect ( "schema map is json object" )
162
+ . values_mut ( )
163
+ {
164
+ let map = schema. as_object_mut ( ) . unwrap ( ) ;
165
+ map. insert (
166
+ "metadata" . to_string ( ) ,
167
+ Value :: Object ( serde_json:: Map :: new ( ) ) ,
168
+ ) ;
169
+ }
170
+
171
+ Ok ( serde_json:: from_value ( schema_map)
172
+ . expect ( "should be deserializable after alteration" ) )
173
+ }
155
174
}
156
175
157
176
async fn get_alerts ( & self , stream_name : & str ) -> Result < Alerts , ObjectStorageError > {
@@ -380,7 +399,7 @@ impl MergedRecordReader {
380
399
}
381
400
382
401
#[ inline( always) ]
383
- fn to_bytes ( any : & ( impl ?Sized + Serialize ) ) -> Bytes {
402
+ fn to_bytes ( any : & ( impl ?Sized + serde :: Serialize ) ) -> Bytes {
384
403
serde_json:: to_vec ( any)
385
404
. map ( |any| any. into ( ) )
386
405
. expect ( "serialize cannot fail" )
0 commit comments