|
| 1 | +// Copyright (c) Microsoft. All rights reserved. |
| 2 | + |
| 3 | +using System; |
| 4 | +using System.Collections.Generic; |
| 5 | +using System.Text.Json; |
| 6 | +using System.Threading; |
| 7 | +using System.Threading.Tasks; |
| 8 | +using Microsoft.Extensions.Logging; |
| 9 | +using Microsoft.KernelMemory.Diagnostics; |
| 10 | +using Microsoft.KernelMemory.Pipeline; |
| 11 | +using Microsoft.SemanticKernel; |
| 12 | +using Microsoft.SemanticKernel.ChatCompletion; |
| 13 | +using static Microsoft.KernelMemory.Pipeline.DataPipeline; |
| 14 | + |
| 15 | +namespace Microsoft.KernelMemory.Handlers; |
| 16 | + |
/// <summary>
/// Pipeline step handler that asks an Azure OpenAI chat model to extract
/// categorized tags from every extracted-text artifact of a pipeline.
/// For each such artifact the raw model answer is saved as
/// "{uploaded file name}.tags.json" and the parsed tags are added to
/// <c>pipeline.Tags</c>.
/// </summary>
public sealed class KeywordExtractingHandler : IPipelineStepHandler
{
    /// <summary>Key of the service section in <c>KernelMemoryConfig.Services</c> holding the Azure OpenAI settings.</summary>
    private const string AzureOpenAIServiceName = "AzureOpenAIText";

    /// <summary>System prompt describing the expected answer: a JSON list of { category: [tags] } objects.</summary>
    private const string SystemMessage = """
        You are an assistant to analyze Content and Extract Tags by Content.
        [EXTRACT TAGS RULES]
        IT SHOULD BE A LIST OF DICTIONARIES WITH CATEGORY AND TAGS
        TAGS SHOULD BE CATEGORY SPECIFIC
        TAGS SHOULD BE A LIST OF STRINGS
        TAGS COUNT CAN BE UP TO 10 UNDER A CATEGORY
        CATEGORY COUNT CAN BE UP TO 10
        DON'T ADD ANY MARKDOWN EXPRESSION IN YOUR RESPONSE
        [END RULES]

        [EXAMPLE]
        [
            {
                "category1": ["tag1", "tag2", "tag3"]
            },
            {
                "category2": ["tag1", "tag2", "tag3"]
            }
        ]
        [END EXAMPLE]
        """;

    private readonly ILogger<KeywordExtractingHandler> _log;
    private readonly IPipelineOrchestrator _orchestrator;
    private readonly Kernel _kernel;
    private readonly KernelMemoryConfig _config;

    /// <summary>Name of the pipeline step this handler is registered for.</summary>
    public string StepName { get; }

    /// <summary>
    /// Creates the handler and builds the Semantic Kernel instance used for chat completion.
    /// </summary>
    /// <param name="stepName">Pipeline step name handled by this instance.</param>
    /// <param name="orchestrator">Orchestrator used to read and write pipeline files.</param>
    /// <param name="config">
    /// Kernel Memory configuration. Must contain a "AzureOpenAIText" service section
    /// with "Deployment", "Endpoint" and "APIKey" values.
    /// </param>
    /// <param name="loggerFactory">Optional logger factory; <c>DefaultLogger.Factory</c> is used when null.</param>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="config"/> is null.</exception>
    public KeywordExtractingHandler(
        string stepName,
        IPipelineOrchestrator orchestrator,
        KernelMemoryConfig? config = null,
        ILoggerFactory? loggerFactory = null)
    {
        this.StepName = stepName;
        this._log = (loggerFactory ?? DefaultLogger.Factory).CreateLogger<KeywordExtractingHandler>();
        this._orchestrator = orchestrator;

        // The parameter keeps its null default for signature compatibility, but the
        // Azure OpenAI settings are mandatory: fail with a clear error instead of a
        // NullReferenceException while reading them.
        this._config = config ?? throw new ArgumentNullException(
            nameof(config),
            "A configuration with the '" + AzureOpenAIServiceName + "' service settings is required");

        // Init Semantic Kernel with the Azure OpenAI chat completion service.
        var azureOpenAI = this._config.Services[AzureOpenAIServiceName];
        this._kernel = Kernel.CreateBuilder()
            .AddAzureOpenAIChatCompletion(
                deploymentName: (string)azureOpenAI["Deployment"],
                endpoint: (string)azureOpenAI["Endpoint"],
                apiKey: (string)azureOpenAI["APIKey"])
            .Build();
    }

