diff --git a/apisix/plugins/sse.lua b/apisix/plugins/sse.lua new file mode 100644 index 000000000000..635995a1e19f --- /dev/null +++ b/apisix/plugins/sse.lua @@ -0,0 +1,103 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +local core = require("apisix.core") + +local plugin_name = "sse" + +local schema = { + type = "object", + properties = { + proxy_read_timeout = { + type = "integer", + description = "Sets the timeout for reading a response from the proxied server, " .. + "in seconds. A value of 0 turns off this timeout.", + default = 3600, -- 1 hour + minimum = 0, + }, + override_content_type = { + type = "boolean", + description = "Whether to force the Content-Type header to 'text/event-stream'.", + default = true, + }, + connection_header = { + type = "string", + description = "Value for the 'Connection' response header.", + default = "keep-alive", + }, + cache_control = { + type = "string", + description = "Value for the 'Cache-Control' response header.", + default = "no-cache", + } + }, +} + +local _M = { + version = 0.1, + priority = 1004, -- Runs after authentication but before most other plugins. + name = plugin_name, + schema = schema, + stream_only = false, +} + +function _M.check_schema(conf) + return core.schema.check(schema, conf) +end + +-- The rewrite phase is executed before the request is forwarded to the upstream. +-- This is the correct place to set Nginx variables that control proxy behavior. +function _M.rewrite(conf, ctx) + core.log.debug("sse plugin rewrite phase") + + -- Disable response buffering from the proxied server. + -- This is the key to making SSE work, as it allows data to be sent + -- to the client as soon as it's received from the upstream. + core.ctx.set_var(ctx, "proxy_buffering", "off") + core.log.debug("sse plugin set proxy_buffering to off") + + -- Also disable request buffering. While not strictly required for SSE + -- (which is server-to-client), it's good practice for streaming APIs. + core.ctx.set_var(ctx, "proxy_request_buffering", "off") + core.log.debug("sse plugin set proxy_request_buffering to off") + + -- Set a long read timeout, as SSE connections are long-lived. + -- The default is 60s, which would prematurely close the connection. + local timeout_str = conf.proxy_read_timeout .. "s" + core.ctx.set_var(ctx, "proxy_read_timeout", timeout_str) + core.log.debug("sse plugin set proxy_read_timeout to ", timeout_str) +end + +-- The header_filter phase is executed after the response headers are received +-- from the upstream and before they are sent to the client. +function _M.header_filter(conf, ctx) + core.log.debug("sse plugin header_filter phase") + + core.response.set_header("X-Accel-Buffering", "no") + core.log.debug("sse plugin set X-Accel-Buffering to no") + core.response.set_header("Cache-Control", conf.cache_control) + core.log.debug("sse plugin set Cache-Control to ", conf.cache_control) + core.response.set_header("Connection", conf.connection_header) + core.log.debug("sse plugin set Connection to ", conf.connection_header) + + if conf.override_content_type then + core.response.set_header("Content-Type", "text/event-stream; charset=utf-8") + core.log.debug("sse plugin set Content-Type to text/event-stream; charset=utf-8") + end +end + +return _M diff --git a/docs/en/latest/sse.md b/docs/en/latest/sse.md new file mode 100644 index 000000000000..b45cdbc57453 --- /dev/null +++ b/docs/en/latest/sse.md @@ -0,0 +1,103 @@ +--- +title: sse +--- + +# Summary + +The `sse` plugin enables support for **Server-Sent Events (SSE)** by configuring APISIX to correctly proxy long-lived HTTP connections used in streaming scenarios. + +SSE allows servers to push updates to clients over a single HTTP connection using the `text/event-stream` content type. This plugin ensures buffering is disabled, proper timeouts are set, and necessary response headers are applied. + +# Attributes + +| Name | Type | Default | Required | Description | +| ----------------------- | ------- | ------------ | -------- | ------------------------------------------------------------------------------------------------------------------------------------------------- | +| `proxy_read_timeout` | Integer | `3600` | False | Timeout in seconds for reading a response from the upstream server. A value of `0` disables the timeout. This should be long for SSE connections. | +| `override_content_type` | Boolean | `true` | False | Whether to force the `Content-Type` header to `text/event-stream; charset=utf-8`. | +| `connection_header` | String | `keep-alive` | False | Sets the `Connection` response header. | +| `cache_control` | String | `no-cache` | False | Sets the `Cache-Control` response header. | + +# How It Works + +When enabled, the plugin makes the following adjustments: + +- Disables response and request buffering using NGINX variables. +- Sets a long read timeout (`proxy_read_timeout`) to support streaming. +- Optionally overrides the `Content-Type` to `text/event-stream; charset=utf-8`. +- Sets headers necessary for SSE: + - `X-Accel-Buffering: no` + - `Connection` + - `Cache-Control` + +These settings are required to ensure that the SSE connection remains open and data can be streamed to the client in real time. + +# Example Configuration + +```json +{ + "name": "sse", + "priority": 1005, + "config": { + "proxy_read_timeout": 7200, + "override_content_type": true, + "connection_header": "keep-alive", + "cache_control": "no-cache" + } +} +``` + + +# Enabling the Plugin on a Route + +``` +curl http://127.0.0.1:9180/apisix/admin/routes/1 -X PUT -d ' +{ + "uri": "/sse", + "plugins": { + "sse": { + "proxy_read_timeout": 7200, + "override_content_type": true, + "connection_header": "keep-alive", + "cache_control": "no-cache" + } + }, + "upstream": { + "type": "roundrobin", + "nodes": { + "httpbin.org:80": 1 + } + } +}' +``` + +This example enables the plugin on the /sse route and sets a 2-hour timeout for SSE connections, ensuring the correct headers and proxy behavior are applied. + + +## Notes + +This plugin is only relevant for routes that serve SSE (e.g., real-time feeds, logs, event notifications). + +SSE is a one-way communication protocol (server → client). This plugin does not support bidirectional protocols like WebSocket. + +If your upstream already sets the correct Content-Type, you can disable the override using "override_content_type": false. + +Ensure your upstream service flushes events frequently to keep the SSE connection alive. + +## Disabling the Plugin + +To disable the sse plugin on a route: + +``` +curl http://127.0.0.1:9180/apisix/admin/routes/1 -X PATCH -d ' +{ + "plugins": { + "sse": null + } +}' +``` + +# Changelog + +| Version | Description | +| ------- | ----------------------------------------------- | +| 0.1 | Initial version of the plugin with SSE support. | diff --git a/t/plugin/sse.t b/t/plugin/sse.t new file mode 100644 index 000000000000..7c0170009014 --- /dev/null +++ b/t/plugin/sse.t @@ -0,0 +1,205 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +use t::APISIX 'no_plan'; + +repeat_each(1); +no_long_string(); +no_root_location(); +no_shuffle(); + +run_tests(); + +__DATA__ + +=== TEST 1: sse plugin with default configuration +--- config +location /t { + content_by_lua_block { + local core = require("apisix.core") + local t = require("t.test_funcs") + + -- upstream returns SSE data + t.set_upstream(function() + ngx.header["Content-Type"] = "text/plain" + ngx.say("data: hello from upstream") + end) + + local route_conf = { + uri = "/sse", + plugins = { + sse = {} + }, + upstream = { + type = "roundrobin", + nodes = { + [t.upstream_server_host_1] = 1, + } + } + } + t.add_route("/t/route/1", route_conf) + core.response.exit(200) + } +} +--- request +GET /sse +--- response_headers +Content-Type: text/event-stream; charset=utf-8 +X-Accel-Buffering: no +Cache-Control: no-cache +Connection: keep-alive +--- response_body +data: hello from upstream +--- error_code: 200 +--- no_error_log +[error] + + + +=== TEST 2: sse plugin with override_content_type = false +--- config +location /t { + content_by_lua_block { + local core = require("apisix.core") + local t = require("t.test_funcs") + + -- upstream returns JSON + t.set_upstream(function() + ngx.header["Content-Type"] = "application/json" + ngx.say("{\"message\": \"hello\"}") + end) + + local route_conf = { + uri = "/sse_no_override", + plugins = { + sse = { + override_content_type = false + } + }, + upstream = { + type = "roundrobin", + nodes = { + [t.upstream_server_host_1] = 1, + } + } + } + t.add_route("/t/route/2", route_conf) + core.response.exit(200) + } +} +--- request +GET /sse_no_override +--- response_headers +Content-Type: application/json +X-Accel-Buffering: no +Cache-Control: no-cache +Connection: keep-alive +--- response_body +{"message": "hello"} +--- error_code: 200 +--- no_error_log +[error] + + + +=== TEST 3: sse plugin with custom connection and cache-control headers +--- config +location /t { + content_by_lua_block { + local core = require("apisix.core") + local t = require("t.test_funcs") + + -- upstream returns SSE data + t.set_upstream(function() + ngx.header["Content-Type"] = "text/plain" + ngx.say("data: hello from upstream") + end) + + local route_conf = { + uri = "/sse_custom", + plugins = { + sse = { + connection_header = "Upgrade", + cache_control = "public, max-age=86400" + } + }, + upstream = { + type = "roundrobin", + nodes = { + [t.upstream_server_host_1] = 1, + } + } + } + t.add_route("/t/route/3", route_conf) + core.response.exit(200) + } +} +--- request +GET /sse_custom +--- response_headers +Content-Type: text/event-stream; charset=utf-8 +X-Accel-Buffering: no +Cache-Control: public, max-age=86400 +Connection: Upgrade +--- response_body +data: hello from upstream +--- error_code: 200 +--- no_error_log +[error] + + + +=== TEST 4: route without SSE plugin (negative case) +--- config +location /t { + content_by_lua_block { + local core = require("apisix.core") + local t = require("t.test_funcs") + + -- upstream returns plain text + t.set_upstream(function() + ngx.header["Content-Type"] = "text/plain" + ngx.say("data: hello from upstream") + end) + + local route_conf = { + uri = "/no_sse", + plugins = { + -- No SSE plugin here + }, + upstream = { + type = "roundrobin", + nodes = { + [t.upstream_server_host_1] = 1, + } + } + } + t.add_route("/t/route/4", route_conf) + core.response.exit(200) + } +} +--- request +GET /no_sse +--- response_headers_unlike +Content-Type: text/event-stream +X-Accel-Buffering: .* +Cache-Control: .* +Connection: .* +--- response_body +data: hello from upstream +--- error_code: 200 +--- no_error_log +[error]