Skip to content

Adds Kyo support to Magnum#119

Open
calvinlfer wants to merge 6 commits intoAugustNagro:masterfrom
kaizen-solutions:master
Open

Adds Kyo support to Magnum#119
calvinlfer wants to merge 6 commits intoAugustNagro:masterfrom
kaizen-solutions:master

Conversation

@calvinlfer
Copy link

@calvinlfer calvinlfer commented May 26, 2025

Adds a new module, magnum-kyo, which provides Kyo-based transactor implementation for database interactions (closely mirrors ZIO's module). This allows users to leverage Kyo's algebraic effects for managing database connections and transactions.

Special thanks to @fwbrasil, @hearnadam, @ahoy-jon for their help

Adds a new module, magnum-kyo, which provides Kyo-based
transactor implementation for database interactions.

This allows users to leverage Kyo's algebraic effects for managing
database connections and transactions.

Special thanks to @fwbrasil, @hearnadam, @ahoy-jon
Copy link

@ahoy-jon ahoy-jon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

overall, it looks good to me, from what I can check.

@calvinlfer
Copy link
Author

calvinlfer commented May 26, 2025

We can expose a streaming operation as well on Query but access to the DbCodec would need to be package private val on com.augustnagro.magnum (this this is just missing a val like frag has).

Here's an implementation:

package com.augustnagro.magnum.magkyo

import com.augustnagro.magnum.*
import kyo.*
import scala.util.Using
import java.sql.PreparedStatement
import java.sql.ResultSet

extension [Out](q: Query[Out]) {
  def stream(
      fetchSize: Int = 512
  )(using con: DbCon, tagOut: Tag[Out]): Stream[Out, Resource & IO] =
    def toEmit(
        it: ResultSetIterator[Out],
        size: Int
    ): Unit < (Emit[Chunk[Out]] & IO) =
      IO {
        val builder = Chunk.newBuilder[Out]
        var curSize = 0
        while it.hasNext && curSize < size do
          builder += it.next()
          curSize += 1

        Emit
          .value(builder.result())
          .andThen(
            if it.hasNext then toEmit(it, size)
            else ()
          )
      }

    val reader: DbCodec[Out] = q.reader // added a val to reader in Query to make this compile

    val preparedStatement: PreparedStatement < (Resource & IO) = Resource
      .acquireRelease(
        IO(con.connection.prepareStatement(q.frag.sqlString))
      )(con => IO(con.close()))

    val resultSetEmit: Unit < (Emit[Chunk[Out]] & Resource & IO) =
      preparedStatement
        .map { ps =>
          val resultSet: ResultSet < (Resource & IO) = Resource
            .acquireRelease {
              IO {
                ps.setFetchSize(fetchSize)
                q.frag.writer.write(ps, 1)
                ps.executeQuery()
              }
            }(rs => IO(rs.close()))

          resultSet.map(rs =>
            toEmit(
              ResultSetIterator(rs, q.frag, reader, con.sqlLogger),
              fetchSize
            )
          )
        }

    Stream(resultSetEmit)
  end stream
}

i also noticed I needed to be able to model effectful computations inside connect in order for this to have a better API when testing especially when doing streaming computations:

  // note: db computations can now execute effects needed to represent streaming properly  
  def connect[A, S](f: DbCon ?=> (A < S))(using 
      Frame
  ): A < (Abort[Throwable] & Async & S) =
    val res: A < (Resource & Abort[SqlException] & IO & S) =
      Resource
        .acquireRelease(acquireConnection)(releaseConnection)
        .map { lo =>
          IO[A, S] {
            connectionConfig(lo)
            f(using DbCon(lo, sqlLogger))
          }
        }
    val effect: A < (Async & Abort[SqlException] & S) = Resource.run(res)
    semaphore.fold(effect)(_.run(effect))

@calvinlfer
Copy link
Author

calvinlfer commented May 26, 2025

I have another branch which incorporates the changes for streaming and more idiomatic Kyo testing:
https://github.com/kaizen-solutions/magnum/tree/feat/streaming-and-better-tests

Update: changes are in this PR

libraryDependencies ++= Seq(
"io.getkyo" %% "kyo-core" % "0.19.0" % Provided,
"io.getkyo" %% "kyo-combinators" % "0.19.0" % Provided,
"org.scalameta" %% "munit" % munitVersion % Test,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure the precedence in this package, but you could use kyo-zio-test.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will come back for this

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to port this to kyo-zio-test but would need getkyo/kyo#1246 to do this to support sharing of layers and first-class support for Kyo layers

- Implements streaming support for `Query` using Kyo effects
- make it easier to write safer tests
Copy link

@hearnadam hearnadam left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM overall, though I don't know the Magnum constructs well.

We can probably eventually replace toEmit with Stream.fromIterator, but that's minor.

@calvinlfer
Copy link
Author

Good to go @AugustNagro! Thank you again @hearnadam for all the feedback

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants