@@ -33,14 +33,18 @@ class SendState : IFileTransferState
3333
3434 private bool forceTCP => services . MessageHandler . FTTransportLayer == "Tcp" ;
3535 private ServiceHub services => ServiceHub . Instance ;
36-
36+ private AutoResetEvent signal = new AutoResetEvent ( false ) ;
37+ int exit = 0 ;
3738 public SendState ( string [ ] files , Guid selectedPeer )
3839 {
3940 AssociatedPeer = selectedPeer ;
4041 StateId = Guid . NewGuid ( ) ;
42+ Thread thread = new Thread ( SendRoutine ) ;
43+ thread . Start ( ) ;
4144 sw = Stopwatch . StartNew ( ) ;
4245 ThreadPool . UnsafeQueueUserWorkItem ( ( s ) =>
4346 ExtractDirectoryTree ( files ) , null ) ;
47+
4448 }
4549
4650 private void ExtractDirectoryTree ( string [ ] files )
@@ -98,89 +102,135 @@ public void Cancel(string why)
98102 {
99103 Cancelled ? . Invoke ( new Completion ( ) { AdditionalInfo = why , Directory = tree . seed } ) ;
100104 }
101- private readonly object mtex = new object ( ) ;
102- private void SendNextChunk ( )
105+
106+ private void SendRoutine ( )
103107 {
104- //try
105- //{
106- // if (Interlocked.Increment(ref currentChunk) == fileDatas.Count)
107- // {
108- // // done
109- // Interlocked.Exchange(ref AllSent, 1);
110- // CheckFinalisation();
111- // }
112- // else if (Interlocked.CompareExchange(ref currentChunk, 0, 0) < fileDatas.Count)
113- // {
114- // var fileData = fileDatas[currentChunk];
115- // fileData.ReadBytes();
116- // CalcHash(fileData);
108+ var buffer = new byte [ PersistentSettingConfig . Instance . ChunkSize ] ;
117109
118- // int numUnacked = Interlocked.Increment(ref unAcked);
110+ while ( true )
111+ {
112+ signal . WaitOne ( ) ;
113+ if ( Interlocked . CompareExchange ( ref exit , 0 , 0 ) == 1 )
114+ return ;
119115
120- // var envelope = fileData.ConvertToMessageEnvelope(out byte[] chunkBuffer);
121- // envelope.MessageId = StateId;
122- // services.MessageHandler.SendAsyncMessage(AssociatedPeer, envelope, forceTCP);
116+ while ( true )
117+ {
118+ if ( Interlocked . CompareExchange ( ref exit , 0 , 0 ) == 1 )
119+ return ;
123120
124- // PublishStatus(fileData);
125- // fileData.Release();
121+ var cc = Interlocked . Increment ( ref currentChunk ) ;
122+ if ( cc == fileDatas . Count )
123+ {
124+ // done
125+ Interlocked . Exchange ( ref AllSent , 1 ) ;
126+ CheckFinalisation ( ) ;
127+ break ;
128+ }
129+ else if ( cc > fileDatas . Count )
130+ break ;
131+
132+ var fileData = fileDatas [ cc ] ;
133+ fileData . ReadBytesInto ( buffer , 0 , out int cnt ) ;
134+ CalcHash ( fileData ) ;
126135
127- // if (Interlocked.Add(ref totalOnWire, fileData.count) < WindowSize)
128- // {
129- // SendNextChunk();
130- // }
131- // }
132- //}
133- //catch (Exception e)
134- //{
135- // CancelExplicit("Exception Occurred : " + e.ToString());
136- //}
137136
137+ var envelope = fileData . ConvertToMessageEnvelope ( out byte [ ] chunkBuffer ) ;
138+ envelope . MessageId = StateId ;
139+ services . MessageHandler . SendAsyncMessage ( AssociatedPeer , envelope , forceTCP ) ;
138140
139- //-------------------------------------------
141+ PublishStatus ( fileData ) ;
142+ fileData . Release ( ) ;
140143
144+ if ( Interlocked . Add ( ref totalOnWire , fileData . count ) > WindowSize )
145+ break ;
146+
147+
148+ }
149+
150+ }
151+ }
152+ private readonly object mtex = new object ( ) ;
153+ private void SendNextChunk ( )
154+ {
155+ signal . Set ( ) ;
156+ return ;
141157 try
142158 {
143-
144- while ( true )
159+ if ( Interlocked . Increment ( ref currentChunk ) == fileDatas . Count )
160+ {
161+ // done
162+ Interlocked . Exchange ( ref AllSent , 1 ) ;
163+ CheckFinalisation ( ) ;
164+ }
165+ else if ( Interlocked . CompareExchange ( ref currentChunk , 0 , 0 ) < fileDatas . Count )
145166 {
146- lock ( mtex )
167+ var fileData = fileDatas [ currentChunk ] ;
168+ fileData . ReadBytes ( ) ;
169+ CalcHash ( fileData ) ;
170+
171+ int numUnacked = Interlocked . Increment ( ref unAcked ) ;
172+
173+ var envelope = fileData . ConvertToMessageEnvelope ( out byte [ ] chunkBuffer ) ;
174+ envelope . MessageId = StateId ;
175+ services . MessageHandler . SendAsyncMessage ( AssociatedPeer , envelope , forceTCP ) ;
176+
177+ PublishStatus ( fileData ) ;
178+ fileData . Release ( ) ;
179+
180+ if ( Interlocked . Add ( ref totalOnWire , fileData . count ) < WindowSize )
147181 {
148- var cc = Interlocked . Increment ( ref currentChunk ) ;
149- if ( cc == fileDatas . Count )
150- {
151- // done
152- Interlocked . Exchange ( ref AllSent , 1 ) ;
153- CheckFinalisation ( ) ;
154- break ;
155- }
156- else if ( cc > fileDatas . Count )
157- break ;
158-
159- var fileData = fileDatas [ cc ] ;
160- fileData . ReadBytes ( ) ;
161- CalcHash ( fileData ) ;
162-
163- int numUnacked = Interlocked . Increment ( ref unAcked ) ;
164-
165- var envelope = fileData . ConvertToMessageEnvelope ( out byte [ ] chunkBuffer ) ;
166- envelope . MessageId = StateId ;
167- services . MessageHandler . SendAsyncMessage ( AssociatedPeer , envelope , forceTCP ) ;
168-
169- PublishStatus ( fileData ) ;
170- fileData . Release ( ) ;
171-
172- if ( Interlocked . Add ( ref totalOnWire , fileData . count ) > WindowSize )
173- break ;
182+ SendNextChunk ( ) ;
174183 }
175-
176184 }
177-
178185 }
179186 catch ( Exception e )
180187 {
181188 CancelExplicit ( "Exception Occurred : " + e . ToString ( ) ) ;
182189 }
183190
191+ //try
192+ //{
193+
194+ // while (true)
195+ // {
196+ // lock (mtex)
197+ // {
198+ // var cc = Interlocked.Increment(ref currentChunk);
199+ // if (cc == fileDatas.Count)
200+ // {
201+ // // done
202+ // Interlocked.Exchange(ref AllSent, 1);
203+ // CheckFinalisation();
204+ // break;
205+ // }
206+ // else if (cc > fileDatas.Count)
207+ // break;
208+
209+ // var fileData = fileDatas[cc];
210+ // fileData.ReadBytes();
211+ // CalcHash(fileData);
212+
213+ // int numUnacked = Interlocked.Increment(ref unAcked);
214+
215+ // var envelope = fileData.ConvertToMessageEnvelope(out byte[] chunkBuffer);
216+ // envelope.MessageId = StateId;
217+ // services.MessageHandler.SendAsyncMessage(AssociatedPeer, envelope, forceTCP);
218+
219+ // PublishStatus(fileData);
220+ // fileData.Release();
221+
222+ // if (Interlocked.Add(ref totalOnWire, fileData.count) > WindowSize)
223+ // break;
224+ // }
225+
226+ // }
227+
228+ //}
229+ //catch (Exception e)
230+ //{
231+ // CancelExplicit("Exception Occurred : " + e.ToString());
232+ //}
233+
184234 }
185235 //private void SendNextChunk2()
186236 //{
@@ -262,6 +312,7 @@ public void CancelExplicit(string why)
262312 } ;
263313
264314 services . MessageHandler . SendAsyncMessage ( AssociatedPeer , msg , forceTCP ) ;
315+ Cleanup ( ) ;
265316 }
266317
267318 private void PublishStatus ( FileChunk fileData )
@@ -298,7 +349,15 @@ private void HandleCompletion()
298349 response . MessageId = StateId ;
299350 response . KeyValuePairs = new Dictionary < string , string > ( ) { { name , null } } ;
300351 services . MessageHandler . SendAsyncMessage ( AssociatedPeer , response , forceTCP ) ;
352+
353+ Interlocked . Exchange ( ref exit , 1 ) ;
354+ signal . Set ( ) ;
301355 }
302356
357+ internal void Cleanup ( )
358+ {
359+ Interlocked . Exchange ( ref exit , 1 ) ;
360+ signal . Set ( ) ;
361+ }
303362 }
304363}
0 commit comments