Skip to content
This repository was archived by the owner on Aug 25, 2024. It is now read-only.

Commit 81c8e39

Browse files
authored
service: http: Add support for immediate response
Fixes: #657
1 parent cf385fb commit 81c8e39

File tree

4 files changed

+113
-10
lines changed

4 files changed

+113
-10
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
6464
core plugins
6565
- HTTP service got a `-redirect` flag which allows for URL redirection via a
6666
HTTP 307 response
67+
- Support for immediate response in HTTP service
6768
- Daal4py example usage.
6869
### Changed
6970
- Renamed `-seed` to `-inputs` in `dataflow create` command

service/http/dffml_service_http/routes.py

Lines changed: 62 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import os
22
import json
3+
import asyncio
34
import secrets
45
import inspect
56
import pathlib
@@ -10,10 +11,11 @@
1011
from functools import partial
1112
from dataclasses import dataclass
1213
from contextlib import AsyncExitStack
13-
from typing import List, Union, AsyncIterator, Type, NamedTuple, Dict
14+
from typing import List, Union, AsyncIterator, Type, NamedTuple, Dict, Any
1415

15-
from aiohttp import web
1616
import aiohttp_cors
17+
from aiohttp import web
18+
1719

1820
from dffml import Sources, MemorySource
1921
from dffml.record import Record
@@ -176,7 +178,21 @@ class HTTPChannelConfig(NamedTuple):
176178
- text:OUTPUT_KEYS
177179
- json
178180
- output of dataflow (Dict) is passes as json
179-
181+
immediate_response: Dict[str,Any]
182+
If provided with a reponse, server responds immediatly with
183+
it, whilst scheduling to run the dataflow.
184+
Expected keys:
185+
- status: HTTP status code for the response
186+
- content_type: MIME type.If not given, determined
187+
from the presence of body/text/json
188+
- body/text/json: One of this according to content_type
189+
- headers
190+
eg:
191+
{
192+
"status": 200,
193+
"content_type": "application/json",
194+
"data": {"text": "ok"},
195+
}
180196
"""
181197

182198
path: str
@@ -185,6 +201,7 @@ class HTTPChannelConfig(NamedTuple):
185201
asynchronous: bool = False
186202
input_mode: str = "default"
187203
forward_headers: str = None
204+
immediate_response: Dict[str, Any] = None
188205

189206
@classmethod
190207
def _fromdict(cls, **kwargs):
@@ -199,7 +216,7 @@ class Routes(BaseMultiCommContext):
199216
async def get_registered_handler(self, request):
200217
return self.app["multicomm_routes"].get(request.path, None)
201218

202-
async def multicomm_dataflow(self, config, request):
219+
async def _multicomm_dataflow(self, config, request):
203220
# Seed the network with inputs given by caller
204221
# TODO(p0,security) allowlist of valid definitions to seed (set
205222
# Input.origin to something other than seed)
@@ -222,6 +239,7 @@ async def multicomm_dataflow(self, config, request):
222239
},
223240
status=HTTPStatus.NOT_FOUND,
224241
)
242+
225243
inputs.append(
226244
MemoryInputSet(
227245
MemoryInputSetConfig(
@@ -251,17 +269,19 @@ async def multicomm_dataflow(self, config, request):
251269
)
252270
)
253271
elif ":" in config.input_mode:
254-
preprocess_mode, input_def = config.input_mode.split(":")
272+
preprocess_mode, *input_def = config.input_mode.split(":")
273+
input_def = ":".join(input_def)
255274
if input_def not in config.dataflow.definitions:
256275
return web.json_response(
257276
{
258277
"error": f"Missing definition for {input_def} in dataflow"
259278
},
260279
status=HTTPStatus.NOT_FOUND,
261280
)
281+
262282
if preprocess_mode == "json":
263283
value = await request.json()
264-
elif preprocess_mode == "str":
284+
elif preprocess_mode == "text":
265285
value = await request.text()
266286
elif preprocess_mode == "bytes":
267287
value = await request.read()
@@ -270,10 +290,11 @@ async def multicomm_dataflow(self, config, request):
270290
else:
271291
return web.json_response(
272292
{
273-
"error": f"preprocess tag must be one of {IO_MODES}, got {preprocess_mode}"
293+
"error": f"preprocess tag must be one of {self.IO_MODES}, got {preprocess_mode}"
274294
},
275295
status=HTTPStatus.NOT_FOUND,
276296
)
297+
277298
inputs.append(
278299
MemoryInputSet(
279300
MemoryInputSetConfig(
@@ -301,6 +322,7 @@ async def multicomm_dataflow(self, config, request):
301322
)
302323
)
303324
)
325+
304326
else:
305327
raise NotImplementedError(
306328
"Input modes other than default,preprocess:definition_name not yet implemented"
@@ -314,6 +336,7 @@ async def multicomm_dataflow(self, config, request):
314336
results = {
315337
str(ctx): result async for ctx, result in octx.run(*inputs)
316338
}
339+
317340
if config.output_mode == "json":
318341
return web.json_response(results)
319342

@@ -342,6 +365,38 @@ async def multicomm_dataflow(self, config, request):
342365
status=HTTPStatus.NOT_FOUND,
343366
)
344367

368+
async def multicomm_dataflow(self, config, request):
369+
if config.immediate_response:
370+
asyncio.create_task(self._multicomm_dataflow(config, request))
371+
ir = config.immediate_response
372+
content_type = None
373+
if "content_type" in ir:
374+
content_type = ir["content_type"]
375+
else:
376+
if "data" in ir:
377+
content_type = "application/json"
378+
elif "text" in ir:
379+
content_type = "text/plain"
380+
elif "body" in ir:
381+
content_type = "application/octet-stream"
382+
383+
if content_type == "application/json":
384+
return web.json_response(
385+
data={} if not "data" in ir else ir["data"],
386+
status=200 if not "status" in ir else ir["status"],
387+
headers=None if not "headers" in ir else ir["headers"],
388+
)
389+
else:
390+
return web.Response(
391+
body=None if not "body" in ir else ir["body"],
392+
text=None if not "text" in ir else ir["text"],
393+
status=200 if not "status" in ir else ir["status"],
394+
headers=None if not "headers" in ir else ir["headers"],
395+
content_type=content_type,
396+
)
397+
else:
398+
return await self._multicomm_dataflow(config, request)
399+
345400
async def multicomm_dataflow_asynchronous(self, config, request):
346401
# TODO allow list of valid definitions to seed
347402
raise NotImplementedError(

service/http/docs/dataflow.rst

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,3 +72,22 @@ HttpChannelConfig
7272

7373
- output of dataflow (Dict) is passes as json
7474

75+
- ``immediate_response: Dict[str,Any]``
76+
77+
- If provided with a reponse, server responds immediatly with
78+
it, whilst scheduling to run the dataflow.
79+
Expected keys:
80+
- status: HTTP status code for the response
81+
- content_type: MIME type, if not given, determined
82+
from the presence of body/text/json
83+
- body/text/json: One of this according to content_type
84+
- headers
85+
eg:
86+
87+
.. code-block:: python
88+
89+
{
90+
"status": 200,
91+
"content_type": "application/json",
92+
"data": {"text": "ok"},
93+
}

service/http/tests/test_routes.py

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
import os
22
import io
3+
import aiohttp
4+
import asyncio
35
import pathlib
46
import tempfile
57
from unittest.mock import patch
68
from contextlib import asynccontextmanager, ExitStack, AsyncExitStack
79
from typing import AsyncIterator, Dict
810

9-
import aiohttp
10-
11+
from dffml import DataFlow
1112
from dffml.base import config
1213
from dffml.record import Record
13-
from dffml.df.base import BaseConfig
14+
from dffml.df.base import BaseConfig, op
1415
from dffml.operation.output import GetSingle
1516
from dffml.util.entrypoint import EntrypointNotFound
1617
from dffml.model.model import ModelContext, Model
@@ -28,6 +29,7 @@
2829
SOURCE_NOT_LOADED,
2930
MODEL_NOT_LOADED,
3031
MODEL_NO_SOURCES,
32+
HTTPChannelConfig,
3133
)
3234
from dffml_service_http.util.testing import (
3335
ServerRunner,
@@ -315,6 +317,32 @@ async def test_post(self):
315317
{"Feedface": {"response": message}}, await response.json()
316318
)
317319

320+
async def test_immediate_response(self):
321+
url: str = "/some/url"
322+
event = asyncio.Event()
323+
324+
@op()
325+
async def my_event_setter(trigger: int) -> None:
326+
event.set()
327+
328+
# Register the data flow
329+
await self.cli.register(
330+
HTTPChannelConfig(
331+
path=url,
332+
dataflow=DataFlow.auto(my_event_setter),
333+
input_mode=f"text:{my_event_setter.op.inputs['trigger'].name}",
334+
output_mode="json",
335+
immediate_response={
336+
"status": 200,
337+
"content_type": "application/json",
338+
"data": {"immediate": "response"},
339+
},
340+
)
341+
)
342+
async with self.post(url, json="trigger") as response:
343+
self.assertEqual(await response.json(), {"immediate": "response"})
344+
await asyncio.wait_for(event.wait(), timeout=2)
345+
318346

319347
class TestRoutesSource(TestRoutesRunning, AsyncTestCase):
320348
async def setUp(self):

0 commit comments

Comments
 (0)