Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<PackageId>Elastic.Clients.Elasticsearch</PackageId>
Expand Down Expand Up @@ -31,6 +31,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Elastic.Esql" Version="0.10.0" />
<PackageReference Include="Elastic.Transport" Version="0.15.1" />
<PackageReference Include="PolySharp" Version="1.15.0">
<PrivateAssets>all</PrivateAssets>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;
using System.IO;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Elastic.Esql;
using Elastic.Esql.Execution;
using Elastic.Esql.QueryModel;
using Elastic.Transport;

#if NET10_0_OR_GREATER
using System.IO.Pipelines;
#endif

namespace Elastic.Clients.Elasticsearch.Esql;

/// <summary>
/// Implements <see cref="IEsqlQueryExecutor"/> by delegating to the native
/// <see cref="EsqlNamespacedClient"/> typed request/response pipeline.
/// </summary>
internal sealed class EsqlQueryExecutor : IEsqlQueryExecutor
{
private readonly EsqlNamespacedClient _client;

public EsqlQueryExecutor(EsqlNamespacedClient client)
{
_client = client ?? throw new ArgumentNullException(nameof(client));
}

public IEsqlResponse ExecuteQuery(string esql, EsqlParameters? parameters, object? options)
{
var queryOptions = ResolveOptions(options);
var request = BuildQueryRequest(esql, parameters, queryOptions);
request.BeforeRequest();
var response = _client.DoRequest<EsqlQueryRequest, StreamResponse, EsqlQueryRequestParameters>(request);
return new EsqlTransportResponse(response);
}

public async Task<IEsqlAsyncResponse> ExecuteQueryAsync(string esql, EsqlParameters? parameters, object? options, CancellationToken cancellationToken)
{
var queryOptions = ResolveOptions(options);
var request = BuildQueryRequest(esql, parameters, queryOptions);
request.BeforeRequest();
var response = await _client.DoRequestAsync<EsqlQueryRequest, StreamResponse, EsqlQueryRequestParameters>(request, cancellationToken)
.ConfigureAwait(false);
return new EsqlTransportAsyncResponse(response);
}

public IEsqlResponse SubmitAsyncQuery(string esql, EsqlParameters? parameters, object? options, EsqlAsyncQueryOptions? asyncOptions)
{
var queryOptions = ResolveOptions(options);
var request = BuildAsyncQueryRequest(esql, parameters, queryOptions, asyncOptions);
request.BeforeRequest();
var response = _client.DoRequest<AsyncQueryRequest, StreamResponse, AsyncQueryRequestParameters>(request);
return new EsqlTransportResponse(response);
}

public async Task<IEsqlAsyncResponse> SubmitAsyncQueryAsync(string esql, EsqlParameters? parameters, object? options, EsqlAsyncQueryOptions? asyncOptions, CancellationToken cancellationToken)
{
var queryOptions = ResolveOptions(options);
var request = BuildAsyncQueryRequest(esql, parameters, queryOptions, asyncOptions);
request.BeforeRequest();
var response = await _client.DoRequestAsync<AsyncQueryRequest, StreamResponse, AsyncQueryRequestParameters>(request, cancellationToken)
.ConfigureAwait(false);
return new EsqlTransportAsyncResponse(response);
}

public IEsqlResponse PollAsyncQuery(string queryId, object? options)
{
var queryOptions = ResolveOptions(options);
var request = new AsyncQueryGetRequest(queryId) { Format = EsqlFormat.Json };
if (queryOptions?.RequestConfiguration is not null)
request.RequestConfiguration = queryOptions.RequestConfiguration;
request.BeforeRequest();
var response = _client.DoRequest<AsyncQueryGetRequest, StreamResponse, AsyncQueryGetRequestParameters>(request);
return new EsqlTransportResponse(response);
}

public async Task<IEsqlAsyncResponse> PollAsyncQueryAsync(string queryId, object? options, CancellationToken cancellationToken)
{
var queryOptions = ResolveOptions(options);
var request = new AsyncQueryGetRequest(queryId) { Format = EsqlFormat.Json };
if (queryOptions?.RequestConfiguration is not null)
request.RequestConfiguration = queryOptions.RequestConfiguration;
request.BeforeRequest();
var response = await _client.DoRequestAsync<AsyncQueryGetRequest, StreamResponse, AsyncQueryGetRequestParameters>(request, cancellationToken)
.ConfigureAwait(false);
return new EsqlTransportAsyncResponse(response);
}

public void DeleteAsyncQuery(string queryId, object? options)
{
var queryOptions = ResolveOptions(options);
var request = new AsyncQueryDeleteRequest(queryId);
if (queryOptions?.RequestConfiguration is not null)
request.RequestConfiguration = queryOptions.RequestConfiguration;
request.BeforeRequest();
_client.DoRequest<AsyncQueryDeleteRequest, AsyncQueryDeleteResponse, AsyncQueryDeleteRequestParameters>(request);
}

public async Task DeleteAsyncQueryAsync(string queryId, object? options, CancellationToken cancellationToken)
{
var queryOptions = ResolveOptions(options);
var request = new AsyncQueryDeleteRequest(queryId);
if (queryOptions?.RequestConfiguration is not null)
request.RequestConfiguration = queryOptions.RequestConfiguration;
request.BeforeRequest();
await _client.DoRequestAsync<AsyncQueryDeleteRequest, AsyncQueryDeleteResponse, AsyncQueryDeleteRequestParameters>(request, cancellationToken)
.ConfigureAwait(false);
}

private static EsqlQueryOptions? ResolveOptions(object? options) =>
options as EsqlQueryOptions;

private static EsqlQueryRequest BuildQueryRequest(string esql, EsqlParameters? parameters, EsqlQueryOptions? options)
{
var request = new EsqlQueryRequest(esql)
{
Format = EsqlFormat.Json,
Columnar = false,
Params = MergeAndConvertParams(parameters, options?.NamedParameters)
};

ApplyQueryOptions(request, options);
return request;
}

private static AsyncQueryRequest BuildAsyncQueryRequest(string esql, EsqlParameters? parameters, EsqlQueryOptions? queryOptions, EsqlAsyncQueryOptions? asyncOptions)
{
var request = new AsyncQueryRequest(esql)
{
Format = EsqlFormat.Json,
Columnar = false,
Params = MergeAndConvertParams(parameters, queryOptions?.NamedParameters)
};

ApplyQueryOptions(request, queryOptions);

if (asyncOptions is not null)
{
if (asyncOptions.WaitForCompletionTimeout is { } waitTimeout)
request.WaitForCompletionTimeout = new Duration(waitTimeout);

if (asyncOptions.KeepAlive is { } keepAlive)
request.KeepAlive = new Duration(keepAlive);

request.KeepOnCompletion = asyncOptions.KeepOnCompletion;
}

return request;
}

private static void ApplyQueryOptions(EsqlQueryRequest request, EsqlQueryOptions? options)
{
if (options is null)
return;

request.Locale = options.Locale;
request.TimeZone = options.TimeZone;
request.Filter = options.Filter;
request.AllowPartialResults = options.AllowPartialResults;
request.DropNullColumns = options.DropNullColumns;
request.ProjectRouting = options.ProjectRouting;

if (options.RequestConfiguration is not null)
request.RequestConfiguration = options.RequestConfiguration;
}

private static void ApplyQueryOptions(AsyncQueryRequest request, EsqlQueryOptions? options)
{
if (options is null)
return;

request.Locale = options.Locale;
request.TimeZone = options.TimeZone;
request.Filter = options.Filter;
request.AllowPartialResults = options.AllowPartialResults;
request.DropNullColumns = options.DropNullColumns;
request.ProjectRouting = options.ProjectRouting;

if (options.RequestConfiguration is not null)
request.RequestConfiguration = options.RequestConfiguration;
}

private static Union<ICollection<ICollection<FieldValue>>, ICollection<KeyValuePair<string, ICollection<FieldValue>>>>?
MergeAndConvertParams(EsqlParameters? translated, Dictionary<string, FieldValue>? userParams)
{
var hasTranslated = translated is not null && translated.HasParameters;
var hasUser = userParams is { Count: > 0 };

if (!hasTranslated && !hasUser)
return null;

var merged = new Dictionary<string, FieldValue>();

if (hasTranslated)
{
foreach (var kvp in translated!.Parameters)
merged[kvp.Key] = ConvertJsonElement(kvp.Value);
}

if (hasUser)
{
foreach (var kvp in userParams!)
merged[kvp.Key] = kvp.Value;
}

var namedParams = new List<KeyValuePair<string, ICollection<FieldValue>>>(merged.Count);
foreach (var kvp in merged)
{
namedParams.Add(new KeyValuePair<string, ICollection<FieldValue>>(
kvp.Key, [kvp.Value]));
}

return new Union<ICollection<ICollection<FieldValue>>, ICollection<KeyValuePair<string, ICollection<FieldValue>>>>(namedParams);

static FieldValue ConvertJsonElement(JsonElement element) =>
element.ValueKind switch
{
JsonValueKind.String => FieldValue.String(element.GetString()!),
JsonValueKind.Number when element.TryGetInt64(out var l) => FieldValue.Long(l),
JsonValueKind.Number => FieldValue.Double(element.GetDouble()),
JsonValueKind.True => FieldValue.True,
JsonValueKind.False => FieldValue.False,
JsonValueKind.Null or JsonValueKind.Undefined => FieldValue.Null,
_ => FieldValue.String(element.GetRawText())
};
}
}

