1
+ using System ;
2
+ using System . Threading ;
3
+ using System . Threading . Tasks ;
4
+ using System . Management . Automation ;
5
+ using System . Collections . Concurrent ;
6
+ using System . Runtime . ExceptionServices ;
7
+ using Microsoft . SqlServer . XEvent . XELite ;
8
+
9
+ namespace Dataplat . Dbatools . Commands
10
+ {
11
+ /// <summary>
12
+ /// Implements the <c>Read-XEvent</c> internal command
13
+ /// </summary>
14
+ [ Cmdlet ( "Read" , "XEvent" , DefaultParameterSetName = "Default" , RemotingCapability = RemotingCapability . PowerShell ) ]
15
+ public class ReadXEvent : PSCmdlet
16
+ {
17
+ #region Parameters
18
+ /// <summary>
19
+ /// The FileName of the .XEL
20
+ /// </summary>
21
+ [ Parameter ( ValueFromPipeline = true ) ]
22
+ [ Alias ( "FullName" ) ]
23
+ public string FileName ;
24
+
25
+ /// <summary>
26
+ /// The ConnectionString to to SQL Instance
27
+ /// </summary>
28
+ [ Parameter ( ) ]
29
+ public string ConnectionString ;
30
+
31
+ /// <summary>
32
+ /// The session name of the XE
33
+ /// </summary>
34
+ [ Parameter ( ) ]
35
+ public string SessionName ;
36
+ #endregion Parameters
37
+
38
+ #region Private Methods
39
+
40
+ private readonly CancellationTokenSource CancelToken = new CancellationTokenSource ( ) ;
41
+
42
+ private void ParseFile ( string FileName )
43
+ {
44
+ new XEFileEventStreamer ( FileName ) . ReadEventStream ( delegate ( )
45
+ {
46
+ return Task . CompletedTask ;
47
+ } ,
48
+ delegate ( IXEvent xevent )
49
+ {
50
+ WriteObject ( xevent ) ;
51
+ return Task . CompletedTask ;
52
+ } ,
53
+ CancelToken . Token
54
+ ) . Wait ( ) ;
55
+ }
56
+
57
+ private void ParseStream ( string ConnectionString , string SessionName )
58
+ {
59
+ XELiveEventStreamer XELiveEventStreamer = new XELiveEventStreamer ( ConnectionString , SessionName ) ;
60
+ BlockingCollection < object > queue = new BlockingCollection < object > ( 255 ) ;
61
+
62
+ Task task = XELiveEventStreamer . ReadEventStream ( delegate ( )
63
+ {
64
+ queue . Add ( null , CancelToken . Token ) ;
65
+ return Task . CompletedTask ;
66
+ } ,
67
+ delegate ( IXEvent xevent )
68
+ {
69
+ queue . Add ( xevent , CancelToken . Token ) ;
70
+ return Task . CompletedTask ;
71
+ } ,
72
+ CancelToken . Token
73
+ ) ;
74
+
75
+ task . ContinueWith ( delegate ( Task newtask )
76
+ {
77
+ queue . CompleteAdding ( ) ;
78
+ }
79
+ ) ;
80
+
81
+ while ( true )
82
+ {
83
+ try
84
+ {
85
+ object item = queue . Take ( CancelToken . Token ) ;
86
+
87
+ if ( item != null )
88
+ {
89
+ WriteObject ( item ) ;
90
+ }
91
+ }
92
+ catch ( OperationCanceledException )
93
+ {
94
+ break ;
95
+ }
96
+ catch ( InvalidOperationException ex )
97
+ {
98
+ if ( ! CancelToken . IsCancellationRequested )
99
+ {
100
+ ExceptionDispatchInfo . Capture ( ex ) . Throw ( ) ;
101
+ }
102
+ break ;
103
+ }
104
+ }
105
+ }
106
+
107
+ #endregion Private Methods
108
+
109
+ #region Command Implementation
110
+ /// <summary>
111
+ /// Implements the begin action of the command
112
+ /// </summary>
113
+ protected override void BeginProcessing ( )
114
+ {
115
+
116
+ }
117
+
118
+ /// <summary>
119
+ /// Implements the process action of the command
120
+ /// </summary>
121
+ protected override void ProcessRecord ( )
122
+ {
123
+ if ( FileName != null )
124
+ {
125
+ ParseFile ( FileName ) ;
126
+ }
127
+ else
128
+ {
129
+ // ps checks to ensure both ConnectionString and SessionName exist
130
+ ParseStream ( ConnectionString , SessionName ) ;
131
+ }
132
+ }
133
+
134
+ /// <summary>
135
+ /// Implements the end action of the command
136
+ /// </summary>
137
+ protected override void EndProcessing ( )
138
+ {
139
+ }
140
+ #endregion Command Implementation
141
+ }
142
+ }
0 commit comments