diff --git a/AGENTS.md b/AGENTS.md
index b1c5fad81..fd380a65f 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -8,6 +8,7 @@ Ignore the "archived" directory.
- `build.sh` is passing without errors or warnings
- Test coverage is greater than 80% and `coverage.sh` is not failing
- Problems are not hidden, problems are addressed.
+- Report breaking changes and ask how to handle them; do not assume migrations are required.
# C# Code Style
diff --git a/clean.sh b/clean.sh
index 6661aba3b..ad7170f24 100755
--- a/clean.sh
+++ b/clean.sh
@@ -12,3 +12,5 @@ rm -rf src/Core/obj
rm -rf src/Main/bin
rm -rf src/Main/obj
+
+rm -rf tests/Core.Tests/TestResults
diff --git a/src/Core/Core.csproj b/src/Core/Core.csproj
index 3a5c20e56..2b8e87b72 100644
--- a/src/Core/Core.csproj
+++ b/src/Core/Core.csproj
@@ -6,4 +6,9 @@
 <TargetFramework>net10.0</TargetFramework>
+
+
+
+
+
diff --git a/src/Core/Storage/ContentStorageDbContext.cs b/src/Core/Storage/ContentStorageDbContext.cs
new file mode 100644
index 000000000..f3e031a9e
--- /dev/null
+++ b/src/Core/Storage/ContentStorageDbContext.cs
@@ -0,0 +1,178 @@
+using System.Globalization;
+using KernelMemory.Core.Storage.Entities;
+using Microsoft.EntityFrameworkCore;
+
+namespace KernelMemory.Core.Storage;
+
+/// <summary>
+/// Database context for content storage.
+/// Manages Content and Operations tables with proper SQLite configuration.
+/// </summary>
+public class ContentStorageDbContext : DbContext
+{
+ public DbSet<ContentRecord> Content { get; set; } = null!;
+ public DbSet<OperationRecord> Operations { get; set; } = null!;
+
+ public ContentStorageDbContext(DbContextOptions<ContentStorageDbContext> options) : base(options)
+ {
+ }
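+
+ // Usage sketch (illustrative only; the registration call and connection string are assumptions,
+ // based on the standard EF Core SQLite provider):
+ //   services.AddDbContext<ContentStorageDbContext>(o => o.UseSqlite("Data Source=content.db"));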
+
+ protected override void OnModelCreating(ModelBuilder modelBuilder)
+ {
+ base.OnModelCreating(modelBuilder);
+
+ // Configure Content table
+ modelBuilder.Entity<ContentRecord>(entity =>
+ {
+ // Hardcoded table name as per specification
+ entity.ToTable("km_content");
+
+ // Primary key
+ entity.HasKey(e => e.Id);
+
+ // Required fields
+ entity.Property(e => e.Id)
+ .IsRequired()
+ .HasMaxLength(32); // Cuid2 IDs are 24 characters by default (up to 32)
+
+ entity.Property(e => e.Content)
+ .IsRequired();
+
+ entity.Property(e => e.MimeType)
+ .IsRequired()
+ .HasMaxLength(255);
+
+ entity.Property(e => e.ByteSize)
+ .IsRequired();
+
+ entity.Property(e => e.Ready)
+ .IsRequired();
+
+ // DateTimeOffset stored as ISO 8601 string in SQLite
+ entity.Property(e => e.ContentCreatedAt)
+ .IsRequired()
+ .HasConversion(
+ v => v.ToString("O"),
+ v => DateTimeOffset.Parse(v, CultureInfo.InvariantCulture));
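+
+ // Storing DateTimeOffset values as ISO 8601 round-trip ("O") strings keeps full precision and
+ // the offset; lexical TEXT comparison in SQLite then matches chronological order as long as
+ // offsets are uniform (the service-generated timestamps are always UTC).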
+
+ entity.Property(e => e.RecordCreatedAt)
+ .IsRequired()
+ .HasConversion(
+ v => v.ToString("O"),
+ v => DateTimeOffset.Parse(v, CultureInfo.InvariantCulture));
+
+ entity.Property(e => e.RecordUpdatedAt)
+ .IsRequired()
+ .HasConversion(
+ v => v.ToString("O"),
+ v => DateTimeOffset.Parse(v, CultureInfo.InvariantCulture));
+
+ // Optional fields
+ entity.Property(e => e.Title)
+ .IsRequired()
+ .HasDefaultValue(string.Empty);
+
+ entity.Property(e => e.Description)
+ .IsRequired()
+ .HasDefaultValue(string.Empty);
+
+ // JSON fields - store as TEXT in SQLite
+ entity.Property(e => e.TagsJson)
+ .IsRequired()
+ .HasColumnType("TEXT")
+ .HasDefaultValue("[]");
+
+ entity.Property(e => e.MetadataJson)
+ .IsRequired()
+ .HasColumnType("TEXT")
+ .HasDefaultValue("{}");
+
+ // Ignore computed properties (not stored in database)
+ entity.Ignore(e => e.Tags);
+ entity.Ignore(e => e.Metadata);
+
+ // Indexes
+ entity.HasIndex(e => e.Ready)
+ .HasDatabaseName("IX_km_content_Ready");
+ });
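+
+ // For reference, this configuration maps to roughly the following SQLite shape
+ // (illustrative approximation only; the actual DDL comes from EF Core migrations):
+ //   CREATE TABLE km_content (
+ //       Id TEXT NOT NULL PRIMARY KEY, Content TEXT NOT NULL, MimeType TEXT NOT NULL,
+ //       ByteSize INTEGER NOT NULL, Ready INTEGER NOT NULL,
+ //       ContentCreatedAt TEXT NOT NULL, RecordCreatedAt TEXT NOT NULL, RecordUpdatedAt TEXT NOT NULL,
+ //       Title TEXT NOT NULL DEFAULT '', Description TEXT NOT NULL DEFAULT '',
+ //       TagsJson TEXT NOT NULL DEFAULT '[]', MetadataJson TEXT NOT NULL DEFAULT '{}');
+ //   CREATE INDEX IX_km_content_Ready ON km_content (Ready);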
+
+ // Configure Operations table
+ modelBuilder.Entity<OperationRecord>(entity =>
+ {
+ // Hardcoded table name as per specification
+ entity.ToTable("km_operations");
+
+ // Primary key
+ entity.HasKey(e => e.Id);
+
+ // Required fields
+ entity.Property(e => e.Id)
+ .IsRequired()
+ .HasMaxLength(32);
+
+ entity.Property(e => e.Complete)
+ .IsRequired();
+
+ entity.Property(e => e.Cancelled)
+ .IsRequired();
+
+ entity.Property(e => e.ContentId)
+ .IsRequired()
+ .HasMaxLength(32);
+
+ // DateTimeOffset stored as ISO 8601 string in SQLite
+ entity.Property(e => e.Timestamp)
+ .IsRequired()
+ .HasConversion(
+ v => v.ToString("O"),
+ v => DateTimeOffset.Parse(v, CultureInfo.InvariantCulture));
+
+ entity.Property(e => e.LastFailureReason)
+ .IsRequired()
+ .HasDefaultValue(string.Empty);
+
+ // Nullable DateTimeOffset for locking
+ entity.Property(e => e.LastAttemptTimestamp)
+ .HasConversion(
+ v => v.HasValue ? v.Value.ToString("O") : null,
+ v => v == null ? (DateTimeOffset?)null : DateTimeOffset.Parse(v, CultureInfo.InvariantCulture));
+
+ // JSON fields - store as TEXT in SQLite
+ entity.Property(e => e.PlannedStepsJson)
+ .IsRequired()
+ .HasColumnType("TEXT")
+ .HasDefaultValue("[]");
+
+ entity.Property(e => e.CompletedStepsJson)
+ .IsRequired()
+ .HasColumnType("TEXT")
+ .HasDefaultValue("[]");
+
+ entity.Property(e => e.RemainingStepsJson)
+ .IsRequired()
+ .HasColumnType("TEXT")
+ .HasDefaultValue("[]");
+
+ entity.Property(e => e.PayloadJson)
+ .IsRequired()
+ .HasColumnType("TEXT")
+ .HasDefaultValue("{}");
+
+ // Ignore computed properties (not stored in database)
+ entity.Ignore(e => e.PlannedSteps);
+ entity.Ignore(e => e.CompletedSteps);
+ entity.Ignore(e => e.RemainingSteps);
+ entity.Ignore(e => e.Payload);
+
+ // Indexes as per specification
+ entity.HasIndex(e => new { e.ContentId, e.Timestamp })
+ .HasDatabaseName("IX_km_operations_ContentId_Timestamp");
+
+ entity.HasIndex(e => new { e.Complete, e.Timestamp })
+ .HasDatabaseName("IX_km_operations_Complete_Timestamp");
+
+ entity.HasIndex(e => e.Timestamp)
+ .HasDatabaseName("IX_km_operations_Timestamp");
+ });
+ }
+}
diff --git a/src/Core/Storage/ContentStorageService.cs b/src/Core/Storage/ContentStorageService.cs
new file mode 100644
index 000000000..8141d7c7d
--- /dev/null
+++ b/src/Core/Storage/ContentStorageService.cs
@@ -0,0 +1,567 @@
+using System.Text;
+using System.Text.Json;
+using KernelMemory.Core.Storage.Entities;
+using KernelMemory.Core.Storage.Models;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.Extensions.Logging;
+
+namespace KernelMemory.Core.Storage;
+
+/// <summary>
+/// Implementation of IContentStorage using SQLite with queue-based execution.
+/// Follows two-phase write pattern with distributed locking.
+/// </summary>
+public class ContentStorageService : IContentStorage
+{
+ private readonly ContentStorageDbContext _context;
+ private readonly ICuidGenerator _cuidGenerator;
+ private readonly ILogger<ContentStorageService> _logger;
+
+ public ContentStorageService(
+ ContentStorageDbContext context,
+ ICuidGenerator cuidGenerator,
+ ILogger<ContentStorageService> logger)
+ {
+ this._context = context;
+ this._cuidGenerator = cuidGenerator;
+ this._logger = logger;
+ }
+
+ /// <summary>
+ /// Upserts content following the two-phase write pattern.
+ /// </summary>
+ /// <param name="request">The upsert request.</param>
+ /// <param name="cancellationToken">Cancellation token.</param>
+ [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Best-effort error handling for phase 2 and processing - operation is queued successfully in phase 1")]
+ public async Task<string> UpsertAsync(UpsertRequest request, CancellationToken cancellationToken = default)
+ {
+ // Generate ID if not provided
+ var contentId = string.IsNullOrWhiteSpace(request.Id)
+ ? this._cuidGenerator.Generate()
+ : request.Id;
+
+ this._logger.LogInformation("Starting upsert operation for content ID: {ContentId}", contentId);
+
+ // Phase 1: Queue the operation (MUST succeed)
+ var operationId = await this.QueueUpsertOperationAsync(contentId, request, cancellationToken).ConfigureAwait(false);
+ this._logger.LogDebug("Phase 1 complete: Operation {OperationId} queued for content {ContentId}", operationId, contentId);
+
+ // Phase 2: Try to cancel superseded operations (best effort)
+ try
+ {
+ await this.TryCancelSupersededUpsertOperationsAsync(contentId, operationId, cancellationToken).ConfigureAwait(false);
+ this._logger.LogDebug("Phase 2 complete: Cancelled superseded operations for content {ContentId}", contentId);
+ }
+ catch (Exception ex)
+ {
+ // Best effort - log but don't fail
+ this._logger.LogWarning(ex, "Phase 2 failed to cancel superseded operations for content {ContentId} - continuing anyway", contentId);
+ }
+
+ // Processing: Try to process the new operation synchronously
+ try
+ {
+ await this.TryProcessNextOperationAsync(contentId, cancellationToken).ConfigureAwait(false);
+ this._logger.LogDebug("Processing complete for content {ContentId}", contentId);
+ }
+ catch (Exception ex)
+ {
+ // Log but don't fail - operation is queued and will be processed eventually
+ this._logger.LogWarning(ex, "Failed to process operation synchronously for content {ContentId} - will be processed by background worker", contentId);
+ }
+
+ return contentId;
+ }
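+
+ // Usage sketch (illustrative; other UpsertRequest fields and the storage instance are assumed):
+ //   var id = await storage.UpsertAsync(new UpsertRequest { Content = "hello", MimeType = "text/plain" });
+ //   var dto = await storage.GetByIdAsync(id);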
+
+ /// <summary>
+ /// Deletes content following the two-phase write pattern.
+ /// </summary>
+ /// <param name="id">The content ID.</param>
+ /// <param name="cancellationToken">Cancellation token.</param>
+ [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Best-effort error handling for phase 2 and processing - operation is queued successfully in phase 1")]
+ public async Task DeleteAsync(string id, CancellationToken cancellationToken = default)
+ {
+ this._logger.LogInformation("Starting delete operation for content ID: {ContentId}", id);
+
+ // Phase 1: Queue the operation (MUST succeed)
+ var operationId = await this.QueueDeleteOperationAsync(id, cancellationToken).ConfigureAwait(false);
+ this._logger.LogDebug("Phase 1 complete: Operation {OperationId} queued for content {ContentId}", operationId, id);
+
+ // Phase 2: Try to cancel ALL previous operations (best effort)
+ try
+ {
+ await this.TryCancelAllOperationsAsync(id, operationId, cancellationToken).ConfigureAwait(false);
+ this._logger.LogDebug("Phase 2 complete: Cancelled all previous operations for content {ContentId}", id);
+ }
+ catch (Exception ex)
+ {
+ // Best effort - log but don't fail
+ this._logger.LogWarning(ex, "Phase 2 failed to cancel previous operations for content {ContentId} - continuing anyway", id);
+ }
+
+ // Processing: Try to process the new operation synchronously
+ try
+ {
+ await this.TryProcessNextOperationAsync(id, cancellationToken).ConfigureAwait(false);
+ this._logger.LogDebug("Processing complete for content {ContentId}", id);
+ }
+ catch (Exception ex)
+ {
+ // Log but don't fail - operation is queued and will be processed eventually
+ this._logger.LogWarning(ex, "Failed to process operation synchronously for content {ContentId} - will be processed by background worker", id);
+ }
+ }
+
+ /// <summary>
+ /// Retrieves content by ID.
+ /// </summary>
+ /// <param name="id">The content ID.</param>
+ /// <param name="cancellationToken">Cancellation token.</param>
+ public async Task<ContentDto?> GetByIdAsync(string id, CancellationToken cancellationToken = default)
+ {
+ var record = await this._context.Content
+ .AsNoTracking()
+ .FirstOrDefaultAsync(c => c.Id == id, cancellationToken).ConfigureAwait(false);
+
+ if (record == null)
+ {
+ return null;
+ }
+
+ return new ContentDto
+ {
+ Id = record.Id,
+ Content = record.Content,
+ MimeType = record.MimeType,
+ ByteSize = record.ByteSize,
+ ContentCreatedAt = record.ContentCreatedAt,
+ RecordCreatedAt = record.RecordCreatedAt,
+ RecordUpdatedAt = record.RecordUpdatedAt,
+ Title = record.Title,
+ Description = record.Description,
+ Tags = record.Tags,
+ Metadata = record.Metadata
+ };
+ }
+
+ /// <summary>
+ /// Counts total number of content records.
+ /// </summary>
+ /// <param name="cancellationToken">Cancellation token.</param>
+ public async Task<long> CountAsync(CancellationToken cancellationToken = default)
+ {
+ return await this._context.Content.LongCountAsync(cancellationToken).ConfigureAwait(false);
+ }
+
+ // ========== Phase 1: Queue Operations (REQUIRED) ==========
+
+ /// <summary>
+ /// Phase 1: Queue an upsert operation. Must succeed.
+ /// </summary>
+ /// <param name="contentId">The content ID.</param>
+ /// <param name="request">The upsert request.</param>
+ /// <param name="cancellationToken">Cancellation token.</param>
+ private async Task<string> QueueUpsertOperationAsync(string contentId, UpsertRequest request, CancellationToken cancellationToken)
+ {
+ var operation = new OperationRecord
+ {
+ Id = this._cuidGenerator.Generate(),
+ Complete = false,
+ Cancelled = false,
+ ContentId = contentId,
+ Timestamp = DateTimeOffset.UtcNow,
+ PlannedSteps = ["upsert"],
+ CompletedSteps = [],
+ RemainingSteps = ["upsert"],
+ PayloadJson = JsonSerializer.Serialize(request),
+ LastFailureReason = string.Empty,
+ LastAttemptTimestamp = null
+ };
+
+ this._context.Operations.Add(operation);
+ await this._context.SaveChangesAsync(cancellationToken).ConfigureAwait(false);
+
+ this._logger.LogDebug("Queued upsert operation {OperationId} for content {ContentId}", operation.Id, contentId);
+ return operation.Id;
+ }
+
+ /// <summary>
+ /// Phase 1: Queue a delete operation. Must succeed.
+ /// </summary>
+ /// <param name="contentId">The content ID.</param>
+ /// <param name="cancellationToken">Cancellation token.</param>
+ private async Task<string> QueueDeleteOperationAsync(string contentId, CancellationToken cancellationToken)
+ {
+ var operation = new OperationRecord
+ {
+ Id = this._cuidGenerator.Generate(),
+ Complete = false,
+ Cancelled = false,
+ ContentId = contentId,
+ Timestamp = DateTimeOffset.UtcNow,
+ PlannedSteps = ["delete"],
+ CompletedSteps = [],
+ RemainingSteps = ["delete"],
+ PayloadJson = JsonSerializer.Serialize(new { Id = contentId }),
+ LastFailureReason = string.Empty,
+ LastAttemptTimestamp = null
+ };
+
+ this._context.Operations.Add(operation);
+ await this._context.SaveChangesAsync(cancellationToken).ConfigureAwait(false);
+
+ this._logger.LogDebug("Queued delete operation {OperationId} for content {ContentId}", operation.Id, contentId);
+ return operation.Id;
+ }
+
+ // ========== Phase 2: Optimize Queue (OPTIONAL - Best Effort) ==========
+
+ /// <summary>
+ /// Phase 2: Try to cancel superseded upsert operations (best effort).
+ /// Only cancels incomplete Upsert operations older than the new one.
+ /// Does NOT cancel Delete operations.
+ /// </summary>
+ /// <param name="contentId">The content ID.</param>
+ /// <param name="newOperationId">The ID of the newly queued operation.</param>
+ /// <param name="cancellationToken">Cancellation token.</param>
+ private async Task TryCancelSupersededUpsertOperationsAsync(string contentId, string newOperationId, CancellationToken cancellationToken)
+ {
+ // Find incomplete operations with same ContentId and older Timestamp
+ // Exclude Delete operations (they must complete)
+ var timestamp = await this._context.Operations
+ .Where(o => o.Id == newOperationId)
+ .Select(o => o.Timestamp)
+ .FirstOrDefaultAsync(cancellationToken).ConfigureAwait(false);
+
+ var superseded = await this._context.Operations
+ .Where(o => o.ContentId == contentId
+ && o.Id != newOperationId
+ && !o.Complete
+ && o.Timestamp < timestamp
+ && o.PlannedStepsJson.Contains("upsert")) // Only cancel upserts
+ .ToListAsync(cancellationToken).ConfigureAwait(false);
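+
+ // Note: PlannedStepsJson.Contains("upsert") is a pragmatic substring filter on the serialized
+ // JSON array (EF Core translates it to a string match such as instr in SQLite), not a
+ // structured JSON query.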
+
+ foreach (var op in superseded)
+ {
+ op.Cancelled = true;
+ this._logger.LogDebug("Cancelled superseded operation {OperationId} for content {ContentId}", op.Id, contentId);
+ }
+
+ if (superseded.Count > 0)
+ {
+ await this._context.SaveChangesAsync(cancellationToken).ConfigureAwait(false);
+ }
+ }
+
+ /// <summary>
+ /// Phase 2: Try to cancel ALL previous operations for delete (best effort).
+ /// Cancels all incomplete operations older than the delete operation.
+ /// </summary>
+ /// <param name="contentId">The content ID.</param>
+ /// <param name="newOperationId">The ID of the newly queued delete operation.</param>
+ /// <param name="cancellationToken">Cancellation token.</param>
+ private async Task TryCancelAllOperationsAsync(string contentId, string newOperationId, CancellationToken cancellationToken)
+ {
+ // Find incomplete operations with same ContentId and older Timestamp
+ var timestamp = await this._context.Operations
+ .Where(o => o.Id == newOperationId)
+ .Select(o => o.Timestamp)
+ .FirstOrDefaultAsync(cancellationToken).ConfigureAwait(false);
+
+ var superseded = await this._context.Operations
+ .Where(o => o.ContentId == contentId
+ && o.Id != newOperationId
+ && !o.Complete
+ && o.Timestamp < timestamp)
+ .ToListAsync(cancellationToken).ConfigureAwait(false);
+
+ foreach (var op in superseded)
+ {
+ op.Cancelled = true;
+ this._logger.LogDebug("Cancelled operation {OperationId} for content {ContentId} due to delete", op.Id, contentId);
+ }
+
+ if (superseded.Count > 0)
+ {
+ await this._context.SaveChangesAsync(cancellationToken).ConfigureAwait(false);
+ }
+ }
+
+ // ========== Processing: Execute Operations ==========
+
+ /// <summary>
+ /// Try to process the next operation for a content ID.
+ /// Skips locked operations (no recovery attempts).
+ /// </summary>
+ /// <param name="contentId">The content ID.</param>
+ /// <param name="cancellationToken">Cancellation token.</param>
+ [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Catch all to ensure operation failure is logged and content remains locked for retry")]
+ private async Task TryProcessNextOperationAsync(string contentId, CancellationToken cancellationToken)
+ {
+ // Step 1: Get next operation to process
+ var operation = await this._context.Operations
+ .Where(o => o.ContentId == contentId && !o.Complete)
+ .OrderBy(o => o.Timestamp)
+ .FirstOrDefaultAsync(cancellationToken).ConfigureAwait(false);
+
+ if (operation == null)
+ {
+ this._logger.LogDebug("No operations to process for content {ContentId}", contentId);
+ return;
+ }
+
+ // Check if operation is locked
+ if (operation.LastAttemptTimestamp.HasValue)
+ {
+ this._logger.LogDebug("Operation {OperationId} is locked - skipping (no recovery)", operation.Id);
+ return; // Skip locked operations
+ }
+
+ // If cancelled, mark complete and skip execution
+ if (operation.Cancelled)
+ {
+ this._logger.LogDebug("Operation {OperationId} was cancelled - marking complete", operation.Id);
+ operation.Complete = true;
+ operation.LastFailureReason = "Cancelled";
+ await this._context.SaveChangesAsync(cancellationToken).ConfigureAwait(false);
+
+ // Try to process next operation recursively
+ await this.TryProcessNextOperationAsync(contentId, cancellationToken).ConfigureAwait(false);
+ return;
+ }
+
+ // Step 2: Acquire lock (Transaction 1)
+ var lockAcquired = await this.TryAcquireLockAsync(operation.Id, contentId, cancellationToken).ConfigureAwait(false);
+ if (!lockAcquired)
+ {
+ this._logger.LogDebug("Failed to acquire lock for operation {OperationId} - another VM got there first", operation.Id);
+ return; // Another VM got the lock
+ }
+
+ this._logger.LogDebug("Lock acquired for operation {OperationId}", operation.Id);
+
+ try
+ {
+ // Step 3: Execute planned steps
+ await this.ExecuteStepsAsync(operation, cancellationToken).ConfigureAwait(false);
+
+ // Step 4: Complete and unlock (Transaction 2)
+ await this.CompleteAndUnlockAsync(operation.Id, contentId, cancellationToken).ConfigureAwait(false);
+
+ this._logger.LogInformation("Operation {OperationId} completed successfully for content {ContentId}", operation.Id, contentId);
+
+ // Step 5: Process next operation (if any)
+ await this.TryProcessNextOperationAsync(contentId, cancellationToken).ConfigureAwait(false);
+ }
+ catch (Exception ex)
+ {
+ // Update failure reason
+ operation.LastFailureReason = ex.Message;
+ await this._context.SaveChangesAsync(cancellationToken).ConfigureAwait(false);
+
+ this._logger.LogError(ex, "Operation {OperationId} failed - content {ContentId} remains locked", operation.Id, contentId);
+ throw; // Propagate error (operation and content remain locked)
+ }
+ }
+
+ /// <summary>
+ /// Step 2: Try to acquire lock on operation and content atomically.
+ /// Uses raw SQL for atomic UPDATE with WHERE clause check.
+ /// </summary>
+ /// <param name="operationId">The operation ID.</param>
+ /// <param name="contentId">The content ID.</param>
+ /// <param name="cancellationToken">Cancellation token.</param>
+ private async Task<bool> TryAcquireLockAsync(string operationId, string contentId, CancellationToken cancellationToken)
+ {
+ // Start a transaction for atomic lock acquisition
+ using var transaction = await this._context.Database.BeginTransactionAsync(cancellationToken).ConfigureAwait(false);
+
+ try
+ {
+ var now = DateTimeOffset.UtcNow.ToString("O"); // ISO 8601 format
+
+ // Lock operation - only if LastAttemptTimestamp IS NULL
+ const string OperationSql = @"
+ UPDATE km_operations
+ SET LastAttemptTimestamp = @p0
+ WHERE Id = @p1
+ AND LastAttemptTimestamp IS NULL";
+
+ var operationRows = await this._context.Database.ExecuteSqlRawAsync(
+ OperationSql,
+ [now, operationId],
+ cancellationToken).ConfigureAwait(false);
+
+ if (operationRows == 0)
+ {
+ // Failed to lock operation - already locked
+ await transaction.RollbackAsync(cancellationToken).ConfigureAwait(false);
+ return false;
+ }
+
+ // Lock content - set Ready = false
+ const string ContentSql = @"
+ UPDATE km_content
+ SET Ready = 0,
+ RecordUpdatedAt = @p0
+ WHERE Id = @p1";
+
+ // Note: We don't check Ready = true because content might not exist yet (insert case)
+ // We execute this to ensure content is locked if it exists
+ await this._context.Database.ExecuteSqlRawAsync(
+ ContentSql,
+ [now, contentId],
+ cancellationToken).ConfigureAwait(false);
+
+ // Commit transaction
+ await transaction.CommitAsync(cancellationToken).ConfigureAwait(false);
+ return true;
+ }
+ catch
+ {
+ await transaction.RollbackAsync(cancellationToken).ConfigureAwait(false);
+ throw;
+ }
+ }
+
+ /// <summary>
+ /// Step 3: Execute all remaining steps for an operation.
+ /// </summary>
+ /// <param name="operation">The operation to execute.</param>
+ /// <param name="cancellationToken">Cancellation token.</param>
+ private async Task ExecuteStepsAsync(OperationRecord operation, CancellationToken cancellationToken)
+ {
+ foreach (var step in operation.RemainingSteps)
+ {
+ this._logger.LogDebug("Executing step '{Step}' for operation {OperationId}", step, operation.Id);
+
+ switch (step)
+ {
+ case "upsert":
+ await this.ExecuteUpsertStepAsync(operation, cancellationToken).ConfigureAwait(false);
+ break;
+ case "delete":
+ await this.ExecuteDeleteStepAsync(operation, cancellationToken).ConfigureAwait(false);
+ break;
+ default:
+ throw new InvalidOperationException($"Unknown step type: {step}");
+ }
+
+ // Move step from Remaining to Completed
+ var completed = operation.CompletedSteps.Concat([step]).ToArray();
+ var remaining = operation.RemainingSteps.Where(s => s != step).ToArray();
+
+ operation.CompletedSteps = completed;
+ operation.RemainingSteps = remaining;
+
+ await this._context.SaveChangesAsync(cancellationToken).ConfigureAwait(false);
+ this._logger.LogDebug("Step '{Step}' completed for operation {OperationId}", step, operation.Id);
+ }
+ }
+
+ /// <summary>
+ /// Execute upsert step: delete the existing record if present, then insert the new one.
+ /// </summary>
+ /// <param name="operation">The operation to execute.</param>
+ /// <param name="cancellationToken">Cancellation token.</param>
+ private async Task ExecuteUpsertStepAsync(OperationRecord operation, CancellationToken cancellationToken)
+ {
+ var request = JsonSerializer.Deserialize<UpsertRequest>(operation.PayloadJson)
+ ?? throw new InvalidOperationException($"Failed to deserialize upsert payload for operation {operation.Id}");
+
+ var now = DateTimeOffset.UtcNow;
+ var contentCreatedAt = request.ContentCreatedAt ?? now;
+
+ // Delete existing record if it exists
+ var existing = await this._context.Content.FirstOrDefaultAsync(c => c.Id == operation.ContentId, cancellationToken).ConfigureAwait(false);
+ if (existing != null)
+ {
+ this._context.Content.Remove(existing);
+ this._logger.LogDebug("Deleted existing content {ContentId} for upsert", operation.ContentId);
+ }
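+
+ // Design note: upsert is implemented as delete-then-insert so new and existing content share
+ // one code path; RecordCreatedAt is carried over below to preserve the original creation time.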
+
+ // Create new record
+ var content = new ContentRecord
+ {
+ Id = operation.ContentId,
+ Content = request.Content,
+ MimeType = request.MimeType,
+ ByteSize = Encoding.UTF8.GetByteCount(request.Content),
+ Ready = false, // Will be set to true when operation completes
+ ContentCreatedAt = contentCreatedAt,
+ RecordCreatedAt = existing?.RecordCreatedAt ?? now, // Preserve original creation time if exists
+ RecordUpdatedAt = now,
+ Title = request.Title,
+ Description = request.Description,
+ Tags = request.Tags,
+ Metadata = request.Metadata
+ };
+
+ this._context.Content.Add(content);
+ await this._context.SaveChangesAsync(cancellationToken).ConfigureAwait(false);
+
+ this._logger.LogDebug("Created new content record {ContentId}", operation.ContentId);
+ }
+
+ /// <summary>
+ /// Execute delete step: delete the content if it exists (idempotent).
+ /// </summary>
+ /// <param name="operation">The operation to execute.</param>
+ /// <param name="cancellationToken">Cancellation token.</param>
+ private async Task ExecuteDeleteStepAsync(OperationRecord operation, CancellationToken cancellationToken)
+ {
+ var existing = await this._context.Content.FirstOrDefaultAsync(c => c.Id == operation.ContentId, cancellationToken).ConfigureAwait(false);
+
+ if (existing != null)
+ {
+ this._context.Content.Remove(existing);
+ await this._context.SaveChangesAsync(cancellationToken).ConfigureAwait(false);
+ this._logger.LogDebug("Deleted content {ContentId}", operation.ContentId);
+ }
+ else
+ {
+ this._logger.LogDebug("Content {ContentId} not found - delete is idempotent, no error", operation.ContentId);
+ }
+ }
+
+ /// <summary>
+ /// Step 4: Complete operation and unlock content.
+ /// </summary>
+ /// <param name="operationId">The operation ID.</param>
+ /// <param name="contentId">The content ID.</param>
+ /// <param name="cancellationToken">Cancellation token.</param>
+ private async Task CompleteAndUnlockAsync(string operationId, string contentId, CancellationToken cancellationToken)
+ {
+ using var transaction = await this._context.Database.BeginTransactionAsync(cancellationToken).ConfigureAwait(false);
+
+ try
+ {
+ var now = DateTimeOffset.UtcNow.ToString("O");
+
+ // Mark operation complete
+ const string OperationSql = @"
+ UPDATE km_operations
+ SET Complete = 1
+ WHERE Id = @p0";
+
+ await this._context.Database.ExecuteSqlRawAsync(OperationSql, [operationId], cancellationToken).ConfigureAwait(false);
+
+ // Unlock content (set Ready = true)
+ const string ContentSql = @"
+ UPDATE km_content
+ SET Ready = 1,
+ RecordUpdatedAt = @p0
+ WHERE Id = @p1";
+
+ await this._context.Database.ExecuteSqlRawAsync(ContentSql, [now, contentId], cancellationToken).ConfigureAwait(false);
+
+ await transaction.CommitAsync(cancellationToken).ConfigureAwait(false);
+ this._logger.LogDebug("Operation {OperationId} completed and content {ContentId} unlocked", operationId, contentId);
+ }
+ catch
+ {
+ await transaction.RollbackAsync(cancellationToken).ConfigureAwait(false);
+ throw;
+ }
+ }
+}
diff --git a/src/Core/Storage/CuidGenerator.cs b/src/Core/Storage/CuidGenerator.cs
new file mode 100644
index 000000000..4ab894399
--- /dev/null
+++ b/src/Core/Storage/CuidGenerator.cs
@@ -0,0 +1,21 @@
+using Visus.Cuid;
+
+namespace KernelMemory.Core.Storage;
+
+/// <summary>
+/// Default implementation of ICuidGenerator using Cuid.Net library.
+/// </summary>
+public class CuidGenerator : ICuidGenerator
+{
+ /// <summary>
+ /// Generates a new lowercase Cuid2 identifier.
+ /// </summary>
+ /// <returns>A unique lowercase Cuid2 string.</returns>
+ public string Generate()
+ {
+ // Create new Cuid2 with default length (24 characters)
+ // Cuid2 generates lowercase IDs by default
+ var cuid = new Cuid2();
+ return cuid.ToString();
+ }
+}
diff --git a/src/Core/Storage/Entities/ContentRecord.cs b/src/Core/Storage/Entities/ContentRecord.cs
new file mode 100644
index 000000000..3fcb0de0b
--- /dev/null
+++ b/src/Core/Storage/Entities/ContentRecord.cs
@@ -0,0 +1,54 @@
+using System.Text.Json;
+
+namespace KernelMemory.Core.Storage.Entities;
+
+/// <summary>
+/// Entity representing a content record in the Content table.
+/// Source of truth for all content in the system.
+/// </summary>
+public class ContentRecord
+{
+ // Mandatory fields
+ public string Id { get; set; } = string.Empty;
+ public string Content { get; set; } = string.Empty;
+ public string MimeType { get; set; } = string.Empty;
+ public long ByteSize { get; set; }
+ public bool Ready { get; set; }
+ public DateTimeOffset ContentCreatedAt { get; set; }
+ public DateTimeOffset RecordCreatedAt { get; set; }
+ public DateTimeOffset RecordUpdatedAt { get; set; }
+
+ // Optional fields
+ public string Title { get; set; } = string.Empty;
+ public string Description { get; set; } = string.Empty;
+
+ // JSON-backed fields (stored as JSON strings in SQLite)
+ // Tags array
+ public string TagsJson { get; set; } = "[]";
+
+ /// <summary>
+ /// Gets or sets the tags array. Not mapped to database - uses TagsJson for persistence.
+ /// </summary>
+ [System.Diagnostics.CodeAnalysis.SuppressMessage("Performance", "CA1819:Properties should not return arrays")]
+ public string[] Tags
+ {
+ get => string.IsNullOrWhiteSpace(this.TagsJson)
+ ? []
+ : JsonSerializer.Deserialize<string[]>(this.TagsJson) ?? [];
+ set => this.TagsJson = JsonSerializer.Serialize(value);
+ }
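+
+ // Round-trip example: record.Tags = new[] { "a", "b" } stores TagsJson as ["a","b"];
+ // reading Tags parses that JSON back into a string array.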
+
+ // Metadata key-value pairs
+ public string MetadataJson { get; set; } = "{}";
+
+ /// <summary>
+ /// Gets or sets the metadata dictionary. Not mapped to database - uses MetadataJson for persistence.
+ /// </summary>
+ public Dictionary<string, string> Metadata
+ {
+ get => string.IsNullOrWhiteSpace(this.MetadataJson)
+ ? new Dictionary<string, string>()
+ : JsonSerializer.Deserialize<Dictionary<string, string>>(this.MetadataJson) ?? new Dictionary<string, string>();
+ set => this.MetadataJson = JsonSerializer.Serialize(value);
+ }
+}
diff --git a/src/Core/Storage/Entities/OperationRecord.cs b/src/Core/Storage/Entities/OperationRecord.cs
new file mode 100644
index 000000000..7de369916
--- /dev/null
+++ b/src/Core/Storage/Entities/OperationRecord.cs
@@ -0,0 +1,80 @@
+using System.Text.Json;
+
+namespace KernelMemory.Core.Storage.Entities;
+
+/// <summary>
+/// Entity representing an operation in the Operations table.
+/// Used for queue-based processing with distributed locking.
+/// </summary>
+public class OperationRecord
+{
+ public string Id { get; set; } = string.Empty;
+ public bool Complete { get; set; }
+ public bool Cancelled { get; set; }
+ public string ContentId { get; set; } = string.Empty;
+ public DateTimeOffset Timestamp { get; set; }
+ public string LastFailureReason { get; set; } = string.Empty;
+
+ /// <summary>
+ /// When last attempt was made (nullable). Used for distributed locking.
+ /// If NOT NULL and Complete=false: operation is locked (executing or crashed).
+ /// </summary>
+ public DateTimeOffset? LastAttemptTimestamp { get; set; }
+
+ // JSON-backed array fields
+ public string PlannedStepsJson { get; set; } = "[]";
+
+ /// <summary>
+ /// Gets or sets the planned steps array. Not mapped to database - uses PlannedStepsJson for persistence.
+ /// </summary>
+ [System.Diagnostics.CodeAnalysis.SuppressMessage("Performance", "CA1819:Properties should not return arrays")]
+ public string[] PlannedSteps
+ {
+ get => string.IsNullOrWhiteSpace(this.PlannedStepsJson)
+ ? []
+ : JsonSerializer.Deserialize<string[]>(this.PlannedStepsJson) ?? [];
+ set => this.PlannedStepsJson = JsonSerializer.Serialize(value);
+ }
+
+ public string CompletedStepsJson { get; set; } = "[]";
+
+ /// <summary>
+ /// Gets or sets the completed steps array. Not mapped to database - uses CompletedStepsJson for persistence.
+ /// </summary>
+ [System.Diagnostics.CodeAnalysis.SuppressMessage("Performance", "CA1819:Properties should not return arrays")]
+ public string[] CompletedSteps
+ {
+ get => string.IsNullOrWhiteSpace(this.CompletedStepsJson)
+ ? []
+ : JsonSerializer.Deserialize<string[]>(this.CompletedStepsJson) ?? [];
+ set => this.CompletedStepsJson = JsonSerializer.Serialize(value);
+ }
+
+ public string RemainingStepsJson { get; set; } = "[]";
+
+ /// <summary>
+ /// Gets or sets the remaining steps array. Not mapped to database - uses RemainingStepsJson for persistence.
+ /// </summary>
+ [System.Diagnostics.CodeAnalysis.SuppressMessage("Performance", "CA1819:Properties should not return arrays")]
+ public string[] RemainingSteps
+ {
+ get => string.IsNullOrWhiteSpace(this.RemainingStepsJson)
+ ? []
+ : JsonSerializer.Deserialize<string[]>(this.RemainingStepsJson) ?? [];
+ set => this.RemainingStepsJson = JsonSerializer.Serialize(value);
+ }
+
+ // Payload stored as JSON
+ public string PayloadJson { get; set; } = "{}";
+
+ /// <summary>
+ /// Gets or sets the payload object. Not mapped to database - uses PayloadJson for persistence.
+ /// </summary>
+ public object? Payload
+ {
+ get => string.IsNullOrWhiteSpace(this.PayloadJson)
+ ? null
+ : JsonSerializer.Deserialize