Skip to content

Commit dad24e2

Browse files
committed
Implement Event Subscription handling for channels
Signed-off-by: FelixTing <felix@iotechsys.com>
1 parent 689af73 commit dad24e2

File tree

7 files changed

+441
-67
lines changed

7 files changed

+441
-67
lines changed

client_sub.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,7 @@ func (c *Client) notifySubscription(ctx context.Context, sub *Subscription, noti
312312
sub.notify(ctx, &PublishNotificationData{
313313
SubscriptionID: sub.SubscriptionID,
314314
Value: data.Value,
315+
PublishTime: notif.PublishTime,
315316
})
316317

317318
// Error

examples/monitor/monitor.go

Lines changed: 141 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"github.com/gopcua/opcua"
1313
"github.com/gopcua/opcua/debug"
14+
"github.com/gopcua/opcua/id"
1415
"github.com/gopcua/opcua/monitor"
1516
"github.com/gopcua/opcua/ua"
1617
)
@@ -24,6 +25,7 @@ func main() {
2425
keyFile = flag.String("key", "", "Path to private key.pem. Required for security mode/policy != None")
2526
nodeID = flag.String("node", "", "node id to subscribe to")
2627
interval = flag.Duration("interval", opcua.DefaultSubscriptionInterval, "subscription interval")
28+
event = flag.Bool("event", false, "are you subscribing to events")
2729
)
2830
flag.BoolVar(&debug.Enable, "debug", false, "enable debug logging")
2931
flag.Parse()
@@ -83,34 +85,122 @@ func main() {
8385
})
8486
wg := &sync.WaitGroup{}
8587

86-
// start callback-based subscription
87-
wg.Add(1)
88-
go startCallbackSub(ctx, m, *interval, 0, wg, *nodeID)
88+
fieldNames := []string{"EventId", "EventType", "Severity", "Time", "Message"}
89+
selects := make([]*ua.SimpleAttributeOperand, len(fieldNames))
90+
for i, name := range fieldNames {
91+
selects[i] = &ua.SimpleAttributeOperand{
92+
TypeDefinitionID: ua.NewNumericNodeID(0, id.BaseEventType),
93+
BrowsePath: []*ua.QualifiedName{{NamespaceIndex: 0, Name: name}},
94+
AttributeID: ua.AttributeIDValue,
95+
}
96+
}
97+
98+
wheres := &ua.ContentFilter{
99+
Elements: []*ua.ContentFilterElement{
100+
{
101+
FilterOperator: ua.FilterOperatorGreaterThanOrEqual,
102+
FilterOperands: []*ua.ExtensionObject{
103+
{
104+
EncodingMask: 1,
105+
TypeID: &ua.ExpandedNodeID{
106+
NodeID: ua.NewNumericNodeID(0, id.SimpleAttributeOperand_Encoding_DefaultBinary),
107+
},
108+
Value: ua.SimpleAttributeOperand{
109+
TypeDefinitionID: ua.NewNumericNodeID(0, id.BaseEventType),
110+
BrowsePath: []*ua.QualifiedName{{NamespaceIndex: 0, Name: "Severity"}},
111+
AttributeID: ua.AttributeIDValue,
112+
},
113+
},
114+
{
115+
EncodingMask: 1,
116+
TypeID: &ua.ExpandedNodeID{
117+
NodeID: ua.NewNumericNodeID(0, id.LiteralOperand_Encoding_DefaultBinary),
118+
},
119+
Value: ua.LiteralOperand{
120+
Value: ua.MustVariant(uint16(0)),
121+
},
122+
},
123+
},
124+
},
125+
},
126+
}
127+
128+
filter := ua.EventFilter{
129+
SelectClauses: selects,
130+
WhereClause: wheres,
131+
}
132+
133+
filterExtObj := ua.ExtensionObject{
134+
EncodingMask: ua.ExtensionObjectBinary,
135+
TypeID: &ua.ExpandedNodeID{
136+
NodeID: ua.NewNumericNodeID(0, id.EventFilter_Encoding_DefaultBinary),
137+
},
138+
Value: filter,
139+
}
89140

