Skip to content

Commit 8e4b9eb

Browse files
Support DateType (#420)
1 parent 85c4711 commit 8e4b9eb

File tree

15 files changed

+267
-26
lines changed

15 files changed

+267
-26
lines changed

src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/FunctionsTests.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
using Microsoft.Spark.E2ETest.Utils;
99
using Microsoft.Spark.Sql;
1010
using Microsoft.Spark.Sql.Catalog;
11+
using Microsoft.Spark.Sql.Types;
1112
using Xunit;
1213
using static Microsoft.Spark.Sql.Functions;
1314

@@ -650,6 +651,7 @@ private void TestUdf()
650651
Udf<int, int>((arg) => arg);
651652
Udf<long, long>((arg) => arg);
652653
Udf<short, short>((arg) => arg);
654+
Udf<Date, Date>((arg) => arg);
653655

654656
// Test array type.
655657
Udf<string, string[]>((arg) => new[] { arg });

src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/SparkSessionTests.cs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// The .NET Foundation licenses this file to you under the MIT license.
33
// See the LICENSE file in the project root for more information.
44

5+
using System;
56
using System.Collections.Generic;
67
using System.Linq;
78
using Microsoft.Spark.E2ETest.Utils;
@@ -78,13 +79,14 @@ public void TestCreateDataFrame()
7879
// Calling CreateDataFrame with schema
7980
{
8081
var data = new List<GenericRow>();
81-
data.Add(new GenericRow(new object[] { "Alice", 20 }));
82-
data.Add(new GenericRow(new object[] { "Bob", 30 }));
82+
data.Add(new GenericRow(new object[] { "Alice", 20, new Date(2020, 1, 1) }));
83+
data.Add(new GenericRow(new object[] { "Bob", 30, new Date(2020, 1, 2) }));
8384

8485
var schema = new StructType(new List<StructField>()
8586
{
8687
new StructField("Name", new StringType()),
87-
new StructField("Age", new IntegerType())
88+
new StructField("Age", new IntegerType()),
89+
new StructField("Date", new DateType())
8890
});
8991
DataFrame df = _spark.CreateDataFrame(data, schema);
9092
ValidateDataFrame(df, data.Select(a => a.Values), schema);
@@ -125,6 +127,19 @@ public void TestCreateDataFrame()
125127
DataFrame df = _spark.CreateDataFrame(data);
126128
ValidateDataFrame(df, data.Select(a => new object[] { a }), schema);
127129
}
130+
131+
// Calling CreateDataFrame(IEnumerable<Date> _) without schema
132+
{
133+
var data = new Date[]
134+
{
135+
new Date(2020, 1, 1),
136+
new Date(2020, 1, 2)
137+
};
138+
StructType schema = SchemaWithSingleColumn(new DateType());
139+
140+
DataFrame df = _spark.CreateDataFrame(data);
141+
ValidateDataFrame(df, data.Select(a => new object[] { a }), schema);
142+
}
128143
}
129144

