|
| 1 | +#!/usr/bin/env node |
| 2 | + |
| 3 | +/** |
| 4 | + * Workflow State Coordinator |
| 5 | + * |
| 6 | + * Manages coordination between multiple news generation workflows to prevent |
| 7 | + * duplicate work and improve efficiency: |
| 8 | + * - news-realtime-monitor.md (2x daily) |
| 9 | + * - news-evening-analysis.md (daily) |
| 10 | + * - news-article-generator.md (various schedules) |
| 11 | + * |
| 12 | + * Features: |
| 13 | + * - MCP query caching (2-hour TTL) |
| 14 | + * - Similarity-based article deduplication (>70% threshold) |
| 15 | + * - Recent article tracking (last 6 hours) |
| 16 | + * - Workflow coordination metadata |
| 17 | + * |
| 18 | + * Usage: |
| 19 | + * import { WorkflowStateCoordinator } from './workflow-state-coordinator.js'; |
| 20 | + * const coordinator = new WorkflowStateCoordinator(); |
| 21 | + * await coordinator.load(); |
| 22 | + * const { isDuplicate } = await coordinator.checkDuplicateArticle(title, topics); |
| 23 | + * |
| 24 | + * @see Issue #150 (News Realtime Monitor Enhancement) |
| 25 | + */ |
| 26 | + |
| 27 | +import fs from 'fs'; |
| 28 | +import path from 'path'; |
| 29 | +import { fileURLToPath } from 'url'; |
| 30 | +import crypto from 'crypto'; |
| 31 | + |
// ES modules do not provide __filename/__dirname, so both are derived from
// this module's own URL.
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);

// Default location of the shared state file, resolved relative to the parent
// of this module's directory.
const STATE_FILE = path.join(__dirname, '..', 'news', 'metadata', 'workflow-state.json');

// Lifetimes are expressed in seconds.
const MCP_CACHE_TTL_SECONDS = 2 * 3600; // cached MCP query results: 2 hours
const RECENT_ARTICLE_TTL_SECONDS = 6 * 3600; // tracked recent articles: 6 hours

// A combined similarity score at or above this value (70%) marks a duplicate.
const SIMILARITY_THRESHOLD = 0.7;
| 39 | + |
/**
 * Workflow State Coordinator
 *
 * Keeps a small JSON state file with three kinds of data: cached MCP query
 * results, recently generated articles, and per-workflow run counters.
 * The state is held in memory on `this.state` and written back to disk by
 * every mutating method (`cacheMCPQuery`, `addRecentArticle`,
 * `recordWorkflowExecution`).
 */
export class WorkflowStateCoordinator {
  /**
   * @param {string} [stateFilePath] - Path of the JSON state file
   *   (defaults to the module-level STATE_FILE).
   */
  constructor(stateFilePath = STATE_FILE) {
    this.stateFilePath = stateFilePath;
    this.state = this.normalizeState(null);
  }

  /**
   * Tell whether a value is a non-null, non-array object.
   *
   * @param {any} value - Value to test
   * @returns {boolean} True for plain-object-like values
   */
  static isRecord(value) {
    return value !== null && typeof value === 'object' && !Array.isArray(value);
  }

  /**
   * Own-property check that ignores members inherited from Object.prototype
   * (so keys such as "toString" or "constructor" are not mistaken for data).
   *
   * @param {Object} target - Object to inspect
   * @param {string} key - Property name
   * @returns {boolean} True when `key` is an own property of `target`
   */
  static hasOwn(target, key) {
    return Object.prototype.hasOwnProperty.call(target, key);
  }

  /**
   * Recursively copy a JSON-like value with object keys in sorted order, so
   * that serialising the copy does not depend on key insertion order.
   * Objects exposing `toJSON` (e.g. Date) are left untouched for
   * JSON.stringify to handle. Cyclic structures are not supported.
   *
   * @param {any} value - Value to canonicalise
   * @returns {any} Copy with sorted keys at every nesting level
   */
  static sortKeysDeep(value) {
    if (Array.isArray(value)) {
      return value.map((item) => WorkflowStateCoordinator.sortKeysDeep(item));
    }
    if (WorkflowStateCoordinator.isRecord(value) && typeof value.toJSON !== 'function') {
      // Null prototype: a "__proto__" key becomes ordinary data, not a prototype.
      const sorted = Object.create(null);
      for (const key of Object.keys(value).sort()) {
        sorted[key] = WorkflowStateCoordinator.sortKeysDeep(value[key]);
      }
      return sorted;
    }
    return value;
  }

