1+ package replicate .twophaseexecution ;
2+
3+ import org .apache .logging .log4j .LogManager ;
4+ import org .apache .logging .log4j .Logger ;
5+ import replicate .common .*;
6+ import replicate .net .InetAddressAndPort ;
7+ import replicate .twophaseexecution .messages .*;
8+ import replicate .vsr .CompletionCallback ;
9+ import replicate .wal .Command ;
10+ import replicate .wal .DurableKVStore ;
11+
12+ import java .io .ByteArrayInputStream ;
13+ import java .io .IOException ;
14+ import java .math .BigInteger ;
15+ import java .util .List ;
16+ import java .util .Optional ;
17+ import java .util .concurrent .CompletableFuture ;
18+
19+ /**
20+ * SinglePhaseExecution demonstrates the problematic single-phase execution pattern
21+ * that leads to inconsistencies and motivates the need for consensus algorithms.
22+ *
23+ * This implementation follows the "execute-first, propagate-later" anti-pattern:
24+ * 1. Receives client request
25+ * 2. Executes command immediately on local node
26+ * 3. Responds "Success" to client immediately
27+ * 4. Attempts to propagate to other nodes asynchronously
28+ * 5. If propagation fails or node crashes, system becomes inconsistent
29+ *
30+ * This is exactly the problematic scenario shown in single_phase_execution.puml
31+ */
32+ public class SinglePhaseExecution extends Replica {
33+ private static Logger logger = LogManager .getLogger (SinglePhaseExecution .class );
34+
35+ private final DurableKVStore kvStore ;
36+
37+ public SinglePhaseExecution (String name , Config config , SystemClock clock ,
38+ InetAddressAndPort clientConnectionAddress ,
39+ InetAddressAndPort peerConnectionAddress ,
40+ List <InetAddressAndPort > peerAddresses ) throws IOException {
41+ super (name , config , clock , clientConnectionAddress , peerConnectionAddress , peerAddresses );
42+ this .kvStore = new DurableKVStore (config );
43+ }
44+
45+ @ Override
46+ protected void registerHandlers () {
47+ handlesMessage (MessageId .ProposeRequest , this ::handlePropagate , ProposeRequest .class );
48+ handlesRequestAsync (MessageId .ExcuteCommandRequest , this ::handleExecute , ExecuteCommandRequest .class );
49+ }
50+
51+ /**
52+ * PROBLEMATIC: Single-phase execution pattern
53+ * 1. Execute immediately on local node
54+ * 2. Respond success to client immediately
55+ * 3. Try to propagate asynchronously (fire-and-forget)
56+ */
57+ CompletableFuture <ExecuteCommandResponse > handleExecute (ExecuteCommandRequest request ) {
58+ byte [] commandBytes = request .command ;
59+ Command command = Command .deserialize (new ByteArrayInputStream (commandBytes ));
60+
61+ logger .info ("{} received command: {}" , getName (), command .getClass ().getSimpleName ());
62+
63+ // STEP 1: Execute immediately on local node (PROBLEMATIC!)
64+ boolean executed = executeCommand (command );
65+
66+ // STEP 2: Respond "Success" to client immediately (PROBLEMATIC!)
67+ ExecuteCommandResponse response = new ExecuteCommandResponse (Optional .empty (), executed );
68+
69+ // STEP 3: Try to propagate asynchronously (fire-and-forget)
70+ if (executed ) {
71+ logger .info ("{} executed command locally, now attempting propagation..." , getName ());
72+ propagateAsynchronously (commandBytes );
73+ }
74+
75+ // Return immediate success - this is the problem!
76+ return CompletableFuture .completedFuture (response );
77+ }
78+
79+ /**
80+ * Execute command locally without waiting for consensus
81+ */
82+ private boolean executeCommand (Command command ) {
83+ if (command instanceof CompareAndSwap ) {
84+ CompareAndSwap cas = (CompareAndSwap ) command ;
85+ Optional <String > existingValue = Optional .ofNullable (kvStore .get (cas .getKey ()));
86+
87+ if (existingValue .equals (cas .getExistingValue ())) {
88+ kvStore .put (cas .getKey (), cas .getNewValue ());
89+ logger .info ("{} executed: {} = {} (was {})" , getName (), cas .getKey (), cas .getNewValue (), existingValue .orElse ("null" ));
90+ return true ;
91+ } else {
92+ logger .info ("{} execution failed: expected {} but found {}" , getName (), cas .getExistingValue ().orElse ("null" ), existingValue .orElse ("null" ));
93+ return false ;
94+ }
95+ }
96+
97+ throw new IllegalArgumentException ("Unknown command: " + command );
98+ }
99+
100+ /**
101+ * Attempt to propagate to other nodes asynchronously (fire-and-forget)
102+ * This is where the problems occur - if this fails, nodes become inconsistent
103+ */
104+ private void propagateAsynchronously (byte [] commandBytes ) {
105+ ProposeRequest propagateRequest = new ProposeRequest (commandBytes );
106+
107+ // Use the sendOnewayMessageToOtherReplicas method from parent class
108+ try {
109+ logger .info ("{} attempting to propagate to other replicas" , getName ());
110+ sendOnewayMessageToOtherReplicas (propagateRequest );
111+ } catch (Exception e ) {
112+ logger .warn ("{} failed to propagate: {}" , getName (), e .getMessage ());
113+ // PROBLEM: Propagation failure is silently ignored!
114+ // The local node has already executed and responded success to client
115+ }
116+ }
117+
118+ /**
119+ * Handle propagation from other nodes
120+ */
121+ private void handlePropagate (Message <ProposeRequest > message ) {
122+ ProposeRequest request = message .messagePayload ();
123+ byte [] commandBytes = request .getCommand ();
124+ Command command = Command .deserialize (new ByteArrayInputStream (commandBytes ));
125+
126+ logger .info ("{} received propagation: {}" , getName (), command .getClass ().getSimpleName ());
127+
128+ // Execute the propagated command
129+ executeCommand (command );
130+ }
131+
132+ public String getValue (String key ) {
133+ return kvStore .get (key );
134+ }
135+
136+ /**
137+ * Simulate node crash by closing the kvStore
138+ */
139+ public void close () {
140+ logger .info ("{} is crashing!" , getName ());
141+ kvStore .close ();
142+ shutdown ();
143+ }
144+ }
0 commit comments