Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
b97b603
#244: Create the Info module
benedeki Aug 19, 2024
e623974
* fixed License headers
benedeki Aug 19, 2024
5e4eadb
* renamed to _Reader_
benedeki Aug 24, 2024
2e1e2ea
* README.md update
benedeki Aug 25, 2024
738c904
* fix
benedeki Aug 25, 2024
df8c9bd
* JaCoCO action update
benedeki Aug 26, 2024
5affd82
* added dummy code for testing coverage
benedeki Aug 26, 2024
0f1e121
* erroneous class renamed
benedeki Aug 26, 2024
d773a93
* Deleted wrong files
benedeki Aug 26, 2024
0776f9c
#245 Add the ability to query REST endpoints from Reader module
benedeki Sep 10, 2024
38fde1c
* Work still in progress
benedeki Sep 23, 2024
1ac2233
* the first working commit
benedeki Nov 1, 2024
e6dcb52
* Removed temporary notes
benedeki Nov 1, 2024
6968b02
* introduced `MonadError` into the `GenericServerConnection`
benedeki Nov 1, 2024
b9bacef
* Fixed UTs
benedeki Nov 1, 2024
bbb1e7f
* trying to get rid of Java 11 dependency
benedeki Nov 4, 2024
33e6628
* Downgraded sttpClient
benedeki Nov 4, 2024
f7ced56
* further downgrade
benedeki Nov 5, 2024
ca2116b
* Removed exceptions
benedeki Nov 6, 2024
e5e6f63
* commented out parts of README.md which are not yet part of the code
benedeki Nov 6, 2024
fe07272
- major rework
benedeki Nov 17, 2024
7656f6f
* doc fix
benedeki Nov 17, 2024
eb9a678
Merge branch 'master' into feature/245-add-the-ability-to-query-rest-…
benedeki Nov 17, 2024
7641c07
* disabled failing test
benedeki Nov 17, 2024
bc82a5b
* adjustments
benedeki Nov 18, 2024
0e7675e
- further cleaning
benedeki Nov 18, 2024
432716a
* tests progress
benedeki Nov 21, 2024
11b0a16
* several UTs added
benedeki Nov 22, 2024
2c3f145
Merge branch 'master' into feature/245-add-the-ability-to-query-rest-…
benedeki Nov 22, 2024
e07dffb
* last improvements before PR ready
benedeki Nov 24, 2024
3955a50
* description to class `ServerConfig`
benedeki Nov 25, 2024
b287a66
* removed empty line
benedeki Nov 25, 2024
d04d23b
* addressed PR comments
benedeki Nov 27, 2024
c344249
* just better implementation
benedeki Nov 30, 2024
c0b0988
#247: Implement basics of FlowReader
benedeki Dec 4, 2024
b53ba99
Merge remote-tracking branch 'remotes/origin/master' into feature/247…
benedeki Dec 4, 2024
55d60e1
* major progress
benedeki Dec 9, 2024
5dfe5c5
* Further progress
benedeki Dec 9, 2024
67ffe07
* Flow reader methods to read checkpoints
benedeki Dec 11, 2024
09e2ed8
* small fixes
benedeki Dec 11, 2024
e7ff732
* License year
benedeki Dec 11, 2024
e63a2e4
* Finished implementation
benedeki Feb 2, 2025
1488d1f
#247: Paging support in Reader module
benedeki Feb 4, 2025
96ffa33
* some omitted files
benedeki Feb 4, 2025
8b69e3e
Merge branch 'master' into feature/247-paging-in-reader
benedeki Feb 4, 2025
04b56bd
* filenames check exclusions fix
benedeki Feb 4, 2025
698b765
Merge branch 'feature/247-paging-in-reader' of https://github.com/Abs…
benedeki Feb 4, 2025
855a333
* further fix
benedeki Feb 4, 2025
be90711
* fix trial 3
benedeki Feb 4, 2025
afd64a6
* Licenses fix
benedeki Feb 4, 2025
f70143e
Merge branch 'feature/247-paging-in-reader' into feature/247-implemen…
benedeki Feb 5, 2025
f803ea3
* uncommented endpoint for Swagger documentation creation
benedeki Feb 5, 2025
1a56f3d
* complete refactoring
benedeki Feb 21, 2025
bbed1f0
* added adr
benedeki Feb 21, 2025
15dd12a
* removing unnecessary changes
benedeki Feb 21, 2025
c630fa0
* removed unnecessary changes II
benedeki Feb 21, 2025
f4ce4d3
* Typo fixes
benedeki Feb 21, 2025
56cfa09
* addressed more PR comments
benedeki Feb 21, 2025
205952e
* Further comments address
benedeki Feb 25, 2025
59a0b48
* some typo fixes
benedeki Feb 26, 2025
9bad58f
* FlowReader as case class
benedeki Feb 28, 2025
6b09086
* case class in test too
benedeki Feb 28, 2025
383b532
* capitalized constant names
benedeki Feb 28, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/test_filenames_check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ jobs:
server/src/test/scala/za/co/absa/atum/server/api/TestData.scala,
server/src/test/scala/za/co/absa/atum/server/api/TestTransactorProvider.scala,
server/src/test/scala/za/co/absa/atum/server/ConfigProviderTest.scala,
model/src/test/scala/za/co/absa/atum/testing/*
model/src/test/scala/za/co/absa/atum/testing/*,
reader/src/test/scala/za/co/absa/atum/testing/*
verbose-logging: 'false'
fail-on-violation: 'true'
28 changes: 28 additions & 0 deletions adrs/01_Basics-of-FlowReader-and-PartitioningReader.drawio
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<mxfile host="app.diagrams.net" agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:135.0) Gecko/20100101 Firefox/135.0" version="24.8.6">
<diagram name="Page-1" id="M1M2r3vxqz2qx0Wid1ZL">
<mxGraphModel dx="1434" dy="733" grid="1" gridSize="10" guides="1" tooltips="1" connect="1" arrows="1" fold="1" page="1" pageScale="1" pageWidth="827" pageHeight="1169" math="0" shadow="0">
<root>
<mxCell id="0" />
<mxCell id="1" parent="0" />
<mxCell id="rmWetNVkiCoe2Ea992-S-1" value="PartitioningReader" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="440" y="40" width="120" height="60" as="geometry" />
</mxCell>
<mxCell id="rmWetNVkiCoe2Ea992-S-2" value="FlowReader" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="60" y="40" width="120" height="60" as="geometry" />
</mxCell>
<mxCell id="rmWetNVkiCoe2Ea992-S-4" value="&lt;div align=&quot;left&quot;&gt;&lt;ul&gt;&lt;li&gt;&lt;b&gt;Identified by the main paritioining&lt;/b&gt;&lt;/li&gt;&lt;li&gt;ability to retrieve &lt;i&gt;id_flow&lt;/i&gt; based on the main partitioing &lt;br&gt;&lt;/li&gt;&lt;li&gt;Getting the checkpoints belonging to the flow&lt;/li&gt;&lt;li&gt;Getting checkpoints of certain name only&lt;/li&gt;&lt;li&gt;Getting it&#39;s partitioning (phase 2)&lt;/li&gt;&lt;/ul&gt;&lt;/div&gt;" style="rounded=0;whiteSpace=wrap;html=1;align=left;" vertex="1" parent="1">
<mxGeometry x="80" y="110" width="240" height="160" as="geometry" />
</mxCell>
<mxCell id="rmWetNVkiCoe2Ea992-S-5" value="&lt;div align=&quot;left&quot;&gt;Data returned&lt;ul&gt;&lt;li&gt;Checkpoints including partitioiing info in form of DTOs&lt;/li&gt;&lt;li&gt;returns just the dto as pagination&lt;/li&gt;&lt;/ul&gt;&lt;/div&gt;" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="80" y="310" width="250" height="130" as="geometry" />
</mxCell>
<mxCell id="rmWetNVkiCoe2Ea992-S-7" value="&lt;blockquote&gt;&lt;div align=&quot;left&quot;&gt;&lt;li&gt;&lt;b&gt;Identified by the paritioining&lt;/b&gt;&lt;/li&gt;&lt;/div&gt;&lt;div align=&quot;left&quot;&gt;&lt;li&gt;ability to retrieve &lt;i&gt;id_partitioing&lt;/i&gt; based on the partitioing &lt;/li&gt;&lt;li&gt;Getting the checkpoints&lt;/li&gt;&lt;li&gt;Getting checkpoint(s) filtered by name&lt;/li&gt;&lt;li&gt;Getting the additional data&lt;/li&gt;&lt;li&gt;Getting the checkpoints + additional data in form of _INFO file format (phase 2)&lt;/li&gt;&lt;/div&gt;&lt;/blockquote&gt;" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="440" y="110" width="240" height="160" as="geometry" />
</mxCell>
<mxCell id="rmWetNVkiCoe2Ea992-S-8" value="&lt;div align=&quot;left&quot;&gt;Data returned&lt;ul&gt;&lt;li&gt;Checkpoints in the form of DTO in Pagination DTO&lt;/li&gt;&lt;li&gt;additional data as their DTO&lt;/li&gt;&lt;/ul&gt;&lt;/div&gt;" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="440" y="320" width="240" height="120" as="geometry" />
</mxCell>
</root>
</mxGraphModel>
</diagram>
</mxfile>
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package za.co.absa.atum.server.api.http
package za.co.absa.atum.model

object ApiPaths {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2024 ABSA Group Limited
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2024 ABSA Group Limited
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2024 ABSA Group Limited
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
78 changes: 68 additions & 10 deletions reader/src/main/scala/za/co/absa/atum/reader/FlowReader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,79 @@ package za.co.absa.atum.reader

import sttp.client3.SttpBackend
import sttp.monad.MonadError
import sttp.monad.syntax._
import za.co.absa.atum.model.dto.{CheckpointWithPartitioningDTO, FlowDTO}
import za.co.absa.atum.model.envelopes.SuccessResponse.{PaginatedResponse, SingleSuccessResponse}
import za.co.absa.atum.model.types.basic.AtumPartitions
import za.co.absa.atum.reader.basic.{PartitioningIdProvider, Reader}
import za.co.absa.atum.reader.core.RequestResult.RequestResult
import za.co.absa.atum.model.ApiPaths._
import za.co.absa.atum.reader.core.{PartitioningIdProvider, Reader}
import za.co.absa.atum.reader.requests.QueryParamNames
import za.co.absa.atum.reader.server.ServerConfig

/**
* This class is a reader that reads data tight to a flow.
* @param mainFlowPartitioning - the partitioning of the main flow; renamed from ancestor's 'flowPartitioning'
* @param serverConfig - the Atum server configuration
* @param backend - sttp backend, that will be executing the requests
* @param ev - using evidence based approach to ensure that the type F is a MonadError instead of using context
* bounds, as it make the imports easier to follow
* @tparam F - the effect type (e.g. Future, IO, Task, etc.)
*
* @param mainFlowPartitioning - the partitioning of the main flow; renamed from ancestor's 'flowPartitioning'
* @param serverConfig - the Atum server configuration
* @param backend - sttp backend, that will be executing the requests
* @tparam F - the effect type (e.g. Future, IO, Task, etc.)
*/
class FlowReader[F[_]](val mainFlowPartitioning: AtumPartitions)
(implicit serverConfig: ServerConfig, backend: SttpBackend[F, Any], ev: MonadError[F])
extends Reader[F] with PartitioningIdProvider[F]{
case class FlowReader[F[_]: MonadError](mainFlowPartitioning: AtumPartitions)
(implicit serverConfig: ServerConfig, backend: SttpBackend[F, Any])
extends Reader[F] with PartitioningIdProvider[F] {

/**
* Function to retrieve a page of checkpoints belonging to the flow.
* The checkpoints are ordered by their creation order.
*
* @param pageSize - the size of the page (record count) to be returned
* @param offset - offset of the page (starting position)
* @return - a page of checkpoints
*/
def getCheckpointsPage(pageSize: Int = 10, offset: Long = 0): F[RequestResult[PaginatedResponse[CheckpointWithPartitioningDTO]]] = {
for {
mainPartitioningIdOrError <- partitioningId(mainFlowPartitioning)
flowIdOrError <- mapRequestResultF(mainPartitioningIdOrError, queryFlowId)
checkpointsOrError <- mapRequestResultF(flowIdOrError, queryCheckpoints(_, None, pageSize, offset))
} yield checkpointsOrError
}

/**
* Function to retrieve a page of checkpoints of the given name belonging to the flow.
* The checkpoints are ordered by their creation order.
*
* @param checkpointName - the name to filter with
* @param pageSize - the size of the page (record count) to be returned
* @param offset - offset of the page (starting position)
* @return - a page of checkpoints
*/
def getCheckpointsOfNamePage(checkpointName: String, pageSize: Int = 10, offset: Long = 0): F[RequestResult[PaginatedResponse[CheckpointWithPartitioningDTO]]] = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not to have just 1 function with checkpointName being Option[String] ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see it as inconvenient call.
And this way it clearly differentiate in the function name when only name-filtered entries are requested,
But can be unifies if seen as preferential.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see it as: little convenience for users, little duplicity for developers. Less # of APIs & less code: better :D

But I don't mind, it's a matter of style, if you want to keep it I'm definitely okay with it

for {
mainPartitioningIdOrError <- partitioningId(mainFlowPartitioning)
flowIdOrError <- mapRequestResultF(mainPartitioningIdOrError, queryFlowId)
checkpointsOrError <- mapRequestResultF(flowIdOrError, queryCheckpoints(_, Some(checkpointName), pageSize, offset))
} yield checkpointsOrError
}

private def queryFlowId(mainPartitioningId: Long): F[RequestResult[Long]] = {
val endpoint = s"/$Api/$V2/${V2Paths.Partitionings}/$mainPartitioningId/${V2Paths.MainFlow}"
val queryResult = getQuery[SingleSuccessResponse[FlowDTO]](endpoint)
queryResult.map { result =>
result.map(_.data.id)
}
}

private def queryCheckpoints(flowId: Long,
checkpointName: Option[String],
limit: Int,
offset: Long): F[RequestResult[PaginatedResponse[CheckpointWithPartitioningDTO]]] = {
val endpoint = s"/$Api/$V2/${V2Paths.Flows}/$flowId/${V2Paths.Checkpoints}"
val params = Map(
QueryParamNames.Limit -> limit.toString,
QueryParamNames.Offset -> offset.toString
) ++ checkpointName.map(QueryParamNames.CheckpointName -> _)
getQuery(endpoint, params)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package za.co.absa.atum.reader
import sttp.client3.SttpBackend
import sttp.monad.MonadError
import za.co.absa.atum.model.types.basic.AtumPartitions
import za.co.absa.atum.reader.basic.{PartitioningIdProvider, Reader}
import za.co.absa.atum.reader.core.{PartitioningIdProvider, Reader}
import za.co.absa.atum.reader.server.ServerConfig

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,27 @@
* limitations under the License.
*/

package za.co.absa.atum.reader.basic
package za.co.absa.atum.reader.core

import sttp.monad.MonadError
import sttp.monad.syntax._
import za.co.absa.atum.model.ApiPaths._
import za.co.absa.atum.model.dto.PartitioningWithIdDTO
import za.co.absa.atum.model.envelopes.SuccessResponse.SingleSuccessResponse
import za.co.absa.atum.model.types.basic.AtumPartitions
import za.co.absa.atum.model.types.basic.AtumPartitionsOps
import za.co.absa.atum.model.utils.JsonSyntaxExtensions.JsonSerializationSyntax
import za.co.absa.atum.reader.basic.RequestResult.RequestResult
import za.co.absa.atum.reader.core.RequestResult.RequestResult

trait PartitioningIdProvider[F[_]] {self: Reader[F] =>
trait PartitioningIdProvider[F[_]] {
self: Reader[F] =>
def partitioningId(partitioning: AtumPartitions)(implicit monad: MonadError[F]): F[RequestResult[Long]] = {
val encodedPartitioning = partitioning.toPartitioningDTO.asBase64EncodedJsonString
val queryResult = getQuery[SingleSuccessResponse[PartitioningWithIdDTO]]("/api/v2/partitionings", Map("partitioning" -> encodedPartitioning))
queryResult.map{result =>
val queryResult = getQuery[SingleSuccessResponse[PartitioningWithIdDTO]](
s"/$Api/$V2/${V2Paths.Partitionings}",
Map("partitioning" -> encodedPartitioning)
)
queryResult.map { result =>
result.map(_.data.id)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,17 @@
* limitations under the License.
*/

package za.co.absa.atum.reader.basic
package za.co.absa.atum.reader.core

import io.circe.Decoder
import sttp.client3.{Identity, RequestT, ResponseException, SttpBackend, basicRequest}
import sttp.client3.circe.asJson
import sttp.model.Uri
import sttp.monad.MonadError
import sttp.monad.syntax._
import za.co.absa.atum.reader.core.RequestResult._
import za.co.absa.atum.reader.server.ServerConfig
import za.co.absa.atum.reader.basic.RequestResult._
import za.co.absa.atum.reader.exceptions.RequestException.CirceError

/**
* Reader is a base class for reading data from a remote server.
Expand All @@ -35,6 +36,11 @@ import za.co.absa.atum.reader.basic.RequestResult._
*/
abstract class Reader[F[_]: MonadError](implicit val serverConfig: ServerConfig, val backend: SttpBackend[F, Any]) {

protected def mapRequestResultF[I, O](requestResult: RequestResult[I], f: I => F[RequestResult[O]]): F[RequestResult[O]] = requestResult match {
case Right(b) => f(b)
case Left(a) => MonadError[F].unit(Left(a))
}

protected def getQuery[R: Decoder](endpointUri: String, params: Map[String, String] = Map.empty): F[RequestResult[R]] = {
val endpointToQuery = serverConfig.host + endpointUri
val uri = Uri.unsafeParse(endpointToQuery).addParams(params)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,31 @@
* limitations under the License.
*/

package za.co.absa.atum.reader.basic
package za.co.absa.atum.reader.core

import sttp.client3.{DeserializationException, HttpError, Response, ResponseException}
import sttp.monad.MonadError
import za.co.absa.atum.model.envelopes.ErrorResponse
import za.co.absa.atum.reader.exceptions.RequestException.{CirceError, HttpException, ParsingException}
import za.co.absa.atum.reader.exceptions.RequestException

object RequestResult {
type CirceError = io.circe.Error
type RequestResult[R] = Either[ResponseException[ErrorResponse, CirceError], R]
type RequestResult[R] = Either[RequestException, R]

def RequestOK[T](value: T): RequestResult[T] = Right(value)
def RequestFail[T](error: RequestException): RequestResult[T] = Left(error)

implicit class ResponseOps[R](val response: Response[Either[ResponseException[String, CirceError], R]]) extends AnyVal {
def toRequestResult: RequestResult[R] = {
response.body.left.map {
case he: HttpError[String] =>
ErrorResponse.basedOnStatusCode(he.statusCode.code, he.body) match {
case Right(er) => HttpError(er, he.statusCode)
case Left(ce) => DeserializationException(he.body, ce)
case Right(er) => HttpException(he.getMessage, he.statusCode, er, response.request.uri)
case Left(ce) => ParsingException.fromCirceError(ce, he.body)
}
case de: DeserializationException[CirceError] => de
case de: DeserializationException[CirceError] => ParsingException.fromCirceError(de.error, de.body)
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.reader.exceptions

class ReaderException(message: String) extends Exception(message)
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.reader.exceptions

import sttp.model.{StatusCode, Uri}
import za.co.absa.atum.model.envelopes.ErrorResponse

sealed abstract class RequestException(message: String) extends ReaderException(message)


object RequestException {
type CirceError = io.circe.Error

final case class HttpException(
message: String,
statusCode: StatusCode,
errorResponse: ErrorResponse,
request: Uri
) extends RequestException(message)

final case class ParsingException(
message: String,
body: String
) extends RequestException(message)

object ParsingException {
def fromCirceError(error: CirceError, body: String): ParsingException = {
ParsingException(error.getMessage, body)
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.reader.requests

object QueryParamNames {
final val Limit = "limit"
final val Offset = "offset"
final val CheckpointName = "checkpoint-name"
}
Loading
Loading