  /**
   * Build a well-formed state object from arbitrary parsed input.
   *
   * Missing or wrongly-typed sections are replaced by empty defaults so the
   * other methods can rely on `recentArticles` being an array and on
   * `mcpQueryCache` / `workflows` being objects. Unknown extra keys are kept.
   *
   * @param {any} raw - Parsed state file content (or anything else)
   * @returns {Object} State with all four expected sections present
   */
  normalizeState(raw) {
    const { isRecord } = WorkflowStateCoordinator;
    const source = isRecord(raw) ? raw : {};

    return {
      ...source,
      lastUpdate: source.lastUpdate ?? null,
      recentArticles: Array.isArray(source.recentArticles)
        ? source.recentArticles.filter(isRecord)
        : [],
      mcpQueryCache: isRecord(source.mcpQueryCache) ? source.mcpQueryCache : {},
      workflows: isRecord(source.workflows) ? source.workflows : {}
    };
  }

  /**
   * Load state from disk.
   *
   * When the file exists it is parsed, normalised and purged of expired
   * entries; when it does not exist the current state is written out to
   * create it. Failures are logged as a warning and never thrown; in that
   * case the in-memory state is left exactly as it was before the call.
   *
   * @returns {Promise<void>}
   */
  async load() {
    try {
      if (fs.existsSync(this.stateFilePath)) {
        const content = fs.readFileSync(this.stateFilePath, 'utf-8');
        // Assign only after parsing AND normalising succeeded, so a broken
        // file can never leave a half-formed state behind.
        this.state = this.normalizeState(JSON.parse(content));
        this.cleanupExpiredEntries();
      } else {
        // Initialize the state file
        await this.save();
      }
    } catch (error) {
      console.warn('Warning: Could not load workflow state:', error.message);
      // Continue with the state that was in memory before load()
    }
  }

  /**
   * Save state to disk, stamping `lastUpdate` with the current time.
   *
   * @returns {Promise<void>}
   * @throws {Error} Re-throws any filesystem error after logging it
   */
  async save() {
    try {
      // With `recursive: true` this is a no-op when the directory exists.
      fs.mkdirSync(path.dirname(this.stateFilePath), { recursive: true });

      this.state.lastUpdate = new Date().toISOString();
      fs.writeFileSync(this.stateFilePath, JSON.stringify(this.state, null, 2), 'utf-8');
    } catch (error) {
      console.error('Error saving workflow state:', error.message);
      throw error;
    }
  }

  /**
   * Decide whether a cache entry has outlived its time to live.
   *
   * The entry's own `ttl` (seconds) is honoured; MCP_CACHE_TTL_SECONDS is
   * used only when the entry carries no finite `ttl`. Entries that are
   * missing or whose timestamp cannot be parsed count as expired.
   *
   * @param {Object} entry - Cache entry ({ timestamp, ttl, ... })
   * @param {number} [now] - Reference time in epoch milliseconds
   * @returns {boolean} True when the entry should be discarded
   */
  isCacheEntryExpired(entry, now = Date.now()) {
    if (!WorkflowStateCoordinator.isRecord(entry)) return true;

    const entryTime = new Date(entry.timestamp).getTime();
    if (Number.isNaN(entryTime)) return true;

    const ttlSeconds = Number.isFinite(entry.ttl) ? entry.ttl : MCP_CACHE_TTL_SECONDS;
    return now - entryTime > ttlSeconds * 1000;
  }

  /**
   * Clean up expired cache entries and old articles (in memory only; the
   * result is persisted by the next save()).
   */
  cleanupExpiredEntries() {
    const now = Date.now();

    // MCP cache: each entry expires according to its own ttl
    for (const [key, entry] of Object.entries(this.state.mcpQueryCache)) {
      if (this.isCacheEntryExpired(entry, now)) {
        delete this.state.mcpQueryCache[key];
      }
    }

    // Recent articles: fixed TTL; entries with an unparseable timestamp are
    // dropped because the comparison against NaN is false.
    this.state.recentArticles = this.state.recentArticles.filter((article) => {
      const articleTime = new Date(article?.timestamp).getTime();
      return now - articleTime <= RECENT_ARTICLE_TTL_SECONDS * 1000;
    });
  }

