Skip to content

Commit 566e797

Browse files
committed
Example of flow
1 parent 72d0765 commit 566e797

File tree

3 files changed

+305
-1
lines changed

3 files changed

+305
-1
lines changed

README.md

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,17 @@ To make this behaviour potentially automatic (avoiding manual wires), this node
8787
for downstream nodes to detect this feature, and a truthy `node.tickProvider` for upstream nodes.
8888
Likewise, this node detects upstream nodes using the same back-pressure convention, and automatically sends ticks.
8989

90+
### Example of flow
91+
92+
Example adding a new column in a table, then streaming (split) many lines from that table, batch-updating several lines at a time,
93+
then getting a sample consisting of a few lines:
94+
95+
Example: [flow.json](doc/flow.json)
96+
97+
![Node-RED flow](doc/flow.png)
98+
99+
The *debug* nodes illustrate some relevant information to look at.
100+
90101
## Sequences for split results
91102

92103
When the *Split results* option is enabled (streaming), the messages contain some information following the
@@ -101,7 +112,7 @@ conventions for [*messages sequences*](https://nodered.org/docs/user-guide/messa
101112
count: 6, // total number of message; only available in the last message of a sequence
102113
parts: {}, // optional upstream parts information
103114
},
104-
complete: true, // True only for the last message of a sequence
115+
complete: true, // True only for the last message of a sequence
105116
}
106117
```
107118

doc/flow.json

Lines changed: 293 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,293 @@
1+
[
2+
{
3+
"id": "36d7a2e7.38e4de",
4+
"type": "inject",
5+
"z": "6bd3da1a.7e2b84",
6+
"name": "Start",
7+
"props": [
8+
{
9+
"p": "payload"
10+
},
11+
{
12+
"p": "topic",
13+
"vt": "str"
14+
}
15+
],
16+
"repeat": "",
17+
"crontab": "",
18+
"once": false,
19+
"onceDelay": 0.1,
20+
"topic": "",
21+
"payloadType": "date",
22+
"x": 190,
23+
"y": 1620,
24+
"wires": [
25+
[
26+
"1b00f74dc3098005"
27+
]
28+
]
29+
},
30+
{
31+
"id": "ee38d447.1c13a8",
32+
"type": "debug",
33+
"z": "6bd3da1a.7e2b84",
34+
"name": "Done",
35+
"active": true,
36+
"tosidebar": true,
37+
"console": false,
38+
"tostatus": true,
39+
"complete": "true",
40+
"targetType": "full",
41+
"statusVal": "complete",
42+
"statusType": "msg",
43+
"x": 1210,
44+
"y": 1620,
45+
"wires": []
46+
},
47+
{
48+
"id": "12f229bfef5ad2a5",
49+
"type": "function",
50+
"z": "6bd3da1a.7e2b84",
51+
"name": "Ready for next lines",
52+
"func": "return [\n msg.complete || msg.abort ? msg : null,\n { tick: true },\n];\n",
53+
"outputs": 2,
54+
"noerr": 0,
55+
"initialize": "",
56+
"finalize": "",
57+
"libs": [],
58+
"x": 980,
59+
"y": 1560,
60+
"wires": [
61+
[
62+
"ee38d447.1c13a8"
63+
],
64+
[
65+
"1b00f74dc3098005"
66+
]
67+
]
68+
},
69+
{
70+
"id": "178252a8d3c54b16",
71+
"type": "function",
72+
"z": "6bd3da1a.7e2b84",
73+
"name": "",
74+
"func": "let payload = `(0, FALSE),`;\nif (msg.payload && msg.payload.length > 0) {\n for (const line of msg.payload) {\n const valid = 'TRUE'; // Call some kind of test\n payload += `(${line['id']}, ${valid}),`;\n }\n}\nmsg.payload = payload.slice(0, - 1);\nreturn msg;\n",
75+
"outputs": 1,
76+
"noerr": 0,
77+
"initialize": "",
78+
"finalize": "",
79+
"libs": [],
80+
"x": 560,
81+
"y": 1620,
82+
"wires": [
83+
[
84+
"6d2073ec4db26f2f"
85+
]
86+
]
87+
},
88+
{
89+
"id": "4fd30ba36702842a",
90+
"type": "debug",
91+
"z": "6bd3da1a.7e2b84",
92+
"name": "Progress",
93+
"active": true,
94+
"tosidebar": true,
95+
"console": false,
96+
"tostatus": true,
97+
"complete": "true",
98+
"targetType": "full",
99+
"statusVal": "parts.index",
100+
"statusType": "msg",
101+
"x": 940,
102+
"y": 1620,
103+
"wires": []
104+
},
105+
{
106+
"id": "1b00f74dc3098005",
107+
"type": "postgresql",
108+
"z": "6bd3da1a.7e2b84",
109+
"name": "SELECT many",
110+
"query": "SELECT * FROM mytable\nORDER BY id ASC\nLIMIT 2000;\n",
111+
"postgreSQLConfig": "20ae1e52d1eef983",
112+
"split": true,
113+
"rowsPerMsg": "100",
114+
"outputs": 1,
115+
"x": 380,
116+
"y": 1620,
117+
"wires": [
118+
[
119+
"178252a8d3c54b16"
120+
]
121+
]
122+
},
123+
{
124+
"id": "6d2073ec4db26f2f",
125+
"type": "postgresql",
126+
"z": "6bd3da1a.7e2b84",
127+
"name": "UPDATE many",
128+
"query": "UPDATE mytable AS c\nSET validity = v.validity\nFROM (VALUES\n\t{{{ msg.payload }}}\n) AS v (id, validity)\nWHERE v.id = c.id;\n",
129+
"postgreSQLConfig": "20ae1e52d1eef983",
130+
"split": false,
131+
"rowsPerMsg": "1",
132+
"outputs": 1,
133+
"x": 740,
134+
"y": 1620,
135+
"wires": [
136+
[
137+
"12f229bfef5ad2a5",
138+
"4fd30ba36702842a"
139+
]
140+
]
141+
},
142+
{
143+
"id": "64a657de3954a4b5",
144+
"type": "debug",
145+
"z": "6bd3da1a.7e2b84",
146+
"name": "Results",
147+
"active": true,
148+
"tosidebar": true,
149+
"console": false,
150+
"tostatus": true,
151+
"complete": "true",
152+
"targetType": "full",
153+
"statusVal": "pgsql.rowCount",
154+
"statusType": "msg",
155+
"x": 560,
156+
"y": 1700,
157+
"wires": []
158+
},
159+
{
160+
"id": "adf069475c5e0ba3",
161+
"type": "postgresql",
162+
"z": "6bd3da1a.7e2b84",
163+
"name": "SELECT",
164+
"query": "SELECT * FROM mytable\nWHERE id < 100;\n",
165+
"postgreSQLConfig": "20ae1e52d1eef983",
166+
"split": false,
167+
"rowsPerMsg": "1",
168+
"outputs": 1,
169+
"x": 360,
170+
"y": 1700,
171+
"wires": [
172+
[
173+
"64a657de3954a4b5"
174+
]
175+
]
176+
},
177+
{
178+
"id": "3134bfc0f12e13c3",
179+
"type": "inject",
180+
"z": "6bd3da1a.7e2b84",
181+
"name": "Start",
182+
"props": [
183+
{
184+
"p": "payload"
185+
},
186+
{
187+
"p": "topic",
188+
"vt": "str"
189+
}
190+
],
191+
"repeat": "",
192+
"crontab": "",
193+
"once": false,
194+
"onceDelay": 0.1,
195+
"topic": "",
196+
"payloadType": "date",
197+
"x": 190,
198+
"y": 1700,
199+
"wires": [
200+
[
201+
"adf069475c5e0ba3"
202+
]
203+
]
204+
},
205+
{
206+
"id": "d04c65ee97e3a273",
207+
"type": "inject",
208+
"z": "6bd3da1a.7e2b84",
209+
"name": "Prepare",
210+
"props": [
211+
{
212+
"p": "payload"
213+
},
214+
{
215+
"p": "topic",
216+
"vt": "str"
217+
}
218+
],
219+
"repeat": "",
220+
"crontab": "",
221+
"once": false,
222+
"onceDelay": 0.1,
223+
"topic": "",
224+
"payloadType": "date",
225+
"x": 180,
226+
"y": 1520,
227+
"wires": [
228+
[
229+
"82b7c689d6682f72"
230+
]
231+
]
232+
},
233+
{
234+
"id": "c5f0b4b2442e3137",
235+
"type": "debug",
236+
"z": "6bd3da1a.7e2b84",
237+
"name": "Done",
238+
"active": true,
239+
"tosidebar": true,
240+
"console": false,
241+
"tostatus": true,
242+
"complete": "true",
243+
"targetType": "full",
244+
"statusVal": "pgsql",
245+
"statusType": "msg",
246+
"x": 550,
247+
"y": 1520,
248+
"wires": []
249+
},
250+
{
251+
"id": "82b7c689d6682f72",
252+
"type": "postgresql",
253+
"z": "6bd3da1a.7e2b84",
254+
"name": "ADD COLUMN",
255+
"query": "ALTER TABLE mytable\n DROP COLUMN IF EXISTS validity;\n\nALTER TABLE mytable\n ADD COLUMN validity BOOLEAN;\n",
256+
"postgreSQLConfig": "20ae1e52d1eef983",
257+
"split": false,
258+
"rowsPerMsg": "10",
259+
"outputs": 1,
260+
"x": 380,
261+
"y": 1520,
262+
"wires": [
263+
[
264+
"c5f0b4b2442e3137"
265+
]
266+
]
267+
},
268+
{
269+
"id": "20ae1e52d1eef983",
270+
"type": "postgreSQLConfig",
271+
"name": "myuser@timescale:5432/iot",
272+
"host": "timescale",
273+
"hostFieldType": "str",
274+
"port": "5432",
275+
"portFieldType": "num",
276+
"database": "iot",
277+
"databaseFieldType": "str",
278+
"ssl": "false",
279+
"sslFieldType": "bool",
280+
"max": "10",
281+
"maxFieldType": "num",
282+
"min": "1",
283+
"minFieldType": "num",
284+
"idle": "1000",
285+
"idleFieldType": "num",
286+
"connectionTimeout": "10000",
287+
"connectionTimeoutFieldType": "num",
288+
"user": "myuser",
289+
"userFieldType": "str",
290+
"password": "???",
291+
"passwordFieldType": "str"
292+
}
293+
]

doc/flow.png

13.8 KB
Loading

0 commit comments

Comments
 (0)