-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhep-server.js
More file actions
299 lines (255 loc) · 7.65 KB
/
hep-server.js
File metadata and controls
299 lines (255 loc) · 7.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
/**
* HEP to InfluxDB Server
*
* This server listens for HEP packets, converts them to InfluxDB Line Protocol format,
* and forwards them to an InfluxDB instance.
*/
import HepToLineProtocolConverter from './hep-proto.js';
import hepjs from 'hep-js';
import axios from 'axios';
import fs from 'fs';
import path from 'path';
/**
 * Receives HEP packets over UDP and TCP, converts each one to InfluxDB Line
 * Protocol via HepToLineProtocolConverter, buffers the resulting lines and
 * periodically writes them either to an InfluxDB `/write` endpoint or to
 * timestamped files on disk.
 */
class HepToInfluxDBServer {
  /**
   * Build the effective configuration. For each setting the explicit `config`
   * value wins, then the environment variable (where one exists), then the
   * built-in default.
   *
   * @param {Object} [config] - Optional overrides.
   * @param {number|string} [config.hepPort] - Listen port (env PORT, default 9060).
   * @param {string} [config.hepBindAddress] - Listen address (env HOST, default '0.0.0.0').
   * @param {string} [config.influxDbUrl] - InfluxDB base URL (env INFLUX_DBURL).
   * @param {string} [config.influxDbDatabase] - Target database (env INFLUXB_DBNAME, default 'hep').
   * @param {number|string} [config.batchSize] - Buffered lines that trigger a flush (env BATCH_SIZE).
   * @param {number|string} [config.flushInterval] - Timed flush period in ms (env FLUSH_INTERVAL).
   * @param {number|string} [config.maxBufferSize] - Stored in the config (env MAX_BUFFER).
   * @param {boolean} [config.debug] - Enables verbose logging.
   * @param {boolean} [config.writeToFile] - Write batches to files instead of InfluxDB.
   * @param {string} [config.outputDir] - Directory for file output (default './data').
   */
  constructor(config = {}) {
    // Environment variables are always strings; coerce numeric settings so the
    // rest of the class (and the socket APIs) see real numbers. Values that do
    // not parse to a finite number fall back to the default.
    const toNumber = (value, fallback) => {
      const parsed = Number(value);
      return Number.isFinite(parsed) ? parsed : fallback;
    };

    this.config = {
      hepPort: toNumber(config.hepPort || process.env.PORT || 9060, 9060),
      hepBindAddress: config.hepBindAddress || process.env.HOST || '0.0.0.0',
      influxDbUrl: config.influxDbUrl || process.env.INFLUX_DBURL || 'http://localhost:7971',
      // NOTE(review): "INFLUXB_DBNAME" looks like a typo, but the name is kept
      // as-is so existing deployments keep working -- confirm the intended name.
      influxDbDatabase: config.influxDbDatabase || process.env.INFLUXB_DBNAME || 'hep',
      batchSize: toNumber(config.batchSize || process.env.BATCH_SIZE || 1000, 1000),
      flushInterval: toNumber(config.flushInterval || process.env.FLUSH_INTERVAL || 5000, 5000), // ms
      // NOTE(review): maxBufferSize is stored here but nothing in this class
      // reads it; the buffer is bounded in practice by batchSize.
      maxBufferSize: toNumber(config.maxBufferSize || process.env.MAX_BUFFER || 10000, 10000),
      debug: config.debug || false,
      writeToFile: config.writeToFile || false,
      outputDir: config.outputDir || './data'
    };

    this.buffer = [];
    this.lastFlushTime = Date.now();
    this.isShuttingDown = false;

    this.converter = new HepToLineProtocolConverter();
    this.converter.setDebug(this.config.debug);

    // Statistics
    this.stats = {
      packetsReceived: 0,
      packetsConverted: 0,
      batchesSent: 0,
      conversionErrors: 0,
      sendErrors: 0
    };
  }

