Skip to content

Commit 0e7644c

Browse files
author
David Coe
committed
move Databricks out of Spark
1 parent d6a66b5 commit 0e7644c

34 files changed

+1571
-188
lines changed

csharp/Apache.Arrow.Adbc.sln

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Benchmarks", "Benchmarks\Be
4040
EndProject
4141
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Apache.Arrow.Adbc.Drivers.Databricks", "src\Drivers\Databricks\Apache.Arrow.Adbc.Drivers.Databricks.csproj", "{25042111-6B86-8B75-7EF6-5BFAA36F72B1}"
4242
EndProject
43+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Apache.Arrow.Adbc.Tests.Drivers.Databricks", "test\Drivers\Databricks\Apache.Arrow.Adbc.Tests.Drivers.Databricks.csproj", "{BA07EB2C-5246-EB72-153C-493C7E7412D2}"
44+
EndProject
4345
Global
4446
GlobalSection(SolutionConfigurationPlatforms) = preSolution
4547
Debug|Any CPU = Debug|Any CPU
@@ -106,6 +108,10 @@ Global
106108
{25042111-6B86-8B75-7EF6-5BFAA36F72B1}.Debug|Any CPU.Build.0 = Debug|Any CPU
107109
{25042111-6B86-8B75-7EF6-5BFAA36F72B1}.Release|Any CPU.ActiveCfg = Release|Any CPU
108110
{25042111-6B86-8B75-7EF6-5BFAA36F72B1}.Release|Any CPU.Build.0 = Release|Any CPU
111+
{BA07EB2C-5246-EB72-153C-493C7E7412D2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
112+
{BA07EB2C-5246-EB72-153C-493C7E7412D2}.Debug|Any CPU.Build.0 = Debug|Any CPU
113+
{BA07EB2C-5246-EB72-153C-493C7E7412D2}.Release|Any CPU.ActiveCfg = Release|Any CPU
114+
{BA07EB2C-5246-EB72-153C-493C7E7412D2}.Release|Any CPU.Build.0 = Release|Any CPU
109115
EndGlobalSection
110116
GlobalSection(SolutionProperties) = preSolution
111117
HideSolutionNode = FALSE
@@ -126,6 +132,7 @@ Global
126132
{5B27FB02-D4AE-4ACB-AD88-5E64EEB61729} = {C7290227-E925-47E7-8B6B-A8B171645D58}
127133
{BAF2CF14-BA77-429E-AF54-A34B978E9F5C} = {5BD04C26-CE52-4893-8C1A-479705195CEF}
128134
{25042111-6B86-8B75-7EF6-5BFAA36F72B1} = {FEB257A0-4FD3-495E-9A47-9E1649755445}
135+
{BA07EB2C-5246-EB72-153C-493C7E7412D2} = {C7290227-E925-47E7-8B6B-A8B171645D58}
129136
EndGlobalSection
130137
GlobalSection(ExtensibilityGlobals) = postSolution
131138
SolutionGuid = {4795CF16-0FDB-4BE0-9768-5CF31564DC03}

csharp/src/Drivers/Apache/Apache.Arrow.Adbc.Drivers.Apache.csproj

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,12 @@
44
<TargetFrameworks>netstandard2.0;net472;net6.0</TargetFrameworks>
55
</PropertyGroup>
66

7+
<ItemGroup>
8+
<Compile Remove="Spark\SparkDatabricksConnection.cs" />
9+
<Compile Remove="Spark\SparkDatabricksReader.cs" />
10+
<Compile Remove="Spark\SparkDatabricksSchemaParser.cs" />
11+
</ItemGroup>
12+
713
<ItemGroup>
814
<PackageReference Include="ApacheThrift" Version="0.21.0" />
915
<PackageReference Include="K4os.Compression.LZ4" Version="1.3.8" />

csharp/src/Drivers/Apache/AssemblyInfo.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,4 @@
1818
using System.Runtime.CompilerServices;
1919

2020
[assembly: InternalsVisibleTo("Apache.Arrow.Adbc.Drivers.Databricks, PublicKey=0024000004800000940000000602000000240000525341310004000001000100e504183f6d470d6b67b6d19212be3e1f598f70c246a120194bc38130101d0c1853e4a0f2232cb12e37a7a90e707aabd38511dac4f25fcb0d691b2aa265900bf42de7f70468fc997551a40e1e0679b605aa2088a4a69e07c117e988f5b1738c570ee66997fba02485e7856a49eca5fd0706d09899b8312577cbb9034599fc92d4")]
21+
[assembly: InternalsVisibleTo("Apache.Arrow.Adbc.Tests.Drivers.Databricks, PublicKey=0024000004800000940000000602000000240000525341310004000001000100e504183f6d470d6b67b6d19212be3e1f598f70c246a120194bc38130101d0c1853e4a0f2232cb12e37a7a90e707aabd38511dac4f25fcb0d691b2aa265900bf42de7f70468fc997551a40e1e0679b605aa2088a4a69e07c117e988f5b1738c570ee66997fba02485e7856a49eca5fd0706d09899b8312577cbb9034599fc92d4")]

csharp/src/Drivers/Apache/Spark/SparkConnectionFactory.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public static SparkConnection NewConnection(IReadOnlyDictionary<string, string>
3535

3636
return serverTypeValue switch
3737
{
38-
SparkServerType.Databricks => new SparkDatabricksConnection(properties),
38+
//SparkServerType.Databricks => new SparkDatabricksConnection(properties),
3939
SparkServerType.Http => new SparkHttpConnection(properties),
4040
// TODO: Re-enable when properly supported
4141
//SparkServerType.Standard => new SparkStandardConnection(properties),

csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -181,25 +181,7 @@ protected override TTransport CreateTransport()
181181
return transport;
182182
}
183183

184-
internal HttpClientHandler NewHttpClientHandler()
185-
{
186-
HttpClientHandler httpClientHandler = new();
187-
if (TlsOptions != HiveServer2TlsOption.Empty)
188-
{
189-
httpClientHandler.ServerCertificateCustomValidationCallback = (request, certificate, chain, policyErrors) =>
190-
{
191-
if (policyErrors == SslPolicyErrors.None) return true;
192-
193-
return
194-
(!policyErrors.HasFlag(SslPolicyErrors.RemoteCertificateChainErrors) || TlsOptions.HasFlag(HiveServer2TlsOption.AllowSelfSigned))
195-
&& (!policyErrors.HasFlag(SslPolicyErrors.RemoteCertificateNameMismatch) || TlsOptions.HasFlag(HiveServer2TlsOption.AllowHostnameMismatch));
196-
};
197-
}
198-
199-
return httpClientHandler;
200-
}
201-
202-
internal static AuthenticationHeaderValue? GetAuthenticationHeaderValue(SparkAuthType authType, string? token, string? username, string? password, string? access_token)
184+
private static AuthenticationHeaderValue? GetAuthenticationHeaderValue(SparkAuthType authType, string? token, string? username, string? password, string? access_token)
203185
{
204186
if (!string.IsNullOrEmpty(token) && (authType == SparkAuthType.Empty || authType == SparkAuthType.Token))
205187
{

csharp/src/Drivers/Apache/Spark/SparkParameters.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public static class SparkAuthTypeConstants
6666
public static class SparkServerTypeConstants
6767
{
6868
public const string Http = "http";
69-
public const string Databricks = "databricks";
69+
//public const string Databricks = "databricks";
7070
public const string Standard = "standard";
7171
}
7272
}

csharp/src/Drivers/Apache/Spark/SparkServerType.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
2020
internal enum SparkServerType
2121
{
2222
Http,
23-
Databricks,
23+
//Databricks,
2424
Standard,
2525
Empty = int.MaxValue,
2626
}
2727

2828
internal static class ServerTypeParser
2929
{
30-
internal const string SupportedList = SparkServerTypeConstants.Http + ", " + SparkServerTypeConstants.Databricks;
30+
internal const string SupportedList = SparkServerTypeConstants.Http;// + ", " + SparkServerTypeConstants.Databricks;
3131

3232
internal static bool TryParse(string? serverType, out SparkServerType serverTypeValue)
3333
{
@@ -37,9 +37,9 @@ internal static bool TryParse(string? serverType, out SparkServerType serverType
3737
case "":
3838
serverTypeValue = SparkServerType.Empty;
3939
return true;
40-
case SparkServerTypeConstants.Databricks:
41-
serverTypeValue = SparkServerType.Databricks;
42-
return true;
40+
//case SparkServerTypeConstants.Databricks:
41+
// serverTypeValue = SparkServerType.Databricks;
42+
// return true;
4343
case SparkServerTypeConstants.Http:
4444
serverTypeValue = SparkServerType.Http;
4545
return true;

csharp/src/Drivers/Databricks/AssemblyInfo.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,4 @@
1717

1818
using System.Runtime.CompilerServices;
1919

20-
//[assembly: InternalsVisibleTo("Apache.Arrow.Adbc.Tests.Drivers.Databricks, PublicKey=0024000004800000940000000602000000240000525341310004000001000100e504183f6d470d6b67b6d19212be3e1f598f70c246a120194bc38130101d0c1853e4a0f2232cb12e37a7a90e707aabd38511dac4f25fcb0d691b2aa265900bf42de7f70468fc997551a40e1e0679b605aa2088a4a69e07c117e988f5b1738c570ee66997fba02485e7856a49eca5fd0706d09899b8312577cbb9034599fc92d4")]
20+
[assembly: InternalsVisibleTo("Apache.Arrow.Adbc.Tests.Drivers.Databricks, PublicKey=0024000004800000940000000602000000240000525341310004000001000100e504183f6d470d6b67b6d19212be3e1f598f70c246a120194bc38130101d0c1853e4a0f2232cb12e37a7a90e707aabd38511dac4f25fcb0d691b2aa265900bf42de7f70468fc997551a40e1e0679b605aa2088a4a69e07c117e988f5b1738c570ee66997fba02485e7856a49eca5fd0706d09899b8312577cbb9034599fc92d4")]

csharp/src/Drivers/Databricks/DatabricksConnection.cs

Lines changed: 66 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/*
1+
/*
22
* Licensed to the Apache Software Foundation (ASF) under one or more
33
* contributor license agreements. See the NOTICE file distributed with
44
* this work for additional information regarding copyright ownership.
@@ -15,70 +15,90 @@
1515
* limitations under the License.
1616
*/
1717

18-
using System;
1918
using System.Collections.Generic;
20-
using System.Net.Http;
21-
using System.Net.Http.Headers;
19+
using System.Threading;
20+
using System.Threading.Tasks;
2221
using Apache.Arrow.Adbc.Drivers.Apache;
2322
using Apache.Arrow.Adbc.Drivers.Apache.Spark;
24-
using Thrift;
25-
using Thrift.Transport;
23+
using Apache.Arrow.Adbc.Drivers.Apache.Spark.CloudFetch;
24+
using Apache.Arrow.Ipc;
25+
using Apache.Hive.Service.Rpc.Thrift;
2626

2727
namespace Apache.Arrow.Adbc.Drivers.Databricks
2828
{
29-
/// <summary>
30-
/// Databricks-specific implementation of <see cref="AdbcConnection"/>
31-
/// </summary>
32-
internal class DatabricksConnection : SparkDatabricksConnection
29+
internal class DatabricksConnection : SparkHttpConnection
3330
{
34-
protected new const string ProductVersionDefault = "1.0.0";
35-
protected new const string DriverName = "ADBC Databricks Driver";
36-
private const string ArrowVersion = "1.0.0";
37-
private static readonly string s_userAgent = $"{DriverName.Replace(" ", "")}/{ProductVersionDefault}";
38-
3931
public DatabricksConnection(IReadOnlyDictionary<string, string> properties) : base(properties)
4032
{
4133
}
4234

43-
protected override TTransport CreateTransport()
35+
internal override IArrowArrayStream NewReader<T>(T statement, Schema schema, TGetResultSetMetadataResp? metadataResp = null)
4436
{
45-
// Assumption: parameters have already been validated.
46-
Properties.TryGetValue(SparkParameters.HostName, out string? hostName);
47-
Properties.TryGetValue(SparkParameters.Path, out string? path);
48-
Properties.TryGetValue(SparkParameters.Port, out string? port);
49-
Properties.TryGetValue(SparkParameters.AuthType, out string? authType);
50-
if (!SparkAuthTypeParser.TryParse(authType, out SparkAuthType authTypeValue))
37+
// Get result format from metadata response if available
38+
TSparkRowSetType resultFormat = TSparkRowSetType.ARROW_BASED_SET;
39+
bool isLz4Compressed = false;
40+
41+
if (metadataResp != null)
42+
{
43+
if (metadataResp.__isset.resultFormat)
44+
{
45+
resultFormat = metadataResp.ResultFormat;
46+
}
47+
48+
if (metadataResp.__isset.lz4Compressed)
49+
{
50+
isLz4Compressed = metadataResp.Lz4Compressed;
51+
}
52+
}
53+
54+
// Choose the appropriate reader based on the result format
55+
if (resultFormat == TSparkRowSetType.URL_BASED_SET)
5156
{
52-
throw new ArgumentOutOfRangeException(SparkParameters.AuthType, authType, $"Unsupported {SparkParameters.AuthType} value.");
57+
return new SparkCloudFetchReader(statement, schema, isLz4Compressed);
5358
}
54-
Properties.TryGetValue(SparkParameters.Token, out string? token);
55-
Properties.TryGetValue(SparkParameters.AccessToken, out string? access_token);
56-
Properties.TryGetValue(AdbcOptions.Username, out string? username);
57-
Properties.TryGetValue(AdbcOptions.Password, out string? password);
58-
Properties.TryGetValue(AdbcOptions.Uri, out string? uri);
59+
else
60+
{
61+
return new DatabricksReader(statement, schema);
62+
}
63+
}
5964

60-
Uri baseAddress = GetBaseAddress(uri, hostName, path, port, SparkParameters.HostName);
61-
AuthenticationHeaderValue? authenticationHeaderValue = GetAuthenticationHeaderValue(authTypeValue, token, username, password, access_token);
65+
internal override SchemaParser SchemaParser => new DatabricksSchemaParser();
6266

63-
HttpClientHandler httpClientHandler = NewHttpClientHandler();
64-
Lz4CompressionHandler lz4CompressionHandler = new Lz4CompressionHandler { InnerHandler = httpClientHandler };
65-
HttpClient httpClient = new(lz4CompressionHandler);
66-
httpClient.BaseAddress = baseAddress;
67-
httpClient.DefaultRequestHeaders.Authorization = authenticationHeaderValue;
68-
httpClient.DefaultRequestHeaders.UserAgent.ParseAdd(s_userAgent);
69-
httpClient.DefaultRequestHeaders.AcceptEncoding.Clear();
70-
httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("identity"));
71-
httpClient.DefaultRequestHeaders.ExpectContinue = false;
67+
//internal override SparkServerType ServerType => SparkServerType.Databricks;
7268

73-
TConfiguration config = new();
74-
ThriftHttpTransport transport = new(httpClient, config)
69+
protected override TOpenSessionReq CreateSessionRequest()
70+
{
71+
var req = new TOpenSessionReq
7572
{
76-
// This value can only be set before the first call/request. So if a new value for query timeout
77-
// is set, we won't be able to update the value. Setting to ~infinite and relying on cancellation token
78-
// to ensure cancelled correctly.
79-
ConnectTimeout = int.MaxValue,
73+
Client_protocol = TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V7,
74+
Client_protocol_i64 = (long)TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V7,
75+
CanUseMultipleCatalogs = true,
8076
};
81-
return transport;
77+
return req;
8278
}
79+
80+
protected override Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetSchemasResp response, CancellationToken cancellationToken = default) =>
81+
Task.FromResult(response.DirectResults.ResultSetMetadata);
82+
protected override Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetCatalogsResp response, CancellationToken cancellationToken = default) =>
83+
Task.FromResult(response.DirectResults.ResultSetMetadata);
84+
protected override Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetColumnsResp response, CancellationToken cancellationToken = default) =>
85+
Task.FromResult(response.DirectResults.ResultSetMetadata);
86+
protected override Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetTablesResp response, CancellationToken cancellationToken = default) =>
87+
Task.FromResult(response.DirectResults.ResultSetMetadata);
88+
protected internal override Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetPrimaryKeysResp response, CancellationToken cancellationToken = default) =>
89+
Task.FromResult(response.DirectResults.ResultSetMetadata);
90+
91+
protected override Task<TRowSet> GetRowSetAsync(TGetTableTypesResp response, CancellationToken cancellationToken = default) =>
92+
Task.FromResult(response.DirectResults.ResultSet.Results);
93+
protected override Task<TRowSet> GetRowSetAsync(TGetColumnsResp response, CancellationToken cancellationToken = default) =>
94+
Task.FromResult(response.DirectResults.ResultSet.Results);
95+
protected override Task<TRowSet> GetRowSetAsync(TGetTablesResp response, CancellationToken cancellationToken = default) =>
96+
Task.FromResult(response.DirectResults.ResultSet.Results);
97+
protected override Task<TRowSet> GetRowSetAsync(TGetCatalogsResp response, CancellationToken cancellationToken = default) =>
98+
Task.FromResult(response.DirectResults.ResultSet.Results);
99+
protected override Task<TRowSet> GetRowSetAsync(TGetSchemasResp response, CancellationToken cancellationToken = default) =>
100+
Task.FromResult(response.DirectResults.ResultSet.Results);
101+
protected internal override Task<TRowSet> GetRowSetAsync(TGetPrimaryKeysResp response, CancellationToken cancellationToken = default) =>
102+
Task.FromResult(response.DirectResults.ResultSet.Results);
83103
}
84104
}

