Skip to content

Commit 89efeb5

Browse files
authored
Merge pull request #110 from massimosiani/doobie-rc4
Doobie rc4
2 parents b40ced6 + 48cba1e commit 89efeb5

File tree

7 files changed

+456
-14
lines changed

7 files changed

+456
-14
lines changed

build.sbt

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import microsites.MicrositesPlugin.autoImport.micrositeDescription
22

3-
val scala213Version = "2.13.10"
3+
val scala213Version = "2.13.12"
44
val scala3Version = "3.3.0"
55

66
val scalaVersions = Seq(scala213Version, scala3Version)
@@ -68,8 +68,9 @@ val http4sMilestoneVersion = "1.0.0-M40"
6868
val http4sStableVersion = "0.23.23"
6969
val circeVersion = "0.14.3"
7070
val slf4jVersion = "1.7.36"
71-
val fs2Version = "3.8.0"
72-
val doobieVersion = "1.0.0-RC2"
71+
val fs2Version = "3.9.1"
72+
val doobieVersion = "1.0.0-RC4"
73+
val doobieLegacyVersion = "1.0.0-RC2"
7374

7475
lazy val natchezDatadog = projectMatrix
7576
.in(file("natchez-extras-datadog"))
@@ -206,6 +207,20 @@ lazy val natchezDoobie = projectMatrix
206207
)
207208
.dependsOn(core)
208209

210+
lazy val natchezDoobieLegacy = projectMatrix
211+
.in(file("natchez-extras-doobie-legacy"))
212+
.jvmPlatform(scalaVersions = scalaVersions)
213+
.enablePlugins(GitVersioning)
214+
.settings(common :+ (name := "natchez-extras-doobie-legacy"))
215+
.settings(
216+
libraryDependencies ++= Seq(
217+
"org.tpolecat" %% "natchez-core" % natchezVersion,
218+
"org.tpolecat" %% "doobie-core" % doobieLegacyVersion,
219+
"org.tpolecat" %% "doobie-h2" % doobieLegacyVersion % Test
220+
)
221+
)
222+
.dependsOn(core)
223+
209224
lazy val core = projectMatrix
210225
.in(file("natchez-extras-core"))
211226
.jvmPlatform(scalaVersions = scalaVersions)
@@ -305,6 +320,7 @@ lazy val root = (project in file("."))
305320
.aggregate(natchezCombine.projectRefs: _*)
306321
.aggregate(natchezSlf4j.projectRefs: _*)
307322
.aggregate(natchezDoobie.projectRefs: _*)
323+
.aggregate(natchezDoobieLegacy.projectRefs: _*)
308324
.aggregate(natchezLog4Cats.projectRefs: _*)
309325
.aggregate(natchezHttp4s.projectRefs: _*)
310326
.aggregate(natchezFs2.projectRefs: _*)

