Skip to content

Commit 8c7bccc

Browse files
lcaresiajverce
andauthored
[FEATURE] Add SQL Proxy support to Snowflake #12342 (#12511)
* Removing files from commit * Fix correct method to execute query * Update components/snowflake/snowflake.app.mjs Co-authored-by: Jay Vercellone <[email protected]> * Bump versions * Bump versions --------- Co-authored-by: Jay Vercellone <[email protected]>
1 parent 8e80461 commit 8c7bccc

File tree

24 files changed

+74
-52
lines changed

24 files changed

+74
-52
lines changed

components/snowflake/actions/execute-sql-query/execute-sql-query.mjs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import snowflake from "../../snowflake.app.mjs";
22

33
export default {
44
name: "Execute SQL Query",
5-
version: "0.1.2",
5+
version: "0.2.0",
66
key: "snowflake-execute-sql-query",
77
description: "Execute a custom Snowflake query. See [our docs](https://pipedream.com/docs/databases/working-with-sql) to learn more about working with SQL in Pipedream.",
88
type: "action",
@@ -18,13 +18,14 @@ export default {
1818
},
1919
},
2020
async run({ $ }) {
21-
const data = await this.snowflake.collectRows({
22-
sqlText: this.sql.query,
23-
binds: this.sql.params,
24-
});
21+
const args = this.snowflake.executeQueryAdapter(this.sql);
22+
23+
const data = await this.snowflake.executeQuery(args);
24+
2525
$.export("$summary", `Returned ${data.length} ${data.length === 1
2626
? "row"
2727
: "rows"}`);
28+
2829
return data;
2930
},
3031
};

