Skip to content

Commit eed2789

Browse files
HackPointGenady ShmunikCurtHagenlocher
authored
GH-44800: [C#] Implement Flight SQL Client (#44783)
GH-44800: [C#] Implement Flight SQL Client ## Rationale for this Change This pull request introduces a **new implementation of `FlightSqlClient` and `PreparedStatement` in C#**. Previously, there was no C# client for Flight SQL, leaving a significant gap for .NET developers who wished to interact with Flight SQL servers. The implementation aligns with the existing **C++ Flight SQL client** API, ensuring consistent and familiar behavior across languages and providing a robust client for the Apache Arrow ecosystem in .NET. --- ## What's Included in this PR? ### Key Features 1. **`FlightSqlClient`**: - Provides query execution (`ExecuteAsync`, `ExecuteUpdateAsync`) and schema retrieval (`GetCatalogsAsync`, `GetDbSchemasAsync`, etc.). - Implements metadata operations for catalogs, schemas, tables, and more. - Fully integrated with gRPC and Apache Arrow ecosystems. - Supports extensibility for advanced features like transactions. 2. **`PreparedStatement`**: - Implements parameterized query execution (`SetParameters`, `ExecuteAsync`, and `ExecuteUpdateAsync`). - Supports lifecycle management (`CloseAsync`) for effective resource handling. - Aligns with the prepared statement design in the C++ client. ### API Consistency This implementation mirrors the **C++ Flight SQL client** to ensure API alignment across supported languages: - Consistent naming conventions and parameter semantics. - Ensures .NET developers can work seamlessly with existing Flight SQL servers. --- ## Are These Changes Tested? ### Testing Overview 1. **Unit Tests**: - Added tests for query execution and parameter binding in `PreparedStatement`. - Verified schema retrieval methods like `GetCatalogsAsync` and `GetDbSchemasAsync`. 2. **Integration Tests**: - Tested against a live Flight SQL server to validate query execution, schema retrieval, and metadata operations. 3. **End-to-End Tests**: - Covered real-world scenarios for parameterized updates and queries, ensuring robustness. ### Example Test Cases - Verify that parameterized queries return correct results with valid input. - Ensure schema retrieval throws appropriate exceptions for invalid descriptors. - Validate row counts after `ExecuteUpdateAsync`. --- ## Are There Any Breaking Changes? This PR introduces **new functionality** and does not affect any existing features. There are **no breaking changes**. --- ## Are There Any User-Facing Changes? ### New Capabilities 1. **FlightSqlClient**: - Query execution and schema retrieval for SQL queries on Flight SQL servers. - Metadata retrieval for catalogs, schemas, tables, and more. 2. **PreparedStatement**: - Supports parameterized queries with proper parameter binding. - Provides robust lifecycle management and execution. ### API Consistency - Aligns with the C++ Flight SQL client for interoperability and familiar API design. --- ## Additional Notes - This PR **does not include transaction support** at this stage, as it requires additional server-side capabilities. - All methods follow idiomatic C# practices, including `async/await` for non-blocking operations. - Extensible for future enhancements, including advanced features like savepoints. --- ## Resources - **C++ Flight SQL Client Reference**: [Apache Arrow Flight SQL Documentation](https://arrow.apache.org/docs/) - **Apache Arrow Contribution Guide**: [Contributing to Apache Arrow](https://arrow.apache.org/docs/dev/developers/guide/) --- ## Feedback and Suggestions Thank you for reviewing this contribution! Suggestions and feedback are welcome to ensure the implementation meets the project's standards and requirements. * GitHub Issue: #44800 Lead-authored-by: HackPoint <[email protected]> Co-authored-by: Genady Shmunik <[email protected]> Co-authored-by: HackP0!nt <[email protected]> Co-authored-by: HackPoint <[email protected]> Co-authored-by: Curt Hagenlocher <[email protected]> Signed-off-by: Curt Hagenlocher <[email protected]>
1 parent fc6b4b0 commit eed2789

23 files changed

+3563
-6
lines changed

csharp/Apache.Arrow.sln

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache.Arrow.Flight", "src\
1717
EndProject
1818
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache.Arrow.Flight.AspNetCore", "src\Apache.Arrow.Flight.AspNetCore\Apache.Arrow.Flight.AspNetCore.csproj", "{E4F74938-E8FF-4AC1-A495-FEE95FC1EFDF}"
1919
EndProject
20-
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache.Arrow.IntegrationTest", "test\Apache.Arrow.IntegrationTest\Apache.Arrow.IntegrationTest.csproj", "{E8264B7F-B680-4A55-939B-85DB628164BB}"
21-
EndProject
2220
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache.Arrow.Compression", "src\Apache.Arrow.Compression\Apache.Arrow.Compression.csproj", "{B62E77D2-D0B0-4C0C-BA78-1C117DE4C299}"
2321
EndProject
2422
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache.Arrow.Compression.Tests", "test\Apache.Arrow.Compression.Tests\Apache.Arrow.Compression.Tests.csproj", "{5D7FF380-B7DF-4752-B415-7C08C70C9F06}"
@@ -29,6 +27,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Apache.Arrow.Flight.Sql", "
2927
EndProject
3028
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Apache.Arrow.Flight.IntegrationTest", "test\Apache.Arrow.Flight.IntegrationTest\Apache.Arrow.Flight.IntegrationTest.csproj", "{7E66CBB4-D921-41E7-A98A-7C6DEA521696}"
3129
EndProject
30+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache.Arrow.IntegrationTest", "test\Apache.Arrow.IntegrationTest\Apache.Arrow.IntegrationTest.csproj", "{E8264B7F-B680-4A55-939B-85DB628164BB}"
31+
EndProject
3232
Global
3333
GlobalSection(SolutionConfigurationPlatforms) = preSolution
3434
Debug|Any CPU = Debug|Any CPU

csharp/src/Apache.Arrow.Flight.Sql/Client/FlightSqlClient.cs

Lines changed: 1047 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one or more
2+
// contributor license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright ownership.
4+
// The ASF licenses this file to You under the Apache License, Version 2.0
5+
// (the "License"); you may not use this file except in compliance with
6+
// the License. You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
using System.Threading;
17+
using System.Threading.Tasks;
18+
using Apache.Arrow.Flight.Client;
19+
using Grpc.Core;
20+
21+
namespace Apache.Arrow.Flight.Sql;
22+
23+
public class DoPutResult
24+
{
25+
public FlightClientRecordBatchStreamWriter Writer { get; }
26+
public IAsyncStreamReader<FlightPutResult> Reader { get; }
27+
28+
public DoPutResult(FlightClientRecordBatchStreamWriter writer, IAsyncStreamReader<FlightPutResult> reader)
29+
{
30+
Writer = writer;
31+
Reader = reader;
32+
}
33+
34+
/// <summary>
35+
/// Reads the metadata asynchronously from the reader.
36+
/// </summary>
37+
/// <returns>A ByteString containing the metadata read from the reader.</returns>
38+
public async Task<Google.Protobuf.ByteString> ReadMetadataAsync(CancellationToken cancellationToken = default)
39+
{
40+
if (await Reader.MoveNext(cancellationToken).ConfigureAwait(false))
41+
{
42+
return Reader.Current.ApplicationMetadata;
43+
}
44+
throw new RpcException(new Status(StatusCode.Internal, "No metadata available in the response stream."));
45+
}
46+
47+
/// <summary>
48+
/// Completes the writer by signaling the end of the writing process.
49+
/// </summary>
50+
/// <returns>A Task representing the completion of the writer.</returns>
51+
public async Task CompleteAsync()
52+
{
53+
await Writer.CompleteAsync().ConfigureAwait(false);
54+
}
55+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one or more
2+
// contributor license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright ownership.
4+
// The ASF licenses this file to You under the Apache License, Version 2.0
5+
// (the "License"); you may not use this file except in compliance with
6+
// the License. You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
using System;
17+
using System.Buffers;
18+
using System.Threading;
19+
using Grpc.Core;
20+
21+
namespace Apache.Arrow.Flight.Sql;
22+
23+
public class FlightCallOptions
24+
{
25+
public FlightCallOptions()
26+
{
27+
Timeout = TimeSpan.FromSeconds(-1);
28+
}
29+
30+
// Implement any necessary options for RPC calls
31+
public Metadata Headers { get; set; } = new();
32+
33+
/// <summary>
34+
/// Gets or sets the optional timeout for this call.
35+
/// Negative durations mean an implementation-defined default behavior will be used instead.
36+
/// </summary>
37+
public TimeSpan Timeout { get; set; }
38+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one or more
2+
// contributor license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright ownership.
4+
// The ASF licenses this file to You under the Apache License, Version 2.0
5+
// (the "License"); you may not use this file except in compliance with
6+
// the License. You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
using System;
17+
using Google.Protobuf;
18+
using Google.Protobuf.WellKnownTypes;
19+
20+
namespace Apache.Arrow.Flight.Sql;
21+
22+
internal static class FlightExtensions
23+
{
24+
public static byte[] PackAndSerialize(this IMessage command) => Any.Pack(command).ToByteArray();
25+
26+
public static T ParseAndUnpack<T>(this ByteString source) where T : IMessage<T>, new() =>
27+
Any.Parser.ParseFrom(source).Unpack<T>();
28+
29+
public static int ExtractRowCount(this RecordBatch batch)
30+
{
31+
if (batch.ColumnCount == 0) return 0;
32+
int length = batch.Column(0).Length;
33+
foreach (var column in batch.Arrays)
34+
{
35+
if (column.Length != length)
36+
throw new InvalidOperationException("Inconsistent column lengths in RecordBatch.");
37+
}
38+
39+
return length;
40+
}
41+
}

0 commit comments

Comments
 (0)