Skip to content

Commit 0eb69ee

Browse files
authored
[MEDI] Pipeline (#6993)
1 parent 8df3159 commit 0eb69ee

File tree

13 files changed

+640
-8
lines changed

13 files changed

+640
-8
lines changed
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
namespace Microsoft.Extensions.DataIngestion;
5+
6+
internal static class DiagnosticsConstants
7+
{
8+
internal const string ActivitySourceName = "Experimental.Microsoft.Extensions.DataIngestion";
9+
internal const string ErrorTypeTagName = "error.type";
10+
11+
internal static class ProcessDirectory
12+
{
13+
internal const string ActivityName = "ProcessDirectory";
14+
internal const string DirectoryPathTagName = "rag.directory.path";
15+
internal const string SearchPatternTagName = "rag.directory.search.pattern";
16+
internal const string SearchOptionTagName = "rag.directory.search.option";
17+
}
18+
19+
internal static class ProcessFiles
20+
{
21+
internal const string ActivityName = "ProcessFiles";
22+
internal const string FileCountTagName = "rag.file.count";
23+
}
24+
25+
internal static class ProcessSource
26+
{
27+
internal const string DocumentIdTagName = "rag.document.id";
28+
}
29+
30+
internal static class ProcessFile
31+
{
32+
internal const string ActivityName = "ProcessFile";
33+
internal const string FilePathTagName = "rag.file.path";
34+
}
35+
}
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Diagnostics;
7+
using System.IO;
8+
using System.Runtime.CompilerServices;
9+
using System.Threading;
10+
using System.Threading.Tasks;
11+
using Microsoft.Extensions.Logging;
12+
using Microsoft.Shared.Diagnostics;
13+
using static Microsoft.Extensions.DataIngestion.DiagnosticsConstants;
14+
15+
namespace Microsoft.Extensions.DataIngestion;
16+
17+
#pragma warning disable IDE0058 // Expression value is never used
18+
#pragma warning disable IDE0063 // Use simple 'using' statement
19+
#pragma warning disable CA1031 // Do not catch general exception types
20+
21+
/// <summary>
22+
/// Represents a pipeline for ingesting data from documents and processing it into chunks.
23+
/// </summary>
24+
/// <typeparam name="T">The type of the chunk content.</typeparam>
25+
public sealed class IngestionPipeline<T> : IDisposable
26+
{
27+
private readonly IngestionDocumentReader _reader;
28+
private readonly IngestionChunker<T> _chunker;
29+
private readonly IngestionChunkWriter<T> _writer;
30+
private readonly ActivitySource _activitySource;
31+
private readonly ILogger? _logger;
32+
33+
/// <summary>
34+
/// Initializes a new instance of the <see cref="IngestionPipeline{T}"/> class.
35+
/// </summary>
36+
/// <param name="reader">The reader for ingestion documents.</param>
37+
/// <param name="chunker">The chunker to split documents into chunks.</param>
38+
/// <param name="writer">The writer for processing chunks.</param>
39+
/// <param name="options">The options for the ingestion pipeline.</param>
40+
/// <param name="loggerFactory">The logger factory for creating loggers.</param>
41+
public IngestionPipeline(
42+
IngestionDocumentReader reader,
43+
IngestionChunker<T> chunker,
44+
IngestionChunkWriter<T> writer,
45+
IngestionPipelineOptions? options = default,
46+
ILoggerFactory? loggerFactory = default)
47+
{
48+
_reader = Throw.IfNull(reader);
49+
_chunker = Throw.IfNull(chunker);
50+
_writer = Throw.IfNull(writer);
51+
_activitySource = new((options ?? new()).ActivitySourceName);
52+
_logger = loggerFactory?.CreateLogger<IngestionPipeline<T>>();
53+
}
54+
55+
/// <inheritdoc/>
56+
public void Dispose()
57+
{
58+
_writer.Dispose();
59+
_activitySource.Dispose();
60+
}
61+
62+
/// <summary>
63+
/// Gets the document processors in the pipeline.
64+
/// </summary>
65+
public IList<IngestionDocumentProcessor> DocumentProcessors { get; } = [];
66+
67+
/// <summary>
68+
/// Gets the chunk processors in the pipeline.
69+
/// </summary>
70+
public IList<IngestionChunkProcessor<T>> ChunkProcessors { get; } = [];
71+
72+
/// <summary>
73+
/// Processes all files in the specified directory that match the given search pattern and option.
74+
/// </summary>
75+
/// <param name="directory">The directory to process.</param>
76+
/// <param name="searchPattern">The search pattern for file selection.</param>
77+
/// <param name="searchOption">The search option for directory traversal.</param>
78+
/// <param name="cancellationToken">The cancellation token for the operation.</param>
79+
/// <returns>A task representing the asynchronous operation.</returns>
80+
public async IAsyncEnumerable<IngestionResult> ProcessAsync(DirectoryInfo directory, string searchPattern = "*.*",
81+
SearchOption searchOption = SearchOption.TopDirectoryOnly, [EnumeratorCancellation] CancellationToken cancellationToken = default)
82+
{
83+
Throw.IfNull(directory);
84+
Throw.IfNullOrEmpty(searchPattern);
85+
Throw.IfOutOfRange((int)searchOption, (int)SearchOption.TopDirectoryOnly, (int)SearchOption.AllDirectories);
86+
87+
using (Activity? rootActivity = _activitySource.StartActivity(ProcessDirectory.ActivityName))
88+
{
89+
rootActivity?.SetTag(ProcessDirectory.DirectoryPathTagName, directory.FullName)
90+
.SetTag(ProcessDirectory.SearchPatternTagName, searchPattern)
91+
.SetTag(ProcessDirectory.SearchOptionTagName, searchOption.ToString());
92+
_logger?.ProcessingDirectory(directory.FullName, searchPattern, searchOption);
93+
94+
await foreach (var ingestionResult in ProcessAsync(directory.EnumerateFiles(searchPattern, searchOption), rootActivity, cancellationToken).ConfigureAwait(false))
95+
{
96+
yield return ingestionResult;
97+
}
98+
}
99+
}
100+
101+
/// <summary>
102+
/// Processes the specified files.
103+
/// </summary>
104+
/// <param name="files">The collection of files to process.</param>
105+
/// <param name="cancellationToken">The cancellation token for the operation.</param>
106+
/// <returns>A task representing the asynchronous operation.</returns>
107+
public async IAsyncEnumerable<IngestionResult> ProcessAsync(IEnumerable<FileInfo> files, [EnumeratorCancellation] CancellationToken cancellationToken = default)
108+
{
109+
Throw.IfNull(files);
110+
111+
using (Activity? rootActivity = _activitySource.StartActivity(ProcessFiles.ActivityName))
112+
{
113+
await foreach (var ingestionResult in ProcessAsync(files, rootActivity, cancellationToken).ConfigureAwait(false))
114+
{
115+
yield return ingestionResult;
116+
}
117+
}
118+
}
119+
120+
private static string GetShortName(object any) => any.GetType().Name;
121+
122+
private static void TraceException(Activity? activity, Exception ex)
123+
{
124+
activity?.SetTag(ErrorTypeTagName, ex.GetType().FullName)
125+
.SetStatus(ActivityStatusCode.Error, ex.Message);
126+
}
127+
128+
private async IAsyncEnumerable<IngestionResult> ProcessAsync(IEnumerable<FileInfo> files, Activity? rootActivity,
129+
[EnumeratorCancellation] CancellationToken cancellationToken)
130+
{
131+
#if NET
132+
if (System.Linq.Enumerable.TryGetNonEnumeratedCount(files, out int count))
133+
#else
134+
if (files is IReadOnlyCollection<FileInfo> { Count: int count })
135+
#endif
136+
{
137+
rootActivity?.SetTag(ProcessFiles.FileCountTagName, count);
138+
_logger?.LogFileCount(count);
139+
}
140+
141+
foreach (FileInfo fileInfo in files)
142+
{
143+
using (Activity? processFileActivity = _activitySource.StartActivity(ProcessFile.ActivityName, ActivityKind.Internal, parentContext: rootActivity?.Context ?? default))
144+
{
145+
processFileActivity?.SetTag(ProcessFile.FilePathTagName, fileInfo.FullName);
146+
_logger?.ReadingFile(fileInfo.FullName, GetShortName(_reader));
147+
148+
IngestionDocument? document = null;
149+
Exception? failure = null;
150+
try
151+
{
152+
document = await _reader.ReadAsync(fileInfo, cancellationToken).ConfigureAwait(false);
153+
154+
processFileActivity?.SetTag(ProcessSource.DocumentIdTagName, document.Identifier);
155+
_logger?.ReadDocument(document.Identifier);
156+
157+
await IngestAsync(document, processFileActivity, cancellationToken).ConfigureAwait(false);
158+
}
159+
catch (Exception ex)
160+
{
161+
TraceException(processFileActivity, ex);
162+
_logger?.IngestingFailed(ex, document?.Identifier ?? fileInfo.FullName);
163+
164+
failure = ex;
165+
}
166+
167+
yield return new IngestionResult(fileInfo, document, failure);
168+
}
169+
}
170+
}
171+
172+
private async Task IngestAsync(IngestionDocument document, Activity? parentActivity, CancellationToken cancellationToken)
173+
{
174+
foreach (IngestionDocumentProcessor processor in DocumentProcessors)
175+
{
176+
document = await processor.ProcessAsync(document, cancellationToken).ConfigureAwait(false);
177+
178+
// A DocumentProcessor might change the document identifier (for example by extracting it from its content), so update the ID tag.
179+
parentActivity?.SetTag(ProcessSource.DocumentIdTagName, document.Identifier);
180+
}
181+
182+
IAsyncEnumerable<IngestionChunk<T>> chunks = _chunker.ProcessAsync(document, cancellationToken);
183+
foreach (var processor in ChunkProcessors)
184+
{
185+
chunks = processor.ProcessAsync(chunks, cancellationToken);
186+
}
187+
188+
_logger?.WritingChunks(GetShortName(_writer));
189+
await _writer.WriteAsync(chunks, cancellationToken).ConfigureAwait(false);
190+
_logger?.WroteChunks(document.Identifier);
191+
}
192+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using System.Diagnostics;
5+
using Microsoft.Shared.Diagnostics;
6+
7+
namespace Microsoft.Extensions.DataIngestion;
8+
9+
#pragma warning disable SA1500 // Braces for multi-line statements should not share line
10+
#pragma warning disable SA1513 // Closing brace should be followed by blank line
11+
12+
/// <summary>
13+
/// Options for configuring the ingestion pipeline.
14+
/// </summary>
15+
public sealed class IngestionPipelineOptions
16+
{
17+
/// <summary>
18+
/// Gets or sets the name of the <see cref="ActivitySource"/> used for diagnostics.
19+
/// </summary>
20+
public string ActivitySourceName
21+
{
22+
get;
23+
set => field = Throw.IfNullOrEmpty(value);
24+
} = DiagnosticsConstants.ActivitySourceName;
25+
26+
internal IngestionPipelineOptions Clone() => new()
27+
{
28+
ActivitySourceName = ActivitySourceName,
29+
};
30+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using System;
5+
using System.IO;
6+
using Microsoft.Shared.Diagnostics;
7+
8+
namespace Microsoft.Extensions.DataIngestion;
9+
10+
/// <summary>
11+
/// Represents the result of an ingestion operation.
12+
/// </summary>
13+
public sealed class IngestionResult
14+
{
15+
/// <summary>
16+
/// Gets the source file that was ingested.
17+
/// </summary>
18+
public FileInfo Source { get; }
19+
20+
/// <summary>
21+
/// Gets the ingestion document created from the source file, if reading the document has succeeded.
22+
/// </summary>
23+
public IngestionDocument? Document { get; }
24+
25+
/// <summary>
26+
/// Gets the exception that occurred during ingestion, if any.
27+
/// </summary>
28+
public Exception? Exception { get; }
29+
30+
/// <summary>
31+
/// Gets a value indicating whether the ingestion succeeded.
32+
/// </summary>
33+
public bool Succeeded => Exception is null;
34+
35+
internal IngestionResult(FileInfo source, IngestionDocument? document, Exception? exception)
36+
{
37+
Source = Throw.IfNull(source);
38+
Document = document;
39+
Exception = exception;
40+
}
41+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using System;
5+
using System.IO;
6+
using Microsoft.Extensions.Logging;
7+
8+
#pragma warning disable S109 // Magic numbers should not be used
9+
10+
namespace Microsoft.Extensions.DataIngestion
11+
{
12+
internal static partial class Log
13+
{
14+
[LoggerMessage(0, LogLevel.Information, "Starting to process files in directory '{directory}' with search pattern '{searchPattern}' and search option '{searchOption}'.")]
15+
internal static partial void ProcessingDirectory(this ILogger logger, string directory, string searchPattern, System.IO.SearchOption searchOption);
16+
17+
[LoggerMessage(1, LogLevel.Information, "Processing {fileCount} files.")]
18+
internal static partial void LogFileCount(this ILogger logger, int fileCount);
19+
20+
[LoggerMessage(2, LogLevel.Information, "Reading file '{filePath}' using '{reader}'.")]
21+
internal static partial void ReadingFile(this ILogger logger, string filePath, string reader);
22+
23+
[LoggerMessage(3, LogLevel.Information, "Read document '{documentId}'.")]
24+
internal static partial void ReadDocument(this ILogger logger, string documentId);
25+
26+
[LoggerMessage(4, LogLevel.Information, "Writing chunks using {writer}.")]
27+
internal static partial void WritingChunks(this ILogger logger, string writer);
28+
29+
[LoggerMessage(5, LogLevel.Information, "Wrote chunks for document '{documentId}'.")]
30+
internal static partial void WroteChunks(this ILogger logger, string documentId);
31+
32+
[LoggerMessage(6, LogLevel.Error, "An error occurred while ingesting document '{identifier}'.")]
33+
internal static partial void IngestingFailed(this ILogger logger, Exception exception, string identifier);
34+
}
35+
}

src/Libraries/Microsoft.Extensions.DataIngestion/Microsoft.Extensions.DataIngestion.csproj

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
<TargetFrameworks>$(TargetFrameworks);netstandard2.0</TargetFrameworks>
55
<RootNamespace>Microsoft.Extensions.DataIngestion</RootNamespace>
66

7-
<!-- Project reference can be removed -->
8-
<NoWarn>$(NoWarn);RT0002</NoWarn>
7+
<UseLoggingGenerator>true</UseLoggingGenerator>
8+
<DisableMicrosoftExtensionsLoggingSourceGenerator>false</DisableMicrosoftExtensionsLoggingSourceGenerator>
99

1010
<!-- we are not ready to publish yet -->
1111
<IsPackable>false</IsPackable>
@@ -18,11 +18,16 @@
1818
</ItemGroup>
1919

2020
<ItemGroup>
21-
<PackageReference Include="System.Collections.Immutable" Condition="'$(TargetFrameworkIdentifier)' != '.NETCoreApp'" />
21+
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
2222
<PackageReference Include="Microsoft.Extensions.VectorData.Abstractions" />
2323
<PackageReference Include="Microsoft.ML.Tokenizers" />
2424
</ItemGroup>
2525

26+
<ItemGroup Condition="'$(TargetFrameworkIdentifier)' != '.NETCoreApp'">
27+
<PackageReference Include="System.Diagnostics.DiagnosticSource" />
28+
<PackageReference Include="System.Collections.Immutable" />
29+
</ItemGroup>
30+
2631
<ItemGroup Condition="'$(TargetFrameworkIdentifier)' == '.NETFramework'">
2732
<!-- Workaround https://github.com/microsoft/semantic-kernel/issues/13316 -->
2833
<PackageReference Include="System.Text.Json" VersionOverride="$(SystemTextJsonVersion)" />

0 commit comments

Comments
 (0)