Skip to content

Implement Event Subscription handling for channels#746

Open
JonathanWhiteUK wants to merge 1 commit intogopcua:mainfrom
IOTechSystems:DAN-11-branch
Open

Implement Event Subscription handling for channels#746
JonathanWhiteUK wants to merge 1 commit intogopcua:mainfrom
IOTechSystems:DAN-11-branch

Conversation

@JonathanWhiteUK
Copy link
Copy Markdown
Contributor

Hi, I have added event channel handling. This is a breaking change as it changes the signatures of some of the functions to allow for handling of both event messages and data change messages. Thanks :)

@SCZwilling
Copy link
Copy Markdown

SCZwilling commented Nov 25, 2024

Hello, I was trying to use your implementation of event subscription but i am getting error- "The attribute is not supported for the specified Node. StatusBadAttributeIDInvalid (0x80350000)". I am subscribing to an object node which has below configuration-

Attribute | Value
Nodeld | ins=2;s=Sinumerik
NamespaceIndex 2 |  
IdentifierType | String
Identifier | Sinumerik
NodeClass | Object
BrowseName | 2, "Sinumerik"
DisplayName | "en-US", "Sinumerik"
Description | "en-US", "An object with several features of Sinumerik"
EventNotifier | SubscribeToEvents

I have tested this node on UA Expert and there i am able to see the events and alarms. I believe i need to change some configuration to subscribe events on this node, could you help with it @Jwcode-uk ?

@magiconair
Copy link
Copy Markdown
Contributor

Thank you @Jwcode-uk. I can see that one function in subscription.go gets an additional parameter. I think we could use a different pattern for functions like this. Not sure if we have more than just this one. Might be worthwhile to look. Would like to keep this consistent.

Not sure if this is complex enough to use the Option pattern for this constructor. What do you think?

// Deprectated: Use NewDefaultMonitoredItemCreateRequest instead. Will be removed with 0.8.0
func NewMonitoredItemCreateRequestWithDefaults(nodeID *ua.NodeID, attributeID ua.AttributeID, clientHandle uint32) *ua.MonitoredItemCreateRequest {
	return NewDefaultMonitoredItemCreateRequest(MICreateRequestArgs{
		NodeID:       nodeID,
		AttributeID:  attributeID,
		ClientHandle: clientHandle,
	})
}

type MonitoredItemCreateRequestArgs struct {
	NodeID       *ua.NodeID
	AttributeID  ua.AttributeID
	ClientHandle uint32
	Filter       *ua.ExtensionObject
}

func NewDefaultMonitoredItemCreateRequest(args MonitoredItemCreateRequestArgs) *ua.MonitoredItemCreateRequest {
	if args.AttributeID == 0 {
		args.AttributeID = ua.AttributeIDValue
	}
	return &ua.MonitoredItemCreateRequest{
		ItemToMonitor: &ua.ReadValueID{
			NodeID:       args.NodeID,
			AttributeID:  args.AttributeID,
			DataEncoding: &ua.QualifiedName{},
		},
		MonitoringMode: ua.MonitoringModeReporting,
		RequestedParameters: &ua.MonitoringParameters{
			ClientHandle:     args.ClientHandle,
			DiscardOldest:    true,
			Filter:           args.Filter,
			QueueSize:        10,
			SamplingInterval: 0.0,
		},
	}
}

Copy link
Copy Markdown
Contributor

@magiconair magiconair left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did a first pass through the code and would like to see a change so that this implementation does not break existing code. See the comment on ChanSubscribe for a suggestion. I've also left some smaller style comments.

Some more comments would be useful since it isn't immediately obvious to me what the code does.

Lets do the refactor first - unless that isn't possible - and then I'll do another pass over the implementation and probably have a couple more questions.

