Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
bbba4ae
Added the getAncestors Database functionality.
ABLL526 Jan 21, 2025
becd6f2
Added the getAncestors Server functionality.
ABLL526 Jan 21, 2025
e479ea3
Added the getAncestors Database functionality.
ABLL526 Jan 21, 2025
8694a4f
Added the getAncestors Database functionality.
ABLL526 Feb 12, 2025
d968718
Added the getAncestors Database functionality.
ABLL526 Feb 12, 2025
a4a4fb1
Added the getAncestors Server functionality.
ABLL526 Jan 21, 2025
ebe5d5c
Added the getAncestors Server functionality.
ABLL526 Feb 14, 2025
a3c7dcf
Merge remote-tracking branch 'origin/305-Get-Ancestors-Server' into 3…
ABLL526 Feb 14, 2025
a5c556c
Added the getAncestors Database functionality.
ABLL526 Feb 14, 2025
8961762
Changes made:
ABLL526 Feb 25, 2025
f83460a
Changes made:
ABLL526 Feb 28, 2025
c99a34a
Merge branch 'master' into 305-Get-Ancestors-Database
ABLL526 Mar 3, 2025
9c3256f
Merge branch 'master' into 305-Get-Ancestors-Database
ABLL526 Mar 7, 2025
20aa4ad
Changes Made:
ABLL526 Mar 7, 2025
4dcaa86
Changes Made:
ABLL526 Mar 7, 2025
07392f5
Merge branch '305-Get-Ancestors-Database' into 305-Get-Ancestors-Server
ABLL526 Mar 7, 2025
036e870
Changes Made:
ABLL526 Mar 7, 2025
66d08cd
Delete database/src/main/postgres/runs/V0.3.0.2__alter_checkpoints.ddl
ABLL526 Mar 7, 2025
c68223f
Delete database/src/main/postgres/runs/V0.3.0.3__get_partitioning_anc…
ABLL526 Mar 7, 2025
84fb314
Delete database/src/test/scala/za/co/absa/atum/database/runs/GetParti…
ABLL526 Mar 7, 2025
e8bd369
Added the getPartitioningAncestors Server functionality.
ABLL526 Mar 7, 2025
e88b8d8
Fixes in the getPartitioningAncestors Server functionality.
ABLL526 Mar 7, 2025
982b615
Merge branch 'master' into 305-Get-Ancestors-Server
ABLL526 Mar 20, 2025
39e2b77
Fixes in the getPartitioningAncestors Server functionality.
ABLL526 Mar 24, 2025
1b2b37a
Merge branch 'master' into 305-Get-Ancestors-Server
ABLL526 Mar 27, 2025
83ec8b5
Fixes in the getPartitioningAncestors Server functionality.
ABLL526 Mar 28, 2025
b80ff9a
- Amended the JacocoSetup.scala to exclude the file:
ABLL526 Apr 1, 2025
aeab677
- Amended the JacocoSetup.scala to exclude the file:
ABLL526 Apr 1, 2025
ac89fa7
- Amended the JacocoSetup.scala to exclude the file:
ABLL526 Apr 2, 2025
b9092e3
- Amended the JacocoSetup.scala to exclude the file:
ABLL526 Apr 2, 2025
fb7634a
- Amended the JacocoSetup.scala to exclude the file:
ABLL526 Apr 2, 2025
93f049c
- Amended the JacocoSetup.scala to exclude the file:
ABLL526 Apr 2, 2025
54615fc
Merge branch 'master' into 305-Get-Ancestors-Server
ABLL526 May 26, 2025
1dc9121
- Amended the code to be correct with the refactor.
ABLL526 May 26, 2025
e51582c
Delete scripts/ATUM-Services_DEV_env.postman_environment.json
ABLL526 May 26, 2025
2019d5d
- Amended the JacocoSetup.scala to ensure Correct Report generation.
ABLL526 May 26, 2025
026fd34
Merge remote-tracking branch 'origin/305-Get-Ancestors-Server' into 3…
ABLL526 May 26, 2025
1317a4b
Revert "Delete scripts/ATUM-Services_DEV_env.postman_environment.json"
ABLL526 May 26, 2025
96ffbfe
Delete scripts/ATUM-Services_DEV_env.postman_environment.json
ABLL526 May 26, 2025
f5694cc
Merge branch 'master' into 305-Get-Ancestors-Server
ABLL526 May 27, 2025
9e4e339
Update JacocoSetup.scala
ABLL526 May 30, 2025
3215280
Merge branch 'master' into 305-Get-Ancestors-Server
benedeki Jun 11, 2025
05ab12f
Merge branch 'master' into 305-Get-Ancestors-Server
lsulak Jul 23, 2025
72c1e0b
- Amended the `JacocoSetup.scala` excludes to include the following:
ABLL526 Jul 24, 2025
e03f21f
Merge remote-tracking branch 'origin/305-Get-Ancestors-Server' into 3…
ABLL526 Jul 24, 2025
f465628
- Amended the `JacocoSetup.scala` excludes to include the following:
ABLL526 Jul 24, 2025
9caf3b1
Merge branch 'master' into 305-Get-Ancestors-Server
ABLL526 Jul 31, 2025
28e8571
- Edits to the server portion since merging with master.
ABLL526 Aug 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions project/JacocoSetup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ object JacocoSetup {
"za.co.absa.atum.server.api.v2.controller.PartitioningController$",
"za.co.absa.atum.server.api.v2.service.PartitioningService",
"za.co.absa.atum.server.api.v2.service.PartitioningService$",
"za.co.absa.atum.server.api.common.http.Routes*",
"za.co.absa.atum.server.implicits.SeqImplicits*",
"za.co.absa.atum.model.envelopes.Pagination",
"za.co.absa.atum.model.dto.PartitioningParentPatchDTO*",
"za.co.absa.atum.model.ApiPaths*",
Expand Down
1 change: 1 addition & 0 deletions server/src/main/scala/za/co/absa/atum/server/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ object Main extends ZIOAppDefault {
GetFlowPartitionings.layer,
GetPartitioningMainFlow.layer,
UpdatePartitioningParent.layer,
GetPartitioningAncestors.layer,
PostgresDatabaseProvider.layer,
TransactorProvider.layer,
// aws
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ object Routes extends ServerOptions with ServerUtils {
api.v2.http.Endpoints.getPartitioningMainFlowEndpoint,
api.v2.http.Endpoints.getFlowPartitioningsEndpoint,
api.v2.http.Endpoints.getFlowCheckpointsEndpoint,
api.v2.http.Endpoints.getPartitioningAncestorsEndpoint,
api.common.http.Endpoints.healthEndpoint
)
ZHttp4sServerInterpreter[HttpEnv.Env](http4sServerOptions(None))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,9 @@ import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWith
import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator
import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling
import zio.{Task, URLayer, ZIO, ZLayer}

import za.co.absa.atum.server.model.PartitioningResult
import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbGet

import scala.annotation.tailrec

class GetFlowPartitionings(implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
extends DoobieMultipleResultFunctionWithAggStatus[GetFlowPartitioningsArgs, Option[
GetFlowPartitioningsResult
Expand All @@ -53,30 +51,7 @@ class GetFlowPartitionings(implicit schema: DBSchema, dbEngine: DoobieEngine[Tas
object GetFlowPartitionings {
case class GetFlowPartitioningsArgs(flowId: Long, limit: Option[Int], offset: Option[Long])
case class GetFlowPartitioningsResult(id: Long, partitioningJson: Json, author: String, hasMore: Boolean)

object GetFlowPartitioningsResult {

@tailrec def resultsToPartitioningWithIdDTOs(
results: Seq[GetFlowPartitioningsResult],
acc: Seq[PartitioningWithIdDTO]
): Either[DecodingFailure, Seq[PartitioningWithIdDTO]] = {
if (results.isEmpty) Right(acc)
else {
val head = results.head
val tail = results.tail
val decodingResult = head.partitioningJson.as[PartitioningForDB]
decodingResult match {
case Left(decodingFailure) => Left(decodingFailure)
case Right(partitioningForDB) =>
val partitioningDTO = partitioningForDB.keys.map { key =>
PartitionDTO(key, partitioningForDB.keysToValuesMap(key))
}
resultsToPartitioningWithIdDTOs(tail, acc :+ PartitioningWithIdDTO(head.id, partitioningDTO, head.author))
}
}
}

}
extends PartitioningResult(id, partitioningJson, author)

val layer: URLayer[PostgresDatabaseProvider, GetFlowPartitionings] = ZLayer {
for {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.server.api.database.runs.functions

import doobie.implicits.toSqlInterpolator
import io.circe.Json
import za.co.absa.atum.server.api.database.PostgresDatabaseProvider
import za.co.absa.atum.server.api.database.runs.Runs
import za.co.absa.atum.server.api.database.runs.functions.GetPartitioningAncestors._
import za.co.absa.atum.server.model.PartitioningResult
import za.co.absa.db.fadb.DBSchema
import za.co.absa.db.fadb.doobie.DoobieEngine
import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWithAggStatus
import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator
import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling
import zio.{Task, URLayer, ZIO, ZLayer}
import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbGet

// Doobie wrapper around the partitioning-ancestors database function.
// NOTE(review): fa-db presumably derives the DB function name from this class name
// (get_partitioning_ancestors) — confirm against the DDL script.
class GetPartitioningAncestors(implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
extends DoobieMultipleResultFunctionWithAggStatus[GetPartitioningAncestorsArgs, Option[
GetPartitioningAncestorsResult
], Task](args =>
// The order of these fragments must match the positional parameter order
// of the underlying database function: (partitioning id, limit, offset).
Seq(
fr"${args.partitioningId}",
fr"${args.limit}",
fr"${args.offset}"
)
)
with StandardStatusHandling
with ByFirstErrorStatusAggregator {

// Columns selected on top of the status columns added by super; they map
// positionally onto the fields of GetPartitioningAncestorsResult.
override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq("ancestor_id", "partitioning", "author", "has_more")
}

object GetPartitioningAncestors {
// Arguments of the DB call; limit/offset implement keyset-style pagination.
case class GetPartitioningAncestorsArgs(partitioningId: Long, limit: Option[Int], offset: Option[Long])
// One ancestor row; hasMore signals that further pages exist.
// Extends PartitioningResult so the shared JSON-to-DTO conversion
// (PartitioningResult.resultsToPartitioningWithIdDTOs) can be reused.
case class GetPartitioningAncestorsResult(id: Long, partitioningJson: Json, author: String, hasMore: Boolean)
extends PartitioningResult(id, partitioningJson, author)

// ZIO layer wiring the function to the Runs schema and the shared DB engine.
val layer: URLayer[PostgresDatabaseProvider, GetPartitioningAncestors] = ZLayer {
for {
dbProvider <- ZIO.service[PostgresDatabaseProvider]
} yield new GetPartitioningAncestors()(Runs, dbProvider.dbEngine)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,11 @@ trait PartitioningController {
partitioningParentPatchDTO: PartitioningParentPatchDTO
): IO[ErrorResponse, Unit]


/**
 * Returns the ancestors of the given partitioning as a paginated response.
 *
 * @param partitioningId id of the partitioning whose ancestors are requested
 * @param limit          maximum number of ancestors per page
 * @param offset         number of records to skip
 */
def getPartitioningAncestors(
partitioningId: Long,
limit: Option[Int],
offset: Option[Long]
): IO[ErrorResponse, PaginatedResponse[PartitioningWithIdDTO]]

}
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,21 @@ class PartitioningControllerImpl(partitioningService: PartitioningService)
_ => ()
)
}

/**
 * Returns the ancestors of the given partitioning as a paginated response.
 *
 * @param partitioningId id of the partitioning whose ancestors are requested
 * @param limit          maximum number of ancestors per page; defaults to 10 when absent
 * @param offset         number of records to skip; defaults to 0 when absent
 */
override def getPartitioningAncestors(
  partitioningId: Long,
  limit: Option[Int],
  offset: Option[Long]
): IO[ErrorResponse, PaginatedResponse[PartitioningWithIdDTO]] = {
  // Fix: the previous limit.get / offset.get would throw NoSuchElementException
  // if this controller is called with None (the tapir endpoint defaults to
  // Some(10) / Some(0L), but direct callers are not forced through tapir).
  // The fallbacks mirror the endpoint defaults.
  mapToPaginatedResponse(
    limit.getOrElse(10),
    offset.getOrElse(0L),
    serviceCall[PaginatedResult[PartitioningWithIdDTO], PaginatedResult[PartitioningWithIdDTO]](
      partitioningService.getPartitioningAncestors(partitioningId, limit, offset)
    )
  )
}

}

object PartitioningControllerImpl {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,19 @@ object Endpoints extends BaseEndpoints {
.errorOutVariantPrepend(notFoundErrorOneOfVariant)
}

// GET /api/v2/partitionings/{partitioningId}/ancestors?limit=&offset=
// Paginated list of the partitioning's ancestors; 404 when the partitioning
// does not exist (notFoundErrorOneOfVariant).
val getPartitioningAncestorsEndpoint
: PublicEndpoint[(Long, Option[Int], Option[Long]), ErrorResponse, PaginatedResponse[
PartitioningWithIdDTO
], Any] = {
apiV2.get
.in(V2Paths.Partitionings / path[Long]("partitioningId") / V2Paths.Ancestors)
// limit defaults to 10 and is constrained to 1..1000
.in(query[Option[Int]]("limit").default(Some(10)).validateOption(Validator.inRange(1, 1000)))
// offset defaults to 0 and must be non-negative
.in(query[Option[Long]]("offset").default(Some(0L)).validateOption(Validator.min(0L)))
.out(statusCode(StatusCode.Ok))
.out(jsonBody[PaginatedResponse[PartitioningWithIdDTO]])
.errorOutVariantPrepend(notFoundErrorOneOfVariant)
}

val serverEndpoints: List[ZServerEndpoint[HttpEnv.Env, Any]] = List(
createServerEndpoint[
(Long, CheckpointV2DTO),
Expand Down Expand Up @@ -259,6 +272,16 @@ object Endpoints extends BaseEndpoints {
PartitioningController.patchPartitioningParent(partitioningId, partitioningParentPatchDTO)
}
),
createServerEndpoint[
(Long, Option[Int], Option[Long]),
ErrorResponse,
PaginatedResponse[PartitioningWithIdDTO]
](
getPartitioningAncestorsEndpoint,
{ case (partitioningId: Long, limit: Option[Int], offset: Option[Long]) =>
PartitioningController.getPartitioningAncestors(partitioningId, limit, offset)
}
)
)

}
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,11 @@ trait PartitioningRepository {
partitioningId: Long,
partitioningParentPatchDTO: PartitioningParentPatchDTO
): IO[DatabaseError, Unit]

/**
 * Fetches the ancestors of the given partitioning from the database.
 *
 * @param partitioningId id of the partitioning whose ancestors are requested
 * @param limit          maximum number of ancestors per page
 * @param offset         number of records to skip
 */
def getPartitioningAncestors(
partitioningId: Long,
limit: Option[Int],
offset: Option[Long]
): IO[DatabaseError, PaginatedResult[PartitioningWithIdDTO]]

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import za.co.absa.atum.model.dto._
import za.co.absa.atum.server.api.common.repository.BaseRepository
import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings._
import za.co.absa.atum.server.api.database.flows.functions._
import za.co.absa.atum.server.api.database.runs.functions.GetPartitioningAncestors._
import za.co.absa.atum.server.api.database.runs.functions.CreateOrUpdateAdditionalData.CreateOrUpdateAdditionalDataArgs
import za.co.absa.atum.server.api.database.runs.functions.UpdatePartitioningParent.UpdatePartitioningParentArgs
import za.co.absa.atum.server.api.database.runs.functions._
Expand All @@ -46,7 +47,8 @@ class PartitioningRepositoryImpl(
getPartitioningFn: GetPartitioning,
getFlowPartitioningsFn: GetFlowPartitionings,
getPartitioningMainFlowFn: GetPartitioningMainFlow,
updatePartitioningParentFn: UpdatePartitioningParent
updatePartitioningParentFn: UpdatePartitioningParent,
getPartitioningAncestorsFn: GetPartitioningAncestors
) extends PartitioningRepository
with BaseRepository {

Expand Down Expand Up @@ -138,7 +140,7 @@ class PartitioningRepositoryImpl(
).map(_.flatten)
.flatMap { partitioningResults =>
ZIO
.fromEither(GetFlowPartitioningsResult.resultsToPartitioningWithIdDTOs(partitioningResults, Seq.empty))
.fromEither(PartitioningResult.resultsToPartitioningWithIdDTOs(partitioningResults))
.mapBoth(
error => GeneralDatabaseError(error.getMessage),
partitionings => {
Expand Down Expand Up @@ -168,6 +170,29 @@ class PartitioningRepositoryImpl(
"updatePartitioningParent"
)
}

/**
 * Fetches the ancestors of the given partitioning and converts the raw DB rows
 * into DTOs, wrapping them in a pagination-aware result.
 */
override def getPartitioningAncestors(
partitioningId: Long,
limit: Option[Int],
offset: Option[Long]
): IO[DatabaseError, PaginatedResult[PartitioningWithIdDTO]] = {
dbMultipleResultCallWithAggregatedStatus(
getPartitioningAncestorsFn(GetPartitioningAncestorsArgs(partitioningId, limit, offset)),
"getPartitioningAncestors"
).map(_.flatten)
.flatMap { partitioningResults =>
ZIO
// Decode each row's partitioning JSON; the first decoding failure aborts
// the whole conversion (shared logic with getFlowPartitionings).
.fromEither(PartitioningResult.resultsToPartitioningWithIdDTOs(partitioningResults))
.mapBoth(
error => GeneralDatabaseError(error.getMessage),
partitionings => {
// has_more is row-level data from the DB function; the head row is
// representative of the whole page.
if (partitioningResults.nonEmpty && partitioningResults.head.hasMore) ResultHasMore(partitionings)
else ResultNoMore(partitionings)
}
)
}
}

}

object PartitioningRepositoryImpl {
Expand All @@ -181,7 +206,8 @@ object PartitioningRepositoryImpl {
with GetPartitioning
with GetFlowPartitionings
with GetPartitioningMainFlow
with UpdatePartitioningParent,
with UpdatePartitioningParent
with GetPartitioningAncestors,
PartitioningRepository
] = ZLayer {
for {
Expand All @@ -195,6 +221,7 @@ object PartitioningRepositoryImpl {
getFlowPartitionings <- ZIO.service[GetFlowPartitionings]
getPartitioningMainFlow <- ZIO.service[GetPartitioningMainFlow]
updatePartitioningParent <- ZIO.service[UpdatePartitioningParent]
getPartitioningAncestors <- ZIO.service[GetPartitioningAncestors]
} yield new PartitioningRepositoryImpl(
createPartitioning,
getPartitioningMeasures,
Expand All @@ -205,7 +232,8 @@ object PartitioningRepositoryImpl {
getPartitioning,
getFlowPartitionings,
getPartitioningMainFlow,
updatePartitioningParent
updatePartitioningParent,
getPartitioningAncestors
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,10 @@ trait PartitioningService {
partitioningId: Long,
partitioningParentPatchDTO: PartitioningParentPatchDTO
): IO[ServiceError, Unit]

/**
 * Returns the ancestors of the given partitioning.
 *
 * @param partitioningId id of the partitioning whose ancestors are requested
 * @param limit          maximum number of ancestors per page
 * @param offset         number of records to skip
 */
def getPartitioningAncestors(
partitioningId: Long,
limit: Option[Int],
offset: Option[Long]
): IO[ServiceError, PaginatedResult[PartitioningWithIdDTO]]
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,18 @@ class PartitioningServiceImpl(partitioningRepository: PartitioningRepository)
"updatePartitioningParent"
)
}

/**
 * Delegates the ancestors lookup to the repository, mapping any repository
 * failure into a ServiceError via repositoryCall.
 */
override def getPartitioningAncestors(
  partitioningId: Long,
  limit: Option[Int],
  offset: Option[Long]
): IO[ServiceError, PaginatedResult[PartitioningWithIdDTO]] =
  repositoryCall(
    partitioningRepository.getPartitioningAncestors(partitioningId, limit, offset),
    "getPartitioningAncestors"
  )

}

object PartitioningServiceImpl {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.server.implicits

import io.circe.DecodingFailure

object SeqImplicits {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I agree abstracting the seq processing is useful, the main bit (conversion from database results into the desired type) wasn't addressed at all and is still duplicated in GetPartitioningAncestors and GetFlowPartitionings. Please give this another go and try to address the duplication.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw it's not really doing simple decoding - try to use more descriptive names.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @salamonpavel, I think I managed to do this. Please check. I have also tried to rename the functions; please let me know whether you are happy with those names or whether they should be named differently.

Copy link
Collaborator

@salamonpavel salamonpavel Mar 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I propose the following

// 1. define PartitioningResult in a separate file in server's model package (not inside SeqImplicits) as an abstract class without type parameter
package za.co.absa.atum.server.model

import io.circe.{Decoder, DecodingFailure, Json}
import za.co.absa.atum.model.dto.{PartitionDTO, PartitioningWithIdDTO}
import za.co.absa.atum.server.implicits.SeqImplicits.SeqEnhancements

abstract class PartitioningResult(id: Long, partitioning: Json, author: String) {

  protected def toPartitioningWithIdDTO(implicit
    decoder: Decoder[PartitioningForDB]
  ): Either[DecodingFailure, PartitioningWithIdDTO] = {
    decoder.decodeJson(partitioning).map { partitioningForDB =>
      val partitioningDTO = partitioningForDB.keys.map { key =>
        PartitionDTO(key, partitioningForDB.keysToValuesMap(key))
      }
      PartitioningWithIdDTO(id, partitioningDTO, author)
    }
  }

}

object PartitioningResult {

  def resultsToPartitioningWithIdDTOs(results: Seq[PartitioningResult]): Either[DecodingFailure, Seq[PartitioningWithIdDTO]] = {
    results.seqDecode(_.toPartitioningWithIdDTO)
  }

}

// 2. extend the abstract class in db results classes
case class GetFlowPartitioningsResult(id: Long, partitioningJson: Json, author: String, hasMore: Boolean)
  extends PartitioningResult(id, partitioningJson, author) // do the same for GetPartitioningAncestorsResult

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @salamonpavel this helps greatly. I have made the changes as suggested. It looks much cleaner now.
Please check and confirm. If you are happy please approve.

/**
 * Enriches `Seq[T]` with fail-fast element-wise decoding.
 *
 * Generalized: the error type is now a type parameter `E` instead of being
 * hard-coded to circe's `DecodingFailure` — the fold never inspects the error,
 * so a generic utility package should not depend on circe. Existing call sites
 * compile unchanged (`E` is inferred as `DecodingFailure`).
 */
implicit class SeqEnhancements[T](val seq: Seq[T]) extends AnyVal {

  /**
   * Applies `decodingFnc` to every element, collecting the results in order.
   *
   * @param decodingFnc conversion applied to each element
   * @return `Right` with all decoded values in the original order, or the
   *         first `Left` produced (no further elements are decoded after
   *         the first failure, since `flatMap` on `Left` short-circuits)
   */
  def seqDecode[E, R](decodingFnc: T => Either[E, R]): Either[E, Seq[R]] = {
    seq.foldLeft(Right(List.empty[R]): Either[E, List[R]]) { (acc, item) =>
      for {
        decodedList <- acc
        decodedItem <- decodingFnc(item)
      } yield decodedItem :: decodedList // prepend is O(1); reversed once at the end
    }.map(_.reverse)
  }
}
}
Loading
Loading