-
Notifications
You must be signed in to change notification settings - Fork 0
Open
Labels
enhancement: New feature or request; work in progress: Work on this item is not yet finished (mainly intended for PRs)
Description
Background
/**
 * Common base for classes that call a database function through a doobie `Fragment`.
 *
 * @param functionNameOverride explicit function name; when empty, the name is derived from the
 *                             runtime class via `schema.objectNameFromClassName`
 * @param schema               schema the function belongs to; supplies the name prefix
 * @tparam I input type, converted to SQL argument fragments by `toFragmentsSeq`
 * @tparam R result type (not referenced inside this class; used by the concrete subclasses)
 * @tparam F effect type the query is executed in
 */
abstract class BaseF[I, R, F[_]: Async](functionNameOverride: Option[String])(implicit val schema: DBSchema) {

  /** Converts an input value into the fragments passed as the function's arguments, in order. */
  def toFragmentsSeq: I => Seq[Fragment]

  /** Function name used in the generated SQL, prefixed with the schema name unless that is empty. */
  val functionName: String = {
    val fn = functionNameOverride.getOrElse(schema.objectNameFromClassName(getClass))
    if (schema.schemaName.isEmpty) {
      fn
    } else {
      s"${schema.schemaName}.$fn"
    }
  }

  /** Alias given to the function call in the FROM clause and used to qualify selected fields. */
  protected val alias = "FNC"

  /** Fields to select from the function result; an empty sequence means `*`. */
  def fieldsToSelect: Seq[String] = Seq.empty

  /** Builds the select list: `*` when no fields are requested, otherwise alias-qualified fields. */
  protected def selectEntry: String = {
    // Read the overridable member exactly once so that the emptiness check and the rendered
    // list are always based on the same value.
    val fieldsSeq = fieldsToSelect
    if (fieldsSeq.isEmpty) {
      "*"
    } else {
      val aliasToUse = if (alias.isEmpty) {
        ""
      } else {
        s"$alias."
      }
      fieldsSeq.map(aliasToUse + _).mkString(",")
    }
  }

  /**
   * Composes the full `SELECT ... FROM function(args) alias;` statement.
   *
   * @param fragments argument fragments; they are joined with commas in the given order
   * @return the fragment representing the complete function call
   */
  protected final def composeFragments(fragments: Seq[Fragment]): Fragment = {
    val args = fragments.toList match {
      case head :: tail => tail.foldLeft(head)((acc, frag) => acc ++ fr"," ++ frag)
      case Nil          => fr""
    }
    sql"SELECT ${Fragment.const(selectEntry)} FROM ${Fragment.const(functionName)}($args) ${Fragment.const(alias)};"
  }

  /**
   * Builds the statement for `input` and hands it to `query`.
   *
   * The statement is composed inside `catchNonFatal`, so a non-fatal exception thrown while
   * converting the input or composing the fragments becomes a failed `F` instead of being
   * thrown to the caller.
   *
   * @param input value converted to the function arguments
   * @param query function that turns the composed statement into an effect
   */
  protected def executeQuery[T](input: I, query: Fragment => F[T]): F[T] = {
    MonadError[F, Throwable]
      .catchNonFatal(composeFragments(toFragmentsSeq(input)))
      .flatMap(query)
  }
}
/** Contract for interpreting the status returned by a database function. */
trait StatusSupport {

  /**
   * Decides whether the status carried by `statusWithData` counts as success.
   *
   * @param statusWithData function status together with the returned data
   * @return `Right` with the unchanged input on success, `Left` with a `StatusException` otherwise
   */
  def checkStatus[A](
    statusWithData: FunctionStatusWithData[A]
  ): Either[StatusException, FunctionStatusWithData[A]]
}
/** Status interpretation based on the tens digit of the status code. */
trait StandardStatusSupport extends StatusSupport {

  /**
   * Maps the status code (integer-divided by 10) to success or to a specific exception:
   * 1 is success, 2 server misconfiguration, 3 data conflict, 4 data not found,
   * 5 to 8 error in data, 9 other; anything else is reported as out of range.
   */
  override def checkStatus[A](
    statusWithData: FunctionStatusWithData[A]
  ): Either[StatusException, FunctionStatusWithData[A]] = {
    val status = statusWithData.functionStatus
    val statusGroup = status.statusCode / 10

    if (statusGroup == 1) {
      Right(statusWithData)
    } else {
      Left(statusGroup match {
        case 2             => ServerMisconfigurationException(status)
        case 3             => DataConflictException(status)
        case 4             => DataNotFoundException(status)
        case 5 | 6 | 7 | 8 => ErrorInDataException(status)
        case 9             => OtherStatusException(status)
        case _             => StatusOutOfRangeException(status)
      })
    }
  }
}
/**
 * Database function whose call is read with doobie's `unique` and executed on a transactor.
 *
 * @param toFragmentsSeq       converts the input into the argument fragments
 * @param functionNameOverride optional explicit function name
 */