130145
private void ValidateDataFrame(

src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/TypesTests.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
// The .NET Foundation licenses this file to you under the MIT license.
33
// See the LICENSE file in the project root for more information.
44

5-
using Microsoft.Spark.Interop;
65
using Microsoft.Spark.Interop.Ipc;
76
using Microsoft.Spark.Sql.Types;
87
using Xunit;
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
{"name":"Michael", "ids":[1], "info1":{"city":"Burdwan"}, "info2":{"state":"Paschimbanga"}, "info3":{"company":{"job":"Developer"}}}"
2-
{"name":"Andy", "age":30, "ids":[3,5], "info1":{"city":"Los Angeles"}, "info2":{"state":"California"}, "info3":{"company":{"job":"Developer"}}}
3-
{"name":"Justin", "age":19, "ids":[2,4], "info1":{"city":"Seattle"}, "info2":{"state":"Washington"}, "info3":{"company":{"job":"Developer"}}}
1+
{"name":"Michael", "ids":[1], "date":"2020-1-1", "info1":{"city":"Burdwan"}, "info2":{"state":"Paschimbanga"}, "info3":{"company":{"job":"Developer"}}}
2+
{"name":"Andy", "age":30, "ids":[3,5], "date":"2020-1-2", "info1":{"city":"Los Angeles"}, "info2":{"state":"California"}, "info3":{"company":{"job":"Developer"}}}
3+
{"name":"Justin", "age":19, "ids":[2,4], "date":"2020-1-3", "info1":{"city":"Seattle"}, "info2":{"state":"Washington"}, "info3":{"company":{"job":"Developer"}}}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for more information.
4+
5+
using System;
6+
using System.IO;
7+
using System.Linq;
8+
using Microsoft.Spark.Sql;
9+
using Microsoft.Spark.Sql.Types;
10+
using Xunit;
11+
using static Microsoft.Spark.Sql.Functions;
12+
13+
namespace Microsoft.Spark.E2ETest.UdfTests
14+
{
15+
[Collection("Spark E2E Tests")]
16+
public class UdfSimpleTypesTests
17+
{
18+
private readonly SparkSession _spark;
19+
private readonly DataFrame _df;
20+
21+
public UdfSimpleTypesTests(SparkFixture fixture)
22+
{
23+
_spark = fixture.Spark;
24+
_df = _spark
25+
.Read()
26+
.Schema("name STRING, age INT, date DATE")
27+
.Json(Path.Combine($"{TestEnvironment.ResourceDirectory}people.json"));
28+
}
29+
30+
/// <summary>
31+
/// UDF that takes in Date type.
32+
/// </summary>
33+
[Fact]
34+
public void TestUdfWithDateType()
35+
{
36+
Func<Column, Column> udf = Udf<Date, string>(date => date.ToString());
37+
38+
Row[] rows = _df.Select(udf(_df["date"])).Collect().ToArray();
39+
Assert.Equal(3, rows.Length);
40+
41+
var expected = new string[] { "2020-01-01", "2020-01-02", "2020-01-03" };
42+
string[] rowsToArray = rows.Select(x => x[0].ToString()).ToArray();
43+
Assert.Equal(expected, rowsToArray);
44+
}
45+
46+
/// <summary>
47+
/// UDF that returns Date type.
48+
/// </summary>
49+
[Fact]
50+
public void TestUdfWithReturnAsDateType()
51+
{
52+
Func<Column, Column> udf1 = Udf<int?, Date>(
53+
s => new Date(2020 + s.GetValueOrDefault(), 1, 4));
54+
Func<Column, Column> udf2 = Udf<Date, string>(date => date.ToString());
55+
56+
// Test UDF that returns a Date object.
57+
{
58+
Row[] rows = _df.Select(udf1(_df["age"]).Alias("col")).Collect().ToArray();
59+
Assert.Equal(3, rows.Length);
60+
61+
var expected = new Date[]
62+
{
63+
new Date(2020, 1, 4),
64+
new Date(2050, 1, 4),
65+
new Date(2039, 1, 4)
66+
};
67+
for (int i = 0; i < rows.Length; ++i)
68+
{
69+
Assert.Equal(1, rows[i].Size());
70+
Assert.Equal(expected[i], rows[i].GetAs<Date>("col"));
71+
}
72+
}
73+
74+
// Chained UDFs.
75+
{
76+
Row[] rows = _df.Select(udf2(udf1(_df["age"]))).Collect().ToArray();
77+
Assert.Equal(3, rows.Length);
78+
79+
var expected = new string[] { "2020-01-04", "2050-01-04", "2039-01-04" };
80+
for (int i = 0; i < rows.Length; ++i)
81+
{
82+
Assert.Equal(1, rows[i].Size());
83+
Assert.Equal(expected[i], rows[i].GetAs<string>(0));
84+
}
85+
}
86+
}
87+
}
88+
}

src/csharp/Microsoft.Spark/Interop/Ipc/PayloadHelper.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
using System.IO;
1010
using System.Linq;
1111
using Microsoft.Spark.Sql;
12+
using Microsoft.Spark.Sql.Types;
1213

1314
namespace Microsoft.Spark.Interop.Ipc
1415
{
@@ -22,6 +23,7 @@ internal class PayloadHelper
2223
private static readonly byte[] s_stringTypeId = new[] { (byte)'c' };
2324
private static readonly byte[] s_boolTypeId = new[] { (byte)'b' };
2425
private static readonly byte[] s_doubleTypeId = new[] { (byte)'d' };
26+
private static readonly byte[] s_dateTypeId = new[] { (byte)'D' };
2527
private static readonly byte[] s_jvmObjectTypeId = new[] { (byte)'j' };
2628
private static readonly byte[] s_byteArrayTypeId = new[] { (byte)'r' };
2729
private static readonly byte[] s_doubleArrayArrayTypeId = new[] { (byte)'A' };
@@ -263,6 +265,10 @@ internal static void ConvertArgsToBytes(
263265
SerDe.Write(destination, argProvider.Reference.Id);
264266
break;
265267

268+
case Date argDate:
269+
SerDe.Write(destination, argDate.ToString());
270+
break;
271+
266272
default:
267273
throw new NotSupportedException(
268274
string.Format($"Type {arg.GetType()} is not supported"));
@@ -321,6 +327,11 @@ internal static byte[] GetTypeId(Type type)
321327
{
322328
return s_rowArrTypeId;
323329
}
330+
331+
if (typeof(Date).IsAssignableFrom(type))
332+
{
333+
return s_dateTypeId;
334+
}
324335
break;
325336
}
326337

src/csharp/Microsoft.Spark/Sql/CustomPicklers.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
// See the LICENSE file in the project root for more information.
44

55
using System.IO;
6+
using Microsoft.Spark.Sql.Types;
67
using Razorvine.Pickle;
78

89
namespace Microsoft.Spark.Sql
@@ -28,4 +29,15 @@ public void pickle(object o, Stream outs, Pickler currentPickler)
2829
currentPickler.save(((GenericRow)o).Values);
2930
}
3031
}
32+
33+
/// <summary>
34+
/// Custom pickler for Date objects.
35+
/// </summary>
36+
internal class DatePickler : IObjectPickler
37+
{
38+
public void pickle(object o, Stream outs, Pickler currentPickler)
39+
{
40+
currentPickler.save(((Date)o).GetInterval());
41+
}
42+
}
3143
}

