Skip to content

Commit 7682aea

Browse files
Expose Avro related APIs (#805)
1 parent 328ca1b commit 7682aea

File tree

3 files changed

+177
-1
lines changed

3 files changed

+177
-1
lines changed
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for more information.
4+
5+
using System.Collections.Generic;
6+
using Microsoft.Spark.E2ETest.Utils;
7+
using Microsoft.Spark.Sql;
8+
using Xunit;
9+
using static Microsoft.Spark.Sql.Avro.Functions;
10+
11+
namespace Microsoft.Spark.E2ETest.IpcTests
12+
{
13+
[Collection("Spark E2E Tests")]
public class AvroFunctionsTests
{
    private readonly SparkSession _spark;

    public AvroFunctionsTests(SparkFixture fixture) => _spark = fixture.Spark;

    /// <summary>
    /// Verifies that the Avro functions available since Spark 2.4 (ToAvro(Column) and
    /// FromAvro(Column, string)) can be invoked and return a Column.
    /// </summary>
    [SkipIfSparkVersionIsLessThan(Versions.V2_4_0)]
    public void TestSignaturesV2_4_X()
    {
        DataFrame singleRow = _spark.Range(1);
        string avroSchema = "{\"type\":\"long\", \"name\":\"col\"}";

        // Round trip: serialize the "id" column to Avro, then read it back.
        Column serialized = ToAvro(singleRow.Col("id"));
        Column deserialized = FromAvro(serialized, avroSchema);

        Assert.IsType<Column>(deserialized);
    }

    /// <summary>
    /// Verifies that the Avro overloads added for Spark 3.0 (ToAvro(Column, string) and
    /// FromAvro(Column, string, Dictionary)) can be invoked and return a Column.
    /// </summary>
    [SkipIfSparkVersionIsLessThan(Versions.V3_0_0)]
    public void TestSignaturesV3_0_X()
    {
        DataFrame singleRow = _spark.Range(1);
        string avroSchema = "{\"type\":\"long\", \"name\":\"col\"}";
        var parseOptions = new Dictionary<string, string>
        {
            ["mode"] = "PERMISSIVE"
        };

        // Round trip using the schema-aware writer and the options-aware reader.
        Column serialized = ToAvro(singleRow.Col("id"), avroSchema);
        Column deserialized = FromAvro(serialized, avroSchema, parseOptions);

        Assert.IsType<Column>(deserialized);
    }
}
52+
}

src/csharp/Microsoft.Spark.E2ETest/SparkFixture.cs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using System.IO;
88
using System.Reflection;
99
using System.Runtime.InteropServices;
10+
using System.Text;
1011
using Microsoft.Spark.Interop.Ipc;
1112
using Microsoft.Spark.Sql;
1213
using Microsoft.Spark.UnitTest.TestUtils;
@@ -114,6 +115,35 @@ public SparkFixture()
114115
Jvm = Spark.Reference.Jvm;
115116
}
116117

118+
/// <summary>
/// Returns <paramref name="args"/> with the spark-avro package (see
/// <see cref="GetAvroPackage"/>) added to the "--packages" option.
/// </summary>
/// <remarks>
/// If <paramref name="args"/> already contains a "--packages " option, the Avro package is
/// inserted at the front of its comma-separated value. Otherwise a new "--packages" option
/// is appended after the existing arguments.
/// </remarks>
/// <param name="args">
/// Extra spark-submit arguments. A null value is treated as an empty string.
/// </param>
/// <returns>The arguments including the Avro package.</returns>
public string AddPackages(string args)
{
    const string packagesOption = "--packages ";
    string[] splits = (args ?? string.Empty).Split(packagesOption, 2);
    string prefix = splits[0];

    var newArgs = new StringBuilder(prefix);

    // When there was no existing "--packages " option, the prefix is the complete argument
    // string. Make sure it is separated from the option appended below; otherwise input
    // such as "--conf a=b" would become "--conf a=b--packages ...".
    if (splits.Length == 1 &&
        prefix.Length > 0 &&
        !char.IsWhiteSpace(prefix[prefix.Length - 1]))
    {
        newArgs.Append(' ');
    }

    newArgs.Append(packagesOption).Append(GetAvroPackage());

    if (splits.Length > 1)
    {
        // Keep the packages that were already requested, after the Avro package.
        newArgs.Append(',').Append(splits[1]);
    }

    return newArgs.ToString();
}
133+
134+
/// <summary>
/// Builds the "org.apache.spark:spark-avro_X:version" package coordinate matching the
/// Spark version reported by <c>SparkSettings.Version</c>. The artifact suffix is
/// "_2.11" for Spark 2.x and "_2.12" for Spark 3.x.
/// </summary>
/// <returns>The spark-avro package coordinate.</returns>
/// <exception cref="NotSupportedException">
/// The Spark major version is neither 2 nor 3.
/// </exception>
public string GetAvroPackage()
{
    Version version = SparkSettings.Version;

    string artifact;
    switch (version.Major)
    {
        case 2:
            artifact = $"spark-avro_2.11:{version}";
            break;
        case 3:
            artifact = $"spark-avro_2.12:{version}";
            break;
        default:
            throw new NotSupportedException($"Spark {version} not supported.");
    }

    return "org.apache.spark:" + artifact;
}
146+
117147
public void Dispose()
118148
{
119149
Spark.Dispose();
@@ -175,7 +205,7 @@ private void BuildSparkCmd(out string filename, out string args)
175205
string logOption = "--conf spark.driver.extraJavaOptions=-Dlog4j.configuration=" +
176206
$"{resourceUri}/log4j.properties";
177207

178-
args = $"{logOption} {warehouseDir} {extraArgs} {classArg} --master local {jar} debug";
208+
args = $"{logOption} {warehouseDir} {AddPackages(extraArgs)} {classArg} --master local {jar} debug";
179209
}
180210