  /**
   * Cache MCP query result and persist the state.
   *
   * @param {string} queryKey - Unique identifier for the query
   * @param {any} result - Query result to cache
   * @param {number} ttl - Time to live in seconds (default: 2 hours)
   * @returns {Promise<void>}
   */
  async cacheMCPQuery(queryKey, result, ttl = MCP_CACHE_TTL_SECONDS) {
    const resultHash = this.hashObject(result);

    this.state.mcpQueryCache[queryKey] = {
      timestamp: new Date().toISOString(),
      ttl,
      resultHash,
      result
    };

    await this.save();
  }

  /**
   * Get cached MCP query result
   *
   * @param {string} queryKey - Unique identifier for the query
   * @returns {any|null} Cached result or null if expired/missing
   */
  getCachedMCPQuery(queryKey) {
    // Expired entries (per-entry ttl) are removed here, so whatever is still
    // present afterwards is valid.
    this.cleanupExpiredEntries();

    const cache = this.state.mcpQueryCache;
    if (!WorkflowStateCoordinator.hasOwn(cache, queryKey)) return null;

    return cache[queryKey].result;
  }

  /**
   * Add recent article to tracking and persist the state.
   *
   * @param {Object} article - Article metadata
   *   ({ slug, title, workflow?, topics?, mcpQueries? })
   * @returns {Promise<void>}
   * @throws {TypeError} When `article` is not an object
   */
  async addRecentArticle(article) {
    if (!WorkflowStateCoordinator.isRecord(article)) {
      throw new TypeError('addRecentArticle expects an article metadata object');
    }

    const articleEntry = {
      slug: article.slug,
      timestamp: new Date().toISOString(),
      workflow: article.workflow || 'unknown',
      title: article.title,
      topics: article.topics || [],
      mcpQueries: article.mcpQueries || []
    };

    this.state.recentArticles.push(articleEntry);
    await this.save();
  }

  /**
   * Check if article is duplicate based on similarity to the tracked recent
   * articles. The best-scoring tracked article is reported when its score
   * reaches SIMILARITY_THRESHOLD.
   *
   * NOTE(review): `sources` is compared against each tracked article's
   * `mcpQueries` list, since tracked articles store no separate sources
   * field - confirm that callers pass MCP query identifiers here.
   *
   * @param {string} title - Article title
   * @param {string[]} topics - Article topics
   * @param {string[]} sources - Article sources
   * @returns {Promise<Object>} { isDuplicate: boolean, matchedArticle: Object|null, similarityScore: number }
   */
  async checkDuplicateArticle(title, topics = [], sources = []) {
    this.cleanupExpiredEntries();

    let maxSimilarity = 0;
    let matchedArticle = null;

    for (const recentArticle of this.state.recentArticles) {
      const similarity = this.calculateSimilarity(
        title,
        topics,
        sources,
        recentArticle?.title,
        recentArticle?.topics,
        recentArticle?.mcpQueries
      );

      if (similarity > maxSimilarity) {
        maxSimilarity = similarity;
        matchedArticle = recentArticle;
      }
    }

    const isDuplicate = maxSimilarity >= SIMILARITY_THRESHOLD;

    return {
      isDuplicate,
      matchedArticle: isDuplicate ? matchedArticle : null,
      similarityScore: maxSimilarity
    };
  }

  /**
   * Calculate similarity between two articles
   *
   * Uses weighted combination of:
   * - Title similarity (50%)
   * - Topic overlap (30%)
   * - Source overlap (20%)
   *
   * A component for which either side is empty contributes 0, so the maximum
   * achievable score drops accordingly (e.g. 0.8 when no sources are given).
   *
   * @returns {number} Similarity score 0.0-1.0
   */
  calculateSimilarity(title1, topics1, sources1, title2, topics2, sources2) {
    const titleSim = this.stringSimilarity(title1, title2);
    const topicSim = this.setOverlap(topics1, topics2);
    const sourceSim = this.setOverlap(sources1, sources2);

    return (titleSim * 0.5) + (topicSim * 0.3) + (sourceSim * 0.2);
  }

  /**
   * Calculate string similarity using Jaccard similarity of word sets.
   * Words are lower-cased, split on whitespace, and words of one or two
   * characters are ignored.
   *
   * @param {string} str1 - First string
   * @param {string} str2 - Second string
   * @returns {number} Similarity 0.0-1.0 (0 when either string is empty)
   */
  stringSimilarity(str1, str2) {
    if (!str1 || !str2) return 0;

    // String() keeps non-string titles from persisted state from throwing.
    const toWords = (text) =>
      String(text).toLowerCase().split(/\s+/).filter((word) => word.length > 2);

    return this.setOverlap(toWords(str1), toWords(str2));
  }