src/csharp/Microsoft.Spark/Sql/Row.cs

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public object Get(string columnName) =>
9797
/// Returns the string version of this row.
9898
/// </summary>
9999
/// <returns>String version of this row</returns>
100-
public override string ToString() => _genericRow.ToString();
100+
public override string ToString() => _genericRow.ToString();
101101

102102
/// <summary>
103103
/// Returns the column value at the given index, as a type T.
@@ -143,23 +143,12 @@ public override bool Equals(object obj) =>
143143
/// </summary>
144144
private void Convert()
145145
{
146-
foreach (StructField field in Schema.Fields)
146+
for (int i = 0; i < Size(); ++i)
147147
{
148-
if (field.DataType is ArrayType)
148+
DataType dataType = Schema.Fields[i].DataType;
149+
if (dataType.NeedConversion())
149150
{
150-
throw new NotImplementedException();
151-
}
152-
else if (field.DataType is MapType)
153-
{
154-
throw new NotImplementedException();
155-
}
156-
else if (field.DataType is DecimalType)
157-
{
158-
throw new NotImplementedException();
159-
}
160-
else if (field.DataType is DateType)
161-
{
162-
throw new NotImplementedException();
151+
Values[i] = dataType.FromInternal(Values[i]);
163152
}
164153
}
165154
}

src/csharp/Microsoft.Spark/Sql/SparkSession.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,14 @@ public DataFrame CreateDataFrame(IEnumerable<double> data) =>
188188
public DataFrame CreateDataFrame(IEnumerable<bool> data) =>
189189
CreateDataFrame(ToGenericRows(data), SchemaWithSingleColumn(new BooleanType()));
190190

191+
/// <summary>
192+
/// Creates a DataFrame given data as <see cref="IEnumerable"/> of type <see cref="Date"/>.
193+
/// </summary>
194+
/// <param name="data">An <see cref="IEnumerable"/> of <see cref="Date"/> values.</param>
195+
/// <returns>DataFrame object</returns>
196+
public DataFrame CreateDataFrame(IEnumerable<Date> data) =>
197+
CreateDataFrame(ToGenericRows(data), SchemaWithSingleColumn(new DateType()));
198+
191199
/// <summary>
192200
/// Executes a SQL query using Spark, returning the result as a DataFrame.
193201
/// </summary>

src/csharp/Microsoft.Spark/Sql/Types/ComplexTypes.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ private DataType FromJson(JObject json)
6868
ContainsNull = (bool)json["containsNull"];
6969
return this;
7070
}
71+
72+
internal override bool NeedConversion() => true;
73+
74+
internal override object FromInternal(object obj) => throw new NotImplementedException();
7175
}
7276

7377
/// <summary>
@@ -137,6 +141,10 @@ private DataType FromJson(JObject json)
137141
ValueContainsNull = (bool)json["valueContainsNull"];
138142
return this;
139143
}
144+
145+
internal override bool NeedConversion() => true;
146+
147+
internal override object FromInternal(object obj) => throw new NotImplementedException();
140148
}
141149

142150
/// <summary>

0 commit comments

Comments
 (0)