Skip to content

Commit fe1b66b

Browse files
authored
#247 Implement basics of FlowReader (#306)
#247: Implement basics of FlowReader * `FlowReader` methods to read checkpoints * uncommented several endpoints for Swagger documentation creation * `FlowReader` as case class * `RequestException` defined * `ApiPath` constants moved from _Server_ to _Model_ as the least best solution for sharing * `QueryParamNames` constants defined
1 parent d9099aa commit fe1b66b

26 files changed

+605
-64
lines changed

.github/workflows/test_filenames_check.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ jobs:
4242
server/src/test/scala/za/co/absa/atum/server/api/TestData.scala,
4343
server/src/test/scala/za/co/absa/atum/server/api/TestTransactorProvider.scala,
4444
server/src/test/scala/za/co/absa/atum/server/ConfigProviderTest.scala,
45-
model/src/test/scala/za/co/absa/atum/testing/*
45+
model/src/test/scala/za/co/absa/atum/testing/*,
46+
reader/src/test/scala/za/co/absa/atum/testing/*
4647
verbose-logging: 'false'
4748
fail-on-violation: 'true'
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
<mxfile host="app.diagrams.net" agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:135.0) Gecko/20100101 Firefox/135.0" version="24.8.6">
2+
<diagram name="Page-1" id="M1M2r3vxqz2qx0Wid1ZL">
3+
<mxGraphModel dx="1434" dy="733" grid="1" gridSize="10" guides="1" tooltips="1" connect="1" arrows="1" fold="1" page="1" pageScale="1" pageWidth="827" pageHeight="1169" math="0" shadow="0">
4+
<root>
5+
<mxCell id="0" />
6+
<mxCell id="1" parent="0" />
7+
<mxCell id="rmWetNVkiCoe2Ea992-S-1" value="PartitioningReader" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
8+
<mxGeometry x="440" y="40" width="120" height="60" as="geometry" />
9+
</mxCell>
10+
<mxCell id="rmWetNVkiCoe2Ea992-S-2" value="FlowReader" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
11+
<mxGeometry x="60" y="40" width="120" height="60" as="geometry" />
12+
</mxCell>
13+
<mxCell id="rmWetNVkiCoe2Ea992-S-4" value="&lt;div align=&quot;left&quot;&gt;&lt;ul&gt;&lt;li&gt;&lt;b&gt;Identified by the main paritioining&lt;/b&gt;&lt;/li&gt;&lt;li&gt;ability to retrieve &lt;i&gt;id_flow&lt;/i&gt; based on the main partitioing &lt;br&gt;&lt;/li&gt;&lt;li&gt;Getting the checkpoints belonging to the flow&lt;/li&gt;&lt;li&gt;Getting checkpoints of certain name only&lt;/li&gt;&lt;li&gt;Getting it&#39;s partitioning (phase 2)&lt;/li&gt;&lt;/ul&gt;&lt;/div&gt;" style="rounded=0;whiteSpace=wrap;html=1;align=left;" vertex="1" parent="1">
14+
<mxGeometry x="80" y="110" width="240" height="160" as="geometry" />
15+
</mxCell>
16+
<mxCell id="rmWetNVkiCoe2Ea992-S-5" value="&lt;div align=&quot;left&quot;&gt;Data returned&lt;ul&gt;&lt;li&gt;Checkpoints including partitioiing info in form of DTOs&lt;/li&gt;&lt;li&gt;returns just the dto as pagination&lt;/li&gt;&lt;/ul&gt;&lt;/div&gt;" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
17+
<mxGeometry x="80" y="310" width="250" height="130" as="geometry" />
18+
</mxCell>
19+
<mxCell id="rmWetNVkiCoe2Ea992-S-7" value="&lt;blockquote&gt;&lt;div align=&quot;left&quot;&gt;&lt;li&gt;&lt;b&gt;Identified by the paritioining&lt;/b&gt;&lt;/li&gt;&lt;/div&gt;&lt;div align=&quot;left&quot;&gt;&lt;li&gt;ability to retrieve &lt;i&gt;id_partitioing&lt;/i&gt; based on the partitioing &lt;/li&gt;&lt;li&gt;Getting the checkpoints&lt;/li&gt;&lt;li&gt;Getting checkpoint(s) filtered by name&lt;/li&gt;&lt;li&gt;Getting the additional data&lt;/li&gt;&lt;li&gt;Getting the checkpoints + additional data in form of _INFO file format (phase 2)&lt;/li&gt;&lt;/div&gt;&lt;/blockquote&gt;" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
20+
<mxGeometry x="440" y="110" width="240" height="160" as="geometry" />
21+
</mxCell>
22+
<mxCell id="rmWetNVkiCoe2Ea992-S-8" value="&lt;div align=&quot;left&quot;&gt;Data returned&lt;ul&gt;&lt;li&gt;Checkpoints in the form of DTO in Pagination DTO&lt;/li&gt;&lt;li&gt;additional data as their DTO&lt;/li&gt;&lt;/ul&gt;&lt;/div&gt;" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
23+
<mxGeometry x="440" y="320" width="240" height="120" as="geometry" />
24+
</mxCell>
25+
</root>
26+
</mxGraphModel>
27+
</diagram>
28+
</mxfile>

server/src/main/scala/za/co/absa/atum/server/api/http/ApiPaths.scala renamed to model/src/main/scala/za/co/absa/atum/model/ApiPaths.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package za.co.absa.atum.server.api.http
17+
package za.co.absa.atum.model
1818

1919
object ApiPaths {
2020

model/src/test/scala/za/co/absa/atum/model/types/AtumPartitionsUnitTests.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2024 ABSA Group Limited
2+
* Copyright 2021 ABSA Group Limited
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

model/src/test/scala/za/co/absa/atum/model/utils/JsonDeserializationSyntaxUnitTests.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2024 ABSA Group Limited
2+
* Copyright 2021 ABSA Group Limited
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

model/src/test/scala/za/co/absa/atum/model/utils/JsonSerializationSyntaxUnitTests.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2024 ABSA Group Limited
2+
* Copyright 2021 ABSA Group Limited
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

reader/src/main/scala/za/co/absa/atum/reader/FlowReader.scala

Lines changed: 68 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,79 @@ package za.co.absa.atum.reader
1818

1919
import sttp.client3.SttpBackend
2020
import sttp.monad.MonadError
21+
import sttp.monad.syntax._
22+
import za.co.absa.atum.model.dto.{CheckpointWithPartitioningDTO, FlowDTO}
23+
import za.co.absa.atum.model.envelopes.SuccessResponse.{PaginatedResponse, SingleSuccessResponse}
2124
import za.co.absa.atum.model.types.basic.AtumPartitions
22-
import za.co.absa.atum.reader.basic.{PartitioningIdProvider, Reader}
25+
import za.co.absa.atum.reader.core.RequestResult.RequestResult
26+
import za.co.absa.atum.model.ApiPaths._
27+
import za.co.absa.atum.reader.core.{PartitioningIdProvider, Reader}
28+
import za.co.absa.atum.reader.requests.QueryParamNames
2329
import za.co.absa.atum.reader.server.ServerConfig
2430

2531
/**
2632
* This class is a reader that reads data tight to a flow.
27-
* @param mainFlowPartitioning - the partitioning of the main flow; renamed from ancestor's 'flowPartitioning'
28-
* @param serverConfig - the Atum server configuration
29-
* @param backend - sttp backend, that will be executing the requests
30-
* @param ev - using evidence based approach to ensure that the type F is a MonadError instead of using context
31-
* bounds, as it make the imports easier to follow
32-
* @tparam F - the effect type (e.g. Future, IO, Task, etc.)
33+
*
34+
* @param mainFlowPartitioning - the partitioning of the main flow; renamed from ancestor's 'flowPartitioning'
35+
* @param serverConfig - the Atum server configuration
36+
* @param backend - sttp backend, that will be executing the requests
37+
* @tparam F - the effect type (e.g. Future, IO, Task, etc.)
3338
*/
34-
class FlowReader[F[_]](val mainFlowPartitioning: AtumPartitions)
35-
(implicit serverConfig: ServerConfig, backend: SttpBackend[F, Any], ev: MonadError[F])
36-
extends Reader[F] with PartitioningIdProvider[F]{
39+
case class FlowReader[F[_]: MonadError](mainFlowPartitioning: AtumPartitions)
40+
(implicit serverConfig: ServerConfig, backend: SttpBackend[F, Any])
41+
extends Reader[F] with PartitioningIdProvider[F] {
42+
43+
/**
44+
* Function to retrieve a page of checkpoints belonging to the flow.
45+
* The checkpoints are ordered by their creation order.
46+
*
47+
* @param pageSize - the size of the page (record count) to be returned
48+
* @param offset - offset of the page (starting position)
49+
* @return - a page of checkpoints
50+
*/
51+
def getCheckpointsPage(pageSize: Int = 10, offset: Long = 0): F[RequestResult[PaginatedResponse[CheckpointWithPartitioningDTO]]] = {
52+
for {
53+
mainPartitioningIdOrError <- partitioningId(mainFlowPartitioning)
54+
flowIdOrError <- mapRequestResultF(mainPartitioningIdOrError, queryFlowId)
55+
checkpointsOrError <- mapRequestResultF(flowIdOrError, queryCheckpoints(_, None, pageSize, offset))
56+
} yield checkpointsOrError
57+
}
58+
59+
/**
60+
* Function to retrieve a page of checkpoints of the given name belonging to the flow.
61+
* The checkpoints are ordered by their creation order.
62+
*
63+
* @param checkpointName - the name to filter with
64+
* @param pageSize - the size of the page (record count) to be returned
65+
* @param offset - offset of the page (starting position)
66+
* @return - a page of checkpoints
67+
*/
68+
def getCheckpointsOfNamePage(checkpointName: String, pageSize: Int = 10, offset: Long = 0): F[RequestResult[PaginatedResponse[CheckpointWithPartitioningDTO]]] = {
69+
for {
70+
mainPartitioningIdOrError <- partitioningId(mainFlowPartitioning)
71+
flowIdOrError <- mapRequestResultF(mainPartitioningIdOrError, queryFlowId)
72+
checkpointsOrError <- mapRequestResultF(flowIdOrError, queryCheckpoints(_, Some(checkpointName), pageSize, offset))
73+
} yield checkpointsOrError
74+
}
75+
76+
private def queryFlowId(mainPartitioningId: Long): F[RequestResult[Long]] = {
77+
val endpoint = s"/$Api/$V2/${V2Paths.Partitionings}/$mainPartitioningId/${V2Paths.MainFlow}"
78+
val queryResult = getQuery[SingleSuccessResponse[FlowDTO]](endpoint)
79+
queryResult.map { result =>
80+
result.map(_.data.id)
81+
}
82+
}
83+
84+
private def queryCheckpoints(flowId: Long,
85+
checkpointName: Option[String],
86+
limit: Int,
87+
offset: Long): F[RequestResult[PaginatedResponse[CheckpointWithPartitioningDTO]]] = {
88+
val endpoint = s"/$Api/$V2/${V2Paths.Flows}/$flowId/${V2Paths.Checkpoints}"
89+
val params = Map(
90+
QueryParamNames.Limit -> limit.toString,
91+
QueryParamNames.Offset -> offset.toString
92+
) ++ checkpointName.map(QueryParamNames.CheckpointName -> _)
93+
getQuery(endpoint, params)
94+
}
3795

3896
}

reader/src/main/scala/za/co/absa/atum/reader/PartitioningReader.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package za.co.absa.atum.reader
1919
import sttp.client3.SttpBackend
2020
import sttp.monad.MonadError
2121
import za.co.absa.atum.model.types.basic.AtumPartitions
22-
import za.co.absa.atum.reader.basic.{PartitioningIdProvider, Reader}
22+
import za.co.absa.atum.reader.core.{PartitioningIdProvider, Reader}
2323
import za.co.absa.atum.reader.server.ServerConfig
2424

2525
/**

reader/src/main/scala/za/co/absa/atum/reader/basic/PartitioningIdProvider.scala renamed to reader/src/main/scala/za/co/absa/atum/reader/core/PartitioningIdProvider.scala

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,22 +14,27 @@
1414
* limitations under the License.
1515
*/
1616

17-
package za.co.absa.atum.reader.basic
17+
package za.co.absa.atum.reader.core
1818

1919
import sttp.monad.MonadError
2020
import sttp.monad.syntax._
21+
import za.co.absa.atum.model.ApiPaths._
2122
import za.co.absa.atum.model.dto.PartitioningWithIdDTO
2223
import za.co.absa.atum.model.envelopes.SuccessResponse.SingleSuccessResponse
2324
import za.co.absa.atum.model.types.basic.AtumPartitions
2425
import za.co.absa.atum.model.types.basic.AtumPartitionsOps
2526
import za.co.absa.atum.model.utils.JsonSyntaxExtensions.JsonSerializationSyntax
26-
import za.co.absa.atum.reader.basic.RequestResult.RequestResult
27+
import za.co.absa.atum.reader.core.RequestResult.RequestResult
2728

28-
trait PartitioningIdProvider[F[_]] {self: Reader[F] =>
29+
trait PartitioningIdProvider[F[_]] {
30+
self: Reader[F] =>
2931
def partitioningId(partitioning: AtumPartitions)(implicit monad: MonadError[F]): F[RequestResult[Long]] = {
3032
val encodedPartitioning = partitioning.toPartitioningDTO.asBase64EncodedJsonString
31-
val queryResult = getQuery[SingleSuccessResponse[PartitioningWithIdDTO]]("/api/v2/partitionings", Map("partitioning" -> encodedPartitioning))
32-
queryResult.map{result =>
33+
val queryResult = getQuery[SingleSuccessResponse[PartitioningWithIdDTO]](
34+
s"/$Api/$V2/${V2Paths.Partitionings}",
35+
Map("partitioning" -> encodedPartitioning)
36+
)
37+
queryResult.map { result =>
3338
result.map(_.data.id)
3439
}
3540
}

reader/src/main/scala/za/co/absa/atum/reader/basic/Reader.scala renamed to reader/src/main/scala/za/co/absa/atum/reader/core/Reader.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,17 @@
1414
* limitations under the License.
1515
*/
1616

17-
package za.co.absa.atum.reader.basic
17+
package za.co.absa.atum.reader.core
1818

1919
import io.circe.Decoder
2020
import sttp.client3.{Identity, RequestT, ResponseException, SttpBackend, basicRequest}
2121
import sttp.client3.circe.asJson
2222
import sttp.model.Uri
2323
import sttp.monad.MonadError
2424
import sttp.monad.syntax._
25+
import za.co.absa.atum.reader.core.RequestResult._
2526
import za.co.absa.atum.reader.server.ServerConfig
26-
import za.co.absa.atum.reader.basic.RequestResult._
27+
import za.co.absa.atum.reader.exceptions.RequestException.CirceError
2728

2829
/**
2930
* Reader is a base class for reading data from a remote server.
@@ -35,6 +36,11 @@ import za.co.absa.atum.reader.basic.RequestResult._
3536
*/
3637
abstract class Reader[F[_]: MonadError](implicit val serverConfig: ServerConfig, val backend: SttpBackend[F, Any]) {
3738

39+
protected def mapRequestResultF[I, O](requestResult: RequestResult[I], f: I => F[RequestResult[O]]): F[RequestResult[O]] = requestResult match {
40+
case Right(b) => f(b)
41+
case Left(a) => MonadError[F].unit(Left(a))
42+
}
43+
3844
protected def getQuery[R: Decoder](endpointUri: String, params: Map[String, String] = Map.empty): F[RequestResult[R]] = {
3945
val endpointToQuery = serverConfig.host + endpointUri
4046
val uri = Uri.unsafeParse(endpointToQuery).addParams(params)

0 commit comments

Comments
 (0)