Skip to content

Commit fe9ec6b

Browse files
ABLL526, benedeki, and lsulak
authored
#305 Get-Ancestors-Server Functionality (#312)
* Added the getAncestors Server functionality.
  1. Made the necessary changes as mentioned by the team.
  2. Made the necessary changes to the getAncestors Server functionality.
* Changes made: Adjustments to the SQL and added in status code 14. Adjustments to the tests, made them more readable and shorter.
* Changes made: Made amendments to test code mentioned in the PR review.
  - Removed Breakable tests.
  - Added a set case and removed the partial test case.
* Changes made:
  - Changed some tests.
  - Added an implicit class according to David's recommendation.
  - Changed all files from getAncestors to getPartitioningAncestors.
* Changes made:
  - Made some changes to the naming convention in the files.
  - Made some improvements in the `SeqImplicits` Scala classes; made them much more streamlined and simple.
  - Added a new file in the model called PartitioningResult.scala containing `PartitioningResult`.
* Amended JacocoSetup.scala to exclude the file SeqImplicits.
  - Amended the `JacocoSetup.scala` excludes to include the following:
    - "za.co.absa.atum.server.api.v2.service.PartitioningService",
    - "za.co.absa.atum.server.api.v2.repository.PartitioningRepository",
    - "za.co.absa.atum.server.api.v2.controller.PartitioningController"
* Edits to the server portion since merging with master.
  - Added a unit test for `resultsToPartitioningWithIdDTOs`.

Signed-off-by: ABLL526 <Liam.Leibrandt@absa.africa>

---------

Signed-off-by: ABLL526 <Liam.Leibrandt@absa.africa>
Co-authored-by: David Benedeki <14905969+benedeki@users.noreply.github.com>
Co-authored-by: Ladislav Sulak <laco.sulak@gmail.com>
1 parent 01afea4 commit fe9ec6b

21 files changed

+590
-32
lines changed