90-
// start channel-based subscription
91-
wg.Add(1)
92-
go startChanSub(ctx, m, *interval, 0, wg, *nodeID)
141+
if *event {
142+
// start callback-based subscription
143+
wg.Add(1)
144+
go startCallbackSub(ctx, m, *interval, 0, wg, *event, &filterExtObj, *nodeID)
93145

146+
// start channel-based subscription
147+
wg.Add(1)
148+
go startChanSub(ctx, m, *interval, 0, wg, *event, &filterExtObj, *nodeID)
149+
} else {
150+
// start callback-based subscription
151+
wg.Add(1)
152+
go startCallbackSub(ctx, m, *interval, 0, wg, *event, nil, *nodeID)
153+
154+
// start channel-based subscription
155+
wg.Add(1)
156+
go startChanSub(ctx, m, *interval, 0, wg, *event, nil, *nodeID)
157+
}
94158
<-ctx.Done()
95159
wg.Wait()
96160
}
97161

98-
func startCallbackSub(ctx context.Context, m *monitor.NodeMonitor, interval, lag time.Duration, wg *sync.WaitGroup, nodes ...string) {
99-
sub, err := m.Subscribe(
100-
ctx,
101-
&opcua.SubscriptionParameters{
162+
func startCallbackSub(ctx context.Context, m *monitor.NodeMonitor, interval, lag time.Duration, wg *sync.WaitGroup, isEvent bool, filter *ua.ExtensionObject, nodes ...string) {
163+
fieldNames := []string{"EventId", "EventType", "Severity", "Time", "Message"}
164+
165+
args := monitor.SubscribeArgs{
166+
Params: &opcua.SubscriptionParameters{
102167
Interval: interval,
103168
},
104-
func(s *monitor.Subscription, msg *monitor.DataChangeMessage) {
105-
if msg.Error != nil {
106-
log.Printf("[callback] sub=%d error=%s", s.SubscriptionID(), msg.Error)
107-
} else {
108-
log.Printf("[callback] sub=%d ts=%s node=%s value=%v", s.SubscriptionID(), msg.SourceTimestamp.UTC().Format(time.RFC3339), msg.NodeID, msg.Value.Value())
169+
Callback: func(s *monitor.Subscription, msg monitor.Message) {
170+
switch v := msg.(type) {
171+
case *monitor.DataChangeMessage:
172+
if v.Error != nil {
173+
log.Printf("[callback] sub=%d error=%s", s.SubscriptionID(), v.Error)
174+
} else {
175+
log.Printf("[callback] sub=%d ts=%s node=%s value=%v",
176+
s.SubscriptionID(),
177+
v.SourceTimestamp.UTC().Format(time.RFC3339),
178+
v.NodeID,
179+
v.Value.Value())
180+
}
181+
case *monitor.EventMessage:
182+
if v.Error != nil {
183+
log.Printf("[callback] sub=%d error=%s", s.SubscriptionID(), v.Error)
184+
} else {
185+
log.Printf("[callback] sub=%d event details:", s.SubscriptionID())
186+
for i, field := range v.EventFields {
187+
if i < len(fieldNames) {
188+
fieldName := fieldNames[i]
189+
log.Printf(" %s: %v", fieldName, field.Value.Value())
190+
}
191+
}
192+
}
193+
default:
194+
log.Printf("[callback] sub=%d unknown message type=%T", s.SubscriptionID(), msg)
109195
}
110196
time.Sleep(lag)
111197
},
112-
nodes...)
198+
EventSub: isEvent,
199+
Filter: filter,
200+
Nodes: nodes,
201+
}
113202

203+
sub, err := m.SubscribeWithArgs(ctx, args)
114204
if err != nil {
115205
log.Fatal(err)
116206
}
@@ -120,9 +210,20 @@ func startCallbackSub(ctx context.Context, m *monitor.NodeMonitor, interval, lag
120210
<-ctx.Done()
121211
}
122212

123-
func startChanSub(ctx context.Context, m *monitor.NodeMonitor, interval, lag time.Duration, wg *sync.WaitGroup, nodes ...string) {
124-
ch := make(chan *monitor.DataChangeMessage, 16)
125-
sub, err := m.ChanSubscribe(ctx, &opcua.SubscriptionParameters{Interval: interval}, ch, nodes...)
213+
func startChanSub(ctx context.Context, m *monitor.NodeMonitor, interval, lag time.Duration, wg *sync.WaitGroup, isEvent bool, filter *ua.ExtensionObject, nodes ...string) {
214+
ch := make(chan monitor.Message, 16)
215+
216+
args := monitor.ChanSubscribeArgs{
217+
Params: &opcua.SubscriptionParameters{
218+
Interval: interval,
219+
},
220+
Channel: ch,
221+
EventSub: isEvent,
222+
Filter: filter,
223+
Nodes: nodes,
224+
}
225+
226+
sub, err := m.ChanSubscribeWithArgs(ctx, args)
126227

127228
if err != nil {
128229
log.Fatal(err)
@@ -135,10 +236,27 @@ func startChanSub(ctx context.Context, m *monitor.NodeMonitor, interval, lag tim
135236
case <-ctx.Done():
136237
return
137238
case msg := <-ch:
138-
if msg.Error != nil {
139-
log.Printf("[channel ] sub=%d error=%s", sub.SubscriptionID(), msg.Error)
140-
} else {
141-
log.Printf("[channel ] sub=%d ts=%s node=%s value=%v", sub.SubscriptionID(), msg.SourceTimestamp.UTC().Format(time.RFC3339), msg.NodeID, msg.Value.Value())
239+
switch v := msg.(type) {
240+
case *monitor.DataChangeMessage:
241+
if v.Error != nil {
242+
log.Printf("[channel] sub=%d error=%s", sub.SubscriptionID(), v.Error)
243+
} else {
244+
log.Printf("[channel] sub=%d ts=%s node=%s value=%v",
245+
sub.SubscriptionID(),
246+
v.SourceTimestamp.UTC().Format(time.RFC3339),
247+
v.NodeID,
248+
v.Value.Value())
249+
}
250+
case *monitor.EventMessage:
251+
if v.Error != nil {
252+
log.Printf("[channel] sub=%d error=%s", sub.SubscriptionID(), v.Error)
253+
} else {
254+
out := v.EventFields[0].Value.Value()
255+
log.Printf("[channel] sub=%d event fields=%v",
256+
sub.SubscriptionID(), out)
257+
}
258+
default:
259+
log.Printf("[channel] sub=%d unknown message type: %T", sub.SubscriptionID(), msg)
142260
}
143261
time.Sleep(lag)
144262
}

examples/subscribe/subscribe.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,11 @@ func main() {
141141

142142
func valueRequest(nodeID *ua.NodeID) *ua.MonitoredItemCreateRequest {
143143
handle := uint32(42)
144-
return opcua.NewMonitoredItemCreateRequestWithDefaults(nodeID, ua.AttributeIDValue, handle)
144+
return opcua.NewDefaultMonitoredItemCreateRequest(opcua.MonitoredItemCreateRequestArgs{
145+
NodeID: nodeID,
146+
AttributeID: ua.AttributeIDValue,
147+
ClientHandle: handle,
148+
})
145149
}
146150

147151
func eventRequest(nodeID *ua.NodeID) (*ua.MonitoredItemCreateRequest, []string) {

examples/trigger/trigger.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,11 @@ func main() {
9090
}
9191

9292
miCreateRequests := []*ua.MonitoredItemCreateRequest{
93-
opcua.NewMonitoredItemCreateRequestWithDefaults(triggeringNode, ua.AttributeIDValue, 42),
93+
opcua.NewDefaultMonitoredItemCreateRequest(opcua.MonitoredItemCreateRequestArgs{
94+
NodeID: triggeringNode,
95+
AttributeID: ua.AttributeIDValue,
96+
ClientHandle: 42,
97+
}),
9498
{
9599
ItemToMonitor: &ua.ReadValueID{
96100
NodeID: triggeredNode,

0 commit comments

Comments
 (0)