forked from posit-dev/querychat
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdata_source.R
More file actions
199 lines (181 loc) · 5.74 KB
/
data_source.R
File metadata and controls
199 lines (181 loc) · 5.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
#' Build a data source for querychat
#'
#' S3 generic that turns `x` into a data source querychat can work with. The
#' method that runs is chosen from the class of `x`.
#'
#' @param x A data frame or DBI connection
#' @param table_name Name of the table inside the data source. One of:
#'   - A single character string (e.g., "table_name")
#'   - A [DBI::Id()] object, for tables that live inside a catalog or schema
#'     (e.g., `DBI::Id(schema = "schema_name", table = "table_name")`)
#' @param categorical_threshold For text columns, the largest number of unique
#'   values for which the column is still treated as a categorical variable
#' @param ... Further arguments handed on to the selected method
#' @return A querychat_data_source object
#' @export
querychat_data_source <- function(x, ...) {
  # Dispatch on `x`, the first argument.
  UseMethod("querychat_data_source", x)
}
#' @export
#' @rdname querychat_data_source
querychat_data_source.data.frame <- function(
  x,
  table_name = NULL,
  categorical_threshold = 20,
  ...
) {
  if (is.null(table_name)) {
    # Infer the table name from the expression the caller supplied for `x`.
    # `deparse()` yields several strings for long expressions, so require
    # exactly one before comparing: the scalar `||` would otherwise fail with
    # an unrelated error instead of the message below.
    table_name <- deparse(substitute(x))
    if (length(table_name) != 1 || table_name %in% c("NULL", "x")) {
      rlang::abort(
        "Unable to infer table name. Please specify `table_name` argument explicitly."
      )
    }
  }

  # A usable name is a single string made of a letter followed by letters,
  # digits or underscores.
  is_table_name_ok <- is.character(table_name) &&
    length(table_name) == 1 &&
    grepl("^[a-zA-Z][a-zA-Z0-9_]*$", table_name, perl = TRUE)
  if (!is_table_name_ok) {
    rlang::abort(
      "`table_name` argument must be a string containing a valid table name."
    )
  }

  # Register the data frame under `table_name` on a new in-memory DuckDB
  # connection.
  conn <- DBI::dbConnect(duckdb::duckdb(), dbdir = ":memory:")
  # If registration fails, close the connection instead of leaking it; on
  # success the connection is handed to the caller inside the returned object.
  registered <- FALSE
  on.exit(if (!registered) DBI::dbDisconnect(conn), add = TRUE)
  duckdb::duckdb_register(conn, table_name, x, experimental = FALSE)
  registered <- TRUE

  structure(
    list(
      conn = conn,
      table_name = table_name,
      categorical_threshold = categorical_threshold
    ),
    class = c("data_frame_source", "dbi_source", "querychat_data_source")
  )
}
#' @export
#' @rdname querychat_data_source
querychat_data_source.DBIConnection <- function(
  x,
  table_name,
  categorical_threshold = 20,
  ...
) {
  # Two forms of `table_name` are accepted and stored unchanged: a DBI::Id
  # object, or exactly one character string.
  name_is_usable <- inherits(table_name, "Id") ||
    (is.character(table_name) && length(table_name) == 1)
  if (!name_is_usable) {
    rlang::abort(
      "`table_name` must be a single character string or a DBI::Id object"
    )
  }

  # The table has to exist on the supplied connection already.
  table_found <- DBI::dbExistsTable(x, table_name)
  if (!table_found) {
    quoted_name <- DBI::dbQuoteIdentifier(x, table_name)
    rlang::abort(paste0(
      "Table ",
      quoted_name,
      " not found in database. If you're using a table in a catalog or schema, pass a DBI::Id",
      " object to `table_name`"
    ))
  }

  # The caller's connection is reused as-is.
  data_source <- list(
    conn = x,
    table_name = table_name,
    categorical_threshold = categorical_threshold
  )
  class(data_source) <- c("dbi_source", "querychat_data_source")
  data_source
}
#' Run a SQL query against a data source
#'
#' S3 generic; the method is chosen from the class of `source`.
#'
#' @param source A querychat_data_source object
#' @param query SQL query string
#' @param ... Further arguments handed on to methods
#' @return The query result as a data frame
#' @export
execute_query <- function(source, query, ...) {
  # Dispatch on `source`, the first argument.
  UseMethod("execute_query", source)
}
#' @export
execute_query.dbi_source <- function(source, query = NULL, ...) {
  # `query` defaults to NULL, like get_lazy_data.dbi_source(), so leaving the
  # argument out takes the same whole-table path as an explicit NULL or "".
  if (is.null(query) || query == "") {
    # No query given: select every row and column of the source's table. The
    # table name is quoted by the connection, which also covers DBI::Id names.
    query <- paste0(
      "SELECT * FROM ",
      DBI::dbQuoteIdentifier(source$conn, source$table_name)
    )
  }

  # The SQL is sent to the connection as-is; database errors propagate to the
  # caller.
  DBI::dbGetQuery(source$conn, query)
}
#' Obtain a lazy view of a data source
#'
#' S3 generic; the method is chosen from the class of `source`.
#'
#' @param source A querychat_data_source object
#' @param query SQL query string
#' @param ... Further arguments handed on to methods
#' @return A lazy representation (typically a dbplyr tbl)
#' @export
get_lazy_data <- function(source, query, ...) {
  # Dispatch on `source`, the first argument.
  UseMethod("get_lazy_data", source)
}
#' @export
get_lazy_data.dbi_source <- function(source, query = NULL, ...) {
  # With no query (NULL or ""), the lazy table is the source's whole table.
  wants_whole_table <- is.null(query) || query == ""
  if (wants_whole_table) {
    return(dplyr::tbl(source$conn, source$table_name))
  }

  # Run the query through clean_sql() before handing it to dbplyr; a NULL
  # result is treated as "nothing left to run" and reported as an error.
  cleaned_query <- clean_sql(query, enforce_select = TRUE)
  if (is.null(cleaned_query)) {
    rlang::abort(c(
      "Query cleaning resulted in an empty query.",
      "i" = "Check the original query for proper syntax.",
      "i" = "Query may consist only of comments or invalid SQL."
    ))
  }

  # Wrap the cleaned text with dbplyr::sql(). There is no fallback to the full
  # table here: any error raised by this call reaches the caller.
  dplyr::tbl(source$conn, dbplyr::sql(cleaned_query))
}
#' Try out a SQL query on a data source
#'
#' S3 generic; the method is chosen from the class of `source`.
#'
#' @param source A querychat_data_source object
#' @param query SQL query string
#' @param ... Further arguments handed on to methods
#' @return The query result, limited to one row of data.
#' @export
test_query <- function(source, query, ...) {
  # Dispatch on `source`, the first argument.
  UseMethod("test_query", source)
}
#' @export
test_query.dbi_source <- function(source, query, ...) {
  # Send the query, then fetch at most one row of its result.
  rs <- DBI::dbSendQuery(source$conn, query)
  # Clear the result set on every exit path, including a failing fetch, so an
  # open result is never left behind on the connection.
  on.exit(DBI::dbClearResult(rs), add = TRUE)
  DBI::dbFetch(rs, n = 1)
}
#' Release the resources held by a data source (close connections, etc.)
#'
#' S3 generic; the method is chosen from the class of `source`.
#'
#' @param source A querychat_data_source object
#' @param ... Further arguments handed on to methods
#' @return NULL (invisibly)
#' @export
cleanup_source <- function(source, ...) {
  # Dispatch on `source`, the first argument.
  UseMethod("cleanup_source", source)
}
#' @export
cleanup_source.dbi_source <- function(source, ...) {
  conn <- source$conn

  # Without a stored connection there is nothing to close.
  if (is.null(conn)) {
    return(invisible(NULL))
  }

  # Disconnect only while the connection is still valid, so calling this on an
  # already-closed source does nothing.
  if (DBI::dbIsValid(conn)) {
    DBI::dbDisconnect(conn)
  }
  invisible(NULL)
}