project/JacocoSetup.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ object JacocoSetup {
5050
"za.co.absa.atum.server.api.v2.controller.PartitioningController$",
5151
"za.co.absa.atum.server.api.v2.service.PartitioningService",
5252
"za.co.absa.atum.server.api.v2.service.PartitioningService$",
53+
"za.co.absa.atum.server.api.common.http.Routes*",
54+
"za.co.absa.atum.server.implicits.SeqImplicits*",
5355
"za.co.absa.atum.model.envelopes.Pagination",
5456
"za.co.absa.atum.model.dto.PartitioningParentPatchDTO*",
5557
"za.co.absa.atum.model.ApiPaths*",

server/src/main/scala/za/co/absa/atum/server/Main.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ object Main extends ZIOAppDefault {
107107
GetFlowPartitionings.layer,
108108
GetPartitioningMainFlow.layer,
109109
UpdatePartitioningParent.layer,
110+
GetPartitioningAncestors.layer,
110111
PostgresDatabaseProvider.layer,
111112
TransactorProvider.layer,
112113
// aws

server/src/main/scala/za/co/absa/atum/server/api/common/http/Routes.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ object Routes extends ServerOptions with ServerUtils {
6464
api.v2.http.Endpoints.getPartitioningMainFlowEndpoint,
6565
api.v2.http.Endpoints.getFlowPartitioningsEndpoint,
6666
api.v2.http.Endpoints.getFlowCheckpointsEndpoint,
67+
api.v2.http.Endpoints.getPartitioningAncestorsEndpoint,
6768
api.common.http.Endpoints.healthEndpoint
6869
)
6970
ZHttp4sServerInterpreter[HttpEnv.Env](http4sServerOptions(None))

server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowPartitionings.scala

Lines changed: 2 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,9 @@ import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWith
2929
import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator
3030
import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling
3131
import zio.{Task, URLayer, ZIO, ZLayer}
32-
32+
import za.co.absa.atum.server.model.PartitioningResult
3333
import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbGet
3434

35-
import scala.annotation.tailrec
36-
3735
class GetFlowPartitionings(implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
3836
extends DoobieMultipleResultFunctionWithAggStatus[GetFlowPartitioningsArgs, Option[
3937
GetFlowPartitioningsResult
@@ -53,30 +51,7 @@ class GetFlowPartitionings(implicit schema: DBSchema, dbEngine: DoobieEngine[Tas
5351
object GetFlowPartitionings {
5452
case class GetFlowPartitioningsArgs(flowId: Long, limit: Option[Int], offset: Option[Long])
5553
case class GetFlowPartitioningsResult(id: Long, partitioningJson: Json, author: String, hasMore: Boolean)
56-
57-
object GetFlowPartitioningsResult {
58-
59-
@tailrec def resultsToPartitioningWithIdDTOs(
60-
results: Seq[GetFlowPartitioningsResult],
61-
acc: Seq[PartitioningWithIdDTO]
62-
): Either[DecodingFailure, Seq[PartitioningWithIdDTO]] = {
63-
if (results.isEmpty) Right(acc)
64-
else {
65-
val head = results.head
66-
val tail = results.tail
67-
val decodingResult = head.partitioningJson.as[PartitioningForDB]
68-
decodingResult match {
69-
case Left(decodingFailure) => Left(decodingFailure)
70-
case Right(partitioningForDB) =>
71-
val partitioningDTO = partitioningForDB.keys.map { key =>
72-
PartitionDTO(key, partitioningForDB.keysToValuesMap(key))
73-
}
74-
resultsToPartitioningWithIdDTOs(tail, acc :+ PartitioningWithIdDTO(head.id, partitioningDTO, head.author))
75-
}
76-
}
77-
}
78-
79-
}
54+
extends PartitioningResult(id, partitioningJson, author)
8055

8156
val layer: URLayer[PostgresDatabaseProvider, GetFlowPartitionings] = ZLayer {
8257
for {
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright 2021 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.atum.server.api.database.runs.functions
18+
19+
import doobie.implicits.toSqlInterpolator
20+
import io.circe.Json
21+
import za.co.absa.atum.server.api.database.PostgresDatabaseProvider
22+
import za.co.absa.atum.server.api.database.runs.Runs
23+
import za.co.absa.atum.server.api.database.runs.functions.GetPartitioningAncestors._
24+
import za.co.absa.atum.server.model.PartitioningResult
25+
import za.co.absa.db.fadb.DBSchema
26+
import za.co.absa.db.fadb.doobie.DoobieEngine
27+
import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWithAggStatus
28+
import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator
29+
import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling
30+
import zio.{Task, URLayer, ZIO, ZLayer}
31+
import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbGet
32+
33+
class GetPartitioningAncestors(implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
34+
extends DoobieMultipleResultFunctionWithAggStatus[GetPartitioningAncestorsArgs, Option[
35+
GetPartitioningAncestorsResult
36+
], Task](args =>
37+
Seq(
38+
fr"${args.partitioningId}",
39+
fr"${args.limit}",
40+
fr"${args.offset}"
41+
)
42+
)
43+
with StandardStatusHandling
44+
with ByFirstErrorStatusAggregator {
45+
46+
override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq("ancestor_id", "partitioning", "author", "has_more")
47+
}
48+
49+
object GetPartitioningAncestors {
50+
case class GetPartitioningAncestorsArgs(partitioningId: Long, limit: Option[Int], offset: Option[Long])
51+
case class GetPartitioningAncestorsResult(id: Long, partitioningJson: Json, author: String, hasMore: Boolean)
52+
extends PartitioningResult(id, partitioningJson, author)
53+
54+
val layer: URLayer[PostgresDatabaseProvider, GetPartitioningAncestors] = ZLayer {
55+
for {
56+
dbProvider <- ZIO.service[PostgresDatabaseProvider]
57+
} yield new GetPartitioningAncestors()(Runs, dbProvider.dbEngine)
58+
}
59+
}

server/src/main/scala/za/co/absa/atum/server/api/v2/controller/PartitioningController.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,11 @@ trait PartitioningController {
6363
partitioningParentPatchDTO: PartitioningParentPatchDTO
6464
): IO[ErrorResponse, Unit]
6565

66+
67+
def getPartitioningAncestors(
68+
partitioningId: Long,
69+
limit: Option[Int],
70+
offset: Option[Long]
71+
): IO[ErrorResponse, PaginatedResponse[PartitioningWithIdDTO]]
72+
6673
}

server/src/main/scala/za/co/absa/atum/server/api/v2/controller/PartitioningControllerImpl.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,21 @@ class PartitioningControllerImpl(partitioningService: PartitioningService)
133133
_ => ()
134134
)
135135
}
136+
137+
override def getPartitioningAncestors(
138+
partitioningId: Long,
139+
limit: Option[Int],
140+
offset: Option[Long]
141+
): IO[ErrorResponse, PaginatedResponse[PartitioningWithIdDTO]] = {
142+
mapToPaginatedResponse(
143+
limit.get,
144+
offset.get,
145+
serviceCall[PaginatedResult[PartitioningWithIdDTO], PaginatedResult[PartitioningWithIdDTO]](
146+
partitioningService.getPartitioningAncestors(partitioningId, limit, offset)
147+
)
148+
)
149+
}
150+
136151
}
137152

138153
object PartitioningControllerImpl {

server/src/main/scala/za/co/absa/atum/server/api/v2/http/Endpoints.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,19 @@ object Endpoints extends BaseEndpoints {
179179
.errorOutVariantPrepend(notFoundErrorOneOfVariant)
180180
}
181181

182+
val getPartitioningAncestorsEndpoint
183+
: PublicEndpoint[(Long, Option[Int], Option[Long]), ErrorResponse, PaginatedResponse[
184+
PartitioningWithIdDTO
185+
], Any] = {
186+
apiV2.get
187+
.in(V2Paths.Partitionings / path[Long]("partitioningId") / V2Paths.Ancestors)
188+
.in(query[Option[Int]]("limit").default(Some(10)).validateOption(Validator.inRange(1, 1000)))
189+
.in(query[Option[Long]]("offset").default(Some(0L)).validateOption(Validator.min(0L)))
190+
.out(statusCode(StatusCode.Ok))
191+
.out(jsonBody[PaginatedResponse[PartitioningWithIdDTO]])
192+
.errorOutVariantPrepend(notFoundErrorOneOfVariant)
193+
}
194+
182195
val serverEndpoints: List[ZServerEndpoint[HttpEnv.Env, Any]] = List(
183196
createServerEndpoint[
184197
(Long, CheckpointV2DTO),
@@ -259,6 +272,16 @@ object Endpoints extends BaseEndpoints {
259272
PartitioningController.patchPartitioningParent(partitioningId, partitioningParentPatchDTO)
260273
}
261274
),
275+
createServerEndpoint[
276+
(Long, Option[Int], Option[Long]),
277+
ErrorResponse,
278+
PaginatedResponse[PartitioningWithIdDTO]
279+
](
280+
getPartitioningAncestorsEndpoint,
281+
{ case (partitioningId: Long, limit: Option[Int], offset: Option[Long]) =>
282+
PartitioningController.getPartitioningAncestors(partitioningId, limit, offset)
283+
}
284+
)
262285
)
263286

264287
}

server/src/main/scala/za/co/absa/atum/server/api/v2/repository/PartitioningRepository.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,11 @@ trait PartitioningRepository {
6060
partitioningId: Long,
6161
partitioningParentPatchDTO: PartitioningParentPatchDTO
6262
): IO[DatabaseError, Unit]
63+
64+
def getPartitioningAncestors(
65+
partitioningId: Long,
66+
limit: Option[Int],
67+
offset: Option[Long]
68+
): IO[DatabaseError, PaginatedResult[PartitioningWithIdDTO]]
69+
6370
}

server/src/main/scala/za/co/absa/atum/server/api/v2/repository/PartitioningRepositoryImpl.scala

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import za.co.absa.atum.model.dto._
2020
import za.co.absa.atum.server.api.common.repository.BaseRepository
2121
import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings._
2222
import za.co.absa.atum.server.api.database.flows.functions._
23+
import za.co.absa.atum.server.api.database.runs.functions.GetPartitioningAncestors._
2324
import za.co.absa.atum.server.api.database.runs.functions.CreateOrUpdateAdditionalData.CreateOrUpdateAdditionalDataArgs
2425
import za.co.absa.atum.server.api.database.runs.functions.UpdatePartitioningParent.UpdatePartitioningParentArgs
2526
import za.co.absa.atum.server.api.database.runs.functions._
@@ -46,7 +47,8 @@ class PartitioningRepositoryImpl(
4647
getPartitioningFn: GetPartitioning,
4748
getFlowPartitioningsFn: GetFlowPartitionings,
4849
getPartitioningMainFlowFn: GetPartitioningMainFlow,
49-
updatePartitioningParentFn: UpdatePartitioningParent
50+
updatePartitioningParentFn: UpdatePartitioningParent,
51+
getPartitioningAncestorsFn: GetPartitioningAncestors
5052
) extends PartitioningRepository
5153
with BaseRepository {
5254

@@ -138,7 +140,7 @@ class PartitioningRepositoryImpl(
138140
).map(_.flatten)
139141
.flatMap { partitioningResults =>
140142
ZIO
141-
.fromEither(GetFlowPartitioningsResult.resultsToPartitioningWithIdDTOs(partitioningResults, Seq.empty))
143+
.fromEither(PartitioningResult.resultsToPartitioningWithIdDTOs(partitioningResults))
142144
.mapBoth(
143145
error => GeneralDatabaseError(error.getMessage),
144146
partitionings => {
@@ -168,6 +170,29 @@ class PartitioningRepositoryImpl(
168170
"updatePartitioningParent"
169171
)
170172
}
173+
174+
override def getPartitioningAncestors(
175+
partitioningId: Long,
176+
limit: Option[Int],
177+
offset: Option[Long]
178+
): IO[DatabaseError, PaginatedResult[PartitioningWithIdDTO]] = {
179+
dbMultipleResultCallWithAggregatedStatus(
180+
getPartitioningAncestorsFn(GetPartitioningAncestorsArgs(partitioningId, limit, offset)),
181+
"getPartitioningAncestors"
182+
).map(_.flatten)
183+
.flatMap { partitioningResults =>
184+
ZIO
185+
.fromEither(PartitioningResult.resultsToPartitioningWithIdDTOs(partitioningResults))
186+
.mapBoth(
187+
error => GeneralDatabaseError(error.getMessage),
188+
partitionings => {
189+
if (partitioningResults.nonEmpty && partitioningResults.head.hasMore) ResultHasMore(partitionings)
190+
else ResultNoMore(partitionings)
191+
}
192+
)
193+
}
194+
}
195+
171196
}
172197

173198
object PartitioningRepositoryImpl {
@@ -181,7 +206,8 @@ object PartitioningRepositoryImpl {
181206
with GetPartitioning
182207
with GetFlowPartitionings
183208
with GetPartitioningMainFlow
184-
with UpdatePartitioningParent,
209+
with UpdatePartitioningParent
210+
with GetPartitioningAncestors,
185211
PartitioningRepository
186212
] = ZLayer {
187213
for {
@@ -195,6 +221,7 @@ object PartitioningRepositoryImpl {
195221
getFlowPartitionings <- ZIO.service[GetFlowPartitionings]
196222
getPartitioningMainFlow <- ZIO.service[GetPartitioningMainFlow]
197223
updatePartitioningParent <- ZIO.service[UpdatePartitioningParent]
224+
getPartitioningAncestors <- ZIO.service[GetPartitioningAncestors]
198225
} yield new PartitioningRepositoryImpl(
199226
createPartitioning,
200227
getPartitioningMeasures,
@@ -205,7 +232,8 @@ object PartitioningRepositoryImpl {
205232
getPartitioning,
206233
getFlowPartitionings,
207234
getPartitioningMainFlow,
208-
updatePartitioningParent
235+
updatePartitioningParent,
236+
getPartitioningAncestors
209237
)
210238
}
211239

0 commit comments

Comments (0)