internal sealed class EsqlTransportResponse : IEsqlResponse
{
private readonly StreamResponse _response;

public EsqlTransportResponse(StreamResponse response) => _response = response;

public Stream Body => _response.Body;

public void Dispose() => _response.Dispose();
}

#if NET10_0_OR_GREATER
internal sealed class EsqlTransportAsyncResponse : IEsqlAsyncResponse
{
private readonly StreamResponse _response;

public EsqlTransportAsyncResponse(StreamResponse response)
{
_response = response;
Body = PipeReader.Create(response.Body);
}

public PipeReader Body { get; }

public async ValueTask DisposeAsync()
{
await Body.CompleteAsync().ConfigureAwait(false);
_response.Dispose();
}
}
#else
internal sealed class EsqlTransportAsyncResponse : IEsqlAsyncResponse
{
private readonly StreamResponse _response;

public EsqlTransportAsyncResponse(StreamResponse response) => _response = response;

public Stream Body => _response.Body;

public ValueTask DisposeAsync()
{
_response.Dispose();
return default;
}
}
#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;

using Elastic.Transport;

namespace Elastic.Clients.Elasticsearch.Esql;

/// <summary>Per-query options for LINQ-to-ES|QL queries executed via WithOptions.</summary>
public sealed record EsqlQueryOptions
{
/// <summary>Per-request transport configuration (timeouts, headers, auth).</summary>
public IRequestConfiguration? RequestConfiguration { get; init; }

/// <summary>If true, partial results will be returned on shard failures.</summary>
public bool? AllowPartialResults { get; init; }

/// <summary>If true, entirely null columns are removed from the response.</summary>
public bool? DropNullColumns { get; init; }

/// <summary>A Query DSL filter applied to the document set before the ES|QL query runs.</summary>
public QueryDsl.Query? Filter { get; init; }

/// <summary>Locale for result formatting (e.g., "en-US").</summary>
public string? Locale { get; init; }

/// <summary>Project routing for serverless cross-project queries.</summary>
public string? ProjectRouting { get; init; }

/// <summary>Default timezone for date operations (e.g., "UTC").</summary>
public string? TimeZone { get; init; }

/// <summary>
/// User-supplied named parameters. Merged with parameters from the translated query.
/// If a key exists in both, NamedParameters takes precedence.
/// </summary>
public Dictionary<string, FieldValue>? NamedParameters { get; init; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System;
using System.Linq.Expressions;

using Elastic.Esql.Core;

namespace Elastic.Clients.Elasticsearch.Esql;

/// <summary>Extension methods for attaching query options to LINQ-to-ES|QL queries.</summary>
public static class EsqlQueryableExtensions
{
/// <summary>Attaches ES|QL query options to the query pipeline.</summary>
public static IEsqlQueryable<T> WithOptions<T>(this IEsqlQueryable<T> source, EsqlQueryOptions options)
{
var method = new Func<IEsqlQueryable<T>, EsqlQueryOptions, IEsqlQueryable<T>>(WithOptions).Method;
return (IEsqlQueryable<T>)source.Provider.CreateQuery<T>(
Expression.Call(null, method, source.Expression, Expression.Constant(options)));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using Elastic.Esql.Core;
using Elastic.Esql.QueryModel;

namespace Elastic.Clients.Elasticsearch.Esql;

internal sealed class EsqlSourceInferenceInterceptor : IEsqlQueryInterceptor
{
private readonly Inferrer _inferrer;

public EsqlSourceInferenceInterceptor(Inferrer inferrer) => _inferrer = inferrer;

public EsqlQuery Intercept(EsqlQuery query)
{
if (query.Source is not null)
return query;

var indexName = _inferrer.IndexName(query.ElementType);
return query.WithSource(indexName);
}
}
Loading