Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
"./connection/UndiciConnection": "./lib/connection/UndiciConnection.js",
"./lib/connection/UndiciConnection": "./lib/connection/UndiciConnection.js",
"./pool/WeightedConnectionPool": "./lib/pool/WeightedConnectionPool.js",
"./lib/pool/WeightedConnectionPool": "./lib/pool/WeightedConnectionPool.js"
"./lib/pool/WeightedConnectionPool": "./lib/pool/WeightedConnectionPool.js",
"./middleware": "./lib/middleware/index.js",
"./lib/middleware": "./lib/middleware/index.js"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oof, reminder that I want to refactor both libraries to properly support ESM. this pattern was a naive attempt to move that direction, but I'm almost certain there was a simpler way. 🙃

},
"scripts": {
"test": "npm run build && npm run lint && tap",
Expand Down
113 changes: 113 additions & 0 deletions src/middleware/CompressionMiddleware.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright Elasticsearch B.V. and contributors
* SPDX-License-Identifier: Apache-2.0
*/

import zlib from 'node:zlib'
import { promisify } from 'node:util'
import { Middleware, MiddlewareContext, MiddlewareResult } from './types'

const gzip = promisify(zlib.gzip)

export interface CompressionOptions {
enabled?: boolean
}

/**
* POC: Compression middleware demonstrating functional approach
* Returns new context instead of mutating original
*/
export class CompressionMiddleware implements Middleware {
readonly name = 'compression'
readonly priority = 20 // Execute after auth (if present)

constructor (private readonly options: CompressionOptions = {}) {}

/**
* Setup compression headers
* Return MiddlewareResult with new context, don't mutate
*/
onBeforeRequest = async (ctx: MiddlewareContext): Promise<MiddlewareResult | undefined> => {
if (!this.shouldCompress(ctx)) {
return undefined // void = no changes needed
}
Comment on lines +30 to +33
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
onBeforeRequest = async (ctx: MiddlewareContext): Promise<MiddlewareResult | undefined> => {
if (!this.shouldCompress(ctx)) {
return undefined // void = no changes needed
}
onBeforeRequest = async (ctx: MiddlewareContext): Promise<MiddlewareResult | void> => {
if (!this.shouldCompress(ctx)) {
return // void = no changes needed
}

void is a little clearer than undefined, IMO.


// Return NEW context with updates
return {
context: {
request: {
headers: {
...ctx.request.headers,
'accept-encoding': 'gzip,deflate'
}
},
shared: new Map([
...ctx.shared.entries(),
['compressionEnabled', true]
])
}
}
}

/**
* Compress request body
* Functional transformation of body and headers
*/
onRequest = async (ctx: MiddlewareContext): Promise<MiddlewareResult | undefined> => {
if (ctx.shared.get('compressionEnabled') !== true) {
return undefined // void = no compression needed
}

const { body } = ctx.request
if (body == null || body === '') {
return undefined // void = no body to compress
}

try {
// Only compress string and Buffer bodies (not streams)
if (typeof body === 'string' || Buffer.isBuffer(body)) {
// Compress body
const compressedBody = await gzip(body)

// Return NEW context with compressed body
return {
context: {
request: {
body: compressedBody,
headers: {
...ctx.request.headers,
'content-encoding': 'gzip',
'content-length': Buffer.byteLength(compressedBody).toString()
}
}
}
}
} else {
// For streams, we can't compress here - would need stream handling
console.debug('Skipping compression for stream body')
return undefined // void = no changes
}
} catch (error) {
console.warn('Compression failed:', error)
return undefined // void = continue without compression
}
}

/**
* Log compression info
* Always void return for cleanup
*/
onComplete = async (ctx: MiddlewareContext): Promise<void> => {
if (ctx.shared.get('compressionEnabled') === true) {
console.debug('Request processed with compression enabled')
}
// void return = no context changes
}

/**
* Determine if compression should be applied
*/
private shouldCompress (ctx: MiddlewareContext): boolean {
return this.options.enabled === true
}
}
105 changes: 105 additions & 0 deletions src/middleware/MiddlewareEngine.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright Elasticsearch B.V. and contributors
* SPDX-License-Identifier: Apache-2.0
*/

import { Middleware, MiddlewareContext, MiddlewareResult, MiddlewarePhase } from './types'

/**
* POC: Minimal middleware execution engine
* Sequential execution with immutable context transformations
*/
export class MiddlewareEngine {
private readonly middleware: Middleware[] = []

/**
* Register middleware with automatic priority sorting
*/
register (middleware: Middleware): void {
this.middleware.push(middleware)
// Sort by priority (lower numbers execute first)
this.middleware.sort((a, b) => (a.priority ?? 100) - (b.priority ?? 100))
}

/**
* Execute middleware phase functionally
* Each middleware gets result of previous middleware
*/
async executePhase (
phase: MiddlewarePhase,
initialContext: MiddlewareContext
): Promise<{ context: MiddlewareContext, error?: Error }> {
let currentContext = initialContext

for (const middleware of this.middleware) {
const handler = middleware[phase]
if (handler == null) continue

try {
// Execute middleware handler
const result = await handler(currentContext)

if (result === undefined) {
// void return = no changes, continue to next middleware
continue
}

if (result.error != null) {
return { context: currentContext, error: result.error }
}

if (result.continue === false) {
// Middleware requested to stop execution
return { context: currentContext }
}

if (result.context != null) {
// Merge returned context changes immutably
currentContext = this.mergeContext(currentContext, result.context)
}
} catch (error) {
// Log middleware error but continue execution with other middleware (fault tolerance)
console.warn(`Middleware ${middleware.name} failed in ${phase}:`, error)
}
}

return { context: currentContext }
}

/**
* Immutable context merging
* Creates new context object without mutating the original
*/
private mergeContext (
current: MiddlewareContext,
updates: NonNullable<MiddlewareResult['context']>
): MiddlewareContext {
return {
...current,

// Merge request object if provided
request: updates.request != null
? {
...current.request,
...updates.request,
headers: updates.request.headers != null
? {
...current.request.headers,
...updates.request.headers
}
: current.request.headers
}
: current.request,

// Replace shared map if provided (maps are immutable)
shared: updates.shared ?? current.shared
}
}
Comment on lines +77 to +97
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the bit we were worried would be a perf issue: deep-merging objects on a very hot code path could lead to a lot more garbage collection, which is a CPU hog.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tried sth out here with benchmarked comparison b/w original code - original deep merge - optimized deep merge

#307


/**
* Get registered middleware for debugging
*/
getRegisteredMiddleware (): readonly Middleware[] {
return [...this.middleware]
}
}
9 changes: 9 additions & 0 deletions src/middleware/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright Elasticsearch B.V. and contributors
* SPDX-License-Identifier: Apache-2.0
*/

export * from './types'
export * from './MiddlewareEngine'
export * from './CompressionMiddleware'
export * from './poc-example'
78 changes: 78 additions & 0 deletions src/middleware/poc-example.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright Elasticsearch B.V. and contributors
* SPDX-License-Identifier: Apache-2.0
*/

/**
* POC Example: How to use the middleware system
*/

import { MiddlewareEngine } from './MiddlewareEngine'
import { CompressionMiddleware } from './CompressionMiddleware'
import { MiddlewareContext } from './types'

// Example usage of the POC middleware system
export async function pocExample (): Promise<MiddlewareContext> {
const engine = new MiddlewareEngine()

// Register compression middleware
engine.register(new CompressionMiddleware({ enabled: true }))

// Create example context
const context: MiddlewareContext = {
request: {
method: 'POST',
path: '/test/_search',
body: 'large request body that should be compressed',
headers: {}
},
options: {},
shared: new Map()
}

console.log('Original context:', context)

// Execute middleware phases
console.log('\n=== Executing onBeforeRequest ===')
const beforeResult = await engine.executePhase('onBeforeRequest', context)
console.log('After onBeforeRequest:', beforeResult.context.request.headers)

console.log('\n=== Executing onRequest ===')
const requestResult = await engine.executePhase('onRequest', beforeResult.context)
console.log('After onRequest - body compressed:', Buffer.isBuffer(requestResult.context.request.body))
console.log('Content-Encoding header:', requestResult.context.request.headers['content-encoding'])

console.log('\n=== Executing onComplete ===')
await engine.executePhase('onComplete', requestResult.context)

return requestResult.context
}

// Example test
export function exampleTest (): void {
const middleware = new CompressionMiddleware({ enabled: true })

const testContext: MiddlewareContext = {
request: {
method: 'POST',
path: '/test',
body: 'test data',
headers: {}
},
options: {},
shared: new Map()
}

// Pure function testing
const result = middleware.onBeforeRequest(testContext)

// Original context unchanged (immutable)
console.assert(testContext.request.headers['accept-encoding'] == null)

// Result contains new context with changes
if (result != null && 'context' in result && result.context != null) {
const context = result.context as { request?: { headers?: Record<string, string> } }
console.assert(context.request?.headers?.['accept-encoding'] === 'gzip,deflate')
console.log('✅ Test passed: Compression middleware adds headers functionally')
}
}
66 changes: 66 additions & 0 deletions src/middleware/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright Elasticsearch B.V. and contributors
* SPDX-License-Identifier: Apache-2.0
*/

import * as http from 'node:http'
import { TransportRequestOptions } from '../Transport'
import { RequestBody } from '../types'

/**
* POC: Minimal middleware context - immutable design
*/
export interface MiddlewareContext {
// Request data (immutable)
readonly request: {
readonly method: string
readonly path: string
readonly body?: RequestBody
readonly headers: Readonly<http.IncomingHttpHeaders>
}

// Request options (immutable)
readonly options: Readonly<TransportRequestOptions>

// Shared state between middleware (immutable map)
readonly shared: ReadonlyMap<string, any>
}

/**
* POC: Middleware result for functional transformations
* Return new state instead of mutating existing context
*/
export interface MiddlewareResult {
// Updated context (partial merge into existing context)
context?: {
request?: {
headers?: http.IncomingHttpHeaders
body?: RequestBody
}
shared?: ReadonlyMap<string, any>
}

// Continue to next middleware (default: true)
continue?: boolean

// Error to propagate (stops execution chain)
error?: Error
}

/**
* POC: Simplified middleware interface
* Functional lifecycle hooks
*/
export interface Middleware {
readonly name: string
readonly priority?: number

// Return MiddlewareResult | void instead of mutating
onBeforeRequest?: (ctx: MiddlewareContext) => Promise<MiddlewareResult | undefined> | MiddlewareResult | undefined
onRequest?: (ctx: MiddlewareContext) => Promise<MiddlewareResult | undefined> | MiddlewareResult | undefined

// Always void return for cleanup
onComplete?: (ctx: MiddlewareContext) => Promise<void> | void
}

export type MiddlewarePhase = 'onBeforeRequest' | 'onRequest' | 'onComplete'