22
33use std:: {
44 collections:: { HashMap , HashSet } ,
5- fs,
5+ fmt , fs,
66 io:: BufReader ,
77 path:: PathBuf ,
88 sync:: { Arc , Mutex } ,
@@ -18,7 +18,7 @@ use indy_utils::{
1818 crypto:: chacha20poly1305_ietf:: { self , Key as MasterKey } ,
1919 secret,
2020} ;
21- use log:: { info, trace} ;
21+ use log:: { error , info, trace, warn } ;
2222use serde:: { Deserialize , Serialize } ;
2323use serde_json:: Value as SValue ;
2424
@@ -44,6 +44,14 @@ mod cache;
4444mod export_import;
4545mod wallet;
4646
47+ #[ derive( Debug ) ]
48+ pub struct MigrationResult {
49+ migrated : u32 ,
50+ skipped : u32 ,
51+ duplicated : u32 ,
52+ failed : u32 ,
53+ }
54+
4755pub struct WalletService {
4856 storage_types : Mutex < HashMap < String , Arc < dyn WalletStorageType > > > ,
4957 wallets : Mutex < HashMap < WalletHandle , Arc < Wallet > > > ,
@@ -354,7 +362,7 @@ impl WalletService {
354362 ) -> IndyResult < ( ) > {
355363 let wallet = self . get_wallet ( wallet_handle) . await ?;
356364 wallet
357- . add ( type_, name, value, tags)
365+ . add ( type_, name, value, tags, true )
358366 . await
359367 . map_err ( |err| WalletService :: _map_wallet_storage_error ( err, type_, name) )
360368 }
@@ -706,7 +714,7 @@ impl WalletService {
706714 old_wh : WalletHandle ,
707715 new_wh : WalletHandle ,
708716 mut migrate_fn : impl FnMut ( Record ) -> Result < Option < Record > , E > ,
709- ) -> IndyResult < ( ) >
717+ ) -> IndyResult < MigrationResult >
710718 where
711719 E : std:: fmt:: Display ,
712720 {
@@ -716,53 +724,111 @@ impl WalletService {
716724 let mut records = old_wallet. get_all ( ) . await ?;
717725 let total = records. get_total_count ( ) ?;
718726 info ! ( "Migrating {total:?} records" ) ;
719- let mut num_records = 0 ;
727+ let mut num_record = 0 ;
728+ let mut migration_result = MigrationResult {
729+ migrated : 0 ,
730+ skipped : 0 ,
731+ duplicated : 0 ,
732+ failed : 0 ,
733+ } ;
720734
721- while let Some ( WalletRecord {
722- type_,
723- id,
724- value,
725- tags,
726- } ) = records. next ( ) . await ?
727- {
728- num_records += 1 ;
729- if num_records % 1000 == 1 {
730- info ! ( "Migrating wallet record number {num_records} / {total:?}" ) ;
735+ while let Some ( source_record) = records. next ( ) . await ? {
736+ num_record += 1 ;
737+ if num_record % 1000 == 1 {
738+ warn ! (
739+ "Migrating wallet record number {num_record} / {total:?}, intermediary \
740+ migration result: ${migration_result:?}"
741+ ) ;
731742 }
743+ trace ! ( "Migrating record: {:?}" , source_record) ;
744+ let unwrapped_type_ = match & source_record. type_ {
745+ None => {
746+ warn ! (
747+ "Skipping item missing 'type' field, record ({num_record}): \
748+ {source_record:?}"
749+ ) ;
750+ migration_result. skipped += 1 ;
751+ continue ;
752+ }
753+ Some ( type_) => type_. clone ( ) ,
754+ } ;
755+ let unwrapped_value = match & source_record. value {
756+ None => {
757+ warn ! (
758+ "Skipping item missing 'value' field, record ({num_record}): \
759+ {source_record:?}"
760+ ) ;
761+ migration_result. skipped += 1 ;
762+ continue ;
763+ }
764+ Some ( value) => value. clone ( ) ,
765+ } ;
766+ let unwrapped_tags = match & source_record. tags {
767+ None => HashMap :: new ( ) ,
768+ Some ( tags) => tags. clone ( ) ,
769+ } ;
770+
732771 let record = Record {
733- type_ : type_. ok_or_else ( || {
734- err_msg (
735- IndyErrorKind :: InvalidState ,
736- "No type fetched for exported record" ,
737- )
738- } ) ?,
739- id,
740- value : value. ok_or_else ( || {
741- err_msg (
742- IndyErrorKind :: InvalidState ,
743- "No value fetched for exported record" ,
744- )
745- } ) ?,
746- tags : tags. ok_or_else ( || {
747- err_msg (
748- IndyErrorKind :: InvalidState ,
749- "No tags fetched for exported record" ,
750- )
751- } ) ?,
772+ type_ : unwrapped_type_,
773+ id : source_record. id . clone ( ) ,
774+ value : unwrapped_value,
775+ tags : unwrapped_tags,
752776 } ;
753777
754- if let Some ( record) = migrate_fn ( record)
755- . map_err ( |e| IndyError :: from_msg ( IndyErrorKind :: InvalidStructure , e. to_string ( ) ) ) ?
778+ let migrated_record = match migrate_fn ( record) {
779+ Ok ( record) => match record {
780+ None => {
781+ warn ! ( "Skipping non-migratable record ({num_record}): {source_record:?}" ) ;
782+ migration_result. skipped += 1 ;
783+ continue ;
784+ }
785+ Some ( record) => record,
786+ } ,
787+ Err ( err) => {
788+ warn ! (
789+ "Skipping item due failed item migration, record ({num_record}): \
790+ {source_record:?}, err: {err}"
791+ ) ;
792+ migration_result. failed += 1 ;
793+ continue ;
794+ }
795+ } ;
796+
797+ match new_wallet
798+ . add (
799+ & migrated_record. type_ ,
800+ & migrated_record. id ,
801+ & migrated_record. value ,
802+ & migrated_record. tags ,
803+ false ,
804+ )
805+ . await
756806 {
757- new_wallet
758- . add ( & record. type_ , & record. id , & record. value , & record. tags )
759- . await ?;
807+ Err ( err) => match err. kind ( ) {
808+ IndyErrorKind :: WalletItemAlreadyExists => {
809+ trace ! (
810+ "Record type: {migrated_record:?} already exists in destination \
811+ wallet, skipping"
812+ ) ;
813+ migration_result. duplicated += 1 ;
814+ continue ;
815+ }
816+ _ => {
817+ error ! (
818+ "Error adding record {migrated_record:?} to destination wallet: \
819+ {err:?}"
820+ ) ;
821+ migration_result. failed += 1 ;
822+ return Err ( err) ;
823+ }
824+ } ,
825+ Ok ( ( ) ) => {
826+ migration_result. migrated += 1 ;
827+ }
760828 }
761829 }
762-
763- info ! ( "{num_records} / {total:?} records have been migrated!" ) ;
764-
765- Ok ( ( ) )
830+ warn ! ( "Migration of total {total:?} records completed, result: ${migration_result:?}" ) ;
831+ Ok ( migration_result)
766832 }
767833
768834 pub async fn export_wallet (
@@ -1073,7 +1139,7 @@ pub struct MetadataRaw {
10731139 pub keys : Vec < u8 > ,
10741140}
10751141
1076- #[ derive( Debug , Clone , Serialize , Deserialize , PartialEq , Eq ) ]
1142+ #[ derive( Clone , Serialize , Deserialize , PartialEq , Eq ) ]
10771143pub struct WalletRecord {
10781144 #[ serde( rename = "type" ) ]
10791145 type_ : Option < String > ,
@@ -1082,6 +1148,17 @@ pub struct WalletRecord {
10821148 tags : Option < Tags > ,
10831149}
10841150
1151+ impl fmt:: Debug for WalletRecord {
1152+ fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
1153+ f. debug_struct ( "WalletRecord" )
1154+ . field ( "type_" , & self . type_ )
1155+ . field ( "id" , & self . id )
1156+ . field ( "value" , & self . value . as_ref ( ) . map ( |_| "******" ) )
1157+ . field ( "tags" , & self . tags )
1158+ . finish ( )
1159+ }
1160+ }
1161+
10851162impl Ord for WalletRecord {
10861163 fn cmp ( & self , other : & Self ) -> :: std:: cmp:: Ordering {
10871164 ( & self . type_ , & self . id ) . cmp ( & ( & other. type_ , & other. id ) )
0 commit comments