diff --git a/.env.example b/.env.example index b818a5b75..e0c4633ad 100644 --- a/.env.example +++ b/.env.example @@ -14,7 +14,7 @@ TAG=dev # OR # > tr -dc 'A-Za-z0-9+_/' +JWT_SECRET= AGENTS_API_KEY= GPU_MEMORY_UTILIZATION=0.80 MAX_FREE_SESSIONS=50 @@ -139,4 +139,4 @@ ENABLE_RESPONSES=false HASURA_URL=http://hasura:8080 HASURA_CLAIMS_MAP='{"x-hasura-allowed-roles":{"path":"$.hasura.all_roles","default":["user"]},"x-hasura-default-role":{"path":"$.hasura.all_roles[0]","default":"user"},"x-hasura-user-id":{"path":"$.sub"}}' -HASURA_ADMIN_SECRET= +HASURA_GRAPHQL_ADMIN_SECRET= diff --git a/README.md b/README.md index 7e0ee3185..e3b96e2de 100644 --- a/README.md +++ b/README.md @@ -598,10 +598,10 @@ Generate a JWT Token (Only for Multi-Tenant Mode) To generate a JWT token, `jwt-cli` is required. Kindly install the same before proceeding with the next steps. -Use the following command and replace `JWT_SHARED_KEY` with the corresponding key from your `.env` file to generate a JWT token: +Use the following command and replace `JWT_SECRET` with the corresponding key from your `.env` file to generate a JWT token: ```bash -jwt encode --secret JWT_SHARED_KEY --alg HS512 --exp=$(date -d '+10 days' +%s) --sub '00000000-0000-0000-0000-000000000000' '{}' +jwt encode --secret JWT_SECRET --alg HS512 --exp=$(date -d '+10 days' +%s) --sub '00000000-0000-0000-0000-000000000000' '{}' ``` This command generates a JWT token that will be valid for 10 days. @@ -619,6 +619,12 @@ client = Client(api_key="your_jwt_token") **Note:** SDK in Multi-Tenant mode, you need to generate a JWT token locally that acts as an API KEY to interact with the SDK. Furthermore, while initializing the client you will need to set the environment to `local_multi_tenant` and the api key to the JWT token you generated in the previous step. Whereas in Single-Tenant mode you can interact with the SDK directly without the need for the API KEY and set the environment to `local`. +### Real-time transitions via Hasura + +The `hasura` service in `docker-compose.yml` exposes streaming subscriptions on port `8086`. +Use the `TransitionsStream` subscription with the header `X-Hasura-Execution-Id` to +receive live updates for an execution without polling. + ### 7. Troubleshooting - Ensure that all required Docker images are available. diff --git a/agents-api/agents_api/routers/tasks/__init__.py b/agents-api/agents_api/routers/tasks/__init__.py index 481c55b13..952d7e89f 100644 --- a/agents-api/agents_api/routers/tasks/__init__.py +++ b/agents-api/agents_api/routers/tasks/__init__.py @@ -8,5 +8,4 @@ from .list_task_executions import list_task_executions from .list_tasks import list_tasks from .router import router -from .stream_transitions_events import stream_transitions_events from .update_execution import update_execution diff --git a/agents-api/agents_api/routers/tasks/stream_transitions_events.py b/agents-api/agents_api/routers/tasks/stream_transitions_events.py deleted file mode 100644 index 92633bf08..000000000 --- a/agents-api/agents_api/routers/tasks/stream_transitions_events.py +++ /dev/null @@ -1,113 +0,0 @@ -import logging -from base64 import b64decode, b64encode -from functools import partial -from typing import Annotated -from uuid import UUID - -import anyio -from anyio.streams.memory import MemoryObjectSendStream -from fastapi import Depends, Query -from sse_starlette.sse import EventSourceResponse -from starlette.requests import Request -from temporalio.api.enums.v1 import EventType -from temporalio.client import ( - WorkflowHistoryEventAsyncIterator, - WorkflowHistoryEventFilterType, -) - -from ...autogen.openapi_model import TransitionEvent -from ...clients.temporal import get_workflow_handle -from ...dependencies.developer_id import get_developer_id -from ...queries.executions.lookup_temporal_data import lookup_temporal_data -from ...worker.codec import from_payload_data -from .router import router - -STREAM_TIMEOUT = 10 * 60 # 10 minutes - - -# Create a function to publish events to the client -# TODO: Unnest and simplify this function -async def event_publisher( - inner_send_chan: MemoryObjectSendStream, - history_events: WorkflowHistoryEventAsyncIterator, -): - async with inner_send_chan: - try: - async for event in history_events: - # TODO: We should get the workflow-completed event as well and use that to close the stream - if event.event_type == EventType.EVENT_TYPE_ACTIVITY_TASK_COMPLETED: - payloads = event.activity_task_completed_event_attributes.result.payloads - - for payload in payloads: - try: - data_item = from_payload_data(payload.data) - - except Exception as e: - logging.warning(f"Could not decode payload: {e}") - continue - - if not isinstance(data_item, TransitionEvent): - continue - - # FIXME: This does NOT return the last event (and maybe other events) - transition_event_dict = { - "type": data_item.type, - "output": data_item.output, - "created_at": data_item.created_at.isoformat(), - } - - next_page_token = ( - b64encode(history_events.next_page_token).decode("ascii") - if history_events.next_page_token - else None - ) - - await inner_send_chan.send({ - "data": { - "transition": transition_event_dict, - "next_page_token": next_page_token, - }, - }) - - except anyio.get_cancelled_exc_class() as e: - with anyio.move_on_after(STREAM_TIMEOUT, shield=True): - await inner_send_chan.send({"closing": True}) - raise e - - -@router.get("/executions/{execution_id}/transitions.stream", tags=["executions"]) -async def stream_transitions_events( - x_developer_id: Annotated[UUID, Depends(get_developer_id)], - execution_id: UUID, - req: Request, - next_page_token: Annotated[str | None, Query()] = None, -): - # Get temporal id - temporal_data = await lookup_temporal_data( - developer_id=x_developer_id, - execution_id=execution_id, - ) - - # TODO: Need to get all the events for child workflows too. Maybe try the `run_id` or something? - # SCRUM-11 - workflow_handle = await get_workflow_handle( - handle_id=temporal_data["id"], - ) - - next_page_token: bytes | None = b64decode(next_page_token) if next_page_token else None - - history_events = workflow_handle.fetch_history_events( - page_size=1, - next_page_token=next_page_token, - wait_new_event=True, - event_filter_type=WorkflowHistoryEventFilterType.ALL_EVENT, - skip_archival=True, - ) - - # Create a channel to send events to the client - send_chan, recv_chan = anyio.create_memory_object_stream(max_buffer_size=100) - - return EventSourceResponse( - recv_chan, - data_sender_callable=partial(event_publisher, send_chan, history_events), - ) diff --git a/documentation/advanced/localsetup.mdx b/documentation/advanced/localsetup.mdx index b7c99ec88..0a77e0d1d 100644 --- a/documentation/advanced/localsetup.mdx +++ b/documentation/advanced/localsetup.mdx @@ -71,10 +71,10 @@ docker compose --env-file .env --profile temporal-ui --profile multi-tenant --pr To generate a JWT token, `jwt-cli` is required. Kindly install the same before proceeding with the next steps. -Use the following command and replace `JWT_SHARED_KEY` with the corresponding key from your `.env` file to generate a JWT token: +Use the following command and replace `JWT_SECRET` with the corresponding key from your `.env` file to generate a JWT token: ```bash -jwt encode --secret JWT_SHARED_KEY --alg HS512 --exp=$(date -j -v +10d +%s) --sub '00000000-0000-0000-0000-000000000000' '{}' +jwt encode --secret JWT_SECRET --alg HS512 --exp=$(date -j -v +10d +%s) --sub '00000000-0000-0000-0000-000000000000' '{}' ``` This command generates a JWT token that will be valid for 10 days. diff --git a/gateway/docker-compose.yml b/gateway/docker-compose.yml index e9358a36f..c981f387e 100644 --- a/gateway/docker-compose.yml +++ b/gateway/docker-compose.yml @@ -5,7 +5,7 @@ services: image: julepai/gateway:${TAG:-dev} environment: - GATEWAY_PORT=80 - - JWT_SHARED_KEY=${JWT_SHARED_KEY} + - JWT_SECRET=${JWT_SECRET} - AGENTS_API_URL=${AGENTS_API_URL:-http://agents-api-multi-tenant:8080} - TEMPORAL_UI_PUBLIC_URL=${TEMPORAL_UI_PUBLIC_URL:-http://temporal-ui-public:8080} - HASURA_URL=${HASURA_URL:-http://hasura:8080} diff --git a/gateway/entrypoint.sh b/gateway/entrypoint.sh index 1370b03a7..12bdeed08 100755 --- a/gateway/entrypoint.sh +++ b/gateway/entrypoint.sh @@ -7,7 +7,7 @@ GATEWAY_PORT=${GATEWAY_PORT:-80} TEMPORAL_UI_PUBLIC_URL=${TEMPORAL_UI_PUBLIC_URL:-http://temporal-ui-public:8080} HASURA_URL=${HASURA_URL:-http://hasura:8080} -for var_name in JWT_SHARED_KEY AGENTS_API_KEY +for var_name in JWT_SECRET AGENTS_API_KEY do if [ -z "`eval echo \\\$$var_name`" ]; then echo "Error: Environment variable '$var_name' is not set." diff --git a/gateway/traefik.yml.template b/gateway/traefik.yml.template index dbffb7745..ffabfa838 100644 --- a/gateway/traefik.yml.template +++ b/gateway/traefik.yml.template @@ -135,7 +135,7 @@ http: - iat Required: true Keys: - - $JWT_SHARED_KEY + - $JWT_SECRET JwtHeaders: X-Developer-Id: sub OpaHttpStatusField: allow_status_code diff --git a/hasura/docker-compose.yml b/hasura/docker-compose.yml index 5d5b1a801..fccec099e 100644 --- a/hasura/docker-compose.yml +++ b/hasura/docker-compose.yml @@ -14,12 +14,12 @@ services: HASURA_GRAPHQL_DEV_MODE: "true" HASURA_GRAPHQL_EXPERIMENTAL_FEATURES: naming_convention,streaming_subscriptions HASURA_GRAPHQL_ENABLED_LOG_TYPES: startup, http-log, webhook-log, websocket-log, query-log - HASURA_GRAPHQL_ADMIN_SECRET: ${HASURA_ADMIN_SECRET:?HASURA_ADMIN_SECRET is required} + HASURA_GRAPHQL_ADMIN_SECRET: ${HASURA_GRAPHQL_ADMIN_SECRET:?required} HASURA_GRAPHQL_UNAUTHORIZED_ROLE: "anonymous" HASURA_GRAPHQL_JWT_SECRET: >- { "type":"HS512", - "key":"${JWT_SHARED_KEY:?required}", + "key":"${JWT_SECRET:?required}", "claims_map":{ "x-hasura-allowed-roles":{"path":"$.hasura.all_roles","default":["user"]}, "x-hasura-default-role":{"path":"$.hasura.all_roles[0]","default":"user"}, diff --git a/hasura/metadata/databases/default/tables/public_transitions.yaml b/hasura/metadata/databases/default/tables/public_transitions.yaml index 7bb87cc5d..08da42109 100644 --- a/hasura/metadata/databases/default/tables/public_transitions.yaml +++ b/hasura/metadata/databases/default/tables/public_transitions.yaml @@ -5,3 +5,19 @@ object_relationships: - name: execution using: foreign_key_constraint_on: execution_id +select_permissions: + - role: api_user + permission: + columns: + - id + - execution_id + - type + - step_label + - output + - created_at + filter: + execution_id: + _eq: X-Hasura-Execution-Id + subscription_root_fields: + - select_stream + - select diff --git a/hasura/migrations/0001_init.sql b/hasura/migrations/0001_init.sql new file mode 100644 index 000000000..7cd4a766d --- /dev/null +++ b/hasura/migrations/0001_init.sql @@ -0,0 +1,23 @@ +-- Hasura init migration for executions and transitions tables +-- This mirrors the schema used in memory-store + +CREATE TABLE IF NOT EXISTS public.executions ( + id uuid PRIMARY KEY, + developer_id uuid NOT NULL, + task_id uuid NOT NULL, + task_version integer NOT NULL, + input jsonb, + metadata jsonb, + created_at timestamptz NOT NULL DEFAULT now() +); + +CREATE TABLE IF NOT EXISTS public.transitions ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + execution_id uuid NOT NULL REFERENCES executions(id) ON DELETE CASCADE, + type text NOT NULL, + step_label text, + output jsonb, + created_at timestamptz NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS transitions_execution_created_idx ON public.transitions(execution_id, created_at); diff --git a/packages/ui/graphql/Transitions.graphql b/packages/ui/graphql/Transitions.graphql new file mode 100644 index 000000000..2d4a0b005 --- /dev/null +++ b/packages/ui/graphql/Transitions.graphql @@ -0,0 +1,14 @@ +query Transitions($executionId: uuid!, $limit: Int = 100, $offset: Int = 0) { + transitions( + where: {execution_id: {_eq: $executionId}} + order_by: {created_at: asc} + limit: $limit + offset: $offset + ) { + id + type + step_label + output + created_at + } +} diff --git a/packages/ui/graphql/TransitionsStream.graphql b/packages/ui/graphql/TransitionsStream.graphql new file mode 100644 index 000000000..fa01249e3 --- /dev/null +++ b/packages/ui/graphql/TransitionsStream.graphql @@ -0,0 +1,13 @@ +subscription TransitionsStream($executionId: uuid!, $cursor: timestamptz!) { + transitions_stream( + batch_size: 100 + cursor: {initial_value: {created_at: $cursor}, ordering: ASC} + where: {execution_id: {_eq: $executionId}} + ) { + id + type + step_label + output + created_at + } +} diff --git a/packages/ui/graphql/types.ts b/packages/ui/graphql/types.ts new file mode 100644 index 000000000..ca7c094fc --- /dev/null +++ b/packages/ui/graphql/types.ts @@ -0,0 +1,16 @@ +export interface Transition { + id: string; + execution_id: string; + type: string; + step_label?: string | null; + output: any; + created_at: string; +} + +export interface TransitionsQuery { + transitions: Transition[]; +} + +export interface TransitionsStreamSubscription { + transitions_stream: Transition[]; +} diff --git a/typespec/executions/endpoints.tsp b/typespec/executions/endpoints.tsp index f0c414010..c96c77757 100644 --- a/typespec/executions/endpoints.tsp +++ b/typespec/executions/endpoints.tsp @@ -62,8 +62,3 @@ interface TransitionEndpoints "List the Transitions of an Execution by id" > {} -interface TransitionStreamEndpoints - extends ChildStreamEndpoint< - TransitionEvent, - "Stream events emitted by the given execution" - > {} diff --git a/typespec/main.tsp b/typespec/main.tsp index 72b063c02..eff6c3c64 100644 --- a/typespec/main.tsp +++ b/typespec/main.tsp @@ -122,8 +122,6 @@ namespace Api { @route("/executions/{id}/transitions") interface ExecutionTransitionsRoute extends Executions.TransitionEndpoints {} - @route("/executions/{id}/transitions.stream") - interface ExecutionTransitionsStreamRoute extends Executions.TransitionStreamEndpoints {} @route("/jobs") interface JobRoute extends Jobs.Endpoints {} diff --git a/typespec/tsp-output/@typespec/openapi3/openapi-1.0.0.yaml b/typespec/tsp-output/@typespec/openapi3/openapi-1.0.0.yaml index 2469095d2..987bd82eb 100644 --- a/typespec/tsp-output/@typespec/openapi3/openapi-1.0.0.yaml +++ b/typespec/tsp-output/@typespec/openapi3/openapi-1.0.0.yaml @@ -876,33 +876,6 @@ paths: - transitions required: - items - /executions/{id}/transitions.stream: - get: - operationId: ExecutionTransitionsStreamRoute_stream - description: Stream events emitted by the given execution - parameters: - - name: id - in: path - required: true - description: ID of parent - schema: - $ref: '#/components/schemas/Common.uuid' - - name: next_token - in: query - required: true - description: Next page token - schema: - type: string - nullable: true - default: null - explode: false - responses: - '200': - description: The request has succeeded. - content: - text/event-stream: - schema: - type: string /files: post: operationId: FilesRoute_create