components/snowflake/actions/insert-multiple-rows/insert-multiple-rows.mjs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ export default {
66
key: "snowflake-insert-multiple-rows",
77
name: "Insert Multiple Rows",
88
description: "Insert multiple rows into a table",
9-
version: "0.1.1",
9+
version: "0.1.2",
1010
props: {
1111
snowflake,
1212
database: {

components/snowflake/actions/insert-row/insert-row.mjs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ export default {
55
key: "snowflake-insert-row",
66
name: "Insert Single Row",
77
description: "Insert a row into a table",
8-
version: "1.1.1",
8+
version: "1.1.2",
99
props: {
1010
snowflake,
1111
database: {

components/snowflake/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@pipedream/snowflake",
3-
"version": "0.12.2",
3+
"version": "0.13.0",
44
"description": "Pipedream Snowflake Components",
55
"main": "snowflake.app.mjs",
66
"keywords": [
@@ -14,7 +14,7 @@
1414
"access": "public"
1515
},
1616
"dependencies": {
17-
"@pipedream/platform": "^2.0.0",
17+
"@pipedream/platform": "^3.0.0",
1818
"asn1.js": "^5.0.0",
1919
"lodash-es": "^4.17.21",
2020
"@pipedream/snowflake-sdk": "^1.0.8",

components/snowflake/snowflake.app.mjs

Lines changed: 41 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import { createPrivateKey } from "crypto";
22
import { snowflake } from "@pipedream/snowflake-sdk";
33
import { promisify } from "util";
4-
import { sqlProp } from "@pipedream/platform";
4+
import {
5+
sqlProxy, sqlProp,
6+
} from "@pipedream/platform";
57

68
snowflake.configure({
79
logLevel: "WARN",
@@ -88,6 +90,7 @@ export default {
8890
},
8991
},
9092
methods: {
93+
...sqlProxy.methods,
9194
...sqlProp.methods,
9295
async _getConnection() {
9396
if (this.connection) {
@@ -169,7 +172,7 @@ export default {
169172
const binds = [
170173
this.$auth.schema,
171174
];
172-
const rows = await this.collectRows({
175+
const rows = await this.executeQuery({
173176
sqlText,
174177
binds,
175178
});
@@ -186,13 +189,31 @@ export default {
186189
return acc;
187190
}, {});
188191
},
189-
async getRows(statement) {
192+
proxyAdapter(preparedStatement = {}) {
193+
const {
194+
sqlText: query = "",
195+
binds: params = [],
196+
} = preparedStatement;
197+
return {
198+
query,
199+
params,
200+
};
201+
},
202+
executeQueryAdapter(proxyArgs = {}) {
203+
const {
204+
query: sqlText = "",
205+
params: binds = [],
206+
} = proxyArgs;
207+
return {
208+
sqlText,
209+
binds,
210+
};
211+
},
212+
async executeQuery(statement) {
190213
const connection = await this._getConnection();
191214
const executedStatement = connection.execute(statement);
192-
return executedStatement.streamRows();
193-
},
194-
async collectRows(statement) {
195-
const rowStream = await this.getRows(statement);
215+
216+
const rowStream = await executedStatement.streamRows();
196217
const rows = [];
197218
for await (const row of rowStream) {
198219
rows.push(row);
@@ -203,19 +224,19 @@ export default {
203224
database, schema,
204225
}) {
205226
let sqlText = `SHOW TABLES IN SCHEMA ${database}.${schema}`;
206-
return this.collectRows({
227+
return this.executeQuery({
207228
sqlText,
208229
});
209230
},
210231
async listDatabases() {
211232
const sqlText = "SHOW DATABASES";
212-
return this.collectRows({
233+
return this.executeQuery({
213234
sqlText,
214235
});
215236
},
216237
async listSchemas(database) {
217238
const sqlText = "SHOW SCHEMAS IN DATABASE IDENTIFIER(:1)";
218-
return this.collectRows({
239+
return this.executeQuery({
219240
sqlText,
220241
binds: [
221242
database,
@@ -224,26 +245,26 @@ export default {
224245
},
225246
async listWarehouses() {
226247
const sqlText = "SHOW WAREHOUSES";
227-
return this.collectRows({
248+
return this.executeQuery({
228249
sqlText,
229250
});
230251
},
231252
async listUsers() {
232253
const sqlText = "SHOW USERS";
233-
return this.collectRows({
254+
return this.executeQuery({
234255
sqlText,
235256
});
236257
},
237258
async maxQueryHistoryTimestamp() {
238259
const sqlText = "SELECT MAX(START_TIME) AS max_ts FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY";
239-
const maxTs = await this.collectRows({
260+
const maxTs = await this.executeQuery({
240261
sqlText,
241262
});
242263
return +new Date(maxTs[0]?.MAX_TS);
243264
},
244265
async maxTaskHistoryTimestamp() {
245266
const sqlText = "SELECT MAX(QUERY_START_TIME) AS max_ts FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY())";
246-
const maxTs = await this.collectRows({
267+
const maxTs = await this.executeQuery({
247268
sqlText,
248269
});
249270
return +new Date(maxTs[0]?.MAX_TS);
@@ -257,7 +278,7 @@ export default {
257278
sqlText,
258279
binds,
259280
};
260-
return this.collectRows(statement);
281+
return this.executeQuery(statement);
261282
},
262283
// Query Snowflake query history.
263284
// start and endTime bound the query. Both expect epoch ms timestamps.
@@ -273,7 +294,7 @@ export default {
273294
sqlText,
274295
};
275296
console.log(`Running query: ${sqlText}`);
276-
return this.collectRows(statement);
297+
return this.executeQuery(statement);
277298
},
278299
async getFailedTasksInDatabase({
279300
startTime, database, schemas, taskName,
@@ -311,7 +332,7 @@ export default {
311332
sqlText,
312333
binds,
313334
};
314-
return this.collectRows(statement);
335+
return this.executeQuery(statement);
315336
},
316337
async getFailedTasksInWarehouse({
317338
startTime, endTime, warehouse,
@@ -331,7 +352,7 @@ export default {
331352
warehouse,
332353
],
333354
};
334-
return this.collectRows(statement);
355+
return this.executeQuery(statement);
335356
},
336357
async getChangesForSpecificObject(startTime, endTime, objectType) {
337358
const filters = `QUERY_TYPE != 'SELECT' AND (QUERY_TEXT ILIKE '%CREATE ${objectType}%' OR QUERY_TEXT ILIKE '%ALTER ${objectType}%' OR QUERY_TEXT ILIKE '%DROP ${objectType}%')`;
@@ -345,15 +366,15 @@ export default {
345366
sqlText,
346367
binds,
347368
};
348-
return this.collectRows(statement);
369+
return this.executeQuery(statement);
349370
},
350371
async insertRows(tableName, columns, binds) {
351372
const sqlText = `INSERT INTO ${tableName} (${columns.join(",")}) VALUES (${columns.map(() => "?").join(", ")});`;
352373
const statement = {
353374
sqlText,
354375
binds,
355376
};
356-
return this.collectRows(statement);
377+
return this.executeQuery(statement);
357378
},
358379
},
359380
};

components/snowflake/sources/change-to-warehouse/change-to-warehouse.mjs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ export default {
1717
// eslint-disable-next-line
1818
name: "New, Updated, or Deleted Warehouse",
1919
description: "Emit new events when a warehouse is created, altered, or dropped",
20-
version: "0.1.1",
20+
version: "0.1.2",
2121
async run() {
2222
await this.watchObjectsAndEmitChanges("WAREHOUSE", this.warehouses, this.queryTypes);
2323
},

components/snowflake/sources/common-delete.mjs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ export default {
4343
return array;
4444
},
4545
async fetchData() {
46-
return this.snowflake.getRows({
46+
return this.snowflake.executeQuery({
4747
sqlText: this.getSqlText(),
4848
});
4949
},

components/snowflake/sources/common-table-scan.mjs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ export default {
1515
propDefinition: [
1616
common.props.snowflake,
1717
"schema",
18-
(c) => ({
18+
(c) => ({
1919
database: c.database,
2020
}),
2121
],
@@ -146,7 +146,7 @@ export default {
146146
sqlText,
147147
binds,
148148
};
149-
const rowStream = await this.snowflake.getRows(statement);
149+
const rowStream = await this.snowflake.executeQuery(statement);
150150
for await (const row of rowStream) {
151151
return row[this.uniqueKey];
152152
}

components/snowflake/sources/common-update.mjs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ export default {
2626
return this.db.set("dbValues", values);
2727
},
2828
async fetchData() {
29-
return this.snowflake.getRows({
29+
return this.snowflake.executeQuery({
3030
sqlText: this.getSqlText(),
3131
});
3232
},

components/snowflake/sources/common.mjs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ export default {
2929
return result;
3030
},
3131
async processCollection(statement, timestamp) {
32-
const rowStream = await this.snowflake.getRows(statement);
32+
const rowStream = await this.snowflake.executeQuery(statement);
3333
const rows = await this.streamToArray(rowStream);
3434
this.$emit(rows, this.generateMetaForCollection({
3535
timestamp,
@@ -38,7 +38,7 @@ export default {
3838
async processSingle(statement, timestamp) {
3939
let lastResultId;
4040
let rowCount = 0;
41-
const rowStream = await this.snowflake.getRows(statement);
41+
const rowStream = await this.snowflake.executeQuery(statement);
4242
for await (const row of rowStream) {
4343
const meta = this.generateMeta({
4444
row,

0 commit comments

Comments
 (0)