Commit 3598b14

refactor: Rewrite execute functions into one function
- Enhanced logging in examples to provide feedback on the number of affected rows and results of DDL operations.

Signed-off-by: Edmund Miller <[email protected]>
Co-authored-by: Paolo Di Tommaso <[email protected]>
1 parent 593e4f4 commit 3598b14

5 files changed

+101
-140
lines changed

README.md

Lines changed: 33 additions & 57 deletions
@@ -4,13 +4,13 @@ This plugin provides support for interacting with SQL databases in Nextflow scri
44

55
The following databases are currently supported:
66

7-
* [AWS Athena](https://aws.amazon.com/athena/) (Setup guide [here](docs/aws-athena.md))
8-
* [DuckDB](https://duckdb.org/)
9-
* [H2](https://www.h2database.com)
10-
* [MySQL](https://www.mysql.com/)
11-
* [MariaDB](https://mariadb.org/)
12-
* [PostgreSQL](https://www.postgresql.org/)
13-
* [SQLite](https://www.sqlite.org/index.html)
7+
- [AWS Athena](https://aws.amazon.com/athena/) (Setup guide [here](docs/aws-athena.md))
8+
- [DuckDB](https://duckdb.org/)
9+
- [H2](https://www.h2database.com)
10+
- [MySQL](https://www.mysql.com/)
11+
- [MariaDB](https://mariadb.org/)
12+
- [PostgreSQL](https://www.postgresql.org/)
13+
- [SQLite](https://www.sqlite.org/index.html)
1414

1515
NOTE: THIS IS A PREVIEW TECHNOLOGY, FEATURES AND CONFIGURATION SETTINGS CAN CHANGE IN FUTURE RELEASES.
1616

@@ -24,7 +24,6 @@ plugins {
2424
}
2525
```
2626

27-
2827
## Configuration
2928

3029
You can configure any number of databases under the `sql.db` configuration scope. For example:
@@ -79,7 +78,7 @@ The following options are available:
7978

8079
`batchSize`
8180
: Query the data in batches of the given size. This option is recommended for queries that may return a large result set, so that the entire result set is not loaded into memory at once.
82-
: *NOTE:* this feature requires that the underlying SQL database supports `LIMIT` and `OFFSET`.
81+
: _NOTE:_ this feature requires that the underlying SQL database supports `LIMIT` and `OFFSET`.
8382

8483
`emitColumns`
8584
: When `true`, the column names in the `SELECT` statement are emitted as the first tuple in the resulting channel.
@@ -104,7 +103,7 @@ INSERT INTO SAMPLE (NAME, LEN) VALUES ('HELLO', 5);
104103
INSERT INTO SAMPLE (NAME, LEN) VALUES ('WORLD!', 6);
105104
```
106105

107-
*NOTE:* the target table (e.g. `SAMPLE` in the above example) must be created beforehand.
106+
_NOTE:_ the target table (e.g. `SAMPLE` in the above example) must be created beforehand.
108107

109108
The following options are available:
110109

@@ -125,21 +124,23 @@ The following options are available:
125124

126125
`setup`
127126
: A SQL statement that is executed before inserting the data, e.g. to create the target table.
128-
: *NOTE:* the underlying database should support the *create table if not exist* idiom, as the plugin will execute this statement every time the script is run.
127+
: _NOTE:_ the underlying database should support the _create table if not exist_ idiom, as the plugin will execute this statement every time the script is run.
129128

130129
## SQL Execution Functions
131130

132-
This plugin provides the following functions for executing SQL statements that don't return data, such as DDL (Data Definition Language) and DML (Data Manipulation Language) operations.
131+
This plugin provides the following function for executing SQL statements that don't return data, such as DDL (Data Definition Language) and DML (Data Manipulation Language) operations.
133132

134133
### sqlExecute
135134

136-
The `sqlExecute` function executes a SQL statement that doesn't return a result set, such as `CREATE`, `ALTER`, `DROP`, `INSERT`, `UPDATE`, or `DELETE` statements. For example:
135+
The `sqlExecute` function executes a SQL statement that doesn't return a result set, such as `CREATE`, `ALTER`, `DROP`, `INSERT`, `UPDATE`, or `DELETE` statements. For DML statements (`INSERT`, `UPDATE`, `DELETE`), it returns the number of rows affected. For DDL statements (`CREATE`, `ALTER`, `DROP`), it returns `null`.
136+
137+
For example:
137138

138139
```nextflow
139140
include { sqlExecute } from 'plugin/nf-sqldb'
140141
141-
// Create a table
142-
sqlExecute(
142+
// Create a table (returns null for DDL operations)
143+
def createResult = sqlExecute(
143144
db: 'foo',
144145
statement: '''
145146
CREATE TABLE IF NOT EXISTS sample_table (
@@ -149,51 +150,24 @@ sqlExecute(
149150
)
150151
'''
151152
)
153+
println "Create result: $createResult" // null
152154
153-
// Insert data
154-
sqlExecute(
155+
// Insert data (returns 1 for number of rows affected)
156+
def insertedRows = sqlExecute(
155157
db: 'foo',
156158
statement: "INSERT INTO sample_table (id, name, value) VALUES (1, 'alpha', 10.5)"
157159
)
158-
159-
// Delete data
160-
sqlExecute(
161-
db: 'foo',
162-
statement: "DELETE FROM sample_table WHERE id = 1"
163-
)
164-
```
165-
166-
The following options are available:
167-
168-
`db`
169-
: The database handle. It must be defined under `sql.db` in the Nextflow configuration.
170-
171-
`statement`
172-
: The SQL statement to execute. This can be any DDL or DML statement that doesn't return a result set.
173-
174-
### executeUpdate
175-
176-
The `executeUpdate` function is similar to `sqlExecute`, but it returns the number of rows affected by the SQL statement. This is particularly useful for DML operations like `INSERT`, `UPDATE`, and `DELETE` where you need to know how many rows were affected. For example:
177-
178-
```nextflow
179-
include { executeUpdate } from 'plugin/nf-sqldb'
180-
181-
// Insert data and get the number of rows inserted
182-
def insertedRows = executeUpdate(
183-
db: 'foo',
184-
statement: "INSERT INTO sample_table (id, name, value) VALUES (2, 'beta', 20.5)"
185-
)
186160
println "Inserted $insertedRows row(s)"
187161
188-
// Update data and get the number of rows updated
189-
def updatedRows = executeUpdate(
162+
// Update data (returns number of rows updated)
163+
def updatedRows = sqlExecute(
190164
db: 'foo',
191-
statement: "UPDATE sample_table SET value = 30.5 WHERE name = 'beta'"
165+
statement: "UPDATE sample_table SET value = 30.5 WHERE name = 'alpha'"
192166
)
193167
println "Updated $updatedRows row(s)"
194168
195-
// Delete data and get the number of rows deleted
196-
def deletedRows = executeUpdate(
169+
// Delete data (returns number of rows deleted)
170+
def deletedRows = sqlExecute(
197171
db: 'foo',
198172
statement: "DELETE FROM sample_table WHERE value > 25"
199173
)
@@ -206,25 +180,27 @@ The following options are available:
206180
: The database handle. It must be defined under `sql.db` in the Nextflow configuration.
207181

208182
`statement`
209-
: The SQL statement to execute. This should be a DML statement that can return a count of affected rows.
183+
: The SQL statement to execute. This can be any DDL or DML statement that doesn't return a result set.
210184

211-
## Differences Between Dataflow Operators and Execution Functions
185+
## Differences Between Dataflow Operators and the Execution Function
212186

213187
The plugin provides two different ways to interact with databases:
214188

215189
1. **Dataflow Operators** (`fromQuery` and `sqlInsert`): These are designed to integrate with Nextflow's dataflow programming model, operating on channels.
190+
216191
- `fromQuery`: Queries data from a database and returns a channel that emits the results.
217192
- `sqlInsert`: Takes data from a channel and inserts it into a database.
218193

219-
2. **Execution Functions** (`sqlExecute` and `executeUpdate`): These are designed for direct SQL statement execution that doesn't require channel integration.
220-
- `sqlExecute`: Executes a SQL statement without returning any data.
221-
- `executeUpdate`: Executes a SQL statement and returns the count of affected rows.
194+
2. **Execution Function** (`sqlExecute`): This is designed for direct SQL statement execution that doesn't require channel integration.
195+
- `sqlExecute`: Executes a SQL statement. For DML operations, it returns the count of affected rows. For DDL operations, it returns null.
222196

223197
Use **Dataflow Operators** when you need to:
198+
224199
- Query data that will flow into your pipeline processing
225200
- Insert data from your pipeline processing into a database
226201

227-
Use **Execution Functions** when you need to:
202+
Use the **Execution Function** when you need to:
203+
228204
- Perform database setup (creating tables, schemas, etc.)
229205
- Execute administrative commands
230206
- Perform one-off operations (deleting all records, truncating a table, etc.)
@@ -262,4 +238,4 @@ The `CSVREAD` function provided by the H2 database engine allows you to query an
262238

263239
Like all dataflow operators in Nextflow, the operators provided by this plugin are executed asynchronously.
264240

265-
In particular, data inserted using the `sqlInsert` operator is *not* guaranteed to be available to any subsequent queries using the `fromQuery` operator, as it is not possible to make a channel factory operation dependent on some upstream operation.
241+
In particular, data inserted using the `sqlInsert` operator is _not_ guaranteed to be available to any subsequent queries using the `fromQuery` operator, as it is not possible to make a channel factory operation dependent on some upstream operation.

examples/sql-execution/main.nf

Lines changed: 14 additions & 11 deletions
@@ -1,10 +1,10 @@
11
#!/usr/bin/env nextflow
22

33
/*
4-
* Example script demonstrating how to use the SQL sqlExecute and executeUpdate functions
4+
* Example script demonstrating how to use the SQL sqlExecute function
55
*/
66

7-
include { sqlExecute; executeUpdate } from 'plugin/nf-sqldb'
7+
include { sqlExecute } from 'plugin/nf-sqldb'
88
include { fromQuery } from 'plugin/nf-sqldb'
99

1010
// Define database configuration in nextflow.config file
@@ -13,13 +13,13 @@ include { fromQuery } from 'plugin/nf-sqldb'
1313
workflow {
1414
log.info """
1515
=========================================
16-
SQL Execution Functions Example
16+
SQL Execution Function Example
1717
=========================================
1818
"""
1919

20-
// Step 1: Create a table
20+
// Step 1: Create a table (DDL operation returns null)
2121
log.info "Creating a sample table..."
22-
def createResult = executeUpdate(
22+
def createResult = sqlExecute(
2323
db: 'demo',
2424
statement: '''
2525
CREATE TABLE IF NOT EXISTS TEST_TABLE (
@@ -31,9 +31,9 @@ workflow {
3131
)
3232
log.info "Create table result: $createResult"
3333

34-
// Step 2: Insert some data
34+
// Step 2: Insert some data (DML operation returns affected row count)
3535
log.info "Inserting data..."
36-
executeUpdate(
36+
def insertCount = sqlExecute(
3737
db: 'demo',
3838
statement: '''
3939
INSERT INTO TEST_TABLE (ID, NAME, VALUE) VALUES
@@ -43,27 +43,30 @@ workflow {
4343
(4, 'delta', 40.9);
4444
'''
4545
)
46+
log.info "Inserted $insertCount rows"
4647

47-
// Step 3: Update some data
48+
// Step 3: Update some data (DML operation returns affected row count)
4849
log.info "Updating data..."
49-
executeUpdate(
50+
def updateCount = sqlExecute(
5051
db: 'demo',
5152
statement: '''
5253
UPDATE TEST_TABLE
5354
SET VALUE = VALUE * 2
5455
WHERE ID = 2;
5556
'''
5657
)
58+
log.info "Updated $updateCount rows"
5759

58-
// Step 4: Delete some data
60+
// Step 4: Delete some data (DML operation returns affected row count)
5961
log.info "Deleting data..."
60-
executeUpdate(
62+
def deleteCount = sqlExecute(
6163
db: 'demo',
6264
statement: '''
6365
DELETE FROM TEST_TABLE
6466
WHERE ID = 4;
6567
'''
6668
)
69+
log.info "Deleted $deleteCount rows"
6770

6871
// Step 5: Query results
6972
log.info "Querying results..."
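
The example script interpolates each result straight into a log message, so whenever a call returns `null` the message contains the literal text `null`. A minimal sketch of a null-aware formatter follows, written in Java against the boxed `Integer` return type that the plugin method declares. `ResultReport` and `describe` are hypothetical names used only for illustration and are not part of nf-sqldb.

```java
// Hypothetical helper for illustration; not part of nf-sqldb.
final class ResultReport {

    // A null count is treated as "no row count available",
    // which is how the commit describes the result of DDL statements.
    static String describe(String action, Integer affectedRows) {
        if (affectedRows == null) {
            return action + ": completed (no row count)";
        }
        return action + ": " + affectedRows + (affectedRows == 1 ? " row" : " rows");
    }

    public static void main(String[] args) {
        System.out.println(describe("create", null)); // create: completed (no row count)
        System.out.println(describe("insert", 4));    // insert: 4 rows
        System.out.println(describe("delete", 1));    // delete: 1 row
    }
}
```

Checking for `null` before formatting keeps the DDL and DML cases distinct in the log output instead of relying on how a null value happens to be rendered.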

plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy

Lines changed: 16 additions & 43 deletions
@@ -145,11 +145,14 @@ class ChannelSqlExtension extends PluginExtensionPoint {
145145

146146
/**
147147
* Execute a SQL statement that does not return a result set (DDL/DML statements)
148+
* For DML statements (INSERT, UPDATE, DELETE), it returns the number of affected rows
149+
* For DDL statements (CREATE, ALTER, DROP), it returns null
148150
*
149151
* @param params A map containing 'db' (database alias) and 'statement' (SQL string to execute)
152+
* @return The number of rows affected by the SQL statement for DML operations, null for DDL operations
150153
*/
151154
@Function
152-
void sqlExecute(Map params) {
155+
Integer sqlExecute(Map params) {
153156
CheckHelper.checkParams('sqlExecute', params, EXECUTE_PARAMS)
154157

155158
final String dbName = params.db as String ?: 'default'
@@ -173,7 +176,18 @@ class ChannelSqlExtension extends PluginExtensionPoint {
173176

174177
try (Connection conn = groovy.sql.Sql.newInstance(dataSource.toMap()).getConnection()) {
175178
try (Statement stm = conn.createStatement()) {
176-
stm.execute(normalizeStatement(statement))
179+
String normalizedStatement = normalizeStatement(statement)
180+
181+
// For DDL statements (CREATE, ALTER, DROP, etc.), execute() returns true if the first result is a ResultSet
182+
// For DML statements (INSERT, UPDATE, DELETE), executeUpdate() returns the number of rows affected
183+
boolean isDDL = normalizedStatement.trim().toLowerCase().matches("^(create|alter|drop|truncate).*")
184+
185+
if (isDDL) {
186+
stm.execute(normalizedStatement)
187+
return null
188+
} else {
189+
return stm.executeUpdate(normalizedStatement)
190+
}
177191
}
178192
}
179193
catch (Exception e) {
@@ -182,47 +196,6 @@ class ChannelSqlExtension extends PluginExtensionPoint {
182196
}
183197
}
184198

185-
/**
186-
* Execute a SQL statement that does not return a result set (DDL/DML statements)
187-
* and returns the number of affected rows
188-
*
189-
* @param params A map containing 'db' (database alias) and 'statement' (SQL string to execute)
190-
* @return The number of rows affected by the SQL statement
191-
*/
192-
@Function
193-
int executeUpdate(Map params) {
194-
CheckHelper.checkParams('executeUpdate', params, EXECUTE_PARAMS)
195-
196-
final String dbName = params.db as String ?: 'default'
197-
final String statement = params.statement as String
198-
199-
if (!statement)
200-
throw new IllegalArgumentException("Missing required parameter 'statement'")
201-
202-
final sqlConfig = new SqlConfig((Map) session.config.navigate('sql.db'))
203-
final SqlDataSource dataSource = sqlConfig.getDataSource(dbName)
204-
205-
if (dataSource == null) {
206-
def msg = "Unknown db name: $dbName"
207-
def choices = sqlConfig.getDataSourceNames().closest(dbName) ?: sqlConfig.getDataSourceNames()
208-
if (choices?.size() == 1)
209-
msg += " - Did you mean: ${choices.get(0)}?"
210-
else if (choices)
211-
msg += " - Did you mean any of these?\n" + choices.collect { " $it" }.join('\n') + '\n'
212-
throw new IllegalArgumentException(msg)
213-
}
214-
215-
try (Connection conn = groovy.sql.Sql.newInstance(dataSource.toMap()).getConnection()) {
216-
try (Statement stm = conn.createStatement()) {
217-
return stm.executeUpdate(normalizeStatement(statement))
218-
}
219-
}
220-
catch (Exception e) {
221-
log.error("Error executing SQL update statement: ${e.message}", e)
222-
throw e
223-
}
224-
}
225-
226199
/**
227200
* Normalizes a SQL statement by adding a semicolon if needed
228201
*
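
One caveat with the DDL check in `sqlExecute` may be worth verifying. `String.matches` must match the entire input, and `.` does not match line terminators by default, so a multi-line statement such as the triple-quoted `CREATE TABLE` in the examples would likely not be classified as DDL and would be sent through `executeUpdate` instead. This assumes `normalizeStatement` leaves internal line breaks intact. The sketch below reproduces the pattern in plain Java and compares it with a `Pattern.DOTALL` variant. `DdlClassifier` and its methods are hypothetical names, and the DOTALL variant is only one possible adjustment, not the plugin's code.

```java
import java.util.Locale;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not part of nf-sqldb.
final class DdlClassifier {

    // Same pattern text as in the commit: '.' stops at line terminators.
    static final String ORIGINAL_REGEX = "^(create|alter|drop|truncate).*";

    // Assumed variant: DOTALL lets '.' span line breaks.
    static final Pattern DOTALL_PATTERN =
            Pattern.compile("^(create|alter|drop|truncate).*", Pattern.DOTALL);

    // Mirrors the check in the commit (Locale.ROOT is added here for determinism).
    static boolean isDdlOriginal(String sql) {
        return sql.trim().toLowerCase(Locale.ROOT).matches(ORIGINAL_REGEX);
    }

    // Same check, but tolerant of statements that span several lines.
    static boolean isDdlDotAll(String sql) {
        return DOTALL_PATTERN.matcher(sql.trim().toLowerCase(Locale.ROOT)).matches();
    }

    public static void main(String[] args) {
        String oneLine = "CREATE TABLE t (id INT);";
        String multiLine = "\n  CREATE TABLE IF NOT EXISTS t (\n    id INT\n  );";

        System.out.println(isDdlOriginal(oneLine));   // true
        System.out.println(isDdlOriginal(multiLine)); // false
        System.out.println(isDdlDotAll(multiLine));   // true
    }
}
```

Keyword matching of this kind also says nothing about statements that begin with a comment or with a keyword outside the list, so the returned value is best treated as advisory rather than as a reliable DDL/DML discriminator.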