class SingleF[I, R, F[_]: Async](
  val toFragmentsSeq: I => Seq[Fragment],
  val functionNameOverride: Option[String] = None
)(implicit dbSchema: DBSchema, read: Read[R])
    extends BaseF[I, R, F](functionNameOverride) {

  /** Calls the function with `input` and returns the single result inside `F`. */
  def apply(input: I)(implicit transactor: Transactor[F]): F[R] = {
    def runUnique(statement: Fragment): F[R] =
      statement.query[R](read).unique.transact(transactor)

    executeQuery[R](input, runUnique)
  }
}
/**
 * Variant of the single-result function that does not run the query itself: it returns the
 * `ConnectionIO` so the caller decides where and how it is transacted.
 *
 * @param toFragmentsSeq       converts the input into the argument fragments
 * @param functionNameOverride optional explicit function name
 */
class SingleFConnectionIO[I, R, F[_]: Async](
  val toFragmentsSeq: I => Seq[Fragment],
  val functionNameOverride: Option[String] = None
)(implicit dbSchema: DBSchema, read: Read[R])
    extends BaseF[I, R, F](functionNameOverride) {

  /** Builds the `unique` query for `input` and returns it, unexecuted, inside `F`. */
  def apply(input: I): F[ConnectionIO[R]] = {
    def describeUnique(statement: Fragment): F[ConnectionIO[R]] =
      Async[F].delay(statement.query[R](read).unique)

    executeQuery[ConnectionIO[R]](input, describeUnique)
  }
}
/**
 * Single-result function whose row carries a status; the status is evaluated by the
 * `checkStatus` implementation mixed into the concrete class.
 *
 * @param toFragmentsSeq       converts the input into the argument fragments
 * @param functionNameOverride optional explicit function name
 */
abstract class SingleFWithStatusSupport[I, R, F[_]: Async](
  val toFragmentsSeq: I => Seq[Fragment],
  val functionNameOverride: Option[String] = None
)(implicit dbSchema: DBSchema, read: Read[StatusWithData[R]])
    extends BaseF[I, R, F](functionNameOverride)
    with StatusSupport {

  /**
   * Calls the function, repackages the returned row as `FunctionStatusWithData`
   * and passes it through `checkStatus`.
   */
  def apply(input: I)(implicit transactor: Transactor[F]): F[Either[StatusException, FunctionStatusWithData[R]]] = {
    val fetchedRow: F[StatusWithData[R]] =
      executeQuery[StatusWithData[R]](
        input,
        statement => statement.query[StatusWithData[R]](read).unique.transact(transactor)
      )

    fetchedRow.map { row =>
      val status = FunctionStatus(row.status, row.statusText)
      checkStatus(FunctionStatusWithData(status, row.data))
    }
  }
}
/**
 * Database function whose result rows are all collected into a `Seq`.
 *
 * @param toFragmentsSeq       converts the input into the argument fragments
 * @param functionNameOverride optional explicit function name
 */
class MultiF[I, R, F[_]: Async](
  val toFragmentsSeq: I => Seq[Fragment],
  val functionNameOverride: Option[String] = None
)(implicit dbSchema: DBSchema, read: Read[R])
    extends BaseF[I, R, F](functionNameOverride) {

  /** Calls the function with `input` and returns every row inside `F`. */
  def apply(input: I)(implicit transactor: Transactor[F]): F[Seq[R]] = {
    def runAll(statement: Fragment): F[Seq[R]] =
      statement.query[R](read).to[Seq].transact(transactor)

    executeQuery[Seq[R]](input, runAll)
  }
}
/**
 * Variant of the multi-row function that returns the `ConnectionIO` instead of running it,
 * leaving the transaction to the caller.
 *
 * @param toFragmentsSeq       converts the input into the argument fragments
 * @param functionNameOverride optional explicit function name
 */
class MultiFConnectionIO[I, R, F[_]: Async](
  val toFragmentsSeq: I => Seq[Fragment],
  val functionNameOverride: Option[String] = None
)(implicit dbSchema: DBSchema, read: Read[R])
    extends BaseF[I, R, F](functionNameOverride) {

  /** Builds the query collecting all rows for `input` and returns it, unexecuted, inside `F`. */
  def apply(input: I): F[ConnectionIO[Seq[R]]] = {
    def describeAll(statement: Fragment): F[ConnectionIO[Seq[R]]] =
      Async[F].delay(statement.query[R](read).to[Seq])

    executeQuery[ConnectionIO[Seq[R]]](input, describeAll)
  }
}
/**
 * Database function whose result rows are exposed as an fs2 stream.
 *
 * @param toFragmentsSeq       converts the input into the argument fragments
 * @param functionNameOverride optional explicit function name
 */
