Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions benchmarks/Eftdb.Benchmarks/WriteRecordsBenchmarkBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ public async Task GlobalCleanup()
public void IterationSetup()
{
Trades.Clear();
var random = new Random();
Random random = new();
string[] tickers = ["AAPL", "GOOGL", "MSFT", "TSLA", "AMZN"];
var baseTimestamp = DateTime.UtcNow.AddMinutes(-30);
DateTime baseTimestamp = DateTime.UtcNow.AddMinutes(-30);

for (int i = 0; i < NumberOfRecords; i++)
{
var trade = CreateTradeInstance(i, baseTimestamp, tickers[random.Next(tickers.Length)], random);
T trade = CreateTradeInstance(i, baseTimestamp, tickers[random.Next(tickers.Length)], random);
Trades.Add(trade);
}

Expand Down
6 changes: 2 additions & 4 deletions src/Eftdb.Design/TimescaleDatabaseModelFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ public override DatabaseModel Create(DbConnection connection, DatabaseModelFacto
DatabaseModel databaseModel = base.Create(connection, options);

// Extract all TimescaleDB features from the database
var allFeatureData = _features
.Select(feature => feature.Extractor.Extract(connection))
.ToList();
List<Dictionary<(string Schema, string TableName), object>> allFeatureData = [.. _features.Select(feature => feature.Extractor.Extract(connection))];

// Apply annotations to tables/views in the model
foreach (DatabaseTable table in databaseModel.Tables)
Expand All @@ -42,7 +40,7 @@ public override DatabaseModel Create(DbConnection connection, DatabaseModelFacto
// Apply each feature's annotations if the table has that feature
for (int i = 0; i < _features.Count; i++)
{
var featureData = allFeatureData[i];
Dictionary<(string Schema, string TableName), object> featureData = allFeatureData[i];
if (featureData.TryGetValue(tableKey, out object? featureInfo))
{
_features[i].Applier.ApplyAnnotations(table, featureInfo);
Expand Down
10 changes: 5 additions & 5 deletions src/Eftdb/Abstractions/TimescaleCopyConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ public TimescaleCopyConfig()
if (MapClrTypeToNpgsqlDbType(property.PropertyType, out NpgsqlDbType dbType))
{
// Auto-discover properties and create compiled getters for them.
var parameter = Expression.Parameter(typeof(T), "x");
var member = Expression.Property(parameter, property);
var conversion = Expression.Convert(member, typeof(object));
var lambda = Expression.Lambda<Func<T, object>>(conversion, parameter);
ParameterExpression parameter = Expression.Parameter(typeof(T), "x");
MemberExpression member = Expression.Property(parameter, property);
UnaryExpression conversion = Expression.Convert(member, typeof(object));
Expression<Func<T, object>> lambda = Expression.Lambda<Func<T, object>>(conversion, parameter);

ColumnMappings[property.Name] = (lambda.Compile(), dbType);
}
Expand Down Expand Up @@ -124,7 +124,7 @@ public TimescaleCopyConfig<T> MapColumn(string columnName, Expression<Func<T, ob
private static bool MapClrTypeToNpgsqlDbType(Type clrType, out NpgsqlDbType dbType)
{
// Handle nullable value types by getting the underlying type
var underlyingType = Nullable.GetUnderlyingType(clrType) ?? clrType;
Type underlyingType = Nullable.GetUnderlyingType(clrType) ?? clrType;

// This map contains the default CLR type to NpgsqlDbType mappings
// based on the Npgsql "Write Mappings" documentation.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using CmdScale.EntityFrameworkCore.TimescaleDB.Operations;
using System.Text;

namespace CmdScale.EntityFrameworkCore.TimescaleDB.Generators
{
Expand Down Expand Up @@ -114,7 +115,7 @@ public List<string> Generate(CreateContinuousAggregateOperation operation)
}

// Build the complete CREATE MATERIALIZED VIEW statement as a single string
var sqlBuilder = new System.Text.StringBuilder();
StringBuilder sqlBuilder = new();
sqlBuilder.Append($"CREATE MATERIALIZED VIEW {qualifiedIdentifier}");
sqlBuilder.AppendLine();
sqlBuilder.Append($"WITH ({string.Join(", ", withOptions)}) AS");
Expand Down
100 changes: 74 additions & 26 deletions src/Eftdb/Generators/HypertableOperationGenerator.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using CmdScale.EntityFrameworkCore.TimescaleDB.Abstractions;
using CmdScale.EntityFrameworkCore.TimescaleDB.Operations;
using System.Text;

namespace CmdScale.EntityFrameworkCore.TimescaleDB.Generators
{
Expand All @@ -16,56 +17,58 @@ public HypertableOperationGenerator(bool isDesignTime = false)
}

sqlHelper = new SqlBuilderHelper(quoteString);

}

public List<string> Generate(CreateHypertableOperation operation)
{
string qualifiedTableName = sqlHelper.Regclass(operation.TableName, operation.Schema);
string qualifiedIdentifier = sqlHelper.QualifiedIdentifier(operation.TableName, operation.Schema);

string migrateDataParam = operation.MigrateData ? ", migrate_data => true" : "";
List<string> statements = [];
List<string> communityStatements = [];

List<string> statements =
[
$"SELECT create_hypertable({qualifiedTableName}, '{operation.TimeColumnName}'{migrateDataParam});"
];
// Build create_hypertable with chunk_time_interval if provided
StringBuilder createHypertableCall = new();
createHypertableCall.Append($"SELECT create_hypertable({qualifiedTableName}, '{operation.TimeColumnName}'");
createHypertableCall.Append(operation.MigrateData ? ", migrate_data => true" : "");

// ChunkTimeInterval
if (!string.IsNullOrEmpty(operation.ChunkTimeInterval))
{
// Check if the interval is a plain number (e.g., for microseconds).
if (long.TryParse(operation.ChunkTimeInterval, out _))
{
// If it's a number, don't wrap it in quotes.
statements.Add($"SELECT set_chunk_time_interval({qualifiedTableName}, {operation.ChunkTimeInterval}::bigint);");
createHypertableCall.Append($", chunk_time_interval => {operation.ChunkTimeInterval}::bigint");
}
else
{
// If it's a string like '7 days', wrap it in quotes.
statements.Add($"SELECT set_chunk_time_interval({qualifiedTableName}, INTERVAL '{operation.ChunkTimeInterval}');");
createHypertableCall.Append($", chunk_time_interval => INTERVAL '{operation.ChunkTimeInterval}'");
}
}

// EnableCompression
createHypertableCall.Append(");");
statements.Add(createHypertableCall.ToString());

// EnableCompression (Community Edition only)
if (operation.EnableCompression || operation.ChunkSkipColumns?.Count > 0)
{
bool enableCompression = operation.EnableCompression || operation.ChunkSkipColumns != null && operation.ChunkSkipColumns.Count > 0;
statements.Add($"ALTER TABLE {qualifiedIdentifier} SET (timescaledb.compress = {enableCompression.ToString().ToLower()});");
communityStatements.Add($"ALTER TABLE {qualifiedIdentifier} SET (timescaledb.compress = {enableCompression.ToString().ToLower()});");
}

// ChunkSkipColumns
// ChunkSkipColumns (Community Edition only)
if (operation.ChunkSkipColumns != null && operation.ChunkSkipColumns.Count > 0)
{
statements.Add("SET timescaledb.enable_chunk_skipping = 'ON';");
communityStatements.Add("SET timescaledb.enable_chunk_skipping = 'ON';");

foreach (string column in operation.ChunkSkipColumns)
{
statements.Add($"SELECT enable_chunk_skipping({qualifiedTableName}, '{column}');");
communityStatements.Add($"SELECT enable_chunk_skipping({qualifiedTableName}, '{column}');");
}
}

// AdditionalDimensions
// AdditionalDimensions (Available in both editions)
if (operation.AdditionalDimensions != null && operation.AdditionalDimensions.Count > 0)
{
foreach (Dimension dimension in operation.AdditionalDimensions)
Expand All @@ -87,6 +90,11 @@ public List<string> Generate(CreateHypertableOperation operation)
}
}

// Add wrapped community statements if any exist
if (communityStatements.Count > 0)
{
statements.Add(WrapCommunityFeatures(communityStatements));
}
return statements;
}

Expand All @@ -96,45 +104,52 @@ public List<string> Generate(AlterHypertableOperation operation)
string qualifiedIdentifier = sqlHelper.QualifiedIdentifier(operation.TableName, operation.Schema);

List<string> statements = [];
List<string> communityStatements = [];

// Check for ChunkTimeInterval change
// Check for ChunkTimeInterval change (Available in both editions)
if (operation.ChunkTimeInterval != operation.OldChunkTimeInterval)
{
StringBuilder setChunkTimeInterval = new();
setChunkTimeInterval.Append($"SELECT set_chunk_time_interval({qualifiedTableName}, ");

// Check if the interval is a plain number (e.g., for microseconds).
if (long.TryParse(operation.ChunkTimeInterval, out _))
{
// If it's a number, don't wrap it in quotes.
statements.Add($"SELECT set_chunk_time_interval({qualifiedTableName}, {operation.ChunkTimeInterval}::bigint);");
setChunkTimeInterval.Append($"{operation.ChunkTimeInterval}::bigint");
}
else
{
// If it's a string like '7 days', wrap it in quotes.
statements.Add($"SELECT set_chunk_time_interval({qualifiedTableName}, INTERVAL '{operation.ChunkTimeInterval}');");
setChunkTimeInterval.Append($"INTERVAL '{operation.ChunkTimeInterval}'");
}

setChunkTimeInterval.Append(");");
statements.Add(setChunkTimeInterval.ToString());
}

// Check for EnableCompression change
// Check for EnableCompression change (Community Edition only)
bool newCompressionState = operation.EnableCompression || operation.ChunkSkipColumns != null && operation.ChunkSkipColumns.Any();
bool oldCompressionState = operation.OldEnableCompression || operation.OldChunkSkipColumns != null && operation.OldChunkSkipColumns.Any();

if (newCompressionState != oldCompressionState)
{
string compressionValue = newCompressionState.ToString().ToLower();
statements.Add($"ALTER TABLE {qualifiedIdentifier} SET (timescaledb.compress = {compressionValue});");
communityStatements.Add($"ALTER TABLE {qualifiedIdentifier} SET (timescaledb.compress = {compressionValue});");
}

// Handle ChunkSkipColumns
// Handle ChunkSkipColumns (Community Edition only)
IReadOnlyList<string> newColumns = operation.ChunkSkipColumns ?? [];
IReadOnlyList<string> oldColumns = operation.OldChunkSkipColumns ?? [];
List<string> addedColumns = [.. newColumns.Except(oldColumns)];

if (addedColumns.Count != 0)
{
statements.Add("SET timescaledb.enable_chunk_skipping = 'ON';");
communityStatements.Add("SET timescaledb.enable_chunk_skipping = 'ON';");

foreach (string column in addedColumns)
{
statements.Add($"SELECT enable_chunk_skipping({qualifiedTableName}, '{column}');");
communityStatements.Add($"SELECT enable_chunk_skipping({qualifiedTableName}, '{column}');");
}
}

Expand All @@ -143,7 +158,7 @@ public List<string> Generate(AlterHypertableOperation operation)
{
foreach (string column in removedColumns)
{
statements.Add($"SELECT disable_chunk_skipping({qualifiedTableName}, '{column}');");
communityStatements.Add($"SELECT disable_chunk_skipping({qualifiedTableName}, '{column}');");
}
}

Expand Down Expand Up @@ -194,8 +209,41 @@ public List<string> Generate(AlterHypertableOperation operation)
statements.Add($"-- WARNING: TimescaleDB does not support removing dimensions. The following dimensions cannot be removed: {dimensionList}");
}

// Add wrapped community statements if any exist
if (communityStatements.Count > 0)
{
statements.Add(WrapCommunityFeatures(communityStatements));
}
return statements;
}
}
}

/// <summary>
/// Wraps multiple SQL statements in a single license check block to ensure they only run on Community Edition.
/// </summary>
private static string WrapCommunityFeatures(List<string> sqlStatements)
{
StringBuilder sb = new();
sb.AppendLine("DO $$");
sb.AppendLine("DECLARE");
sb.AppendLine(" license TEXT;");
sb.AppendLine("BEGIN");
sb.AppendLine(" license := current_setting('timescaledb.license', true);");
sb.AppendLine(" ");
sb.AppendLine(" IF license IS NULL OR license != 'apache' THEN");

foreach (string sql in sqlStatements)
{
// Remove trailing semicolon and escape single quotes for EXECUTE
string cleanSql = sql.TrimEnd(';').Replace("'", "''");
sb.AppendLine($" EXECUTE '{cleanSql}';");
}

sb.AppendLine(" ELSE");
sb.AppendLine(" RAISE WARNING 'Skipping Community Edition features (compression, chunk skipping) - not available in Apache Edition';");
sb.AppendLine(" END IF;");
sb.AppendLine("END $$;");

return sb.ToString();
}
}
}
2 changes: 1 addition & 1 deletion src/Eftdb/TimescaleDbCopyExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public static async Task BulkCopyAsync<T>(
await writer.StartRowAsync();

// Write each configured column in the specified order
foreach (var (Getter, DbType) in config.ColumnMappings.Values)
foreach ((Func<T, object?>? Getter, NpgsqlTypes.NpgsqlDbType DbType) in config.ColumnMappings.Values)
{
object? value = Getter(item);
await writer.WriteAsync(value, DbType);
Expand Down
Loading
Loading