Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,22 @@ func (a bySecurityLevel) Len() int { return len(a) }
func (a bySecurityLevel) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a bySecurityLevel) Less(i, j int) bool { return a[i].SecurityLevel < a[j].SecurityLevel }

type ClientInterface interface {
Browse(context.Context, *ua.BrowseRequest) (*ua.BrowseResponse, error)
BrowseNext(context.Context, *ua.BrowseNextRequest) (*ua.BrowseNextResponse, error)

Node(*ua.NodeID) *Node
NodeFromExpandedNodeID(*ua.ExpandedNodeID) *Node

Read(context.Context, *ua.ReadRequest) (*ua.ReadResponse, error)
Send(context.Context, ua.Request, func(ua.Response) error) error

ForgetSubscription(context.Context, uint32)
RegisterSubscription_NeedsSubMuxLock(*Subscription) error

RequestTimeout() time.Duration
}

// Client is a high-level client for an OPC/UA server.
// It establishes a secure channel and a session.
type Client struct {
Expand Down Expand Up @@ -989,14 +1005,14 @@ func (c *Client) sendWithTimeout(ctx context.Context, req ua.Request, timeout ti
// Node returns a node object which accesses its attributes
// through this client connection.
func (c *Client) Node(id *ua.NodeID) *Node {
return &Node{ID: id, c: c}
return NewNode(id, c)
}

// NodeFromExpandedNodeID returns a node object which accesses its attributes
// through this client connection. This is usually needed when working with node ids returned
// from browse responses by the server.
func (c *Client) NodeFromExpandedNodeID(id *ua.ExpandedNodeID) *Node {
return &Node{ID: ua.NewNodeIDFromExpandedNodeID(id), c: c}
return NewNode(ua.NewNodeIDFromExpandedNodeID(id), c)
}

// FindServers finds the servers available at an endpoint
Expand Down Expand Up @@ -1342,6 +1358,10 @@ func (c *Client) UpdateNamespaces(ctx context.Context) error {
return nil
}

func (c *Client) RequestTimeout() time.Duration {
return c.cfg.sechan.RequestTimeout
}

// safeAssign implements a type-safe assign from T to *T.
func safeAssign(t, ptrT interface{}) error {
if reflect.TypeOf(t) != reflect.TypeOf(ptrT).Elem() {
Expand Down
6 changes: 3 additions & 3 deletions client_sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,8 @@ func (c *Client) sendRepublishRequests(ctx context.Context, sub *Subscription, a
}
}

// registerSubscription_NeedsSubMuxLock registers a subscription
func (c *Client) registerSubscription_NeedsSubMuxLock(sub *Subscription) error {
// RegisterSubscription_NeedsSubMuxLock registers a subscription
func (c *Client) RegisterSubscription_NeedsSubMuxLock(sub *Subscription) error {
if sub.SubscriptionID == 0 {
return ua.StatusBadSubscriptionIDInvalid
}
Expand All @@ -225,7 +225,7 @@ func (c *Client) registerSubscription_NeedsSubMuxLock(sub *Subscription) error {
return nil
}

func (c *Client) forgetSubscription(ctx context.Context, id uint32) {
func (c *Client) ForgetSubscription(ctx context.Context, id uint32) {
c.subMux.Lock()
c.forgetSubscription_NeedsSubMuxLock(ctx, id)
c.subMux.Unlock()
Expand Down
6 changes: 5 additions & 1 deletion node.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ type Node struct {
// ID is the node id of the node.
ID *ua.NodeID

c *Client
c ClientInterface
}

func NewNode(id *ua.NodeID, c ClientInterface) *Node {
return &Node{ID: id, c: c}
}

func (n *Node) String() string {
Expand Down
10 changes: 5 additions & 5 deletions subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Subscription struct {
itemsMu sync.Mutex
lastSeq uint32
nextSeq uint32
c *Client
c ClientInterface
}

type SubscriptionParameters struct {
Expand Down Expand Up @@ -82,7 +82,7 @@ type PublishNotificationData struct {
// from the client and the server.
func (s *Subscription) Cancel(ctx context.Context) error {
stats.Subscription().Add("Cancel", 1)
s.c.forgetSubscription(ctx, s.SubscriptionID)
s.c.ForgetSubscription(ctx, s.SubscriptionID)
return s.delete(ctx)
}

Expand Down Expand Up @@ -321,8 +321,8 @@ func (s *Subscription) publishTimeout() time.Duration {
if timeout > uasc.MaxTimeout {
return uasc.MaxTimeout
}
if timeout < s.c.cfg.sechan.RequestTimeout {
return s.c.cfg.sechan.RequestTimeout
if requestTimeout := s.c.RequestTimeout(); timeout < requestTimeout {
return requestTimeout
}
return timeout
}
Expand Down Expand Up @@ -447,7 +447,7 @@ func (s *Subscription) recreate_create(ctx context.Context) error {
s.lastSeq = 0
s.nextSeq = 1

if err := s.c.registerSubscription_NeedsSubMuxLock(s); err != nil {
if err := s.c.RegisterSubscription_NeedsSubMuxLock(s); err != nil {
return err
}
dlog.Printf("subscription registered")
Expand Down