@@ -609,21 +609,31 @@ def test_storage_server_replication(all_nodes: list[Daemon]):
609609 print (f"Killing SN { swarm_sn_for_pubkey [0 ].name } 's storage@{ swarm_sn_for_pubkey [0 ].storage_server_https_port } " )
610610 swarm_sn_for_pubkey [0 ].stop_storage_server ()
611611
612- # Send a message to 1 storage server to test message replication w/ 1 dead node
613- store_params = {
614- "method" : "store" ,
615- "params" : {
616- "pubkey" : ed25519_pkey_hex ,
617- "timestamp" : int (time .time ()) * 1000 ,
618- "data" : base64 .b64encode (f"test msg is replicated w/ 1 dead node" .encode ('utf-8' )).decode (),
619- "ttl" : f"{ 30_000 * 60 } " , # 30 minutes
612+ # Send messages to 1 storage server to test message replication w/ 1 dead node
613+ for index in range (4 ):
614+ store_params = {
615+ "method" : "store" ,
616+ "params" : {
617+ "pubkey" : ed25519_pkey_hex ,
618+ "timestamp" : int (time .time ()) * 1000 ,
619+ "data" : base64 .b64encode (f"{ index } test msg is replicated w/ 1 dead node" .encode ('utf-8' )).decode (),
620+ "ttl" : f"{ 30_000 * 60 } " , # 30 minutes
621+ }
620622 }
621- }
622- expected_msg_count += 1
623- print (f"Store another message into SN { swarm_sn_for_pubkey [1 ].name } 's storage@{ swarm_sn_for_pubkey [1 ].storage_server_https_port } /storage_rpc/v1" )
624- response = swarm_sn_for_pubkey [1 ].storage_rpc (path = "/storage_rpc/v1" , params = store_params )
625- assert response .status_code == 200
626- time .sleep (2 ) # Sleep abit to allow the message to get replicated across the swarm
623+ expected_msg_count += 1
624+
625+ print (f"Store another message into SN { swarm_sn_for_pubkey [1 ].name } 's storage@{ swarm_sn_for_pubkey [1 ].storage_server_https_port } /storage_rpc/v1" )
626+ response = swarm_sn_for_pubkey [1 ].storage_rpc (path = "/storage_rpc/v1" , params = store_params )
627+ assert response .status_code == 200
628+
629+ time .sleep (5 ) # Sleep abit to allow the message to get replicated across the swarm
630+
631+ # Kill SN 1's storage server to test the serialisation and deserialisation of retryable requests
632+ print (f"Killing SN { swarm_sn_for_pubkey [1 ].name } 's storage@{ swarm_sn_for_pubkey [0 ].storage_server_https_port } to test serialisation of retryable request to { swarm_sn_for_pubkey [0 ].name } 's storage@{ swarm_sn_for_pubkey [0 ].storage_server_https_port } " )
633+ swarm_sn_for_pubkey [1 ].stop_storage_server ()
634+
635+ time .sleep (5 ) # Sleep then restart the server
636+ swarm_sn_for_pubkey [1 ].start_storage_server ()
627637
628638 # Start up the server we killed
629639 swarm_sn_for_pubkey [0 ].start_storage_server ()
@@ -635,8 +645,11 @@ def test_storage_server_replication(all_nodes: list[Daemon]):
635645 prev_msg_replicated_to_sn_count = 0
636646 sn_retrieve_list = swarm_sn_for_pubkey [:]
637647 attempts = 0
638- while attempts < 3 :
648+ while attempts < 99 :
639649 index = 0
650+ if attempts > 0 :
651+ print (f" Attempt #{ attempts } " )
652+
640653 while index < len (sn_retrieve_list ):
641654 retrieve_ts = int (time .time ()) * 1000
642655 retrieve_sig_payload = "retrieve" .encode ('utf-8' ) + str (retrieve_ts ).encode ('utf-8' )
@@ -674,12 +687,52 @@ def test_storage_server_replication(all_nodes: list[Daemon]):
674687 if msg_replicated_to_sn_count == len (swarm_sn_for_pubkey ):
675688 break
676689
677- time .sleep (2 )
690+ time .sleep (5 )
678691 attempts += 1
679692
680693 if msg_replicated_to_sn_count != len (swarm_sn_for_pubkey ):
681694 print (f" Message replication failed after { attempts } attempts. { msg_replicated_to_sn_count } /{ len (swarm_sn_for_pubkey )} received the message" )
682695
696+ sn = swarm_sn_for_pubkey [2 ]
697+ print (f" Kill storage@{ sn .storage_server_https_port } and delete their DB. That server's handshake with the swarm should request a DB dump due to a deleted DB" )
698+
699+ sn .stop_storage_server ()
700+ sn_db_path = pathlib .Path ("{}/storage/storage.db" .format (sn .datadir ))
701+ sn_db_path .unlink ()
702+
703+ print (f" Starting storage@{ sn .storage_server_https_port } aftering deleting their DB, checking if we synced the messages" )
704+ sn .start_storage_server ()
705+
706+ # Sleep a bit to give the SS some time to start up
707+ time .sleep (3 )
708+
709+ while attempts < 99 :
710+ if attempts > 0 :
711+ print (f" Attempt #{ attempts } " )
712+
713+ retrieve_ts = int (time .time ()) * 1000
714+ retrieve_sig_payload = "retrieve" .encode ('utf-8' ) + str (retrieve_ts ).encode ('utf-8' )
715+ retrieve_params = {
716+ "method" : "retrieve" ,
717+ "params" : {
718+ "pubkey" : ed25519_pkey_hex ,
719+ "timestamp" : retrieve_ts ,
720+ "signature" : base64 .b64encode (ed25519_skey .sign (retrieve_sig_payload )).decode (),
721+ }
722+ }
723+ response = sn .storage_rpc (path = "/storage_rpc/v1" , params = retrieve_params )
724+ if response .status_code == 200 :
725+ json = response .json ();
726+ msgs_array = json ["messages" ]
727+ print (f" { sn .name } 's storage@{ sn .storage_server_https_port } (HTTP status { response .status_code } ) message count { len (msgs_array )} /{ expected_msg_count } " )
728+ if len (msgs_array ) == expected_msg_count :
729+ break
730+ else :
731+ print (f" { sn .name } 's storage@{ sn .storage_server_https_port } (HTTP status { response .status_code } )" )
732+
733+ time .sleep (5 )
734+ attempts += 1
735+
683736
684737def print_unicode_table (rows : List [List [str ]]) -> None :
685738 # Calculate maximum width for each column
0 commit comments