Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,45 @@ await client.DisposeAsync();

## Features

- Unary write via gRPC
- **Unary Write** - Simple single-request writes via gRPC
- **Streaming Write** - High-throughput streaming via gRPC for multiple tables
- **Bulk Write** - Maximum throughput via Apache Arrow Flight
- Type coercion between .NET and GreptimeDB types
- Health check
- DI integration

## Streaming Write

For high-throughput scenarios with multiple tables:

```csharp
await using var writer = client.CreateStreamIngestWriter();

// Write multiple tables in a single stream
await writer.WriteAsync(table1);
await writer.WriteAsync(table2);
await writer.WriteAsync(table3);

var affectedRows = await writer.CompleteAsync();
```

## Bulk Write (Arrow Flight)

For maximum throughput using Apache Arrow Flight protocol:

```csharp
// Note: Tables must exist before using BulkWriter
await using var writer = client.CreateBulkWriter();

await writer.WriteAsync(table1);
await writer.WriteAsync(table2);

var affectedRows = await writer.CompleteAsync();
```

> **Note**: Unlike regular gRPC writes, Arrow Flight `DoPut` does not auto-create tables.
> Ensure tables exist before using `BulkWriter`.

## DI Integration

```csharp
Expand Down
107 changes: 107 additions & 0 deletions src/GreptimeDB.Ingester/Arrow/ArrowTypeMapper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
using Apache.Arrow;
using Apache.Arrow.Types;
using GreptimeDB.Ingester.Types;

namespace GreptimeDB.Ingester.Arrow;

/// <summary>
/// Maps GreptimeDB column data types to Apache Arrow types.
/// </summary>
internal static class ArrowTypeMapper
{
/// <summary>
/// Converts a GreptimeDB column data type to an Apache Arrow data type.
/// </summary>
/// <param name="dataType">The GreptimeDB column data type.</param>
/// <returns>The corresponding Apache Arrow data type.</returns>
/// <exception cref="NotSupportedException">Thrown when the data type is not supported.</exception>
public static IArrowType ToArrowType(ColumnDataType dataType)
{
return dataType switch
{
// Boolean
ColumnDataType.Boolean => BooleanType.Default,

// Signed integers
ColumnDataType.Int8 => Int8Type.Default,
ColumnDataType.Int16 => Int16Type.Default,
ColumnDataType.Int32 => Int32Type.Default,
ColumnDataType.Int64 => Int64Type.Default,

// Unsigned integers
ColumnDataType.UInt8 => UInt8Type.Default,
ColumnDataType.UInt16 => UInt16Type.Default,
ColumnDataType.UInt32 => UInt32Type.Default,
ColumnDataType.UInt64 => UInt64Type.Default,

// Floating point
ColumnDataType.Float32 => FloatType.Default,
ColumnDataType.Float64 => DoubleType.Default,

// String and Binary
ColumnDataType.String => StringType.Default,
ColumnDataType.Binary => BinaryType.Default,
ColumnDataType.Json => StringType.Default, // JSON stored as string

// Date and DateTime
ColumnDataType.Date => Date32Type.Default,
ColumnDataType.DateTime => new TimestampType(TimeUnit.Millisecond, (string?)null),

// Timestamps with different precisions
ColumnDataType.TimestampSecond => new TimestampType(TimeUnit.Second, (string?)null),
ColumnDataType.TimestampMillisecond => new TimestampType(TimeUnit.Millisecond, (string?)null),
ColumnDataType.TimestampMicrosecond => new TimestampType(TimeUnit.Microsecond, (string?)null),
ColumnDataType.TimestampNanosecond => new TimestampType(TimeUnit.Nanosecond, (string?)null),

// Time with different precisions
ColumnDataType.TimeSecond => new Time32Type(TimeUnit.Second),
ColumnDataType.TimeMillisecond => new Time32Type(TimeUnit.Millisecond),
ColumnDataType.TimeMicrosecond => new Time64Type(TimeUnit.Microsecond),
ColumnDataType.TimeNanosecond => new Time64Type(TimeUnit.Nanosecond),

_ => throw new NotSupportedException($"Unsupported data type: {dataType}")
};
}

/// <summary>
/// Gets the TimeUnit for a timestamp column data type.
/// </summary>
/// <param name="dataType">The GreptimeDB column data type.</param>
/// <returns>The corresponding TimeUnit.</returns>
/// <exception cref="ArgumentException">Thrown when the data type is not a timestamp type.</exception>
public static TimeUnit GetTimestampUnit(ColumnDataType dataType)
{
return dataType switch
{
ColumnDataType.TimestampSecond => TimeUnit.Second,
ColumnDataType.TimestampMillisecond => TimeUnit.Millisecond,
ColumnDataType.TimestampMicrosecond => TimeUnit.Microsecond,
ColumnDataType.TimestampNanosecond => TimeUnit.Nanosecond,
ColumnDataType.DateTime => TimeUnit.Millisecond,
_ => throw new ArgumentException($"Data type {dataType} is not a timestamp type", nameof(dataType))
};
}

/// <summary>
/// Checks if the data type is a timestamp type.
/// </summary>
public static bool IsTimestampType(ColumnDataType dataType)
{
return dataType is ColumnDataType.TimestampSecond
or ColumnDataType.TimestampMillisecond
or ColumnDataType.TimestampMicrosecond
or ColumnDataType.TimestampNanosecond
or ColumnDataType.DateTime;
}

/// <summary>
/// Checks if the data type is a time type.
/// </summary>
public static bool IsTimeType(ColumnDataType dataType)
{
return dataType is ColumnDataType.TimeSecond
or ColumnDataType.TimeMillisecond
or ColumnDataType.TimeMicrosecond
or ColumnDataType.TimeNanosecond;
}
}
Loading
Loading