Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ val postgresDriverVersion = "42.7.4"

lazy val root = project
.in(file("."))
.aggregate(magnum, magnumPg, magnumZio)
.aggregate(magnum, magnumPg, magnumZio, magnumKyo)

lazy val magnum = project
.in(file("magnum"))
Expand Down Expand Up @@ -99,3 +99,20 @@ lazy val magnumZio = project
"org.postgresql" % "postgresql" % postgresDriverVersion % Test
)
)

lazy val magnumKyo = project
.in(file("magnum-kyo"))
.dependsOn(magnum)
.settings(
Test / fork := true,
publish / skip := false,
scalaVersion := "3.7.0",
libraryDependencies ++= Seq(
"io.getkyo" %% "kyo-core" % "0.19.0" % Provided,
"io.getkyo" %% "kyo-combinators" % "0.19.0" % Provided,
"org.scalameta" %% "munit" % munitVersion % Test,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure the precedence in this package, but you could use kyo-zio-test.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will come back for this

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to port this to kyo-zio-test but would need getkyo/kyo#1246 to do this to support sharing of layers and first-class support for Kyo layers

"com.dimafeng" %% "testcontainers-scala-munit" % testcontainersVersion % Test,
"com.dimafeng" %% "testcontainers-scala-postgresql" % testcontainersVersion % Test,
"org.postgresql" % "postgresql" % postgresDriverVersion % Test
)
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.augustnagro.magnum.magkyo

import com.augustnagro.magnum.*
import kyo.*
import scala.util.Using
import java.sql.PreparedStatement
import java.sql.ResultSet

extension [Out](q: Query[Out]) {
def stream(
fetchSize: Int = 512
)(using con: DbCon, tagOut: Tag[Out]): Stream[Out, Resource & IO] =
def toEmit(
it: ResultSetIterator[Out],
size: Int
): Unit < (Emit[Chunk[Out]] & IO) =
IO {
val builder = Chunk.newBuilder[Out]
var curSize = 0
while it.hasNext && curSize < size do
builder += it.next()
curSize += 1

Emit
.value(builder.result())
.andThen(
if it.hasNext then toEmit(it, size)
else ()
)
}

val preparedStatement: PreparedStatement < (Resource & IO) = Resource
.acquireRelease(
IO(con.connection.prepareStatement(q.frag.sqlString))
)(con => IO(con.close()))

val resultSetEmit: Unit < (Emit[Chunk[Out]] & Resource & IO) =
preparedStatement
.map { ps =>
val resultSet: ResultSet < (Resource & IO) = Resource
.acquireRelease {
IO {
ps.setFetchSize(fetchSize)
q.frag.writer.write(ps, 1)
ps.executeQuery()
}
}(rs => IO(rs.close()))

resultSet.map(rs =>
toEmit(
ResultSetIterator(rs, q.frag, q.reader, con.sqlLogger),
fetchSize
)
)
}

Stream(resultSetEmit)
end stream
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package com.augustnagro.magnum.magkyo

import com.augustnagro.magnum.{DbCon, DbTx, SqlException, SqlLogger}
import kyo.*

import java.sql.Connection
import javax.sql.DataSource
import scala.util.control.NonFatal

final class TransactorKyo private (
dataSource: DataSource,
sqlLogger: SqlLogger,
connectionConfig: Connection => Unit,
semaphore: Maybe[Meter]
):
def withSqlLogger(sqlLogger: SqlLogger): TransactorKyo =
new TransactorKyo(
dataSource,
sqlLogger,
connectionConfig,
semaphore
)

def withConnectionConfig(
connectionConfig: Connection => Unit
): TransactorKyo =
new TransactorKyo(
dataSource,
sqlLogger,
connectionConfig,
semaphore
)

def connect[A, S](f: DbCon ?=> (A < S))(using
Frame
): A < (Abort[Throwable] & Async & S) =
val effect: A < (Abort[SqlException] & IO & S) =
acquireReleaseWith(acquireConnection)(releaseConnection) { lo =>
IO[A, S] {
connectionConfig(lo)
f(using DbCon(lo, sqlLogger))
}
}
semaphore.fold(effect)(_.run(effect))

def transact[A](f: DbTx ?=> A): A < (Abort[Throwable] & Async) =
val effect =
acquireReleaseWith(acquireConnection)(releaseConnection) { lo =>
Async.mask[Throwable, A, Any] {
connectionConfig(lo)
lo.setAutoCommit(false)
Abort
.catching[Throwable](
f(using DbTx(lo, sqlLogger))
)
.foldAbort(
out =>
IO {
lo.commit()
out
},
error =>
IO {
lo.rollback()
Abort.fail(error)
}
)
}
}
semaphore.fold(effect)(_.run(effect))
end transact

private val acquireConnection: Connection < (IO & Abort[SqlException]) =
IO(
Abort
.catching[Throwable](dataSource.getConnection())
.mapAbort[SqlException, Any](t =>
SqlException("Unable to acquire DB Connection", t)
)
)

private def releaseConnection(con: Connection) =
if con eq null then () else IO(con.close())
end TransactorKyo

object TransactorKyo:
private val noOpConnectionConfig: Connection => Unit = _ => ()

/** Construct a TransactorKyo
*
* @param sqlLogger
* Logging configuration
* @param connectionConfig
* Customize the underlying JDBC Connections
* @param maxBlockingThreads
* Number of threads in your connection pool. This helps the library be
* more memory efficient by limiting the number of blocking pool threads
* used.
*/
def make(
dataSource: DataSource,
sqlLogger: SqlLogger,
connectionConfig: Connection => Unit,
maxBlockingThreads: Maybe[Int]
): TransactorKyo < IO =
val sem = maxBlockingThreads match
case Present(max) => Meter.initSemaphore(max).map(Maybe(_))
case Absent => Absent: Maybe[Meter] < IO

sem.map(
new TransactorKyo(
dataSource,
sqlLogger,
connectionConfig,
_
)
)

/** Construct a TransactorKyo Layer
*
* @param sqlLogger
* Logging configuration
* @param connectionConfig
* Customize the underlying JDBC Connections
* @param maxBlockingThreads
* Number of threads in your connection pool. This helps the library be
* more memory efficient by limiting the number of blocking pool threads
* used.
*/
def layer(
sqlLogger: SqlLogger,
connectionConfig: Connection => Unit,
maxBlockingThreads: Maybe[Int]
): Layer[TransactorKyo, Env[DataSource] & IO] = Layer(
Env.use[DataSource](
make(_, sqlLogger, connectionConfig, maxBlockingThreads)
)
)

/** Construct a TransactorKyo
*
* @param sqlLogger
* Logging configuration
* @param connectionConfig
* Customize the underlying JDBC Connections
*/
def layer(
sqlLogger: SqlLogger,
connectionConfig: Connection => Unit
): Layer[TransactorKyo, Env[DataSource] & IO] =
layer(
sqlLogger = sqlLogger,
connectionConfig = connectionConfig,
maxBlockingThreads = Maybe.empty
)

/** Construct a TransactorKyo
*
* @param dataSource
* Datasource to be used
* @param sqlLogger
* Logging configuration
*/
def layer(sqlLogger: SqlLogger): Layer[TransactorKyo, Env[DataSource] & IO] =
layer(
sqlLogger = sqlLogger,
connectionConfig = noOpConnectionConfig,
maxBlockingThreads = Maybe.empty
)

/** Construct a TransactorKyo */
val layer: Layer[TransactorKyo, Env[DataSource] & IO] =
layer(
sqlLogger = SqlLogger.Default,
connectionConfig = noOpConnectionConfig,
maxBlockingThreads = Maybe.empty
)

/** Construct a TransactorZIO
*
* @param connectionConfig
* Customize the underlying JDBC Connections
*/
def layer(
connectionConfig: Connection => Unit
): Layer[TransactorKyo, Env[DataSource] & IO] =
layer(
sqlLogger = SqlLogger.Default,
connectionConfig = connectionConfig,
maxBlockingThreads = Maybe.empty
)

/** @param maxBlockingThreads
* Number of threads in your connection pool. This helps magzio be more
* memory efficient by limiting the number of blocking pool threads used.
* Not needed if using the ZIO virtual-thread based blocking executor
*/
def layer(
maxBlockingThreads: Int
): Layer[TransactorKyo, Env[DataSource] & IO] =
layer(
sqlLogger = SqlLogger.Default,
connectionConfig = noOpConnectionConfig,
maxBlockingThreads = Maybe(maxBlockingThreads)
)

end TransactorKyo
11 changes: 11 additions & 0 deletions magnum-kyo/src/main/scala/com/augustnagro/magnum/magkyo/util.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.augustnagro.magnum.magkyo

import kyo.*

// TODO: Replace with https://github.com/getkyo/kyo/issues/1220 when resolved
inline def acquireReleaseWith[A, S1](
acquire: => A < (S1 & IO)
)(release: A => Any < IO)[B, S2](
use: A => B < S2
)(using Frame): B < (IO & S1 & S2) =
IO(acquire).map(a => IO.ensure(release(a))(use(a)))
10 changes: 10 additions & 0 deletions magnum-kyo/src/test/resources/pg/big-dec.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
drop table if exists big_dec cascade;

create table big_dec (
id int primary key,
my_big_dec numeric
);

insert into big_dec values
(1, 123),
(2, null);
15 changes: 15 additions & 0 deletions magnum-kyo/src/test/resources/pg/car.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
DROP TABLE IF EXISTS car;

CREATE TABLE car (
model VARCHAR(50) NOT NULL,
id bigint PRIMARY KEY,
top_speed INT NOT NULL,
vin INT,
color TEXT NOT NULL CHECK (color IN ('Red', 'Green', 'Blue')),
created TIMESTAMP WITH TIME ZONE NOT NULL
);

INSERT INTO car (model, id, top_speed, vin, color, created) VALUES
('McLaren Senna', 1, 208, 123, 'Red', '2024-11-24T22:17:30.000000000Z'::timestamptz),
('Ferrari F8 Tributo', 2, 212, 124, 'Green', '2024-11-24T22:17:31.000000000Z'::timestamptz),
('Aston Martin Superleggera', 3, 211, null, 'Blue', '2024-11-24T22:17:32.000000000Z'::timestamptz);
11 changes: 11 additions & 0 deletions magnum-kyo/src/test/resources/pg/my-user.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
drop table if exists my_user cascade;

create table my_user (
first_name text not null,
id bigint primary key generated always as identity
);

insert into my_user (first_name) values
('George'),
('Alexander'),
('John');
12 changes: 12 additions & 0 deletions magnum-kyo/src/test/resources/pg/no-id.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
drop table if exists no_id;

create table no_id (
created_at timestamptz not null default now(),
user_name text not null,
user_action text not null
);

insert into no_id values
(timestamp '1997-08-15', 'Josh', 'clicked a button'),
(timestamp '1997-08-16', 'Danny', 'opened a toaster'),
(timestamp '1997-08-17', 'Greg', 'ran some QA tests');
20 changes: 20 additions & 0 deletions magnum-kyo/src/test/resources/pg/person.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
drop table if exists person cascade;

create table person (
id bigint primary key,
first_name varchar(50),
last_name varchar(50) not null,
is_admin boolean not null,
created timestamptz not null,
social_id UUID
);

insert into person (id, first_name, last_name, is_admin, created, social_id) values
(1, 'George', 'Washington', true, now(), 'd06443a6-3efb-46c4-a66a-a80a8a9a5388'),
(2, 'Alexander', 'Hamilton', true, now(), '529b6c6d-7228-4da5-81d7-13b706f78ddb'),
(3, 'John', 'Adams', true, now(), null),
(4, 'Benjamin', 'Franklin', true, now(), null),
(5, 'John', 'Jay', true, now(), null),
(6, 'Thomas', 'Jefferson', true, now(), null),
(7, 'James', 'Madison', true, now(), null),
(8, null, 'Nagro', false, timestamp '1997-08-12', null);
Loading