1313import java .net .Socket ;
1414import java .util .ArrayList ;
1515import java .util .List ;
16- import java .util .concurrent .ExecutorService ;
17- import java .util .concurrent .Executors ;
16+ import java .util .concurrent .*;
1817
1918/**
2019 * Created by Esteban Luchsinger on 18.03.2016.
@@ -29,8 +28,22 @@ public class SocketNetworkClient extends Client implements NetworkClient, Closea
2928
3029 private volatile boolean isWorking ;
3130
31+ /**
32+ * This executor handles the reading of new messages.
33+ */
3234 private final ExecutorService readingExecutor ;
3335
36+ /**
37+ * This executor handles the sending of messages to the client.
38+ */
39+ private final ExecutorService sendingExecutor ;
40+
41+ /**
42+ * The last async sending will be stored inside of this future.
43+ * The future can be used to wait until the client received the data.
44+ */
45+ private Future <?> lastSentFuture ;
46+
3447 private final List <OnDisconnected > onDisconnectedListeners ;
3548
3649 /**
@@ -67,6 +80,11 @@ public SocketNetworkClient(Socket socket) throws IOException {
6780
6881 this .readingExecutor = Executors .newSingleThreadExecutor ();
6982 this .readingExecutor .submit (this ::listen );
83+
84+ // Initialize the executor as single thread executor.
85+ // A single thread executor ensures that every submit is executed in
86+ // the correct order.
87+ this .sendingExecutor = Executors .newSingleThreadExecutor ();
7088 }
7189
7290 /**
@@ -86,32 +104,79 @@ public void removeOnDisconnectedListener(OnDisconnected listener) {
86104 this .onDisconnectedListeners .remove (listener );
87105 }
88106
89-
90- @ Override
91- public ObjectOutputStream getObjectOutputStream () {
107+ private ObjectOutputStream getObjectOutputStream () {
92108 return this .outputStream ;
93109 }
94110
95- @ Override
96- public ObjectInputStream getObjectInputStream () { return this .inputStream ; }
111+ private ObjectInputStream getObjectInputStream () { return this .inputStream ; }
97112
98113 /**
99- * Sends an object to the connected socket.
114+ * Sends an object to the connected socket (asynchronously) .
100115 * This method will send an object in a non-blocking mode (async).
101116 * @param object Object to send. MUST implement the serializable interface.
102- * @throws IOException
103117 */
104118 @ Override
105- public void send (Object object ) throws IOException {
119+ public void send (Object object ) {
120+ this .lastSentFuture = this .sendingExecutor .submit (() -> {
121+ try {
122+ this .sendSync (object );
123+ } catch (IOException e ) {
124+ this .logger .warn ("Error sending object " + object , e );
125+ }
126+ });
127+ }
128+
129+ /**
130+ * Waits until all object were sent.
131+ * If needed, this method returns immediately.
132+ * The timeout is not defined.
133+ */
134+ @ Override
135+ public void waitForSending () {
136+ if (this .lastSentFuture != null ) {
137+ try {
138+ this .lastSentFuture .get ();
139+ } catch (ExecutionException e ) {
140+ this .logger .warn ("Error sending data to client " + this .toString (), e );
141+ } catch (InterruptedException e ) {
142+ this .logger .warn ("Sending data to client " + this + " was interrupted." , e );
143+ }
144+ }
145+ }
146+
147+ /**
148+ * Waits until all objects were sent.
149+ * If needed, this method returns immediately.
150+ *
151+ * @param timeout Timeout time
152+ * @param timeUnit TimeUnit for the timeout
153+ */
154+ @ Override
155+ public void waitForSending (long timeout , TimeUnit timeUnit ) throws TimeoutException {
156+ if (this .lastSentFuture != null ) {
157+ try {
158+ this .lastSentFuture .get (timeout , timeUnit );
159+ } catch (ExecutionException e ) {
160+ this .logger .warn ("Error sending data to client " + this .toString (), e );
161+ } catch (InterruptedException e ) {
162+ this .logger .warn ("Sending data to client " + this + " was interrupted." , e );
163+ }
164+ }
165+ }
166+
167+ /**
168+ * Sends the desired object synchronously.
169+ * @param object Object to send
170+ * @throws IOException
171+ */
172+ private void sendSync (Object object ) throws IOException {
106173
107174 if (object == null )
108175 throw new NullPointerException ("Object is null." );
109176 if (!(object instanceof Serializable ))
110177 throw new RuntimeException ("The object must implement the serializable interface" );
111178
112179 this .getObjectOutputStream ().writeObject (object );
113-
114- // Todo: Implement multi-threading.
115180 }
116181
117182 /**
@@ -120,18 +185,24 @@ public void send(Object object) throws IOException {
120185 private void listen () {
121186 while (isWorking && !this .socket .isClosed ()) {
122187 try {
123- Object receivedObject = this .inputStream .readObject ();
188+ Object receivedObject = this .getObjectInputStream () .readObject ();
124189
125190 if (receivedObject instanceof RenameCommand ) {
126191 RenameCommand command = (RenameCommand ) receivedObject ;
127192 this .setName (command .getName ());
193+ } else {
194+ // The received object is unknown.
195+ this .logger .info ("Received unknown command from client " + this .getName () +
196+ "\n " + receivedObject .toString ());
128197 }
129198 }
130199 catch (EOFException eofException ) {
131200 // An EOF Exception could be due to the client input stream being closed.
132201 // Try to send a beacon to the client, to check if he is still available.
133202 try {
134- this .send (new KeepAliveBeacon ());
203+ // Sends this beacon synchronously because if the client disconnected,
204+ // it will be the last object sent.
205+ this .sendSync (new KeepAliveBeacon ());
135206 } catch (IOException e ) {
136207 // If the beacon failed, this client disconnected.
137208 try {
@@ -158,6 +229,7 @@ public void close() throws IOException {
158229 }
159230
160231 ExecutorServiceUtils .stopExecutorService (this .readingExecutor );
232+ ExecutorServiceUtils .stopExecutorService (this .sendingExecutor );
161233
162234 this .onDisconnected ();
163235 }
0 commit comments