@@ -39,148 +39,157 @@ public async Task RunAsync(CancellationToken cancel)
3939 throw ;
4040 }
4141
42- // We cancel processing if TShark exits or we get our own higher level CT signaled.
43- var cancelProcessingCts = CancellationTokenSource . CreateLinkedTokenSource ( cancel ) ;
44- var stdoutFinished = new SemaphoreSlim ( 0 , 1 ) ;
45- var stderrFinished = new SemaphoreSlim ( 0 , 1 ) ;
42+ _log . Info ( "Starting TZSP packet stream processing." ) ;
4643
47- void ConsumeStandardOutput ( Stream stdout )
44+ // TShark will exit after N packets have been processed, to enable us to cleanup temp files.
45+ // We just run it in a loop until cancelled or until TShark fails.
46+ while ( ! cancel . IsCancellationRequested )
4847 {
49- // Text mode output, each line consisting of:
50- // 1. Hex string of packet bytes (starting with either outer UDP header or inner TZSP header)
51- // 2. A space character.
52- // 3. Type of the data ("eth:ethertype:ip:data" - UDP header, "eth:ethertype:ip:udp:data" - TZSP header)
53- // 4. A space character.
54- // 5. The destination UDP port of the TZSP protocol ("udp.dstport") but ONLY if type of data is TZSP header.
55- // If type of data is UDP header, we need to parse the port ourselves.
48+ // Sometimes (not always) TShark cleans up on its own.
49+ // Better safe than sorry, though!
50+ DeleteTemporaryFiles ( ) ;
5651
57- try
58- {
59- var reader = new StreamReader ( stdout , Encoding . UTF8 , leaveOpen : true ) ;
52+ // We cancel processing if TShark exits or we get our own higher level CT signaled.
53+ using var cancelProcessingCts = CancellationTokenSource . CreateLinkedTokenSource ( cancel ) ;
54+ var stdoutFinished = new SemaphoreSlim ( 0 , 1 ) ;
55+ var stderrFinished = new SemaphoreSlim ( 0 , 1 ) ;
6056
61- while ( true )
57+ void ConsumeStandardOutput ( Stream stdout )
58+ {
59+ // Text mode output, each line consisting of:
60+ // 1. Hex string of packet bytes (starting with either outer UDP header or inner TZSP header)
61+ // 2. A space character.
62+ // 3. Type of the data ("eth:ethertype:ip:data" - UDP header, "eth:ethertype:ip:udp:data" - TZSP header)
63+ // 4. A space character.
64+ // 5. The destination UDP port of the TZSP protocol ("udp.dstport") but ONLY if type of data is TZSP header.
65+ // If type of data is UDP header, we need to parse the port ourselves.
66+
67+ try
6268 {
63- var line = reader . ReadLineAsync ( )
64- . WithAbandonment ( cancelProcessingCts . Token )
65- . WaitAndUnwrapExceptions ( ) ;
69+ var reader = new StreamReader ( stdout , Encoding . UTF8 , leaveOpen : true ) ;
6670
67- if ( line == null )
68- break ; // End of stream.
71+ while ( true )
72+ {
73+ var line = reader . ReadLineAsync ( )
74+ . WithAbandonment ( cancelProcessingCts . Token )
75+ . WaitAndUnwrapExceptions ( ) ;
6976
70- string packetBytesHex ;
71- string packetType ;
77+ if ( line == null )
78+ break ; // End of stream.
7279
73- var parts = line . Split ( ' ' ) ;
74- if ( parts . Length != 3 )
75- throw new NotSupportedException ( "Output line did not have expected number of components." ) ;
80+ string packetBytesHex ;
81+ string packetType ;
7682
77- // On some systems there are colons. On others there are not!
78- // Language/version differences? Whatever, get rid of them.
79- packetBytesHex = parts [ 0 ] . Replace ( ":" , "" ) ;
80- packetType = parts [ 1 ] ;
83+ var parts = line . Split ( ' ' ) ;
84+ if ( parts . Length != 3 )
85+ throw new NotSupportedException ( "Output line did not have expected number of components." ) ;
8186
82- var packetBytes = Helpers . Convert . HexStringToByteArray ( packetBytesHex ) ;
87+ // On some systems there are colons. On others there are not!
88+ // Language/version differences? Whatever, get rid of them.
89+ packetBytesHex = parts [ 0 ] . Replace ( ":" , "" ) ;
90+ packetType = parts [ 1 ] ;
8391
84- try
85- {
86- if ( packetType == "eth:ethertype:ip:data" )
87- {
88- ProcessTzspPacketWithUdpHeader ( packetBytes ) ;
89- }
90- else if ( packetType == "eth:ethertype:ip:udp:data" )
92+ var packetBytes = Helpers . Convert . HexStringToByteArray ( packetBytesHex ) ;
93+
94+ try
9195 {
92- var listenPort = ushort . Parse ( parts [ 2 ] ) ;
93- ProcessTzspPacket ( packetBytes , listenPort ) ;
96+ if ( packetType == "eth:ethertype:ip:data" )
97+ {
98+ ProcessTzspPacketWithUdpHeader ( packetBytes ) ;
99+ }
100+ else if ( packetType == "eth:ethertype:ip:udp:data" )
101+ {
102+ var listenPort = ushort . Parse ( parts [ 2 ] ) ;
103+ ProcessTzspPacket ( packetBytes , listenPort ) ;
104+ }
105+ else
106+ {
107+ throw new NotSupportedException ( "Unexpected packet type: " + packetType ) ;
108+ }
94109 }
95- else
110+ catch ( Exception ex )
96111 {
97- throw new NotSupportedException ( "Unexpected packet type : " + packetType ) ;
112+ _log . Error ( "Ignoring unsupported packet : " + Helpers . Debug . GetAllExceptionMessages ( ex ) ) ;
98113 }
99114 }
100- catch ( Exception ex )
101- {
102- _log . Error ( "Ignoring unsupported packet: " + Helpers . Debug . GetAllExceptionMessages ( ex ) ) ;
103- }
104115 }
105- }
106- catch ( OperationCanceledException )
107- {
108- // It's OK, we were cancelled because processing is finished.
109- }
110- catch ( Exception ex )
111- {
112- // If we get here, something is fatally wrong with parsing logic or TShark output.
113- _log . Error ( Helpers . Debug . GetAllExceptionMessages ( ex ) ) ;
114-
115- // This should not happen, so stop everything. Gracefully, so we flush logs.
116- Environment . ExitCode = - 1 ;
117- Program . MasterCancellation . Cancel ( ) ;
118- }
119- finally
120- {
121- stdoutFinished . Release ( ) ;
122- }
123- } ;
116+ catch ( OperationCanceledException )
117+ {
118+ // It's OK, we were cancelled because processing is finished.
119+ }
120+ catch ( Exception ex )
121+ {
122+ // If we get here, something is fatally wrong with parsing logic or TShark output.
123+ _log . Error ( Helpers . Debug . GetAllExceptionMessages ( ex ) ) ;
124124
125- void ConsumeStandardError ( Stream stderr )
126- {
127- // Only errors should show up here. We will simply log them for now
128- // - only if tshark exits do we consider it a fatal error.
125+ // This should not happen, so stop everything. Gracefully, so we flush logs.
126+ Environment . ExitCode = - 1 ;
127+ Program . MasterCancellation . Cancel ( ) ;
128+ }
129+ finally
130+ {
131+ stdoutFinished . Release ( ) ;
132+ }
133+ } ;
129134
130- try
135+ void ConsumeStandardError ( Stream stderr )
131136 {
132- var reader = new StreamReader ( stderr , Encoding . UTF8 , leaveOpen : true ) ;
137+ // Only errors should show up here. We will simply log them for now
138+ // - only if tshark exits do we consider it a fatal error.
133139
134- while ( true )
140+ try
135141 {
136- var line = reader . ReadLineAsync ( )
137- . WithAbandonment ( cancelProcessingCts . Token )
138- . WaitAndUnwrapExceptions ( ) ;
142+ var reader = new StreamReader ( stderr , Encoding . UTF8 , leaveOpen : true ) ;
143+
144+ while ( true )
145+ {
146+ var line = reader . ReadLineAsync ( )
147+ . WithAbandonment ( cancelProcessingCts . Token )
148+ . WaitAndUnwrapExceptions ( ) ;
139149
140- if ( line == null )
141- break ; // End of stream.
150+ if ( line == null )
151+ break ; // End of stream.
142152
143- _log . Error ( line ) ;
153+ _log . Error ( line ) ;
154+ }
144155 }
145- }
146- catch ( OperationCanceledException )
156+ catch ( OperationCanceledException )
157+ {
158+ // It's OK, we were cancelled because processing is finished.
159+ }
160+ finally
161+ {
162+ stderrFinished . Release ( ) ;
163+ }
164+ } ;
165+
166+ var tsharkCommand = new ExternalTool
147167 {
148- // It's OK, we were cancelled because processing is finished.
149- }
150- finally
168+ ExecutablePath = Constants . TsharkExecutableName ,
169+ ResultHeuristics = ExternalToolResultHeuristics . Linux ,
170+ Arguments = @$ "-i ""{ ListenInterface } "" -f ""{ MakeTsharkFilterString ( ) } "" -p -T fields -e data.data -e frame.protocols -e udp.dstport -Eseparator=/s -Q -c { Constants . PacketsPerIteration } ",
171+ StandardOutputConsumer = ConsumeStandardOutput ,
172+ StandardErrorConsumer = ConsumeStandardError
173+ } ;
174+
175+ var tshark = tsharkCommand . Start ( ) ;
176+ var result = await tshark . GetResultAsync ( cancel ) ;
177+ cancelProcessingCts . Cancel ( ) ;
178+
179+ // Wait for output processing threads to finish, so error messages are printed to logs before we exit.
180+ _log . Debug ( "TShark finished iteration. Waiting for data processing threads to clean up and flush logs." ) ;
181+ await stderrFinished . WaitAsync ( ) ;
182+ await stdoutFinished . WaitAsync ( ) ;
183+
184+ if ( ! cancel . IsCancellationRequested && ! result . Succeeded )
151185 {
152- stderrFinished . Release ( ) ;
186+ _log . Error ( "TShark exited with an error result. Review logs above to understand the details of the failure." ) ;
187+ Environment . ExitCode = - 1 ;
188+ break ;
153189 }
154- } ;
155-
156- var tsharkCommand = new ExternalTool
157- {
158- ExecutablePath = Constants . TsharkExecutableName ,
159- ResultHeuristics = ExternalToolResultHeuristics . Linux ,
160- Arguments = @$ "-i ""{ ListenInterface } "" -f ""{ MakeTsharkFilterString ( ) } "" -p -T fields -e data.data -e frame.protocols -e udp.dstport -Eseparator=/s -Q",
161- StandardOutputConsumer = ConsumeStandardOutput ,
162- StandardErrorConsumer = ConsumeStandardError
163- } ;
164-
165- var tshark = tsharkCommand . Start ( ) ;
166-
167- _log . Info ( "Starting TZSP packet stream processing." ) ;
168-
169- var result = await tshark . GetResultAsync ( cancel ) ;
170- cancelProcessingCts . Cancel ( ) ;
171-
172- if ( ! cancel . IsCancellationRequested && ! result . Succeeded )
173- {
174- _log . Error ( "TShark exited with an error result. Review logs above to understand the details of the failure." ) ;
175- Environment . ExitCode = - 1 ;
176190 }
177191
178192 await _metricServer . StopAsync ( ) ;
179-
180- // Wait for output processing threads to finish, so error messages are printed to logs before we exit.
181- _log . Debug ( "Waiting for data processing threads to clean up and flush logs." ) ;
182- await stderrFinished . WaitAsync ( ) ;
183- await stdoutFinished . WaitAsync ( ) ;
184193 }
185194
186195 private static async Task VerifyTshark ( CancellationToken cancel )
@@ -202,6 +211,24 @@ private static async Task VerifyTshark(CancellationToken cancel)
202211 throw new NotSupportedException ( "Unrecognized TShark version/build." ) ;
203212 }
204213
214+ private static void DeleteTemporaryFiles ( )
215+ {
216+ var files = Directory . GetFiles ( Path . GetTempPath ( ) , "wireshark_*.pcapng" ) ;
217+
218+ foreach ( var file in files )
219+ {
220+ try
221+ {
222+ File . Delete ( file ) ;
223+ _log . Debug ( $ "Deleted temporary file: { file } ") ;
224+ }
225+ catch
226+ {
227+ // It's fine - maybe it is in use by a parallel TShark instance!
228+ }
229+ }
230+ }
231+
205232 private static readonly IPNetwork MulticastNetwork = IPNetwork . Parse ( "224.0.0.0/4" ) ;
206233 private static readonly IPNetwork [ ] PrivateUseNetworks = new [ ]
207234 {
0 commit comments