55using Microsoft . KernelMemory ;
66using Microsoft . KernelMemory . Context ;
77using Microsoft . KernelMemory . DataFormats ;
8+ using Microsoft . KernelMemory . Diagnostics ;
89using Microsoft . KernelMemory . DocumentStorage . DevTools ;
910using Microsoft . KernelMemory . FileSystem . DevTools ;
1011using Microsoft . KernelMemory . Handlers ;
1112using Microsoft . KernelMemory . MemoryStorage . DevTools ;
13+ using Microsoft . KernelMemory . Pipeline ;
14+ using System . Security . Cryptography ;
15+ using System . Text ;
16+ using System . Linq ;
17+ using HandlebarsDotNet . Extensions ;
1218
1319namespace SemanticMemory . Samples ;
1420
@@ -48,6 +54,9 @@ public async Task RunSample(string fileToParse)
4854 TextPartitioningHandler textPartitioning = new ( "partition" , orchestrator ) ;
4955 await orchestrator . AddHandlerAsync ( textPartitioning ) ;
5056
57+ CustomSamplePartitioningHandler customMarkdownPartition = new ( "markdownpartition" , orchestrator ) ;
58+ await orchestrator . AddHandlerAsync ( customMarkdownPartition ) ;
59+
5160 GenerateEmbeddingsHandler textEmbedding = new ( "gen_embeddings" , orchestrator ) ;
5261 await orchestrator . AddHandlerAsync ( textEmbedding ) ;
5362
@@ -66,11 +75,12 @@ public async Task RunSample(string fileToParse)
6675 new TagCollection { { "example" , "books" } } )
6776 . AddUploadFile ( fileName , fileName , fileToParse )
6877 . Then ( "extract" )
69- . Then ( "partition" )
78+ //.Then("partition")
79+ . Then ( "markdownpartition" )
7080 . Then ( "gen_embeddings" )
7181 . Then ( "save_records" ) ;
7282
73- contextProvider . AddLLamaCloudParserOptions ( fileName , "This is a manual for Dreame vacuum cleaner, I need you to extract a series of sections that can be useful for an helpdesk to answer user questions. You will create sections where each sections contains a question and an answer taken from the text." ) ;
83+ contextProvider . AddLLamaCloudParserOptions ( fileName , "This is a manual for Dreame vacuum cleaner, I need you to extract a series of sections that can be useful for an helpdesk to answer user questions. You will create sections where each sections contains a question and an answer taken from the text. Each question will be separated with --- " ) ;
7484
7585 var pipeline = pipelineBuilder . Build ( ) ;
7686 await orchestrator . RunPipelineAsync ( pipeline ) ;
@@ -162,3 +172,179 @@ private static IKernelMemoryBuilder CreateBasicKernelMemoryBuilder(
162172 return kernelMemoryBuilder ;
163173 }
164174}
175+
public sealed class CustomSamplePartitioningHandler : IPipelineStepHandler
{
    private readonly IPipelineOrchestrator _orchestrator;

    // Logger category fixed: the original used ILogger<TextPartitioningHandler>
    // (copy-paste leftover), which attributed this handler's log entries to the
    // wrong component.
    private readonly ILogger<CustomSamplePartitioningHandler> _log;

    /// <inheritdoc />
    public string StepName { get; }

    /// <summary>
    /// Handler responsible for partitioning extracted markdown text into small chunks,
    /// splitting on lines that start with "---" (the separator the custom parser prompt
    /// asks the LLM to emit between sections).
    /// Note: stepName and other params are injected with DI.
    /// </summary>
    /// <param name="stepName">Pipeline step for which the handler will be invoked</param>
    /// <param name="orchestrator">Current orchestrator used by the pipeline, giving access to content and other helpers.</param>
    /// <param name="loggerFactory">Application logger factory</param>
    public CustomSamplePartitioningHandler(
        string stepName,
        IPipelineOrchestrator orchestrator,
        ILoggerFactory? loggerFactory = null)
    {
        this.StepName = stepName;
        this._orchestrator = orchestrator;

        this._log = (loggerFactory ?? DefaultLogger.Factory).CreateLogger<CustomSamplePartitioningHandler>();
        this._log.LogInformation("Handler '{0}' ready", stepName);
    }

