 import datadog.trace.agent.test.AgentTestRunner
 import datadog.trace.bootstrap.instrumentation.api.Tags
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.spark.api.java.function.VoidFunction2
 import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.RowFactory
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.sql.types.StructType
+import org.junit.ClassRule
+import org.springframework.kafka.core.DefaultKafkaProducerFactory
+import org.springframework.kafka.test.EmbeddedKafkaBroker
+import org.springframework.kafka.test.rule.EmbeddedKafkaRule
+import org.springframework.kafka.test.utils.KafkaTestUtils
+import spock.lang.Shared
+

 class SparkExecutorTest extends AgentTestRunner {
+  static final SOURCE_TOPIC = "source"
+  static final SINK_TOPIC = "sink"
+
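+  // Embedded Kafka broker shared across the test class, with the source and sink topics created up front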
+  @Shared
+  @ClassRule
+  EmbeddedKafkaRule kafkaRule = new EmbeddedKafkaRule(1, false, 1, SOURCE_TOPIC, SINK_TOPIC)
+  EmbeddedKafkaBroker embeddedKafka = kafkaRule.embeddedKafka

   @Override
   void configurePreAgent() {
     super.configurePreAgent()
     injectSysConfig("dd.integration.spark-executor.enabled", "true")
+    injectSysConfig("dd.integration.spark.enabled", "true")
+    injectSysConfig("dd.integration.kafka.enabled", "true")
+    injectSysConfig("dd.data.streams.enabled", "true")
+    injectSysConfig("dd.trace.debug", "true")
   }

   private Dataset<Row> generateSampleDataframe(SparkSession spark) {
@@ -23,6 +44,57 @@ class SparkExecutorTest extends AgentTestRunner {
     spark.createDataFrame(rows, structType)
   }

+  def "test dsm service name override"() {
+    setup:
+    def sparkSession = SparkSession.builder()
+      .config("spark.master", "local[2]")
+      .config("spark.driver.bindAddress", "localhost")
+      // .config("spark.sql.shuffle.partitions", "2")
+      .appName("test-app")
+      .getOrCreate()
+
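+    // Plain Kafka producer used to seed the source topic with test records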
+    def producerProps = KafkaTestUtils.producerProps(embeddedKafka.getBrokersAsString())
+    def producer = new DefaultKafkaProducerFactory<Integer, String>(producerProps).createProducer()
+
+    when:
+    for (int i = 0; i < 100; i++) {
+      producer.send(new ProducerRecord<>(SOURCE_TOPIC, i, i.toString()))
+    }
+    producer.flush()
+
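+    // Structured Streaming source reading the seeded records back from the source topic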
+    def df = sparkSession
+      .readStream()
+      .format("kafka")
+      .option("kafka.bootstrap.servers", embeddedKafka.getBrokersAsString())
+      .option("startingOffsets", "earliest")
+      .option("failOnDataLoss", "false")
+      .option("subscribe", SOURCE_TOPIC)
+      .load()
+
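+    // Single-trigger streaming query configured with the sink topic and a foreachBatch handler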
+    def query = df
+      .selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value")
+      .writeStream()
+      .format("kafka")
+      .option("kafka.bootstrap.servers", embeddedKafka.getBrokersAsString())
+      .option("checkpointLocation", "/tmp/" + System.currentTimeMillis().toString())
+      .option("topic", SINK_TOPIC)
+      .trigger(Trigger.Once())
+      .foreachBatch(new VoidFunction2<Dataset<Row>, Long>() {
+        @Override
+        void call(Dataset<Row> rowDataset, Long aLong) throws Exception {
+          rowDataset.show()
+          rowDataset.write()
+        }
+      })
+      .start()
+
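+    // Wait until every record currently available in the source topic has been processed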
+    query.processAllAvailable()
+
+    then:
+    query.stop()
+    producer.close()
+  }
+
   def "generate spark task run spans"() {
     setup:
     def sparkSession = SparkSession.builder()