Skip to content

Commit 905ead4

Browse files
committed
support for sqlite
1 parent ba15553 commit 905ead4

File tree

13 files changed

+1243
-11
lines changed

13 files changed

+1243
-11
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,4 @@ target
1919
.metadata
2020
.settings/
2121
.vscode
22+
.claude

tessellate-main/build.gradle.kts

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2023-2025 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
2+
* Copyright (c) 2023 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
33
*
44
* This Source Code Form is subject to the terms of the Mozilla Public
55
* License, v. 2.0. If a copy of the MPL was not distributed with this
@@ -17,6 +17,7 @@ plugins {
1717
application
1818
`java-test-fixtures`
1919
id("org.jreleaser") version "1.16.0"
20+
idea
2021
}
2122

2223
val versionProperties = Properties().apply {
@@ -117,6 +118,8 @@ dependencies {
117118

118119
implementation("com.github.f4b6a3:tsid-creator:5.2.6")
119120

121+
implementation("org.xerial:sqlite-jdbc:3.46.0.0")
122+
120123
testImplementation("net.wensel:cascading-core:$cascading:tests")
121124

122125
// https://github.com/hosuaby/inject-resources
@@ -206,7 +209,7 @@ configurations["integrationTestImplementation"].extendsFrom(configurations.testI
206209

207210
tasks.named<ProcessResources>("processResources") {
208211
doFirst {
209-
file("${buildDir}/resources/main/version.properties")
212+
file("${layout.buildDirectory.get().asFile}/resources/main/version.properties")
210213
.writeText("release.full=${version}")
211214
}
212215
}
@@ -274,7 +277,7 @@ jreleaser {
274277
name.set("tess")
275278
}
276279
artifact {
277-
path.set(file("build/distributions/{{distributionName}}-{{projectVersion}}.zip"))
280+
path.set(file("${layout.buildDirectory.get().asFile}/distributions/{{distributionName}}-{{projectVersion}}.zip"))
278281
}
279282
}
280283
}
@@ -324,7 +327,7 @@ jreleaser {
324327

325328
tasks.register("createPath") {
326329
doLast {
327-
file("build/jreleaser").mkdirs()
330+
file("${layout.buildDirectory.get().asFile}/jreleaser").mkdirs()
328331
}
329332
}
330333

@@ -335,3 +338,11 @@ tasks.register("release") {
335338
dependsOn("jreleaserPackage")
336339
dependsOn("jreleaserPublish")
337340
}
341+
342+
idea {
343+
module {
344+
// Configure IntelliJ IDEA to use the same output directory as Gradle
345+
outputDir = layout.buildDirectory.get().asFile.resolve("classes/java/main")
346+
testOutputDir = layout.buildDirectory.get().asFile.resolve("classes/java/test")
347+
}
348+
}

tessellate-main/src/main/java/io/clusterless/tessellate/factory/TapFactories.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2023-2025 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
2+
* Copyright (c) 2023 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
33
*
44
* This Source Code Form is subject to the terms of the Mozilla Public
55
* License, v. 2.0. If a copy of the MPL was not distributed with this
@@ -12,6 +12,7 @@
1212
import io.clusterless.tessellate.factory.hdfs.JSONFSFactory;
1313
import io.clusterless.tessellate.factory.hdfs.ParquetFactory;
1414
import io.clusterless.tessellate.factory.hdfs.TextFSFactory;
15+
import io.clusterless.tessellate.factory.jdbc.SQLiteFactory;
1516
import io.clusterless.tessellate.factory.local.LocalDirectoryFactory;
1617
import io.clusterless.tessellate.factory.local.StdOutFactory;
1718
import io.clusterless.tessellate.model.Sink;
@@ -37,7 +38,8 @@ public class TapFactories {
3738
LocalDirectoryFactory.INSTANCE,
3839
ParquetFactory.INSTANCE,
3940
JSONFSFactory.INSTANCE,
40-
TextFSFactory.INSTANCE
41+
TextFSFactory.INSTANCE,
42+
SQLiteFactory.INSTANCE
4143
));
4244
private static final LinkedListMultimap<Protocol, SourceFactory> sourceFactories = LinkedListMultimap.create();
4345
private static final LinkedListMultimap<Protocol, SinkFactory> sinkFactories = LinkedListMultimap.create();
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Copyright (c) 2023 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
3+
*
4+
* This Source Code Form is subject to the terms of the Mozilla Public
5+
* License, v. 2.0. If a copy of the MPL was not distributed with this
6+
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
7+
*/
8+
9+
package io.clusterless.tessellate.factory.jdbc;
10+
11+
import java.util.Properties;
12+
13+
/**
 * Configuration keys, default values, and typed accessors for SQLite operations.
 * <p>
 * Every accessor reads a single key from the supplied {@link Properties} and falls back to
 * the matching {@code DEFAULT_*} constant when the key is not set.
 */
public class SQLiteConfig {

    // Configuration property keys
    public static final String SQLITE_BATCH_SIZE = "cascading.sqlite.batch.size";
    public static final String SQLITE_WAL_MODE = "cascading.sqlite.wal.enabled";
    public static final String SQLITE_SYNC_MODE = "cascading.sqlite.synchronous.mode";
    public static final String SQLITE_CACHE_SIZE = "cascading.sqlite.cache.size";
    public static final String SQLITE_TEMP_STORE = "cascading.sqlite.temp.store";
    public static final String SQLITE_TRACE_ENABLED = "cascading.sqlite.trace.enabled";
    public static final String SQLITE_AUTO_COMMIT = "cascading.sqlite.auto.commit";

    // Default values, used whenever the corresponding key is absent
    public static final int DEFAULT_BATCH_SIZE = 1000;
    public static final boolean DEFAULT_WAL_MODE = true;
    public static final String DEFAULT_SYNC_MODE = "NORMAL";
    public static final int DEFAULT_CACHE_SIZE = 10000;
    public static final String DEFAULT_TEMP_STORE = "memory";
    public static final boolean DEFAULT_TRACE_ENABLED = false;
    public static final boolean DEFAULT_AUTO_COMMIT = false;

    /**
     * Returns the configured batch size.
     *
     * @param conf the configuration to read from
     * @return the value of {@link #SQLITE_BATCH_SIZE}, or {@link #DEFAULT_BATCH_SIZE} if unset
     * @throws NumberFormatException if the configured value is not a valid integer
     */
    public static int getBatchSize(Properties conf) {
        return intProperty(conf, SQLITE_BATCH_SIZE, DEFAULT_BATCH_SIZE);
    }

    /**
     * Reports whether WAL mode is enabled.
     *
     * @param conf the configuration to read from
     * @return the value of {@link #SQLITE_WAL_MODE}, or {@link #DEFAULT_WAL_MODE} if unset
     */
    public static boolean isWalModeEnabled(Properties conf) {
        return booleanProperty(conf, SQLITE_WAL_MODE, DEFAULT_WAL_MODE);
    }

    /**
     * Returns the synchronous mode setting.
     *
     * @param conf the configuration to read from
     * @return the value of {@link #SQLITE_SYNC_MODE}, or {@link #DEFAULT_SYNC_MODE} if unset
     */
    public static String getSynchronousMode(Properties conf) {
        return conf.getProperty(SQLITE_SYNC_MODE, DEFAULT_SYNC_MODE);
    }

    /**
     * Returns the cache size setting.
     *
     * @param conf the configuration to read from
     * @return the value of {@link #SQLITE_CACHE_SIZE}, or {@link #DEFAULT_CACHE_SIZE} if unset
     * @throws NumberFormatException if the configured value is not a valid integer
     */
    public static int getCacheSize(Properties conf) {
        return intProperty(conf, SQLITE_CACHE_SIZE, DEFAULT_CACHE_SIZE);
    }

    /**
     * Returns the temp store setting.
     *
     * @param conf the configuration to read from
     * @return the value of {@link #SQLITE_TEMP_STORE}, or {@link #DEFAULT_TEMP_STORE} if unset
     */
    public static String getTempStore(Properties conf) {
        return conf.getProperty(SQLITE_TEMP_STORE, DEFAULT_TEMP_STORE);
    }

    /**
     * Reports whether tracing is enabled.
     *
     * @param conf the configuration to read from
     * @return the value of {@link #SQLITE_TRACE_ENABLED}, or {@link #DEFAULT_TRACE_ENABLED} if unset
     */
    public static boolean isTraceEnabled(Properties conf) {
        return booleanProperty(conf, SQLITE_TRACE_ENABLED, DEFAULT_TRACE_ENABLED);
    }

    /**
     * Reports whether auto commit is enabled.
     *
     * @param conf the configuration to read from
     * @return the value of {@link #SQLITE_AUTO_COMMIT}, or {@link #DEFAULT_AUTO_COMMIT} if unset
     */
    public static boolean isAutoCommitEnabled(Properties conf) {
        return booleanProperty(conf, SQLITE_AUTO_COMMIT, DEFAULT_AUTO_COMMIT);
    }

    /**
     * Fills in the default value for every SQLite key that is not already set.
     * <p>
     * Keys that already resolve to a value through {@link Properties#getProperty(String)}
     * are left untouched, so settings supplied by the caller survive this call. On an empty
     * configuration the result is the full set of {@code DEFAULT_*} values.
     *
     * @param conf the configuration to populate in place
     */
    public static void setDefaults(Properties conf) {
        setIfAbsent(conf, SQLITE_BATCH_SIZE, String.valueOf(DEFAULT_BATCH_SIZE));
        setIfAbsent(conf, SQLITE_WAL_MODE, String.valueOf(DEFAULT_WAL_MODE));
        setIfAbsent(conf, SQLITE_SYNC_MODE, DEFAULT_SYNC_MODE);
        setIfAbsent(conf, SQLITE_CACHE_SIZE, String.valueOf(DEFAULT_CACHE_SIZE));
        setIfAbsent(conf, SQLITE_TEMP_STORE, DEFAULT_TEMP_STORE);
        setIfAbsent(conf, SQLITE_TRACE_ENABLED, String.valueOf(DEFAULT_TRACE_ENABLED));
        setIfAbsent(conf, SQLITE_AUTO_COMMIT, String.valueOf(DEFAULT_AUTO_COMMIT));
    }

    /** Sets {@code key} to {@code value} only when the key does not currently resolve to a value. */
    private static void setIfAbsent(Properties conf, String key, String value) {
        if (conf.getProperty(key) == null) {
            conf.setProperty(key, value);
        }
    }

    /** Reads {@code key} as an int, naming the key in the error when the value is malformed. */
    private static int intProperty(Properties conf, String key, int defaultValue) {
        String raw = conf.getProperty(key);
        if (raw == null) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(raw);
        } catch (NumberFormatException e) {
            // Same exception type as before, but the message now identifies the offending key.
            NumberFormatException detailed =
                    new NumberFormatException("Invalid integer value for property '" + key + "': " + raw);
            detailed.initCause(e);
            throw detailed;
        }
    }

    /** Reads {@code key} using {@link Boolean#parseBoolean(String)}, or returns the default if unset. */
    private static boolean booleanProperty(Properties conf, String key, boolean defaultValue) {
        String raw = conf.getProperty(key);
        return raw == null ? defaultValue : Boolean.parseBoolean(raw);
    }
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright (c) 2023 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
3+
*
4+
* This Source Code Form is subject to the terms of the Mozilla Public
5+
* License, v. 2.0. If a copy of the MPL was not distributed with this
6+
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
7+
*/
8+
9+
package io.clusterless.tessellate.factory.jdbc;
10+
11+
import cascading.tap.Tap;
12+
import cascading.tuple.Fields;
13+
import io.clusterless.tessellate.factory.SinkFactory;
14+
import io.clusterless.tessellate.factory.TapFactory;
15+
import io.clusterless.tessellate.model.Sink;
16+
import io.clusterless.tessellate.options.PipelineOptions;
17+
import io.clusterless.tessellate.util.Compression;
18+
import io.clusterless.tessellate.util.Format;
19+
import io.clusterless.tessellate.util.Protocol;
20+
21+
import java.io.IOException;
22+
import java.util.Properties;
23+
import java.util.Set;
24+
25+
public class SQLiteFactory implements SinkFactory {
26+
public static TapFactory INSTANCE = new SQLiteFactory();
27+
28+
@Override
29+
public Set<Protocol> getSinkProtocols() {
30+
return Set.of(Protocol.sqlite);
31+
}
32+
33+
@Override
34+
public Set<Format> getFormats() {
35+
return Set.of(Format.sql);
36+
}
37+
38+
@Override
39+
public Set<Compression> getCompressions() {
40+
return Set.of(Compression.none);
41+
}
42+
43+
@Override
44+
public Tap<Properties, ?, ?> getSink(PipelineOptions pipelineOptions, Sink sinkModel, Fields currentFields) throws IOException {
45+
SQLiteScheme scheme = new SQLiteScheme(currentFields, sinkModel);
46+
return new SQLiteTap(scheme, sinkModel);
47+
}
48+
}
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* Copyright (c) 2023 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
3+
*
4+
* This Source Code Form is subject to the terms of the Mozilla Public
5+
* License, v. 2.0. If a copy of the MPL was not distributed with this
6+
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
7+
*/
8+
9+
package io.clusterless.tessellate.factory.jdbc;
10+
11+
import cascading.flow.FlowProcess;
12+
import cascading.scheme.Scheme;
13+
import cascading.scheme.SinkCall;
14+
import cascading.scheme.SourceCall;
15+
import cascading.tap.Tap;
16+
import cascading.tuple.Fields;
17+
import io.clusterless.tessellate.model.Sink;
18+
import org.slf4j.Logger;
19+
import org.slf4j.LoggerFactory;
20+
21+
import java.io.IOException;
22+
import java.util.Properties;
23+
24+
/**
25+
* A Cascading Scheme for SQLite database operations.
26+
* This scheme is designed for sink-only operations and delegates actual
27+
* data writing to SQLiteTupleEntryCollector for better resource management.
28+
*/
29+
public class SQLiteScheme extends Scheme<Properties, Void, Void, Void, Void> {
30+
private static final Logger LOG = LoggerFactory.getLogger(SQLiteScheme.class);
31+
32+
private final Fields fields;
33+
private final Sink sinkModel;
34+
35+
public SQLiteScheme(Fields fields, Sink sinkModel) {
36+
super(fields, fields);
37+
this.fields = fields;
38+
this.sinkModel = sinkModel;
39+
}
40+
41+
@Override
42+
public void sourceConfInit(FlowProcess<? extends Properties> flowProcess,
43+
Tap<Properties, Void, Void> tap, Properties conf) {
44+
throw new UnsupportedOperationException("SQLite scheme only supports sink operations");
45+
}
46+
47+
@Override
48+
public void sinkConfInit(FlowProcess<? extends Properties> flowProcess,
49+
Tap<Properties, Void, Void> tap, Properties conf) {
50+
if (LOG.isDebugEnabled()) {
51+
LOG.debug("Initializing SQLite sink configuration for tap: {}", tap.getIdentifier());
52+
}
53+
54+
// Set default SQLite configuration
55+
SQLiteConfig.setDefaults(conf);
56+
57+
// Configure SQLite-specific properties
58+
SQLiteTap sqliteTap = (SQLiteTap) tap;
59+
conf.setProperty("sqlite.database.path", sqliteTap.getDatabasePath());
60+
conf.setProperty("sqlite.table.name", sqliteTap.getTableName());
61+
62+
if (SQLiteConfig.isTraceEnabled(conf)) {
63+
LOG.info("SQLite trace enabled for database: {} table: {}",
64+
sqliteTap.getDatabasePath(), sqliteTap.getTableName());
65+
}
66+
}
67+
68+
@Override
69+
public void sinkPrepare(FlowProcess<? extends Properties> flowProcess,
70+
SinkCall<Void, Void> sinkCall) throws IOException {
71+
if (LOG.isDebugEnabled()) {
72+
LOG.debug("Preparing SQLite sink for processing");
73+
}
74+
75+
SQLiteTap sqliteTap = (SQLiteTap) sinkCall.getTap();
76+
try {
77+
// Ensure database and table are ready
78+
sqliteTap.createResource(flowProcess.getConfigCopy());
79+
80+
if (SQLiteConfig.isTraceEnabled(flowProcess.getConfigCopy())) {
81+
LOG.info("SQLite sink prepared successfully for table: {}", sqliteTap.getTableName());
82+
}
83+
} catch (Exception e) {
84+
throw new IOException("Failed to prepare SQLite sink", e);
85+
}
86+
}
87+
88+
@Override
89+
public void sinkCleanup(FlowProcess<? extends Properties> flowProcess,
90+
SinkCall<Void, Void> sinkCall) throws IOException {
91+
if (LOG.isDebugEnabled()) {
92+
LOG.debug("Cleaning up SQLite sink resources");
93+
}
94+
95+
SQLiteTap sqliteTap = (SQLiteTap) sinkCall.getTap();
96+
try {
97+
// Commit any pending transactions and close connections
98+
sqliteTap.commitTransaction();
99+
100+
if (SQLiteConfig.isTraceEnabled(flowProcess.getConfigCopy())) {
101+
LOG.info("SQLite sink cleanup completed for table: {}", sqliteTap.getTableName());
102+
}
103+
} catch (Exception e) {
104+
LOG.warn("Error during SQLite sink cleanup", e);
105+
// Don't throw exception during cleanup to avoid masking original errors
106+
}
107+
}
108+
109+
@Override
110+
public boolean source(FlowProcess<? extends Properties> flowProcess,
111+
SourceCall<Void, Void> sourceCall) throws IOException {
112+
throw new UnsupportedOperationException("SQLite scheme only supports sink operations");
113+
}
114+
115+
@Override
116+
public void sink(FlowProcess<? extends Properties> flowProcess,
117+
SinkCall<Void, Void> sinkCall) throws IOException {
118+
// For SQLite, the actual writing is handled by SQLiteTupleEntryCollector
119+
// This method is called by Cascading framework but we delegate to the collector
120+
throw new UnsupportedOperationException(
121+
"SQLite sink operations are handled by SQLiteTupleEntryCollector. " +
122+
"This method should not be called in normal operation."
123+
);
124+
}
125+
126+
/**
127+
* Get the fields associated with this scheme
128+
*/
129+
public Fields getFields() {
130+
return fields;
131+
}
132+
133+
/**
134+
* Get the sink model associated with this scheme
135+
*/
136+
public Sink getSinkModel() {
137+
return sinkModel;
138+
}
139+
}

0 commit comments

Comments
 (0)