    /// <inheritdoc />
    public async Task<(ReturnType returnType, DataPipeline updatedPipeline)> InvokeAsync(
        DataPipeline pipeline, CancellationToken cancellationToken = default)
    {
        this._log.LogDebug("Markdown question Partitioning text, pipeline '{0}/{1}'", pipeline.Index, pipeline.DocumentId);

        if (pipeline.Files.Count == 0)
        {
            this._log.LogWarning("Pipeline '{0}/{1}': there are no files to process, moving to next pipeline step.", pipeline.Index, pipeline.DocumentId);
            return (ReturnType.Success, pipeline);
        }

        foreach (DataPipeline.FileDetails uploadedFile in pipeline.Files)
        {
            // Track new files being generated (cannot edit originalFile.GeneratedFiles while looping it)
            Dictionary<string, DataPipeline.GeneratedFileDetails> newFiles = [];

            foreach (KeyValuePair<string, DataPipeline.GeneratedFileDetails> generatedFile in uploadedFile.GeneratedFiles)
            {
                var file = generatedFile.Value;
                if (file.AlreadyProcessedBy(this))
                {
                    this._log.LogTrace("File {0} already processed by this handler", file.Name);
                    continue;
                }

                // Partition only the original extracted text
                if (file.ArtifactType != DataPipeline.ArtifactTypes.ExtractedText)
                {
                    this._log.LogTrace("Skipping file {0} (not original text)", file.Name);
                    continue;
                }

                BinaryData partitionContent = await this._orchestrator.ReadFileAsync(pipeline, file.Name, cancellationToken).ConfigureAwait(false);

                // Skip empty files. Also: partitionContent.ToString() throws an exception if there are no bytes.
                if (partitionContent.IsEmpty) { continue; }

                switch (file.MimeType)
                {
                    case MimeTypes.MarkDown:
                    {
                        this._log.LogDebug("Partitioning MarkDown file {0}", file.Name);
                        string content = partitionContent.ToString();
                        int partition = 1;

                        var sb = new StringBuilder(1024);
                        using (var reader = new StringReader(content))
                        {
                            string? line;
                            while ((line = reader.ReadLine()) != null)
                            {
                                if (string.IsNullOrWhiteSpace(line))
                                {
                                    continue;
                                }

                                // A line starting with "---" closes the current section; flush it.
                                // Ordinal comparison: "---" is a structural marker, not linguistic text.
                                if (line.StartsWith("---", StringComparison.Ordinal))
                                {
                                    partition = await this.AddSegmentAsync(pipeline, uploadedFile, newFiles, MimeTypes.MarkDown, partition, sb, cancellationToken).ConfigureAwait(false);
                                    sb.Clear();
                                    continue;
                                }

                                sb.AppendLine(line);
                            }
                        }

                        // Write remaining content if any (text after the last "---" separator)
                        if (sb.Length > 0)
                        {
                            await this.AddSegmentAsync(pipeline, uploadedFile, newFiles, MimeTypes.MarkDown, partition, sb, cancellationToken).ConfigureAwait(false);
                        }

                        // NOTE(review): the source ExtractedText file is never marked with
                        // file.MarkProcessedBy(this), so a pipeline re-run would re-partition it.
                        // Confirm whether that is intentional before changing it.
                        break;
                    }

                    default:
                        this._log.LogWarning("File {0} cannot be partitioned, type '{1}' not supported", file.Name, file.MimeType);
                        // Don't partition other files
                        continue;
                }
            }

            // Add new files to pipeline status
            foreach (var file in newFiles)
            {
                uploadedFile.GeneratedFiles.Add(file.Key, file.Value);
            }
        }

        return (ReturnType.Success, pipeline);
    }

    /// <summary>
    /// Persists one accumulated text segment as a new TextPartition generated file.
    /// Empty or whitespace-only segments are discarded without consuming a partition number.
    /// </summary>
    /// <param name="pipeline">Pipeline the partition belongs to</param>
    /// <param name="uploadedFile">Source file the partition was extracted from</param>
    /// <param name="newFiles">Accumulator for files created during the current enumeration</param>
    /// <param name="partitionsMimeType">MIME type recorded on the generated partition</param>
    /// <param name="partition">Next partition number to assign</param>
    /// <param name="sb">Buffer holding the segment text</param>
    /// <param name="cancellationToken">Cancellation token, flowed to storage writes</param>
    /// <returns>The next partition number (incremented only when a segment was written)</returns>
    private async Task<int> AddSegmentAsync(
        DataPipeline pipeline,
        DataPipeline.FileDetails uploadedFile,
        Dictionary<string, DataPipeline.GeneratedFileDetails> newFiles,
        string partitionsMimeType,
        int partition,
        StringBuilder sb,
        CancellationToken cancellationToken)
    {
        if (sb.Length == 0)
        {
            // Do not increment partition: an empty segment is not a segment.
            return partition;
        }

        var text = sb.ToString().Trim('\n', '\r', ' ');

        // Is it empty after trimming?
        if (string.IsNullOrWhiteSpace(text))
        {
            // Do not increment partition: an empty segment is not a segment.
            return partition;
        }

        var destFile = uploadedFile.GetPartitionFileName(partition);

        // Fix: the original validated the trimmed text but then wrote the untrimmed
        // buffer and reported Size from the untrimmed buffer. Use the trimmed text
        // consistently so content, Size and ContentSHA256 all describe the same bytes.
        var textData = new BinaryData(text);
        await this._orchestrator.WriteFileAsync(pipeline, destFile, textData, cancellationToken).ConfigureAwait(false);

        var destFileDetails = new DataPipeline.GeneratedFileDetails
        {
            Id = Guid.NewGuid().ToString("N"),
            ParentId = uploadedFile.Id,
            Name = destFile,
            Size = text.Length,
            MimeType = partitionsMimeType,
            ArtifactType = DataPipeline.ArtifactTypes.TextPartition,
            PartitionNumber = partition,
            SectionNumber = 1,
            Tags = pipeline.Tags,
            ContentSHA256 = textData.CalculateSHA256(),
        };
        newFiles.Add(destFile, destFileDetails);
        destFileDetails.MarkProcessedBy(this);

        return partition + 1;
    }
}
342+
internal static class BinaryDataExtensions
{
    /// <summary>
    /// Computes the SHA-256 digest of the payload and returns it as a
    /// lowercase hexadecimal string (64 characters).
    /// </summary>
    /// <param name="binaryData">Payload to hash</param>
    /// <returns>Lowercase hex representation of the SHA-256 hash</returns>
    public static string CalculateSHA256(this BinaryData binaryData)
    {
        const string HexDigits = "0123456789abcdef";

        byte[] hash = SHA256.HashData(binaryData.ToMemory().Span);

        // Emit two lowercase hex digits per byte, high nibble first.
        var chars = new char[hash.Length * 2];
        for (int i = 0; i < hash.Length; i++)
        {
            chars[2 * i] = HexDigits[hash[i] >> 4];
            chars[2 * i + 1] = HexDigits[hash[i] & 0xF];
        }

        return new string(chars);
    }
}
0 commit comments