From 11f8ad8f9cffa65ef52129d99f21fa16bd687cd3 Mon Sep 17 00:00:00 2001 From: "Hermes (agora)" Date: Wed, 17 Jun 2026 22:50:15 -0400 Subject: [PATCH] removed replay lock --- .fern/replay.lock | 3604 --------------------------------------------- 1 file changed, 3604 deletions(-) delete mode 100644 .fern/replay.lock diff --git a/.fern/replay.lock b/.fern/replay.lock deleted file mode 100644 index fc1da4b..0000000 --- a/.fern/replay.lock +++ /dev/null @@ -1,3604 +0,0 @@ -# DO NOT EDIT MANUALLY - Managed by Fern Replay -version: "1.0" -generations: - - commit_sha: a217c8ecfd919345831eebaca8295e292d65ebcf - tree_hash: 707f496ae7e028b80fc9a2adc1e5d69468f170b3 - timestamp: 2026-05-20T20:38:02.180Z - cli_version: unknown - generator_versions: {} - - commit_sha: b66d871314ca0e5929cb9c9095949a7fd5e856a7 - tree_hash: db7756fbc0a5c6923371615dd752c8e17b2d828b - timestamp: 2026-06-04T20:30:41.901Z - cli_version: unknown - generator_versions: - fernapi/fern-python-sdk: 4.37.0 - - commit_sha: 1d61baad436285e3b6a37555edb5ca67c158681c - tree_hash: 277360a3264a8c6b4bef09971b179275aab994ec - timestamp: 2026-06-17T07:20:24.878Z - cli_version: unknown - generator_versions: - fernapi/fern-python-sdk: 4.37.0 -current_generation: 1d61baad436285e3b6a37555edb5ca67c158681c -patches: - - id: patch-7465fada - content_hash: sha256:a2f90f66c927424018f2c3304742f097e8594dec9cb2f783264c7b11679a14ac - original_commit: 7465fadafa0f1e62051d99b42d0eeda85f31eeee - original_message: "fix(agentkit): resolve Python session typing issues" - original_author: digitallysavvy - base_generation: 1d61baad436285e3b6a37555edb5ca67c158681c - files: - - src/agora_agent/agentkit/agent_session.py - patch_content: | - diff --git a/src/agora_agent/agentkit/agent_session.py b/src/agora_agent/agentkit/agent_session.py - index 2900c18..745c465 100644 - --- a/src/agora_agent/agentkit/agent_session.py - +++ b/src/agora_agent/agentkit/agent_session.py - @@ -15,6 +15,7 @@ from ..agent_management.types.agent_think_agent_management_response import ( - AgentThinkAgentManagementResponse as AgentThinkResponse, - ) - from ..agents.types.get_turns_agents_response import GetTurnsAgentsResponse - +from ..agents.types.get_turns_agents_response import GetTurnsAgentsResponse - from .agent import Agent, GetTurnsOptions, SayOptions, ThinkOptions, _start_properties_from_mapping - from .avatar_types import ( - is_akool_avatar, - theirs_snapshot: - src/agora_agent/agentkit/agent_session.py: | - import typing - import warnings - - from ..core.api_error import ApiError - from ..agent_management.types.agent_think_agent_management_request_on_listening_action import ( - AgentThinkAgentManagementRequestOnListeningAction as AgentThinkRequestOnListeningAction, - ) - from ..agent_management.types.agent_think_agent_management_request_on_speaking_action import ( - AgentThinkAgentManagementRequestOnSpeakingAction as AgentThinkRequestOnSpeakingAction, - ) - from ..agent_management.types.agent_think_agent_management_request_on_thinking_action import ( - AgentThinkAgentManagementRequestOnThinkingAction as AgentThinkRequestOnThinkingAction, - ) - from ..agent_management.types.agent_think_agent_management_response import ( - AgentThinkAgentManagementResponse as AgentThinkResponse, - ) - from ..agents.types.get_turns_agents_response import GetTurnsAgentsResponse - from ..agents.types.get_turns_agents_response import GetTurnsAgentsResponse - from .agent import Agent, GetTurnsOptions, SayOptions, ThinkOptions, _start_properties_from_mapping - from .avatar_types import ( - is_akool_avatar, - is_anam_avatar, - is_avatar_token_managed, - is_generic_avatar, - is_heygen_avatar, - is_live_avatar_avatar, - is_rtc_avatar, - validate_avatar_config, - validate_tts_sample_rate, - ) - from .presets import ( - get_preset_category, - infer_asr_preset, - infer_llm_preset, - infer_tts_preset, - normalize_preset_input, - resolve_session_presets, - ) - from .token import generate_convo_ai_token, _parse_numeric_uid - - - class _AgentSessionRequiredOptions(typing.TypedDict, total=True): - """Required fields shared by both sync and async session constructors.""" - - client: typing.Any - agent: Agent - app_id: str - name: str - channel: str - agent_uid: str - remote_uids: typing.List[str] - - - class AgentSessionOptions(_AgentSessionRequiredOptions, total=False): - """Configuration options for creating an agent session. - - Required fields - --------------- - client, agent, app_id, name, channel, agent_uid, remote_uids - - Optional fields - --------------- - app_certificate, token, idle_timeout, enable_string_uid, preset, - pipeline_id, expires_in, debug, warn - """ - - app_certificate: str - token: str - idle_timeout: int - enable_string_uid: bool - preset: typing.Union[str, typing.Sequence[str]] - pipeline_id: str - expires_in: int - debug: bool - warn: typing.Callable[[str], None] - - - class _AgentSessionBase: - """Shared state and helpers for :class:`AgentSession` and :class:`AsyncAgentSession`. - - Not intended for direct use — instantiate one of the concrete subclasses or - call :meth:`Agent.create_session` / :meth:`Agent.create_async_session`. - """ - - def __init__( - self, - client: typing.Any, - agent: Agent, - app_id: str, - name: str, - channel: str, - agent_uid: str, - remote_uids: typing.List[str], - app_certificate: typing.Optional[str] = None, - token: typing.Optional[str] = None, - idle_timeout: typing.Optional[int] = None, - enable_string_uid: typing.Optional[bool] = None, - preset: typing.Optional[typing.Union[str, typing.Sequence[str]]] = None, - pipeline_id: typing.Optional[str] = None, - expires_in: typing.Optional[int] = None, - debug: typing.Optional[bool] = None, - warn: typing.Optional[typing.Callable[[str], None]] = None, - ): - self._client = client - self._agent = agent - self._app_id = app_id - self._app_certificate = app_certificate - self._name = name - self._channel = channel - self._token = token - self._agent_uid = agent_uid - self._remote_uids = remote_uids - self._idle_timeout = idle_timeout - self._enable_string_uid = enable_string_uid - self._preset = preset - self._pipeline_id = pipeline_id - self._expires_in = expires_in - self._debug = debug - self._warn = warn or warnings.warn - self._agent_id: typing.Optional[str] = None - self._status: str = "idle" - self._event_handlers: typing.Dict[str, typing.List[typing.Callable[..., None]]] = {} - - # ------------------------------------------------------------------ - # Public read-only properties - # ------------------------------------------------------------------ - - @property - def id(self) -> typing.Optional[str]: - return self._agent_id - - @property - def status(self) -> str: - return self._status - - @property - def agent(self) -> Agent: - return self._agent - - @property - def app_id(self) -> str: - return self._app_id - - @property - def raw(self) -> typing.Any: - """Direct access to the underlying Fern-generated AgentsClient. - - Use this to access any new endpoints that Fern generates without - waiting for agentkit method updates. - """ - return self._client.agents - - @property - def raw_agent_management(self) -> typing.Any: - """Direct access to the underlying Fern-generated AgentManagement client.""" - return self._client.agent_management - - # ------------------------------------------------------------------ - # Internal helpers - # ------------------------------------------------------------------ - - def _convo_ai_headers(self) -> typing.Optional[typing.Dict[str, str]]: - """Return per-request auth headers when client is in app-credentials mode. - - In app-credentials mode a fresh ConvoAI token (RTC + RTM) is generated - for every request and returned as ``Authorization: agora token=``. - In basic-auth mode this returns ``None`` (the client-level header is used). - """ - if getattr(self._client, "auth_mode", None) != "app-credentials": - return None - app_id: str = getattr(self._client, "app_id", self._app_id) - app_certificate: typing.Optional[str] = getattr( - self._client, "app_certificate", self._app_certificate - ) - if not app_certificate: - raise RuntimeError("app_certificate is required for app-credentials auth mode") - token = generate_convo_ai_token( - app_id=app_id, - app_certificate=app_certificate, - channel_name=self._channel, - uid=_parse_numeric_uid(self._agent_uid, "agent_uid"), - ) - return {"Authorization": f"agora token={token}"} - - def _request_options(self) -> typing.Optional[typing.Dict[str, typing.Any]]: - """Build request_options dict with per-request auth headers if needed.""" - headers = self._convo_ai_headers() - if headers is None: - return None - return {"additional_headers": headers} - - def _validate_avatar_config(self) -> None: - avatar = self._agent.avatar - tts = self._agent.tts - if not avatar or avatar.get("enable", True) is False: - return - if self._is_mllm_mode(): - raise ValueError( - "Avatars are only supported with the cascading ASR + LLM + TTS pipeline. " - "Remove the avatar configuration when using MLLM, or switch to a cascading session." - ) - - if ( - is_heygen_avatar(avatar) - or is_live_avatar_avatar(avatar) - or is_akool_avatar(avatar) - or is_anam_avatar(avatar) - or is_generic_avatar(avatar) - ): - validate_avatar_config(avatar) - - tts_params = tts.get("params") if isinstance(tts, dict) else None - sample_rate = self._agent.tts_sample_rate - if sample_rate is None and isinstance(tts_params, dict): - sample_rate = ( - tts_params.get("sample_rate") - or tts_params.get("sample_rate_hertz") - or tts_params.get("samplingRate") - ) - if isinstance(sample_rate, int): - validate_tts_sample_rate(avatar, sample_rate) - elif is_heygen_avatar(avatar): - self._warn( - "Warning: HeyGen avatar detected but TTS sample_rate is not explicitly set. " - "HeyGen requires 24,000 Hz. Please ensure your TTS provider is configured for 24kHz." - ) - elif is_live_avatar_avatar(avatar): - self._warn( - "Warning: LiveAvatar avatar detected but TTS sample_rate is not explicitly set. " - "LiveAvatar requires 24,000 Hz. Please ensure your TTS provider is configured for 24kHz." - ) - elif is_akool_avatar(avatar): - self._warn( - "Warning: Akool avatar detected but TTS sample_rate is not explicitly set. " - "Akool requires 16,000 Hz. Please ensure your TTS provider is configured for 16kHz." - ) - - def _enrich_avatar_for_session(self, properties: typing.Dict[str, typing.Any]) -> None: - avatar = properties.get("avatar") - if not isinstance(avatar, dict) or avatar.get("enable", True) is False: - return - - params = avatar.get("params") - if not isinstance(params, dict): - params = {} - avatar["params"] = params - - if is_generic_avatar(avatar): - if not params.get("agora_appid"): - params["agora_appid"] = self._app_id - if not params.get("agora_channel"): - params["agora_channel"] = self._channel - - if not is_avatar_token_managed(avatar): - validate_avatar_config(avatar, require_session_fields=is_generic_avatar(avatar)) - return - - if not params.get("agora_uid"): - validate_avatar_config(avatar, require_session_fields=is_generic_avatar(avatar)) - return - - if not params.get("agora_token"): - if not self._app_certificate: - raise ValueError( - "Cannot auto-generate avatar RTC token: app_certificate is required when agora_token is omitted. " - "Pass app_certificate on the Agora client or supply agora_token explicitly on the avatar vendor." - ) - token_kwargs: typing.Dict[str, typing.Any] = {} - if self._expires_in is not None: - token_kwargs["token_expire"] = self._expires_in - params["agora_token"] = generate_convo_ai_token( - app_id=self._app_id, - app_certificate=self._app_certificate, - channel_name=self._channel, - uid=_parse_numeric_uid(str(params["agora_uid"]), "avatar agora_uid"), - **token_kwargs, - ) - - if str(params.get("agora_uid")) == self._agent_uid: - self._warn( - "Warning: avatar agora_uid matches agent_rtc_uid. Use a unique UID for the avatar video publisher." - ) - - validate_avatar_config(avatar, require_session_fields=True) - - @staticmethod - def _dump_model(value: typing.Any) -> typing.Any: - if hasattr(value, "model_dump"): - return value.model_dump(exclude_none=True) - if isinstance(value, dict): - return {k: _AgentSessionBase._dump_model(v) for k, v in value.items() if v is not None} - if isinstance(value, list): - return [_AgentSessionBase._dump_model(item) for item in value] - return value - - def _is_mllm_mode(self) -> bool: - mllm = self._agent.mllm - if isinstance(mllm, dict) and mllm.get("enable") is True: - return True - return mllm is not None - - def _build_start_properties( - self, - token_opts: typing.Dict[str, typing.Any], - skip_vendor_validation_categories: typing.AbstractSet[str], - allow_missing_vendor_categories: typing.AbstractSet[str], - ) -> typing.Dict[str, typing.Any]: - base_properties = self._agent.to_properties( - channel=self._channel, - agent_uid=self._agent_uid, - remote_uids=self._remote_uids, - idle_timeout=self._idle_timeout, - enable_string_uid=self._enable_string_uid, - skip_vendor_validation_categories=skip_vendor_validation_categories, - allow_missing_vendor_categories=allow_missing_vendor_categories, - **token_opts, - ) - properties = self._dump_model(base_properties) - self._enrich_avatar_for_session(properties) - - if self._is_mllm_mode(): - if self._agent.mllm is not None: - mllm = self._dump_model(self._agent.mllm) - if not isinstance(mllm, dict): - mllm = {} - if self._agent.greeting is not None: - mllm.setdefault("greeting_message", self._agent.greeting) - if self._agent.failure_message is not None: - mllm.setdefault("failure_message", self._agent.failure_message) - properties["mllm"] = mllm - return properties - - if self._agent.tts is not None: - properties["tts"] = self._dump_model(self._agent.tts) - if self._agent.llm is not None: - llm = dict(self._agent.llm) - if self._agent.instructions is not None and "system_messages" not in llm: - llm["system_messages"] = [{"role": "system", "content": self._agent.instructions}] - if self._agent.greeting is not None and "greeting_message" not in llm: - llm["greeting_message"] = self._agent.greeting - if self._agent.greeting_configs is not None and "greeting_configs" not in llm: - llm["greeting_configs"] = self._dump_model(self._agent.greeting_configs) - if self._agent.failure_message is not None and "failure_message" not in llm: - llm["failure_message"] = self._agent.failure_message - if self._agent.max_history is not None and "max_history" not in llm: - llm["max_history"] = self._agent.max_history - properties["llm"] = llm - if self._agent.stt is not None: - properties["asr"] = self._dump_model(self._agent.stt) - - return properties - - @staticmethod - def _request_properties_for_start( - resolved_properties: typing.Dict[str, typing.Any], - *, - resolved_preset: typing.Optional[str], - pipeline_id: typing.Optional[str], - ) -> typing.Any: - try: - return _start_properties_from_mapping(resolved_properties) - except Exception as exc: - if pipeline_id: - return resolved_properties - if resolved_preset: - normalized_preset = normalize_preset_input(resolved_preset) - if not normalized_preset: - raise - preset_categories = { - category - for item in normalized_preset.split(",") - for category in [get_preset_category(item)] - if category is not None - } - error_categories = _AgentSessionBase._validation_error_categories(exc) - if error_categories and error_categories.issubset(preset_categories): - return resolved_properties - raise - - @staticmethod - def _validation_error_categories(exc: Exception) -> typing.Set[str]: - errors = getattr(exc, "errors", None) - if not callable(errors): - return set() - categories: typing.Set[str] = set() - for error in errors(): - loc = error.get("loc") if isinstance(error, dict) else None - if isinstance(loc, tuple) and loc: - field = loc[0] - if field in {"asr", "llm", "tts"}: - categories.add(typing.cast(str, field)) - return categories - - def _vendor_validation_categories( - self, - pipeline_id: typing.Optional[str], - ) -> typing.Tuple[typing.Set[str], typing.Set[str]]: - skip_categories: typing.Set[str] = set() - allow_missing_categories: typing.Set[str] = {"asr", "llm", "tts"} if pipeline_id else set() - - preset = normalize_preset_input(self._preset) - if preset: - for item in preset.split(","): - category = get_preset_category(item) - if category is not None: - skip_categories.add(category) - allow_missing_categories.add(category) - - if infer_asr_preset(self._agent.stt): - skip_categories.add("asr") - if infer_llm_preset(self._agent.llm): - skip_categories.add("llm") - if infer_tts_preset(self._agent.tts): - skip_categories.add("tts") - return skip_categories, allow_missing_categories - - @staticmethod - def _page_value(pagination: typing.Any, field: str) -> typing.Any: - if pagination is None: - return None - if isinstance(pagination, dict): - return pagination.get(field) - return getattr(pagination, field, None) - - @staticmethod - def _response_turns(response: typing.Any) -> typing.List[typing.Any]: - turns = response.get("turns") if isinstance(response, dict) else getattr(response, "turns", None) - return list(turns or []) - - @staticmethod - def _response_pagination(response: typing.Any) -> typing.Any: - if isinstance(response, dict): - return response.get("pagination") - return getattr(response, "pagination", None) - - @classmethod - def _with_all_turns(cls, first_response: typing.Any, turns: typing.List[typing.Any]) -> GetTurnsAgentsResponse: - data = cls._dump_model(first_response) - if not isinstance(data, dict): - data = {} - data["turns"] = turns - return GetTurnsAgentsResponse(**data) - - # ------------------------------------------------------------------ - # Event handling - # ------------------------------------------------------------------ - - def on(self, event: str, handler: typing.Callable[..., None]) -> None: - """Register an event handler. - - Parameters - ---------- - event : str - The event type (``started``, ``stopped``, ``error``). - handler : callable - The event handler to invoke when the event fires. - """ - if event not in self._event_handlers: - self._event_handlers[event] = [] - self._event_handlers[event].append(handler) - - def off(self, event: str, handler: typing.Callable[..., None]) -> None: - """Unregister a previously registered event handler.""" - handlers = self._event_handlers.get(event) - if handlers and handler in handlers: - handlers.remove(handler) - - def _emit(self, event: str, data: typing.Any) -> None: - handlers = self._event_handlers.get(event) - if handlers: - for handler in handlers: - try: - handler(data) - except Exception as exc: - # Prevent a misbehaving handler from blocking other handlers or - # the session lifecycle. Warn so the error is not silently lost. - warnings.warn( - f"Event handler for '{event}' raised an exception: {exc}", - stacklevel=2, - ) - - - class AgentSession(_AgentSessionBase): - """Manages the lifecycle of an agent session (synchronous). - - This class provides a high-level interface for managing agent sessions, - including starting, stopping, and interacting with the agent. - - Use :meth:`Agent.create_session` to create a session — this is the - recommended entry point. - - Examples - -------- - >>> from agora_agent import Agora, Area, Agent, OpenAI, ElevenLabsTTS - >>> - >>> client = Agora(area=Area.US, app_id="...", app_certificate="...") - >>> agent = Agent(name="assistant", instructions="You are a helpful voice assistant.") - >>> agent = agent.with_llm(OpenAI(api_key="...", base_url="https://api.openai.com/v1/chat/completions", model="gpt-4")).with_tts(ElevenLabsTTS(key="...", model_id="...", voice_id="...", base_url="wss://api.elevenlabs.io/v1")) - >>> session = agent.create_session(client, channel="room-123", agent_uid="1", remote_uids=["100"]) - >>> agent_id = session.start() - >>> session.say("Hello!") - >>> session.stop() - """ - - def start(self) -> str: - """Start the agent session. - - Returns - ------- - str - The agent ID. - - Raises - ------ - RuntimeError - If the session is not in a startable state. - ValueError - If avatar/TTS configuration is invalid. - """ - if self._status not in ("idle", "stopped", "error"): - raise RuntimeError(f"Cannot start session in {self._status} state") - - self._validate_avatar_config() - self._status = "starting" - - try: - pipeline_id = self._pipeline_id if self._pipeline_id is not None else self._agent.pipeline_id - if self._token: - token_opts: typing.Dict[str, typing.Any] = {"token": self._token} - else: - token_opts = { - "app_id": self._app_id, - "app_certificate": self._app_certificate, - "expires_in": self._expires_in, - } - - skip_categories, allow_missing_categories = self._vendor_validation_categories(pipeline_id) - properties = self._build_start_properties( - token_opts, - skip_vendor_validation_categories=skip_categories, - allow_missing_vendor_categories=allow_missing_categories, - ) - resolved_preset, resolved_properties = resolve_session_presets( - self._preset, - properties, - ) - - if self._debug: - print("[Agora Debug] Starting agent session...") - print("[Agora Debug] Request:", { - "appid": self._app_id, - "name": self._name, - "preset": resolved_preset, - "pipeline_id": pipeline_id, - "properties": resolved_properties, - }) - - request_properties = self._request_properties_for_start( - resolved_properties, - resolved_preset=resolved_preset, - pipeline_id=pipeline_id, - ) - - response = self._client.agents.start( - self._app_id, - name=self._name, - properties=request_properties, - preset=resolved_preset, - pipeline_id=pipeline_id, - request_options=self._request_options(), - ) - - self._agent_id = response.agent_id if hasattr(response, "agent_id") else None - self._status = "running" - self._emit("started", {"agent_id": self._agent_id}) - return self._agent_id or "" - except Exception as e: - self._status = "error" - self._emit("error", e) - raise - - def stop(self) -> None: - """Stop the agent session. - - If the agent has already stopped (e.g., crashed or timed out), the - server returns 404, which this method treats as a successful stop - rather than raising an error. - """ - if self._status != "running": - raise RuntimeError(f"Cannot stop session in {self._status} state") - if not self._agent_id: - raise RuntimeError("No agent ID available") - - self._status = "stopping" - - try: - self._client.agents.stop( - self._app_id, self._agent_id, request_options=self._request_options() - ) - self._status = "stopped" - self._emit("stopped", {"agent_id": self._agent_id}) - except ApiError as e: - if e.status_code == 404: - self._status = "stopped" - self._emit("stopped", {"agent_id": self._agent_id}) - return - self._status = "error" - self._emit("error", e) - raise - except Exception as e: - self._status = "error" - self._emit("error", e) - raise - - def say( - self, - text: str, - priority: typing.Optional[str] = None, - interruptable: typing.Optional[bool] = None, - *, - options: typing.Optional["SayOptions"] = None, - ) -> None: - """Send a message to be spoken by the agent. - - Parameters - ---------- - text : str - The text to speak. - priority : str, optional - Priority of the message (``INTERRUPT``, ``APPEND``, ``IGNORE``). - interruptable : bool, optional - Whether the message can be interrupted by the user. - """ - if self._status != "running": - raise RuntimeError(f"Cannot say in {self._status} state") - if not self._agent_id: - raise RuntimeError("No agent ID available") - - kwargs: typing.Dict[str, typing.Any] = {"text": text} - if options is not None: - kwargs.update(options) - if priority is not None: - kwargs["priority"] = priority - if interruptable is not None: - kwargs["interruptable"] = interruptable - - self._client.agents.speak( - self._app_id, self._agent_id, request_options=self._request_options(), **kwargs - ) - - def interrupt(self) -> None: - """Interrupt the agent while it is speaking or thinking.""" - if self._status != "running": - raise RuntimeError(f"Cannot interrupt in {self._status} state") - if not self._agent_id: - raise RuntimeError("No agent ID available") - - self._client.agents.interrupt( - self._app_id, self._agent_id, request_options=self._request_options() - ) - - def think( - self, - text: str, - *, - on_listening_action: typing.Optional[AgentThinkRequestOnListeningAction] = None, - on_thinking_action: typing.Optional[AgentThinkRequestOnThinkingAction] = None, - on_speaking_action: typing.Optional[AgentThinkRequestOnSpeakingAction] = None, - interruptable: typing.Optional[bool] = None, - metadata: typing.Optional[typing.Dict[str, str]] = None, - options: typing.Optional["ThinkOptions"] = None, - ) -> AgentThinkResponse: - """Inject a custom text instruction into the current session pipeline. - - In API v2.7, omitting ``on_listening_action`` uses the server default - ``"interrupt"``. Pass ``on_listening_action="inject"`` explicitly to - preserve the pre-v2.7 behavior. - """ - if self._status != "running": - raise RuntimeError(f"Cannot think in {self._status} state") - if not self._agent_id: - raise RuntimeError("No agent ID available") - - kwargs: typing.Dict[str, typing.Any] = {"text": text} - if options is not None: - kwargs.update(options) - if on_listening_action is not None: - kwargs["on_listening_action"] = on_listening_action - if on_thinking_action is not None: - kwargs["on_thinking_action"] = on_thinking_action - if on_speaking_action is not None: - kwargs["on_speaking_action"] = on_speaking_action - if interruptable is not None: - kwargs["interruptable"] = interruptable - if metadata is not None: - kwargs["metadata"] = metadata - - return self._client.agent_management.agent_think( - self._app_id, - self._agent_id, - request_options=self._request_options(), - **kwargs, - ) - - def update(self, properties: typing.Any) -> None: - """Update the agent configuration at runtime. - - Parameters - ---------- - properties : UpdateAgentsRequestProperties - Partial configuration to update. - """ - if self._status != "running": - raise RuntimeError(f"Cannot update in {self._status} state") - if not self._agent_id: - raise RuntimeError("No agent ID available") - - self._client.agents.update( - self._app_id, - self._agent_id, - properties=properties, - request_options=self._request_options(), - ) - - def get_history(self) -> typing.Any: - """Get the conversation history.""" - if not self._agent_id: - raise RuntimeError("No agent ID available") - - return self._client.agents.get_history( - self._app_id, self._agent_id, request_options=self._request_options() - ) - - def get_info(self) -> typing.Any: - """Get the current session info.""" - if not self._agent_id: - raise RuntimeError("No agent ID available") - - return self._client.agents.get( - self._app_id, self._agent_id, request_options=self._request_options() - ) - - def get_turns( - self, - *, - page_index: typing.Optional[int] = None, - page_size: typing.Optional[int] = None, - options: typing.Optional["GetTurnsOptions"] = None, - ) -> GetTurnsAgentsResponse: - """Get turn-by-turn analytics and timing details for this session.""" - if not self._agent_id: - raise RuntimeError("No agent ID available") - - kwargs: typing.Dict[str, typing.Any] = {} - if options is not None: - kwargs.update(options) - if page_index is not None: - kwargs["page_index"] = page_index - if page_size is not None: - kwargs["page_size"] = page_size - - return self._client.agents.get_turns( - self._app_id, - self._agent_id, - request_options=self._request_options(), - **kwargs, - ) - - def get_all_turns(self, *, page_size: typing.Optional[int] = None) -> GetTurnsAgentsResponse: - """Get all turn analytics pages for this session. - - Raises ``RuntimeError`` if the server's pagination metadata is missing - the fields required to advance, or if requesting the next page returns - a page index that did not advance. - """ - response = self.get_turns(page_index=1, page_size=page_size) - all_turns = self._response_turns(response) - pagination = self._response_pagination(response) - current_page = self._page_value(pagination, "page_index") or 1 - while pagination is not None and self._page_value(pagination, "is_last_page") is False: - total_pages = self._page_value(pagination, "total_pages") - returned_index = self._page_value(pagination, "page_index") - if returned_index is None and total_pages is None: - raise RuntimeError( - "get_all_turns pagination cannot continue: response must include " - "page_index, total_pages, or is_last_page=true." - ) - if total_pages is not None and current_page >= total_pages: - break - next_page = current_page + 1 - response = self.get_turns(page_index=next_page, page_size=page_size) - all_turns.extend(self._response_turns(response)) - pagination = self._response_pagination(response) - returned_index = self._page_value(pagination, "page_index") if pagination else None - if returned_index is not None: - if returned_index <= current_page and self._page_value(pagination, "is_last_page") is not True: - raise RuntimeError( - f"get_all_turns pagination did not advance: requested page {next_page}, " - f"received page {returned_index}." - ) - current_page = returned_index - else: - total_pages = self._page_value(pagination, "total_pages") if pagination else None - is_last_page = self._page_value(pagination, "is_last_page") if pagination else None - if total_pages is None and is_last_page is not True: - raise RuntimeError( - "get_all_turns pagination cannot continue: response must include " - "page_index, total_pages, or is_last_page=true." - ) - current_page = next_page - return self._with_all_turns(response, all_turns) - - - class AsyncAgentSession(_AgentSessionBase): - """Async version of :class:`AgentSession` for use with :class:`AsyncAgora`. - - Use :meth:`Agent.create_async_session` to create a session — this is the - recommended entry point. - - Examples - -------- - >>> from agora_agent import AsyncAgora, Area, Agent, OpenAI, ElevenLabsTTS - >>> - >>> client = AsyncAgora(area=Area.US, app_id="...", app_certificate="...") - >>> agent = Agent(name="assistant", instructions="You are helpful.") - >>> agent = agent.with_llm(OpenAI(api_key="...", base_url="https://api.openai.com/v1/chat/completions", model="gpt-4")).with_tts(ElevenLabsTTS(key="...", model_id="...", voice_id="...", base_url="wss://api.elevenlabs.io/v1")) - >>> session = agent.create_async_session(client, channel="room-123", agent_uid="1", remote_uids=["100"]) - >>> agent_id = await session.start() - >>> await session.say("Hello!") - >>> await session.stop() - """ - - async def start(self) -> str: - """Start the agent session. - - Returns - ------- - str - The agent ID. - - Raises - ------ - RuntimeError - If the session is not in a startable state. - ValueError - If avatar/TTS configuration is invalid. - """ - if self._status not in ("idle", "stopped", "error"): - raise RuntimeError(f"Cannot start session in {self._status} state") - - self._validate_avatar_config() - self._status = "starting" - - try: - pipeline_id = self._pipeline_id if self._pipeline_id is not None else self._agent.pipeline_id - if self._token: - token_opts: typing.Dict[str, typing.Any] = {"token": self._token} - else: - token_opts = { - "app_id": self._app_id, - "app_certificate": self._app_certificate, - "expires_in": self._expires_in, - } - - skip_categories, allow_missing_categories = self._vendor_validation_categories(pipeline_id) - properties = self._build_start_properties( - token_opts, - skip_vendor_validation_categories=skip_categories, - allow_missing_vendor_categories=allow_missing_categories, - ) - resolved_preset, resolved_properties = resolve_session_presets( - self._preset, - properties, - ) - - if self._debug: - print("[Agora Debug] Starting agent session...") - print("[Agora Debug] Request:", { - "appid": self._app_id, - "name": self._name, - "preset": resolved_preset, - "pipeline_id": pipeline_id, - "properties": resolved_properties, - }) - - request_properties = self._request_properties_for_start( - resolved_properties, - resolved_preset=resolved_preset, - pipeline_id=pipeline_id, - ) - - response = await self._client.agents.start( - self._app_id, - name=self._name, - properties=request_properties, - preset=resolved_preset, - pipeline_id=pipeline_id, - request_options=self._request_options(), - ) - - self._agent_id = response.agent_id if hasattr(response, "agent_id") else None - self._status = "running" - self._emit("started", {"agent_id": self._agent_id}) - return self._agent_id or "" - except Exception as e: - self._status = "error" - self._emit("error", e) - raise - - async def stop(self) -> None: - """Stop the agent session. - - If the agent has already stopped (e.g., crashed or timed out), the - server returns 404, which this method treats as a successful stop - rather than raising an error. - """ - if self._status != "running": - raise RuntimeError(f"Cannot stop session in {self._status} state") - if not self._agent_id: - raise RuntimeError("No agent ID available") - - self._status = "stopping" - - try: - await self._client.agents.stop( - self._app_id, self._agent_id, request_options=self._request_options() - ) - self._status = "stopped" - self._emit("stopped", {"agent_id": self._agent_id}) - except ApiError as e: - if e.status_code == 404: - self._status = "stopped" - self._emit("stopped", {"agent_id": self._agent_id}) - return - self._status = "error" - self._emit("error", e) - raise - except Exception as e: - self._status = "error" - self._emit("error", e) - raise - - async def say( - self, - text: str, - priority: typing.Optional[str] = None, - interruptable: typing.Optional[bool] = None, - *, - options: typing.Optional["SayOptions"] = None, - ) -> None: - """Send a message to be spoken by the agent. - - Parameters - ---------- - text : str - The text to speak. - priority : str, optional - Priority of the message (``INTERRUPT``, ``APPEND``, ``IGNORE``). - interruptable : bool, optional - Whether the message can be interrupted by the user. - """ - if self._status != "running": - raise RuntimeError(f"Cannot say in {self._status} state") - if not self._agent_id: - raise RuntimeError("No agent ID available") - - kwargs: typing.Dict[str, typing.Any] = {"text": text} - if options is not None: - kwargs.update(options) - if priority is not None: - kwargs["priority"] = priority - if interruptable is not None: - kwargs["interruptable"] = interruptable - - await self._client.agents.speak( - self._app_id, self._agent_id, request_options=self._request_options(), **kwargs - ) - - async def interrupt(self) -> None: - """Interrupt the agent while it is speaking or thinking.""" - if self._status != "running": - raise RuntimeError(f"Cannot interrupt in {self._status} state") - if not self._agent_id: - raise RuntimeError("No agent ID available") - - await self._client.agents.interrupt( - self._app_id, self._agent_id, request_options=self._request_options() - ) - - async def think( - self, - text: str, - *, - on_listening_action: typing.Optional[AgentThinkRequestOnListeningAction] = None, - on_thinking_action: typing.Optional[AgentThinkRequestOnThinkingAction] = None, - on_speaking_action: typing.Optional[AgentThinkRequestOnSpeakingAction] = None, - interruptable: typing.Optional[bool] = None, - metadata: typing.Optional[typing.Dict[str, str]] = None, - options: typing.Optional["ThinkOptions"] = None, - ) -> AgentThinkResponse: - """Inject a custom text instruction into the current session pipeline. - - In API v2.7, omitting ``on_listening_action`` uses the server default - ``"interrupt"``. Pass ``on_listening_action="inject"`` explicitly to - preserve the pre-v2.7 behavior. - """ - if self._status != "running": - raise RuntimeError(f"Cannot think in {self._status} state") - if not self._agent_id: - raise RuntimeError("No agent ID available") - - kwargs: typing.Dict[str, typing.Any] = {"text": text} - if options is not None: - kwargs.update(options) - if on_listening_action is not None: - kwargs["on_listening_action"] = on_listening_action - if on_thinking_action is not None: - kwargs["on_thinking_action"] = on_thinking_action - if on_speaking_action is not None: - kwargs["on_speaking_action"] = on_speaking_action - if interruptable is not None: - kwargs["interruptable"] = interruptable - if metadata is not None: - kwargs["metadata"] = metadata - - return await self._client.agent_management.agent_think( - self._app_id, - self._agent_id, - request_options=self._request_options(), - **kwargs, - ) - - async def update(self, properties: typing.Any) -> None: - """Update the agent configuration at runtime. - - Parameters - ---------- - properties : UpdateAgentsRequestProperties - Partial configuration to update. - """ - if self._status != "running": - raise RuntimeError(f"Cannot update in {self._status} state") - if not self._agent_id: - raise RuntimeError("No agent ID available") - - await self._client.agents.update( - self._app_id, - self._agent_id, - properties=properties, - request_options=self._request_options(), - ) - - async def get_history(self) -> typing.Any: - """Get the conversation history.""" - if not self._agent_id: - raise RuntimeError("No agent ID available") - - return await self._client.agents.get_history( - self._app_id, self._agent_id, request_options=self._request_options() - ) - - async def get_info(self) -> typing.Any: - """Get the current session info.""" - if not self._agent_id: - raise RuntimeError("No agent ID available") - - return await self._client.agents.get( - self._app_id, self._agent_id, request_options=self._request_options() - ) - - async def get_turns( - self, - *, - page_index: typing.Optional[int] = None, - page_size: typing.Optional[int] = None, - options: typing.Optional["GetTurnsOptions"] = None, - ) -> GetTurnsAgentsResponse: - """Get turn-by-turn analytics and timing details for this session.""" - if not self._agent_id: - raise RuntimeError("No agent ID available") - - kwargs: typing.Dict[str, typing.Any] = {} - if options is not None: - kwargs.update(options) - if page_index is not None: - kwargs["page_index"] = page_index - if page_size is not None: - kwargs["page_size"] = page_size - - return await self._client.agents.get_turns( - self._app_id, - self._agent_id, - request_options=self._request_options(), - **kwargs, - ) - - async def get_all_turns(self, *, page_size: typing.Optional[int] = None) -> GetTurnsAgentsResponse: - """Get all turn analytics pages for this session. - - Raises ``RuntimeError`` if the server's pagination metadata is missing - the fields required to advance, or if requesting the next page returns - a page index that did not advance. - """ - response = await self.get_turns(page_index=1, page_size=page_size) - all_turns = self._response_turns(response) - pagination = self._response_pagination(response) - current_page = self._page_value(pagination, "page_index") or 1 - while pagination is not None and self._page_value(pagination, "is_last_page") is False: - total_pages = self._page_value(pagination, "total_pages") - returned_index = self._page_value(pagination, "page_index") - if returned_index is None and total_pages is None: - raise RuntimeError( - "get_all_turns pagination cannot continue: response must include " - "page_index, total_pages, or is_last_page=true." - ) - if total_pages is not None and current_page >= total_pages: - break - next_page = current_page + 1 - response = await self.get_turns(page_index=next_page, page_size=page_size) - all_turns.extend(self._response_turns(response)) - pagination = self._response_pagination(response) - returned_index = self._page_value(pagination, "page_index") if pagination else None - if returned_index is not None: - if returned_index <= current_page and self._page_value(pagination, "is_last_page") is not True: - raise RuntimeError( - f"get_all_turns pagination did not advance: requested page {next_page}, " - f"received page {returned_index}." - ) - current_page = returned_index - else: - total_pages = self._page_value(pagination, "total_pages") if pagination else None - is_last_page = self._page_value(pagination, "is_last_page") if pagination else None - if total_pages is None and is_last_page is not True: - raise RuntimeError( - "get_all_turns pagination cannot continue: response must include " - "page_index, total_pages, or is_last_page=true." - ) - current_page = next_page - return self._with_all_turns(response, all_turns) - - id: patch-fae1249a - content_hash: sha256:01bf21f3cc4c784dfcff80a48c9c7bb3123af4327a567b7c990b528e9780e9a2 - original_commit: fae1249a20c53761a2eb5515a1bf92ca666760d1 - original_message: Re-export agora-agents API from legacy PyPI compatibility package The compat distribution delegates to agora_agent via __getattr__ and documents both import paths in its README. - original_author: digitallysavvy - base_generation: 1d61baad436285e3b6a37555edb5ca67c158681c - files: - - compat/agora-agent-server-sdk/README.md - - compat/agora-agent-server-sdk/src/agora_agent_server_sdk_compat/__init__.py - patch_content: |+ - From fae1249a20c53761a2eb5515a1bf92ca666760d1 Mon Sep 17 00:00:00 2001 - From: digitallysavvy - Date: Wed, 27 May 2026 16:58:18 -0400 - Subject: [PATCH] Re-export agora-agents API from legacy PyPI compatibility - package The compat distribution delegates to agora_agent via __getattr__ and - documents both import paths in its README. - - --- - compat/agora-agent-server-sdk/README.md | 7 +++++-- - .../src/agora_agent_server_sdk_compat/__init__.py | 14 +++++++++++++- - 2 files changed, 18 insertions(+), 3 deletions(-) - - diff --git a/compat/agora-agent-server-sdk/README.md b/compat/agora-agent-server-sdk/README.md - index cff3cfe..e43d1d8 100644 - --- a/compat/agora-agent-server-sdk/README.md - +++ b/compat/agora-agent-server-sdk/README.md - @@ -8,6 +8,9 @@ New projects should install: - pip install agora-agents - ``` - - -This compatibility package is kept only to preserve the legacy distribution name during the migration window. It depends on `agora-agents`, which continues to provide the `agora_agent` Python import path. - +This compatibility package re-exports the public API from `agora-agents` to support existing installs during the migration window. The primary import path remains `agora_agent`; you can also import from `agora_agent_server_sdk_compat`: - - -It intentionally contains only a minimal compatibility module so the distribution can be built and published cleanly with Poetry. - +```python - +from agora_agent import Agora, Area - +from agora_agent_server_sdk_compat import Agora, Area - +``` - diff --git a/compat/agora-agent-server-sdk/src/agora_agent_server_sdk_compat/__init__.py b/compat/agora-agent-server-sdk/src/agora_agent_server_sdk_compat/__init__.py - index 55522c6..6283244 100644 - --- a/compat/agora-agent-server-sdk/src/agora_agent_server_sdk_compat/__init__.py - +++ b/compat/agora-agent-server-sdk/src/agora_agent_server_sdk_compat/__init__.py - @@ -1 +1,13 @@ - -"""Compatibility package for the renamed agora-agents distribution.""" - +"""Compatibility re-exports for the renamed agora-agents package.""" - + - +import agora_agent as _agora_agent - + - +__all__ = getattr(_agora_agent, "__all__", []) - + - + - +def __getattr__(name: str): - + return getattr(_agora_agent, name) - + - + - +def __dir__(): - + return dir(_agora_agent) - -- - 2.52.0 - - theirs_snapshot: - compat/agora-agent-server-sdk/README.md: | - # agora-agent-server-sdk - - This package has been renamed to `agora-agents`. - - New projects should install: - - ```sh - pip install agora-agents - ``` - - This compatibility package re-exports the public API from `agora-agents` to support existing installs during the migration window. The primary import path remains `agora_agent`; you can also import from `agora_agent_server_sdk_compat`: - - ```python - from agora_agent import Agora, Area - from agora_agent_server_sdk_compat import Agora, Area - ``` - compat/agora-agent-server-sdk/src/agora_agent_server_sdk_compat/__init__.py: | - """Compatibility re-exports for the renamed agora-agents package.""" - - import agora_agent as _agora_agent - - __all__ = getattr(_agora_agent, "__all__", []) - - - def __getattr__(name: str): - return getattr(_agora_agent, name) - - - def __dir__(): - return dir(_agora_agent) - user_owned: true - - id: patch-299e4bd9 - content_hash: sha256:ee71350debd51653f1cb1472477a577436d74cbb847b3536a9cdbff0211abf2d - original_commit: 299e4bd9cb59bd6144084332a7c3fa7bf260769f - original_message: "fix(agentkit): resolve provider config type checks" - original_author: digitallysavvy - base_generation: 1d61baad436285e3b6a37555edb5ca67c158681c - files: - - src/agora_agent/agentkit/agent.py - - src/agora_agent/agentkit/vendors/llm.py - - src/agora_agent/agentkit/vendors/mllm.py - - src/agora_agent/agentkit/vendors/stt.py - patch_content: | - diff --git a/src/agora_agent/agentkit/agent.py b/src/agora_agent/agentkit/agent.py - index 1daba82..95cfe34 100644 - --- a/src/agora_agent/agentkit/agent.py - +++ b/src/agora_agent/agentkit/agent.py - @@ -57,6 +57,8 @@ from ..agents.types.start_agents_request_properties_filler_words_content import - from ..agents.types.start_agents_request_properties_filler_words_content_static_config import StartAgentsRequestPropertiesFillerWordsContentStaticConfig - from ..agents.types.start_agents_request_properties_filler_words_content_static_config_selection_rule import StartAgentsRequestPropertiesFillerWordsContentStaticConfigSelectionRule - from ..types.tts import Tts - +from ..agents.types.start_agents_request_properties_filler_words_content_static_config_selection_rule import StartAgentsRequestPropertiesFillerWordsContentStaticConfigSelectionRule - +from ..types.tts import Tts - from ..types.asr import Asr - from ..types.llm import Llm - from ..types.llm_style import LlmStyle as GeneratedLlmStyle - @@ -544,6 +546,23 @@ class Agent: - ) - return new_agent - - + def with_audio_scenario(self, audio_scenario: ParametersAudioScenario) -> "Agent": - + """Returns a new Agent with the specified RTC audio scenario.""" - + new_agent = self._clone() - + if new_agent._parameters is None: - + new_agent._parameters = StartAgentsRequestPropertiesParameters(audio_scenario=audio_scenario) - + elif isinstance(new_agent._parameters, dict): - + new_agent._parameters = typing.cast( - + SessionParamsInput, - + {**new_agent._parameters, "audio_scenario": audio_scenario}, - + ) - + else: - + new_agent._parameters = self._copy_model_update( - + new_agent._parameters, - + {"audio_scenario": audio_scenario}, - + ) - + return new_agent - + - def with_failure_message(self, message: str) -> "Agent": - """Deprecated. Configure the failure message on the LLM or MLLM vendor instead.""" - new_agent = self._clone() - diff --git a/src/agora_agent/agentkit/vendors/llm.py b/src/agora_agent/agentkit/vendors/llm.py - index 5a9f39e..1f1b354 100644 - --- a/src/agora_agent/agentkit/vendors/llm.py - +++ b/src/agora_agent/agentkit/vendors/llm.py - @@ -2,6 +2,9 @@ from typing import Any, Dict, List, Optional, Union - - from pydantic import BaseModel, ConfigDict, Field, model_validator - - +from ...agents.types.start_agents_request_properties_llm_greeting_configs import ( - + StartAgentsRequestPropertiesLlmGreetingConfigs, - +) - from .base import BaseLLM - - LlmGreetingConfigs = Dict[str, Any] - theirs_snapshot: - src/agora_agent/agentkit/agent.py: | - from __future__ import annotations - - import time - import typing - import typing_extensions - import warnings - - if typing.TYPE_CHECKING: - from .agent_session import AgentSession, AsyncAgentSession - - from ..agents.types.start_agents_request_properties import StartAgentsRequestProperties - from ..agents.types.start_agents_request_properties_avatar import StartAgentsRequestPropertiesAvatar - from ..agents.types.start_agents_request_properties_avatar_vendor import StartAgentsRequestPropertiesAvatarVendor - from ..agents.types.update_agents_request_properties import UpdateAgentsRequestProperties - from ..agents.types.get_agents_response import GetAgentsResponse - from ..agents.types.list_agents_response import ListAgentsResponse - from ..agents.types.list_agents_response_data_list_item import ListAgentsResponseDataListItem - from ..agents.types.list_agents_response_data_list_item_status import ListAgentsResponseDataListItemStatus - from ..agents.types.get_history_agents_response import GetHistoryAgentsResponse - from ..agents.types.get_history_agents_response_contents_item import GetHistoryAgentsResponseContentsItem - from ..agents.types.get_history_agents_response_contents_item_role import GetHistoryAgentsResponseContentsItemRole - from ..agents.types.get_turns_agents_response import GetTurnsAgentsResponse - from ..agents.types.get_turns_agents_response_turns_item import GetTurnsAgentsResponseTurnsItem - from ..agents.types.speak_agents_request_priority import SpeakAgentsRequestPriority - from ..agents.types.start_agents_request_properties_turn_detection import StartAgentsRequestPropertiesTurnDetection - from ..agents.types.start_agents_request_properties_turn_detection_config import StartAgentsRequestPropertiesTurnDetectionConfig - from ..agents.types.start_agents_request_properties_turn_detection_config_start_of_speech import StartAgentsRequestPropertiesTurnDetectionConfigStartOfSpeech - from ..agents.types.start_agents_request_properties_turn_detection_config_start_of_speech_mode import StartAgentsRequestPropertiesTurnDetectionConfigStartOfSpeechMode - from ..agents.types.start_agents_request_properties_turn_detection_config_start_of_speech_vad_config import StartAgentsRequestPropertiesTurnDetectionConfigStartOfSpeechVadConfig - from ..agents.types.start_agents_request_properties_turn_detection_config_start_of_speech_keywords_config import StartAgentsRequestPropertiesTurnDetectionConfigStartOfSpeechKeywordsConfig - from ..agents.types.start_agents_request_properties_turn_detection_config_start_of_speech_disabled_config import StartAgentsRequestPropertiesTurnDetectionConfigStartOfSpeechDisabledConfig - from ..agents.types.start_agents_request_properties_turn_detection_config_start_of_speech_disabled_config_strategy import StartAgentsRequestPropertiesTurnDetectionConfigStartOfSpeechDisabledConfigStrategy - from ..agents.types.start_agents_request_properties_turn_detection_config_end_of_speech import StartAgentsRequestPropertiesTurnDetectionConfigEndOfSpeech - from ..agents.types.start_agents_request_properties_turn_detection_config_end_of_speech_mode import StartAgentsRequestPropertiesTurnDetectionConfigEndOfSpeechMode - from ..agents.types.start_agents_request_properties_turn_detection_config_end_of_speech_vad_config import StartAgentsRequestPropertiesTurnDetectionConfigEndOfSpeechVadConfig - from ..agents.types.start_agents_request_properties_turn_detection_config_end_of_speech_semantic_config import StartAgentsRequestPropertiesTurnDetectionConfigEndOfSpeechSemanticConfig - from ..agents.types.start_agents_request_properties_turn_detection_type import StartAgentsRequestPropertiesTurnDetectionType - from ..agents.types.start_agents_request_properties_turn_detection_interrupt_mode import StartAgentsRequestPropertiesTurnDetectionInterruptMode - from ..agents.types.start_agents_request_properties_turn_detection_eagerness import StartAgentsRequestPropertiesTurnDetectionEagerness - from ..agents.types.start_agents_request_properties_sal import StartAgentsRequestPropertiesSal - from ..agents.types.start_agents_request_properties_sal_sal_mode import StartAgentsRequestPropertiesSalSalMode - from ..agents.types.start_agents_request_properties_parameters import StartAgentsRequestPropertiesParameters - from ..agents.types.start_agents_request_properties_parameters_silence_config import StartAgentsRequestPropertiesParametersSilenceConfig - from ..agents.types.start_agents_request_properties_parameters_silence_config_action import StartAgentsRequestPropertiesParametersSilenceConfigAction - from ..agents.types.start_agents_request_properties_parameters_farewell_config import StartAgentsRequestPropertiesParametersFarewellConfig - from ..agents.types.start_agents_request_properties_parameters_data_channel import StartAgentsRequestPropertiesParametersDataChannel - from ..agents.types.start_agents_request_properties_parameters_audio_scenario import StartAgentsRequestPropertiesParametersAudioScenario - from ..agents.types.start_agents_request_properties_interruption import StartAgentsRequestPropertiesInterruption - from ..agents.types.start_agents_request_properties_interruption_mode import StartAgentsRequestPropertiesInterruptionMode - from ..agents.types.start_agents_request_properties_geofence import StartAgentsRequestPropertiesGeofence - from ..agents.types.start_agents_request_properties_rtc import StartAgentsRequestPropertiesRtc - from ..agents.types.start_agents_request_properties_advanced_features import StartAgentsRequestPropertiesAdvancedFeatures - from ..agents.types.start_agents_request_properties_filler_words import StartAgentsRequestPropertiesFillerWords - from ..agents.types.start_agents_request_properties_filler_words_trigger import StartAgentsRequestPropertiesFillerWordsTrigger - from ..agents.types.start_agents_request_properties_filler_words_trigger_fixed_time_config import StartAgentsRequestPropertiesFillerWordsTriggerFixedTimeConfig - from ..agents.types.start_agents_request_properties_filler_words_content import StartAgentsRequestPropertiesFillerWordsContent - from ..agents.types.start_agents_request_properties_filler_words_content_static_config import StartAgentsRequestPropertiesFillerWordsContentStaticConfig - from ..agents.types.start_agents_request_properties_filler_words_content_static_config_selection_rule import StartAgentsRequestPropertiesFillerWordsContentStaticConfigSelectionRule - from ..types.tts import Tts - from ..agents.types.start_agents_request_properties_filler_words_content_static_config_selection_rule import StartAgentsRequestPropertiesFillerWordsContentStaticConfigSelectionRule - from ..types.tts import Tts - from ..types.asr import Asr - from ..types.llm import Llm - from ..types.llm_style import LlmStyle as GeneratedLlmStyle - from ..types.mllm import Mllm - from ..types.mllm_turn_detection import MllmTurnDetection - from ..types.mllm_turn_detection_mode import MllmTurnDetectionMode as GeneratedMllmTurnDetectionMode - from ..types.mllm_vendor import MllmVendor as GeneratedMllmVendor - from ..agent_management.types.agent_think_agent_management_request_on_listening_action import ( - AgentThinkAgentManagementRequestOnListeningAction, - ) - from ..agent_management.types.agent_think_agent_management_request_on_thinking_action import ( - AgentThinkAgentManagementRequestOnThinkingAction, - ) - from ..agent_management.types.agent_think_agent_management_request_on_speaking_action import ( - AgentThinkAgentManagementRequestOnSpeakingAction, - ) - from ..agent_management.types.agent_think_agent_management_response import ( - AgentThinkAgentManagementResponse, - ) - from ..core.pydantic_utilities import parse_obj_as - from .vendors.base import BaseAvatar, BaseLLM, BaseMLLM, BaseSTT, BaseTTS - - # Top-level aliases - LlmConfig = Llm - LlmStyle = GeneratedLlmStyle - SttConfig = Asr - AsrConfig = SttConfig - SttVendor = typing.Any - TtsConfig = Tts - MllmConfig = Mllm - MllmVendor = GeneratedMllmVendor - AvatarConfig = StartAgentsRequestPropertiesAvatar - AvatarVendor = StartAgentsRequestPropertiesAvatarVendor - TurnDetectionConfig = StartAgentsRequestPropertiesTurnDetection - SalConfig = StartAgentsRequestPropertiesSal - SalMode = StartAgentsRequestPropertiesSalSalMode - AdvancedFeatures = StartAgentsRequestPropertiesAdvancedFeatures - SessionParams = StartAgentsRequestPropertiesParameters - - # SOS/EOS turn detection aliases (preferred) - TurnDetectionNestedConfig = StartAgentsRequestPropertiesTurnDetectionConfig - StartOfSpeechConfig = StartAgentsRequestPropertiesTurnDetectionConfigStartOfSpeech - StartOfSpeechMode = StartAgentsRequestPropertiesTurnDetectionConfigStartOfSpeechMode - StartOfSpeechVadConfig = StartAgentsRequestPropertiesTurnDetectionConfigStartOfSpeechVadConfig - StartOfSpeechKeywordsConfig = StartAgentsRequestPropertiesTurnDetectionConfigStartOfSpeechKeywordsConfig - StartOfSpeechDisabledConfig = StartAgentsRequestPropertiesTurnDetectionConfigStartOfSpeechDisabledConfig - StartOfSpeechDisabledConfigStrategy = StartAgentsRequestPropertiesTurnDetectionConfigStartOfSpeechDisabledConfigStrategy - EndOfSpeechConfig = StartAgentsRequestPropertiesTurnDetectionConfigEndOfSpeech - EndOfSpeechMode = StartAgentsRequestPropertiesTurnDetectionConfigEndOfSpeechMode - EndOfSpeechVadConfig = StartAgentsRequestPropertiesTurnDetectionConfigEndOfSpeechVadConfig - EndOfSpeechSemanticConfig = StartAgentsRequestPropertiesTurnDetectionConfigEndOfSpeechSemanticConfig - - # Deprecated turn detection aliases - # Deprecated: Use TurnDetectionConfig with TurnDetectionNestedConfig.start_of_speech - # and .end_of_speech instead. The `type` field and agora_vad/server_vad/semantic_vad - # values will be removed in a future release. - TurnDetectionType = StartAgentsRequestPropertiesTurnDetectionType - - # Deprecated: Use StartOfSpeechConfig with mode="vad"|"keywords"|"disabled" and the - # corresponding vad_config, keywords_config, or disabled_config instead. - InterruptMode = StartAgentsRequestPropertiesTurnDetectionInterruptMode - - # Deprecated: Only applies to server_vad/semantic_vad modes with OpenAI Realtime - # (MLLM). Has no equivalent in the ASR + LLM + TTS pipeline. - Eagerness = StartAgentsRequestPropertiesTurnDetectionEagerness - - # Parameters (SessionParams) sub-type aliases - SilenceConfig = StartAgentsRequestPropertiesParametersSilenceConfig - SilenceAction = StartAgentsRequestPropertiesParametersSilenceConfigAction - FarewellConfig = StartAgentsRequestPropertiesParametersFarewellConfig - ParametersDataChannel = StartAgentsRequestPropertiesParametersDataChannel - ParametersAudioScenario = StartAgentsRequestPropertiesParametersAudioScenario - InterruptionConfig = StartAgentsRequestPropertiesInterruption - InterruptionMode = StartAgentsRequestPropertiesInterruptionMode - MllmTurnDetectionConfig = MllmTurnDetection - MllmTurnDetectionMode = GeneratedMllmTurnDetectionMode - AgentConfig = StartAgentsRequestProperties - AgentConfigUpdate = UpdateAgentsRequestProperties - SessionInfo = GetAgentsResponse - SessionListResponse = ListAgentsResponse - SessionSummary = ListAgentsResponseDataListItem - SessionStatus = ListAgentsResponseDataListItemStatus - ConversationHistory = GetHistoryAgentsResponse - ConversationTurn = GetHistoryAgentsResponseContentsItem - ConversationRole = GetHistoryAgentsResponseContentsItemRole - ConversationTurns = GetTurnsAgentsResponse - ConversationSessionTurn = GetTurnsAgentsResponseTurnsItem - SpeakPriority = SpeakAgentsRequestPriority - Labels = typing.Dict[str, str] - - - class SessionParamsInput(typing_extensions.TypedDict, total=False): - silence_config: StartAgentsRequestPropertiesParametersSilenceConfig - farewell_config: StartAgentsRequestPropertiesParametersFarewellConfig - data_channel: StartAgentsRequestPropertiesParametersDataChannel - enable_metrics: bool - enable_error_message: bool - audio_scenario: ParametersAudioScenario - - - class ThinkOptions(typing_extensions.TypedDict, total=False): - on_listening_action: AgentThinkAgentManagementRequestOnListeningAction - on_thinking_action: AgentThinkAgentManagementRequestOnThinkingAction - on_speaking_action: AgentThinkAgentManagementRequestOnSpeakingAction - interruptable: bool - metadata: typing.Dict[str, str] - - - class GetTurnsOptions(typing_extensions.TypedDict, total=False): - page_index: int - page_size: int - - - class SayOptions(typing_extensions.TypedDict, total=False): - priority: SpeakAgentsRequestPriority - interruptable: bool - - - class SessionOptions(typing_extensions.TypedDict, total=False): - name: str - channel: str - token: str - agent_uid: str - remote_uids: typing.List[str] - idle_timeout: int - enable_string_uid: bool - preset: typing.Union[str, typing.Sequence[str]] - pipeline_id: str - expires_in: int - debug: bool - warn: typing.Callable[[str], None] - - - def _start_properties_from_mapping( - properties: typing.Mapping[str, typing.Any], - ) -> StartAgentsRequestProperties: - return parse_obj_as(StartAgentsRequestProperties, dict(properties)) - - - # LLM sub-type aliases - LlmGreetingConfigs = typing.Dict[str, typing.Any] - LlmGreetingConfigsMode = typing.Any - McpServersItem = typing.Dict[str, typing.Any] - - # Additional top-level config aliases - GeofenceConfig = StartAgentsRequestPropertiesGeofence - RtcConfig = StartAgentsRequestPropertiesRtc - FillerWordsConfig = StartAgentsRequestPropertiesFillerWords - FillerWordsTrigger = StartAgentsRequestPropertiesFillerWordsTrigger - FillerWordsTriggerFixedTimeConfig = StartAgentsRequestPropertiesFillerWordsTriggerFixedTimeConfig - FillerWordsContent = StartAgentsRequestPropertiesFillerWordsContent - FillerWordsContentStaticConfig = StartAgentsRequestPropertiesFillerWordsContentStaticConfig - FillerWordsContentSelectionRule = StartAgentsRequestPropertiesFillerWordsContentStaticConfigSelectionRule - - # Think type aliases and response - ThinkOnListeningAction = AgentThinkAgentManagementRequestOnListeningAction - ThinkOnThinkingAction = AgentThinkAgentManagementRequestOnThinkingAction - ThinkOnSpeakingAction = AgentThinkAgentManagementRequestOnSpeakingAction - ThinkResponse = AgentThinkAgentManagementResponse - - from .token import generate_convo_ai_token, _parse_numeric_uid, _validate_expires_in - - TurnDetectionLanguage = typing_extensions.Literal[ - "ar-EG", - "ar-JO", - "ar-SA", - "ar-AE", - "bn-IN", - "zh-CN", - "zh-HK", - "zh-TW", - "nl-NL", - "en-IN", - "en-US", - "fil-PH", - "fr-FR", - "de-DE", - "gu-IN", - "he-IL", - "hi-IN", - "id-ID", - "it-IT", - "ja-JP", - "kn-IN", - "ko-KR", - "ms-MY", - "fa-IR", - "pt-PT", - "ru-RU", - "es-ES", - "ta-IN", - "te-IN", - "th-TH", - "tr-TR", - "vi-VN", - ] - - DEFAULT_TURN_DETECTION_LANGUAGE: TurnDetectionLanguage = "en-US" - TURN_DETECTION_LANGUAGE_VALUES: typing.Tuple[TurnDetectionLanguage, ...] = ( - "ar-EG", - "ar-JO", - "ar-SA", - "ar-AE", - "bn-IN", - "zh-CN", - "zh-HK", - "zh-TW", - "nl-NL", - "en-IN", - "en-US", - "fil-PH", - "fr-FR", - "de-DE", - "gu-IN", - "he-IL", - "hi-IN", - "id-ID", - "it-IT", - "ja-JP", - "kn-IN", - "ko-KR", - "ms-MY", - "fa-IR", - "pt-PT", - "ru-RU", - "es-ES", - "ta-IN", - "te-IN", - "th-TH", - "tr-TR", - "vi-VN", - ) - _TURN_DETECTION_LANGUAGES = set(TURN_DETECTION_LANGUAGE_VALUES) - - - def _dump_optional_model(value: typing.Any) -> typing.Any: - if hasattr(value, "model_dump"): - return value.model_dump(exclude_none=True) - if hasattr(value, "dict"): - return value.dict(exclude_none=True) - return value - - - def _is_turn_detection_language(value: typing.Any) -> bool: - return isinstance(value, str) and value in _TURN_DETECTION_LANGUAGES - - - def _validate_turn_detection_language(value: typing.Any) -> TurnDetectionLanguage: - if not _is_turn_detection_language(value): - raise ValueError(f"Invalid turn_detection.language: {value}") - return value # type: ignore[return-value] - - - class Agent: - """A reusable agent definition. - - Use the fluent builder methods (.with_llm(), .with_tts(), .with_stt(), .with_mllm()) - to configure vendor settings after construction. - - Deprecated: - The Agent-level ``instructions``, ``greeting``, ``failure_message``, - ``max_history``, and ``greeting_configs`` convenience fields are kept - for compatibility. Configure those values on the LLM or MLLM vendor - instead. - - Examples - -------- - >>> from agora_agent import Agent, OpenAI, ElevenLabsTTS, DeepgramSTT - >>> - >>> agent = Agent(instructions="You are a helpful voice assistant.") - >>> agent = ( - ... agent - ... .with_llm(OpenAI(api_key="...", base_url="https://api.openai.com/v1/chat/completions", model="gpt-4")) - ... .with_tts(ElevenLabsTTS(key="...", model_id="...", voice_id="...", base_url="wss://api.elevenlabs.io/v1", sample_rate=24000)) - ... .with_stt(DeepgramSTT(api_key="...", model="nova-2")) - ... ) - """ - - def __init__( - self, - name: typing.Optional[str] = None, - instructions: typing.Optional[str] = None, - turn_detection: typing.Optional[TurnDetectionConfig] = None, - interruption: typing.Optional[InterruptionConfig] = None, - sal: typing.Optional[SalConfig] = None, - advanced_features: typing.Optional[AdvancedFeatures] = None, - parameters: typing.Optional[typing.Union[SessionParams, SessionParamsInput]] = None, - greeting: typing.Optional[str] = None, - failure_message: typing.Optional[str] = None, - max_history: typing.Optional[int] = None, - geofence: typing.Optional[GeofenceConfig] = None, - labels: typing.Optional[typing.Dict[str, str]] = None, - rtc: typing.Optional[RtcConfig] = None, - filler_words: typing.Optional[FillerWordsConfig] = None, - greeting_configs: typing.Optional[LlmGreetingConfigs] = None, - pipeline_id: typing.Optional[str] = None, - ): - self._name = name - self._pipeline_id = pipeline_id - self._instructions = instructions - self._greeting = greeting - self._failure_message = failure_message - self._max_history = max_history - self._llm: typing.Optional[typing.Dict[str, typing.Any]] = None - self._tts: typing.Optional[typing.Dict[str, typing.Any]] = None - self._stt: typing.Optional[typing.Dict[str, typing.Any]] = None - self._mllm: typing.Optional[typing.Dict[str, typing.Any]] = None - self._tts_sample_rate: typing.Optional[int] = None - self._avatar: typing.Optional[typing.Dict[str, typing.Any]] = None - self._avatar_required_sample_rate: typing.Optional[int] = None - self._turn_detection = turn_detection - self._interruption = interruption - self._sal = sal - self._advanced_features = advanced_features - self._parameters = parameters - self._geofence = geofence - self._labels = labels - self._rtc = rtc - self._filler_words = filler_words - self._greeting_configs = greeting_configs - - def with_llm(self, vendor: BaseLLM) -> "Agent": - new_agent = self._clone() - new_agent._llm = vendor.to_config() - return new_agent - - def with_tts(self, vendor: BaseTTS) -> "Agent": - sample_rate = vendor.sample_rate - if ( - self._avatar_required_sample_rate not in (None, 0) - and sample_rate is not None - and sample_rate != self._avatar_required_sample_rate - ): - raise ValueError( - f"Avatar requires TTS sample rate of {self._avatar_required_sample_rate} Hz, " - f"but TTS is configured with {sample_rate} Hz. " - f"Please update your TTS sample_rate to {self._avatar_required_sample_rate}." - ) - new_agent = self._clone() - new_agent._tts = vendor.to_config() - new_agent._tts_sample_rate = sample_rate - return new_agent - - def with_stt(self, vendor: BaseSTT) -> "Agent": - new_agent = self._clone() - new_agent._stt = vendor.to_config() - return new_agent - - def with_mllm(self, vendor: BaseMLLM) -> "Agent": - # Note: avatars are not supported with MLLM. The combination is rejected - # at ``to_properties`` / ``AgentSession.start`` so callers can still - # configure both for tests, debugging, or disabled-avatar use cases. - new_agent = self._clone() - new_agent._mllm = vendor.to_config() - if isinstance(new_agent._mllm, dict): - new_agent._mllm["enable"] = True - if isinstance(new_agent._advanced_features, dict): - advanced_features = {key: value for key, value in new_agent._advanced_features.items() if key != "enable_mllm"} - new_agent._advanced_features = typing.cast(AdvancedFeatures, advanced_features) if advanced_features else None - elif isinstance(new_agent._advanced_features, StartAgentsRequestPropertiesAdvancedFeatures): - advanced_features_model = self._copy_model_update( - new_agent._advanced_features, - {"enable_mllm": None}, - ) - if ( - advanced_features_model.enable_rtm is None - and advanced_features_model.enable_sal is None - and advanced_features_model.enable_tools is None - ): - new_agent._advanced_features = None - else: - new_agent._advanced_features = advanced_features_model - return new_agent - - def with_avatar(self, vendor: BaseAvatar) -> "Agent": - # Note: avatars are not supported with MLLM. The combination is rejected - # at ``to_properties`` / ``AgentSession.start`` (only when the avatar is - # enabled) so callers may still combine the two for testing or for the - # disabled-avatar pattern. - required_sample_rate = vendor.required_sample_rate - if ( - required_sample_rate not in (None, 0) - and self._tts_sample_rate is not None - and self._tts_sample_rate != required_sample_rate - ): - raise ValueError( - f"Avatar requires TTS sample rate of {required_sample_rate} Hz, " - f"but TTS is configured with {self._tts_sample_rate} Hz. " - f"Please update your TTS sample_rate to {required_sample_rate}." - ) - new_agent = self._clone() - new_agent._avatar = vendor.to_config() - new_agent._avatar_required_sample_rate = required_sample_rate - return new_agent - - def with_turn_detection(self, config: TurnDetectionConfig) -> "Agent": - new_agent = self._clone() - new_agent._turn_detection = config - return new_agent - - def with_interruption(self, config: InterruptionConfig) -> "Agent": - """Returns a new Agent with unified interruption control configured.""" - new_agent = self._clone() - new_agent._interruption = config - return new_agent - - def with_instructions(self, instructions: str) -> "Agent": - """Deprecated. Configure system messages on the LLM vendor instead.""" - new_agent = self._clone() - new_agent._instructions = instructions - return new_agent - - def with_greeting(self, greeting: str) -> "Agent": - """Deprecated. Configure the greeting on the LLM or MLLM vendor instead.""" - new_agent = self._clone() - new_agent._greeting = greeting - return new_agent - - def with_greeting_configs(self, configs: LlmGreetingConfigs) -> "Agent": - """Deprecated. Configure greeting playback on the LLM vendor instead.""" - new_agent = self._clone() - new_agent._greeting_configs = configs - return new_agent - - def with_name(self, name: str) -> "Agent": - new_agent = self._clone() - new_agent._name = name - return new_agent - - def with_sal(self, config: SalConfig) -> "Agent": - """Returns a new Agent with the specified SAL (Selective Attention Locking) configuration.""" - new_agent = self._clone() - new_agent._sal = config - return new_agent - - def with_advanced_features(self, features: AdvancedFeatures) -> "Agent": - """Returns a new Agent with the specified advanced features configuration. - - Use this to enable RTM and other advanced features. - """ - new_agent = self._clone() - new_agent._advanced_features = features - return new_agent - - def with_tools(self, enabled: bool = True) -> "Agent": - """Returns a new Agent with MCP tool invocation enabled or disabled.""" - new_agent = self._clone() - if new_agent._advanced_features is None: - new_agent._advanced_features = StartAgentsRequestPropertiesAdvancedFeatures(enable_tools=enabled) - elif isinstance(new_agent._advanced_features, dict): - new_agent._advanced_features = typing.cast( - AdvancedFeatures, - {**new_agent._advanced_features, "enable_tools": enabled}, - ) - else: - new_agent._advanced_features = self._copy_model_update( - new_agent._advanced_features, - {"enable_tools": enabled}, - ) - return new_agent - - def with_parameters(self, parameters: typing.Union[SessionParams, SessionParamsInput]) -> "Agent": - """Returns a new Agent with the specified session parameters. - - Use this to configure silence behaviour, graceful hang-up, data channel, and more. - """ - new_agent = self._clone() - new_agent._parameters = parameters - return new_agent - - def with_audio_scenario(self, audio_scenario: ParametersAudioScenario) -> "Agent": - """Returns a new Agent with the specified RTC audio scenario.""" - new_agent = self._clone() - if new_agent._parameters is None: - new_agent._parameters = StartAgentsRequestPropertiesParameters(audio_scenario=audio_scenario) - elif isinstance(new_agent._parameters, dict): - new_agent._parameters = typing.cast( - SessionParamsInput, - {**new_agent._parameters, "audio_scenario": audio_scenario}, - ) - else: - new_agent._parameters = self._copy_model_update( - new_agent._parameters, - {"audio_scenario": audio_scenario}, - ) - return new_agent - - def with_audio_scenario(self, audio_scenario: ParametersAudioScenario) -> "Agent": - """Returns a new Agent with the specified RTC audio scenario.""" - new_agent = self._clone() - if new_agent._parameters is None: - new_agent._parameters = StartAgentsRequestPropertiesParameters(audio_scenario=audio_scenario) - elif isinstance(new_agent._parameters, dict): - new_agent._parameters = typing.cast( - SessionParamsInput, - {**new_agent._parameters, "audio_scenario": audio_scenario}, - ) - else: - new_agent._parameters = self._copy_model_update( - new_agent._parameters, - {"audio_scenario": audio_scenario}, - ) - return new_agent - - def with_failure_message(self, message: str) -> "Agent": - """Deprecated. Configure the failure message on the LLM or MLLM vendor instead.""" - new_agent = self._clone() - new_agent._failure_message = message - return new_agent - - def with_max_history(self, max_history: int) -> "Agent": - """Deprecated. Configure max history on the LLM vendor instead.""" - new_agent = self._clone() - new_agent._max_history = max_history - return new_agent - - def with_geofence(self, geofence: GeofenceConfig) -> "Agent": - """Returns a new Agent with the specified geofence configuration. - - Restricts which geographic regions the agent's backend servers may run in. - """ - new_agent = self._clone() - new_agent._geofence = geofence - return new_agent - - def with_labels(self, labels: typing.Dict[str, str]) -> "Agent": - """Returns a new Agent with the specified custom labels. - - Labels are key-value pairs attached to the agent and returned in notification callbacks. - """ - new_agent = self._clone() - new_agent._labels = dict(labels) - return new_agent - - def with_rtc(self, rtc: RtcConfig) -> "Agent": - """Returns a new Agent with the specified RTC configuration.""" - new_agent = self._clone() - new_agent._rtc = rtc - return new_agent - - def with_filler_words(self, filler_words: FillerWordsConfig) -> "Agent": - """Returns a new Agent with the specified filler words configuration. - - Filler words are played while the agent waits for the LLM to respond. - """ - new_agent = self._clone() - new_agent._filler_words = filler_words - return new_agent - - @staticmethod - def _field_value(value: typing.Any, field: str) -> typing.Any: - if value is None: - return None - if isinstance(value, dict): - return value.get(field) - return getattr(value, field, None) - - @staticmethod - def _copy_model_update(value: typing.Any, update: typing.Dict[str, typing.Any]) -> typing.Any: - if hasattr(value, "model_copy"): - return value.model_copy(update=update) - if hasattr(value, "copy"): - return value.copy(update=update) - raise TypeError(f"Object of type {type(value).__name__} does not support model copying") - - def _resolved_parameters(self) -> typing.Optional[typing.Union[SessionParams, SessionParamsInput]]: - enable_rtm = self._field_value(self._advanced_features, "enable_rtm") is True - data_channel = self._field_value(self._parameters, "data_channel") - if not enable_rtm or data_channel is not None: - return self._parameters - if self._parameters is None: - return StartAgentsRequestPropertiesParameters(data_channel="rtm") - if isinstance(self._parameters, dict): - return typing.cast(SessionParamsInput, {**self._parameters, "data_channel": "rtm"}) - return self._copy_model_update(self._parameters, {"data_channel": "rtm"}) - - @property - def name(self) -> typing.Optional[str]: - return self._name - - @property - def pipeline_id(self) -> typing.Optional[str]: - """Published AI Studio pipeline ID used as this agent's base configuration.""" - return self._pipeline_id - - @property - def llm(self) -> typing.Optional[typing.Dict[str, typing.Any]]: - return self._llm - - @property - def tts(self) -> typing.Optional[typing.Dict[str, typing.Any]]: - return self._tts - - @property - def tts_sample_rate(self) -> typing.Optional[int]: - return self._tts_sample_rate - - @property - def stt(self) -> typing.Optional[typing.Dict[str, typing.Any]]: - return self._stt - - @property - def mllm(self) -> typing.Optional[typing.Dict[str, typing.Any]]: - return self._mllm - - @property - def turn_detection(self) -> typing.Optional[TurnDetectionConfig]: - return self._turn_detection - - @property - def interruption(self) -> typing.Optional[InterruptionConfig]: - return self._interruption - - @property - def instructions(self) -> typing.Optional[str]: - return self._instructions - - @property - def greeting(self) -> typing.Optional[str]: - return self._greeting - - @property - def greeting_configs(self) -> typing.Optional[LlmGreetingConfigs]: - return self._greeting_configs - - @property - def failure_message(self) -> typing.Optional[str]: - return self._failure_message - - @property - def max_history(self) -> typing.Optional[int]: - return self._max_history - - @property - def avatar(self) -> typing.Optional[typing.Dict[str, typing.Any]]: - return self._avatar - - @property - def sal(self) -> typing.Optional[SalConfig]: - return self._sal - - @property - def advanced_features(self) -> typing.Optional[AdvancedFeatures]: - return self._advanced_features - - @property - def parameters(self) -> typing.Optional[typing.Union[SessionParams, SessionParamsInput]]: - return self._parameters - - @property - def geofence(self) -> typing.Optional[GeofenceConfig]: - return self._geofence - - @property - def labels(self) -> typing.Optional[typing.Dict[str, str]]: - return self._labels - - @property - def rtc(self) -> typing.Optional[RtcConfig]: - return self._rtc - - @property - def filler_words(self) -> typing.Optional[FillerWordsConfig]: - return self._filler_words - - @property - def config(self) -> typing.Dict[str, typing.Any]: - return { - "name": self._name, - "pipeline_id": self._pipeline_id, - "instructions": self._instructions, - "greeting": self._greeting, - "failure_message": self._failure_message, - "max_history": self._max_history, - "llm": self._llm, - "tts": self._tts, - "stt": self._stt, - "mllm": self._mllm, - "turn_detection": self._turn_detection, - "interruption": self._interruption, - "sal": self._sal, - "avatar": self._avatar, - "advanced_features": self._advanced_features, - "parameters": self._parameters, - "geofence": self._geofence, - "labels": self._labels, - "rtc": self._rtc, - "filler_words": self._filler_words, - "greeting_configs": self._greeting_configs, - } - - def create_session( - self, - client: typing.Any, - channel: str, - agent_uid: str, - remote_uids: typing.List[str], - name: typing.Optional[str] = None, - token: typing.Optional[str] = None, - idle_timeout: typing.Optional[int] = None, - enable_string_uid: typing.Optional[bool] = None, - preset: typing.Optional[typing.Union[str, typing.Sequence[str]]] = None, - pipeline_id: typing.Optional[str] = None, - expires_in: typing.Optional[int] = None, - debug: typing.Optional[bool] = None, - warn: typing.Optional[typing.Callable[[str], None]] = None, - ) -> "AgentSession": - from .agent_session import AgentSession - - session_name = name or self._name or f"agent-{int(time.time())}" - return AgentSession( - client=client, - agent=self, - app_id=client.app_id if hasattr(client, "app_id") else "", - app_certificate=client.app_certificate if hasattr(client, "app_certificate") else None, - name=session_name, - channel=channel, - token=token, - agent_uid=agent_uid, - remote_uids=remote_uids, - idle_timeout=idle_timeout, - enable_string_uid=enable_string_uid, - preset=preset, - pipeline_id=pipeline_id, - expires_in=expires_in, - debug=debug, - warn=warn, - ) - - def create_async_session( - self, - client: typing.Any, - channel: str, - agent_uid: str, - remote_uids: typing.List[str], - name: typing.Optional[str] = None, - token: typing.Optional[str] = None, - idle_timeout: typing.Optional[int] = None, - enable_string_uid: typing.Optional[bool] = None, - preset: typing.Optional[typing.Union[str, typing.Sequence[str]]] = None, - pipeline_id: typing.Optional[str] = None, - expires_in: typing.Optional[int] = None, - debug: typing.Optional[bool] = None, - warn: typing.Optional[typing.Callable[[str], None]] = None, - ) -> "AsyncAgentSession": - """Create an async session for use with :class:`~agora_agent.AsyncAgora`. - - Equivalent to :meth:`create_session` but returns an - :class:`~agora_agent.agentkit.AsyncAgentSession`. - """ - from .agent_session import AsyncAgentSession - - session_name = name or self._name or f"agent-{int(time.time())}" - return AsyncAgentSession( - client=client, - agent=self, - app_id=client.app_id if hasattr(client, "app_id") else "", - app_certificate=client.app_certificate if hasattr(client, "app_certificate") else None, - name=session_name, - channel=channel, - token=token, - agent_uid=agent_uid, - remote_uids=remote_uids, - idle_timeout=idle_timeout, - enable_string_uid=enable_string_uid, - preset=preset, - pipeline_id=pipeline_id, - expires_in=expires_in, - debug=debug, - warn=warn, - ) - - def to_properties( - self, - channel: str, - agent_uid: str, - remote_uids: typing.List[str], - idle_timeout: typing.Optional[int] = None, - enable_string_uid: typing.Optional[bool] = None, - token: typing.Optional[str] = None, - app_id: typing.Optional[str] = None, - app_certificate: typing.Optional[str] = None, - expires_in: typing.Optional[int] = None, - skip_vendor_validation: bool = False, - skip_vendor_validation_categories: typing.Optional[typing.AbstractSet[str]] = None, - allow_missing_vendor_categories: typing.Optional[typing.AbstractSet[str]] = None, - ) -> StartAgentsRequestProperties: - # Validate the MLLM + enabled-avatar combination BEFORE generating the - # RTC token so callers get a clear, actionable error first (matches the - # TypeScript and Go SDKs' fail-fast contract). - mllm_flag = isinstance(self._mllm, dict) and self._mllm.get("enable") is True - is_mllm_mode = bool(mllm_flag or self._mllm is not None) - avatar_enabled = ( - isinstance(self._avatar, dict) and self._avatar.get("enable") is not False - ) - if is_mllm_mode and avatar_enabled: - raise ValueError( - "Avatars are only supported with the cascading ASR + LLM + TTS pipeline. " - "Remove the avatar configuration when using MLLM, or switch to a cascading session." - ) - - if token is None: - if app_id is None or app_certificate is None: - raise ValueError("Either token or app_id+app_certificate must be provided") - validated_expires_in = _validate_expires_in(expires_in) if expires_in is not None else None - # Use generate_convo_ai_token (RTC + RTM) so the token works whether or - # not the caller enables advanced_features.enable_rtm. - token_kwargs: typing.Dict[str, typing.Any] = {} - if validated_expires_in is not None: - token_kwargs["token_expire"] = validated_expires_in - token = generate_convo_ai_token( - app_id=app_id, - app_certificate=app_certificate, - channel_name=channel, - uid=_parse_numeric_uid(agent_uid, "agent_uid"), - **token_kwargs, - ) - - base_kwargs: typing.Dict[str, typing.Any] = { - "channel": channel, - "token": token, - "agent_rtc_uid": agent_uid, - "remote_rtc_uids": remote_uids, - } - - if idle_timeout is not None: - base_kwargs["idle_timeout"] = idle_timeout - if enable_string_uid is not None: - base_kwargs["enable_string_uid"] = enable_string_uid - if self._mllm is not None: - base_kwargs["mllm"] = self._mllm - if self._turn_detection is not None: - base_kwargs["turn_detection"] = self._turn_detection - if self._interruption is not None: - base_kwargs["interruption"] = self._interruption - if self._sal is not None: - base_kwargs["sal"] = self._sal - if self._avatar is not None: - base_kwargs["avatar"] = self._avatar - if self._advanced_features is not None: - base_kwargs["advanced_features"] = self._advanced_features - parameters = self._resolved_parameters() - if parameters is not None: - if isinstance(parameters, dict): - base_kwargs["parameters"] = StartAgentsRequestPropertiesParameters(**parameters) - else: - base_kwargs["parameters"] = parameters - if self._geofence is not None: - base_kwargs["geofence"] = self._geofence - if self._labels is not None: - base_kwargs["labels"] = self._labels - if self._rtc is not None: - base_kwargs["rtc"] = self._rtc - if self._filler_words is not None: - base_kwargs["filler_words"] = self._filler_words - - if is_mllm_mode: - if self._mllm is not None: - mllm_config = dict(self._mllm) - if self._greeting is not None: - mllm_config.setdefault("greeting_message", self._greeting) - if self._failure_message is not None: - mllm_config.setdefault("failure_message", self._failure_message) - base_kwargs["mllm"] = mllm_config - return _start_properties_from_mapping(base_kwargs) - - if skip_vendor_validation: - warnings.warn( - "skip_vendor_validation is deprecated and will be removed in a future release. " - "Use skip_vendor_validation_categories and allow_missing_vendor_categories instead.", - DeprecationWarning, - stacklevel=2, - ) - - skip_categories = set(skip_vendor_validation_categories or ()) - allow_missing_categories = set(allow_missing_vendor_categories or ()) - if skip_vendor_validation: - skip_categories.update({"asr", "llm", "tts"}) - allow_missing_categories.update({"asr", "llm", "tts"}) - - skip_asr_validation = skip_vendor_validation or "asr" in skip_categories - skip_llm_validation = skip_vendor_validation or "llm" in skip_categories - skip_tts_validation = skip_vendor_validation or "tts" in skip_categories - allow_missing_asr = "asr" in allow_missing_categories - allow_missing_llm = "llm" in allow_missing_categories - allow_missing_tts = "tts" in allow_missing_categories - - turn_detection_config = self._resolve_turn_detection_config() - if not skip_asr_validation and (self._stt is not None or not allow_missing_asr): - base_kwargs["asr"] = self._resolve_asr_config(turn_detection_config) - base_kwargs["turn_detection"] = turn_detection_config - - if skip_vendor_validation: - return _start_properties_from_mapping(base_kwargs) - - if self._tts is None and not (skip_tts_validation or allow_missing_tts): - raise ValueError("TTS configuration is required. Use with_tts() to set it.") - - if self._llm is None and not (skip_llm_validation or allow_missing_llm): - raise ValueError("LLM configuration is required. Use with_llm() to set it.") - - if self._llm is not None and not skip_llm_validation: - base_kwargs["llm"] = self._resolve_llm_config() - if self._tts is not None and not skip_tts_validation: - base_kwargs["tts"] = self._tts - - return _start_properties_from_mapping(base_kwargs) - - def _resolve_llm_config(self) -> typing.Dict[str, typing.Any]: - llm_config = dict(self._llm or {}) - if self._instructions is not None and "system_messages" not in llm_config: - llm_config["system_messages"] = [{"role": "system", "content": self._instructions}] - if self._greeting is not None and "greeting_message" not in llm_config: - llm_config["greeting_message"] = self._greeting - if self._greeting_configs is not None and "greeting_configs" not in llm_config: - llm_config["greeting_configs"] = _dump_optional_model(self._greeting_configs) - if self._failure_message is not None and "failure_message" not in llm_config: - llm_config["failure_message"] = self._failure_message - if self._max_history is not None and "max_history" not in llm_config: - llm_config["max_history"] = self._max_history - return llm_config - - def _resolve_asr_config(self, turn_detection_config: TurnDetectionConfig) -> typing.Dict[str, typing.Any]: - asr_config = dict(self._stt or {}) - if not asr_config: - asr_config["vendor"] = "ares" - asr_config["language"] = self._field_value(turn_detection_config, "language") - return asr_config - - def _resolve_turn_detection_config(self) -> TurnDetectionConfig: - existing_turn_detection_language = self._field_value(self._turn_detection, "language") - language = ( - existing_turn_detection_language - if existing_turn_detection_language is not None - else DEFAULT_TURN_DETECTION_LANGUAGE - ) - language = _validate_turn_detection_language(language) - if self._turn_detection is None: - return StartAgentsRequestPropertiesTurnDetection(language=language) - if isinstance(self._turn_detection, dict): - return typing.cast(TurnDetectionConfig, {**self._turn_detection, "language": language}) - return self._copy_model_update(self._turn_detection, {"language": language}) - - def _clone(self) -> "Agent": - new_agent = Agent.__new__(Agent) - new_agent._name = self._name - new_agent._pipeline_id = self._pipeline_id - new_agent._llm = self._llm - new_agent._tts = self._tts - new_agent._stt = self._stt - new_agent._mllm = self._mllm - new_agent._tts_sample_rate = self._tts_sample_rate - new_agent._avatar = self._avatar - new_agent._avatar_required_sample_rate = self._avatar_required_sample_rate - new_agent._turn_detection = self._turn_detection - new_agent._interruption = self._interruption - new_agent._sal = self._sal - new_agent._advanced_features = self._advanced_features - new_agent._parameters = self._parameters - new_agent._instructions = self._instructions - new_agent._greeting = self._greeting - new_agent._failure_message = self._failure_message - new_agent._max_history = self._max_history - new_agent._geofence = self._geofence - new_agent._labels = self._labels - new_agent._rtc = self._rtc - new_agent._filler_words = self._filler_words - new_agent._greeting_configs = self._greeting_configs - return new_agent - src/agora_agent/agentkit/vendors/llm.py: | - from typing import Any, Dict, List, Optional, Union - - from pydantic import BaseModel, ConfigDict, Field, model_validator - - from ...agents.types.start_agents_request_properties_llm_greeting_configs import ( - StartAgentsRequestPropertiesLlmGreetingConfigs, - ) - from .base import BaseLLM - - LlmGreetingConfigs = Dict[str, Any] - _OPENAI_MANAGED_MODELS = {"gpt-4o-mini", "gpt-4.1-mini", "gpt-5-nano", "gpt-5-mini"} - - - def _ensure_mcp_transport(servers: List[Dict[str, Any]]) -> List[Dict[str, Any]]: - """Ensure each MCP server has transport set (API requires it). Default to streamable_http.""" - result = [] - for s in servers: - item = dict(s) - if item.get("transport") is None: - item["transport"] = "streamable_http" - result.append(item) - return result - - - def _dump_optional_model(value: Any) -> Any: - if hasattr(value, "model_dump"): - return value.model_dump(exclude_none=True) - if hasattr(value, "dict"): - return value.dict(exclude_none=True) - return value - - class OpenAIOptions(BaseModel): - model_config = ConfigDict(extra="forbid") - - api_key: Optional[str] = Field(default=None, description="OpenAI API key") - model: str = Field(..., description="Model name") - base_url: Optional[str] = Field(default=None, description="Custom base URL") - temperature: Optional[float] = Field(default=None, ge=0.0, le=2.0) - top_p: Optional[float] = Field(default=None, ge=0.0, le=1.0) - max_tokens: Optional[int] = Field(default=None, gt=0) - system_messages: Optional[List[Dict[str, Any]]] = Field(default=None) - greeting_message: Optional[str] = Field(default=None) - failure_message: Optional[str] = Field(default=None) - input_modalities: Optional[List[str]] = Field(default=None) - params: Optional[Dict[str, Any]] = Field(default=None) - headers: Optional[Dict[str, str]] = Field(default=None) - output_modalities: Optional[List[str]] = Field(default=None) - greeting_configs: Optional[LlmGreetingConfigs] = Field(default=None) - template_variables: Optional[Dict[str, str]] = Field(default=None) - vendor: Optional[str] = Field(default=None) - mcp_servers: Optional[List[Dict[str, Any]]] = Field(default=None) - max_history: Optional[int] = Field(default=None, gt=0, description="Maximum number of conversation history messages to cache") - - @model_validator(mode="after") - def _validate_byok_params(self) -> "OpenAIOptions": - if not self.model: - raise ValueError("OpenAI requires model") - if self.api_key is not None and self.base_url is None: - raise ValueError("OpenAI requires base_url when api_key is set") - if self.api_key is None and self.base_url is not None: - raise ValueError("OpenAI base_url is only valid when api_key is set") - if self.api_key is None and self.model.strip().lower() not in _OPENAI_MANAGED_MODELS: - raise ValueError("OpenAI requires api_key unless using a supported Agora-managed model") - if self.api_key is None and self.vendor is not None: - raise ValueError("OpenAI Agora-managed mode does not allow vendor") - return self - - class OpenAI(BaseLLM): - def __init__(self, **kwargs: Any): - self.options = OpenAIOptions(**kwargs) - - def to_config(self) -> Dict[str, Any]: - # model is the default; explicit params entries extend/override it. - # This matches the TS SDK behaviour: { model, ...params }. - params: Dict[str, Any] = {"model": self.options.model, **(self.options.params or {})} - - # Named fields take precedence over anything in the generic params dict. - if self.options.max_tokens is not None: - params["max_tokens"] = self.options.max_tokens - if self.options.temperature is not None: - params["temperature"] = self.options.temperature - if self.options.top_p is not None: - params["top_p"] = self.options.top_p - - config: Dict[str, Any] = { - "url": self.options.base_url or "https://api.openai.com/v1/chat/completions", - "params": params, - "style": "openai", - "input_modalities": self.options.input_modalities or ["text"], - } - if self.options.api_key is not None: - config["api_key"] = self.options.api_key - if self.options.headers is not None: - config["headers"] = self.options.headers - - if self.options.system_messages is not None: - config["system_messages"] = self.options.system_messages - if self.options.greeting_message is not None: - config["greeting_message"] = self.options.greeting_message - if self.options.failure_message is not None: - config["failure_message"] = self.options.failure_message - if self.options.output_modalities is not None: - config["output_modalities"] = self.options.output_modalities - if self.options.greeting_configs is not None: - config["greeting_configs"] = _dump_optional_model(self.options.greeting_configs) - if self.options.template_variables is not None: - config["template_variables"] = self.options.template_variables - if self.options.vendor is not None: - config["vendor"] = self.options.vendor - if self.options.mcp_servers is not None: - config["mcp_servers"] = _ensure_mcp_transport(self.options.mcp_servers) - if self.options.max_history is not None: - config["max_history"] = self.options.max_history - - return config - - - class AzureOpenAIOptions(BaseModel): - model_config = ConfigDict(extra="forbid") - - api_key: str = Field(..., description="Azure OpenAI API key") - model: str = Field(..., description="Azure deployment model name") - endpoint: str = Field(..., description="Azure endpoint URL") - deployment_name: str = Field(..., description="Azure deployment name") - api_version: str = Field(default="2024-08-01-preview", description="Azure API version") - temperature: Optional[float] = Field(default=None, ge=0.0, le=2.0) - top_p: Optional[float] = Field(default=None, ge=0.0, le=1.0) - max_tokens: Optional[int] = Field(default=None, gt=0) - system_messages: Optional[List[Dict[str, Any]]] = Field(default=None) - greeting_message: Optional[str] = Field(default=None) - failure_message: Optional[str] = Field(default=None) - input_modalities: Optional[List[str]] = Field(default=None) - params: Optional[Dict[str, Any]] = Field(default=None) - headers: Optional[Dict[str, str]] = Field(default=None) - output_modalities: Optional[List[str]] = Field(default=None) - greeting_configs: Optional[LlmGreetingConfigs] = Field(default=None) - template_variables: Optional[Dict[str, str]] = Field(default=None) - vendor: Optional[str] = Field(default=None) - mcp_servers: Optional[List[Dict[str, Any]]] = Field(default=None) - max_history: Optional[int] = Field(default=None, gt=0, description="Maximum number of conversation history messages to cache") - - class AzureOpenAI(BaseLLM): - def __init__(self, **kwargs: Any): - self.options = AzureOpenAIOptions(**kwargs) - - def to_config(self) -> Dict[str, Any]: - url = ( - f"{self.options.endpoint}/openai/deployments/" - f"{self.options.deployment_name}/chat/completions" - f"?api-version={self.options.api_version}" - ) - config: Dict[str, Any] = { - "url": url, - "api_key": self.options.api_key, - "vendor": self.options.vendor or "azure", - "style": "openai", - "input_modalities": self.options.input_modalities or ["text"], - } - - # Named fields take precedence over anything in the generic params dict. - params: Dict[str, Any] = {"model": self.options.model, **(self.options.params or {})} - if self.options.temperature is not None: - params["temperature"] = self.options.temperature - if self.options.top_p is not None: - params["top_p"] = self.options.top_p - if self.options.max_tokens is not None: - params["max_tokens"] = self.options.max_tokens - if params: - config["params"] = params - if self.options.headers is not None: - config["headers"] = self.options.headers - - if self.options.system_messages is not None: - config["system_messages"] = self.options.system_messages - if self.options.greeting_message is not None: - config["greeting_message"] = self.options.greeting_message - if self.options.failure_message is not None: - config["failure_message"] = self.options.failure_message - if self.options.output_modalities is not None: - config["output_modalities"] = self.options.output_modalities - if self.options.greeting_configs is not None: - config["greeting_configs"] = _dump_optional_model(self.options.greeting_configs) - if self.options.template_variables is not None: - config["template_variables"] = self.options.template_variables - if self.options.mcp_servers is not None: - config["mcp_servers"] = _ensure_mcp_transport(self.options.mcp_servers) - if self.options.max_history is not None: - config["max_history"] = self.options.max_history - - return config - - - class AnthropicOptions(BaseModel): - model_config = ConfigDict(extra="forbid") - - api_key: str = Field(..., description="Anthropic API key") - model: str = Field(..., description="Model name") - url: str = Field(..., description="Anthropic messages endpoint URL") - max_tokens: int = Field(..., gt=0) - temperature: Optional[float] = Field(default=None, ge=0.0, le=1.0) - top_p: Optional[float] = Field(default=None, ge=0.0, le=1.0) - system_messages: Optional[List[Dict[str, Any]]] = Field(default=None) - greeting_message: Optional[str] = Field(default=None) - failure_message: Optional[str] = Field(default=None) - input_modalities: Optional[List[str]] = Field(default=None) - params: Optional[Dict[str, Any]] = Field(default=None) - headers: Dict[str, str] = Field(..., description="Anthropic request headers") - output_modalities: Optional[List[str]] = Field(default=None) - greeting_configs: Optional[LlmGreetingConfigs] = Field(default=None) - template_variables: Optional[Dict[str, str]] = Field(default=None) - vendor: Optional[str] = Field(default=None) - mcp_servers: Optional[List[Dict[str, Any]]] = Field(default=None) - max_history: Optional[int] = Field(default=None, gt=0, description="Maximum number of conversation history messages to cache") - - class Anthropic(BaseLLM): - def __init__(self, **kwargs: Any): - self.options = AnthropicOptions(**kwargs) - - def to_config(self) -> Dict[str, Any]: - # Named fields take precedence over anything in the generic params dict. - params: Dict[str, Any] = {"model": self.options.model, **(self.options.params or {})} - if self.options.max_tokens is not None: - params["max_tokens"] = self.options.max_tokens - if self.options.temperature is not None: - params["temperature"] = self.options.temperature - if self.options.top_p is not None: - params["top_p"] = self.options.top_p - - config: Dict[str, Any] = { - "url": self.options.url, - "api_key": self.options.api_key, - "params": params, - "headers": self.options.headers, - "style": "anthropic", - "input_modalities": self.options.input_modalities or ["text"], - } - - if self.options.system_messages is not None: - config["system_messages"] = self.options.system_messages - if self.options.greeting_message is not None: - config["greeting_message"] = self.options.greeting_message - if self.options.failure_message is not None: - config["failure_message"] = self.options.failure_message - if self.options.output_modalities is not None: - config["output_modalities"] = self.options.output_modalities - if self.options.greeting_configs is not None: - config["greeting_configs"] = _dump_optional_model(self.options.greeting_configs) - if self.options.template_variables is not None: - config["template_variables"] = self.options.template_variables - if self.options.vendor is not None: - config["vendor"] = self.options.vendor - if self.options.mcp_servers is not None: - config["mcp_servers"] = _ensure_mcp_transport(self.options.mcp_servers) - if self.options.max_history is not None: - config["max_history"] = self.options.max_history - - return config - - - class GeminiOptions(BaseModel): - model_config = ConfigDict(extra="forbid") - - api_key: str = Field(..., description="Google AI API key") - model: str = Field(..., description="Model name") - url: Optional[str] = Field(default=None, description="Custom API endpoint URL") - temperature: Optional[float] = Field(default=None, ge=0.0, le=2.0) - top_p: Optional[float] = Field(default=None, ge=0.0, le=1.0) - top_k: Optional[int] = Field(default=None, gt=0) - max_output_tokens: Optional[int] = Field(default=None, gt=0) - system_messages: Optional[List[Dict[str, Any]]] = Field(default=None) - greeting_message: Optional[str] = Field(default=None) - failure_message: Optional[str] = Field(default=None) - input_modalities: Optional[List[str]] = Field(default=None) - params: Optional[Dict[str, Any]] = Field(default=None) - headers: Optional[Dict[str, str]] = Field(default=None) - output_modalities: Optional[List[str]] = Field(default=None) - greeting_configs: Optional[LlmGreetingConfigs] = Field(default=None) - template_variables: Optional[Dict[str, str]] = Field(default=None) - vendor: Optional[str] = Field(default=None) - mcp_servers: Optional[List[Dict[str, Any]]] = Field(default=None) - max_history: Optional[int] = Field(default=None, gt=0, description="Maximum number of conversation history messages to cache") - - class Gemini(BaseLLM): - def __init__(self, **kwargs: Any): - self.options = GeminiOptions(**kwargs) - - def to_config(self) -> Dict[str, Any]: - # Named fields take precedence over anything in the generic params dict. - params: Dict[str, Any] = {"model": self.options.model, **(self.options.params or {})} - if self.options.temperature is not None: - params["temperature"] = self.options.temperature - if self.options.top_p is not None: - params["top_p"] = self.options.top_p - if self.options.top_k is not None: - params["top_k"] = self.options.top_k - if self.options.max_output_tokens is not None: - params["max_output_tokens"] = self.options.max_output_tokens - - config: Dict[str, Any] = { - "url": self.options.url or "https://generativelanguage.googleapis.com/v1beta/models", - "api_key": self.options.api_key, - "params": params, - "style": "gemini", - "input_modalities": self.options.input_modalities or ["text"], - } - - if self.options.system_messages is not None: - config["system_messages"] = self.options.system_messages - if self.options.headers is not None: - config["headers"] = self.options.headers - if self.options.greeting_message is not None: - config["greeting_message"] = self.options.greeting_message - if self.options.failure_message is not None: - config["failure_message"] = self.options.failure_message - if self.options.output_modalities is not None: - config["output_modalities"] = self.options.output_modalities - if self.options.greeting_configs is not None: - config["greeting_configs"] = _dump_optional_model(self.options.greeting_configs) - if self.options.template_variables is not None: - config["template_variables"] = self.options.template_variables - if self.options.vendor is not None: - config["vendor"] = self.options.vendor - if self.options.mcp_servers is not None: - config["mcp_servers"] = _ensure_mcp_transport(self.options.mcp_servers) - if self.options.max_history is not None: - config["max_history"] = self.options.max_history - - return config - - - class GroqOptions(OpenAIOptions): - model_config = ConfigDict(extra="forbid") - - api_key: str = Field(..., description="Groq API key") - model: str = Field(..., description="Model name") - base_url: str = Field(..., description="Groq-compatible endpoint") - - - class Groq(BaseLLM): - def __init__(self, **kwargs: Any): - self.options = GroqOptions(**kwargs) - - def to_config(self) -> Dict[str, Any]: - config = OpenAI(**_dump_optional_model(self.options)).to_config() - config["url"] = self.options.base_url - return config - - - class CustomLLMOptions(OpenAIOptions): - model_config = ConfigDict(extra="forbid") - - api_key: str = Field(..., description="Custom LLM API key") - base_url: str = Field(..., description="OpenAI-compatible chat completions endpoint") - - - class CustomLLM(BaseLLM): - def __init__(self, **kwargs: Any): - self.options = CustomLLMOptions(**kwargs) - - def to_config(self) -> Dict[str, Any]: - config = OpenAI(**_dump_optional_model(self.options)).to_config() - config["vendor"] = self.options.vendor or "custom" - return config - - - class VertexAILLMOptions(GeminiOptions): - model_config = ConfigDict(extra="forbid") - - api_key: str = Field(..., description="Vertex AI access token or API key") - project_id: str = Field(..., description="Google Cloud project ID") - location: str = Field(..., description="Google Cloud location") - - - class VertexAILLM(BaseLLM): - def __init__(self, **kwargs: Any): - self.options = VertexAILLMOptions(**kwargs) - - def to_config(self) -> Dict[str, Any]: - options = _dump_optional_model(self.options) - options.pop("project_id", None) - options.pop("location", None) - if not options.get("url"): - options["url"] = ( - f"https://{self.options.location}-aiplatform.googleapis.com/v1/projects/" - f"{self.options.project_id}/locations/{self.options.location}/" - f"publishers/google/models/{self.options.model}:streamGenerateContent?alt=sse" - ) - return Gemini(**options).to_config() - - - class AmazonBedrockOptions(BaseModel): - model_config = ConfigDict(extra="forbid") - - access_key: str = Field(..., description="AWS access key ID") - secret_key: str = Field(..., description="AWS secret access key") - region: str = Field(..., description="AWS region") - model: str = Field(..., description="Amazon Bedrock model identifier") - max_tokens: Optional[int] = Field(default=None, gt=0) - url: Optional[str] = Field(default=None, description="Amazon Bedrock converse stream endpoint URL") - temperature: Optional[float] = Field(default=None, ge=0.0, le=1.0) - top_p: Optional[float] = Field(default=None, ge=0.0, le=1.0) - system_messages: Optional[List[Dict[str, Any]]] = Field(default=None) - greeting_message: Optional[str] = Field(default=None) - failure_message: Optional[str] = Field(default=None) - input_modalities: Optional[List[str]] = Field(default=None) - params: Optional[Dict[str, Any]] = Field(default=None) - headers: Optional[Dict[str, str]] = Field(default=None) - output_modalities: Optional[List[str]] = Field(default=None) - greeting_configs: Optional[LlmGreetingConfigs] = Field(default=None) - template_variables: Optional[Dict[str, str]] = Field(default=None) - vendor: Optional[str] = Field(default=None) - mcp_servers: Optional[List[Dict[str, Any]]] = Field(default=None) - max_history: Optional[int] = Field(default=None, gt=0, description="Maximum number of conversation history messages to cache") - - - class AmazonBedrock(BaseLLM): - def __init__(self, **kwargs: Any): - self.options = AmazonBedrockOptions(**kwargs) - - def to_config(self) -> Dict[str, Any]: - params: Dict[str, Any] = dict(self.options.params or {}) - if self.options.max_tokens is not None: - params["max_tokens"] = self.options.max_tokens - if self.options.temperature is not None: - params["temperature"] = self.options.temperature - if self.options.top_p is not None: - params["top_p"] = self.options.top_p - - config: Dict[str, Any] = { - "url": self.options.url or f"https://bedrock-runtime.{self.options.region}.amazonaws.com/model/{self.options.model}/converse-stream", - "access_key": self.options.access_key, - "secret_key": self.options.secret_key, - "region": self.options.region, - "model": self.options.model, - "params": params, - "style": "bedrock", - "input_modalities": self.options.input_modalities or ["text"], - } - if self.options.system_messages is not None: - config["system_messages"] = self.options.system_messages - if self.options.headers is not None: - config["headers"] = self.options.headers - if self.options.greeting_message is not None: - config["greeting_message"] = self.options.greeting_message - if self.options.failure_message is not None: - config["failure_message"] = self.options.failure_message - if self.options.output_modalities is not None: - config["output_modalities"] = self.options.output_modalities - if self.options.greeting_configs is not None: - config["greeting_configs"] = _dump_optional_model(self.options.greeting_configs) - if self.options.template_variables is not None: - config["template_variables"] = self.options.template_variables - if self.options.vendor is not None: - config["vendor"] = self.options.vendor - if self.options.mcp_servers is not None: - config["mcp_servers"] = _ensure_mcp_transport(self.options.mcp_servers) - if self.options.max_history is not None: - config["max_history"] = self.options.max_history - return config - - - class DifyOptions(BaseModel): - model_config = ConfigDict(extra="forbid") - - api_key: str = Field(..., description="Dify API key") - url: str = Field(..., description="Dify workflow or chat endpoint") - model: str = Field(..., description="Dify model identifier") - user: Optional[str] = Field(default=None, description="Dify user identifier") - conversation_id: Optional[str] = Field(default=None, description="Dify conversation ID") - system_messages: Optional[List[Dict[str, Any]]] = Field(default=None) - greeting_message: Optional[str] = Field(default=None) - failure_message: Optional[str] = Field(default=None) - input_modalities: Optional[List[str]] = Field(default=None) - params: Optional[Dict[str, Any]] = Field(default=None) - headers: Optional[Dict[str, str]] = Field(default=None) - output_modalities: Optional[List[str]] = Field(default=None) - greeting_configs: Optional[LlmGreetingConfigs] = Field(default=None) - template_variables: Optional[Dict[str, str]] = Field(default=None) - vendor: Optional[str] = Field(default=None) - mcp_servers: Optional[List[Dict[str, Any]]] = Field(default=None) - max_history: Optional[int] = Field(default=None, gt=0) - - - class Dify(BaseLLM): - def __init__(self, **kwargs: Any): - self.options = DifyOptions(**kwargs) - - def to_config(self) -> Dict[str, Any]: - params: Dict[str, Any] = {"model": self.options.model, **(self.options.params or {})} - if self.options.user is not None: - params["user"] = self.options.user - if self.options.conversation_id is not None: - params["conversation_id"] = self.options.conversation_id - - config: Dict[str, Any] = { - "url": self.options.url, - "api_key": self.options.api_key, - "params": params, - "style": "dify", - "input_modalities": self.options.input_modalities or ["text"], - } - if self.options.headers is not None: - config["headers"] = self.options.headers - if self.options.system_messages is not None: - config["system_messages"] = self.options.system_messages - if self.options.greeting_message is not None: - config["greeting_message"] = self.options.greeting_message - if self.options.failure_message is not None: - config["failure_message"] = self.options.failure_message - if self.options.output_modalities is not None: - config["output_modalities"] = self.options.output_modalities - if self.options.greeting_configs is not None: - config["greeting_configs"] = _dump_optional_model(self.options.greeting_configs) - if self.options.template_variables is not None: - config["template_variables"] = self.options.template_variables - if self.options.vendor is not None: - config["vendor"] = self.options.vendor - if self.options.mcp_servers is not None: - config["mcp_servers"] = _ensure_mcp_transport(self.options.mcp_servers) - if self.options.max_history is not None: - config["max_history"] = self.options.max_history - return config - src/agora_agent/agentkit/vendors/mllm.py: | - import warnings - from typing import Any, Dict, List, Optional - - from pydantic import BaseModel, ConfigDict, Field - - from ...types.mllm_turn_detection import MllmTurnDetection - from .base import BaseMLLM - - MllmTurnDetectionConfig = MllmTurnDetection - - - class OpenAIRealtimeOptions(BaseModel): - model_config = ConfigDict(extra="forbid") - - api_key: str = Field(..., description="OpenAI API key") - model: Optional[str] = Field(default=None, description="Model name (e.g., gpt-4o-realtime-preview)") - voice: Optional[str] = Field(default=None, description="Voice identifier") - instructions: Optional[str] = Field(default=None, description="System instructions") - input_audio_transcription: Optional[Dict[str, Any]] = Field(default=None, description="Audio transcription settings") - url: Optional[str] = Field(default=None, description="WebSocket URL") - greeting_message: Optional[str] = Field(default=None, description="Agent greeting message") - input_modalities: Optional[List[str]] = Field(default=None, description="Input modalities") - output_modalities: Optional[List[str]] = Field(default=None, description="Output modalities") - messages: Optional[List[Dict[str, Any]]] = Field(default=None, description="Conversation messages") - params: Optional[Dict[str, Any]] = Field(default=None, description="Additional parameters") - turn_detection: Optional[MllmTurnDetectionConfig] = Field(default=None, description="MLLM turn detection configuration") - failure_message: Optional[str] = Field(default=None, description="Message played on failure") - - class OpenAIRealtime(BaseMLLM): - def __init__(self, **kwargs: Any): - self.options = OpenAIRealtimeOptions(**kwargs) - - def to_config(self) -> Dict[str, Any]: - config: Dict[str, Any] = { - "vendor": "openai", - "api_key": self.options.api_key, - } - - if self.options.url is not None: - config["url"] = self.options.url - if ( - self.options.model is not None - or self.options.params is not None - or self.options.voice is not None - or self.options.instructions is not None - or self.options.input_audio_transcription is not None - ): - params: Dict[str, Any] = {} - if self.options.model is not None: - params["model"] = self.options.model - if self.options.params is not None: - params.update(self.options.params) - if self.options.voice is not None: - params["voice"] = self.options.voice - if self.options.instructions is not None: - params["instructions"] = self.options.instructions - if self.options.input_audio_transcription is not None: - params["input_audio_transcription"] = self.options.input_audio_transcription - config["params"] = params - if self.options.greeting_message is not None: - config["greeting_message"] = self.options.greeting_message - if self.options.input_modalities is not None: - config["input_modalities"] = self.options.input_modalities - if self.options.output_modalities is not None: - config["output_modalities"] = self.options.output_modalities - if self.options.messages is not None: - config["messages"] = self.options.messages - if self.options.failure_message is not None: - config["failure_message"] = self.options.failure_message - if self.options.turn_detection is not None: - config["turn_detection"] = self.options.turn_detection - - return config - - - # xAI MLLM: use XaiGrok (product name, mllm.vendor "xai"). Do not use XaiRealtime—that name - # is deprecated and reserved naming for future XaiSTT / XaiTTS cascading vendors. - - - class XaiGrokOptions(BaseModel): - model_config = ConfigDict(extra="forbid") - - api_key: str = Field(..., description="xAI API key") - url: str = Field(default="wss://api.x.ai/v1/realtime", description="xAI Realtime WebSocket URL") - voice: Optional[str] = Field(default=None, description="Voice identifier (e.g., eve or rex)") - language: Optional[str] = Field(default=None, description="Language code (e.g., en)") - sample_rate: Optional[int] = Field(default=None, description="Audio sample rate in Hz") - greeting_message: Optional[str] = Field(default=None, description="Agent greeting message") - input_modalities: Optional[List[str]] = Field(default=None, description="Input modalities") - output_modalities: Optional[List[str]] = Field(default=None, description="Output modalities") - messages: Optional[List[Dict[str, Any]]] = Field(default=None, description="Conversation messages") - params: Optional[Dict[str, Any]] = Field(default=None, description="Additional xAI parameters") - turn_detection: Optional[MllmTurnDetectionConfig] = Field(default=None, description="MLLM turn detection configuration") - failure_message: Optional[str] = Field(default=None, description="Message played on failure") - - - class XaiGrok(BaseMLLM): - """xAI Grok MLLM vendor (`mllm.vendor`: ``xai``).""" - - def __init__(self, **kwargs: Any): - self.options = XaiGrokOptions(**kwargs) - - def to_config(self) -> Dict[str, Any]: - params: Dict[str, Any] = dict(self.options.params or {}) - if self.options.voice is not None: - params["voice"] = self.options.voice - if self.options.language is not None: - params["language"] = self.options.language - if self.options.sample_rate is not None: - params["sample_rate"] = self.options.sample_rate - - config: Dict[str, Any] = { - "vendor": "xai", - "api_key": self.options.api_key, - "url": self.options.url, - "params": params, - } - - if self.options.greeting_message is not None: - config["greeting_message"] = self.options.greeting_message - if self.options.input_modalities is not None: - config["input_modalities"] = self.options.input_modalities - if self.options.output_modalities is not None: - config["output_modalities"] = self.options.output_modalities - if self.options.messages is not None: - config["messages"] = self.options.messages - if self.options.failure_message is not None: - config["failure_message"] = self.options.failure_message - if self.options.turn_detection is not None: - config["turn_detection"] = self.options.turn_detection - - return config - - - class VertexAIOptions(BaseModel): - model_config = ConfigDict(extra="forbid") - - model: str = Field(..., description="Model name") - url: Optional[str] = Field(default=None, description="WebSocket URL") - project_id: str = Field(..., description="Google Cloud project ID") - location: str = Field(..., description="Google Cloud location/region") - adc_credentials_string: str = Field(..., description="Application Default Credentials JSON string") - instructions: Optional[str] = Field(default=None, description="System instructions") - voice: Optional[str] = Field(default=None, description="Voice name (e.g., Aoede, Charon)") - affective_dialog: Optional[bool] = Field(default=None, description="Enable affective dialog") - proactive_audio: Optional[bool] = Field(default=None, description="Enable proactive audio") - transcribe_agent: Optional[bool] = Field(default=None, description="Transcribe agent speech") - transcribe_user: Optional[bool] = Field(default=None, description="Transcribe user speech") - http_options: Optional[Dict[str, Any]] = Field(default=None, description="HTTP options") - greeting_message: Optional[str] = Field(default=None, description="Agent greeting message") - input_modalities: Optional[List[str]] = Field(default=None, description="Input modalities") - output_modalities: Optional[List[str]] = Field(default=None, description="Output modalities") - messages: Optional[List[Dict[str, Any]]] = Field(default=None, description="Conversation messages") - additional_params: Optional[Dict[str, Any]] = Field(default=None, description="Additional parameters") - turn_detection: Optional[MllmTurnDetectionConfig] = Field(default=None, description="MLLM turn detection configuration") - failure_message: Optional[str] = Field(default=None, description="Message played on failure") - - class VertexAI(BaseMLLM): - def __init__(self, **kwargs: Any): - self.options = VertexAIOptions(**kwargs) - - def to_config(self) -> Dict[str, Any]: - # additional_params spread first so that explicit fields always win, - # matching the TypeScript SDK. - params: Dict[str, Any] = dict(self.options.additional_params or {}) - params["model"] = self.options.model - if self.options.instructions is not None: - params["instructions"] = self.options.instructions - if self.options.voice is not None: - params["voice"] = self.options.voice - if self.options.affective_dialog is not None: - params["affective_dialog"] = self.options.affective_dialog - if self.options.proactive_audio is not None: - params["proactive_audio"] = self.options.proactive_audio - if self.options.transcribe_agent is not None: - params["transcribe_agent"] = self.options.transcribe_agent - if self.options.transcribe_user is not None: - params["transcribe_user"] = self.options.transcribe_user - if self.options.http_options is not None: - params["http_options"] = self.options.http_options - - config: Dict[str, Any] = { - "vendor": "vertexai", - "project_id": self.options.project_id, - "location": self.options.location, - "adc_credentials_string": self.options.adc_credentials_string, - "params": params, - } - - if self.options.url is not None: - config["url"] = self.options.url - if self.options.greeting_message is not None: - config["greeting_message"] = self.options.greeting_message - if self.options.input_modalities is not None: - config["input_modalities"] = self.options.input_modalities - if self.options.output_modalities is not None: - config["output_modalities"] = self.options.output_modalities - if self.options.messages is not None: - config["messages"] = self.options.messages - if self.options.failure_message is not None: - config["failure_message"] = self.options.failure_message - if self.options.turn_detection is not None: - config["turn_detection"] = self.options.turn_detection - - return config - - - class GeminiLiveOptions(BaseModel): - model_config = ConfigDict(extra="forbid") - - api_key: str = Field(..., description="Google API key") - model: str = Field(..., description="Gemini Live model name") - url: Optional[str] = Field(default=None, description="WebSocket URL") - instructions: Optional[str] = Field(default=None, description="System instructions") - voice: Optional[str] = Field(default=None, description="Voice name") - affective_dialog: Optional[bool] = Field(default=None, description="Enable affective dialog") - proactive_audio: Optional[bool] = Field(default=None, description="Enable proactive audio") - transcribe_agent: Optional[bool] = Field(default=None, description="Transcribe agent speech") - transcribe_user: Optional[bool] = Field(default=None, description="Transcribe user speech") - http_options: Optional[Dict[str, Any]] = Field(default=None, description="HTTP options") - greeting_message: Optional[str] = Field(default=None, description="Agent greeting message") - input_modalities: Optional[List[str]] = Field(default=None, description="Input modalities") - output_modalities: Optional[List[str]] = Field(default=None, description="Output modalities") - messages: Optional[List[Dict[str, Any]]] = Field(default=None, description="Conversation messages") - additional_params: Optional[Dict[str, Any]] = Field(default=None, description="Additional parameters") - turn_detection: Optional[MllmTurnDetectionConfig] = Field(default=None, description="MLLM turn detection configuration") - failure_message: Optional[str] = Field(default=None, description="Message played on failure") - - class GeminiLive(BaseMLLM): - def __init__(self, **kwargs: Any): - self.options = GeminiLiveOptions(**kwargs) - - def to_config(self) -> Dict[str, Any]: - params: Dict[str, Any] = {} - if self.options.additional_params is not None: - params.update(self.options.additional_params) - params["model"] = self.options.model - if self.options.instructions is not None: - params["instructions"] = self.options.instructions - if self.options.voice is not None: - params["voice"] = self.options.voice - if self.options.affective_dialog is not None: - params["affective_dialog"] = self.options.affective_dialog - if self.options.proactive_audio is not None: - params["proactive_audio"] = self.options.proactive_audio - if self.options.transcribe_agent is not None: - params["transcribe_agent"] = self.options.transcribe_agent - if self.options.transcribe_user is not None: - params["transcribe_user"] = self.options.transcribe_user - if self.options.http_options is not None: - params["http_options"] = self.options.http_options - - config: Dict[str, Any] = { - "vendor": "gemini", - "api_key": self.options.api_key, - "params": params, - } - - if self.options.url is not None: - config["url"] = self.options.url - if self.options.greeting_message is not None: - config["greeting_message"] = self.options.greeting_message - if self.options.input_modalities is not None: - config["input_modalities"] = self.options.input_modalities - if self.options.output_modalities is not None: - config["output_modalities"] = self.options.output_modalities - if self.options.messages is not None: - config["messages"] = self.options.messages - if self.options.failure_message is not None: - config["failure_message"] = self.options.failure_message - if self.options.turn_detection is not None: - config["turn_detection"] = self.options.turn_detection - - return config - src/agora_agent/agentkit/vendors/stt.py: | - from typing import Any, Dict, Optional - - from pydantic import BaseModel, ConfigDict, Field, model_validator - - from .base import BaseSTT - - _DEEPGRAM_MANAGED_MODELS = {"nova-2", "nova-3"} - - - class SpeechmaticsSTTOptions(BaseModel): - model_config = ConfigDict(extra="forbid") - - api_key: str = Field(..., description="Speechmatics API key") - language: str = Field(..., description="Language code (e.g., en, es, fr)") - model: Optional[str] = Field(default=None, description="Model name") - uri: Optional[str] = Field(default=None, description="Speechmatics streaming WebSocket URL") - additional_params: Optional[Dict[str, Any]] = Field(default=None) - - class SpeechmaticsSTT(BaseSTT): - def __init__(self, **kwargs: Any): - self.options = SpeechmaticsSTTOptions(**kwargs) - - def to_config(self) -> Dict[str, Any]: - params: Dict[str, Any] = dict(self.options.additional_params or {}) - params.update({ - "api_key": self.options.api_key, - "language": self.options.language, - }) - if self.options.model is not None: - params["model"] = self.options.model - if self.options.uri is not None: - params["uri"] = self.options.uri - - config: Dict[str, Any] = { - "vendor": "speechmatics", - "params": params, - } - return config - - - class DeepgramSTTOptions(BaseModel): - model_config = ConfigDict(extra="forbid") - - api_key: Optional[str] = Field(default=None, description="Deepgram API key") - model: Optional[str] = Field(default=None, description="Model (e.g., nova-2, enhanced, base)") - language: Optional[str] = Field(default=None, description="Language code (e.g., en-US)") - keyterm: Optional[str] = Field(default=None, description="Boost specialized terms and brands for Deepgram") - smart_format: Optional[bool] = Field(default=None, description="Enable smart formatting") - punctuation: Optional[bool] = Field(default=None, description="Enable punctuation") - additional_params: Optional[Dict[str, Any]] = Field(default=None) - - @model_validator(mode="after") - def _validate_managed_model(self) -> "DeepgramSTTOptions": - if self.api_key is None and (self.model is None or self.model.strip().lower() not in _DEEPGRAM_MANAGED_MODELS): - raise ValueError("DeepgramSTT requires api_key unless using a supported Agora-managed model") - return self - - class DeepgramSTT(BaseSTT): - def __init__(self, **kwargs: Any): - self.options = DeepgramSTTOptions(**kwargs) - - def to_config(self) -> Dict[str, Any]: - params: Dict[str, Any] = dict(self.options.additional_params or {}) - - if self.options.api_key is not None: - params["key"] = self.options.api_key - if self.options.model is not None: - params["model"] = self.options.model - if self.options.language is not None: - params["language"] = self.options.language - if self.options.smart_format is not None: - params["smart_format"] = self.options.smart_format - if self.options.punctuation is not None: - params["punctuation"] = self.options.punctuation - if self.options.keyterm is not None: - params["keyterm"] = self.options.keyterm - config: Dict[str, Any] = { - "vendor": "deepgram", - "params": params, - } - return config - - - class MicrosoftSTTOptions(BaseModel): - model_config = ConfigDict(extra="forbid") - - key: str = Field(..., description="Azure subscription key") - region: str = Field(..., description="Azure region (e.g., eastus)") - language: str = Field(..., description="Language code (e.g., en-US)") - additional_params: Optional[Dict[str, Any]] = Field(default=None) - - class MicrosoftSTT(BaseSTT): - def __init__(self, **kwargs: Any): - self.options = MicrosoftSTTOptions(**kwargs) - - def to_config(self) -> Dict[str, Any]: - params: Dict[str, Any] = dict(self.options.additional_params or {}) - params.update({ - "key": self.options.key, - "region": self.options.region, - }) - if self.options.language is not None: - params["language"] = self.options.language - - config: Dict[str, Any] = { - "vendor": "microsoft", - "params": params, - } - return config - - - class OpenAISTTOptions(BaseModel): - model_config = ConfigDict(extra="forbid") - - api_key: str = Field(..., description="OpenAI API key") - model: Optional[str] = Field(default=None, description="Model (default: whisper-1)") - language: Optional[str] = Field(default=None, description="Language code") - prompt: Optional[str] = Field(default=None, description="Prompt that guides OpenAI transcription") - input_audio_transcription: Optional[Dict[str, Any]] = Field(default=None, description="OpenAI transcription settings") - additional_params: Optional[Dict[str, Any]] = Field(default=None) - - class OpenAISTT(BaseSTT): - def __init__(self, **kwargs: Any): - self.options = OpenAISTTOptions(**kwargs) - - def to_config(self) -> Dict[str, Any]: - params: Dict[str, Any] = dict(self.options.additional_params or {}) - params["api_key"] = self.options.api_key - - transcription: Dict[str, Any] = {"model": "gpt-4o-mini-transcribe"} - transcription.update(self.options.input_audio_transcription or {}) - if self.options.model is not None: - transcription["model"] = self.options.model - if self.options.prompt is not None: - transcription["prompt"] = self.options.prompt - if self.options.language is not None: - transcription["language"] = self.options.language - if not transcription.get("model"): - raise ValueError("OpenAISTT: input_audio_transcription.model is required") - if not transcription.get("prompt"): - raise ValueError("OpenAISTT: input_audio_transcription.prompt is required") - if not transcription.get("language"): - raise ValueError("OpenAISTT: input_audio_transcription.language is required") - params["input_audio_transcription"] = transcription - - config: Dict[str, Any] = { - "vendor": "openai", - "params": params, - } - return config - - - class GoogleSTTOptions(BaseModel): - model_config = ConfigDict(extra="forbid") - - project_id: str = Field(..., description="Google Cloud project ID") - location: str = Field(..., description="Google Cloud region") - adc_credentials_string: str = Field(..., description="Google service account credentials JSON string") - language: str = Field(..., description="Language code (e.g., en-US)") - model: Optional[str] = Field(default=None, description="Recognition model") - additional_params: Optional[Dict[str, Any]] = Field(default=None) - - class GoogleSTT(BaseSTT): - def __init__(self, **kwargs: Any): - self.options = GoogleSTTOptions(**kwargs) - - def to_config(self) -> Dict[str, Any]: - params: Dict[str, Any] = dict(self.options.additional_params or {}) - params.update({ - "project_id": self.options.project_id, - "location": self.options.location, - "adc_credentials_string": self.options.adc_credentials_string, - }) - - if self.options.language is not None: - params["language"] = self.options.language - if self.options.model is not None: - params["model"] = self.options.model - - config: Dict[str, Any] = { - "vendor": "google", - "params": params, - } - return config - - - class AmazonSTTOptions(BaseModel): - model_config = ConfigDict(extra="forbid") - - access_key: str = Field(..., description="AWS Access Key ID") - secret_key: str = Field(..., description="AWS Secret Access Key") - region: str = Field(..., description="AWS region (e.g., us-east-1)") - language: str = Field(..., description="Language code") - additional_params: Optional[Dict[str, Any]] = Field(default=None) - - class AmazonSTT(BaseSTT): - def __init__(self, **kwargs: Any): - self.options = AmazonSTTOptions(**kwargs) - - def to_config(self) -> Dict[str, Any]: - params: Dict[str, Any] = dict(self.options.additional_params or {}) - params.update({ - "access_key_id": self.options.access_key, - "secret_access_key": self.options.secret_key, - "region": self.options.region, - }) - if self.options.language is not None: - params["language_code"] = self.options.language - - config: Dict[str, Any] = { - "vendor": "amazon", - "params": params, - } - return config - - - class AssemblyAISTTOptions(BaseModel): - model_config = ConfigDict(extra="forbid") - - api_key: str = Field(..., description="AssemblyAI API key") - language: str = Field(..., description="Language code") - uri: Optional[str] = Field(default=None, description="AssemblyAI streaming WebSocket URL") - additional_params: Optional[Dict[str, Any]] = Field(default=None) - - class AssemblyAISTT(BaseSTT): - def __init__(self, **kwargs: Any): - self.options = AssemblyAISTTOptions(**kwargs) - - def to_config(self) -> Dict[str, Any]: - params: Dict[str, Any] = dict(self.options.additional_params or {}) - params["api_key"] = self.options.api_key - if self.options.language is not None: - params["language"] = self.options.language - if self.options.uri is not None: - params["uri"] = self.options.uri - - config: Dict[str, Any] = { - "vendor": "assemblyai", - "params": params, - } - return config - - - class AresSTTOptions(BaseModel): - model_config = ConfigDict(extra="forbid") - - additional_params: Optional[Dict[str, Any]] = Field(default=None) - - class AresSTT(BaseSTT): - def __init__(self, **kwargs: Any): - self.options = AresSTTOptions(**kwargs) - - def to_config(self) -> Dict[str, Any]: - config: Dict[str, Any] = {"vendor": "ares"} - if self.options.additional_params: - config["params"] = self.options.additional_params - return config - - - class SarvamSTTOptions(BaseModel): - model_config = ConfigDict(extra="forbid") - - api_key: str = Field(..., description="Sarvam API key") - language: str = Field(..., description="Language code (e.g., en, hi, ta)") - model: Optional[str] = Field(default=None, description="Model name") - additional_params: Optional[Dict[str, Any]] = Field(default=None) - - class SarvamSTT(BaseSTT): - def __init__(self, **kwargs: Any): - self.options = SarvamSTTOptions(**kwargs) - - def to_config(self) -> Dict[str, Any]: - params: Dict[str, Any] = dict(self.options.additional_params or {}) - params.update({ - "api_key": self.options.api_key, - "language": self.options.language, - }) - if self.options.model is not None: - params["model"] = self.options.model - - config: Dict[str, Any] = { - "vendor": "sarvam", - "params": params, - } - return config - - id: patch-bed29b6b - content_hash: sha256:35a32ee64c95efd478f684c167efc54c9d95344af837e99b31da4c36f66febce - original_commit: bed29b6b7d4d08480a8510b26b5e21d1ef234cc9 - original_message: "chore: bump Python packages to 2.1.0" - original_author: digitallysavvy - base_generation: 1d61baad436285e3b6a37555edb5ca67c158681c - files: - - compat/agora-agent-server-sdk/pyproject.toml - patch_content: | - diff --git a/compat/agora-agent-server-sdk/pyproject.toml b/compat/agora-agent-server-sdk/pyproject.toml - index eea45d7..078ac75 100644 - --- a/compat/agora-agent-server-sdk/pyproject.toml - +++ b/compat/agora-agent-server-sdk/pyproject.toml - @@ -3,7 +3,7 @@ name = "agora-agent-server-sdk" - - [tool.poetry] - name = "agora-agent-server-sdk" - -version = "v2.1.1" - +version = "v2.2.0" - description = "Compatibility shim for the renamed agora-agents package." - readme = "README.md" - authors = [] - @@ -35,7 +35,7 @@ Repository = 'https://github.com/AgoraIO-Conversational-AI/agent-server-sdk-pyth - - [tool.poetry.dependencies] - python = "^3.8" - -agora-agents = ">=2.1.1,<3.0.0" - +agora-agents = ">=2.2.0,<3.0.0" - - [build-system] - requires = ["poetry-core"] - theirs_snapshot: - compat/agora-agent-server-sdk/pyproject.toml: | - [project] - name = "agora-agent-server-sdk" - - [tool.poetry] - name = "agora-agent-server-sdk" - version = "v2.2.0" - description = "Compatibility shim for the renamed agora-agents package." - readme = "README.md" - authors = [] - keywords = [] - - classifiers = [ - "Intended Audience :: Developers", - "Programming Language :: Python", - "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.8", - "Programming Language :: Python :: 3.9", - "Programming Language :: Python :: 3.10", - "Programming Language :: Python :: 3.11", - "Programming Language :: Python :: 3.12", - "Operating System :: OS Independent", - "Operating System :: POSIX", - "Operating System :: MacOS", - "Operating System :: POSIX :: Linux", - "Operating System :: Microsoft :: Windows", - "Topic :: Software Development :: Libraries :: Python Modules", - "Typing :: Typed" - ] - packages = [ - { include = "agora_agent_server_sdk_compat", from = "src"} - ] - - [tool.poetry.urls] - Repository = 'https://github.com/AgoraIO-Conversational-AI/agent-server-sdk-python' - - [tool.poetry.dependencies] - python = "^3.8" - agora-agents = ">=2.2.0,<3.0.0" - - [build-system] - requires = ["poetry-core"] - build-backend = "poetry.core.masonry.api" - user_owned: true - - id: patch-fecdc77c - content_hash: sha256:4c3321ec0facd689cee56c0fc609559d1038380d04a4cd8478b7ad7bb4a85388 - original_commit: fecdc77c866f433d8287fcb8a55328612e016b21 - original_message: Fix AgentKit request validation and provider wire-key coverage - original_author: digitallysavvy - base_generation: 1d61baad436285e3b6a37555edb5ca67c158681c - files: - - PYTHON-AGENTKIT-SNAKE-CASE-AUDIT.md - patch_content: | - diff --git a/PYTHON-AGENTKIT-SNAKE-CASE-AUDIT.md b/PYTHON-AGENTKIT-SNAKE-CASE-AUDIT.md - new file mode 100644 - index 0000000..f3cd64a - --- /dev/null - +++ b/PYTHON-AGENTKIT-SNAKE-CASE-AUDIT.md - @@ -0,0 +1,27 @@ - +# Python AgentKit Snake Case API Audit - + - +Scope: `agora-agents-python` public AgentKit wrappers, docs, and tests. - + - +Search terms: - + - +```bash - +rg -n "apiKey|baseUrl|modelId|voiceId|groupId|keyTerm|turnDetection|inputAudioTranscription|greetingMessage|failureMessage|projectId|adcCredentialsString|sampleRate|targetLanguageCode|resourceName|deploymentName" agora-agents-python - +``` - + - +## Result - + - +No shipped camelCase public Python constructor kwargs were found in source or docs examples. No deprecated alias helper is required for this pass. - + - +| File | Class / symbol | Public arg or example | Current spelling | Desired Python spelling | `to_config()` key | Wire key | Action | Compatibility needed | Test coverage | - +|---|---|---|---|---|---|---|---|---|---| - +| `src/agora_agent/agentkit/vendors/tts.py` | `GoogleTTS` | constructor arg | `voice_name` | `voice_name` | `params.VoiceSelectionParams` | `params.VoiceSelectionParams` | keep | no | `tests/custom/test_tts_vendors.py` | - +| `src/agora_agent/agentkit/vendors/tts.py` | `RimeTTS` | constructor arg | `model_id` | `model_id` | `params.modelId` | `params.modelId` | keep | no | `tests/custom/test_tts_vendors.py` | - +| `src/agora_agent/agentkit/vendors/tts.py` | `MurfTTS` | constructor arg | `voice_id` | `voice_id` | `params.voiceId` | `params.voiceId` | keep | no | `tests/custom/test_tts_vendors.py`, `tests/custom/test_request_body.py` | - +| `src/agora_agent/types/rime_tts_params.py` | generated model | generated alias | `modelId` | n/a | `model_id` | `modelId` | keep | no | `tests/custom/test_tts_vendors.py` | - +| `src/agora_agent/types/murf_tts_params.py` | generated model | generated alias | `voiceId` | n/a | `voice_id` | `voiceId` | keep | no | `tests/custom/test_tts_vendors.py` | - +| `tests/custom/test_request_body.py` | wire assertion | payload key | `voiceId` | n/a | `params.voiceId` | `params.voiceId` | keep | no | request-body test | - +| `tests/custom/test_tts_vendors.py` | wire assertion | payload key | `modelId`, `voiceId`, `VoiceSelectionParams` | n/a | generated model fields | wire aliases | keep | no | wire serialization test | - + - +## Guardrail Added - + - +`tests/custom/test_docs_snake_case.py` scans Python markdown code fences and fails on common camelCase kwargs such as `apiKey`, `baseUrl`, `modelId`, `voiceId`, `projectId`, and `greetingMessage`. JSON, TypeScript, Go, shell, and YAML examples are skipped so wire payload examples can retain required non-Python keys. - theirs_snapshot: - PYTHON-AGENTKIT-SNAKE-CASE-AUDIT.md: | - # Python AgentKit Snake Case API Audit - - Scope: `agora-agents-python` public AgentKit wrappers, docs, and tests. - - Search terms: - - ```bash - rg -n "apiKey|baseUrl|modelId|voiceId|groupId|keyTerm|turnDetection|inputAudioTranscription|greetingMessage|failureMessage|projectId|adcCredentialsString|sampleRate|targetLanguageCode|resourceName|deploymentName" agora-agents-python - ``` - - ## Result - - No shipped camelCase public Python constructor kwargs were found in source or docs examples. No deprecated alias helper is required for this pass. - - | File | Class / symbol | Public arg or example | Current spelling | Desired Python spelling | `to_config()` key | Wire key | Action | Compatibility needed | Test coverage | - |---|---|---|---|---|---|---|---|---|---| - | `src/agora_agent/agentkit/vendors/tts.py` | `GoogleTTS` | constructor arg | `voice_name` | `voice_name` | `params.VoiceSelectionParams` | `params.VoiceSelectionParams` | keep | no | `tests/custom/test_tts_vendors.py` | - | `src/agora_agent/agentkit/vendors/tts.py` | `RimeTTS` | constructor arg | `model_id` | `model_id` | `params.modelId` | `params.modelId` | keep | no | `tests/custom/test_tts_vendors.py` | - | `src/agora_agent/agentkit/vendors/tts.py` | `MurfTTS` | constructor arg | `voice_id` | `voice_id` | `params.voiceId` | `params.voiceId` | keep | no | `tests/custom/test_tts_vendors.py`, `tests/custom/test_request_body.py` | - | `src/agora_agent/types/rime_tts_params.py` | generated model | generated alias | `modelId` | n/a | `model_id` | `modelId` | keep | no | `tests/custom/test_tts_vendors.py` | - | `src/agora_agent/types/murf_tts_params.py` | generated model | generated alias | `voiceId` | n/a | `voice_id` | `voiceId` | keep | no | `tests/custom/test_tts_vendors.py` | - | `tests/custom/test_request_body.py` | wire assertion | payload key | `voiceId` | n/a | `params.voiceId` | `params.voiceId` | keep | no | request-body test | - | `tests/custom/test_tts_vendors.py` | wire assertion | payload key | `modelId`, `voiceId`, `VoiceSelectionParams` | n/a | generated model fields | wire aliases | keep | no | wire serialization test | - - ## Guardrail Added - - `tests/custom/test_docs_snake_case.py` scans Python markdown code fences and fails on common camelCase kwargs such as `apiKey`, `baseUrl`, `modelId`, `voiceId`, `projectId`, and `greetingMessage`. JSON, TypeScript, Go, shell, and YAML examples are skipped so wire payload examples can retain required non-Python keys. - user_owned: true