Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Extensions.DataIngestion;

/// <summary>
/// Reads source content and converts it to an <see cref="IngestionDocument"/>.
/// </summary>
/// <typeparam name="TSource">The type of the source to read from. Sample: <see cref="FileInfo"/>, <see cref="Stream"/>, etc.</typeparam>
public interface IIngestionDocumentReader<TSource>
{
/// <summary>
/// Reads a source and converts it to an <see cref="IngestionDocument"/>.
/// </summary>
/// <param name="source">The source to read.</param>
/// <param name="identifier">The unique identifier for the document.</param>
/// <param name="mediaType">The media type of the file.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>A task representing the asynchronous read operation.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="identifier"/> is <see langword="null"/> or empty.</exception>
Task<IngestionDocument> ReadAsync(TSource source, string identifier, string? mediaType = null, CancellationToken cancellationToken = default);
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Microsoft.Extensions.DataIngestion;
/// <summary>
/// Reads source content and converts it to an <see cref="IngestionDocument"/>.
/// </summary>
public abstract class IngestionDocumentReader
public abstract class IngestionDocumentReader : IIngestionDocumentReader<FileInfo>, IIngestionDocumentReader<Stream>
{
/// <summary>
/// Reads a file and converts it to an <see cref="IngestionDocument"/>.
Expand All @@ -24,7 +24,7 @@ public abstract class IngestionDocumentReader
public Task<IngestionDocument> ReadAsync(FileInfo source, CancellationToken cancellationToken = default)
{
string identifier = Throw.IfNull(source).FullName; // entire path is more unique than just part of it.
return ReadAsync(source, identifier, GetMediaType(source), cancellationToken);
return ReadAsync(source, identifier, source.GetMediaType(), cancellationToken);
}

/// <summary>
Expand All @@ -42,7 +42,7 @@ public virtual async Task<IngestionDocument> ReadAsync(FileInfo source, string i
_ = Throw.IfNullOrEmpty(identifier);

using FileStream stream = new(source.FullName, FileMode.Open, FileAccess.Read, FileShare.Read, bufferSize: 1, FileOptions.Asynchronous);
return await ReadAsync(stream, identifier, string.IsNullOrEmpty(mediaType) ? GetMediaType(source) : mediaType!, cancellationToken).ConfigureAwait(false);
return await ReadAsync(stream, identifier, string.IsNullOrEmpty(mediaType) ? source.GetMediaType() : mediaType!, cancellationToken).ConfigureAwait(false);
}

/// <summary>
Expand All @@ -53,90 +53,5 @@ public virtual async Task<IngestionDocument> ReadAsync(FileInfo source, string i
/// <param name="mediaType">The media type of the content.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>A task representing the asynchronous read operation.</returns>
public abstract Task<IngestionDocument> ReadAsync(Stream source, string identifier, string mediaType, CancellationToken cancellationToken = default);

private static string GetMediaType(FileInfo source)
=> source.Extension switch
{
".123" => "application/vnd.lotus-1-2-3",
".602" => "application/x-t602",
".abw" => "application/x-abiword",
".bmp" => "image/bmp",
".cgm" => "image/cgm",
".csv" => "text/csv",
".cwk" => "application/x-cwk",
".dbf" => "application/vnd.dbf",
".dif" => "application/x-dif",
".doc" => "application/msword",
".docm" => "application/vnd.ms-word.document.macroEnabled.12",
".docx" => "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
".dot" => "application/msword",
".dotm" => "application/vnd.ms-word.template.macroEnabled.12",
".dotx" => "application/vnd.openxmlformats-officedocument.wordprocessingml.template",
".epub" => "application/epub+zip",
".et" => "application/vnd.ms-excel",
".eth" => "application/ethos",
".fods" => "application/vnd.oasis.opendocument.spreadsheet",
".gif" => "image/gif",
".htm" => "text/html",
".html" => "text/html",
".hwp" => "application/x-hwp",
".jpeg" => "image/jpeg",
".jpg" => "image/jpeg",
".key" => "application/x-iwork-keynote-sffkey",
".lwp" => "application/vnd.lotus-wordpro",
".mcw" => "application/macwriteii",
".mw" => "application/macwriteii",
".numbers" => "application/x-iwork-numbers-sffnumbers",
".ods" => "application/vnd.oasis.opendocument.spreadsheet",
".pages" => "application/x-iwork-pages-sffpages",
".pbd" => "application/x-pagemaker",
".pdf" => "application/pdf",
".png" => "image/png",
".pot" => "application/vnd.ms-powerpoint",
".potm" => "application/vnd.ms-powerpoint.template.macroEnabled.12",
".potx" => "application/vnd.openxmlformats-officedocument.presentationml.template",
".ppt" => "application/vnd.ms-powerpoint",
".pptm" => "application/vnd.ms-powerpoint.presentation.macroEnabled.12",
".pptx" => "application/vnd.openxmlformats-officedocument.presentationml.presentation",
".prn" => "application/x-prn",
".qpw" => "application/x-quattro-pro",
".rtf" => "application/rtf",
".sda" => "application/vnd.stardivision.draw",
".sdd" => "application/vnd.stardivision.impress",
".sdp" => "application/sdp",
".sdw" => "application/vnd.stardivision.writer",
".sgl" => "application/vnd.stardivision.writer",
".slk" => "text/vnd.sylk",
".sti" => "application/vnd.sun.xml.impress.template",
".stw" => "application/vnd.sun.xml.writer.template",
".svg" => "image/svg+xml",
".sxg" => "application/vnd.sun.xml.writer.global",
".sxi" => "application/vnd.sun.xml.impress",
".sxw" => "application/vnd.sun.xml.writer",
".sylk" => "text/vnd.sylk",
".tiff" => "image/tiff",
".tsv" => "text/tab-separated-values",
".txt" => "text/plain",
".uof" => "application/vnd.uoml+xml",
".uop" => "application/vnd.openofficeorg.presentation",
".uos1" => "application/vnd.uoml+xml",
".uos2" => "application/vnd.uoml+xml",
".uot" => "application/x-uo",
".vor" => "application/vnd.stardivision.writer",
".webp" => "image/webp",
".wpd" => "application/wordperfect",
".wps" => "application/vnd.ms-works",
".wq1" => "application/x-lotus",
".wq2" => "application/x-lotus",
".xls" => "application/vnd.ms-excel",
".xlsb" => "application/vnd.ms-excel.sheet.binary.macroEnabled.12",
".xlsm" => "application/vnd.ms-excel.sheet.macroEnabled.12",
".xlr" => "application/vnd.ms-works",
".xlsx" => "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
".xlw" => "application/vnd.ms-excel",
".xml" => "application/xml",
".zabw" => "application/x-abiword",
_ => "application/octet-stream"
};
public abstract Task<IngestionDocument> ReadAsync(Stream source, string identifier, string? mediaType = null, CancellationToken cancellationToken = default);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
<NoWarn>$(NoWarn);S1694</NoWarn>
</PropertyGroup>

<ItemGroup>
<Compile Include="..\Microsoft.Extensions.DataIngestion\Utils\MediaTypeProvider.cs" Link="MediaTypeProvider.cs" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="System.Memory" Condition="'$(TargetFrameworkIdentifier)' != '.NETCoreApp'" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Condition="'$(TargetFrameworkIdentifier)' != '.NETCoreApp'" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace Microsoft.Extensions.DataIngestion;
/// <summary>
/// Reads documents by converting them to Markdown using the <see href="https://github.com/microsoft/markitdown">MarkItDown</see> MCP server.
/// </summary>
public class MarkItDownMcpReader : IngestionDocumentReader
public class MarkItDownMcpReader : IngestionDocumentReader, IIngestionDocumentReader<DataContent>
{
private readonly Uri _mcpServerUri;
private readonly McpClientOptions? _options;
Expand Down Expand Up @@ -65,7 +65,7 @@ public override async Task<IngestionDocument> ReadAsync(FileInfo source, string
}

/// <inheritdoc/>
public override async Task<IngestionDocument> ReadAsync(Stream source, string identifier, string mediaType, CancellationToken cancellationToken = default)
public override async Task<IngestionDocument> ReadAsync(Stream source, string identifier, string? mediaType = null, CancellationToken cancellationToken = default)
{
_ = Throw.IfNull(source);
_ = Throw.IfNullOrEmpty(identifier);
Expand All @@ -79,10 +79,18 @@ public override async Task<IngestionDocument> ReadAsync(Stream source, string id
#endif
DataContent dataContent = new(
ms.GetBuffer().AsMemory(0, (int)ms.Length),
string.IsNullOrEmpty(mediaType) ? "application/octet-stream" : mediaType);
string.IsNullOrEmpty(mediaType) ? "application/octet-stream" : mediaType!);

string markdown = await ConvertToMarkdownAsync(dataContent, cancellationToken).ConfigureAwait(false);
return await ReadAsync(dataContent, identifier, mediaType, cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc/>
public async Task<IngestionDocument> ReadAsync(DataContent source, string identifier, string? mediaType = null, CancellationToken cancellationToken = default)
{
_ = Throw.IfNull(source);
_ = Throw.IfNullOrEmpty(identifier);

string markdown = await ConvertToMarkdownAsync(source, cancellationToken).ConfigureAwait(false);
return MarkdownParser.Parse(markdown, identifier);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public override async Task<IngestionDocument> ReadAsync(FileInfo source, string

/// <inheritdoc/>
/// <remarks>The contents of <paramref name="source"/> are copied to a temporary file.</remarks>
public override async Task<IngestionDocument> ReadAsync(Stream source, string identifier, string mediaType, CancellationToken cancellationToken = default)
public override async Task<IngestionDocument> ReadAsync(Stream source, string identifier, string? mediaType = null, CancellationToken cancellationToken = default)
{
_ = Throw.IfNull(source);
_ = Throw.IfNullOrEmpty(identifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public override async Task<IngestionDocument> ReadAsync(FileInfo source, string
}

/// <inheritdoc/>
public override async Task<IngestionDocument> ReadAsync(Stream source, string identifier, string mediaType, CancellationToken cancellationToken = default)
public override async Task<IngestionDocument> ReadAsync(Stream source, string identifier, string? mediaType = null, CancellationToken cancellationToken = default)
{
_ = Throw.IfNull(source);
_ = Throw.IfNullOrEmpty(identifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ internal static class ProcessFiles

internal static class ProcessSource
{
internal const string ActivityName = "ProcessSource";
internal const string DocumentIdTagName = "rag.document.id";
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Shared.Diagnostics;
using static Microsoft.Extensions.DataIngestion.DiagnosticsConstants;

namespace Microsoft.Extensions.DataIngestion;

#pragma warning disable IDE0058 // Expression value is never used
#pragma warning disable IDE0063 // Use simple 'using' statement
#pragma warning disable CA1031 // Do not catch general exception types

/// <summary>
/// Provides a set of File System extension methods for the <see cref="IngestionPipeline{TChunk, TSource}"/> class.
/// </summary>
public static class FileSystemIngestionExtensions
{
/// <summary>
/// Processes all files in the specified directory that match the given search pattern and option.
/// </summary>
/// <typeparam name="TChunk">The type of the chunk content.</typeparam>
/// <param name="pipeline">The ingestion pipeline.</param>
/// <param name="directory">The directory to process.</param>
/// <param name="searchPattern">The search pattern for file selection.</param>
/// <param name="searchOption">The search option for directory traversal.</param>
/// <param name="cancellationToken">The cancellation token for the operation.</param>
/// <returns>A task representing the asynchronous operation.</returns>
public static async IAsyncEnumerable<IngestionResult> ProcessAsync<TChunk>(
this IngestionPipeline<FileInfo, TChunk> pipeline,
DirectoryInfo directory, string searchPattern = "*.*",
SearchOption searchOption = SearchOption.TopDirectoryOnly,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
Throw.IfNull(pipeline);
Throw.IfNull(directory);
Throw.IfNullOrEmpty(searchPattern);
Throw.IfOutOfRange((int)searchOption, (int)SearchOption.TopDirectoryOnly, (int)SearchOption.AllDirectories);

using (Activity? rootActivity = pipeline.ActivitySource.StartActivity(ProcessDirectory.ActivityName))
{
rootActivity?.SetTag(ProcessDirectory.DirectoryPathTagName, directory.FullName)
.SetTag(ProcessDirectory.SearchPatternTagName, searchPattern)
.SetTag(ProcessDirectory.SearchOptionTagName, searchOption.ToString());
pipeline.Logger?.ProcessingDirectory(directory.FullName, searchPattern, searchOption);

var files = directory.GetFiles(searchPattern, searchOption);
await foreach (var ingestionResult in pipeline.ProcessAsync(files, rootActivity, cancellationToken).ConfigureAwait(false))
{
yield return ingestionResult;
}
}
}

/// <summary>
/// Processes the specified files.
/// </summary>
/// <typeparam name="TChunk">The type of the chunk content.</typeparam>
/// <param name="pipeline">The ingestion pipeline.</param>
/// <param name="files">The collection of files to process.</param>
/// <param name="cancellationToken">The cancellation token for the operation.</param>
/// <returns>A task representing the asynchronous operation.</returns>
public static async IAsyncEnumerable<IngestionResult> ProcessAsync<TChunk>(
this IngestionPipeline<FileInfo, TChunk> pipeline,
IEnumerable<FileInfo> files,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
Throw.IfNull(pipeline);
Throw.IfNull(files);

using (Activity? rootActivity = pipeline.ActivitySource.StartActivity(ProcessFiles.ActivityName))
{
await foreach (var ingestionResult in pipeline.ProcessAsync(files, rootActivity, cancellationToken).ConfigureAwait(false))
{
yield return ingestionResult;
}
}
}

private static async IAsyncEnumerable<IngestionResult> ProcessAsync<TChunk>(
this IngestionPipeline<FileInfo, TChunk> pipeline,
IEnumerable<FileInfo> files, Activity? rootActivity,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
#if NET
if (System.Linq.Enumerable.TryGetNonEnumeratedCount(files, out int count))
#else
if (files is IReadOnlyCollection<FileInfo> { Count: int count })
#endif
{
rootActivity?.SetTag(ProcessFiles.FileCountTagName, count);
pipeline.Logger?.LogFileCount(count);
}

foreach (FileInfo fileInfo in files)
{
using (Activity? processFileActivity = pipeline.ActivitySource.StartActivity(ProcessFile.ActivityName, ActivityKind.Internal, parentContext: rootActivity?.Context ?? default))
{
processFileActivity?.SetTag(ProcessFile.FilePathTagName, fileInfo.FullName);

Exception? failure = null;
IngestionDocument? document = null;

try
{
document = await pipeline.ProcessAsync(fileInfo, fileInfo.FullName, fileInfo.GetMediaType(), cancellationToken).ConfigureAwait(false);
}
catch (Exception e)
{
failure = e;
}

string documentId = document?.Identifier ?? fileInfo.FullName;
yield return new IngestionResult(documentId, document, failure);
}
}
}
}
Loading
Loading