dataValue := &ua.DataValue{
Value: field,
Status: ua.StatusOK,
SourceTimestamp: time.Now(),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't all dataValues have the same timestamp? Isn't there a timestamp in the item (e.g. ServerTimestamp)?


// internal func to read from internal channel and write to client provided channel
func (s *Subscription) pump(ctx context.Context, notifyCh chan<- *DataChangeMessage, cb MsgHandler) {
func (s *Subscription) pump(ctx context.Context, notifyCh chan<- Message, cb MsgHandler, sub bool) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sub seems unused here and the name also collides with other usages of sub in this file where it usually denotes a *Subscription


// AddNodes adds nodes defined by their string representation
func (s *Subscription) AddNodes(ctx context.Context, nodes ...string) error {
func (s *Subscription) AddNodes(ctx context.Context, eventSub bool, filter *ua.ExtensionObject, nodes ...string) error {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does eventSub do here? Also, see comment on ChanSubscribe for a way to add this functionality without breaking the signatures.

return nil, nil
}

toAdd := make([]*ua.MonitoredItemCreateRequest, 0)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

var toAdd []*ua.MonitoredItemCreateRequest

out.EventFields = make([]*ua.DataValue, len(item.EventFields))

for i, field := range item.EventFields {
// Create a new DataValue
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pls remove this comment as it does not have value

}
}

if notifyCh != nil {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer switch statements over if/else, e.g.

switch {
    case notifyCh != nil:
        select { ... }

    case cb != nil:
        cb(sb, out)
        atomic.AddUint64(&s.delivered, 1)

    default:
       panic(...)
}

MonitoringMode: ua.MonitoringModeReporting,
RequestedParameters: &ua.MonitoringParameters{
ClientHandle: clientHandle,
ClientHandle: args.ClientHandle,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the other parameters be configurable as well while providing sensible defaults?

// will also cause the subscription to stop, but `Unsubscribe` must still be called.
func (m *NodeMonitor) ChanSubscribe(ctx context.Context, params *opcua.SubscriptionParameters, ch chan<- *DataChangeMessage, nodes ...string) (*Subscription, error) {
sub, err := newSubscription(ctx, m, params, 16, nodes...)
func (m *NodeMonitor) ChanSubscribe(ctx context.Context, params *opcua.SubscriptionParameters, ch chan<- Message, eventSub bool, filter *ua.ExtensionObject, nodes ...string) (*Subscription, error) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am starting to think that an args parameter might be useful here. We did something similar with the WithContext versions of all functions where we dropped the WithContext suffix in a later version. So maybe we can refactor this change so that we add ChanSubscribeWithArgs(ctx context.Context, args ChanSubscribeArgs) (*Subscription, error) and drop the WithArgs in 0.8.0. Add a deprecation note to the other version. What do you think?

@magiconair magiconair added this to the v0.6.6 milestone Jan 22, 2025
@magiconair magiconair modified the milestones: v0.7.1, v0.7.2 Feb 27, 2025
@magiconair magiconair modified the milestones: v0.7.2, v0.7.4 Mar 17, 2025
@magiconair magiconair modified the milestones: v0.7.4, v0.8.0 Apr 7, 2025
@magiconair magiconair modified the milestones: v0.8.0, v0.8.1 Apr 24, 2025
@FelixTing FelixTing force-pushed the DAN-11-branch branch 3 times, most recently from 6837b18 to dad24e2 Compare July 30, 2025 02:00
@FelixTing
Copy link
Copy Markdown

@magiconair Thank you for your review. I've synced the branch with the main branch and updated the code based on your suggestions. Could you please take another look when you have a moment? Thanks!

@cloudxxx8
Copy link
Copy Markdown

this PR is ready for review again

Signed-off-by: FelixTing <felix@iotechsys.com>
@github-actions
Copy link
Copy Markdown

This pull request has been marked as stale due to inactivity. It will be closed in 7 days if no further activity occurs.

@github-actions github-actions bot added the Stale label Mar 29, 2026
@cloudxxx8
Copy link
Copy Markdown

may we review this PR for the next release?

@github-actions github-actions bot removed the Stale label Mar 30, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants