Commit a0ece1a

Refactor sources to support cloud (chitralverma#141)
* Fixes for 0.45.x
* Steward Version Updates
* Move parse_json_to_options to correct mod
* Refactor scan for parquet
* Refactor scan for ipc
* Refactor scan for csv
* Refactor scan for ndjson
* Refactor docstrings
1 parent 31eca38 commit a0ece1a

36 files changed (+1093, -844 lines)

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -16,3 +16,4 @@ target/
 .settings/
 .metals
 .bloop
+metals.sbt

core/src/main/scala/org/polars/scala/polars/Polars.scala

Lines changed: 12 additions & 8 deletions
@@ -1,6 +1,6 @@
 package org.polars.scala.polars
 
-import org.polars.scala.polars.api.io.builders._
+import org.polars.scala.polars.api.io.Scannable
 import org.polars.scala.polars.api.{DataFrame, LazyFrame}
 import org.polars.scala.polars.config.Config
 import org.polars.scala.polars.internal.jni.{common, data_frame, lazy_frame}
@@ -11,13 +11,17 @@ object Polars {
 
   def version(): String = common.version()
 
-  def csv: CSVInputBuilder = new CSVInputBuilder()
-
-  def parquet: ParquetInputBuilder = new ParquetInputBuilder()
-
-  def ipc: IPCInputBuilder = new IPCInputBuilder()
-
-  def ndJson: NdJsonInputBuilder = new NdJsonInputBuilder()
+  /** Returns a [[org.polars.scala.polars.api.io.Scannable Scannable]] that can be used to lazily
+    * scan datasets of various formats ([[org.polars.scala.polars.api.io.Scannable.parquet
+    * parquet]], [[org.polars.scala.polars.api.io.Scannable.ipc ipc]],
+    * [[org.polars.scala.polars.api.io.Scannable.csv csv]] and
+    * [[org.polars.scala.polars.api.io.Scannable.jsonLines jsonLines]]) from local filesystems and
+    * cloud object stores (aws, gcp and azure) as a
+    * [[org.polars.scala.polars.api.LazyFrame LazyFrame]].
+    * @return
+    *   [[org.polars.scala.polars.api.io.Scannable Scannable]]
+    */
+  def scan: Scannable = new Scannable()
 
   def concat(lazyFrame: LazyFrame, lazyFrames: Array[LazyFrame]): LazyFrame =
     concat(lazyFrame, lazyFrames, reChunk = false, parallel = true)
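
For orientation, a minimal usage sketch of the new single entry point. The file paths below are hypothetical; the option key is one of those documented on Scannable in this commit:

import org.polars.scala.polars.Polars
import org.polars.scala.polars.api.LazyFrame

object ScanExample {
  def main(args: Array[String]): Unit = {
    // The per-format accessors (Polars.csv, Polars.parquet, Polars.ipc, Polars.ndJson)
    // are replaced by a single Polars.scan entry point returning a Scannable.
    val lf: LazyFrame = Polars.scan
      .option("scan_parquet_low_memory", "true") // format-prefixed option, see Scannable docs
      .parquet("data/part-0.parquet", "data/part-1.parquet") // hypothetical local paths
    // The scan is lazy: no data is read until the resulting LazyFrame is materialized.
  }
}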
core/src/main/scala/org/polars/scala/polars/api/io/Scannable.scala

Lines changed: 230 additions & 0 deletions

@@ -0,0 +1,230 @@
package org.polars.scala.polars.api.io

import scala.annotation.varargs
import scala.collection.mutable.{Map => MutableMap}
import scala.jdk.CollectionConverters._

import org.polars.scala.polars.Polars
import org.polars.scala.polars.api.LazyFrame
import org.polars.scala.polars.internal.jni.io.scan._

/** Interface used to scan datasets of various formats from local filesystems and cloud object
  * stores (aws, gcp and azure). Use [[Polars.scan scan()]] to access this.
  *
  * Cloud options are global and can be set by methods like [[option option[s]()]]:
  *   - For amazon s3 options, see
  *     [[https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html#variants here]]
  *   - For google cloud options, see
  *     [[https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html#variants here]]
  *   - For azure options, see
  *     [[https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variants here]]
  */
class Scannable private[polars] () {
  import org.polars.scala.polars.jsonMapper

  private val _options: MutableMap[String, String] = MutableMap.empty[String, String]

  /** Adds options for the underlying dataset. */
  def options(opts: Iterable[(String, String)]): Scannable = synchronized {
    opts.foreach { case (key, value) => option(key, value) }
    this
  }

  /** Adds options for the underlying dataset. */
  def options(opts: java.util.Map[String, String]): Scannable = synchronized {
    opts.asScala.foreach { case (key, value) => option(key, value) }
    this
  }

  /** Adds an option for the underlying dataset. */
  def option(key: String, value: String): Scannable = synchronized {
    if (Option(key).exists(_.trim.isEmpty) || Option(value).exists(_.trim.isEmpty)) {
      throw new IllegalArgumentException("Option key or value cannot be null or empty.")
    }

    _options.put(key.trim, value.trim)
    this
  }

  /** Scans a dataset in Parquet format from the specified path(s) (local or cloud). Supports
    * globbing and path expansion.
    *
    * Supported options:
    *   - `scan_parquet_n_rows`: Maximum number of rows to read. Default: `null`.
    *   - `scan_parquet_parallel`: Strategy for parallelism. Supported values:
    *     - `auto` (default): Automatically determines the unit of parallelization.
    *     - `columns`: Parallelizes over columns.
    *     - `row_groups`: Parallelizes over row groups.
    *     - `prefiltered`: Evaluates pushed-down predicates in parallel and filters rows.
    *     - `none`: Disables parallelism.
    *   - `scan_parquet_row_index_name`: Adds a row index column with the specified name. Default:
    *     `null`.
    *   - `scan_parquet_row_index_offset`: Offset (≥0) for the row index column (used only if
    *     `scan_parquet_row_index_name` is set). Default: `0`.
    *   - `scan_parquet_use_statistics`: Uses parquet statistics to skip unnecessary pages.
    *     Default: `true`.
    *   - `scan_parquet_cache`: Caches the scan result. Default: `true`.
    *   - `scan_parquet_glob`: Expands paths using globbing rules. Default: `true`.
    *   - `scan_parquet_low_memory`: Reduces memory usage at the cost of performance. Default:
    *     `false`.
    *   - `scan_parquet_rechunk`: Re-chunks the final DataFrame for memory contiguity when reading
    *     multiple files. Default: `false`.
    *   - `scan_parquet_allow_missing_columns`: Returns NULL columns instead of errors for missing
    *     columns across files. Default: `false`.
    *   - `scan_parquet_include_file_paths`: Includes source file paths as a column with the
    *     specified name. Default: `null`.
    *   - `scan_parquet_hive_scan_partitions`: Infers schema from Hive partitions and uses them to
    *     prune reads. Default: `true`.
    *   - `scan_parquet_hive_try_parse_dates`: Attempts to parse Hive values as date/datetime.
    *     Default: `true`.
    *
    * @param path
    *   Main input file location.
    * @param paths
    *   Additional input file locations.
    *
    * @note
    *   All provided paths must belong to the same object store.
    */
  @varargs
  def parquet(path: String, paths: String*): LazyFrame =
    LazyFrame.withPtr(
      scanParquet(
        paths = paths.+:(path).toArray[String],
        options = jsonMapper.writeValueAsString(_options)
      )
    )

  /** Scans a dataset in IPC format from the specified path(s) (local or cloud). Supports globbing
    * and path expansion.
    *
    * Supported options:
    *   - `scan_ipc_n_rows`: Maximum number of rows to read. Default: `null`.
    *   - `scan_ipc_cache`: Caches the scan result. Default: `true`.
    *   - `scan_ipc_rechunk`: Re-chunks the final DataFrame for memory contiguity when reading
    *     multiple files. Default: `false`.
    *   - `scan_ipc_row_index_name`: Adds a row index column with the specified name. Default:
    *     `null`.
    *   - `scan_ipc_row_index_offset`: Offset (≥0) for the row index column (used only if
    *     `scan_ipc_row_index_name` is set). Default: `0`.
    *   - `scan_ipc_include_file_paths`: Includes source file paths as a column with the specified
    *     name. Default: `null`.
    *   - `scan_ipc_hive_scan_partitions`: Infers schema from Hive partitions. Default: `true`.
    *   - `scan_ipc_hive_try_parse_dates`: Attempts to parse Hive values as date/datetime.
    *     Default: `true`.
    *
    * @param path
    *   Main input file location.
    * @param paths
    *   Additional input file locations.
    *
    * @note
    *   All provided paths must belong to the same object store.
    */
  @varargs
  def ipc(path: String, paths: String*): LazyFrame =
    LazyFrame.withPtr(
      scanIPC(
        paths = paths.+:(path).toArray[String],
        options = jsonMapper.writeValueAsString(_options)
      )
    )

  /** Scans a dataset in CSV format from the specified path(s) (local or cloud). Supports globbing
    * and path expansion.
    *
    * Supported options:
    *   - `scan_csv_n_rows`: Maximum number of rows to read. Default: `null`.
    *   - `scan_csv_encoding`: File encoding. Supported values:
    *     - `utf8` (default): UTF-8 encoding.
    *     - `lossy_utf8`: UTF-8 with invalid bytes replaced by `�`.
    *   - `scan_csv_row_index_name`: Adds a row index column with the specified name. Default:
    *     `null`.
    *   - `scan_csv_row_index_offset`: Offset (≥0) for the row index column (used only if
    *     `scan_csv_row_index_name` is set). Default: `0`.
    *   - `scan_csv_cache`: Caches the scan result. Default: `true`.
    *   - `scan_csv_glob`: Expands paths using globbing rules. Default: `true`.
    *   - `scan_csv_low_memory`: Reduces memory usage at the cost of performance. Default:
    *     `false`.
    *   - `scan_csv_rechunk`: Re-chunks the final DataFrame for memory contiguity when reading
    *     multiple files. Default: `false`.
    *   - `scan_csv_include_file_paths`: Includes source file paths as a column with the specified
    *     name. Default: `null`.
    *   - `scan_csv_raise_if_empty`: Raises an error for an empty dataset instead of returning an
    *     empty LazyFrame. Default: `true`.
    *   - `scan_csv_ignore_errors`: Continues parsing despite errors in some lines. Default:
    *     `false`.
    *   - `scan_csv_has_header`: Treats the first row as a header. If false, column names are
    *     generated as `column_x`. Default: `true`.
    *   - `scan_csv_missing_is_null`: Treats missing fields as null values. Default: `true`.
    *   - `scan_csv_truncate_ragged_lines`: Truncates lines exceeding the schema length. Default:
    *     `false`.
    *   - `scan_csv_try_parse_dates`: Attempts to parse dates automatically. Default: `false`.
    *   - `scan_csv_decimal_comma`: Uses commas as decimal separators. Default: `false`.
    *   - `scan_csv_chunk_size`: Parser chunk size for memory optimization. Default: `2^18`.
    *   - `scan_csv_skip_rows`: Skips the first `n` rows. Default: `0`.
    *   - `scan_csv_skip_rows_after_header`: Number of rows to skip after the header. Default:
    *     `0`.
    *   - `scan_csv_skip_infer_schema_length`: Number of rows to use for schema inference.
    *     Default: `100`.
    *   - `scan_csv_separator`: Column separator character. Default: `,`.
    *   - `scan_csv_quote_char`: Quote character for values. Default: `"`.
    *   - `scan_csv_eol_char`: End-of-line character. Default: `\n`.
    *   - `scan_csv_null_value`: Value to interpret as null. Default: `null`.
    *   - `scan_csv_comment_prefix`: Prefix for comment lines. Default: `null`.
    *
    * @param path
    *   Main input file location.
    * @param paths
    *   Additional input file locations.
    *
    * @note
    *   All provided paths must belong to the same object store.
    */
  @varargs
  def csv(path: String, paths: String*): LazyFrame =
    LazyFrame.withPtr(
      scanCSV(
        paths = paths.+:(path).toArray[String],
        options = jsonMapper.writeValueAsString(_options)
      )
    )

  /** Scans a dataset in Newline Delimited JSON (NDJSON) format from the specified path(s) (local
    * or cloud). Supports globbing and path expansion.
    *
    * Supported options:
    *   - `scan_ndjson_n_rows`: Maximum number of rows to read. Default: `null`.
    *   - `scan_ndjson_row_index_name`: Adds a row index column with the specified name. Default:
    *     `null`.
    *   - `scan_ndjson_row_index_offset`: Offset (≥0) for the row index column (used only if
    *     `scan_ndjson_row_index_name` is set). Default: `0`.
    *   - `scan_ndjson_low_memory`: Reduces memory usage at the cost of performance. Default:
    *     `false`.
    *   - `scan_ndjson_rechunk`: Re-chunks the final DataFrame for memory contiguity when reading
    *     multiple files. Default: `false`.
    *   - `scan_ndjson_include_file_paths`: Includes source file paths as a column with the
    *     specified name. Default: `null`.
    *   - `scan_ndjson_ignore_errors`: Continues parsing despite errors in some lines. Default:
    *     `false`.
    *   - `scan_ndjson_batch_size`: Number of rows to read per batch; reduce it for memory
    *     efficiency at the cost of performance. Default: `null`.
    *   - `scan_ndjson_infer_schema_length`: Number of rows to use for schema inference. Default:
    *     `100`.
    *
    * @param path
    *   Main input file location.
    * @param paths
    *   Additional input file locations.
    *
    * @note
    *   All provided paths must belong to the same object store.
    */
  @varargs
  def jsonLines(path: String, paths: String*): LazyFrame =
    LazyFrame.withPtr(
      scanJsonLines(
        paths = paths.+:(path).toArray[String],
        options = jsonMapper.writeValueAsString(_options)
      )
    )
}
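
Format options and the global cloud options flow through the same option()/options() calls and are serialized to JSON for the native scan functions. A minimal sketch of a cloud scan: the bucket and paths are hypothetical, and the cloud key shown assumes the object_store AmazonS3ConfigKey naming linked in the class docstring.

import org.polars.scala.polars.Polars

object CloudScanExample {
  def main(args: Array[String]): Unit = {
    val lf = Polars.scan
      .options(
        Map(
          "aws_region" -> "us-east-1",  // cloud option (assumed object_store key name)
          "scan_csv_separator" -> "|",  // CSV-specific options from the docstring above
          "scan_csv_has_header" -> "true"
        )
      )
      .csv("s3://my-bucket/events/*.csv") // hypothetical bucket; globbing is supported
    // All paths passed to a single scan must belong to the same object store.
  }
}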

core/src/main/scala/org/polars/scala/polars/api/io/Writeable.scala

Lines changed: 1 addition & 1 deletion
@@ -206,7 +206,7 @@ class Writeable private[polars] (ptr: Long) {
    * @param filePath
    *   output file location
    */
-  def json_lines(filePath: String): Unit = {
+  def jsonLines(filePath: String): Unit = {
     option("write_json_format", "json_lines")
     writeJson(
       ptr = ptr,
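
Only the method name changes for callers of the JSON lines writer. A minimal sketch, assuming a Writeable instance is obtained elsewhere (its constructor is private[polars], and the accessor is not part of this diff):

import org.polars.scala.polars.api.io.Writeable

object WriteJsonLinesExample {
  // Writes a dataset as newline-delimited JSON via the renamed method.
  def writeAsJsonLines(writeable: Writeable, outputPath: String): Unit =
    writeable.jsonLines(outputPath) // previously named json_lines
}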

core/src/main/scala/org/polars/scala/polars/api/io/builders/CSVInputBuilder.java

Lines changed: 0 additions & 78 deletions
This file was deleted.

core/src/main/scala/org/polars/scala/polars/api/io/builders/IPCInputBuilder.java

Lines changed: 0 additions & 30 deletions
This file was deleted.