csharp/src/Drivers/Databricks/DatabricksReader.cs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
using System.Collections.Generic;
19+
using System.Threading;
20+
using System.Threading.Tasks;
21+
using Apache.Arrow.Adbc.Drivers.Apache;
22+
using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
23+
using Apache.Arrow.Ipc;
24+
using Apache.Hive.Service.Rpc.Thrift;
25+
26+
namespace Apache.Arrow.Adbc.Drivers.Databricks
27+
{
28+
/// <summary>
/// An <see cref="IArrowArrayStream"/> over the results of a statement. Pages of
/// Arrow batches are requested from the server with Thrift <c>FetchResults</c>
/// calls, and each returned batch payload is decoded with an
/// <see cref="ArrowStreamReader"/>.
/// </summary>
internal sealed class DatabricksReader : IArrowArrayStream
{
    private readonly Schema schema;

    // Set to null once the server reports that no more rows are available
    // (or once this reader has been disposed); null means "do not fetch again".
    private HiveServer2Statement? statement;

    // The page of batches returned by the most recent fetch, and the position
    // of the next batch in that page to decode.
    private List<TSparkArrowBatch>? batches;
    private int index;

    // Decoder for the batch currently being consumed, if any.
    private ArrowStreamReader? reader;

    /// <summary>
    /// Creates a reader for the results of <paramref name="statement"/>.
    /// </summary>
    /// <param name="statement">The statement whose operation handle, batch size and connection are used to fetch results.</param>
    /// <param name="schema">The schema reported by <see cref="Schema"/> and passed to each chunk stream.</param>
    public DatabricksReader(HiveServer2Statement statement, Schema schema)
    {
        this.statement = statement;
        this.schema = schema;
    }

    /// <summary>Gets the schema supplied at construction.</summary>
    public Schema Schema => this.schema;

    /// <summary>
    /// Returns the next record batch, fetching another page from the server when
    /// the current page has been consumed.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to the batch decoder and to the fetch call.</param>
    /// <returns>The next <see cref="RecordBatch"/>, or <c>null</c> when the results are exhausted.</returns>
    public async ValueTask<RecordBatch?> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
    {
        while (true)
        {
            // 1. Drain the decoder of the batch currently in progress.
            if (this.reader != null)
            {
                RecordBatch? next = await this.reader.ReadNextRecordBatchAsync(cancellationToken).ConfigureAwait(false);
                if (next != null)
                {
                    return next;
                }

                // The decoder is exhausted: release it (and the stream it wraps)
                // instead of merely dropping the reference.
                this.reader.Dispose();
                this.reader = null;
            }

            // 2. Move on to the next batch of the current page, if one remains.
            if (this.batches != null && this.index < this.batches.Count)
            {
                this.reader = new ArrowStreamReader(new ChunkStream(this.schema, this.batches[this.index++].Batch));
                continue;
            }

            // 3. The page is used up; reset before deciding whether to fetch again.
            this.batches = null;
            this.index = 0;

            if (this.statement == null)
            {
                return null;
            }

            TFetchResultsReq request = new TFetchResultsReq(this.statement.OperationHandle!, TFetchOrientation.FETCH_NEXT, this.statement.BatchSize);
            TFetchResultsResp response = await this.statement.Connection.Client!.FetchResults(request, cancellationToken).ConfigureAwait(false);
            this.batches = response.Results.ArrowBatches;

            if (!response.HasMoreRows)
            {
                // Last page: process it on the next iterations, then stop fetching.
                this.statement = null;
            }
        }
    }

    /// <summary>
    /// Releases the decoder in progress, if any, and drops all remaining state.
    /// Afterwards <see cref="ReadNextRecordBatchAsync"/> returns <c>null</c>
    /// without contacting the server. Calling this more than once is harmless.
    /// </summary>
    public void Dispose()
    {
        this.reader?.Dispose();
        this.reader = null;
        this.batches = null;
        this.index = 0;
        this.statement = null;
    }
}
87+
}

0 commit comments

Comments
 (0)