Skip to content

Commit ef8708c

Browse files
authored
Validate schema key field lengths before calling Iglu Server (#188)
The schema key fields (vendor, name, format) of self-describing JSONs (SDJs) received over the public internet are now validated to be at most 128 characters long before invoking client.check. This prevents untrusted, oversized schema keys from reaching Iglu Server. The 128-character limit matches the VARCHAR column size used in Iglu Server's database.
1 parent fe92056 commit ef8708c

File tree

2 files changed

+121
-28
lines changed
  • modules/common/src

2 files changed

+121
-28
lines changed

modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/IgluUtils.scala

Lines changed: 52 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import io.circe.generic.semiauto._
2626
import com.snowplowanalytics.iglu.client.{ClientError, IgluCirceClient}
2727
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
2828

29-
import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SchemaVer, SelfDescribingData}
29+
import com.snowplowanalytics.iglu.core.{ParseError, SchemaCriterion, SchemaKey, SchemaVer, SelfDescribingData}
3030
import com.snowplowanalytics.iglu.core.circe.implicits._
3131

3232
import com.snowplowanalytics.snowplow.enrich.common.enrichments.Failure
@@ -260,6 +260,9 @@ object IgluUtils {
260260
.toIor
261261
} yield valid
262262

263+
// Matches the VARCHAR column size used for vendor, name, and format in Iglu Server's database.
264+
private val MaxSchemaFieldLength = 128
265+
263266
/** Check that a SDJ is valid */
264267
private[enrich] def validateSDJ[F[_]: Sync](
265268
client: IgluCirceClient[F],
@@ -269,34 +272,56 @@ object IgluUtils {
269272
etlTstamp: Instant
270273
): EitherT[F, Failure.SchemaViolation, ValidSDJ] = {
271274
implicit val rl: RegistryLookup[F] = registryLookup
272-
client
273-
.check(sdj)
274-
.leftSemiflatMap {
275-
case re: ClientError.ResolutionError if client.resolver.isSystemError(re) =>
276-
val message = s"Could not reach Iglu Server for schema '${sdj.schema.toSchemaUri}'. " +
277-
s"Check resolver configuration and ensure registries are available. " +
278-
s"Resolution errors: ${re.getMessage}"
279-
Logger[F].error(message) >> Sync[F].raiseError[ClientError](IgluSystemError(message))
280-
case other =>
281-
Sync[F].pure(other)
282-
}
283-
.leftMap(clientError =>
284-
Failure.SchemaViolation(
285-
schemaViolation = FailureDetails.SchemaViolation.IgluError(sdj.schema, clientError),
286-
source = field,
287-
data = sdj.data,
288-
etlTstamp = etlTstamp
289-
)
290-
)
291-
.map { supersededBy =>
292-
val validationInfo = supersededBy.map(s => ValidationInfo(sdj.schema, s))
293-
ValidSDJ(
294-
replaceSchemaVersion(sdj, validationInfo),
295-
validationInfo
296-
)
297-
}
275+
for {
276+
_ <- validateSchemaFieldLength(sdj, sdj.schema.vendor, field, etlTstamp)
277+
_ <- validateSchemaFieldLength(sdj, sdj.schema.name, field, etlTstamp)
278+
_ <- validateSchemaFieldLength(sdj, sdj.schema.format, field, etlTstamp)
279+
result <- client
280+
.check(sdj)
281+
.leftSemiflatMap {
282+
case re: ClientError.ResolutionError if client.resolver.isSystemError(re) =>
283+
val message = s"Could not reach Iglu Server for schema '${sdj.schema.toSchemaUri}'. " +
284+
s"Check resolver configuration and ensure registries are available. " +
285+
s"Resolution errors: ${re.getMessage}"
286+
Logger[F].error(message) >> Sync[F].raiseError[ClientError](IgluSystemError(message))
287+
case other =>
288+
Sync[F].pure(other)
289+
}
290+
.leftMap(clientError =>
291+
Failure.SchemaViolation(
292+
schemaViolation = FailureDetails.SchemaViolation.IgluError(sdj.schema, clientError),
293+
source = field,
294+
data = sdj.data,
295+
etlTstamp = etlTstamp
296+
)
297+
)
298+
.map { supersededBy =>
299+
val validationInfo = supersededBy.map(s => ValidationInfo(sdj.schema, s))
300+
ValidSDJ(
301+
replaceSchemaVersion(sdj, validationInfo),
302+
validationInfo
303+
)
304+
}
305+
} yield result
298306
}
299307

308+
/**
 * Check that one field of an SDJ's schema key is not longer than `MaxSchemaFieldLength`.
 *
 * This is a pure check wrapped in `EitherT`; it performs no lookup.
 * `validateSDJ` runs it for the vendor, name and format before calling `client.check`.
 *
 * @param sdj SDJ whose schema key field is being checked; used to build the failure
 * @param value The schema key field to check (e.g. `sdj.schema.vendor`)
 * @param source Name of the input field the SDJ came from, copied into the failure
 * @param etlTstamp Timestamp copied into the failure
 * @return `Right(())` if `value.length <= MaxSchemaFieldLength`, otherwise a
 *         `Failure.SchemaViolation` wrapping `NotIglu` with `ParseError.InvalidIgluUri`
 */
// NOTE(review): `String.length` counts UTF-16 code units, which may differ from how the
// database counts VARCHAR characters for non-BMP input — confirm this is acceptable.
private def validateSchemaFieldLength[F[_]: Sync](
309+
sdj: SelfDescribingData[Json],
310+
value: String,
311+
source: String,
312+
etlTstamp: Instant
313+
): EitherT[F, Failure.SchemaViolation, Unit] =
314+
EitherT.cond[F](
315+
value.length <= MaxSchemaFieldLength,
316+
(),
317+
// An over-long field is reported as a generic invalid Iglu URI; the failure
// does not say which field (vendor, name or format) exceeded the limit.
Failure.SchemaViolation(
318+
schemaViolation = FailureDetails.SchemaViolation.NotIglu(sdj.normalize, ParseError.InvalidIgluUri),
319+
source = source,
320+
data = sdj.data,
321+
etlTstamp = etlTstamp
322+
)
323+
)
324+
300325
/**
301326
* Check that several SDJs are valid
302327
* @return `SchemaViolation`s in the `Left` and valid SDJs in the `Right`, if any

modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/utils/IgluUtilsSpec.scala

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import io.circe.syntax._
2121

2222
import cats.data.{Ior, NonEmptyList}
2323

24-
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}
24+
import com.snowplowanalytics.iglu.core.{ParseError, SchemaKey, SchemaVer}
2525

2626
import com.snowplowanalytics.iglu.client.ClientError.{ResolutionError, ValidationError}
2727

@@ -962,6 +962,74 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect
962962
}
963963
}
964964

965+
"return a NotIglu error if a context has a schema vendor exceeding 128 characters" >> {
966+
val longVendor = "a" * 129
967+
val longVendorContext = s"""{
968+
"schema": "iglu:$longVendor/email_sent/jsonschema/1-0-0",
969+
"data": {"emailAddress": "hello@world.com", "emailAddress2": "foo@bar.org"}
970+
}"""
971+
972+
IgluUtils
973+
.parseAndValidateContexts(
974+
Some(buildInputContexts(List(longVendorContext))),
975+
SpecHelpers.client,
976+
SpecHelpers.registryLookup,
977+
SpecHelpers.DefaultMaxJsonDepth,
978+
SpecHelpers.etlTstamp
979+
)
980+
.value
981+
.map {
982+
case Ior.Both(
983+
NonEmptyList(
984+
Failure.SchemaViolation(
985+
FailureDetails.SchemaViolation.NotIglu(_, ParseError.InvalidIgluUri),
986+
`contextsFieldName`,
987+
_,
988+
_
989+
),
990+
Nil
991+
),
992+
None
993+
) =>
994+
ok
995+
case other => ko(s"[$other] is not a NotIglu error with InvalidIgluUri")
996+
}
997+
}
998+
999+
"return a NotIglu error if a context has a schema name exceeding 128 characters" >> {
1000+
val longName = "a" * 129
1001+
val longNameContext = s"""{
1002+
"schema": "iglu:com.acme/$longName/jsonschema/1-0-0",
1003+
"data": {"emailAddress": "hello@world.com", "emailAddress2": "foo@bar.org"}
1004+
}"""
1005+
1006+
IgluUtils
1007+
.parseAndValidateContexts(
1008+
Some(buildInputContexts(List(longNameContext))),
1009+
SpecHelpers.client,
1010+
SpecHelpers.registryLookup,
1011+
SpecHelpers.DefaultMaxJsonDepth,
1012+
SpecHelpers.etlTstamp
1013+
)
1014+
.value
1015+
.map {
1016+
case Ior.Both(
1017+
NonEmptyList(
1018+
Failure.SchemaViolation(
1019+
FailureDetails.SchemaViolation.NotIglu(_, ParseError.InvalidIgluUri),
1020+
`contextsFieldName`,
1021+
_,
1022+
_
1023+
),
1024+
Nil
1025+
),
1026+
None
1027+
) =>
1028+
ok
1029+
case other => ko(s"[$other] is not a NotIglu error with InvalidIgluUri")
1030+
}
1031+
}
1032+
9651033
"return the extracted unstructured event and the extracted input contexts when schema is superseded by another schema" >> {
9661034
val input = IgluUtils.EventExtractInput(unstructEvent = Some(buildUnstruct(supersedingExample1)),
9671035
contexts = Some(buildInputContexts(List(supersedingExample1, supersedingExample2)))

0 commit comments

Comments
 (0)