Commit 780e122

stanch and claude committed
Migrate from SQLite to PostgreSQL storage
Remove SQLite storage implementation and replace with PostgreSQL-only solution:

- Add PostgreSQL storage with JSONB support via doobie-postgres-circe
- Remove all SQLite dependencies and code
- Add storage configuration file approach with --storage CLI parameter
- Update CI/CD with PostgreSQL service container

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 3eea87f commit 780e122
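
The storage configuration file named in the commit message replaces the old `--storage-file`/`--storage-ttl`/`--storage-cleanup-interval` flags. A minimal `storage.hocon` sketch, with field names taken from the `StorageConfig` decoder added in this commit (the values are illustrative; the password intentionally comes from the `MICRO_POSTGRESQL_PASSWORD` environment variable, never from the file):

```hocon
host = "localhost"
port = 5432
database = "micro"
user = "micro_user"
ttl = 7 days              # enforced minimum: 5 minutes
cleanupInterval = 1 hour  # enforced minimum: 1 minute
```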


10 files changed: +237 −207 lines


.github/workflows/test.yml

Lines changed: 15 additions & 0 deletions

@@ -7,6 +7,21 @@ jobs:
 
     runs-on: ubuntu-latest
 
+    services:
+      postgres:
+        image: postgres:15
+        env:
+          POSTGRES_PASSWORD: test_password
+          POSTGRES_USER: test_user
+          POSTGRES_DB: micro_test
+        options: >-
+          --health-cmd pg_isready
+          --health-interval 10s
+          --health-timeout 5s
+          --health-retries 5
+        ports:
+          - 5432:5432
+
     steps:
       - uses: actions/checkout@v2
 

README.md

Lines changed: 18 additions & 0 deletions

@@ -32,10 +32,28 @@ sbt +publishLocal && cd ..
 
 To run the tests:
 
+**Prerequisites**: PostgreSQL tests require a local PostgreSQL instance. Start one with Docker:
+
+```bash
+docker run --name micro-postgres-test \
+  -e POSTGRES_PASSWORD=test_password \
+  -e POSTGRES_USER=test_user \
+  -e POSTGRES_DB=micro_test \
+  -p 5432:5432 \
+  -d postgres:15
 ```
+
+Then run the tests:
+
+```bash
 sbt test
 ```
 
+To stop the test database:
+```bash
+docker stop micro-postgres-test && docker rm micro-postgres-test
+```
+
 To build a Docker image for local testing:
 ```
 cd ui
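
To confirm the container is accepting connections before running `sbt test`, the same `pg_isready` probe that the CI health check uses can be run by hand (a convenience sketch, not part of the commit):

```bash
# Exit code 0 means the test database is ready to accept connections.
docker exec micro-postgres-test pg_isready -U test_user -d micro_test
```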

build.sbt

Lines changed: 3 additions & 1 deletion

@@ -40,7 +40,9 @@ lazy val dependencies = Seq(
   Dependencies.circeGeneric,
   Dependencies.doobieCore,
   Dependencies.doobieHikari,
-  Dependencies.sqliteJdbc,
+  Dependencies.doobiePostgres,
+  Dependencies.doobiePostgresCirce,
+  Dependencies.postgresqlJdbc,
   Dependencies.specs2,
   Dependencies.specs2CE,
   Dependencies.badRows,

project/Dependencies.scala

Lines changed: 7 additions & 5 deletions

@@ -25,7 +25,7 @@ object Dependencies {
   val decline = "2.4.1"
   val slf4j = "2.0.17"
   val doobie = "1.0.0-RC10"
-  val sqlite = "3.50.3.0"
+  val postgresql = "42.7.8"
 
   // specs2
   val specs2 = "4.12.2"

@@ -55,11 +55,13 @@ object Dependencies {
   val circeGeneric = "io.circe" %% "circe-generic" % V.circe
 
   // doobie
-  val doobieCore   = "org.tpolecat" %% "doobie-core"   % V.doobie
-  val doobieHikari = "org.tpolecat" %% "doobie-hikari" % V.doobie
+  val doobieCore          = "org.tpolecat" %% "doobie-core"           % V.doobie
+  val doobieHikari        = "org.tpolecat" %% "doobie-hikari"         % V.doobie
+  val doobiePostgres      = "org.tpolecat" %% "doobie-postgres"       % V.doobie
+  val doobiePostgresCirce = "org.tpolecat" %% "doobie-postgres-circe" % V.doobie
 
-  // sqlite
-  val sqliteJdbc = "org.xerial" % "sqlite-jdbc" % V.sqlite
+  // postgresql
+  val postgresqlJdbc = "org.postgresql" % "postgresql" % V.postgresql
 
   // specs2
   val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test

src/main/scala/com.snowplowanalytics.snowplow.micro/Configuration.scala

Lines changed: 71 additions & 36 deletions

@@ -10,7 +10,7 @@
 
 package com.snowplowanalytics.snowplow.micro
 
-import cats.data.EitherT
+import cats.data.{EitherT, ValidatedNel}
 import cats.effect.IO
 import cats.implicits._
 import com.monovore.decline.{Argument, Opts}

@@ -24,10 +24,11 @@ import com.snowplowanalytics.snowplow.collector.core.{Config => CollectorConfig}
 import com.snowplowanalytics.snowplow.enrich.common.adapters.{CallrailSchemas, CloudfrontAccessLogSchemas, GoogleAnalyticsSchemas, HubspotSchemas, MailchimpSchemas, MailgunSchemas, MandrillSchemas, MarketoSchemas, OlarkSchemas, PagerdutySchemas, PingdomSchemas, SendgridSchemas, StatusGatorSchemas, UnbounceSchemas, UrbanAirshipSchemas, VeroSchemas, AdaptersSchemas => EnrichAdaptersSchemas}
 import com.snowplowanalytics.snowplow.enrich.common.enrichments.{AtomicFields, EnrichmentRegistry}
 import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf
-import com.typesafe.config.{ConfigFactory, ConfigParseOptions, ConfigValueFactory, Config => TypesafeConfig}
+import com.typesafe.config.{ConfigFactory, ConfigParseOptions, Config => TypesafeConfig}
 import fs2.io.file.{Files, Path => FS2Path}
 import io.circe.config.syntax.CirceConfigOps
 import io.circe.generic.semiauto.deriveDecoder
+import io.circe.config.syntax._
 import io.circe.syntax.EncoderOps
 import io.circe.{Decoder, Json, JsonObject}
 import org.http4s.Uri

@@ -36,7 +37,7 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger
 
 import java.net.URI
 import java.nio.file.{Path, Paths}
-import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration}
+import scala.concurrent.duration.FiniteDuration
 
 object Configuration {
 

@@ -47,32 +48,30 @@ object Configuration {
     case object Json extends OutputFormat
   }
 
-  sealed trait StorageConfig
-  object StorageConfig {
-    case object None extends StorageConfig
-    case object InMemory extends StorageConfig
-    case class Persistent(path: Path, ttl: FiniteDuration, cleanupInterval: FiniteDuration) extends StorageConfig
+  sealed trait StorageMode
+  object StorageMode {
+    case object None extends StorageMode
+    case object InMemory extends StorageMode
+    case class Persistent(host: String,
+                          port: Int,
+                          database: String,
+                          user: String,
+                          password: String,
+                          ttl: FiniteDuration,
+                          cleanupInterval: FiniteDuration) extends StorageMode
   }
 
   object Cli {
     implicit val uriArgument: Argument[Uri] = Argument.from("uri") { str =>
       Uri.fromString(str).leftMap(_ => s"Invalid URI: $str").toValidatedNel
     }
 
-    implicit val durationArgument: Argument[FiniteDuration] = Argument.from("duration") { str =>
-      Either.catchNonFatal {
-        val d = ConfigValueFactory.fromAnyRef(str).atKey("d").getDuration("d")
-        Duration.fromNanos(d.toNanos)
-      }.leftMap(_ => s"Invalid duration format: $str. Use HOCON format (e.g., 1h, 30m, 7d)")
-        .toValidatedNel
-    }
-
     final case class Config(collector: Option[Path],
                             iglu: Option[Path],
                             outputFormat: OutputFormat,
                             destination: Option[Uri],
                             yauaa: Boolean,
-                            storage: StorageConfig,
+                            storage: StorageMode,
                             auth: Option[Path])
 
     private val collector = Opts.option[Path]("collector-config", "Configuration file for Collector", "c", "config.hocon").orNone

@@ -81,11 +80,7 @@ object Configuration {
     private val outputJson = Opts.flag("output-json", "Output events in JSON format to standard output or HTTP destination (with a separate key for each schema)", "j").orFalse
     private val destination = Opts.option[Uri]("destination", "HTTP(s) URL to send output data to (requires --output-json or --output-tsv)", "d").orNone
     private val noStorage = Opts.flag("no-storage", "Do not store events anywhere, and disable the API (handy if using Micro purely for output)").orFalse
-    private val storagePath = Opts.option[Path]("storage-file", "Path to an SQLite database file for persistent storage", "s").orNone
-    private val storageTtl = Opts.option[FiniteDuration]("storage-ttl", "Time-to-live for events in persistent storage (e.g. 1h, 1d; default: 7d)")
-      .withDefault(7.days)
-    private val storageCleanupInterval = Opts.option[FiniteDuration]("storage-cleanup-interval", "Interval for cleanup of expired events in persistent storage (e.g. 1h, 1d; default: 1h)")
-      .withDefault(1.hour)
+    private val storage = Opts.option[Path]("storage", "Configuration file for PostgreSQL storage", "s", "storage.hocon").orNone
     private val yauaa = Opts.flag("yauaa", "Enable YAUAA user agent enrichment").orFalse
     private val auth = Opts.option[Path]("auth", "Configuration file for authentication", "a", "auth.hocon").orNone
 

@@ -99,22 +94,18 @@ object Configuration {
       case (true, true, _) => "Cannot specify both --output-tsv and --output-json".invalidNel[(OutputFormat, Option[Uri])]
     }
 
-    private val storage = (noStorage, storagePath, storageTtl, storageCleanupInterval)
-      .mapN { (_, _, _, _) }
+    private val storageConfig = (noStorage, storage)
+      .mapN { (_, _) }
       .mapValidated {
-        case (true, _, _, _) =>
-          StorageConfig.None.validNel[String]
-        case (_, _, ttl, _) if ttl.toMinutes < 5 =>
-          "--storage-ttl must be at least 5 minutes (5m)".invalidNel[StorageConfig]
-        case (_, _, _, cleanupInterval) if cleanupInterval.toMinutes < 1 =>
-          "--storage-cleanup-interval must be at least 1 minute (1m)".invalidNel[StorageConfig]
-        case (_, None, _, _) =>
-          StorageConfig.InMemory.validNel[String]
-        case (_, Some(path), ttl, cleanupInterval) =>
-          StorageConfig.Persistent(path, ttl, cleanupInterval).validNel[String]
+        case (true, _) =>
+          StorageMode.None.validNel[String]
+        case (_, None) =>
+          StorageMode.InMemory.validNel[String]
+        case (_, Some(path)) =>
+          parseStorageConfig(path)
       }
 
-    val config: Opts[Config] = (collector, iglu, output, yauaa, storage, auth).mapN {
+    val config: Opts[Config] = (collector, iglu, output, yauaa, storageConfig, auth).mapN {
       case (c, i, (f, d), y, s, a) => Config(c, i, f, d, y, s, a)
     }
   }

@@ -126,6 +117,7 @@ object Configuration {
     val sslCertificatePassword = "MICRO_SSL_CERT_PASSWORD"
     val azureBlobAccount = "MICRO_AZURE_BLOB_ACCOUNT"
     val azureBlobSasToken = "MICRO_AZURE_BLOB_SAS_TOKEN"
+    val postgresqlPassword = "MICRO_POSTGRESQL_PASSWORD"
   }
 
   final case class DummySinkConfig()

@@ -139,13 +131,20 @@ object Configuration {
                               organizationId: String,
                               clientId: String)
 
+  final case class StorageConfig(host: String,
+                                 port: Int,
+                                 database: String,
+                                 user: String,
+                                 ttl: FiniteDuration,
+                                 cleanupInterval: FiniteDuration)
+
   final case class MicroConfig(collector: CollectorConfig[SinkConfig],
                                iglu: IgluResources,
                                enrichmentsConfig: List[EnrichmentConf],
                                enrichConfig: EnrichConfig,
                                outputFormat: OutputFormat,
                                destination: Option[Uri],
-                               storage: StorageConfig,
+                               storage: StorageMode,
                                auth: Option[AuthConfig])
 
   final case class EnrichValidation(atomicFieldsLimits: AtomicFields)

@@ -159,6 +158,40 @@ object Configuration {
 
   implicit private def logger: Logger[IO] = Slf4jLogger.getLogger[IO]
 
+  private def parseStorageConfig(path: Path): ValidatedNel[String, StorageMode] = {
+    try {
+      val config = ConfigFactory.parseFile(path.toFile)
+      config.as[StorageConfig] match {
+        case Right(storageConfig) =>
+          sys.env.get(EnvironmentVariables.postgresqlPassword) match {
+            case Some(password) =>
+              if (storageConfig.ttl.toMinutes < 5) {
+                "Storage TTL must be at least 5 minutes (5m)".invalidNel[StorageMode]
+              } else if (storageConfig.cleanupInterval.toMinutes < 1) {
+                "Storage cleanup interval must be at least 1 minute (1m)".invalidNel[StorageMode]
+              } else {
+                StorageMode.Persistent(
+                  storageConfig.host,
+                  storageConfig.port,
+                  storageConfig.database,
+                  storageConfig.user,
+                  password,
+                  storageConfig.ttl,
+                  storageConfig.cleanupInterval
+                ).validNel[String]
+              }
+            case None =>
+              s"PostgreSQL password not found in environment variable ${EnvironmentVariables.postgresqlPassword}".invalidNel[StorageMode]
+          }
+        case Left(error) =>
+          s"Failed to parse storage configuration: ${error.show}".invalidNel[StorageMode]
+      }
+    } catch {
+      case e: Exception =>
+        s"Failed to read storage configuration file: ${e.getMessage}".invalidNel[StorageMode]
+    }
+  }
+
   def load(): Opts[EitherT[IO, String, MicroConfig]] = {
     Cli.config.map { cliConfig =>
       for {

@@ -407,4 +440,6 @@ object Configuration {
   }
 
   implicit val authConfigDecoder: Decoder[AuthConfig] = deriveDecoder[AuthConfig]
+
+  implicit val storageConfigDecoder: Decoder[StorageConfig] = deriveDecoder[StorageConfig]
 }
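
With the flag and environment variable defined above, launching Micro with persistent storage looks roughly like this (a sketch; the jar name is illustrative, but `--storage` and `MICRO_POSTGRESQL_PASSWORD` are the flag and variable this file adds):

```bash
# The password never appears in storage.hocon; it is read from the environment.
export MICRO_POSTGRESQL_PASSWORD=secret
java -jar snowplow-micro.jar --storage storage.hocon
```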

src/main/scala/com.snowplowanalytics.snowplow.micro/EventStorage.scala

Lines changed: 16 additions & 8 deletions

@@ -11,9 +11,12 @@
 package com.snowplowanalytics.snowplow.micro
 
 import cats.effect.{IO, Resource}
-import com.snowplowanalytics.snowplow.micro.Configuration.StorageConfig
+import cats.implicits._
+import com.snowplowanalytics.snowplow.micro.Configuration.StorageMode
 import com.snowplowanalytics.snowplow.micro.model.ColumnStatsResponse
 import io.circe.Json
+import org.typelevel.log4cats.Logger
+import org.typelevel.log4cats.slf4j.Slf4jLogger
 
 import scala.collection.mutable
 

@@ -41,16 +44,21 @@ object NoStorage extends EventStorage {
 }
 
 object EventStorage {
-  def create(config: StorageConfig): Resource[IO, EventStorage] = {
+  implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO]
+
+  def create(config: StorageMode): Resource[IO, EventStorage] = {
     config match {
-      case StorageConfig.None =>
+      case StorageMode.None =>
+        Resource.eval(logger.info("Not storing any events")) *>
           Resource.pure(NoStorage)
-      case StorageConfig.Persistent(path, ttl, cleanupInterval) =>
-        for {
-          storage <- SqliteStorage.file(path.toString)
+      case StorageMode.Persistent(host, port, database, user, password, ttl, cleanupInterval) =>
+        Resource.eval(logger.info(s"Storing events in PostgreSQL at $host:$port/$database")) *>
+        (for {
+          storage <- PostgresqlStorage.create(host, port, database, user, password)
           _ <- storage.scheduleCleanup(ttl, cleanupInterval)
-        } yield storage
-      case StorageConfig.InMemory =>
+        } yield storage)
+      case StorageMode.InMemory =>
+        Resource.eval(logger.info("Storing events in memory")) *>
           Resource.pure(new InMemoryStorage())
     }
   }
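
`PostgresqlStorage.create` itself is not shown in this diff. Given the `doobie-hikari` dependency added in `build.sbt`, its connection setup plausibly resembles the following sketch (an assumption about the implementation, not the actual code; the pool size is illustrative):

```scala
import cats.effect.{IO, Resource}
import doobie.hikari.HikariTransactor
import doobie.util.ExecutionContexts

// Acquire a pooled transactor for the configured PostgreSQL instance.
def transactor(host: String, port: Int, database: String,
               user: String, password: String): Resource[IO, HikariTransactor[IO]] =
  for {
    connectEC <- ExecutionContexts.fixedThreadPool[IO](8) // threads that await connections
    xa <- HikariTransactor.newHikariTransactor[IO](
            "org.postgresql.Driver",
            s"jdbc:postgresql://$host:$port/$database",
            user,
            password,
            connectEC
          )
  } yield xa
```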

src/main/scala/com.snowplowanalytics.snowplow.micro/Main.scala

Lines changed: 2 additions & 2 deletions

@@ -20,8 +20,8 @@ object Main
     header =
       s"""Notes:
          | - By default, events are stored in memory. This mode supports all API endpoints.
-         | - With --storage-file (persistent storage), /micro/all, /micro/good and /micro/bad endpoints are not supported.
-         | - With --no-storage (no in-memory or on-disk storage), all /micro/* endpoints are disabled.""".stripMargin,
+         | - With --storage (PostgreSQL storage), /micro/all, /micro/good and /micro/bad endpoints are not supported.
+         | - With --no-storage (no in-memory or database storage), all /micro/* endpoints are disabled.""".stripMargin,
     version = BuildInfo.version
   ) {
   override def main: Opts[IO[ExitCode]] = Run.run()