docs/docs/docs/natchez-doobie.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ object NatchezDoobie extends IOApp {
6363
driver = "org.postgresql.Driver",
6464
url = "jdbc:postgresql:example",
6565
user = "postgres",
66-
pass = "password" // of course don't hard code these details in your applications!
66+
password = "password", // of course don't hard code these details in your applications!
67+
logHandler = None,
6768
)
6869
)
6970

Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
package com.ovoenergy.natchez.extras.doobie
2+
3+
import java.io.{InputStream, Reader}
4+
import java.net.URL
5+
import java.sql.{Array => _, _}
6+
import java.util.Calendar
7+
import scala.annotation.nowarn
8+
9+
/**
10+
* This is an absolutely abominable brute force solution to linking PreparedStatements
11+
* with a SQL string so we can include it in traces but hey I figure it is a one time cost
12+
* Pretend this doesn't exist and you never had to see it
13+
*/
14+
@nowarn
15+
private[doobie] case class TracedStatement(
16+
p: PreparedStatement,
17+
queryString: String
18+
) extends PreparedStatement {
19+
def executeQuery(): ResultSet = p.executeQuery()
20+
21+
def executeUpdate(): Int = p.executeUpdate()
22+
23+
def setNull(parameterIndex: Int, sqlType: Int): Unit = p.setNull(parameterIndex, sqlType)
24+
25+
def setBoolean(parameterIndex: Int, x: Boolean): Unit = p.setBoolean(parameterIndex, x)
26+
27+
def setByte(parameterIndex: Int, x: Byte): Unit = p.setByte(parameterIndex, x)
28+
29+
def setShort(parameterIndex: Int, x: Short): Unit = p.setShort(parameterIndex, x)
30+
31+
def setInt(parameterIndex: Int, x: Int): Unit = p.setInt(parameterIndex, x)
32+
33+
def setLong(parameterIndex: Int, x: Long): Unit = p.setLong(parameterIndex, x)
34+
35+
def setFloat(parameterIndex: Int, x: Float): Unit = p.setFloat(parameterIndex, x)
36+
37+
def setDouble(parameterIndex: Int, x: Double): Unit = p.setDouble(parameterIndex, x)
38+
39+
def setBigDecimal(parameterIndex: Int, x: java.math.BigDecimal): Unit = p.setBigDecimal(parameterIndex, x)
40+
41+
def setString(parameterIndex: Int, x: String): Unit = p.setString(parameterIndex, x)
42+
43+
def setBytes(parameterIndex: Int, x: Array[Byte]): Unit = p.setBytes(parameterIndex, x)
44+
45+
def setDate(parameterIndex: Int, x: Date): Unit = p.setDate(parameterIndex, x)
46+
47+
def setTime(parameterIndex: Int, x: Time): Unit = p.setTime(parameterIndex, x)
48+
49+
def setTimestamp(parameterIndex: Int, x: Timestamp): Unit = p.setTimestamp(parameterIndex, x)
50+
51+
def setAsciiStream(parameterIndex: Int, x: InputStream, length: Int): Unit =
52+
p.setAsciiStream(parameterIndex, x, length)
53+
54+
def setUnicodeStream(parameterIndex: Int, x: InputStream, length: Int): Unit =
55+
p.setUnicodeStream(parameterIndex, x, length)
56+
57+
def setBinaryStream(parameterIndex: Int, x: InputStream, length: Int): Unit =
58+
p.setBinaryStream(parameterIndex, x, length)
59+
60+
def clearParameters(): Unit = p.clearParameters()
61+
62+
def setObject(parameterIndex: Int, x: Any, targetSqlType: Int): Unit =
63+
p.setObject(parameterIndex, x, targetSqlType)
64+
65+
def setObject(parameterIndex: Int, x: Any): Unit = p.setObject(parameterIndex, x)
66+
67+
def execute(): Boolean = p.execute()
68+
69+
def addBatch(): Unit = p.addBatch()
70+
71+
def setCharacterStream(parameterIndex: Int, reader: Reader, length: Int): Unit =
72+
p.setCharacterStream(parameterIndex, reader, length)
73+
74+
def setRef(parameterIndex: Int, x: Ref): Unit = p.setRef(parameterIndex, x)
75+
76+
def setBlob(parameterIndex: Int, x: Blob): Unit = p.setBlob(parameterIndex, x)
77+
78+
def setClob(parameterIndex: Int, x: Clob): Unit = p.setClob(parameterIndex, x)
79+
80+
def setArray(parameterIndex: Int, x: java.sql.Array): Unit = p.setArray(parameterIndex, x)
81+
82+
def getMetaData: ResultSetMetaData = p.getMetaData
83+
84+
def setDate(parameterIndex: Int, x: Date, cal: Calendar): Unit = p.setDate(parameterIndex, x, cal)
85+
86+
def setTime(parameterIndex: Int, x: Time, cal: Calendar): Unit = p.setTime(parameterIndex, x, cal)
87+
88+
def setTimestamp(parameterIndex: Int, x: Timestamp, cal: Calendar): Unit =
89+
p.setTimestamp(parameterIndex, x, cal)
90+
91+
def setNull(parameterIndex: Int, sqlType: Int, typeName: String): Unit =
92+
p.setNull(parameterIndex, sqlType, typeName)
93+
94+
def setURL(parameterIndex: Int, x: URL): Unit = p.setURL(parameterIndex, x)
95+
96+
def getParameterMetaData: ParameterMetaData = p.getParameterMetaData
97+
98+
def setRowId(parameterIndex: Int, x: RowId): Unit = p.setRowId(parameterIndex, x)
99+
100+
def setNString(parameterIndex: Int, value: String): Unit = p.setNString(parameterIndex, value)
101+
102+
def setNCharacterStream(parameterIndex: Int, value: Reader, length: Long): Unit =
103+
p.setNCharacterStream(parameterIndex, value, length)
104+
105+
def setNClob(parameterIndex: Int, value: NClob): Unit = p.setNClob(parameterIndex, value)
106+
107+
def setClob(parameterIndex: Int, reader: Reader, length: Long): Unit =
108+
p.setClob(parameterIndex, reader, length)
109+
110+
def setBlob(parameterIndex: Int, inputStream: InputStream, length: Long): Unit =
111+
p.setBlob(parameterIndex, inputStream, length)
112+
113+
def setNClob(parameterIndex: Int, reader: Reader, length: Long): Unit =
114+
p.setNClob(parameterIndex, reader, length)
115+
116+
def setSQLXML(parameterIndex: Int, xmlObject: SQLXML): Unit = p.setSQLXML(parameterIndex, xmlObject)
117+
118+
def setObject(parameterIndex: Int, x: Any, targetSqlType: Int, scaleOrLength: Int): Unit =
119+
p.setObject(parameterIndex, x, targetSqlType, scaleOrLength)
120+
121+
def setAsciiStream(parameterIndex: Int, x: InputStream, length: Long): Unit =
122+
p.setAsciiStream(parameterIndex, x, length)
123+
124+
def setBinaryStream(parameterIndex: Int, x: InputStream, length: Long): Unit =
125+
p.setBinaryStream(parameterIndex, x, length)
126+
127+
def setCharacterStream(parameterIndex: Int, reader: Reader, length: Long): Unit =
128+
p.setCharacterStream(parameterIndex, reader, length)
129+
130+
def setAsciiStream(parameterIndex: Int, x: InputStream): Unit = p.setAsciiStream(parameterIndex, x)
131+
132+
def setBinaryStream(parameterIndex: Int, x: InputStream): Unit = p.setBinaryStream(parameterIndex, x)
133+
134+
def setCharacterStream(parameterIndex: Int, reader: Reader): Unit =
135+
p.setCharacterStream(parameterIndex, reader)
136+
137+
def setNCharacterStream(parameterIndex: Int, value: Reader): Unit =
138+
p.setNCharacterStream(parameterIndex, value)
139+
140+
def setClob(parameterIndex: Int, reader: Reader): Unit = p.setClob(parameterIndex, reader)
141+
142+
def setBlob(parameterIndex: Int, inputStream: InputStream): Unit = p.setBlob(parameterIndex, inputStream)
143+
144+
def setNClob(parameterIndex: Int, reader: Reader): Unit = p.setNClob(parameterIndex, reader)
145+
146+
def executeQuery(sql: String): ResultSet = p.executeQuery(sql)
147+
148+
def executeUpdate(sql: String): Int = p.executeUpdate(sql)
149+
150+
def close(): Unit = p.close()
151+
152+
def getMaxFieldSize: Int = p.getMaxFieldSize
153+
154+
def setMaxFieldSize(max: Int): Unit = p.setMaxFieldSize(max)
155+
156+
def getMaxRows: Int = p.getMaxRows
157+
158+
def setMaxRows(max: Int): Unit = p.setMaxRows(max)
159+
160+
def setEscapeProcessing(enable: Boolean): Unit = p.setEscapeProcessing(enable)
161+
162+
def getQueryTimeout: Int = p.getQueryTimeout
163+
164+
def setQueryTimeout(seconds: Int): Unit = p.setQueryTimeout(seconds)
165+
166+
def cancel(): Unit = p.cancel()
167+
168+
def getWarnings: SQLWarning = p.getWarnings
169+
170+
def clearWarnings(): Unit = p.clearWarnings()
171+
172+
def setCursorName(name: String): Unit = p.setCursorName(name)
173+
174+
def execute(sql: String): Boolean = p.execute(sql)
175+
176+
def getResultSet: ResultSet = p.getResultSet
177+
178+
def getUpdateCount: Int = p.getUpdateCount
179+
180+
def getMoreResults: Boolean = p.getMoreResults()
181+
182+
def setFetchDirection(direction: Int): Unit = p.setFetchDirection(direction)
183+
184+
def getFetchDirection: Int = p.getFetchDirection
185+
186+
def setFetchSize(rows: Int): Unit = p.setFetchSize(rows)
187+
188+
def getFetchSize: Int = p.getFetchSize
189+
190+
def getResultSetConcurrency: Int = p.getResultSetConcurrency
191+
192+
def getResultSetType: Int = p.getResultSetType
193+
194+
def addBatch(sql: String): Unit = p.addBatch(sql)
195+
196+
def clearBatch(): Unit = p.clearBatch()
197+
198+
def executeBatch(): Array[Int] = p.executeBatch()
199+
200+
def getConnection: Connection = p.getConnection
201+
202+
def getMoreResults(current: Int): Boolean = p.getMoreResults(current)
203+
204+
def getGeneratedKeys: ResultSet = p.getGeneratedKeys
205+
206+
def executeUpdate(sql: String, autoGeneratedKeys: Int): Int = p.executeUpdate(sql, autoGeneratedKeys)
207+
208+
def executeUpdate(sql: String, columnIndexes: Array[Int]): Int = p.executeUpdate(sql, columnIndexes)
209+
210+
def executeUpdate(sql: String, columnNames: Array[String]): Int = p.executeUpdate(sql, columnNames)
211+
212+
def execute(sql: String, autoGeneratedKeys: Int): Boolean = p.execute(sql, autoGeneratedKeys)
213+
214+
def execute(sql: String, columnIndexes: Array[Int]): Boolean = p.execute(sql, columnIndexes)
215+
216+
def execute(sql: String, columnNames: Array[String]): Boolean = p.execute(sql, columnNames)
217+
218+
def getResultSetHoldability: Int = p.getResultSetHoldability
219+
220+
def isClosed: Boolean = p.isClosed
221+
222+
def setPoolable(poolable: Boolean): Unit = p.setPoolable(poolable)
223+
224+
def isPoolable: Boolean = p.isPoolable
225+
226+
def closeOnCompletion(): Unit = p.closeOnCompletion()
227+
228+
def isCloseOnCompletion: Boolean = p.isCloseOnCompletion
229+
230+
def unwrap[T](iface: Class[T]): T = p.unwrap(iface)
231+
232+
def isWrapperFor(iface: Class[_]): Boolean = p.isWrapperFor(iface)
233+
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package com.ovoenergy.natchez.extras.doobie
2+
3+
import cats.data.Kleisli
4+
import cats.effect.Async
5+
import cats.implicits.catsSyntaxFlatMapOps
6+
import com.ovoenergy.natchez.extras.core.Config
7+
import com.ovoenergy.natchez.extras.core.Config.ServiceAndResource
8+
import doobie.{KleisliInterpreter, WeakAsync}
9+
import doobie.util.transactor.Transactor
10+
import natchez.{Span, Trace}
11+
12+
import java.sql.{Connection, PreparedStatement, ResultSet}
13+
14+
object TracedTransactor {
15+
private val DefaultResourceName = "db.execute"
16+
17+
type Traced[F[_], A] = Kleisli[F, Span[F], A]
18+
def apply[F[_]: Async](
19+
service: String,
20+
transactor: Transactor[F]
21+
): Transactor[Traced[F, *]] = {
22+
val kleisliTransactor = transactor
23+
.mapK(Kleisli.liftK[F, Span[F]])(implicitly, Async.asyncForKleisli(implicitly))
24+
trace(ServiceAndResource(s"$service-db", DefaultResourceName), kleisliTransactor)
25+
}
26+
27+
private val commentNamedQueryRegEx = """--\s*Name:\s*(\w+)""".r
28+
29+
private def extractQueryNameOrSql(sql: String): String =
30+
commentNamedQueryRegEx.findFirstMatchIn(sql).flatMap(m => Option(m.group(1))).getOrElse(sql)
31+
32+
private def formatQuery(q: String): String =
33+
q.replace("\n", " ").replaceAll("\\s+", " ").trim()
34+
35+
def trace[F[_]: Trace: Async](
36+
config: Config,
37+
transactor: Transactor[F]
38+
): Transactor[F] =
39+
transactor
40+
.copy(
41+
interpret0 = createInterpreter(config, Async[F]).ConnectionInterpreter
42+
)
43+
44+
private def createInterpreter[F[_]: Trace](config: Config, F: Async[F]): KleisliInterpreter[F] = {
45+
new KleisliInterpreter[F] {
46+
implicit val asyncM: WeakAsync[F] =
47+
WeakAsync.doobieWeakAsyncForAsync(F)
48+
49+
override lazy val PreparedStatementInterpreter: PreparedStatementInterpreter =
50+
new PreparedStatementInterpreter {
51+
52+
type TracedOp[A] = Kleisli[F, PreparedStatement, A] //PreparedStatement => F[A]
53+
54+
def runTraced[A](f: TracedOp[A]): TracedOp[A] =
55+
Kleisli {
56+
case TracedStatement(p, sql) =>
57+
Trace[F].span(config.fullyQualifiedSpanName(formatQuery(extractQueryNameOrSql(sql))))(
58+
Trace[F].put("span.type" -> "db") >> f(p)
59+
)
60+
case a =>
61+
f(a)
62+
}
63+
64+
override val executeBatch: TracedOp[Array[Int]] =
65+
runTraced(super.executeBatch)
66+
67+
override val executeLargeBatch: TracedOp[Array[Long]] =
68+
runTraced(super.executeLargeBatch)
69+
70+
override val execute: TracedOp[Boolean] =
71+
runTraced(super.execute)
72+
73+
override val executeUpdate: TracedOp[Int] =
74+
runTraced(super.executeUpdate)
75+
76+
override val executeQuery: TracedOp[ResultSet] =
77+
runTraced(super.executeQuery)
78+
}
79+
80+
override lazy val ConnectionInterpreter: ConnectionInterpreter =
81+
new ConnectionInterpreter {
82+
override def prepareStatement(a: String): Kleisli[F, Connection, PreparedStatement] =
83+
super.prepareStatement(a).map(TracedStatement(_, a): PreparedStatement)
84+
}
85+
}
86+
}
87+
}

0 commit comments

Comments
 (0)