Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# camb_tts_python

Camb.ai TTS extension for TEN Framework using the MARS-8 text-to-speech API.

## Features

- MARS-8 model family (mars-8, mars-8-flash, mars-8-instruct)
- 140+ languages supported
- Voice cloning capabilities
- Real-time HTTP streaming
- High-quality 24kHz audio output

## API

Refer to `api` definition in [manifest.json](manifest.json) and default values in [property.json](property.json).

### Configuration Parameters

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| api_key | string | (required) | Camb.ai API key |
| voice_id | int32 | 2681 | Voice ID (default: Attic voice) |
| language | string | "en-us" | Language code (BCP-47 format) |
| speech_model | string | "mars-8-flash" | Model selection |
| speed | float64 | 1.0 | Speech speed multiplier |
| format | string | "pcm_s16le" | Output format |
| endpoint | string | "https://client.camb.ai/apis/tts-stream" | API endpoint override (optional) |

### Available Models

- `mars-8` - Balanced model
- `mars-8-flash` - Faster inference (default, recommended)
- `mars-8-instruct` - Supports user instructions

## Development

### Setup

1. Get your API key from [Camb.ai](https://camb.ai)
2. Set environment variable:
```bash
export CAMB_API_KEY=your_key_here
```

### Build

Follow the standard TEN Framework extension build process.

### Unit test

Run tests using the standard TEN Framework testing approach.

## Resources

- [Camb.ai API Documentation](https://camb.mintlify.app/)
- [Getting Started](https://camb.mintlify.app/getting-started)
- [API Reference](https://camb.mintlify.app/api-reference/endpoint/create-tts-stream)
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#
# This file is part of TEN Framework, an open source project.
# Licensed under the Apache License, Version 2.0.
# See the LICENSE file for more information.
#
from . import addon
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#
# This file is part of TEN Framework, an open source project.
# Licensed under the Apache License, Version 2.0.
# See the LICENSE file for more information.
#
from ten_runtime import (
Addon,
register_addon_as_extension,
TenEnv,
)


@register_addon_as_extension("camb_tts_python")
class CambTTSExtensionAddon(Addon):
    """Addon registered as "camb_tts_python" that builds CambTTSExtension objects."""

    def on_create_instance(self, ten_env: TenEnv, name: str, context) -> None:
        """Create a CambTTSExtension called *name* and hand it back to the runtime.

        Args:
            ten_env: Environment used for logging and for reporting completion.
            name: Name given to the new extension instance.
            context: Opaque value passed through unchanged to
                ``on_create_instance_done``.
        """
        # Imported here rather than at module level, so the extension module
        # is only loaded once an instance is actually requested.
        from .extension import CambTTSExtension

        ten_env.log_info("CambTTSExtensionAddon on_create_instance")
        extension = CambTTSExtension(name)
        ten_env.on_create_instance_done(extension, context)
215 changes: 215 additions & 0 deletions ai_agents/agents/ten_packages/extension/camb_tts_python/camb_tts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
#
# This file is part of TEN Framework, an open source project.
# Licensed under the Apache License, Version 2.0.
# See the LICENSE file for more information.
#
from typing import Any, AsyncIterator, Tuple
from httpx import AsyncClient, Timeout, Limits

from .config import CambTTSConfig
from ten_runtime import AsyncTenEnv
from ten_ai_base.const import LOG_CATEGORY_VENDOR
from ten_ai_base.struct import TTS2HttpResponseEventType
from ten_ai_base.tts2_http import AsyncTTS2HttpClient


# Audio format constants. None of them is referenced by the client class in
# this module — presumably they are imported by the extension module; verify
# before removing.
# 2 bytes per sample corresponds to the default "pcm_s16le" (16-bit) format.
BYTES_PER_SAMPLE = 2
# presumably mono output — TODO confirm against the Camb.ai API
NUMBER_OF_CHANNELS = 1
# Sample rate in Hz; the README advertises 24kHz audio output.
SAMPLE_RATE = 24000


class CambTTSClient(AsyncTTS2HttpClient):
    """HTTP streaming client for the Camb.ai text-to-speech endpoint.

    The client reads every setting from ``config.params`` ("api_key",
    "endpoint", "voice_id", "language", "speech_model", "format", "speed")
    and streams the response body back to the caller as
    ``(payload, event_type)`` tuples.
    """

    # Fallbacks used when the corresponding key is missing from config.params.
    _DEFAULT_ENDPOINT = "https://client.camb.ai/apis/tts-stream"
    _DEFAULT_VOICE_ID = 2681
    _DEFAULT_LANGUAGE = "en-us"
    _DEFAULT_SPEECH_MODEL = "mars-8-flash"
    _DEFAULT_FORMAT = "pcm_s16le"
    _DEFAULT_SPEED = 1.0

    # Text length bounds enforced locally (Camb.ai requires 3-3000 characters).
    _MIN_TEXT_LEN = 3
    _MAX_TEXT_LEN = 3000

    def __init__(
        self,
        config: CambTTSConfig,
        ten_env: AsyncTenEnv,
    ):
        """Store the configuration and create the shared ``httpx`` client.

        Args:
            config: Configuration whose ``params`` dict supplies the API key,
                the optional endpoint override and the synthesis settings.
            ten_env: Environment used for logging.
        """
        super().__init__()
        self.config = config
        self.api_key = config.params.get("api_key", "")
        self.ten_env: AsyncTenEnv = ten_env
        self._is_cancelled = False
        self.endpoint = config.params.get("endpoint", self._DEFAULT_ENDPOINT)
        self.headers = {
            "x-api-key": self.api_key,
            "Content-Type": "application/json",
            "Accept": "application/json",
        }
        # Camb.ai TTS requires longer timeout (minimum 60s recommended)
        self.client = AsyncClient(
            timeout=Timeout(timeout=60.0),
            limits=Limits(
                max_connections=100,
                max_keepalive_connections=20,
                keepalive_expiry=600.0,  # 10 minutes keepalive
            ),
            # Enable HTTP/2 if server supports it.
            # NOTE(review): httpx needs its optional `h2` dependency for this
            # flag — confirm it is listed in the extension's requirements.
            http2=True,
        )

    async def cancel(self):
        """Ask the in-flight ``get()`` stream to stop at its next chunk."""
        self.ten_env.log_debug("CambTTS: cancel() called.")
        self._is_cancelled = True

    async def _describe_http_error(
        self, response
    ) -> Tuple[str, TTS2HttpResponseEventType] | None:
        """Map an HTTP error status to ``(message, event_type)``.

        Returns ``None`` when the status code is below 400. The response body
        is only read for statuses without a dedicated message.
        """
        status = response.status_code
        if status < 400:
            return None

        if status == 401:
            return (
                "Invalid Camb.ai API key. Set CAMB_API_KEY environment variable with your API key from https://camb.ai",
                TTS2HttpResponseEventType.INVALID_KEY_ERROR,
            )

        if status == 403:
            voice_id = self.config.params.get(
                "voice_id", self._DEFAULT_VOICE_ID
            )
            return (
                f"Voice ID {voice_id} is not accessible with your API key. Use list_voices() to see available voices.",
                TTS2HttpResponseEventType.ERROR,
            )

        if status == 429:
            return (
                "Rate limit exceeded. Please wait before making more requests.",
                TTS2HttpResponseEventType.ERROR,
            )

        error_body = await response.aread()
        return (
            f"API Error {status}: {error_body.decode('utf-8', errors='replace')}",
            TTS2HttpResponseEventType.ERROR,
        )

    async def get(
        self, text: str, request_id: str
    ) -> AsyncIterator[Tuple[bytes | None, TTS2HttpResponseEventType]]:
        """Synthesize *text* and stream the result.

        Args:
            text: Text to synthesize. Surrounding whitespace is removed before
                validation and sending, so the length checked here is the
                length actually sent. Text longer than 3000 characters is
                truncated; text shorter than 3 characters is skipped.
            request_id: Identifier used only in log messages.

        Yields:
            ``(bytes, RESPONSE)`` for every non-empty audio chunk, then
            ``(None, END)`` once when the stream finishes.
            ``(None, FLUSH)`` when ``cancel()`` was called mid-stream.
            ``(message_bytes, ERROR)`` or ``(message_bytes, INVALID_KEY_ERROR)``
            with a UTF-8 encoded message when the request fails.

        Raises:
            RuntimeError: If the HTTP client is not initialized.
        """
        self._is_cancelled = False
        if not self.client:
            self.ten_env.log_error(
                f"CambTTS: client not initialized for request_id: {request_id}.",
                category=LOG_CATEGORY_VENDOR,
            )
            raise RuntimeError(
                f"CambTTS: client not initialized for request_id: {request_id}."
            )

        # Normalize once: validating the stripped text but sending the raw
        # text would let whitespace-padded input exceed the API limit.
        text = text.strip()
        text_len = len(text)

        if text_len == 0:
            self.ten_env.log_warn(
                f"CambTTS: empty text for request_id: {request_id}.",
                category=LOG_CATEGORY_VENDOR,
            )
            yield None, TTS2HttpResponseEventType.END
            return

        # Validate text length (Camb.ai requires 3-3000 characters)
        if text_len < self._MIN_TEXT_LEN:
            self.ten_env.log_warn(
                f"CambTTS: text too short ({text_len} chars, min 3) for request_id: {request_id}.",
                category=LOG_CATEGORY_VENDOR,
            )
            yield None, TTS2HttpResponseEventType.END
            return

        if text_len > self._MAX_TEXT_LEN:
            self.ten_env.log_warn(
                f"CambTTS: text too long ({text_len} chars, max 3000), truncating for request_id: {request_id}.",
                category=LOG_CATEGORY_VENDOR,
            )
            text = text[: self._MAX_TEXT_LEN]

        try:
            params = self.config.params
            # Build payload with Camb.ai's nested structure
            payload = {
                "text": text,
                "voice_id": params.get("voice_id", self._DEFAULT_VOICE_ID),
                "language": params.get("language", self._DEFAULT_LANGUAGE),
                "speech_model": params.get(
                    "speech_model", self._DEFAULT_SPEECH_MODEL
                ),
                "output_configuration": {
                    "format": params.get("format", self._DEFAULT_FORMAT),
                },
                "voice_settings": {
                    "speed": params.get("speed", self._DEFAULT_SPEED),
                },
            }

            async with self.client.stream(
                "POST",
                self.endpoint,
                headers=self.headers,
                json=payload,
            ) as response:
                # Check for HTTP errors before streaming
                http_error = await self._describe_http_error(response)
                if http_error is not None:
                    error_message, event_type = http_error
                    self.ten_env.log_error(
                        f"CambTTS: {error_message} for request_id: {request_id}.",
                        category=LOG_CATEGORY_VENDOR,
                    )
                    yield error_message.encode("utf-8"), event_type
                    return

                async for chunk in response.aiter_bytes(chunk_size=8192):
                    if self._is_cancelled:
                        self.ten_env.log_debug(
                            f"Cancellation flag detected, sending flush event and stopping TTS stream of request_id: {request_id}."
                        )
                        yield None, TTS2HttpResponseEventType.FLUSH
                        break

                    if not chunk:
                        # An empty chunk carries no audio and does not mark
                        # the end of the stream; END is emitted exactly once
                        # after the loop, so skip instead of yielding END.
                        continue

                    self.ten_env.log_debug(
                        f"CambTTS: sending EVENT_TTS_RESPONSE, length: {len(chunk)} of request_id: {request_id}."
                    )
                    yield bytes(chunk), TTS2HttpResponseEventType.RESPONSE

                if not self._is_cancelled:
                    self.ten_env.log_debug(
                        f"CambTTS: sending EVENT_TTS_END of request_id: {request_id}."
                    )
                    yield None, TTS2HttpResponseEventType.END

        except Exception as e:
            # Boundary handler: report any failure to the consumer as an
            # event instead of raising out of the generator.
            error_message = str(e)
            self.ten_env.log_error(
                f"vendor_error: {error_message} of request_id: {request_id}.",
                category=LOG_CATEGORY_VENDOR,
            )
            # Heuristic: an exception message mentioning 401 is reported as
            # an API key authentication error.
            if "401" in error_message:
                yield error_message.encode(
                    "utf-8"
                ), TTS2HttpResponseEventType.INVALID_KEY_ERROR
            else:
                yield error_message.encode(
                    "utf-8"
                ), TTS2HttpResponseEventType.ERROR

    async def clean(self):
        """Close the underlying HTTP client; errors from closing propagate."""
        self.ten_env.log_debug("CambTTS: clean() called.")
        await self.client.aclose()

    def get_extra_metadata(self) -> dict[str, Any]:
        """Return extra metadata for TTFB metrics."""
        params = self.config.params
        return {
            "voice_id": params.get("voice_id", self._DEFAULT_VOICE_ID),
            "speech_model": params.get(
                "speech_model", self._DEFAULT_SPEECH_MODEL
            ),
        }
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#
# This file is part of TEN Framework, an open source project.
# Licensed under the Apache License, Version 2.0.
# See the LICENSE file for more information.
#
from typing import Any
import copy
from pathlib import Path
from ten_ai_base import utils
from ten_ai_base.tts2_http import AsyncTTS2HttpConfig

from pydantic import Field


class CambTTSConfig(AsyncTTS2HttpConfig):
    """Camb.ai TTS Config"""

    # Debug and logging
    dump: bool = Field(default=False, description="Camb TTS dump")
    dump_path: str = Field(
        default_factory=lambda: str(Path(__file__).parent / "camb_tts_in.pcm"),
        description="Camb TTS dump path",
    )
    # Free-form vendor settings: api_key, endpoint, voice_id, language,
    # speech_model, speed, format.
    params: dict[str, Any] = Field(
        default_factory=dict, description="Camb TTS params"
    )

    def update_params(self) -> None:
        """Drop keys from ``params`` that must not be taken from configuration.

        "text" is supplied per request, so a configured value is discarded.
        "endpoint" is deliberately kept: the client reads its endpoint
        override from ``params["endpoint"]``, so deleting it here would
        silently disable the override whenever this method runs before the
        client is constructed. Keeping it is harmless because the request
        payload is built from explicitly named keys.
        """
        blacklist_keys = ("text",)

        for key in blacklist_keys:
            self.params.pop(key, None)

    def to_str(self, sensitive_handling: bool = True) -> str:
        """Convert config to string with optional sensitive data handling.

        Args:
            sensitive_handling: When true (the default), the "api_key" entry
                of ``params`` is passed through ``utils.encrypt`` on a deep
                copy before formatting, so this instance is left unmodified.

        Returns:
            The string form of the config (or of its masked copy).
        """
        if not sensitive_handling:
            return f"{self}"

        config = copy.deepcopy(self)

        # Encrypt sensitive fields in params
        if config.params and "api_key" in config.params:
            config.params["api_key"] = utils.encrypt(config.params["api_key"])

        return f"{config}"

    def validate(self) -> None:
        """Validate Camb-specific configuration.

        Raises:
            ValueError: If ``params`` has no "api_key" entry or it is empty.
        """
        if not self.params.get("api_key"):
            raise ValueError("API key is required for Camb TTS")
Loading
Loading