
Commit 1f396e8

feat(spark): OpenLineage 1.24.2 upgrade (#11830)
1 parent 85c8e60 · commit 1f396e8

File tree: 13 files changed, +89 -53 lines

build.gradle

Lines changed: 1 addition & 1 deletion
@@ -56,7 +56,7 @@ buildscript {
   ext.hazelcastVersion = '5.3.6'
   ext.ebeanVersion = '15.5.2'
   ext.googleJavaFormatVersion = '1.18.1'
-  ext.openLineageVersion = '1.19.0'
+  ext.openLineageVersion = '1.24.2'
   ext.logbackClassicJava8 = '1.2.12'

   ext.docker_registry = 'acryldata'

entity-registry/build.gradle

Lines changed: 2 additions & 0 deletions
@@ -25,6 +25,8 @@ dependencies {
       because("previous versions are vulnerable to CVE-2022-25857")
     }
   }
+  api project(path: ':li-utils')
+  api project(path: ':li-utils', configuration: "dataTemplate")
   dataModel project(':li-utils')
   annotationProcessor externalDependency.lombok

metadata-integration/java/acryl-spark-lineage/README.md

Lines changed: 1 addition & 0 deletions
@@ -165,6 +165,7 @@ information like tokens.
 | spark.datahub.rest.server | | http://localhost:8080 | Datahub server url eg: <http://localhost:8080> |
 | spark.datahub.rest.token | | | Authentication token. |
 | spark.datahub.rest.disable_ssl_verification | | false | Disable SSL certificate validation. Caution: Only use this if you know what you are doing! |
+| spark.datahub.rest.disable_chunked_encoding | | false | Disable chunked transfer encoding. In some environments chunked encoding causes issues; this option allows disabling it. |
 | spark.datahub.rest.max_retries | | 0 | Number of times a request is retried if it fails |
 | spark.datahub.rest.retry_interval | | 10 | Number of seconds to wait between retries |
 | spark.datahub.file.filename | | | The file where metadata will be written if the file emitter is set |
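The new flag is an ordinary Spark configuration property. A minimal sketch of wiring it up from application code follows; the listener class comes from this integration, while the app name and server URL are placeholder values, and the same keys can equally be passed with `--conf` on spark-submit:

```java
import org.apache.spark.sql.SparkSession;

public class LineageDemo {
  public static void main(String[] args) {
    // Register the DataHub listener and disable chunked transfer encoding
    // for its REST emitter (values here are illustrative).
    SparkSession spark =
        SparkSession.builder()
            .appName("lineage-demo")
            .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
            .config("spark.datahub.rest.server", "http://localhost:8080")
            .config("spark.datahub.rest.disable_chunked_encoding", "true")
            .getOrCreate();
    spark.stop();
  }
}
```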

metadata-integration/java/acryl-spark-lineage/build.gradle

Lines changed: 18 additions & 17 deletions
@@ -1,7 +1,7 @@
 plugins {
   id("com.palantir.git-version") apply false
 }
-apply plugin: 'java'
+apply plugin: 'java-library'
 apply plugin: 'com.github.johnrengelman.shadow'
 apply plugin: 'signing'
 apply plugin: 'io.codearte.nexus-staging'
@@ -51,8 +51,8 @@ dependencies {

   implementation project(':metadata-integration:java:openlineage-converter')

-  implementation project(path: ':metadata-integration:java:datahub-client', configuration: 'shadow')
-  implementation project(path: ':metadata-integration:java:openlineage-converter', configuration: 'shadow')
+  implementation project(path: ':metadata-integration:java:datahub-client')
+  implementation project(path: ':metadata-integration:java:openlineage-converter')

   //implementation "io.acryl:datahub-client:0.10.2"
   implementation "io.openlineage:openlineage-spark_2.12:$openLineageVersion"
@@ -91,6 +91,8 @@ shadowJar {
   zip64 = true
   archiveClassifier = ''
   mergeServiceFiles()
+  project.configurations.implementation.canBeResolved = true
+  configurations = [project.configurations.implementation]

   def exclude_modules = project
       .configurations
@@ -106,6 +108,8 @@ shadowJar {
     exclude(dependency {
       exclude_modules.contains(it.name)
     })
+    exclude(dependency("org.slf4j::"))
+    exclude("org/apache/commons/logging/**")
   }

   // preventing java multi-release JAR leakage
@@ -123,39 +127,36 @@ shadowJar {
   relocate 'com.sun.activation', 'io.acryl.shaded.com.sun.activation'
   relocate 'com.sun.codemodel', 'io.acryl.shaded.com.sun.codemodel'
   relocate 'com.sun.mail', 'io.acryl.shaded.com.sun.mail'
-  relocate 'com.fasterxml.jackson', 'datahub.spark2.shaded.jackson'
-  relocate 'org.slf4j', 'datahub.spark2.shaded.org.slf4j'
   //
   relocate 'org.apache.hc', 'io.acryl.shaded.http'
-  relocate 'org.apache.commons.codec', 'datahub.spark2.shaded.o.a.c.codec'
-  relocate 'org.apache.commons.compress', 'datahub.spark2.shaded.o.a.c.compress'
-  relocate 'org.apache.commons.lang3', 'datahub.spark2.shaded.o.a.c.lang3'
+  relocate 'org.apache.commons.codec', 'io.acryl.shaded.org.apache.commons.codec'
+  relocate 'org.apache.commons.compress', 'io.acryl.shaded.org.apache.commons.compress'
+  relocate 'org.apache.commons.lang3', 'io.acryl.shaded.org.apache.commons.lang3'
   relocate 'mozilla', 'datahub.spark2.shaded.mozilla'
-  relocate 'com.typesafe', 'datahub.spark2.shaded.typesafe'
-  relocate 'io.opentracing', 'datahub.spark2.shaded.io.opentracing'
-  relocate 'io.netty', 'datahub.spark2.shaded.io.netty'
-  relocate 'ch.randelshofer', 'datahub.spark2.shaded.ch.randelshofer'
-  relocate 'ch.qos', 'datahub.spark2.shaded.ch.qos'
+  relocate 'com.typesafe', 'io.acryl.shaded.com.typesafe'
+  relocate 'io.opentracing', 'io.acryl.shaded.io.opentracing'
+  relocate 'io.netty', 'io.acryl.shaded.io.netty'
+  relocate 'ch.randelshofer', 'io.acryl.shaded.ch.randelshofer'
+  relocate 'ch.qos', 'io.acryl.shaded.ch.qos'
   relocate 'org.springframework', 'io.acryl.shaded.org.springframework'
   relocate 'com.fasterxml.jackson', 'io.acryl.shaded.jackson'
   relocate 'org.yaml', 'io.acryl.shaded.org.yaml' // Required for shading snakeyaml
   relocate 'net.jcip.annotations', 'io.acryl.shaded.annotations'
   relocate 'javassist', 'io.acryl.shaded.javassist'
   relocate 'edu.umd.cs.findbugs', 'io.acryl.shaded.findbugs'
-  relocate 'org.antlr', 'io.acryl.shaded.org.antlr'
-  relocate 'antlr', 'io.acryl.shaded.antlr'
+  //relocate 'org.antlr', 'io.acryl.shaded.org.antlr'
+  //relocate 'antlr', 'io.acryl.shaded.antlr'
   relocate 'com.google.common', 'io.acryl.shaded.com.google.common'
-  relocate 'org.apache.commons', 'io.acryl.shaded.org.apache.commons'
   relocate 'org.reflections', 'io.acryl.shaded.org.reflections'
   relocate 'st4hidden', 'io.acryl.shaded.st4hidden'
   relocate 'org.stringtemplate', 'io.acryl.shaded.org.stringtemplate'
   relocate 'org.abego.treelayout', 'io.acryl.shaded.treelayout'
-  relocate 'org.slf4j', 'io.acryl.shaded.slf4j'
   relocate 'javax.annotation', 'io.acryl.shaded.javax.annotation'
   relocate 'com.github.benmanes.caffeine', 'io.acryl.shaded.com.github.benmanes.caffeine'
   relocate 'org.checkerframework', 'io.acryl.shaded.org.checkerframework'
   relocate 'com.google.errorprone', 'io.acryl.shaded.com.google.errorprone'
   relocate 'com.sun.jna', 'io.acryl.shaded.com.sun.jna'
+
 }

 checkShadowJar {
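Two things worth noting about this build change. The project now depends on the plain (unshaded) `datahub-client` and `openlineage-converter` artifacts, so the Shadow plugin is pointed at the `implementation` configuration directly; that configuration must first be flagged `canBeResolved = true`, since Gradle leaves `implementation` unresolvable by default. And instead of relocating slf4j and commons-logging into the fat jar, they are now excluded from it entirely, which presumably leaves logger binding to the host Spark distribution.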

metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java

Lines changed: 6 additions & 2 deletions
@@ -120,7 +120,9 @@ public Optional<DatahubEmitterConfig> initializeEmitter(Config sparkConf) {
     boolean disableSslVerification =
         sparkConf.hasPath(SparkConfigParser.DISABLE_SSL_VERIFICATION_KEY)
             && sparkConf.getBoolean(SparkConfigParser.DISABLE_SSL_VERIFICATION_KEY);
-
+    boolean disableChunkedEncoding =
+        sparkConf.hasPath(SparkConfigParser.REST_DISABLE_CHUNKED_ENCODING)
+            && sparkConf.getBoolean(SparkConfigParser.REST_DISABLE_CHUNKED_ENCODING);
     int retry_interval_in_sec =
         sparkConf.hasPath(SparkConfigParser.RETRY_INTERVAL_IN_SEC)
             ? sparkConf.getInt(SparkConfigParser.RETRY_INTERVAL_IN_SEC)
@@ -150,6 +152,7 @@ public Optional<DatahubEmitterConfig> initializeEmitter(Config sparkConf) {
             .disableSslVerification(disableSslVerification)
             .maxRetries(max_retries)
             .retryIntervalSec(retry_interval_in_sec)
+            .disableChunkedEncoding(disableChunkedEncoding)
             .build();
         return Optional.of(new RestDatahubEmitterConfig(restEmitterConf));
       case "kafka":
@@ -374,7 +377,8 @@ private static void initializeMetrics(OpenLineageConfig openLineageConfig) {
     String disabledFacets;
     if (openLineageConfig.getFacetsConfig() != null
         && openLineageConfig.getFacetsConfig().getDisabledFacets() != null) {
-      disabledFacets = String.join(";", openLineageConfig.getFacetsConfig().getDisabledFacets());
+      disabledFacets =
+          String.join(";", openLineageConfig.getFacetsConfig().getEffectiveDisabledFacets());
     } else {
       disabledFacets = "";
     }
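For reference, a sketch of the emitter configuration `initializeEmitter` now assembles. Only the builder calls visible in the diff are confirmed; the `server()` setter and the literal values are assumptions for illustration:

```java
import datahub.client.rest.RestEmitterConfig;

public class EmitterConfigDemo {
  public static void main(String[] args) {
    RestEmitterConfig restEmitterConf =
        RestEmitterConfig.builder()
            .server("http://localhost:8080") // assumed setter, illustrative value
            .disableSslVerification(false)
            .maxRetries(0)
            .retryIntervalSec(10)
            .disableChunkedEncoding(true) // new in this commit
            .build();
    System.out.println(restEmitterConf);
  }
}
```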

metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java

Lines changed: 2 additions & 0 deletions
@@ -30,6 +30,8 @@ public class SparkConfigParser {
   public static final String GMS_AUTH_TOKEN = "rest.token";
   public static final String FILE_EMITTER_FILE_NAME = "file.filename";
   public static final String DISABLE_SSL_VERIFICATION_KEY = "rest.disable_ssl_verification";
+  public static final String REST_DISABLE_CHUNKED_ENCODING = "rest.disable_chunked_encoding";
+
   public static final String MAX_RETRIES = "rest.max_retries";
   public static final String RETRY_INTERVAL_IN_SEC = "rest.retry_interval_in_sec";
   public static final String KAFKA_MCP_TOPIC = "kafka.mcp_topic";
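These constants are keys relative to the `spark.datahub.` prefix, so the new entry corresponds to the user-facing `spark.datahub.rest.disable_chunked_encoding` documented in the README. A small self-contained sketch of the guard pattern the listener uses to read them (the in-memory typesafe-config setup is purely for illustration):

```java
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.util.Map;

public class ConfigKeyDemo {
  public static void main(String[] args) {
    // Simulate the datahub-scoped config the listener receives.
    Config conf = ConfigFactory.parseMap(Map.of("rest.disable_chunked_encoding", true));
    // hasPath() guards against the key being absent; getBoolean() reads it.
    boolean disableChunkedEncoding =
        conf.hasPath("rest.disable_chunked_encoding")
            && conf.getBoolean("rest.disable_chunked_encoding");
    System.out.println(disableChunkedEncoding); // true
  }
}
```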

metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/PlanUtils.java

Lines changed: 2 additions & 6 deletions
@@ -5,14 +5,13 @@

 package io.openlineage.spark.agent.util;

-import static io.openlineage.spark.agent.lifecycle.ExecutionContext.CAMEL_TO_SNAKE_CASE;
-
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import datahub.spark.conf.SparkLineageConf;
 import io.datahubproject.openlineage.dataset.HdfsPathDataset;
 import io.openlineage.client.OpenLineage;
 import io.openlineage.spark.agent.Versions;
+import io.openlineage.spark.api.naming.NameNormalizer;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -21,7 +20,6 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.Locale;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.UUID;
@@ -186,7 +184,7 @@ public static OpenLineage.ParentRunFacet parentRunFacet(
         .run(new OpenLineage.ParentRunFacetRunBuilder().runId(parentRunId).build())
         .job(
             new OpenLineage.ParentRunFacetJobBuilder()
-                .name(parentJob.replaceAll(CAMEL_TO_SNAKE_CASE, "_$1").toLowerCase(Locale.ROOT))
+                .name(NameNormalizer.normalize(parentJob))
                 .namespace(parentJobNamespace)
                 .build())
         .build();
@@ -287,8 +285,6 @@ public static boolean safeIsDefinedAt(PartialFunction pfn, Object x) {
    * @param pfn
    * @param x
    * @return
-   * @param <T>
-   * @param <D>
    */
  public static <T, D> List<T> safeApply(PartialFunction<D, List<T>> pfn, D x) {
    try {
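The removed regex pipeline and the new `NameNormalizer` serve the same purpose: converting a camel-cased parent job name to snake case. A sketch of the expected behavior, where the exact output is an assumption inferred from the code being replaced:

```java
import io.openlineage.spark.api.naming.NameNormalizer;

public class NormalizeDemo {
  public static void main(String[] args) {
    // Previously: parentJob.replaceAll(CAMEL_TO_SNAKE_CASE, "_$1").toLowerCase(Locale.ROOT)
    System.out.println(NameNormalizer.normalize("myAwesomeSparkJob"));
    // expected: my_awesome_spark_job
  }
}
```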

metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/RddPathUtils.java

Lines changed: 12 additions & 6 deletions
@@ -7,6 +7,7 @@

 import java.util.Arrays;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Stream;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.reflect.FieldUtils;
@@ -18,6 +19,7 @@
 import org.apache.spark.rdd.MapPartitionsRDD;
 import org.apache.spark.rdd.ParallelCollectionRDD;
 import org.apache.spark.rdd.RDD;
+import org.apache.spark.sql.execution.datasources.FilePartition;
 import org.apache.spark.sql.execution.datasources.FileScanRDD;
 import scala.Tuple2;
 import scala.collection.immutable.Seq;
@@ -90,7 +92,7 @@ public boolean isDefinedAt(Object rdd) {
     @SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
     public Stream<Path> extract(FileScanRDD rdd) {
       return ScalaConversionUtils.fromSeq(rdd.filePartitions()).stream()
-          .flatMap(fp -> Arrays.stream(fp.files()))
+          .flatMap((FilePartition fp) -> Arrays.stream(fp.files()))
           .map(
               f -> {
                 if ("3.4".compareTo(package$.MODULE$.SPARK_VERSION()) <= 0) {
@@ -115,21 +117,25 @@ public boolean isDefinedAt(Object rdd) {

     @Override
     public Stream<Path> extract(ParallelCollectionRDD rdd) {
+      int SEQ_LIMIT = 1000;
+      AtomicBoolean loggingDone = new AtomicBoolean(false);
       try {
         Object data = FieldUtils.readField(rdd, "data", true);
         log.debug("ParallelCollectionRDD data: {}", data);
-        if (data instanceof Seq) {
-          return ScalaConversionUtils.fromSeq((Seq) data).stream()
+        if ((data instanceof Seq) && ((Seq) data).head() instanceof Tuple2) {
+          // exit if the first element is invalid
+          Seq data_slice = (Seq) ((Seq) data).slice(0, SEQ_LIMIT);
+          return ScalaConversionUtils.fromSeq(data_slice).stream()
              .map(
                  el -> {
                    Path path = null;
                    if (el instanceof Tuple2) {
                      // we're able to extract path
                      path = parentOf(((Tuple2) el)._1.toString());
                      log.debug("Found input {}", path);
-                    } else {
-                      // Change to debug to silence error
-                      log.debug("unable to extract Path from {}", el.getClass().getCanonicalName());
+                    } else if (!loggingDone.get()) {
+                      log.warn("unable to extract Path from {}", el.getClass().getCanonicalName());
+                      loggingDone.set(true);
                     }
                     return path;
                   })
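The rewrite bounds the reflection-based scan to the first 1000 elements (`SEQ_LIMIT`), bails out early unless the sequence head is a `Tuple2`, and replaces per-element noise with a single warning. Here is the warn-once guard in isolation as a self-contained sketch (names and sample data are illustrative):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class WarnOnceDemo {
  public static void main(String[] args) {
    AtomicBoolean warned = new AtomicBoolean(false);
    for (Object el : List.of("s3://bucket/a", 42, 3.14)) {
      boolean extractable = el instanceof String; // stand-in for the Tuple2 check
      // compareAndSet flips the flag exactly once, so only the first
      // unextractable element is reported.
      if (!extractable && warned.compareAndSet(false, true)) {
        System.err.println("unable to extract Path from " + el.getClass().getCanonicalName());
      }
    }
  }
}
```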

metadata-integration/java/datahub-client/build.gradle

Lines changed: 20 additions & 8 deletions
@@ -1,6 +1,6 @@
 plugins {
   id("com.palantir.git-version") apply false
-  id 'java'
+  id 'java-library'
   id 'com.github.johnrengelman.shadow'
   id 'jacoco'
   id 'signing'
@@ -12,11 +12,13 @@ apply from: "../versioning.gradle"
 import org.apache.tools.ant.filters.ReplaceTokens


-jar.enabled = false // Since we only want to build shadow jars, disabling the regular jar creation
+jar {
+  archiveClassifier = "lib"
+}

 dependencies {
-  implementation project(':entity-registry')
-  implementation project(':metadata-integration:java:datahub-event')
+  api project(':entity-registry')
+  api project(':metadata-integration:java:datahub-event')
   implementation(externalDependency.kafkaAvroSerializer) {
     exclude group: "org.apache.avro"
   }
@@ -33,7 +35,7 @@ dependencies {
   implementation externalDependency.jacksonDataBind
   runtimeOnly externalDependency.jna

-  implementation externalDependency.slf4jApi
+  api externalDependency.slf4jApi
   compileOnly externalDependency.lombok
   annotationProcessor externalDependency.lombok
   // VisibleForTesting
@@ -78,6 +80,11 @@ shadowJar {
   // https://github.com/johnrengelman/shadow/issues/729
   exclude('module-info.class', 'META-INF/versions/**',
       '**/LICENSE', '**/LICENSE*.txt', '**/NOTICE', '**/NOTICE.txt', 'licenses/**', 'log4j2.*', 'log4j.*')
+  dependencies {
+    exclude(dependency("org.slf4j::"))
+    exclude(dependency("antlr::"))
+    exclude("org/apache/commons/logging/**")
+  }
   mergeServiceFiles()
   // we relocate namespaces manually, because we want to know exactly which libs we are exposing and why
   // we can move to automatic relocation using ConfigureShadowRelocation after we get to a good place on these first
@@ -88,15 +95,20 @@ shadowJar {
   relocate 'javassist', 'datahub.shaded.javassist'
   relocate 'edu.umd.cs.findbugs', 'datahub.shaded.findbugs'
   relocate 'org.antlr', 'datahub.shaded.org.antlr'
-  relocate 'antlr', 'datahub.shaded.antlr'
+  //relocate 'antlr', 'datahub.shaded.antlr'
   relocate 'com.google.common', 'datahub.shaded.com.google.common'
-  relocate 'org.apache.commons', 'datahub.shaded.org.apache.commons'
+  relocate 'org.apache.commons.codec', 'datahub.shaded.org.apache.commons.codec'
+  relocate 'org.apache.commons.compress', 'datahub.shaded.org.apache.commons.compress'
+  relocate 'org.apache.commons.lang3', 'datahub.shaded.org.apache.commons.lang3'
+  relocate 'org.apache.commons.lang', 'datahub.shaded.org.apache.commons.lang'
+  relocate 'org.apache.commons.cli', 'datahub.shaded.org.apache.commons.cli'
+  relocate 'org.apache.commons.text', 'datahub.shaded.org.apache.commons.text'
+  relocate 'org.apache.commons.io', 'datahub.shaded.org.apache.commons.io'
   relocate 'org.apache.maven', 'datahub.shaded.org.apache.maven'
   relocate 'org.reflections', 'datahub.shaded.org.reflections'
   relocate 'st4hidden', 'datahub.shaded.st4hidden'
   relocate 'org.stringtemplate', 'datahub.shaded.org.stringtemplate'
   relocate 'org.abego.treelayout', 'datahub.shaded.treelayout'
-  relocate 'org.slf4j', 'datahub.shaded.slf4j'
   relocate 'javax.annotation', 'datahub.shaded.javax.annotation'
   relocate 'com.github.benmanes.caffeine', 'datahub.shaded.com.github.benmanes.caffeine'
   relocate 'org.checkerframework', 'datahub.shaded.org.checkerframework'
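The switch from `implementation` to `api` for `entity-registry`, `datahub-event`, and `slf4jApi` goes hand in hand with the move to the `java-library` plugin: `api` dependencies are exported to consumers' compile classpaths, which the Spark integration now relies on since it consumes this project directly rather than through the pre-shaded `shadow` artifact. Re-enabling the plain `jar` task (with a `lib` classifier) keeps that unshaded artifact distinct from the shadow jar.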

metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/DatahubHttpRequestRetryStrategy.java

Lines changed: 0 additions & 1 deletion
@@ -48,7 +48,6 @@ public boolean retryRequest(

   @Override
   public boolean retryRequest(HttpResponse response, int execCount, HttpContext context) {
-    log.warn("Retrying request due to error: {}", response);
     return super.retryRequest(response, execCount, context);
   }
 }
