diff --git a/lib/devices/EtherNetIP.ts b/lib/devices/EtherNetIP.ts index a3fb9f7..885ef29 100644 --- a/lib/devices/EtherNetIP.ts +++ b/lib/devices/EtherNetIP.ts @@ -8,6 +8,7 @@ import {log} from "../helpers/log.js"; import {SparkplugNode} from "../sparkplugNode.js"; import {Metrics, serialisationType} from "../helpers/typeHandler.js"; import {Controller} from 'st-ethernet-ip' +import * as net from 'net'; /** * Define structure of options for device connection @@ -22,6 +23,7 @@ export default interface etherNetIPConnDetails { export class EtherNetIPConnection extends DeviceConnection { #client: Controller #connDetails: etherNetIPConnDetails + #socket: net.Socket | null = null; constructor(type: string, connDetails: etherNetIPConnDetails) { super(type); @@ -79,13 +81,29 @@ export class EtherNetIPConnection extends DeviceConnection { // Double check that the payload format is correct for EtherNet/IP. We only currently support the Buffer // payload format. if (payloadFormat !== "Buffer") { - log("Buffer payload format is required for an EtherNet/IP connection.") + log("Buffer payload format is required for an EtherNet/IP connection."); return; } - // Read each metric (that has an address) from the device and for each unique address, go and get the data. - metrics.addresses.filter(e => e && e !== 'undefined').forEach((addr) => { + // Check if the socket is created and open + + if (!this.#socket) { + const socketPath = './edge-agent.sock'; + this.#socket = net.createConnection(socketPath); + + // Handle socket errors and closing: + this.#socket.on('error', (err) => { + console.error('Socket error:', err); + close(); + return; + }); + + process.on('exit', () => { + this.#socket?.destroy(); + }); + } + metrics.addresses.filter(e => e && e !== 'undefined').forEach((addr) => { // The metric address selector for EtherNet/IP is in the format "classId,instance,attribute" // (e.g. "3,108,4") const splitAddress = addr.split(','); @@ -93,19 +111,21 @@ export class EtherNetIPConnection extends DeviceConnection { const instance = parseInt(splitAddress[1]); const attribute = parseInt(splitAddress[2]); - this.#client.getAttributeSingle(classId, instance, attribute).then((val: Buffer) => { - - let obj: any = {}; - obj[addr] = val; - - this.emit('data', obj); - - }).catch((err: any) => { - log('Error reading metric:'); - console.log(err); - }); - - }) + this.#client.getAttributeSingle(classId, instance, attribute) + .then((val: Buffer) => { + let obj: { [key: string]: any } = {}; + obj[addr] = val; + + // Convert object to JSON string with appropriate encoding + const jsonData = JSON.stringify({obj, parseVals: true}); + + this.#socket?.write(jsonData + '\0', 'utf8'); + }) + .catch((err: any) => { + log('Error reading metric:'); + console.log(err); + }); + }); } /** diff --git a/lib/translator.ts b/lib/translator.ts index 4e4d1fd..7b9170b 100644 --- a/lib/translator.ts +++ b/lib/translator.ts @@ -31,6 +31,7 @@ import {Device, deviceOptions} from "./device.js"; import * as UUIDs from "./uuids.js"; import {EventEmitter} from "events"; import fs from "node:fs"; +import * as net from 'net'; /** * Translator class basically turns config file into instantiated classes @@ -190,12 +191,67 @@ export class Translator extends EventEmitter { }) }); - // What to do when the device connection has new data from a device - newConn.on('data', (obj: { [index: string]: any }, parseVals = true) => { - connection.devices?.forEach((devConf: deviceOptions) => { - this.devices[devConf.deviceId]?._handleData(obj, parseVals); - }) - }) + const server = net.createServer((socket) => { + let buffer = ''; + + // Handle incoming data from the driver container + socket.on('data', (chunk) => { + buffer += chunk.toString(); // Append incoming data to buffer + + // Check for complete messages ending with a newline + const messages = buffer.split('\0'); + + + // Iterate through complete messages (excluding the last potentially incomplete one) + messages.slice(0, messages.length - 1).forEach((message) => { + if (message) { + try { + // Parse JSON string to object, assuming UTF-8 encoding + let obj; + try { + obj = JSON.parse(message); + } catch (err) { + console.error('Error parsing JSON. Current buffer:', message); + return; + } + + const data = obj.obj; + const parseVals = obj.parseVals; + connection.devices?.forEach((devConf: deviceOptions) => { + + // Go through each key in data and if it has a type of "Buffer" then replace the + // value with a Buffer object containing obj.data + Object.keys(data).forEach((key) => { + if (typeof data[key] === 'object' && data[key].type === 'Buffer') { + data[key] = Buffer.from(data[key].data); + } + }); + + this.devices[devConf.deviceId]?._handleData(data, parseVals); // Assuming _handleData exists + }); + } catch (err) { + console.log(err); + } + } + }); + + // Update buffer to keep any remaining incomplete message for next data chunk + buffer = messages[messages.length - 1] || ''; // Keep the last potentially incomplete message + }); + }); + + // Delete the socket file if it exists + server.on('error', (err) => { + // @ts-ignore + if (err.code === 'EADDRINUSE') { + fs.unlinkSync('./edge-agent.sock'); + server.listen('./edge-agent.sock'); + } + }); + + server.listen('./edge-agent.sock', () => { + console.log('Core container listening on Unix socket'); + }); // What to do when device connection dies newConn.on('close', () => {