  /**
   * Initialize the server: create the output directory when file output is
   * enabled, start the listeners, arm the periodic flush timer and register
   * SIGTERM/SIGINT handlers.
   *
   * @returns {Promise<HepToInfluxDBServer>} This instance, for chaining.
   * @throws Rethrows any error raised while preparing the directory or
   *   starting the listeners.
   */
  async initialize() {
    try {
      // Ensure output directory exists if writing to file
      if (this.config.writeToFile) {
        await fs.promises.mkdir(this.config.outputDir, { recursive: true });
      }

      // Start the server
      await this.startServer();

      // Set up the flush interval
      this.flushIntervalId = setInterval(() => {
        this.conditionalFlush();
      }, this.config.flushInterval);

      // Register signal handlers for graceful shutdown
      process.on('SIGTERM', this.shutdown.bind(this));
      process.on('SIGINT', this.shutdown.bind(this));

      console.log(`HEP to InfluxDB Server initialized with config:`, this.config);

      // Return this for chaining
      return this;
    } catch (error) {
      console.error('Failed to initialize HEP to InfluxDB Server:', error);
      throw error;
    }
  }

  /**
   * Start the HEP listeners (UDP and TCP) on the configured address and port.
   *
   * @returns {Promise<void>}
   * @throws Rethrows any error raised while binding either socket.
   */
  async startServer() {
    const host = this.config.hepBindAddress;
    const port = this.config.hepPort;

    try {
      // Create UDP Server. Bun.udpSocket() resolves to the socket, so it must
      // be awaited; otherwise this.udpServer would hold a Promise, shutdown()
      // could not close it, and bind failures would surface as unhandled
      // rejections instead of reaching the catch below.
      this.udpServer = await Bun.udpSocket({
        hostname: host,
        port: port,
        udp: true,
        socket: {
          data: (socket, data) => this.handleData(data, socket),
          error: (socket, error) => console.error('UDP error:', error),
        }
      });

      // Create TCP Server
      // NOTE(review): each TCP data chunk is handed to handleData() as if it
      // were exactly one packet; no stream framing is done here -- confirm
      // that the converter copes with split or coalesced packets.
      this.tcpServer = Bun.listen({
        hostname: host,
        port: port,
        socket: {
          data: (socket, data) => this.handleData(data, socket),
          error: (socket, error) => console.error('TCP error:', error),
        }
      });

      console.log(`HEP Server listening on ${host}:${port} (TCP/UDP)`);
    } catch (error) {
      // Do not leave a half-started server behind: if the UDP socket was
      // bound before the failure, release it.
      if (this.udpServer && typeof this.udpServer.close === 'function') {
        try {
          this.udpServer.close();
        } catch (closeError) {
          console.error('Error stopping UDP server:', closeError);
        }
        this.udpServer = undefined;
      }
      console.error(`Failed to start server:`, error);
      throw error;
    }
  }

  /**
   * Handle incoming HEP data: convert it, buffer the result and flush once
   * the buffer reaches `batchSize`. Errors are counted in
   * `stats.conversionErrors` and logged only in debug mode.
   *
   * @param {Buffer} data - Raw HEP packet data
   * @param {*} socket - Socket reference (not used by this method)
   */
  handleData(data, socket) {
    try {
      this.stats.packetsReceived++;

      // Convert the HEP packet to Line Protocol
      const lineProtocol = this.converter.convertPacket(data);
      if (!lineProtocol) {
        return;
      }

      this.stats.packetsConverted++;

      // Add to buffer
      this.buffer.push(lineProtocol);

      // Flush if buffer size threshold is reached. flush() handles its own
      // errors, so it is intentionally not awaited here.
      if (this.buffer.length >= this.config.batchSize) {
        void this.flush();
      }
    } catch (error) {
      this.stats.conversionErrors++;
      if (this.config.debug) {
        console.error('Error handling HEP data:', error);
      }
    }
  }

  /**
   * Flush the buffer when it is non-empty and at least `flushInterval`
   * milliseconds have passed since the previous flush.
   */
  conditionalFlush() {
    const timeSinceLastFlush = Date.now() - this.lastFlushTime;

    // Flush if there are items in buffer and interval has passed
    if (this.buffer.length > 0 && timeSinceLastFlush >= this.config.flushInterval) {
      void this.flush();
    }
  }

