|
| 1 | +import json |
| 2 | +import logging |
| 3 | +from typing import Mapping, Dict, Any, Optional |
| 4 | +from werkzeug import Request, Response |
| 5 | +from dify_plugin import Endpoint |
| 6 | +from endpoints.helpers import apply_middleware, validate_api_key, determine_route |
| 7 | + |
| 8 | +logger = logging.getLogger(__name__) |
| 9 | + |
| 10 | +class WebhookEndpoint(Endpoint): |
| 11 | + """ |
| 12 | + The UnifiedEndpoint handles both workflow and chatflow requests through a single interface. |
| 13 | +
|
| 14 | + This endpoint routes requests to the appropriate Dify API based on the path: |
| 15 | + - Paths starting with /workflow/ will invoke Dify workflows |
| 16 | + - Paths starting with /chatflow/ will invoke Dify chatflows |
| 17 | +
|
| 18 | + For chatflow requests, the following parameters are required: |
| 19 | + - `app_id` (required): The ID of the chatflow to trigger |
| 20 | + - `query` (required): A string representing the query to be processed |
| 21 | + - `inputs` (optional): An object containing inputs needed for the chatflow |
| 22 | + - `conversation_id` (optional): A string representing the conversation ID |
| 23 | +
|
| 24 | + For workflow requests, the following parameters are required: |
| 25 | + - `app_id` (required): The ID of the workflow to trigger |
| 26 | + - `inputs` (optional): An object containing inputs needed for the workflow |
| 27 | +
|
| 28 | + The endpoint behavior can be configured with: |
| 29 | + - `explicit_inputs`: When true, inputs should be in req.body.inputs. When false, req.body is used. |
| 30 | + - `raw_data_output`: When true, workflow responses will only return the data.outputs |
| 31 | + """ |
| 32 | + |
| 33 | + def _invoke(self, r: Request, values: Mapping, settings: Mapping) -> Response: |
| 34 | + """ |
| 35 | + Invokes the endpoint with the given request for either chatflow or workflow. |
| 36 | + """ |
| 37 | + logger.info("Received request to unified endpoint") |
| 38 | + |
| 39 | + # Determine the endpoint mode |
| 40 | + route = determine_route(r.path) |
| 41 | + if not route: |
| 42 | + logger.error("Invalid path: %s", r.path) |
| 43 | + return Response(json.dumps({"error": "Invalid path. Use /workflow/ or /chatflow/"}), |
| 44 | + status=404, content_type="application/json") |
| 45 | + |
| 46 | + logger.info("Request mode: %s", route) |
| 47 | + |
| 48 | + # Apply middleware |
| 49 | + middleware_response = apply_middleware(r, settings) |
| 50 | + if middleware_response: |
| 51 | + logger.debug("Middleware response: %s", middleware_response) |
| 52 | + return middleware_response |
| 53 | + |
| 54 | + # Validate API key |
| 55 | + validation_response = validate_api_key(r, settings) |
| 56 | + if validation_response: |
| 57 | + logger.debug("API key validation failed: %s", validation_response) |
| 58 | + return validation_response |
| 59 | + |
| 60 | + try: |
| 61 | + request_body = getattr( |
| 62 | + r, 'default_middleware_json', {}) or r.get_json() |
| 63 | + |
| 64 | + dynamic_app_id = values.get("app_id") |
| 65 | + static_app_id = settings.get("static_app_id") |
| 66 | + if isinstance(static_app_id, dict): |
| 67 | + static_app_id = static_app_id.get('app_id') |
| 68 | + |
| 69 | + logger.debug("Parsed request body: %s", request_body) |
| 70 | + logger.debug("Extracted dynamic_app_id: %s", dynamic_app_id) |
| 71 | + logger.debug("Extracted static_app_id: %s", static_app_id) |
| 72 | + |
| 73 | + if not dynamic_app_id and not static_app_id: |
| 74 | + logger.error("app_id is required but not provided.") |
| 75 | + return Response(status=404, content_type="application/json") |
| 76 | + |
| 77 | + # Handle inputs based on explicit_inputs setting |
| 78 | + explicit_inputs = settings.get('explicit_inputs', True) |
| 79 | + |
| 80 | + if explicit_inputs: |
| 81 | + inputs = request_body.get("inputs", {}) |
| 82 | + else: |
| 83 | + inputs = request_body.copy() |
| 84 | + |
| 85 | + if not isinstance(inputs, dict): |
| 86 | + logger.error( |
| 87 | + "Invalid inputs type: expected object, got %s", type(inputs).__name__) |
| 88 | + return Response(json.dumps({"error": "inputs must be an object"}), |
| 89 | + status=400, content_type="application/json") |
| 90 | + |
| 91 | + # initialize empty response |
| 92 | + response = None |
| 93 | + |
| 94 | + if route == "/chatflow/<app_id>": |
| 95 | + if static_app_id: |
| 96 | + # Do not handle requests to /chatflow/<app_id> when a static app_id is defined |
| 97 | + # Static app_id is explicitly used to only expose one single app |
| 98 | + return Response(status=404, content_type="application/json") |
| 99 | + |
| 100 | + query = request_body.get( |
| 101 | + "query", None) if explicit_inputs else inputs.pop("query", None) |
| 102 | + if not query or not isinstance(query, str): |
| 103 | + logger.error("query is required and must be a string") |
| 104 | + return Response(json.dumps({"error": "query must be a string"}), |
| 105 | + status=400, content_type="application/json") |
| 106 | + |
| 107 | + conversation_id = request_body.get( |
| 108 | + "conversation_id") if explicit_inputs else inputs.pop("conversation_id", None) |
| 109 | + if conversation_id is not None and not isinstance(conversation_id, str): |
| 110 | + logger.error( |
| 111 | + "conversation_id must be a string if provided") |
| 112 | + return Response(json.dumps({"error": "conversation_id must be a string"}), |
| 113 | + status=400, content_type="application/json") |
| 114 | + |
| 115 | + # Invoke chatflow |
| 116 | + response = self._invoke_chatflow( |
| 117 | + dynamic_app_id, query, conversation_id, inputs) |
| 118 | + elif route == "/single-chatflow": |
| 119 | + query = request_body.get( |
| 120 | + "query") if explicit_inputs else inputs.pop("query", None) |
| 121 | + if not query or not isinstance(query, str): |
| 122 | + logger.error("query is required and must be a string") |
| 123 | + return Response(json.dumps({"error": "query must be a string"}), |
| 124 | + status=400, content_type="application/json") |
| 125 | + |
| 126 | + conversation_id = request_body.get( |
| 127 | + "conversation_id") if explicit_inputs else inputs.pop("conversation_id", None) |
| 128 | + if conversation_id is not None and not isinstance(conversation_id, str): |
| 129 | + logger.error( |
| 130 | + "conversation_id must be a string if provided") |
| 131 | + return Response(json.dumps({"error": "conversation_id must be a string"}), |
| 132 | + status=400, content_type="application/json") |
| 133 | + |
| 134 | + # Invoke chatflow |
| 135 | + response = self._invoke_chatflow( |
| 136 | + static_app_id, query, conversation_id, inputs) |
| 137 | + |
| 138 | + elif route == "/workflow/<app_id>": |
| 139 | + if static_app_id: |
| 140 | + # Do not handle requests to /chatflow/<app_id> when a static app_id is defined |
| 141 | + # Static app_id is explicitly used to only expose one single app |
| 142 | + return Response(status=404, content_type="application/json") |
| 143 | + # Invoking workflow |
| 144 | + response = self._invoke_workflow( |
| 145 | + dynamic_app_id, inputs, settings.get('raw_data_output', False)) |
| 146 | + |
| 147 | + elif route == "/single-workflow": |
| 148 | + # Invoking workflow |
| 149 | + response = self._invoke_workflow( |
| 150 | + static_app_id, inputs, settings.get('raw_data_output', False)) |
| 151 | + |
| 152 | + if not response: |
| 153 | + return Response(json.dumps({"error": "Failed to get response"}), status=500, content_type="application/json") |
| 154 | + else: |
| 155 | + # Return response |
| 156 | + logger.debug("%s response: %s", route, response) |
| 157 | + return Response(json.dumps(response), status=200, content_type="application/json") |
| 158 | + |
| 159 | + except (json.JSONDecodeError, KeyError, TypeError) as e: |
| 160 | + logger.error("Error during request processing: %s", str(e)) |
| 161 | + return Response(json.dumps({"error": str(e)}), status=500, content_type="application/json") |
| 162 | + |
| 163 | + def _invoke_chatflow(self, app_id: str, query: str, conversation_id: Optional[str], inputs: Dict[str, Any]) -> Dict[str, Any]: |
| 164 | + """ |
| 165 | + Invokes a Dify chatflow with the given parameters. |
| 166 | +
|
| 167 | + Args: |
| 168 | + app_id: The ID of the chatflow to invoke |
| 169 | + query: The user query to process |
| 170 | + conversation_id: Optional conversation ID for continuing a conversation |
| 171 | + inputs: Additional inputs for the chatflow |
| 172 | +
|
| 173 | + Returns: |
| 174 | + The chatflow response |
| 175 | + """ |
| 176 | + logger.info("Invoking chatflow with app_id: %s", app_id) |
| 177 | + dify_response = self.session.app.chat.invoke( |
| 178 | + app_id=app_id, |
| 179 | + query=query, |
| 180 | + conversation_id=conversation_id, |
| 181 | + inputs=inputs, |
| 182 | + response_mode="blocking" |
| 183 | + ) |
| 184 | + return dify_response |
| 185 | + |
| 186 | + def _invoke_workflow(self, app_id: str, inputs: Dict[str, Any], raw_data_output: bool) -> Dict[str, Any]: |
| 187 | + """ |
| 188 | + Invokes a Dify workflow with the given parameters. |
| 189 | +
|
| 190 | + Args: |
| 191 | + app_id: The ID of the workflow to invoke |
| 192 | + inputs: Inputs for the workflow |
| 193 | + raw_data_output: If True, returns only the outputs field of the response |
| 194 | +
|
| 195 | + Returns: |
| 196 | + The workflow response, either full or just the outputs depending on raw_data_output |
| 197 | + """ |
| 198 | + logger.info( |
| 199 | + "Invoking workflow with app_id: %s and inputs: %s", app_id, inputs) |
| 200 | + dify_response = self.session.app.workflow.invoke( |
| 201 | + app_id=app_id, |
| 202 | + inputs=inputs, |
| 203 | + response_mode="blocking" |
| 204 | + ) |
| 205 | + |
| 206 | + # Process workflow response if raw_data_output is enabled |
| 207 | + return dify_response["data"]["outputs"] if raw_data_output else dify_response |
0 commit comments