  /**
   * Calculate set overlap (Jaccard similarity), comparing members
   * case-insensitively by their string form.
   *
   * @param {Array} set1 - First set
   * @param {Array} set2 - Second set
   * @returns {number} Overlap 0.0-1.0 (0 when either input is empty or not an array)
   */
  setOverlap(set1, set2) {
    if (!Array.isArray(set1) || !Array.isArray(set2)) return 0;
    if (set1.length === 0 || set2.length === 0) return 0;

    const s1 = new Set(set1.map((x) => String(x).toLowerCase()));
    const s2 = new Set(set2.map((x) => String(x).toLowerCase()));

    let shared = 0;
    for (const member of s1) {
      if (s2.has(member)) shared++;
    }
    // |A union B| = |A| + |B| - |A intersect B|
    const unionSize = s1.size + s2.size - shared;

    return shared / unionSize;
  }

  /**
   * Hash a JSON-like value for cache comparison.
   *
   * The value is serialised with object keys sorted at every nesting level,
   * so key order never affects the hash while nested content always does.
   * Values JSON cannot represent (undefined, functions) hash as the text
   * "undefined" instead of throwing.
   *
   * @param {any} obj - Value to hash
   * @returns {string} First 16 hex characters of the SHA-256 digest
   */
  hashObject(obj) {
    const canonical = JSON.stringify(WorkflowStateCoordinator.sortKeysDeep(obj)) ?? 'undefined';
    return crypto.createHash('sha256').update(canonical).digest('hex').substring(0, 16);
  }

  /**
   * Record workflow execution: bumps the run counter, stamps `lastRun`, adds
   * any reported article count, and persists the state.
   *
   * @param {string} workflowName - Name of workflow
   * @param {Object} metadata - Execution metadata ({ articlesGenerated?: number })
   * @returns {Promise<void>}
   */
  async recordWorkflowExecution(workflowName, metadata = {}) {
    const { isRecord, hasOwn } = WorkflowStateCoordinator;
    const workflows = this.state.workflows;

    if (!hasOwn(workflows, workflowName) || !isRecord(workflows[workflowName])) {
      workflows[workflowName] = {
        lastRun: null,
        runCount: 0,
        articlesGenerated: 0
      };
    }

    const stats = workflows[workflowName];
    stats.lastRun = new Date().toISOString();
    // Counters read from disk may be missing or malformed; restart them at 0.
    stats.runCount = (Number.isFinite(stats.runCount) ? stats.runCount : 0) + 1;

    // Number() keeps a numeric string from being concatenated onto the total.
    const generated = Number(metadata?.articlesGenerated);
    if (Number.isFinite(generated) && generated !== 0) {
      const previous = Number.isFinite(stats.articlesGenerated) ? stats.articlesGenerated : 0;
      stats.articlesGenerated = previous + generated;
    }

    await this.save();
  }

  /**
   * Get recent articles from last N hours. Articles older than
   * RECENT_ARTICLE_TTL_SECONDS are purged first, so asking for a longer
   * window cannot return more than that.
   *
   * @param {number} hours - Hours to look back
   * @returns {Array} Recent articles
   */
  getRecentArticles(hours = 6) {
    this.cleanupExpiredEntries();

    const cutoff = new Date(Date.now() - hours * 60 * 60 * 1000);

    return this.state.recentArticles.filter(
      (article) => new Date(article.timestamp) >= cutoff
    );
  }

  /**
   * Get workflow statistics
   *
   * The per-workflow entries are spread into the result next to two summary
   * fields, so a workflow literally named "cacheSize" or
   * "recentArticlesCount" would be shadowed by the summary value.
   *
   * @returns {Object} Statistics by workflow, plus cacheSize and recentArticlesCount
   */
  getWorkflowStatistics() {
    return {
      ...this.state.workflows,
      cacheSize: Object.keys(this.state.mcpQueryCache).length,
      recentArticlesCount: this.state.recentArticles.length
    };
  }
}
| 332 | + |
| 333 | +// Export for direct usage |
| 334 | +export { |
| 335 | + MCP_CACHE_TTL_SECONDS, |
| 336 | + RECENT_ARTICLE_TTL_SECONDS, |
| 337 | + SIMILARITY_THRESHOLD |
| 338 | +}; |
0 commit comments