    /// <summary>
    /// Extracts tags from every extracted-text artifact that this handler has not processed yet.
    /// </summary>
    /// <param name="pipeline">Pipeline whose files are scanned and whose tags are updated.</param>
    /// <param name="cancellationToken">Token used to cancel file and chat-completion operations.</param>
    /// <returns>Always <c>true</c> together with the same pipeline instance; per-file extraction errors are logged.</returns>
    public async Task<(bool success, DataPipeline updatedPipeline)> InvokeAsync(DataPipeline pipeline, CancellationToken cancellationToken = default)
    {
        this._log.LogDebug("Extracting keywords from the content, pipeline '{Index}/{DocumentId}'", pipeline.Index, pipeline.DocumentId);

        foreach (FileDetails uploadedFile in pipeline.Files)
        {
            foreach (KeyValuePair<string, DataPipeline.GeneratedFileDetails> generatedFile in uploadedFile.GeneratedFiles)
            {
                var file = generatedFile.Value;

                if (file.AlreadyProcessedBy(this))
                {
                    this._log.LogDebug("File {FileName} has already been processed by {HandlerName}", file.Name, this.StepName);
                    continue;
                }

                // Only extracted text artifacts are analyzed.
                if (file.ArtifactType != DataPipeline.ArtifactTypes.ExtractedText)
                {
                    continue;
                }

                this._log.LogDebug("Extracting Tags from the file {FileName}", file.Name);

                // NOTE(review): the destination name is derived from the uploaded file, so several
                // extracted-text artifacts of one upload would write to the same tags file, and the
                // tags file is not added to uploadedFile.GeneratedFiles - confirm this is intended.
                await this.ExtractTagsAsync(pipeline, file.Name, $"{uploadedFile.Name}.tags.json", cancellationToken).ConfigureAwait(false);
            }

            // NOTE(review): the uploaded file is marked here while the check above is done on the
            // generated file - verify which one should carry the "processed" marker.
            uploadedFile.MarkProcessedBy(this);
        }

        return (true, pipeline);
    }

    /// <summary>
    /// Reads one extracted-text file, asks the chat model for tags, stores the raw answer
    /// in <paramref name="destFile"/> and adds the parsed tags to the pipeline.
    /// On failure the error is logged and "[]" is written to <paramref name="destFile"/>.
    /// </summary>
    private async Task ExtractTagsAsync(DataPipeline pipeline, string sourceFileName, string destFile, CancellationToken cancellationToken)
    {
        BinaryData fileContent = await this._orchestrator.ReadFileAsync(pipeline, sourceFileName, cancellationToken).ConfigureAwait(false);
        string extractedText = fileContent.ToString();

        var chat = this._kernel.GetRequiredService<IChatCompletionService>();

        var chatHistory = new ChatHistory();
        chatHistory.AddSystemMessage(SystemMessage);
        chatHistory.AddUserMessage($"Extract tags from this content : {extractedText} \n The format should be JSON, without any Markdown expression.");

        var executionSettings = new PromptExecutionSettings
        {
            ExtensionData = new Dictionary<string, object>
            {
                { "Temperature", 0 }
            }
        };

        try
        {
            ChatMessageContent response = await chat.GetChatMessageContentAsync(
                chatHistory: chatHistory,
                executionSettings: executionSettings,
                cancellationToken: cancellationToken).ConfigureAwait(false);

            string responseText = response.ToString();

            // Persist the raw model answer before parsing it.
            await this._orchestrator.WriteFileAsync(pipeline, destFile, new BinaryData(responseText), cancellationToken).ConfigureAwait(false);

            // Expected shape: [ { "category": ["tag", ...] }, ... ]
            List<Dictionary<string, List<string>>>? tags = JsonSerializer.Deserialize<List<Dictionary<string, List<string>>>>(responseText);
            if (tags is null)
            {
                // The model answered with a JSON null: nothing to add.
                this._log.LogWarning("No tags extracted from the file {FileName}", sourceFileName);
                return;
            }

            foreach (var category in tags)
            {
                if (category is null)
                {
                    continue;
                }

                foreach (var kvp in category)
                {
                    pipeline.Tags.Add(kvp.Key, kvp.Value);
                }
            }
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            // Best effort: a failure on one file must not stop the pipeline. Cancellation
            // is excluded so that it propagates instead of being logged as an error.
            this._log.LogError(ex, "Error while extracting tags from the file {FileName}", sourceFileName);
            await this._orchestrator.WriteFileAsync(pipeline, destFile, new BinaryData("[]"), cancellationToken).ConfigureAwait(false);
        }
    }
}
0 commit comments