Skip to content

Commit ac9ea2b

Browse files
author
David Gómez Matarrodona
committed
SQL Server Input
1 parent 51d497d commit ac9ea2b

File tree

4 files changed

+1491
-43
lines changed

4 files changed

+1491
-43
lines changed

lib/config/inputs.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ const INPUTS = {
1919
redis : require("../input/redis"),
2020
amqp : require("../input/amqp"),
2121
mongo : require("../input/mongo"),
22+
sqlserver : require("../input/sqlserver"),
2223
//zmq : require("../input/zmq"),
2324
ws : require("../input/ws"),
2425
elastic : require("../input/elastic")

lib/input/sqlserver.js

Lines changed: 293 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,293 @@
1+
const
2+
logger = require('../logger'),
3+
Input = require('.'),
4+
extend = require('extend'),
5+
jsexpr = require('jsexpr'),
6+
Semaphore = require('../semaphore'),
7+
Watermark = require("../watermark"),
8+
{timer} = require('../util'),
9+
Queue = require('../queue'),
10+
sql = require('mssql/msnodesqlv8');
11+
12+
const IVAL_WM = 2000;
13+
const MAX_BUFF = 5000;
14+
const DEFAULTS = {
15+
query : 'select 1 as number',
16+
mode : 'watermark',
17+
options : {
18+
server: 'localhost',
19+
pool: {
20+
max: 10,
21+
min: 0,
22+
idleTimeoutMillis: 30000
23+
},
24+
options: {
25+
encrypt: false, // for azure
26+
trustServerCertificate: true // change to true for local dev / self-signed certs
27+
}
28+
}
29+
};
30+
31+
class SQLServerInput extends Input {
32+
constructor(id,type) {
33+
super(id,type);
34+
35+
this.wmival = null;
36+
this.connected = null;
37+
this.queue = new Queue(MAX_BUFF);
38+
this.cwm = null;
39+
this.req = null;
40+
}
41+
42+
async configure(config,callback) {
43+
this.config = config = extend(true,{},DEFAULTS, config || {});
44+
this.options = config.options || DEFAULTS.options;
45+
this.query = jsexpr.expr(config.query || DEFAULTS.query);
46+
this.sem = new Semaphore(config.maxCursors || DEFAULTS.maxCursors);
47+
this.wmmode = this.config.mode;
48+
49+
this.watermark = new Watermark(config.$datadir);
50+
await this.watermark.start();
51+
this.wm = await this.watermark.get(this.id);
52+
53+
if(this.wmmode == 'start' || !this.wm.last) {
54+
this.wm.last = config.watermark || DEFAULTS.watermark;
55+
}
56+
57+
if(callback)
58+
callback();
59+
}
60+
61+
get mode() {
62+
return Input.MODE.pull;
63+
}
64+
65+
async fetchData(time) {
66+
if(time && !this.sem.available())
67+
return await timer(time);
68+
69+
await this.sem.take();
70+
71+
let query = this.query(this.wm.last);
72+
let req = this.req = await this.server.request();
73+
req.stream = true;
74+
75+
req.on('row',row=>{
76+
logger.silly(`${this.id} row fetch:`,row);
77+
this.wm.last = row;
78+
this.queue.push(row);
79+
});
80+
81+
req.on('done',async()=>{
82+
logger.silly(`${this.id} Query completed`);
83+
await this.saveWatermark();
84+
this.sem.leave()
85+
});
86+
87+
req.on('error',async(err)=>{
88+
logger.error(`${this.id} Query error`,err);
89+
await this.saveWatermark();
90+
this.sem.leave();
91+
});
92+
93+
req.query(query);
94+
logger.silly(`${this.id}: Query: ${query}`);
95+
}
96+
97+
async saveWatermark() {
98+
try {
99+
await this.watermark.save(this.wm);
100+
logger.silly(`${this.id}: Watermark saved`);
101+
}catch(err) {
102+
logger.error(err);
103+
}
104+
}
105+
106+
async connect() {
107+
let connected = false;
108+
109+
while(!connected) {
110+
try {
111+
await sql.connect
112+
this.server = await sql.connect(this.options);
113+
this.server.on('error', () => logger.warn(`${this.id}: SQLServer Error: `,this.options));
114+
connected = true;
115+
}catch(err) {
116+
logger.error(`${this.id}: Cannot stablish connection to SQLServer :`, this.options);
117+
logger.error(err);
118+
await timer(2000);
119+
}
120+
}
121+
}
122+
123+
async start(callback) {
124+
this.connected = this.connect();
125+
126+
this.ival = setInterval(()=>{
127+
try {
128+
if(this.queue.size() >= MAX_BUFF && !this.req.paused) {
129+
logger.warn(`${this.id} : Query paused!`);
130+
this.req.pause();
131+
}
132+
else if(this.queue.size() < MAX_BUFF/2 && this.req.paused) {
133+
logger.warn(`${this.id} : Query resumed!`);
134+
this.req.resume();
135+
}
136+
}catch(err) {}
137+
});
138+
139+
if(callback) callback();
140+
}
141+
142+
async next(callback) {
143+
await this.connected;
144+
145+
while(!this.queue.size()) {
146+
// Perform query or wait if query is already running
147+
await this.fetchData(1000);
148+
149+
// Wait while query is executing or no data is provided
150+
while(!this.sem.available() && !this.queue.size()) {
151+
logger.silly(`${this.id} : Waiting to have results`);
152+
await timer(100);
153+
}
154+
}
155+
156+
// Take one element
157+
let data = await this.queue.pop();
158+
159+
// Return data
160+
if(callback) {
161+
callback(null,{
162+
id : this.id,
163+
type : this.type,
164+
database : this.options.database,
165+
originalMessage: data
166+
});
167+
}
168+
}
169+
170+
async stop(callback) {
171+
if(callback) callback();
172+
}
173+
174+
async pause(callback) {
175+
if(callback) callback();
176+
}
177+
178+
async resume(callback) {
179+
if(callback) callback();
180+
}
181+
182+
key(entry) {
183+
return `${entry.input}:${entry.type}@${entry.database}`;
184+
}
185+
}
186+
187+
if(module.parent) {
188+
module.exports = SQLServerInput;
189+
}
190+
else {
191+
logger.setLevel('debug');
192+
193+
async function test() {
194+
const config = {
195+
query : 'select TOP (1000) [Index],[Height_Inches],[Weight_Pounds] from hw_25000 where [Index] > ${Index}',
196+
watermark : {Index:0},
197+
mode : 'watermark',
198+
options : {
199+
//domain : "GRUPOICA",
200+
user: "david.gomez",
201+
password: "",
202+
database: "LogsDB",
203+
server: 'GICA-MAD-671',
204+
pool: {
205+
max: 10,
206+
min: 0,
207+
idleTimeoutMillis: 30000
208+
},
209+
options: {
210+
trustedConnection: true,
211+
encrypt: false, // for azure
212+
trustServerCertificate: true // change to true for local dev / self-signed certs
213+
}
214+
}
215+
}
216+
217+
let input = new SQLServerInput('sqlserver','sqlsever');
218+
await input.configure(config);
219+
await input.start();
220+
while(true) {
221+
await input.next((err,entry)=>{
222+
console.log(entry);
223+
});
224+
}
225+
}
226+
227+
test();
228+
229+
230+
/*
231+
async function test() {
232+
const sqlConfig = {
233+
//domain : "GRUPOICA",
234+
user: "david.gomez",
235+
password: "",
236+
database: "LogsDB",
237+
server: 'GICA-MAD-671',
238+
pool: {
239+
max: 10,
240+
min: 0,
241+
idleTimeoutMillis: 30000
242+
},
243+
options: {
244+
trustedConnection: true,
245+
encrypt: false, // for azure
246+
trustServerCertificate: true // change to true for local dev / self-signed certs
247+
}
248+
}
249+
250+
try {
251+
let pool = await sql.connect(sqlConfig);
252+
let req = await pool.request();
253+
req.stream = true;
254+
req.on('row', row => {
255+
console.log('ROW',row);
256+
});
257+
258+
req.query('select * from hw_25000');
259+
}catch(err) {
260+
console.error(err);
261+
}
262+
}
263+
264+
test();
265+
*/
266+
/*
267+
let input = new SQLServerInput("mongo","mongo");
268+
logger.setLevel('debug');
269+
input.configure({
270+
$datadir:'/tmp/nsyslog',
271+
url : 'mongodb://localhost/logicalog',
272+
query : {line:{$gt:'${line}'}},
273+
watermark : {line:0},
274+
maxCursors : 10
275+
},()=>{
276+
input.start(()=>{
277+
function next() {
278+
input.next((err,item)=>{
279+
if(err) {
280+
logger.error(err);
281+
process.exit(1);
282+
}
283+
else {
284+
logger.debug(item);
285+
setImmediate(next,1000);
286+
}
287+
});
288+
}
289+
next();
290+
});
291+
});
292+
*/
293+
}

0 commit comments

Comments
 (0)