@@ -67,23 +67,20 @@ Create a folder called `db` and inside this folder create `citus.js` file with t
67
67
* file: db/citus.js
68
68
*/
69
69
70
- const { Pool , Client } = require (' pg' );
70
+ const { Pool } = require (' pg' );
71
71
72
72
const pool = new Pool ({
73
+ max: 300 ,
74
+ connectionTimeoutMillis: 5000 ,
75
+
73
76
host: ' c.citustest.postgres.database.azure.com' ,
74
77
port: 5432 ,
75
78
user: ' citus' ,
76
79
password: ' Password123$' ,
77
80
database: ' citus' ,
78
81
ssl: true ,
79
- connectionTimeoutMillis: 0 ,
80
- idleTimeoutMillis: 0 ,
81
- min: 10 ,
82
- max: 20 ,
83
82
});
84
83
85
- pool .connect ();
86
-
87
84
module .exports = {
88
85
pool,
89
86
};
@@ -101,21 +98,26 @@ const { pool } = require('./db/citus');
101
98
102
99
async function queryDatabase () {
103
100
const queryString = `
104
- DROP TABLE IF EXISTS pharmacy;
105
- CREATE TABLE pharmacy (pharmacy_id integer,pharmacy_name text,city text,state text,zip_code integer);
106
- INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (0,'Target','Sunnyvale','California',94001);
107
- INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (1,'CVS','San Francisco','California',94002);
108
- INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (2,'Walgreens','San Diego','California',94003);
109
- CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);
101
+ DROP TABLE IF EXISTS pharmacy;
102
+ CREATE TABLE pharmacy (pharmacy_id integer,pharmacy_name text,city text,state text,zip_code integer);
103
+ INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (0,'Target','Sunnyvale','California',94001);
104
+ INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (1,'CVS','San Francisco','California',94002);
105
+ INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (2,'Walgreens','San Diego','California',94003);
106
+ CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);
110
107
` ;
111
108
112
109
try {
110
+ /* Real application code would probably request a dedicated client with
111
+ pool.connect() and run multiple queries with the client. In this example, we're
112
+ running only one query, so we use the pool.query() helper method to run it on
113
+ the first available idle client. */
114
+
113
115
await pool .query (queryString);
114
116
console .log (' Created the Pharmacy table and inserted rows.' );
115
117
} catch (err) {
116
118
console .log (err .stack );
117
119
} finally {
118
- client .end ();
120
+ pool .end ();
119
121
}
120
122
}
121
123
@@ -197,7 +199,7 @@ Use the following code to connect and read the data using a UPDATE SQL statement
197
199
* file: update.js
198
200
*/
199
201
200
- const { client } = require (' ./db/citus' );
202
+ const { pool } = require (' ./db/citus' );
201
203
202
204
async function queryDatabase () {
203
205
const queryString = `
@@ -280,22 +282,30 @@ const { pool } = require('./db/citus');
280
282
async function importCsvDatabase () {
281
283
return new Promise ((resolve , reject ) => {
282
284
const queryString = `
283
- COPY pharmacy FROM STDIN WITH (FORMAT CSV, HEADER true, NULL '');
284
- ` ;
285
+ COPY pharmacy FROM STDIN WITH (FORMAT CSV, HEADER true, NULL '');
286
+ ` ;
285
287
286
288
fileStream .on (' error' , reject);
287
289
288
- const stream = pool
289
- .query (copyFrom (queryString))
290
- .on (' error' , reject)
291
- .on (' end' , () => {
292
- reject (new Error (' Connection closed!' ));
290
+ pool
291
+ .connect ()
292
+ .then (client => {
293
+ const stream = client
294
+ .query (copyFrom (queryString))
295
+ .on (' error' , reject)
296
+ .on (' end' , () => {
297
+ reject (new Error (' Connection closed!' ));
298
+ })
299
+ .on (' finish' , () => {
300
+ client .release ();
301
+ resolve ();
302
+ });
303
+
304
+ fileStream .pipe (stream);
293
305
})
294
- .on ( ' finish ' , () => {
295
- resolve ( );
306
+ .catch ( err => {
307
+ reject ( new Error (err) );
296
308
});
297
-
298
- fileStream .pipe (stream);
299
309
});
300
310
}
301
311
@@ -321,44 +331,52 @@ The following code is an example for copying in-memory data to a table.
321
331
322
332
``` javascript
323
333
/**
324
- * file: copyinmemory.js
325
- */
334
+ * file: copyinmemory.js
335
+ */
326
336
327
337
const through2 = require (' through2' );
328
338
const copyFrom = require (' pg-copy-streams' ).from ;
329
339
const { pool } = require (' ./db/citus' );
330
340
331
341
async function importInMemoryDatabase () {
332
342
return new Promise ((resolve , reject ) => {
333
- const stream = pool
334
- .query (copyFrom (' COPY pharmacy FROM STDIN' ))
335
- .on (' error' , reject)
336
- .on (' end' , () => {
337
- reject (new Error (' Connection closed!' ));
343
+ pool
344
+ .connect ()
345
+ .then (client => {
346
+ const stream = client
347
+ .query (copyFrom (' COPY pharmacy FROM STDIN' ))
348
+ .on (' error' , reject)
349
+ .on (' end' , () => {
350
+ reject (new Error (' Connection closed!' ));
351
+ })
352
+ .on (' finish' , () => {
353
+ client .release ();
354
+ resolve ();
355
+ });
356
+
357
+ const internDataset = [
358
+ [' 100' , ' Target' , ' Sunnyvale' , ' California' , ' 94001' ],
359
+ [' 101' , ' CVS' , ' San Francisco' , ' California' , ' 94002' ],
360
+ ];
361
+
362
+ let started = false ;
363
+ const internStream = through2 .obj ((arr , _enc , cb ) => {
364
+ const rowText = (started ? ' \n ' : ' ' ) + arr .join (' \t ' );
365
+ started = true ;
366
+ cb (null , rowText);
367
+ });
368
+
369
+ internStream .on (' error' , reject).pipe (stream);
370
+
371
+ internDataset .forEach ((record ) => {
372
+ internStream .write (record);
373
+ });
374
+
375
+ internStream .end ();
338
376
})
339
- .on ( ' finish ' , () => {
340
- resolve ( );
377
+ .catch ( err => {
378
+ reject ( new Error (err) );
341
379
});
342
-
343
- const internDataset = [
344
- [' 100' , ' Target' , ' Sunnyvale' , ' California' , ' 94001' ],
345
- [' 101' , ' CVS' , ' San Francisco' , ' California' , ' 94002' ],
346
- ];
347
-
348
- let started = false ;
349
- const internStream = through2 .obj ((arr , _enc , cb ) => {
350
- const rowText = (started ? ' \n ' : ' ' ) + arr .join (' \t ' );
351
- started = true ;
352
- cb (null , rowText);
353
- });
354
-
355
- internStream .on (' error' , reject).pipe (stream);
356
-
357
- internDataset .forEach ((record ) => {
358
- internStream .write (record);
359
- });
360
-
361
- internStream .end ();
362
380
});
363
381
}
364
382
(async () => {
0 commit comments