Skip to content
This repository was archived by the owner on Apr 2, 2024. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 36 additions & 16 deletions lib/devices/EtherNetIP.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {log} from "../helpers/log.js";
import {SparkplugNode} from "../sparkplugNode.js";
import {Metrics, serialisationType} from "../helpers/typeHandler.js";
import {Controller} from 'st-ethernet-ip'
import * as net from 'net';

/**
* Define structure of options for device connection
Expand All @@ -22,6 +23,7 @@ export default interface etherNetIPConnDetails {
export class EtherNetIPConnection extends DeviceConnection {
#client: Controller
#connDetails: etherNetIPConnDetails
#socket: net.Socket | null = null;

constructor(type: string, connDetails: etherNetIPConnDetails) {
super(type);
Expand Down Expand Up @@ -79,33 +81,51 @@ export class EtherNetIPConnection extends DeviceConnection {
// Double check that the payload format is correct for EtherNet/IP. We only currently support the Buffer
// payload format.
if (payloadFormat !== "Buffer") {
log("Buffer payload format is required for an EtherNet/IP connection.")
log("Buffer payload format is required for an EtherNet/IP connection.");
return;
}

// Read each metric (that has an address) from the device and for each unique address, go and get the data.
metrics.addresses.filter(e => e && e !== 'undefined').forEach((addr) => {
// Check if the socket is created and open

if (!this.#socket) {
const socketPath = './edge-agent.sock';
this.#socket = net.createConnection(socketPath);

// Handle socket errors and closing:
this.#socket.on('error', (err) => {
console.error('Socket error:', err);
close();
return;
});

process.on('exit', () => {
this.#socket?.destroy();
});
}

metrics.addresses.filter(e => e && e !== 'undefined').forEach((addr) => {
// The metric address selector for EtherNet/IP is in the format "classId,instance,attribute"
// (e.g. "3,108,4")
const splitAddress = addr.split(',');
const classId = parseInt(splitAddress[0]);
const instance = parseInt(splitAddress[1]);
const attribute = parseInt(splitAddress[2]);

this.#client.getAttributeSingle(classId, instance, attribute).then((val: Buffer) => {

let obj: any = {};
obj[addr] = val;

this.emit('data', obj);

}).catch((err: any) => {
log('Error reading metric:');
console.log(err);
});

})
this.#client.getAttributeSingle(classId, instance, attribute)
.then((val: Buffer) => {
let obj: { [key: string]: any } = {};
obj[addr] = val;

// Convert object to JSON string with appropriate encoding
const jsonData = JSON.stringify({obj, parseVals: true});

this.#socket?.write(jsonData + '\0', 'utf8');
})
.catch((err: any) => {
log('Error reading metric:');
console.log(err);
});
});
}

/**
Expand Down
68 changes: 62 additions & 6 deletions lib/translator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import {Device, deviceOptions} from "./device.js";
import * as UUIDs from "./uuids.js";
import {EventEmitter} from "events";
import fs from "node:fs";
import * as net from 'net';

/**
* Translator class basically turns config file into instantiated classes
Expand Down Expand Up @@ -190,12 +191,67 @@ export class Translator extends EventEmitter {
})
});

// What to do when the device connection has new data from a device
newConn.on('data', (obj: { [index: string]: any }, parseVals = true) => {
connection.devices?.forEach((devConf: deviceOptions) => {
this.devices[devConf.deviceId]?._handleData(obj, parseVals);
})
})
const server = net.createServer((socket) => {
let buffer = '';

// Handle incoming data from the driver container
socket.on('data', (chunk) => {
buffer += chunk.toString(); // Append incoming data to buffer

// Check for complete messages ending with a newline
const messages = buffer.split('\0');


// Iterate through complete messages (excluding the last potentially incomplete one)
messages.slice(0, messages.length - 1).forEach((message) => {
if (message) {
try {
// Parse JSON string to object, assuming UTF-8 encoding
let obj;
try {
obj = JSON.parse(message);
} catch (err) {
console.error('Error parsing JSON. Current buffer:', message);
return;
}

const data = obj.obj;
const parseVals = obj.parseVals;
connection.devices?.forEach((devConf: deviceOptions) => {

// Go through each key in data and if it has a type of "Buffer" then replace the
// value with a Buffer object containing obj.data
Object.keys(data).forEach((key) => {
if (typeof data[key] === 'object' && data[key].type === 'Buffer') {
data[key] = Buffer.from(data[key].data);
}
});

this.devices[devConf.deviceId]?._handleData(data, parseVals); // Assuming _handleData exists
});
} catch (err) {
console.log(err);
}
}
});

// Update buffer to keep any remaining incomplete message for next data chunk
buffer = messages[messages.length - 1] || ''; // Keep the last potentially incomplete message
});
});

// Delete the socket file if it exists
server.on('error', (err) => {
// @ts-ignore
if (err.code === 'EADDRINUSE') {
fs.unlinkSync('./edge-agent.sock');
server.listen('./edge-agent.sock');
}
});

server.listen('./edge-agent.sock', () => {
console.log('Core container listening on Unix socket');
});

// What to do when device connection dies
newConn.on('close', () => {
Expand Down