  /**
   * Flush the buffer to the configured sink (file when `writeToFile` is set,
   * InfluxDB otherwise). The buffer is emptied before the write starts, so a
   * failed write drops that batch; the failure is counted in
   * `stats.sendErrors` and logged, never thrown.
   *
   * @returns {Promise<void>}
   */
  async flush() {
    if (this.buffer.length === 0) return;

    // Detach the batch synchronously so packets arriving during the write
    // accumulate in a fresh buffer.
    const data = this.buffer.join('\n');
    this.buffer = [];
    this.lastFlushTime = Date.now();

    try {
      if (this.config.writeToFile) {
        await this.writeToFile(data);
      } else {
        await this.sendToInfluxDB(data);
      }

      this.stats.batchesSent++;

      if (this.config.debug) {
        console.log(`Flushed ${data.split('\n').length} records`);
      }
    } catch (error) {
      this.stats.sendErrors++;
      console.error('Error flushing data:', error);
    }
  }

  /**
   * Send data to InfluxDB by POSTing it to `<influxDbUrl>/write?db=<database>`.
   *
   * @param {string} data - Line Protocol formatted data
   * @returns {Promise<void>}
   * @throws Rethrows the error raised by the HTTP request.
   */
  async sendToInfluxDB(data) {
    // Drop trailing slashes so the path never becomes "//write", and encode
    // the database name so characters such as spaces or '&' cannot corrupt
    // the query string.
    const baseUrl = String(this.config.influxDbUrl).replace(/\/+$/, '');
    const url = `${baseUrl}/write?db=${encodeURIComponent(this.config.influxDbDatabase)}`;

    try {
      const response = await axios.post(url, data, {
        headers: {
          'Content-Type': 'text/plain'
        }
      });

      if (response.status !== 204) {
        console.warn(`InfluxDB returned unexpected status: ${response.status}`);
      }
    } catch (error) {
      console.error(`InfluxDB write error: ${error.message}`);
      throw error;
    }
  }

  /**
   * Write data to a new file named `hep_<ISO timestamp>.lp` inside
   * `outputDir` (':' and '.' in the timestamp are replaced by '-').
   *
   * @param {string} data - Line Protocol formatted data
   * @returns {Promise<void>}
   * @throws Rethrows the error raised by the file write.
   */
  async writeToFile(data) {
    try {
      const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
      const filePath = path.join(this.config.outputDir, `hep_${timestamp}.lp`);

      await fs.promises.writeFile(filePath, data);

      if (this.config.debug) {
        console.log(`Wrote data to file: ${filePath}`);
      }
    } catch (error) {
      console.error(`File write error: ${error.message}`);
      throw error;
    }
  }

  /**
   * Get server statistics
   * @returns {Object} The counters from `stats` plus the current buffer
   *   length, process uptime and an ISO timestamp.
   */
  getStats() {
    return {
      ...this.stats,
      bufferSize: this.buffer.length,
      uptime: process.uptime(),
      timestamp: new Date().toISOString()
    };
  }

  /**
   * Shutdown the server: stop the flush timer, flush what is buffered, close
   * both listeners, print final statistics and exit the process with code 0.
   *
   * @returns {Promise<void>}
   */
  async shutdown() {
    // Both SIGTERM and SIGINT are wired to this method; ignore repeat calls
    // so the teardown sequence runs only once.
    if (this.isShuttingDown) return;
    this.isShuttingDown = true;

    console.log('Shutting down HEP to InfluxDB server...');

    // Stop interval
    if (this.flushIntervalId) {
      clearInterval(this.flushIntervalId);
    }

    // Flush remaining data
    await this.flush();

    // Stop TCP server
    if (this.tcpServer) {
      try {
        this.tcpServer.stop(true);
        this.tcpServer.unref();
      } catch (error) {
        console.error('Error stopping TCP server:', error);
      }
    }

    // Stop UDP server
    if (this.udpServer) {
      try {
        // UDP sockets use close() not stop()
        if (this.udpServer.close) this.udpServer.close();
      } catch (error) {
        console.error('Error stopping UDP server:', error);
      }
    }

    console.log('Server shutdown complete');

    // Final stats
    console.log('Final statistics:', this.getStats());

    process.exit(0);
  }
}
// Example usage: start a server with debug logging when this file is run
// directly rather than imported. `require` and `module` are CommonJS bindings
// that are not defined in standard ES modules, so the entrypoint check uses
// `import.meta.main` instead of `require.main === module`.
if (import.meta.main) {
  const server = new HepToInfluxDBServer({
    debug: true,
    writeToFile: false
  });

  server.initialize().catch((error) => {
    console.error('Failed to start server:', error);
    process.exit(1);
  });
}
export default HepToInfluxDBServer;