class MultiFStreaming[I, R, F[_]: Async](
  val toFragmentsSeq: I => Seq[Fragment],
  val functionNameOverride: Option[String] = None
)(implicit dbSchema: DBSchema, read: Read[R])
    extends BaseF[I, R, F](functionNameOverride) {

  /**
   * Streams the rows returned by the function.
   *
   * @param input     value converted to the function arguments
   * @param chunkSize chunk size passed to doobie's `streamWithChunkSize`
   */
  def apply(input: I, chunkSize: Int = 512)(implicit transactor: Transactor[F]): fs2.Stream[F, R] = {
    // Composition happens inside the effect so that a non-fatal failure while building
    // the statement fails the stream instead of being thrown here.
    val composedStatement: F[Fragment] =
      MonadError[F, Throwable].catchNonFatal(composeFragments(toFragmentsSeq(input)))

    fs2.Stream
      .eval(composedStatement)
      .flatMap { statement =>
        statement.query[R](read).streamWithChunkSize(chunkSize).transact(transactor)
      }
  }
}
/**
 * Database function returning at most one value: all rows are read and the first one,
 * if any, is kept.
 *
 * @param toFragmentsSeq       converts the input into the argument fragments
 * @param functionNameOverride optional explicit function name
 */
class OptionF[I, R, F[_]: Async](
  val toFragmentsSeq: I => Seq[Fragment],
  val functionNameOverride: Option[String] = None
)(implicit dbSchema: DBSchema, read: Read[R])
    extends BaseF[I, R, F](functionNameOverride) {

  /** Calls the function with `input` and returns the first row, or `None` when there is none. */
  def apply(input: I)(implicit transactor: Transactor[F]): F[Option[R]] = {
    def runFirst(statement: Fragment): F[Option[R]] = {
      val firstRow = statement.query[R](read).to[Seq].map(rows => rows.headOption)
      firstRow.transact(transactor)
    }

    executeQuery[Option[R]](input, runFirst)
  }
}
/**
 * Variant of the optional-result function that returns the `ConnectionIO` instead of
 * running it, leaving the transaction to the caller.
 *
 * @param toFragmentsSeq       converts the input into the argument fragments
 * @param functionNameOverride optional explicit function name
 */
class OptionFConnectionIO[I, R, F[_]: Async](
  val toFragmentsSeq: I => Seq[Fragment],
  val functionNameOverride: Option[String] = None
)(implicit dbSchema: DBSchema, read: Read[R])
    extends BaseF[I, R, F](functionNameOverride) {

  /** Builds the query yielding the first row (if any) and returns it, unexecuted, inside `F`. */
  def apply(input: I): F[ConnectionIO[Option[R]]] = {
    def describeFirst(statement: Fragment): F[ConnectionIO[Option[R]]] =
      Async[F].delay(statement.query[R](read).to[Seq].map(rows => rows.headOption))

    executeQuery[ConnectionIO[Option[R]]](input, describeFirst)
  }
}
// Usage
import za.co.absa.fadb.naming.implementations.SnakeCaseNaming.Implicits._
// Schema object; being implicit, it satisfies the DBSchema parameter of the function classes.
implicit object Runs extends DBSchema

// Row type returned by the example functions.
case class Actor(name: String)

// Example functions: each takes an Int and passes it as the only SQL argument.
class GetActor
  extends SingleF[Int, Actor, IO]((actorId: Int) => Seq(fr"$actorId"))

class GetActorWithStatus
  extends SingleFWithStatusSupport[Int, Actor, IO]((actorId: Int) => Seq(fr"$actorId"))
  with StandardStatusSupport

class GetActorConnectionIO
  extends SingleFConnectionIO[Int, Actor, IO]((actorId: Int) => Seq(fr"$actorId"))
/**
 * Example repository showing the three ways of calling the functions: directly,
 * with status checking, and as a `ConnectionIO` that the repository transacts itself.
 */
class Repository(
  getActorF: GetActor,
  getActorFWithStatus: GetActorWithStatus,
  getActorC: GetActorConnectionIO
)(implicit transactor: Transactor[IO]) {

  /** Runs the function directly on the implicit transactor. */
  def getActor(input: Int): IO[Actor] =
    getActorF(input)

  /** Runs the function and returns the result of its status check. */
  def getActorWithStatus(input: Int): IO[Either[StatusException, FunctionStatusWithData[Actor]]] =
    getActorFWithStatus(input)

  /** Obtains the query as a `ConnectionIO` first and only then executes it on the transactor. */
  def groupedExecution(input: Int): IO[Actor] =
    for {
      actorQuery <- getActorC(input)
      // here could be another call
      actor <- actorQuery.transact(transactor)
    } yield actor
}
// If you need to return Future you can always call unsafeToFuture
/** Example repository exposing the result as a `Future` instead of `IO`. */
class Repository(getActorF: GetActor)(implicit transactor: Transactor[IO]) {

  /**
   * Runs the function and converts the resulting `IO` to a `Future`.
   *
   * `unsafeToFuture()` is called with its empty parameter list because it starts the
   * computation (a side effect); the bare form relies on auto-application, which is
   * deprecated in Scala 2.13 and rejected in Scala 3.
   */
  def getActor(input: Int): Future[Actor] = getActorF(input).unsafeToFuture()
}
Metadata
Assignees
Labels
enhancement: New feature or request; work in progress: Work on this item is not yet finished (mainly intended for PRs)
Type
Projects
Status
To groom