181211
private string GetJarPrefix()
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for more information.
4+
5+
using System;
6+
using System.Collections.Generic;
7+
using Microsoft.Spark.Interop;
8+
using Microsoft.Spark.Interop.Ipc;
9+
10+
namespace Microsoft.Spark.Sql.Avro
11+
{
12+
/// <summary>
/// Column functions for converting data to and from the Avro binary format.
/// </summary>
public static class Functions
{
    private static IJvmBridge Jvm { get; } = SparkEnvironment.JvmBridge;

    // Name of the JVM class on which "from_avro"/"to_avro" are invoked. It differs between
    // Spark major versions, so it is resolved lazily on first use.
    private static readonly Lazy<string> s_avroClassName =
        new Lazy<string>(ResolveAvroClassName);

    /// <summary>
    /// Converts a binary column in Avro format into its corresponding catalyst value. The
    /// given schema must match the data being read; if it does not, the behavior is
    /// undefined: the call may fail or return an arbitrary result.
    /// </summary>
    /// <param name="data">The binary column.</param>
    /// <param name="jsonFormatSchema">The Avro schema as a JSON string.</param>
    /// <returns>Column object</returns>
    [Since(Versions.V2_4_0)]
    public static Column FromAvro(Column data, string jsonFormatSchema)
    {
        var result = Jvm.CallStaticJavaMethod(
            s_avroClassName.Value,
            "from_avro",
            data,
            jsonFormatSchema);

        return new Column((JvmObjectReference)result);
    }

    /// <summary>
    /// Converts a binary column in Avro format into its corresponding catalyst value. The
    /// given schema must match the data being read; if it does not, the behavior is
    /// undefined: the call may fail or return an arbitrary result. To deserialize the data
    /// with a compatible and evolved schema, the expected Avro schema can be set via the
    /// option avroSchema.
    /// </summary>
    /// <param name="data">The binary column.</param>
    /// <param name="jsonFormatSchema">The Avro schema as a JSON string.</param>
    /// <param name="options">Options that control how the Avro record is parsed.</param>
    /// <returns>Column object</returns>
    [Since(Versions.V3_0_0)]
    public static Column FromAvro(
        Column data,
        string jsonFormatSchema,
        Dictionary<string, string> options)
    {
        var result = Jvm.CallStaticJavaMethod(
            s_avroClassName.Value,
            "from_avro",
            data,
            jsonFormatSchema,
            options);

        return new Column((JvmObjectReference)result);
    }

    /// <summary>
    /// Converts a column into Avro-format binary.
    /// </summary>
    /// <param name="data">The data column.</param>
    /// <returns>Column object</returns>
    [Since(Versions.V2_4_0)]
    public static Column ToAvro(Column data)
    {
        var result = Jvm.CallStaticJavaMethod(s_avroClassName.Value, "to_avro", data);

        return new Column((JvmObjectReference)result);
    }

    /// <summary>
    /// Converts a column into Avro-format binary using a caller-supplied output schema.
    /// </summary>
    /// <param name="data">The data column.</param>
    /// <param name="jsonFormatSchema">
    /// User-specified output Avro schema as a JSON string.
    /// </param>
    /// <returns>Column object</returns>
    [Since(Versions.V3_0_0)]
    public static Column ToAvro(Column data, string jsonFormatSchema)
    {
        var result = Jvm.CallStaticJavaMethod(
            s_avroClassName.Value,
            "to_avro",
            data,
            jsonFormatSchema);

        return new Column((JvmObjectReference)result);
    }

    /// <summary>
    /// Picks the JVM class name used for the Avro calls based on the major version reported
    /// by <c>SparkEnvironment.SparkVersion</c>.
    /// </summary>
    /// <exception cref="NotSupportedException">
    /// The Spark major version is neither 2 nor 3.
    /// </exception>
    private static string ResolveAvroClassName()
    {
        Version version = SparkEnvironment.SparkVersion;
        switch (version.Major)
        {
            case 2:
                return "org.apache.spark.sql.avro.package";
            case 3:
                return "org.apache.spark.sql.avro.functions";
            default:
                throw new NotSupportedException($"Spark {version} not supported.");
        }
    }
}
94+
}

0 commit comments

Comments
 (0)