@@ -11,6 +11,7 @@ import (
1111
1212 "github.com/gopcua/opcua"
1313 "github.com/gopcua/opcua/debug"
14+ "github.com/gopcua/opcua/id"
1415 "github.com/gopcua/opcua/monitor"
1516 "github.com/gopcua/opcua/ua"
1617)
@@ -24,6 +25,7 @@ func main() {
2425 keyFile = flag .String ("key" , "" , "Path to private key.pem. Required for security mode/policy != None" )
2526 nodeID = flag .String ("node" , "" , "node id to subscribe to" )
2627 interval = flag .Duration ("interval" , opcua .DefaultSubscriptionInterval , "subscription interval" )
28+ event = flag .Bool ("event" , false , "are you subscribing to events" )
2729 )
2830 flag .BoolVar (& debug .Enable , "debug" , false , "enable debug logging" )
2931 flag .Parse ()
@@ -83,34 +85,122 @@ func main() {
8385 })
8486 wg := & sync.WaitGroup {}
8587
86- // start callback-based subscription
87- wg .Add (1 )
88- go startCallbackSub (ctx , m , * interval , 0 , wg , * nodeID )
88+ fieldNames := []string {"EventId" , "EventType" , "Severity" , "Time" , "Message" }
89+ selects := make ([]* ua.SimpleAttributeOperand , len (fieldNames ))
90+ for i , name := range fieldNames {
91+ selects [i ] = & ua.SimpleAttributeOperand {
92+ TypeDefinitionID : ua .NewNumericNodeID (0 , id .BaseEventType ),
93+ BrowsePath : []* ua.QualifiedName {{NamespaceIndex : 0 , Name : name }},
94+ AttributeID : ua .AttributeIDValue ,
95+ }
96+ }
97+
98+ wheres := & ua.ContentFilter {
99+ Elements : []* ua.ContentFilterElement {
100+ {
101+ FilterOperator : ua .FilterOperatorGreaterThanOrEqual ,
102+ FilterOperands : []* ua.ExtensionObject {
103+ {
104+ EncodingMask : 1 ,
105+ TypeID : & ua.ExpandedNodeID {
106+ NodeID : ua .NewNumericNodeID (0 , id .SimpleAttributeOperand_Encoding_DefaultBinary ),
107+ },
108+ Value : ua.SimpleAttributeOperand {
109+ TypeDefinitionID : ua .NewNumericNodeID (0 , id .BaseEventType ),
110+ BrowsePath : []* ua.QualifiedName {{NamespaceIndex : 0 , Name : "Severity" }},
111+ AttributeID : ua .AttributeIDValue ,
112+ },
113+ },
114+ {
115+ EncodingMask : 1 ,
116+ TypeID : & ua.ExpandedNodeID {
117+ NodeID : ua .NewNumericNodeID (0 , id .LiteralOperand_Encoding_DefaultBinary ),
118+ },
119+ Value : ua.LiteralOperand {
120+ Value : ua .MustVariant (uint16 (0 )),
121+ },
122+ },
123+ },
124+ },
125+ },
126+ }
127+
128+ filter := ua.EventFilter {
129+ SelectClauses : selects ,
130+ WhereClause : wheres ,
131+ }
132+
133+ filterExtObj := ua.ExtensionObject {
134+ EncodingMask : ua .ExtensionObjectBinary ,
135+ TypeID : & ua.ExpandedNodeID {
136+ NodeID : ua .NewNumericNodeID (0 , id .EventFilter_Encoding_DefaultBinary ),
137+ },
138+ Value : filter ,
139+ }
89140
90- // start channel-based subscription
91- wg .Add (1 )
92- go startChanSub (ctx , m , * interval , 0 , wg , * nodeID )
141+ if * event {
142+ // start callback-based subscription
143+ wg .Add (1 )
144+ go startCallbackSub (ctx , m , * interval , 0 , wg , * event , & filterExtObj , * nodeID )
93145
146+ // start channel-based subscription
147+ wg .Add (1 )
148+ go startChanSub (ctx , m , * interval , 0 , wg , * event , & filterExtObj , * nodeID )
149+ } else {
150+ // start callback-based subscription
151+ wg .Add (1 )
152+ go startCallbackSub (ctx , m , * interval , 0 , wg , * event , nil , * nodeID )
153+
154+ // start channel-based subscription
155+ wg .Add (1 )
156+ go startChanSub (ctx , m , * interval , 0 , wg , * event , nil , * nodeID )
157+ }
94158 <- ctx .Done ()
95159 wg .Wait ()
96160}
97161
98- func startCallbackSub (ctx context.Context , m * monitor.NodeMonitor , interval , lag time.Duration , wg * sync.WaitGroup , nodes ... string ) {
99- sub , err := m .Subscribe (
100- ctx ,
101- & opcua.SubscriptionParameters {
162+ func startCallbackSub (ctx context.Context , m * monitor.NodeMonitor , interval , lag time.Duration , wg * sync.WaitGroup , isEvent bool , filter * ua.ExtensionObject , nodes ... string ) {
163+ fieldNames := []string {"EventId" , "EventType" , "Severity" , "Time" , "Message" }
164+
165+ args := monitor.SubscribeArgs {
166+ Params : & opcua.SubscriptionParameters {
102167 Interval : interval ,
103168 },
104- func (s * monitor.Subscription , msg * monitor.DataChangeMessage ) {
105- if msg .Error != nil {
106- log .Printf ("[callback] sub=%d error=%s" , s .SubscriptionID (), msg .Error )
107- } else {
108- log .Printf ("[callback] sub=%d ts=%s node=%s value=%v" , s .SubscriptionID (), msg .SourceTimestamp .UTC ().Format (time .RFC3339 ), msg .NodeID , msg .Value .Value ())
169+ Callback : func (s * monitor.Subscription , msg monitor.Message ) {
170+ switch v := msg .(type ) {
171+ case * monitor.DataChangeMessage :
172+ if v .Error != nil {
173+ log .Printf ("[callback] sub=%d error=%s" , s .SubscriptionID (), v .Error )
174+ } else {
175+ log .Printf ("[callback] sub=%d ts=%s node=%s value=%v" ,
176+ s .SubscriptionID (),
177+ v .SourceTimestamp .UTC ().Format (time .RFC3339 ),
178+ v .NodeID ,
179+ v .Value .Value ())
180+ }
181+ case * monitor.EventMessage :
182+ if v .Error != nil {
183+ log .Printf ("[callback] sub=%d error=%s" , s .SubscriptionID (), v .Error )
184+ } else {
185+ log .Printf ("[callback] sub=%d event details:" , s .SubscriptionID ())
186+ for i , field := range v .EventFields {
187+ if i < len (fieldNames ) {
188+ fieldName := fieldNames [i ]
189+ log .Printf (" %s: %v" , fieldName , field .Value .Value ())
190+ }
191+ }
192+ }
193+ default :
194+ log .Printf ("[callback] sub=%d unknown message type=%T" , s .SubscriptionID (), msg )
109195 }
110196 time .Sleep (lag )
111197 },
112- nodes ... )
198+ EventSub : isEvent ,
199+ Filter : filter ,
200+ Nodes : nodes ,
201+ }
113202
203+ sub , err := m .SubscribeWithArgs (ctx , args )
114204 if err != nil {
115205 log .Fatal (err )
116206 }
@@ -120,9 +210,20 @@ func startCallbackSub(ctx context.Context, m *monitor.NodeMonitor, interval, lag
120210 <- ctx .Done ()
121211}
122212
123- func startChanSub (ctx context.Context , m * monitor.NodeMonitor , interval , lag time.Duration , wg * sync.WaitGroup , nodes ... string ) {
124- ch := make (chan * monitor.DataChangeMessage , 16 )
125- sub , err := m .ChanSubscribe (ctx , & opcua.SubscriptionParameters {Interval : interval }, ch , nodes ... )
213+ func startChanSub (ctx context.Context , m * monitor.NodeMonitor , interval , lag time.Duration , wg * sync.WaitGroup , isEvent bool , filter * ua.ExtensionObject , nodes ... string ) {
214+ ch := make (chan monitor.Message , 16 )
215+
216+ args := monitor.ChanSubscribeArgs {
217+ Params : & opcua.SubscriptionParameters {
218+ Interval : interval ,
219+ },
220+ Channel : ch ,
221+ EventSub : isEvent ,
222+ Filter : filter ,
223+ Nodes : nodes ,
224+ }
225+
226+ sub , err := m .ChanSubscribeWithArgs (ctx , args )
126227
127228 if err != nil {
128229 log .Fatal (err )
@@ -135,10 +236,27 @@ func startChanSub(ctx context.Context, m *monitor.NodeMonitor, interval, lag tim
135236 case <- ctx .Done ():
136237 return
137238 case msg := <- ch :
138- if msg .Error != nil {
139- log .Printf ("[channel ] sub=%d error=%s" , sub .SubscriptionID (), msg .Error )
140- } else {
141- log .Printf ("[channel ] sub=%d ts=%s node=%s value=%v" , sub .SubscriptionID (), msg .SourceTimestamp .UTC ().Format (time .RFC3339 ), msg .NodeID , msg .Value .Value ())
239+ switch v := msg .(type ) {
240+ case * monitor.DataChangeMessage :
241+ if v .Error != nil {
242+ log .Printf ("[channel] sub=%d error=%s" , sub .SubscriptionID (), v .Error )
243+ } else {
244+ log .Printf ("[channel] sub=%d ts=%s node=%s value=%v" ,
245+ sub .SubscriptionID (),
246+ v .SourceTimestamp .UTC ().Format (time .RFC3339 ),
247+ v .NodeID ,
248+ v .Value .Value ())
249+ }
250+ case * monitor.EventMessage :
251+ if v .Error != nil {
252+ log .Printf ("[channel] sub=%d error=%s" , sub .SubscriptionID (), v .Error )
253+ } else {
254+ out := v .EventFields [0 ].Value .Value ()
255+ log .Printf ("[channel] sub=%d event fields=%v" ,
256+ sub .SubscriptionID (), out )
257+ }
258+ default :
259+ log .Printf ("[channel] sub=%d unknown message type: %T" , sub .SubscriptionID (), msg )
142260 }
143261 time .Sleep (lag )
144262 }
0 commit comments