Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions lib/opc_ua/common.ex
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,21 @@ defmodule OpcUA.Common do
end
end

@doc """
Batch reads 'value' attribute of multiple nodes in a single OPC-UA request.
Input: list of {%NodeId{}, index} tuples.
Returns {:ok, [{:ok, value} | {:error, reason}]} or {:error, reason}.
"""
@spec read_node_values(GenServer.server(), [{%NodeId{}, integer()}]) ::
{:ok, list()} | {:error, binary()}
def read_node_values(pid, node_ids_with_index) when is_list(node_ids_with_index) do
if(@mix_env != :test) do
GenServer.call(pid, {:read, {:values, node_ids_with_index}})
else
GenServer.call(pid, {:read, {:values, node_ids_with_index}}, :infinity)
end
end

@doc """
Reads 'value' attribute of a node in the server.
Note: If the value is an array you can search a scalar using `index` parameter.
Expand Down Expand Up @@ -639,6 +654,12 @@ defmodule OpcUA.Common do
{:noreply, state}
end

def handle_call({:read, {:values, node_ids_with_index}}, caller_info, state) do
c_args = Enum.map(node_ids_with_index, fn {node_id, index} -> {to_c(node_id), index} end)
call_port(state, :read_node_values, caller_info, c_args)
{:noreply, state}
end

def handle_call({:read, {:value_by_index, {node_id, index}}}, caller_info, state) do
c_args = {to_c(node_id), index}
call_port(state, :read_node_value_by_index, caller_info, c_args)
Expand Down Expand Up @@ -852,6 +873,20 @@ defmodule OpcUA.Common do
state
end

defp handle_c_response({:read_node_values, caller_metadata, {:ok, results}}, state) do
parsed = Enum.map(results, fn
{:ok, value} -> {:ok, parse_batch_value(value)}
{:error, _} = error -> error
end)
GenServer.reply(caller_metadata, {:ok, parsed})
state
end

defp handle_c_response({:read_node_values, caller_metadata, {:error, _} = error}, state) do
GenServer.reply(caller_metadata, error)
state
end

defp handle_c_response({:read_node_value_by_index, caller_metadata, value_response}, state) do
response = parse_value(value_response)
GenServer.reply(caller_metadata, response)
Expand Down Expand Up @@ -969,6 +1004,11 @@ defmodule OpcUA.Common do

defp parse_value(response), do: response

# Parse a single value from batch read response.
# The C layer encodes variant values directly (scalars, arrays, :nil atom).
defp parse_batch_value(:nil), do: nil
defp parse_batch_value(value), do: value

defp parse_c_value({ns_index, type, name, name_space_uri, server_index}),
do:
ExpandedNodeId.new(
Expand Down
97 changes: 96 additions & 1 deletion src/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -3932,7 +3932,102 @@ void handle_read_node_value(void *entity, bool entity_type, const char *req, int
UA_Variant_delete(value);
}

/*
/*
* Batch read 'value' attribute of multiple nodes using UA_Client_Service_read.
* Input: list of {node_id_tuple, index} tuples
* Output: {:ok, [{:ok, value} | {:error, reason}]} | {:error, reason}
*/
void handle_read_node_values(void *entity, bool entity_type, const char *req, int *req_index)
{
int list_count;
if(ei_decode_list_header(req, req_index, &list_count) < 0)
errx(EXIT_FAILURE, ":handle_read_node_values requires a list");

int node_count = list_count;

if(node_count == 0 || node_count > 200) {
send_error_response("einval");
return;
}

// Only client batch read is supported
if(!entity_type) {
// Skip remaining decode
send_error_response("not_supported");
return;
}

UA_ReadValueId *nodesToRead = (UA_ReadValueId *)UA_Array_new(
node_count, &UA_TYPES[UA_TYPES_READVALUEID]);

for(int i = 0; i < node_count; i++) {
int term_size;
if(ei_decode_tuple_header(req, req_index, &term_size) < 0 || term_size != 2)
errx(EXIT_FAILURE, "read_node_values: each element must be a 2-tuple");

nodesToRead[i].nodeId = assemble_node_id(req, req_index);
nodesToRead[i].attributeId = UA_ATTRIBUTEID_VALUE;
nodesToRead[i].indexRange = UA_STRING_NULL;

unsigned long data_index;
ei_decode_ulong(req, req_index, &data_index);
}

// Decode list tail
ei_decode_list_header(req, req_index, &list_count);

UA_ReadRequest readRequest;
UA_ReadRequest_init(&readRequest);
readRequest.nodesToRead = nodesToRead;
readRequest.nodesToReadSize = node_count;

UA_ReadResponse readResponse = UA_Client_Service_read((UA_Client *)entity, readRequest);

// Use dynamic buffer for batch response (max 64KB, within uint16_t limit)
size_t resp_buf_size = ERLCMD_BUF_SIZE * 2;
char *resp = (char *)malloc(resp_buf_size);
int resp_index = sizeof(uint16_t);
resp[resp_index++] = response_id;
ei_encode_version(resp, &resp_index);
ei_encode_tuple_header(resp, &resp_index, 3);
encode_caller_metadata(resp, &resp_index);

if(readResponse.responseHeader.serviceResult != UA_STATUSCODE_GOOD) {
ei_encode_tuple_header(resp, &resp_index, 2);
ei_encode_atom(resp, &resp_index, "error");
const char *status = UA_StatusCode_name(readResponse.responseHeader.serviceResult);
ei_encode_binary(resp, &resp_index, status, strlen(status));
} else {
ei_encode_tuple_header(resp, &resp_index, 2);
ei_encode_atom(resp, &resp_index, "ok");

int result_count = (int)readResponse.resultsSize;
ei_encode_list_header(resp, &resp_index, result_count);

for(int i = 0; i < result_count; i++) {
UA_DataValue *dv = &readResponse.results[i];
if(dv->status == UA_STATUSCODE_GOOD && dv->hasValue) {
ei_encode_tuple_header(resp, &resp_index, 2);
ei_encode_atom(resp, &resp_index, "ok");
encode_variant_struct(resp, &resp_index, &dv->value);
} else {
ei_encode_tuple_header(resp, &resp_index, 2);
ei_encode_atom(resp, &resp_index, "error");
const char *status = UA_StatusCode_name(dv->status);
ei_encode_binary(resp, &resp_index, status, strlen(status));
}
}
ei_encode_empty_list(resp, &resp_index);
}

erlcmd_send(resp, resp_index);

free(resp);
UA_ReadResponse_clear(&readResponse);
UA_Array_delete(nodesToRead, node_count, &UA_TYPES[UA_TYPES_READVALUEID]);
}

/*
* Read 'value' of a node in the server.
*/
void handle_read_node_value_by_index(void *entity, bool entity_type, const char *req, int *req_index)
Expand Down
3 changes: 2 additions & 1 deletion src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,5 @@ void handle_read_node_executable(void *entity, bool entity_type, const char *req
void handle_read_node_event_notifier(void *entity, bool entity_type, const char *req, int *req_index);
void handle_read_node_value(void *entity, bool entity_type, const char *req, int *req_index);
void handle_read_node_value_by_index(void *entity, bool entity_type, const char *req, int *req_index);
void handle_read_node_value_by_data_type(void *entity, bool entity_type, const char *req, int *req_index);
void handle_read_node_value_by_data_type(void *entity, bool entity_type, const char *req, int *req_index);
void handle_read_node_values(void *entity, bool entity_type, const char *req, int *req_index);
1 change: 1 addition & 0 deletions src/opc_ua_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1029,6 +1029,7 @@ static struct request_handler request_handlers[] = {
// TODO: Add UA_Server_writeArrayDimensions, inverse name (read)
{"write_node_value", handle_write_node_value},
{"read_node_value", handle_read_node_value},
{"read_node_values", handle_read_node_values},
{"read_node_value_by_index", handle_read_node_value_by_index},
{"read_node_value_by_data_type", handle_read_node_value_by_data_type},
{"write_node_node_id", handle_write_node_node_id},
Expand Down
122 changes: 122 additions & 0 deletions test/client_tests/batch_read_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
defmodule ClientBatchReadTest do
use ExUnit.Case, async: false

alias OpcUA.{NodeId, Server, QualifiedName, Client}

setup do
{:ok, s_pid} = Server.start_link()
:ok = Server.set_default_config(s_pid)
:ok = Server.set_port(s_pid, 4010)

{:ok, ns_index} = Server.add_namespace(s_pid, "BatchTest")

parent_id = NodeId.new(ns_index: ns_index, identifier_type: "string", identifier: "BatchParent")

:ok =
Server.add_object_node(s_pid,
requested_new_node_id: parent_id,
parent_node_id: NodeId.new(ns_index: 0, identifier_type: "integer", identifier: 85),
reference_type_node_id:
NodeId.new(ns_index: 0, identifier_type: "integer", identifier: 35),
browse_name: QualifiedName.new(ns_index: ns_index, name: "BatchParent"),
type_definition: NodeId.new(ns_index: 0, identifier_type: "integer", identifier: 58)
)

node_ids =
for i <- 1..5 do
node_id =
NodeId.new(
ns_index: ns_index,
identifier_type: "string",
identifier: "Var_#{i}"
)

:ok =
Server.add_variable_node(s_pid,
requested_new_node_id: node_id,
parent_node_id: parent_id,
reference_type_node_id:
NodeId.new(ns_index: 0, identifier_type: "integer", identifier: 47),
browse_name: QualifiedName.new(ns_index: ns_index, name: "Var #{i}"),
type_definition: NodeId.new(ns_index: 0, identifier_type: "integer", identifier: 63)
)

:ok = Server.write_node_access_level(s_pid, node_id, 3)
node_id
end

:ok = Server.start(s_pid)

{:ok, c_pid} = Client.start_link()
:ok = Client.set_config(c_pid)
:ok = Client.connect_by_url(c_pid, url: "opc.tcp://localhost:4010/")

%{c_pid: c_pid, s_pid: s_pid, ns_index: ns_index, node_ids: node_ids}
end

test "batch read multiple values", %{c_pid: c_pid, node_ids: node_ids} do
Enum.with_index(node_ids, fn node_id, i ->
:ok = Client.write_node_value(c_pid, node_id, 10, (i + 1) * 10.0)
end)

batch = Enum.map(node_ids, fn node_id -> {node_id, 0} end)
{:ok, results} = Client.read_node_values(c_pid, batch)

assert length(results) == 5
assert Enum.at(results, 0) == {:ok, 10.0}
assert Enum.at(results, 1) == {:ok, 20.0}
assert Enum.at(results, 2) == {:ok, 30.0}
assert Enum.at(results, 3) == {:ok, 40.0}
assert Enum.at(results, 4) == {:ok, 50.0}
end

test "batch read with nil values (unset)", %{c_pid: c_pid, node_ids: node_ids} do
batch = Enum.map(node_ids, fn node_id -> {node_id, 0} end)
{:ok, results} = Client.read_node_values(c_pid, batch)

assert length(results) == 5
assert Enum.all?(results, fn r -> r == {:ok, nil} end)
end

test "batch read with nonexistent node returns per-node error", %{
c_pid: c_pid,
ns_index: ns_index,
node_ids: node_ids
} do
# Write a value to the first node so we can verify it reads ok
:ok = Client.write_node_value(c_pid, Enum.at(node_ids, 0), 6, 42)

bad_node =
NodeId.new(ns_index: ns_index, identifier_type: "string", identifier: "NonExistent")

batch = [{Enum.at(node_ids, 0), 0}, {bad_node, 0}]

{:ok, results} = Client.read_node_values(c_pid, batch)
assert length(results) == 2
assert {:ok, _} = Enum.at(results, 0)
assert {:error, _} = Enum.at(results, 1)
end

test "batch read single node", %{c_pid: c_pid, node_ids: node_ids} do
node_id = Enum.at(node_ids, 0)
:ok = Client.write_node_value(c_pid, node_id, 10, 42.0)

{:ok, results} = Client.read_node_values(c_pid, [{node_id, 0}])
assert results == [{:ok, 42.0}]
end

test "batch read mixed data types", %{c_pid: c_pid, node_ids: node_ids} do
:ok = Client.write_node_value(c_pid, Enum.at(node_ids, 0), 0, true)
:ok = Client.write_node_value(c_pid, Enum.at(node_ids, 1), 6, 42)
:ok = Client.write_node_value(c_pid, Enum.at(node_ids, 2), 10, 3.14)
:ok = Client.write_node_value(c_pid, Enum.at(node_ids, 3), 11, "hello")

batch = Enum.map(Enum.take(node_ids, 4), fn node_id -> {node_id, 0} end)
{:ok, results} = Client.read_node_values(c_pid, batch)

assert {:ok, true} = Enum.at(results, 0)
assert {:ok, 42} = Enum.at(results, 1)
assert {:ok, 3.14} = Enum.at(results, 2)
assert {:ok, "hello"} = Enum.at(results, 3)
end
end