Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions ingest/src/main/scala/hydra.ingest/modules/Bootstrap.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ final class Bootstrap[F[_]: MonadError[*[_], Throwable]] private (
StreamTypeV2.Entity,
deprecated = false,
None,
replacementTopics = None,
previousTopics = None,
InternalUseOnly,
NonEmptyList.of(cfg.contactMethod),
Instant.now,
Expand All @@ -65,6 +67,7 @@ final class Bootstrap[F[_]: MonadError[*[_], Throwable]] private (
Some("Data-Platform"),
None,
List.empty,
None,
None
),
TopicDetails(cfg.numPartitions, cfg.replicationFactor, cfg.minInsyncReplicas, Map("cleanup.policy" -> "compact"))
Expand All @@ -83,6 +86,8 @@ final class Bootstrap[F[_]: MonadError[*[_], Throwable]] private (
StreamTypeV2.Entity,
deprecated = false,
None,
replacementTopics = None,
previousTopics = None,
InternalUseOnly,
NonEmptyList.of(dvsConsumersTopicConfig.contactMethod),
Instant.now,
Expand All @@ -93,6 +98,7 @@ final class Bootstrap[F[_]: MonadError[*[_], Throwable]] private (
Some("Data-Platform"),
None,
List.empty,
None,
None
),
TopicDetails(
Expand All @@ -114,6 +120,8 @@ final class Bootstrap[F[_]: MonadError[*[_], Throwable]] private (
StreamTypeV2.Entity,
deprecated = false,
None,
replacementTopics = None,
previousTopics = None,
InternalUseOnly,
NonEmptyList.of(cooTopicConfig.contactMethod),
Instant.now,
Expand All @@ -124,6 +132,7 @@ final class Bootstrap[F[_]: MonadError[*[_], Throwable]] private (
Some("Data-Platform"),
None,
List.empty,
None,
None
),
TopicDetails(
Expand All @@ -144,6 +153,8 @@ final class Bootstrap[F[_]: MonadError[*[_], Throwable]] private (
StreamTypeV2.Entity,
deprecated = false,
None,
replacementTopics = None,
previousTopics = None,
InternalUseOnly,
NonEmptyList.of(cooTopicConfig.contactMethod),
Instant.now,
Expand All @@ -152,6 +163,7 @@ final class Bootstrap[F[_]: MonadError[*[_], Throwable]] private (
Some("Data-Platform"),
None,
List.empty,
None,
None
),
TopicDetails(cfg.numPartitions, cfg.replicationFactor, cfg.minInsyncReplicas, Map("cleanup.policy" -> "compact")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ class TopicDeletionProgramSpec extends AnyFlatSpec with Matchers {
StreamTypeV2.Entity,
deprecated = deprecated,
deprecatedDate,
replacementTopics = None,
previousTopics = None,
Public,
NonEmptyList.of(Email.create(email).get),
createdDate,
Expand All @@ -145,7 +147,8 @@ class TopicDeletionProgramSpec extends AnyFlatSpec with Matchers {
Some("dvs-teamName"),
None,
List.empty,
Some("notificationUrl")
Some("notificationUrl"),
None
)

private def buildSchema(topic: String, upgrade: Boolean): Schema = {
Expand Down
5 changes: 4 additions & 1 deletion ingest/src/test/scala/hydra/ingest/utils/TopicUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ object TopicUtils {
StreamTypeV2.Entity,
deprecated = false,
deprecatedDate = None,
replacementTopics = None,
previousTopics = None,
Public,
NonEmptyList.of(Email.create("test@test.com").get),
createdDate,
Expand All @@ -34,7 +36,8 @@ object TopicUtils {
Some("dvs-teamName"),
None,
List.empty,
Some("notificationUrl")
Some("notificationUrl"),
additionalValidations = None
)
val topicMetadataContainer = TopicMetadataContainer(
topicMetadataKey,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package hydra.kafka.model

import enumeratum.{Enum, EnumEntry}
import hydra.kafka.algebras.MetadataAlgebra.TopicMetadataContainer

import scala.collection.immutable

sealed trait AdditionalValidation extends EnumEntry

sealed trait MetadataAdditionalValidation extends AdditionalValidation

object MetadataAdditionalValidation extends Enum[MetadataAdditionalValidation] {
case object replacementTopics extends MetadataAdditionalValidation

override val values: immutable.IndexedSeq[MetadataAdditionalValidation] = findValues

lazy val key: String = "MetadataAdditionalValidation"
}

object AdditionalValidation {
lazy val allValidations: Option[Map[String, List[AdditionalValidation]]] =
Some(Map(
MetadataAdditionalValidation.key -> MetadataAdditionalValidation.values.toList
))

/**
* An OLD topic will have its metadata populated.
* Therefore, additionalValidations=None will be picked from the metadata.
* And no new additionalValidations will be applied on older topics.
*
* A NEW topic will not have a metadata object.
* Therefore, all existing additionalValidations will be assigned.
* Thus, additionalValidations on corresponding fields will be applied.
*
* Corner case: After this feature has been on STAGE/PROD for sometime and some new additionalValidations are required.
* We need not worry about old topics as the value of additionalValidations will remain the same since the topic creation.
* New additionalValidations should be applied only on new topics.
* Therefore, assigning all the values under AdditionalValidation enum is reasonable.
*
* @param metadata a metadata object of current topic
* @return value of additionalValidations if the topic is already existing(OLD topic) otherwise all enum values under AdditionalValidation(NEW topic)
*/
def validations(metadata: Option[TopicMetadataContainer]): Option[Map[String, List[AdditionalValidation]]] =
metadata.map(_.value.additionalValidations).getOrElse(AdditionalValidation.allValidations)

def metadataValidations(metadata: Option[TopicMetadataContainer]): Option[List[MetadataAdditionalValidation]] =
validations(metadata) flatMap { vMap =>
vMap.get(MetadataAdditionalValidation.key)
.map(_.asInstanceOf[List[MetadataAdditionalValidation]])
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,28 +144,34 @@ final case class TopicMetadataV2ValueOptionalTagList(
streamType: StreamTypeV2,
deprecated: Boolean,
deprecatedDate: Option[Instant],
replacementTopics: Option[List[String]],
previousTopics: Option[List[String]],
dataClassification: DataClassification,
contact: NonEmptyList[ContactMethod],
createdDate: Instant,
parentSubjects: List[String],
notes: Option[String],
teamName: Option[String],
tags: Option[List[String]],
notificationUrl: Option[String]
notificationUrl: Option[String],
additionalValidations: Option[Map[String, List[AdditionalValidation]]]
) {
def toTopicMetadataV2Value: TopicMetadataV2Value = {
TopicMetadataV2Value(
streamType,
deprecated,
deprecatedDate,
replacementTopics,
previousTopics,
dataClassification,
contact,
createdDate,
parentSubjects,
notes,
teamName,
tags.getOrElse(List.empty),
notificationUrl
notificationUrl,
additionalValidations
)
}
}
Expand All @@ -175,28 +181,34 @@ final case class TopicMetadataV2Value(
streamType: StreamTypeV2,
deprecated: Boolean,
deprecatedDate: Option[Instant],
replacementTopics: Option[List[String]],
previousTopics: Option[List[String]],
dataClassification: DataClassification,
contact: NonEmptyList[ContactMethod],
createdDate: Instant,
parentSubjects: List[String],
notes: Option[String],
teamName: Option[String],
tags: List[String],
notificationUrl: Option[String]
notificationUrl: Option[String],
additionalValidations: Option[Map[String, List[AdditionalValidation]]]
) {
def toTopicMetadataV2ValueOptionalTagList: TopicMetadataV2ValueOptionalTagList = {
TopicMetadataV2ValueOptionalTagList(
streamType,
deprecated,
deprecatedDate,
replacementTopics,
previousTopics,
dataClassification,
contact,
createdDate,
parentSubjects,
notes,
teamName,
tags.some,
notificationUrl
notificationUrl,
additionalValidations
)
}
}
Expand Down Expand Up @@ -258,6 +270,19 @@ object TopicMetadataV2ValueOptionalTagList {
private implicit val contactMethodCodec: Codec[ContactMethod] =
Codec.derive[ContactMethod]

private implicit val validationsCodec: Codec[AdditionalValidation] = Codec.deriveEnum[AdditionalValidation](
symbols = List(
MetadataAdditionalValidation.replacementTopics.entryName
),
encode = {
case MetadataAdditionalValidation.replacementTopics => MetadataAdditionalValidation.replacementTopics.entryName
},
decode = {
case "replacementTopics" => Right(MetadataAdditionalValidation.replacementTopics)
case other => Left(AvroError(s"$other is not a ${AdditionalValidation.toString}"))
}
)

implicit val codec: Codec[TopicMetadataV2ValueOptionalTagList] =
Codec.record[TopicMetadataV2ValueOptionalTagList](
name = "TopicMetadataV2Value",
Expand All @@ -267,14 +292,17 @@ object TopicMetadataV2ValueOptionalTagList {
(field("streamType", _.streamType),
field("deprecated", _.deprecated),
field("deprecatedDate", _.deprecatedDate, default = Some(None)),
field("replacementTopics", _.replacementTopics, default = Some(None)),
field("previousTopics", _.previousTopics, default = Some(None)),
field("dataClassification", _.dataClassification),
field("contact", _.contact),
field("createdDate", _.createdDate),
field("parentSubjects", _.parentSubjects),
field("notes", _.notes, default = Some(None)),
field("teamName", _.teamName, default = Some(None)),
field("tags", _.tags, default = Some(None)),
field("notificationUrl", _.notificationUrl, default = Some(None))
field("notificationUrl", _.notificationUrl, default = Some(None)),
field("additionalValidations", _.additionalValidations, default = Some(None))
).mapN(TopicMetadataV2ValueOptionalTagList.apply)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import hydra.kafka.algebras.MetadataAlgebra.TopicMetadataContainer
import hydra.kafka.model.TopicMetadataV2Request.Subject
import org.apache.avro.Schema
import shapeless.Witness
import shapeless.Witness.Lt

import java.time.Instant
import scala.collection.immutable

sealed trait DataClassification

Expand Down Expand Up @@ -78,6 +78,8 @@ final case class TopicMetadataV2Request(
streamType: StreamTypeV2,
deprecated: Boolean,
deprecatedDate: Option[Instant],
replacementTopics: Option[List[String]],
previousTopics: Option[List[String]],
dataClassification: DataClassification,
contact: NonEmptyList[ContactMethod],
createdDate: Instant,
Expand All @@ -86,22 +88,26 @@ final case class TopicMetadataV2Request(
teamName: Option[String],
numPartitions: Option[TopicMetadataV2Request.NumPartitions],
tags: List[String],
notificationUrl: Option[String]
notificationUrl: Option[String],
additionalValidations: Option[Map[String, List[AdditionalValidation]]]
) {

def toValue: TopicMetadataV2Value = {
TopicMetadataV2Value(
streamType,
deprecated,
deprecatedDate,
replacementTopics,
previousTopics,
dataClassification,
contact,
createdDate,
parentSubjects,
notes,
teamName,
tags,
notificationUrl
notificationUrl,
additionalValidations
)
}
}
Expand Down Expand Up @@ -138,6 +144,8 @@ object TopicMetadataV2Request {
mor.streamType,
mor.deprecated,
mor.deprecatedDate,
mor.replacementTopics,
mor.previousTopics,
mor.dataClassification,
mor.contact,
mor.createdDate,
Expand All @@ -146,7 +154,8 @@ object TopicMetadataV2Request {
mor.teamName,
mor.numPartitions,
mor.tags,
mor.notificationUrl
mor.notificationUrl,
mor.additionalValidations
)
}
}
Expand All @@ -160,6 +169,8 @@ final case class TopicMetadataV2Response(
streamType: StreamTypeV2,
deprecated: Boolean,
deprecatedDate: Option[Instant],
replacementTopics: Option[List[String]],
previousTopics: Option[List[String]],
dataClassification: DataClassification,
contact: NonEmptyList[ContactMethod],
createdDate: Instant,
Expand All @@ -179,6 +190,8 @@ object TopicMetadataV2Response {
v.streamType,
v.deprecated,
v.deprecatedDate,
v.replacementTopics,
v.previousTopics,
v.dataClassification,
v.contact,
v.createdDate,
Expand All @@ -194,6 +207,8 @@ object TopicMetadataV2Response {
final case class MetadataOnlyRequest(streamType: StreamTypeV2,
deprecated: Boolean,
deprecatedDate: Option[Instant],
replacementTopics: Option[List[String]],
previousTopics: Option[List[String]],
dataClassification: DataClassification,
contact: NonEmptyList[ContactMethod],
createdDate: Instant,
Expand All @@ -202,7 +217,8 @@ final case class MetadataOnlyRequest(streamType: StreamTypeV2,
teamName: Option[String],
numPartitions: Option[TopicMetadataV2Request.NumPartitions],
tags: List[String],
notificationUrl: Option[String]) {
notificationUrl: Option[String],
additionalValidations: Option[Map[String, List[AdditionalValidation]]]) {
}


Loading