From 1372cc4340d9578e6b7eabccceb9bf5dabfbd638 Mon Sep 17 00:00:00 2001 From: Ytallo Layon Date: Mon, 2 Mar 2026 16:53:36 -0300 Subject: [PATCH 1/5] feat: consolidate registerFunction to accept handler or HttpInvocationConfig Removes registerHttpFunction/register_http_function from all three SDKs (Node, Python, Rust) and unifies the API so registerFunction accepts either a local handler or an HttpInvocationConfig for HTTP-invoked functions. Also adds Rust e2e integration tests for HTTP external functions and a developer Makefile. --- Makefile | 170 +++++++ packages/node/iii/src/iii.ts | 119 +++-- packages/node/iii/src/types.ts | 16 +- .../iii/tests/http-external-functions.test.ts | 14 +- packages/python/iii/src/iii/iii.py | 59 +-- packages/python/iii/src/iii/types.py | 2 +- ...est_http_external_functions_integration.py | 22 +- packages/rust/iii/src/iii.rs | 255 +++++----- packages/rust/iii/src/lib.rs | 4 +- packages/rust/iii/src/types.rs | 2 +- .../rust/iii/tests/http_external_functions.rs | 435 ++++++++++++++++++ 11 files changed, 822 insertions(+), 276 deletions(-) create mode 100644 Makefile create mode 100644 packages/rust/iii/tests/http_external_functions.rs diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..7aad8f6 --- /dev/null +++ b/Makefile @@ -0,0 +1,170 @@ +III_BIN ?= iii +III_ENGINE_DIR ?= ../iii-engine +III_PID_FILE ?= /tmp/iii-engine.pid +III_LOG_FILE ?= /tmp/iii-engine.log +III_ENGINE_DATA ?= /tmp/iii-engine-data + +III_CONFIG_NODE = packages/node/iii/tests/fixtures/config-test.yaml +III_CONFIG_SHARED = .github/engine-config/test-config.yml + +.PHONY: help engine-build engine-start engine-stop engine-status engine-logs engine-clean +.PHONY: setup setup-node setup-python +.PHONY: lint lint-node lint-python lint-rust +.PHONY: test test-node test-python test-rust +.PHONY: integration integration-node integration-python integration-rust +.PHONY: ci ci-node ci-python ci-rust + +help: + @echo "III SDK Developer Makefile" + @echo "" + @echo "Engine management:" + @echo " engine-build Build engine from source" + @echo " engine-start Start engine (requires III_CONFIG, III_HEALTH_PORT)" + @echo " engine-stop Stop engine" + @echo " engine-status Check if engine is running" + @echo " engine-logs Tail engine log file" + @echo " engine-clean Remove engine data, logs, and PID file" + @echo "" + @echo "Setup:" + @echo " setup Install deps for all SDKs" + @echo " setup-node pnpm install" + @echo " setup-python uv sync in packages/python/iii" + @echo "" + @echo "Lint:" + @echo " lint Lint all SDKs" + @echo " lint-node Biome + tsc" + @echo " lint-python Ruff + mypy" + @echo " lint-rust cargo fmt + clippy" + @echo "" + @echo "Tests (engine must be running):" + @echo " test Run all SDK tests" + @echo " test-node Node.js tests" + @echo " test-python Python tests" + @echo " test-rust Rust tests" + @echo "" + @echo "Integration (start engine, test, stop):" + @echo " integration Run all integration tests" + @echo " integration-node integration-python integration-rust" + @echo "" + @echo "CI-like (build engine + integration):" + @echo " ci Build engine, run all integration tests" + @echo " ci-node ci-python ci-rust" + @echo "" + @echo "Variables (override with VAR=value):" + @echo " III_BIN Engine binary path or alias (default: iii)" + @echo " III_ENGINE_DIR Engine source dir for engine-build (default: ../iii-engine)" + @echo "" + @echo "Examples:" + @echo " make ci-node III_ENGINE_DIR=../iii-engine" + @echo " make integration" + @echo " make integration-rust III_BIN=/path/to/iii" + +engine-build: + @echo "Building III engine in $(III_ENGINE_DIR)..." + @cd "$(III_ENGINE_DIR)" && cargo build --release + @echo "Engine built at $(III_ENGINE_DIR)/target/release/iii" + +engine-start: + @test -n "$(III_CONFIG)" || { echo "error: III_CONFIG is required (e.g. make engine-start III_CONFIG=path/to/config.yml III_HEALTH_PORT=49134)"; exit 1; } + @test -n "$(III_HEALTH_PORT)" || { echo "error: III_HEALTH_PORT is required (e.g. make engine-start III_CONFIG=path/to/config.yml III_HEALTH_PORT=49134)"; exit 1; } + @$(MAKE) engine-stop 2>/dev/null || true + @pid=$$(lsof -ti :$(III_HEALTH_PORT) 2>/dev/null); [ -z "$$pid" ] || { kill -9 $$pid 2>/dev/null || true; sleep 2; } + @rm -rf "$(III_ENGINE_DATA)" + @mkdir -p "$(III_ENGINE_DATA)" + @cp "$(CURDIR)/$(III_CONFIG)" "$(III_ENGINE_DATA)/config.yml" + @ENGINE_BIN="$(III_BIN)"; \ + case "$$ENGINE_BIN" in /*) ;; *) ENGINE_BIN="$(CURDIR)/$$ENGINE_BIN";; esac; \ + cd "$(III_ENGINE_DATA)" && $$ENGINE_BIN --config config.yml > "$(III_LOG_FILE)" 2>&1 & echo $$! > "$(III_PID_FILE)" + @echo "Waiting for III Engine on port $(III_HEALTH_PORT)..." + @for i in $$(seq 1 30); do \ + if nc -z 127.0.0.1 $(III_HEALTH_PORT) 2>/dev/null; then \ + echo "III Engine is ready!"; exit 0; \ + fi; \ + echo " attempt $$i/30..."; sleep 2; \ + done; \ + echo "Engine did not become ready. Logs:"; tail -n 50 "$(III_LOG_FILE)" || true; exit 1 + +engine-stop: + @if [ -f "$(III_PID_FILE)" ]; then \ + kill "$$(cat $(III_PID_FILE))" 2>/dev/null || true; \ + rm -f "$(III_PID_FILE)"; \ + echo "Engine stopped"; \ + else \ + echo "No engine PID file found"; \ + fi + +engine-status: + @if [ -f "$(III_PID_FILE)" ] && kill -0 "$$(cat $(III_PID_FILE))" 2>/dev/null; then \ + echo "Engine is running (PID $$(cat $(III_PID_FILE)))"; \ + else \ + echo "Engine is not running"; exit 1; \ + fi + +engine-logs: + @tail -f "$(III_LOG_FILE)" + +engine-clean: + @rm -rf "$(III_ENGINE_DATA)" + @rm -f "$(III_LOG_FILE)" "$(III_PID_FILE)" + @echo "Engine data cleaned" + +setup: setup-node setup-python + +setup-node: + @cd packages/node && pnpm install + +setup-python: + @cd packages/python/iii && uv sync --extra dev + +lint: lint-node lint-python lint-rust + +lint-node: + @npx @biomejs/biome check packages/node + @cd packages/node && pnpm --filter iii-sdk exec tsc --noEmit + +lint-python: + @cd packages/python/iii && uv run ruff check src && uv run mypy src + +lint-rust: + @cd packages/rust/iii && cargo fmt --all -- --check + @cd packages/rust/iii && cargo clippy --all-targets --all-features -- -D warnings + +test: test-node test-python test-rust + +test-node: + @cd packages/node && pnpm --filter iii-sdk test + +test-python: + @cd packages/python/iii && uv run pytest -q + +test-rust: + @cd packages/rust/iii && cargo test --all-features --quiet + +integration-node: + @$(MAKE) engine-start III_CONFIG="$(III_CONFIG_NODE)" III_HEALTH_PORT=49199 + @trap '$(MAKE) engine-stop' 0 1 2 3 15; \ + III_BRIDGE_URL=ws://localhost:49199 III_HTTP_URL=http://localhost:3199 $(MAKE) test-node + +integration-python: + @$(MAKE) engine-start III_CONFIG="$(III_CONFIG_SHARED)" III_HEALTH_PORT=49134 + @trap '$(MAKE) engine-stop' 0 1 2 3 15; \ + III_BRIDGE_URL=ws://localhost:49134 III_HTTP_URL=http://localhost:3199 $(MAKE) test-python + +integration-rust: + @$(MAKE) engine-start III_CONFIG="$(III_CONFIG_SHARED)" III_HEALTH_PORT=49134 + @trap '$(MAKE) engine-stop' 0 1 2 3 15; \ + III_BRIDGE_URL=ws://localhost:49134 III_HTTP_URL=http://localhost:3199 $(MAKE) test-rust + +integration: integration-node integration-python integration-rust + +ci-node: engine-build + @$(MAKE) integration-node III_BIN="$(III_ENGINE_DIR)/target/release/iii" + +ci-python: engine-build + @$(MAKE) integration-python III_BIN="$(III_ENGINE_DIR)/target/release/iii" + +ci-rust: engine-build + @$(MAKE) integration-rust III_BIN="$(III_ENGINE_DIR)/target/release/iii" + +ci: engine-build + @$(MAKE) integration III_BIN="$(III_ENGINE_DIR)/target/release/iii" diff --git a/packages/node/iii/src/iii.ts b/packages/node/iii/src/iii.ts index 50bcd1f..f472ba2 100644 --- a/packages/node/iii/src/iii.ts +++ b/packages/node/iii/src/iii.ts @@ -102,7 +102,6 @@ export type InitOptions = { class Sdk implements ISdk { private ws?: WebSocket private functions = new Map() - private httpFunctions = new Map() private services = new Map>() private invocations = new Map() private triggers = new Map() @@ -187,44 +186,63 @@ class Sdk implements ISdk { registerFunction = ( message: Omit, - handler: RemoteFunctionHandler, + handlerOrInvocation: RemoteFunctionHandler | HttpInvocationConfig, ): FunctionRef => { if (!message.id || message.id.trim() === '') { throw new Error('id is required') } - if (this.httpFunctions.has(message.id)) { + if (this.functions.has(message.id)) { throw new Error(`function id already registered: ${message.id}`) } - this.sendMessage(MessageType.RegisterFunction, message, true) - this.functions.set(message.id, { - message: { ...message, message_type: MessageType.RegisterFunction }, - handler: async (input, traceparent?: string, baggage?: string) => { - // If we have a tracer, wrap in a span and pass it to the context - if (getTracer()) { - // Extract both traceparent and baggage into a parent context - const parentContext = extractContext(traceparent, baggage) - - return context.with(parentContext, () => - withSpan(`call ${message.id}`, { kind: SpanKind.SERVER }, async span => { - const traceId = currentTraceId() ?? crypto.randomUUID() - const spanId = currentSpanId() - const logger = new Logger(traceId, message.id, spanId) - const ctx = { logger, trace: span } - - return withContext(async () => await handler(input), ctx) - }), - ) + const isHandler = typeof handlerOrInvocation === 'function' + + const fullMessage: RegisterFunctionMessage = isHandler + ? { ...message, message_type: MessageType.RegisterFunction } + : { + ...message, + message_type: MessageType.RegisterFunction, + invocation: { + url: handlerOrInvocation.url, + method: handlerOrInvocation.method ?? 'POST', + timeout_ms: handlerOrInvocation.timeout_ms, + headers: handlerOrInvocation.headers, + auth: handlerOrInvocation.auth, + }, } - // Fallback without tracing - const traceId = crypto.randomUUID() - const logger = new Logger(traceId, message.id) - const ctx = { logger } + this.sendMessage(MessageType.RegisterFunction, fullMessage, true) + + if (isHandler) { + const handler = handlerOrInvocation as RemoteFunctionHandler + this.functions.set(message.id, { + message: fullMessage, + handler: async (input, traceparent?: string, baggage?: string) => { + if (getTracer()) { + const parentContext = extractContext(traceparent, baggage) + + return context.with(parentContext, () => + withSpan(`call ${message.id}`, { kind: SpanKind.SERVER }, async span => { + const traceId = currentTraceId() ?? crypto.randomUUID() + const spanId = currentSpanId() + const logger = new Logger(traceId, message.id, spanId) + const ctx = { logger, trace: span } + + return withContext(async () => await handler(input), ctx) + }), + ) + } - return withContext(async () => await handler(input), ctx) - }, - }) + const traceId = crypto.randomUUID() + const logger = new Logger(traceId, message.id) + const ctx = { logger } + + return withContext(async () => await handler(input), ctx) + }, + }) + } else { + this.functions.set(message.id, { message: fullMessage }) + } return { id: message.id, @@ -235,38 +253,6 @@ class Sdk implements ISdk { } } - registerHttpFunction = (id: string, config: HttpInvocationConfig): FunctionRef => { - if (!id || id.trim() === '') { - throw new Error('id is required') - } - if (this.functions.has(id) || this.httpFunctions.has(id)) { - throw new Error(`function id already registered: ${id}`) - } - - const message: RegisterFunctionMessage = { - message_type: MessageType.RegisterFunction, - id, - invocation: { - url: config.url, - method: config.method ?? 'POST', - timeout_ms: config.timeout_ms, - headers: config.headers, - auth: config.auth, - }, - } - - this.sendMessage(MessageType.RegisterFunction, message, true) - this.httpFunctions.set(id, message) - - return { - id, - unregister: () => { - this.sendMessage(MessageType.UnregisterFunction, { id }, true) - this.httpFunctions.delete(id) - }, - } - } - registerService = (message: Omit): void => { this.sendMessage(MessageType.RegisterService, message, true) this.services.set(message.id, { ...message, message_type: MessageType.RegisterService }) @@ -643,9 +629,6 @@ class Sdk implements ISdk { this.functions.forEach(({ message }) => { this.sendMessage(MessageType.RegisterFunction, message, true) }) - this.httpFunctions.forEach(message => { - this.sendMessage(MessageType.RegisterFunction, message, true) - }) this.triggers.forEach(trigger => { this.sendMessage(MessageType.RegisterTrigger, trigger, true) }) @@ -798,7 +781,7 @@ class Sdk implements ISdk { const resolvedInput = this.resolveChannelValue(input) as TInput - if (fn) { + if (fn?.handler) { if (!invocation_id) { try { await fn.handler(resolvedInput, traceparent, baggage) @@ -827,10 +810,14 @@ class Sdk implements ISdk { }) } } else { + const errorCode = fn ? 'function_not_invokable' : 'function_not_found' + const errorMessage = fn + ? 'Function is HTTP-invoked and cannot be invoked locally' + : 'Function not found' this.sendMessage(MessageType.InvocationResult, { invocation_id, function_id, - error: { code: 'function_not_found', message: 'Function not found' }, + error: { code: errorCode, message: errorMessage }, traceparent, baggage, }) diff --git a/packages/node/iii/src/types.ts b/packages/node/iii/src/types.ts index 0176505..1390925 100644 --- a/packages/node/iii/src/types.ts +++ b/packages/node/iii/src/types.ts @@ -70,7 +70,7 @@ export type InternalFunctionHandler = ( export type RemoteFunctionData = { message: RegisterFunctionMessage - handler: InternalFunctionHandler + handler?: InternalFunctionHandler } export type RemoteServiceFunctionData = { @@ -100,19 +100,13 @@ export interface ISdk { /** * Registers a new function. A function is a unit of work that can be invoked by other services. + * Pass a handler for local execution, or an HttpInvocationConfig for HTTP-invoked functions. * @param func - The function to register - * @param handler - The handler for the function - * @returns A function object that can be used to invoke the function + * @param handlerOrInvocation - The handler for local execution, or HTTP invocation config + * @returns A function object that can be used to unregister the function */ registerFunction(func: RegisterFunctionInput, handler: RemoteFunctionHandler): FunctionRef - - /** - * Registers an HTTP external function (Lambda, Cloudflare Workers, etc.). The engine invokes the URL when triggered. - * @param id - Function path (use :: for namespacing, e.g. external::my_lambda) - * @param config - HTTP endpoint config (url, method, timeout_ms, headers, auth) - * @returns A function ref for unregistering - */ - registerHttpFunction(id: string, config: HttpInvocationConfig): FunctionRef + registerFunction(func: RegisterFunctionInput, invocation: HttpInvocationConfig): FunctionRef /** * Invokes a function. diff --git a/packages/node/iii/tests/http-external-functions.test.ts b/packages/node/iii/tests/http-external-functions.test.ts index 7f95bdf..df2bd13 100644 --- a/packages/node/iii/tests/http-external-functions.test.ts +++ b/packages/node/iii/tests/http-external-functions.test.ts @@ -142,7 +142,7 @@ describe('HTTP external functions', () => { let httpFn: { unregister(): void } | undefined try { - httpFn = iii.registerHttpFunction(functionId, { + httpFn = iii.registerFunction({ id: functionId }, { url: webhookProbe.url(), method: 'POST', timeout_ms: 3000, @@ -183,7 +183,7 @@ describe('HTTP external functions', () => { let httpFn: { id: string; unregister(): void } | undefined try { - httpFn = iii.registerHttpFunction(functionId, { + httpFn = iii.registerFunction({ id: functionId }, { url: webhookProbe.url(), method: 'POST', timeout_ms: 3000, @@ -220,7 +220,7 @@ describe('HTTP external functions', () => { let httpFn: { unregister(): void } | undefined try { - httpFn = iii.registerHttpFunction(functionId, { + httpFn = iii.registerFunction({ id: functionId }, { url: webhookProbe.url(), method: 'POST', timeout_ms: 3000, @@ -277,13 +277,13 @@ describe('HTTP external functions', () => { let httpFnB: { unregister(): void } | undefined try { - httpFnA = iii.registerHttpFunction(functionIdA, { + httpFnA = iii.registerFunction({ id: functionIdA }, { url: webhookProbeA.url(), method: 'POST', timeout_ms: 3000, }) - httpFnB = iii.registerHttpFunction(functionIdB, { + httpFnB = iii.registerFunction({ id: functionIdB }, { url: webhookProbeB.url(), method: 'POST', timeout_ms: 3000, @@ -341,7 +341,7 @@ describe('HTTP external functions', () => { let httpFn: { unregister(): void } | undefined try { - httpFn = iii.registerHttpFunction(functionId, { + httpFn = iii.registerFunction({ id: functionId }, { url: webhookProbe.url(), method: 'POST', timeout_ms: 3000, @@ -400,7 +400,7 @@ describe('HTTP external functions', () => { let httpFn: { unregister(): void } | undefined try { - httpFn = iii.registerHttpFunction(functionId, { + httpFn = iii.registerFunction({ id: functionId }, { url: webhookProbe.url(), method: 'PUT', timeout_ms: 3000, diff --git a/packages/python/iii/src/iii/iii.py b/packages/python/iii/src/iii/iii.py index 11a3c75..48122a6 100644 --- a/packages/python/iii/src/iii/iii.py +++ b/packages/python/iii/src/iii/iii.py @@ -105,7 +105,6 @@ def __init__(self, address: str, options: InitOptions | None = None) -> None: self._options = options or InitOptions() self._ws: ClientConnection | None = None self._functions: dict[str, RemoteFunctionData] = {} - self._http_functions: dict[str, RegisterFunctionMessage] = {} self._services: dict[str, RegisterServiceMessage] = {} self._pending: dict[str, asyncio.Future[Any]] = {} self._triggers: dict[str, RegisterTriggerMessage] = {} @@ -214,8 +213,6 @@ async def _on_connected(self) -> None: await self._send(svc) for function_data in self._functions.values(): await self._send(function_data.message) - for msg in self._http_functions.values(): - await self._send(msg) for trigger in self._triggers.values(): await self._send(trigger) @@ -409,14 +406,16 @@ async def _handle_invoke( ) -> None: func = self._functions.get(path) - if not func: - log.warning(f"Function not found: {path}") + if not func or not func.handler: + error_code = "function_not_invokable" if func else "function_not_found" + error_msg = "Function is HTTP-invoked and cannot be invoked locally" if func else f"Function '{path}' not found" + log.warning(error_msg) if invocation_id: await self._send( InvocationResultMessage( invocation_id=invocation_id, function_id=path, - error={"code": "function_not_found", "message": f"Function '{path}' not found"}, + error={"code": error_code, "message": error_msg}, ) ) return @@ -561,24 +560,33 @@ def unregister() -> None: def register_function( self, path: str, - handler: RemoteFunctionHandler, + handler_or_invocation: RemoteFunctionHandler | HttpInvocationConfig, description: str | None = None, metadata: dict[str, Any] | None = None, ) -> FunctionRef: if not path or not path.strip(): raise ValueError("id is required") - if path in self._http_functions: - raise ValueError(f"function id '{path}' already registered as HTTP function") + if path in self._functions: + raise ValueError(f"function id '{path}' already registered") - msg = RegisterFunctionMessage(id=path, description=description, metadata=metadata) - self._send_if_connected(msg) + is_handler = isinstance(handler_or_invocation, HttpInvocationConfig) is False + + if is_handler: + handler = handler_or_invocation + msg = RegisterFunctionMessage(id=path, description=description, metadata=metadata) + self._send_if_connected(msg) - async def wrapped(input_data: Any) -> Any: - logger = Logger(function_name=path) - ctx = Context(logger=logger) - return await with_context(lambda _: handler(input_data), ctx) + async def wrapped(input_data: Any) -> Any: + logger = Logger(function_name=path) + ctx = Context(logger=logger) + return await with_context(lambda _: handler(input_data), ctx) - self._functions[path] = RemoteFunctionData(message=msg, handler=wrapped) + self._functions[path] = RemoteFunctionData(message=msg, handler=wrapped) + else: + config = handler_or_invocation + msg = RegisterFunctionMessage(id=path, invocation=config, description=description, metadata=metadata) + self._send_if_connected(msg) + self._functions[path] = RemoteFunctionData(message=msg) def unregister() -> None: self._functions.pop(path, None) @@ -586,25 +594,6 @@ def unregister() -> None: return FunctionRef(id=path, unregister=unregister) - def register_http_function(self, id: str, config: HttpInvocationConfig) -> FunctionRef: - if not id or not id.strip(): - raise ValueError("id is required") - if id in self._functions or id in self._http_functions: - raise ValueError(f"function id '{id}' already registered") - - msg = RegisterFunctionMessage( - id=id, - invocation=config, - ) - self._send_if_connected(msg) - self._http_functions[id] = msg - - def unregister() -> None: - self._http_functions.pop(id, None) - self._send_if_connected(UnregisterFunctionMessage(id=id)) - - return FunctionRef(id=id, unregister=unregister) - def register_service(self, id: str, description: str | None = None, parent_id: str | None = None) -> None: msg = RegisterServiceMessage(id=id, description=description, parent_service_id=parent_id) self._services[id] = msg diff --git a/packages/python/iii/src/iii/types.py b/packages/python/iii/src/iii/types.py index 519d493..464bcf4 100644 --- a/packages/python/iii/src/iii/types.py +++ b/packages/python/iii/src/iii/types.py @@ -36,7 +36,7 @@ class RemoteFunctionData(BaseModel): model_config = ConfigDict(arbitrary_types_allowed=True) message: RegisterFunctionMessage - handler: RemoteFunctionHandler + handler: RemoteFunctionHandler | None = None class RemoteTriggerTypeData(BaseModel): diff --git a/packages/python/iii/tests/test_http_external_functions_integration.py b/packages/python/iii/tests/test_http_external_functions_integration.py index 053bfbc..0b66d0d 100644 --- a/packages/python/iii/tests/test_http_external_functions_integration.py +++ b/packages/python/iii/tests/test_http_external_functions_integration.py @@ -196,7 +196,7 @@ async def test_register_http_function_sends_invocation_message(monkeypatch: pyte client = await _make_connected_client() config = HttpInvocationConfig(url="https://example.com/invoke", method="POST", timeout_ms=3000) - ref = client.register_http_function("external::my_lambda", config) + ref = client.register_function("external::my_lambda", config) await asyncio.sleep(0.02) assert ref.id == "external::my_lambda" @@ -225,7 +225,7 @@ async def test_register_http_function_with_all_config_options(monkeypatch: pytes headers={"X-Custom-Header": "test-value", "X-Another": "123"}, auth=HttpAuthBearer(token_key="MY_SECRET_TOKEN"), ) - ref = client.register_http_function("external::full_config", config) + ref = client.register_function("external::full_config", config) await asyncio.sleep(0.02) assert ref.id == "external::full_config" @@ -250,7 +250,7 @@ async def test_unregister_removes_function_from_sent_messages(monkeypatch: pytes client = await _make_connected_client() config = HttpInvocationConfig(url="https://example.com/fn", method="POST") - ref = client.register_http_function("external::to_remove", config) + ref = client.register_function("external::to_remove", config) await asyncio.sleep(0.02) # Verify registration was sent. @@ -268,7 +268,7 @@ async def test_unregister_removes_function_from_sent_messages(monkeypatch: pytes # Verify the function is removed from internal tracking so it would not be # re-registered on reconnect. - assert "external::to_remove" not in client._http_functions + assert "external::to_remove" not in client._functions await client.shutdown() @@ -292,7 +292,7 @@ async def test_delivers_queue_events_to_external_http_function() -> None: http_fn = None try: - http_fn = client.register_http_function( + http_fn = client.register_function( function_id, HttpInvocationConfig(url=probe.url(), method="POST", timeout_ms=3000), ) @@ -329,7 +329,7 @@ async def test_registers_and_unregisters_external_function() -> None: http_fn = None try: - http_fn = client.register_http_function( + http_fn = client.register_function( function_id, HttpInvocationConfig(url=probe.url(), method="POST", timeout_ms=3000), ) @@ -382,7 +382,7 @@ async def test_delivers_events_with_custom_headers() -> None: http_fn = None try: - http_fn = client.register_http_function( + http_fn = client.register_function( function_id, HttpInvocationConfig( url=probe.url(), @@ -443,11 +443,11 @@ async def test_delivers_events_to_multiple_external_functions() -> None: trigger_b = None try: - http_fn_a = client.register_http_function( + http_fn_a = client.register_function( fn_id_a, HttpInvocationConfig(url=probe_a.url("/hook_a"), method="POST", timeout_ms=3000), ) - http_fn_b = client.register_http_function( + http_fn_b = client.register_function( fn_id_b, HttpInvocationConfig(url=probe_b.url("/hook_b"), method="POST", timeout_ms=3000), ) @@ -498,7 +498,7 @@ async def test_stops_delivering_after_unregister() -> None: http_fn = None try: - http_fn = client.register_http_function( + http_fn = client.register_function( function_id, HttpInvocationConfig(url=probe.url(), method="POST", timeout_ms=3000), ) @@ -546,7 +546,7 @@ async def test_delivers_with_put_method() -> None: http_fn = None try: - http_fn = client.register_http_function( + http_fn = client.register_function( function_id, HttpInvocationConfig(url=probe.url(), method="PUT", timeout_ms=3000), ) diff --git a/packages/rust/iii/src/iii.rs b/packages/rust/iii/src/iii.rs index f37e2c9..1533a21 100644 --- a/packages/rust/iii/src/iii.rs +++ b/packages/rust/iii/src/iii.rs @@ -174,17 +174,49 @@ fn inject_trace_headers() -> (Option, Option) { pub type FunctionsAvailableCallback = Arc) + Send + Sync>; #[derive(Clone)] -pub struct HttpFunctionRef { +pub struct FunctionRef { pub id: String, unregister_fn: Arc, } -impl HttpFunctionRef { +impl FunctionRef { pub fn unregister(&self) { (self.unregister_fn)(); } } +pub trait IntoFunctionHandler { + fn into_parts(self, message: &mut RegisterFunctionMessage) -> Option; +} + +impl IntoFunctionHandler for HttpInvocationConfig { + fn into_parts(self, message: &mut RegisterFunctionMessage) -> Option { + message.invocation = Some(self); + None + } +} + +impl IntoFunctionHandler for F +where + F: Fn(Value) -> Fut + Send + Sync + 'static, + Fut: std::future::Future> + Send + 'static, +{ + fn into_parts(self, message: &mut RegisterFunctionMessage) -> Option { + let function_id = message.id.clone(); + let user_handler = Arc::new(move |input: Value| Box::pin(self(input))); + let wrapped: RemoteFunctionHandler = Arc::new(move |input: Value| { + let function_id = function_id.clone(); + let user_handler = user_handler.clone(); + Box::pin(async move { + let logger = Logger::new(Some(function_id)); + let context = Context { logger, span: None }; + with_context(context, || user_handler(input)).await + }) + }); + Some(wrapped) + } +} + struct IIIInner { address: String, outbound: mpsc::UnboundedSender, @@ -193,7 +225,6 @@ struct IIIInner { started: AtomicBool, pending: Mutex>, functions: Mutex>, - http_functions: Mutex>, trigger_types: Mutex>, triggers: Mutex>, services: Mutex>, @@ -252,7 +283,6 @@ impl III { started: AtomicBool::new(false), pending: Mutex::new(HashMap::new()), functions: Mutex::new(HashMap::new()), - http_functions: Mutex::new(HashMap::new()), trigger_types: Mutex::new(HashMap::new()), triggers: Mutex::new(HashMap::new()), services: Mutex::new(HashMap::new()), @@ -359,142 +389,86 @@ impl III { telemetry::shutdown_otel().await; } - pub fn register_function(&self, id: impl Into, handler: F) - where - F: Fn(Value) -> Fut + Send + Sync + 'static, - Fut: std::future::Future> + Send + 'static, - { - let message = RegisterFunctionMessage { - id: id.into(), - description: None, - request_format: None, - response_format: None, - metadata: None, - invocation: None, - }; - - self.register_function_with(message, handler); - } - - pub fn register_function_with_description( + fn register_function_inner( &self, - id: impl Into, - description: impl Into, - handler: F, - ) where - F: Fn(Value) -> Fut + Send + Sync + 'static, - Fut: std::future::Future> + Send + 'static, - { - let message = RegisterFunctionMessage { - id: id.into(), - description: Some(description.into()), - request_format: None, - response_format: None, - metadata: None, - invocation: None, - }; - - self.register_function_with(message, handler); - } - - pub fn register_function_with(&self, message: RegisterFunctionMessage, handler: F) - where - F: Fn(Value) -> Fut + Send + Sync + 'static, - Fut: std::future::Future> + Send + 'static, - { - if self - .inner - .http_functions - .lock_or_recover() - .contains_key(&message.id) - { - panic!( - "function id '{}' already registered as HTTP function", - message.id - ); + message: RegisterFunctionMessage, + handler: Option, + ) -> FunctionRef { + let id = message.id.clone(); + if self.inner.functions.lock_or_recover().contains_key(&id) { + panic!("function id '{}' already registered", id); } - let function_id = message.id.clone(); - - let user_handler = Arc::new(move |input: Value| Box::pin(handler(input))); - - let wrapped_handler: RemoteFunctionHandler = Arc::new(move |input: Value| { - let function_id = function_id.clone(); - let user_handler = user_handler.clone(); - - Box::pin(async move { - let logger = Logger::new(Some(function_id.clone())); - let context = Context { logger, span: None }; - - with_context(context, || user_handler(input)).await - }) - }); - let data = RemoteFunctionData { message: message.clone(), - handler: wrapped_handler, + handler, }; - self.inner .functions .lock_or_recover() - .insert(message.id.clone(), data); + .insert(id.clone(), data); let _ = self.send_message(message.to_message()); + + let iii = self.clone(); + let unregister_id = id.clone(); + let unregister_fn = Arc::new(move || { + let _ = iii.inner.functions.lock_or_recover().remove(&unregister_id); + let _ = iii.send_message(Message::UnregisterFunction { + id: unregister_id.clone(), + }); + }); + + FunctionRef { + id, + unregister_fn, + } } - pub fn register_http_function( + pub fn register_function( &self, id: impl Into, - config: HttpInvocationConfig, - ) -> Result { + handler: H, + ) -> FunctionRef { let id = id.into(); if id.trim().is_empty() { - return Err(IIIError::Remote { - code: "invalid_id".into(), - message: "id is required".into(), - }); - } - if self.inner.functions.lock_or_recover().contains_key(&id) - || self - .inner - .http_functions - .lock_or_recover() - .contains_key(&id) - { - return Err(IIIError::Remote { - code: "duplicate_id".into(), - message: "function id already registered".into(), - }); + panic!("id is required"); } - - let message = RegisterFunctionMessage { + let mut message = RegisterFunctionMessage { id: id.clone(), description: None, request_format: None, response_format: None, metadata: None, - invocation: Some(config), + invocation: None, }; + let handler = handler.into_parts(&mut message); + self.register_function_inner(message, handler) + } - self.inner - .http_functions - .lock_or_recover() - .insert(id.clone(), message.clone()); - let _ = self.send_message(message.to_message()); - - let iii = self.clone(); - let unregister_id = id.clone(); - let unregister_fn = Arc::new(move || { - let _ = iii - .inner - .http_functions - .lock_or_recover() - .remove(&unregister_id); - let _ = iii.send_message(Message::UnregisterFunction { - id: unregister_id.clone(), - }); - }); + pub fn register_function_with_description( + &self, + id: impl Into, + description: impl Into, + handler: H, + ) -> FunctionRef { + let mut message = RegisterFunctionMessage { + id: id.into(), + description: Some(description.into()), + request_format: None, + response_format: None, + metadata: None, + invocation: None, + }; + let handler = handler.into_parts(&mut message); + self.register_function_inner(message, handler) + } - Ok(HttpFunctionRef { id, unregister_fn }) + pub fn register_function_with( + &self, + mut message: RegisterFunctionMessage, + handler: H, + ) -> FunctionRef { + let handler = handler.into_parts(&mut message); + self.register_function_inner(message, handler) } pub fn register_service(&self, id: impl Into, description: Option) { @@ -948,10 +922,6 @@ impl III { messages.push(function.message.to_message()); } - for message in self.inner.http_functions.lock_or_recover().values() { - messages.push(message.to_message()); - } - for trigger in self.inner.triggers.lock_or_recover().values() { messages.push(trigger.to_message()); } @@ -1093,23 +1063,26 @@ impl III { ) { tracing::debug!(function_id = %function_id, traceparent = ?traceparent, baggage = ?baggage, "Invoking function"); - let handler = self - .inner - .functions - .lock_or_recover() - .get(&function_id) - .map(|data| data.handler.clone()); + let func_data = self.inner.functions.lock_or_recover().get(&function_id).cloned(); + let handler = func_data.as_ref().and_then(|d| d.handler.clone()); let Some(handler) = handler else { - tracing::warn!(function_id = %function_id, "Invocation: Function not found"); + let (code, message) = match &func_data { + Some(_) => ( + "function_not_invokable".to_string(), + "Function is HTTP-invoked and cannot be invoked locally".to_string(), + ), + None => ( + "function_not_found".to_string(), + "Function not found".to_string(), + ), + }; + tracing::warn!(function_id = %function_id, "Invocation: {}", message); if let Some(invocation_id) = invocation_id { let (resp_tp, resp_bg) = inject_trace_headers(); - let error = ErrorBody { - code: "function_not_found".to_string(), - message: "Function not found".to_string(), - }; + let error = ErrorBody { code, message }; let result = self.send_message(Message::InvocationResult { invocation_id, function_id, @@ -1291,7 +1264,7 @@ mod tests { } #[test] - fn register_http_function_stores_and_unregister_removes() { + fn register_function_with_http_config_stores_and_unregister_removes() { let iii = III::new("ws://localhost:1234"); let config = HttpInvocationConfig { url: "https://example.com/invoke".to_string(), @@ -1301,20 +1274,19 @@ mod tests { auth: None, }; - let http_fn = iii - .register_http_function("external::my_lambda", config) - .unwrap(); + let func_ref = iii.register_function("external::my_lambda", config); - assert_eq!(http_fn.id, "external::my_lambda"); - assert_eq!(iii.inner.http_functions.lock().unwrap().len(), 1); + assert_eq!(func_ref.id, "external::my_lambda"); + assert_eq!(iii.inner.functions.lock().unwrap().len(), 1); - http_fn.unregister(); + func_ref.unregister(); - assert_eq!(iii.inner.http_functions.lock().unwrap().len(), 0); + assert_eq!(iii.inner.functions.lock().unwrap().len(), 0); } #[test] - fn register_http_function_rejects_empty_id() { + #[should_panic(expected = "id is required")] + fn register_function_rejects_empty_id() { let iii = III::new("ws://localhost:1234"); let config = HttpInvocationConfig { url: "https://example.com/invoke".to_string(), @@ -1324,8 +1296,7 @@ mod tests { auth: None, }; - let result = iii.register_http_function("", config); - assert!(result.is_err()); + iii.register_function("", config); } #[tokio::test] diff --git a/packages/rust/iii/src/lib.rs b/packages/rust/iii/src/lib.rs index f4fb18b..a2ed812 100644 --- a/packages/rust/iii/src/lib.rs +++ b/packages/rust/iii/src/lib.rs @@ -17,8 +17,8 @@ pub use channels::{ pub use context::{Context, get_context, with_context}; pub use error::IIIError; pub use iii::{ - FunctionInfo, FunctionsAvailableGuard, HttpFunctionRef, III, TriggerInfo, WorkerInfo, - WorkerMetadata, + FunctionInfo, FunctionRef, FunctionsAvailableGuard, IntoFunctionHandler, III, TriggerInfo, + WorkerInfo, WorkerMetadata, }; pub use logger::Logger; pub use protocol::{ diff --git a/packages/rust/iii/src/types.rs b/packages/rust/iii/src/types.rs index 1be7449..9492d2d 100644 --- a/packages/rust/iii/src/types.rs +++ b/packages/rust/iii/src/types.rs @@ -152,7 +152,7 @@ pub struct StreamUpdateInput { #[derive(Clone)] pub struct RemoteFunctionData { pub message: RegisterFunctionMessage, - pub handler: RemoteFunctionHandler, + pub handler: Option, } #[derive(Clone)] diff --git a/packages/rust/iii/tests/http_external_functions.rs b/packages/rust/iii/tests/http_external_functions.rs new file mode 100644 index 0000000..984b157 --- /dev/null +++ b/packages/rust/iii/tests/http_external_functions.rs @@ -0,0 +1,435 @@ +//! Integration tests for HTTP external function invocation. +//! +//! Requires a running III engine at `ws://localhost:49134`. + +use std::collections::HashMap; +use std::time::Duration; + +use serde_json::{Value, json}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpListener; + +use iii_sdk::{HttpInvocationConfig, HttpMethod, III}; + +const ENGINE_WS_URL: &str = "ws://localhost:49134"; + +async fn settle() { + tokio::time::sleep(Duration::from_millis(300)).await; +} + +fn unique_function_id(prefix: &str) -> String { + use std::time::{SystemTime, UNIX_EPOCH}; + let ts = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis(); + format!("{}::{}::{}", prefix, ts, uuid::Uuid::new_v4().simple()) +} + +fn unique_topic(prefix: &str) -> String { + use std::time::{SystemTime, UNIX_EPOCH}; + let ts = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis(); + format!("{}.{}", prefix, ts) +} + +#[derive(Debug, Clone)] +struct CapturedWebhook { + method: String, + url: String, + headers: HashMap, + body: Option, +} + +struct WebhookProbe { + listener: TcpListener, +} + +impl WebhookProbe { + async fn start() -> Self { + let listener = TcpListener::bind("127.0.0.1:0") + .await + .expect("failed to bind webhook server"); + Self { listener } + } + + fn url(&self) -> String { + let addr = self.listener.local_addr().expect("no local addr"); + format!("http://127.0.0.1:{}/webhook", addr.port()) + } + + async fn accept_one(&self) -> CapturedWebhook { + let (mut stream, _) = self.listener.accept().await.expect("accept failed"); + + let mut buf = vec![0u8; 8192]; + let n = stream.read(&mut buf).await.expect("read failed"); + let raw = String::from_utf8_lossy(&buf[..n]).to_string(); + + let mut lines = raw.lines(); + let request_line = lines.next().unwrap_or(""); + let parts: Vec<&str> = request_line.splitn(3, ' ').collect(); + let method = parts.first().copied().unwrap_or("POST").to_string(); + let url = parts.get(1).copied().unwrap_or("/").to_string(); + let url = url.split('?').next().unwrap_or("/").to_string(); + + let mut headers = HashMap::new(); + let raw_bytes = &buf[..n]; + + let mut header_end = 0; + for i in 0..n.saturating_sub(3) { + if raw_bytes[i] == b'\r' + && raw_bytes[i + 1] == b'\n' + && raw_bytes[i + 2] == b'\r' + && raw_bytes[i + 3] == b'\n' + { + header_end = i + 4; + break; + } + } + if header_end == 0 { + for i in 0..n.saturating_sub(1) { + if raw_bytes[i] == b'\n' && raw_bytes[i + 1] == b'\n' { + header_end = i + 2; + break; + } + } + } + let body_start = header_end; + + for line in raw.lines().skip(1) { + if line.is_empty() { + break; + } + if let Some((k, v)) = line.split_once(':') { + headers.insert(k.trim().to_lowercase(), v.trim().to_string()); + } + } + + let body_bytes = &raw_bytes[body_start..]; + let body = if body_bytes.is_empty() { + None + } else { + serde_json::from_slice(body_bytes).ok() + }; + + let response = b"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: 10\r\n\r\n{\"ok\":true}"; + let _ = stream.write_all(response).await; + + CapturedWebhook { method, url, headers, body } + } + + async fn wait_for_webhook(&self, timeout: Duration) -> Option { + tokio::time::timeout(timeout, self.accept_one()).await.ok() + } +} + +#[tokio::test] +async fn delivers_queue_events_to_external_http_function() { + let iii = III::new(ENGINE_WS_URL); + iii.connect().await.expect("failed to connect"); + settle().await; + + let probe = WebhookProbe::start().await; + let function_id = unique_function_id("test::http_external::target::rs"); + let topic = unique_topic("test.http_external.topic.rs"); + let payload = json!({"hello": "world", "count": 1}); + + let http_fn = iii.register_function( + &function_id, + HttpInvocationConfig { + url: probe.url(), + method: HttpMethod::Post, + timeout_ms: Some(3000), + headers: HashMap::new(), + auth: None, + }, + ); + settle().await; + + let _trigger = iii + .register_trigger("queue", &function_id, json!({"topic": topic})) + .expect("register trigger"); + settle().await; + + iii.call("enqueue", json!({"topic": topic, "data": payload})) + .await + .expect("enqueue failed"); + + let webhook = probe + .wait_for_webhook(Duration::from_secs(7)) + .await + .expect("no webhook received"); + + assert_eq!(webhook.method, "POST"); + assert_eq!(webhook.url, "/webhook"); + assert_eq!(webhook.body.as_ref().unwrap()["hello"], "world"); + assert_eq!(webhook.body.as_ref().unwrap()["count"], 1); + + drop(_trigger); + http_fn.unregister(); + iii.shutdown_async().await; +} + +#[tokio::test] +async fn registers_and_unregisters_external_http_function() { + let iii = III::new(ENGINE_WS_URL); + iii.connect().await.expect("failed to connect"); + settle().await; + + let probe = WebhookProbe::start().await; + let function_id = unique_function_id("test::http_external::reg_unreg::rs"); + + let http_fn = iii.register_function( + &function_id, + HttpInvocationConfig { + url: probe.url(), + method: HttpMethod::Post, + timeout_ms: Some(3000), + headers: HashMap::new(), + auth: None, + }, + ); + settle().await; + + let found = { + let functions = iii.list_functions().await.expect("list_functions failed"); + functions.iter().any(|f| f.function_id == function_id) + }; + assert!(found, "function should appear after registration"); + + http_fn.unregister(); + settle().await; + + let gone = { + let functions = iii.list_functions().await.expect("list_functions failed"); + !functions.iter().any(|f| f.function_id == function_id) + }; + assert!(gone, "function should be absent after unregister"); + + iii.shutdown_async().await; +} + +#[tokio::test] +async fn delivers_events_with_custom_headers() { + let iii = III::new(ENGINE_WS_URL); + iii.connect().await.expect("failed to connect"); + settle().await; + + let probe = WebhookProbe::start().await; + let function_id = unique_function_id("test::http_external::headers::rs"); + let topic = unique_topic("test.http_external.headers.rs"); + let payload = json!({"msg": "with-headers"}); + + let mut custom_headers = HashMap::new(); + custom_headers.insert("x-custom-header".to_string(), "test-value".to_string()); + custom_headers.insert("x-another".to_string(), "123".to_string()); + + let http_fn = iii.register_function( + &function_id, + HttpInvocationConfig { + url: probe.url(), + method: HttpMethod::Post, + timeout_ms: Some(3000), + headers: custom_headers, + auth: None, + }, + ); + settle().await; + + let _trigger = iii + .register_trigger("queue", &function_id, json!({"topic": topic})) + .expect("register trigger"); + settle().await; + + iii.call("enqueue", json!({"topic": topic, "data": payload})) + .await + .expect("enqueue failed"); + + let webhook = probe + .wait_for_webhook(Duration::from_secs(7)) + .await + .expect("no webhook received"); + + assert_eq!(webhook.method, "POST"); + assert_eq!(webhook.headers.get("x-custom-header").map(String::as_str), Some("test-value")); + assert_eq!(webhook.headers.get("x-another").map(String::as_str), Some("123")); + + drop(_trigger); + http_fn.unregister(); + iii.shutdown_async().await; +} + +#[tokio::test] +async fn delivers_events_to_multiple_external_functions() { + let iii = III::new(ENGINE_WS_URL); + iii.connect().await.expect("failed to connect"); + settle().await; + + let probe_a = WebhookProbe::start().await; + let probe_b = WebhookProbe::start().await; + let function_id_a = unique_function_id("test::http_external::multi_a::rs"); + let function_id_b = unique_function_id("test::http_external::multi_b::rs"); + let topic_a = unique_topic("test.http_external.multi_a.rs"); + let topic_b = unique_topic("test.http_external.multi_b.rs"); + let payload_a = json!({"source": "topic-a", "value": 1}); + let payload_b = json!({"source": "topic-b", "value": 2}); + + let http_fn_a = iii.register_function( + &function_id_a, + HttpInvocationConfig { + url: probe_a.url(), + method: HttpMethod::Post, + timeout_ms: Some(3000), + headers: HashMap::new(), + auth: None, + }, + ); + let http_fn_b = iii.register_function( + &function_id_b, + HttpInvocationConfig { + url: probe_b.url(), + method: HttpMethod::Post, + timeout_ms: Some(3000), + headers: HashMap::new(), + auth: None, + }, + ); + settle().await; + + let _trigger_a = iii + .register_trigger("queue", &function_id_a, json!({"topic": topic_a})) + .expect("register trigger a"); + let _trigger_b = iii + .register_trigger("queue", &function_id_b, json!({"topic": topic_b})) + .expect("register trigger b"); + settle().await; + + iii.call("enqueue", json!({"topic": topic_a, "data": payload_a})) + .await + .expect("enqueue a failed"); + iii.call("enqueue", json!({"topic": topic_b, "data": payload_b})) + .await + .expect("enqueue b failed"); + + let webhook_a = probe_a + .wait_for_webhook(Duration::from_secs(7)) + .await + .expect("no webhook received for a"); + let webhook_b = probe_b + .wait_for_webhook(Duration::from_secs(7)) + .await + .expect("no webhook received for b"); + + assert_eq!(webhook_a.body.as_ref().unwrap()["source"], "topic-a"); + assert_eq!(webhook_b.body.as_ref().unwrap()["source"], "topic-b"); + + drop(_trigger_a); + drop(_trigger_b); + http_fn_a.unregister(); + http_fn_b.unregister(); + iii.shutdown_async().await; +} + +#[tokio::test] +async fn stops_delivering_events_after_unregister() { + let iii = III::new(ENGINE_WS_URL); + iii.connect().await.expect("failed to connect"); + settle().await; + + let probe = WebhookProbe::start().await; + let function_id = unique_function_id("test::http_external::stop::rs"); + let topic = unique_topic("test.http_external.stop.rs"); + let payload_before = json!({"phase": "before-unregister"}); + let payload_after = json!({"phase": "after-unregister"}); + + let http_fn = iii.register_function( + &function_id, + HttpInvocationConfig { + url: probe.url(), + method: HttpMethod::Post, + timeout_ms: Some(3000), + headers: HashMap::new(), + auth: None, + }, + ); + settle().await; + + let trigger = iii + .register_trigger("queue", &function_id, json!({"topic": topic})) + .expect("register trigger"); + settle().await; + + iii.call("enqueue", json!({"topic": topic, "data": payload_before})) + .await + .expect("enqueue before failed"); + + let webhook_before = probe + .wait_for_webhook(Duration::from_secs(7)) + .await + .expect("no webhook before unregister"); + assert_eq!(webhook_before.body.as_ref().unwrap()["phase"], "before-unregister"); + + drop(trigger); + http_fn.unregister(); + tokio::time::sleep(Duration::from_millis(500)).await; + + iii.call("enqueue", json!({"topic": topic, "data": payload_after})) + .await + .expect("enqueue after failed"); + + let received_after = probe + .wait_for_webhook(Duration::from_secs(2)) + .await + .is_some(); + assert!(!received_after, "should not receive webhook after unregister"); + + iii.shutdown_async().await; +} + +#[tokio::test] +async fn delivers_events_using_put_method() { + let iii = III::new(ENGINE_WS_URL); + iii.connect().await.expect("failed to connect"); + settle().await; + + let probe = WebhookProbe::start().await; + let function_id = unique_function_id("test::http_external::put_method::rs"); + let topic = unique_topic("test.http_external.put.rs"); + let payload = json!({"method_test": "put", "value": 42}); + + let http_fn = iii.register_function( + &function_id, + HttpInvocationConfig { + url: probe.url(), + method: HttpMethod::Put, + timeout_ms: Some(3000), + headers: HashMap::new(), + auth: None, + }, + ); + settle().await; + + let _trigger = iii + .register_trigger("queue", &function_id, json!({"topic": topic})) + .expect("register trigger"); + settle().await; + + iii.call("enqueue", json!({"topic": topic, "data": payload})) + .await + .expect("enqueue failed"); + + let webhook = probe + .wait_for_webhook(Duration::from_secs(7)) + .await + .expect("no webhook received"); + + assert_eq!(webhook.method, "PUT"); + assert_eq!(webhook.body.as_ref().unwrap()["method_test"], "put"); + assert_eq!(webhook.body.as_ref().unwrap()["value"], 42); + + drop(_trigger); + http_fn.unregister(); + iii.shutdown_async().await; +} From 52c42b67792ef17ec6fe7108f3fd69caa82664a7 Mon Sep 17 00:00:00 2001 From: Ytallo Layon Date: Tue, 3 Mar 2026 06:17:57 -0300 Subject: [PATCH 2/5] refactor: improve function registration and error handling across SDKs - Enhanced the Makefile to support additional methods for checking running processes on the specified health port. - Updated the Sdk class in Node.js to ensure error messages are only sent if an invocation ID is present. - Refactored function registration in Rust to include validation for empty IDs and improved error handling for duplicate registrations. - Cleaned up formatting in various test files for better readability and consistency. --- Makefile | 3 +- packages/node/iii/src/iii.ts | 16 +-- .../iii/tests/http-external-functions.test.ts | 99 +++++++++++-------- packages/python/iii/src/iii/iii.py | 10 +- packages/rust/iii/src/iii.rs | 33 ++++--- packages/rust/iii/src/lib.rs | 2 +- .../rust/iii/tests/http_external_functions.rs | 27 ++++- 7 files changed, 122 insertions(+), 68 deletions(-) diff --git a/Makefile b/Makefile index 7aad8f6..21af972 100644 --- a/Makefile +++ b/Makefile @@ -68,7 +68,8 @@ engine-start: @test -n "$(III_CONFIG)" || { echo "error: III_CONFIG is required (e.g. make engine-start III_CONFIG=path/to/config.yml III_HEALTH_PORT=49134)"; exit 1; } @test -n "$(III_HEALTH_PORT)" || { echo "error: III_HEALTH_PORT is required (e.g. make engine-start III_CONFIG=path/to/config.yml III_HEALTH_PORT=49134)"; exit 1; } @$(MAKE) engine-stop 2>/dev/null || true - @pid=$$(lsof -ti :$(III_HEALTH_PORT) 2>/dev/null); [ -z "$$pid" ] || { kill -9 $$pid 2>/dev/null || true; sleep 2; } + @pid=$$(lsof -ti :$(III_HEALTH_PORT) 2>/dev/null || ss -tlnp 2>/dev/null | grep ':$(III_HEALTH_PORT) ' | sed -n 's/.*pid=\([0-9]*\).*/\1/p' || fuser $(III_HEALTH_PORT)/tcp 2>/dev/null); \ + [ -z "$$pid" ] || { kill -9 $$pid 2>/dev/null || true; sleep 2; } @rm -rf "$(III_ENGINE_DATA)" @mkdir -p "$(III_ENGINE_DATA)" @cp "$(CURDIR)/$(III_CONFIG)" "$(III_ENGINE_DATA)/config.yml" diff --git a/packages/node/iii/src/iii.ts b/packages/node/iii/src/iii.ts index f472ba2..0677725 100644 --- a/packages/node/iii/src/iii.ts +++ b/packages/node/iii/src/iii.ts @@ -814,13 +814,15 @@ class Sdk implements ISdk { const errorMessage = fn ? 'Function is HTTP-invoked and cannot be invoked locally' : 'Function not found' - this.sendMessage(MessageType.InvocationResult, { - invocation_id, - function_id, - error: { code: errorCode, message: errorMessage }, - traceparent, - baggage, - }) + if (invocation_id) { + this.sendMessage(MessageType.InvocationResult, { + invocation_id, + function_id, + error: { code: errorCode, message: errorMessage }, + traceparent, + baggage, + }) + } } } diff --git a/packages/node/iii/tests/http-external-functions.test.ts b/packages/node/iii/tests/http-external-functions.test.ts index df2bd13..91e2199 100644 --- a/packages/node/iii/tests/http-external-functions.test.ts +++ b/packages/node/iii/tests/http-external-functions.test.ts @@ -142,11 +142,14 @@ describe('HTTP external functions', () => { let httpFn: { unregister(): void } | undefined try { - httpFn = iii.registerFunction({ id: functionId }, { - url: webhookProbe.url(), - method: 'POST', - timeout_ms: 3000, - }) + httpFn = iii.registerFunction( + { id: functionId }, + { + url: webhookProbe.url(), + method: 'POST', + timeout_ms: 3000, + }, + ) await sleep(300) trigger = iii.registerTrigger({ @@ -183,11 +186,14 @@ describe('HTTP external functions', () => { let httpFn: { id: string; unregister(): void } | undefined try { - httpFn = iii.registerFunction({ id: functionId }, { - url: webhookProbe.url(), - method: 'POST', - timeout_ms: 3000, - }) + httpFn = iii.registerFunction( + { id: functionId }, + { + url: webhookProbe.url(), + method: 'POST', + timeout_ms: 3000, + }, + ) await sleep(300) const functionsAfterRegister = await execute(async () => iii.listFunctions()) @@ -220,15 +226,18 @@ describe('HTTP external functions', () => { let httpFn: { unregister(): void } | undefined try { - httpFn = iii.registerFunction({ id: functionId }, { - url: webhookProbe.url(), - method: 'POST', - timeout_ms: 3000, - headers: { - 'X-Custom-Header': 'test-value', - 'X-Another': '123', + httpFn = iii.registerFunction( + { id: functionId }, + { + url: webhookProbe.url(), + method: 'POST', + timeout_ms: 3000, + headers: { + 'X-Custom-Header': 'test-value', + 'X-Another': '123', + }, }, - }) + ) await sleep(300) trigger = iii.registerTrigger({ @@ -277,17 +286,23 @@ describe('HTTP external functions', () => { let httpFnB: { unregister(): void } | undefined try { - httpFnA = iii.registerFunction({ id: functionIdA }, { - url: webhookProbeA.url(), - method: 'POST', - timeout_ms: 3000, - }) - - httpFnB = iii.registerFunction({ id: functionIdB }, { - url: webhookProbeB.url(), - method: 'POST', - timeout_ms: 3000, - }) + httpFnA = iii.registerFunction( + { id: functionIdA }, + { + url: webhookProbeA.url(), + method: 'POST', + timeout_ms: 3000, + }, + ) + + httpFnB = iii.registerFunction( + { id: functionIdB }, + { + url: webhookProbeB.url(), + method: 'POST', + timeout_ms: 3000, + }, + ) await sleep(300) triggerA = iii.registerTrigger({ @@ -341,11 +356,14 @@ describe('HTTP external functions', () => { let httpFn: { unregister(): void } | undefined try { - httpFn = iii.registerFunction({ id: functionId }, { - url: webhookProbe.url(), - method: 'POST', - timeout_ms: 3000, - }) + httpFn = iii.registerFunction( + { id: functionId }, + { + url: webhookProbe.url(), + method: 'POST', + timeout_ms: 3000, + }, + ) await sleep(300) trigger = iii.registerTrigger({ @@ -400,11 +418,14 @@ describe('HTTP external functions', () => { let httpFn: { unregister(): void } | undefined try { - httpFn = iii.registerFunction({ id: functionId }, { - url: webhookProbe.url(), - method: 'PUT', - timeout_ms: 3000, - }) + httpFn = iii.registerFunction( + { id: functionId }, + { + url: webhookProbe.url(), + method: 'PUT', + timeout_ms: 3000, + }, + ) await sleep(300) trigger = iii.registerTrigger({ diff --git a/packages/python/iii/src/iii/iii.py b/packages/python/iii/src/iii/iii.py index 48122a6..9a4ae82 100644 --- a/packages/python/iii/src/iii/iii.py +++ b/packages/python/iii/src/iii/iii.py @@ -408,7 +408,10 @@ async def _handle_invoke( if not func or not func.handler: error_code = "function_not_invokable" if func else "function_not_found" - error_msg = "Function is HTTP-invoked and cannot be invoked locally" if func else f"Function '{path}' not found" + if func: + error_msg = "Function is HTTP-invoked and cannot be invoked locally" + else: + error_msg = f"Function '{path}' not found" log.warning(error_msg) if invocation_id: await self._send( @@ -572,6 +575,11 @@ def register_function( is_handler = isinstance(handler_or_invocation, HttpInvocationConfig) is False if is_handler: + if not callable(handler_or_invocation): + actual_type = type(handler_or_invocation).__name__ + raise TypeError( + f"handler_or_invocation must be callable or HttpInvocationConfig, got {actual_type}" + ) handler = handler_or_invocation msg = RegisterFunctionMessage(id=path, description=description, metadata=metadata) self._send_if_connected(msg) diff --git a/packages/rust/iii/src/iii.rs b/packages/rust/iii/src/iii.rs index 1533a21..ab0a2a0 100644 --- a/packages/rust/iii/src/iii.rs +++ b/packages/rust/iii/src/iii.rs @@ -395,17 +395,23 @@ impl III { handler: Option, ) -> FunctionRef { let id = message.id.clone(); - if self.inner.functions.lock_or_recover().contains_key(&id) { - panic!("function id '{}' already registered", id); + if id.trim().is_empty() { + panic!("id is required"); } let data = RemoteFunctionData { message: message.clone(), handler, }; - self.inner - .functions - .lock_or_recover() - .insert(id.clone(), data); + let mut funcs = self.inner.functions.lock_or_recover(); + match funcs.entry(id.clone()) { + std::collections::hash_map::Entry::Occupied(_) => { + panic!("function id '{}' already registered", id); + } + std::collections::hash_map::Entry::Vacant(entry) => { + entry.insert(data); + } + } + drop(funcs); let _ = self.send_message(message.to_message()); let iii = self.clone(); @@ -417,10 +423,7 @@ impl III { }); }); - FunctionRef { - id, - unregister_fn, - } + FunctionRef { id, unregister_fn } } pub fn register_function( @@ -429,9 +432,6 @@ impl III { handler: H, ) -> FunctionRef { let id = id.into(); - if id.trim().is_empty() { - panic!("id is required"); - } let mut message = RegisterFunctionMessage { id: id.clone(), description: None, @@ -1063,7 +1063,12 @@ impl III { ) { tracing::debug!(function_id = %function_id, traceparent = ?traceparent, baggage = ?baggage, "Invoking function"); - let func_data = self.inner.functions.lock_or_recover().get(&function_id).cloned(); + let func_data = self + .inner + .functions + .lock_or_recover() + .get(&function_id) + .cloned(); let handler = func_data.as_ref().and_then(|d| d.handler.clone()); let Some(handler) = handler else { diff --git a/packages/rust/iii/src/lib.rs b/packages/rust/iii/src/lib.rs index a2ed812..6cc9feb 100644 --- a/packages/rust/iii/src/lib.rs +++ b/packages/rust/iii/src/lib.rs @@ -17,7 +17,7 @@ pub use channels::{ pub use context::{Context, get_context, with_context}; pub use error::IIIError; pub use iii::{ - FunctionInfo, FunctionRef, FunctionsAvailableGuard, IntoFunctionHandler, III, TriggerInfo, + FunctionInfo, FunctionRef, FunctionsAvailableGuard, III, IntoFunctionHandler, TriggerInfo, WorkerInfo, WorkerMetadata, }; pub use logger::Logger; diff --git a/packages/rust/iii/tests/http_external_functions.rs b/packages/rust/iii/tests/http_external_functions.rs index 984b157..8abe923 100644 --- a/packages/rust/iii/tests/http_external_functions.rs +++ b/packages/rust/iii/tests/http_external_functions.rs @@ -117,7 +117,12 @@ impl WebhookProbe { let response = b"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: 10\r\n\r\n{\"ok\":true}"; let _ = stream.write_all(response).await; - CapturedWebhook { method, url, headers, body } + CapturedWebhook { + method, + url, + headers, + body, + } } async fn wait_for_webhook(&self, timeout: Duration) -> Option { @@ -253,8 +258,14 @@ async fn delivers_events_with_custom_headers() { .expect("no webhook received"); assert_eq!(webhook.method, "POST"); - assert_eq!(webhook.headers.get("x-custom-header").map(String::as_str), Some("test-value")); - assert_eq!(webhook.headers.get("x-another").map(String::as_str), Some("123")); + assert_eq!( + webhook.headers.get("x-custom-header").map(String::as_str), + Some("test-value") + ); + assert_eq!( + webhook.headers.get("x-another").map(String::as_str), + Some("123") + ); drop(_trigger); http_fn.unregister(); @@ -369,7 +380,10 @@ async fn stops_delivering_events_after_unregister() { .wait_for_webhook(Duration::from_secs(7)) .await .expect("no webhook before unregister"); - assert_eq!(webhook_before.body.as_ref().unwrap()["phase"], "before-unregister"); + assert_eq!( + webhook_before.body.as_ref().unwrap()["phase"], + "before-unregister" + ); drop(trigger); http_fn.unregister(); @@ -383,7 +397,10 @@ async fn stops_delivering_events_after_unregister() { .wait_for_webhook(Duration::from_secs(2)) .await .is_some(); - assert!(!received_after, "should not receive webhook after unregister"); + assert!( + !received_after, + "should not receive webhook after unregister" + ); iii.shutdown_async().await; } From e63447cfec08389be6dd510f77a7528f2b9e345d Mon Sep 17 00:00:00 2001 From: Ytallo Layon Date: Tue, 3 Mar 2026 06:34:30 -0300 Subject: [PATCH 3/5] refactor: streamline function registration logic in III class - Simplified the registration process for functions by consolidating the handling of HttpInvocationConfig and callable handlers. - Improved error handling for function registration, ensuring clearer distinction between handler types and reducing redundancy in code. - Enhanced the messaging system for function registration to maintain consistency across the SDK. --- packages/python/iii/src/iii/iii.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/packages/python/iii/src/iii/iii.py b/packages/python/iii/src/iii/iii.py index 9a4ae82..a21672e 100644 --- a/packages/python/iii/src/iii/iii.py +++ b/packages/python/iii/src/iii/iii.py @@ -572,9 +572,11 @@ def register_function( if path in self._functions: raise ValueError(f"function id '{path}' already registered") - is_handler = isinstance(handler_or_invocation, HttpInvocationConfig) is False - - if is_handler: + if isinstance(handler_or_invocation, HttpInvocationConfig): + msg = RegisterFunctionMessage(id=path, invocation=handler_or_invocation, description=description, metadata=metadata) + self._send_if_connected(msg) + self._functions[path] = RemoteFunctionData(message=msg) + else: if not callable(handler_or_invocation): actual_type = type(handler_or_invocation).__name__ raise TypeError( @@ -590,9 +592,6 @@ async def wrapped(input_data: Any) -> Any: return await with_context(lambda _: handler(input_data), ctx) self._functions[path] = RemoteFunctionData(message=msg, handler=wrapped) - else: - config = handler_or_invocation - msg = RegisterFunctionMessage(id=path, invocation=config, description=description, metadata=metadata) self._send_if_connected(msg) self._functions[path] = RemoteFunctionData(message=msg) From f9eec540c9fc1723b8c72b11f6be10961b733195 Mon Sep 17 00:00:00 2001 From: Ytallo Layon Date: Tue, 3 Mar 2026 06:46:39 -0300 Subject: [PATCH 4/5] style: improve code formatting in III class function registration - Enhanced readability by adjusting the formatting of the RegisterFunctionMessage instantiation in the III class. - Ensured consistent style across the codebase, aligning with recent refactoring efforts. --- packages/python/iii/src/iii/iii.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/python/iii/src/iii/iii.py b/packages/python/iii/src/iii/iii.py index a21672e..3ca8909 100644 --- a/packages/python/iii/src/iii/iii.py +++ b/packages/python/iii/src/iii/iii.py @@ -573,7 +573,9 @@ def register_function( raise ValueError(f"function id '{path}' already registered") if isinstance(handler_or_invocation, HttpInvocationConfig): - msg = RegisterFunctionMessage(id=path, invocation=handler_or_invocation, description=description, metadata=metadata) + msg = RegisterFunctionMessage( + id=path, invocation=handler_or_invocation, description=description, metadata=metadata + ) self._send_if_connected(msg) self._functions[path] = RemoteFunctionData(message=msg) else: From 714d9a596617e8516f0a3454fc3ae833cbfa0adc Mon Sep 17 00:00:00 2001 From: Ytallo Date: Tue, 3 Mar 2026 13:22:52 -0300 Subject: [PATCH 5/5] fix --- packages/python/iii/src/iii/iii.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/python/iii/src/iii/iii.py b/packages/python/iii/src/iii/iii.py index 3ca8909..260d755 100644 --- a/packages/python/iii/src/iii/iii.py +++ b/packages/python/iii/src/iii/iii.py @@ -594,8 +594,6 @@ async def wrapped(input_data: Any) -> Any: return await with_context(lambda _: handler(input_data), ctx) self._functions[path] = RemoteFunctionData(message=msg, handler=wrapped) - self._send_if_connected(msg) - self._functions[path] = RemoteFunctionData(message=msg) def unregister() -> None: self._functions.pop(path, None)