diff --git a/src/google/adk/agents/config_agent_utils.py b/src/google/adk/agents/config_agent_utils.py index 82aaa6e452..867a0bdba1 100644 --- a/src/google/adk/agents/config_agent_utils.py +++ b/src/google/adk/agents/config_agent_utils.py @@ -105,10 +105,76 @@ def _load_config_from_path(config_path: str) -> AgentConfig: return AgentConfig.model_validate(config_data) +_ENFORCE_DENYLIST = False + +# Modules that must never be imported via YAML agent configuration. +# These provide direct access to the operating system, process execution, +# or dynamic code evaluation and could be abused to achieve arbitrary +# code execution when referenced in callback, tool, schema, or model +# code-reference fields. +_BLOCKED_MODULES = frozenset({ + "os", + "subprocess", + "sys", + "builtins", + "importlib", + "shutil", + "socket", + "http", + "urllib", + "ctypes", + "multiprocessing", + "threading", + "signal", + "code", + "codeop", + "compileall", + "runpy", + "webbrowser", + "antigravity", + "pty", + "commands", + "pdb", + "profile", + "tempfile", + "shelve", + "pickle", + "marshal", +}) + + +def _validate_module_reference(fully_qualified_name: str) -> None: + """Validate that a module reference does not target a blocked module. + + Args: + fully_qualified_name: The fully-qualified Python name to validate + (e.g. ``"my_package.my_module.my_func"``). + + Raises: + ValueError: If the top-level module is in ``_BLOCKED_MODULES``. + """ + if not _ENFORCE_DENYLIST: + return + # Extract the top-level package from the fully-qualified name. + top_module = fully_qualified_name.split(".")[0] + if top_module in _BLOCKED_MODULES: + raise ValueError( + f"Blocked module reference: {fully_qualified_name!r}. " + f"Importing from the '{top_module}' module is not allowed in " + "agent configurations because it can execute arbitrary code." + ) + + +def _set_enforce_denylist(value: bool) -> None: + global _ENFORCE_DENYLIST + _ENFORCE_DENYLIST = value + + @experimental(FeatureName.AGENT_CONFIG) def resolve_fully_qualified_name(name: str) -> Any: try: module_path, obj_name = name.rsplit(".", 1) + _validate_module_reference(name) module = importlib.import_module(module_path) return getattr(module, obj_name) except Exception as e: @@ -160,6 +226,7 @@ def _resolve_agent_code_reference(code: str) -> Any: if "." not in code: raise ValueError(f"Invalid code reference: {code}") + _validate_module_reference(code) module_path, obj_name = code.rsplit(".", 1) module = importlib.import_module(module_path) obj = getattr(module, obj_name) @@ -189,6 +256,7 @@ def resolve_code_reference(code_config: CodeConfig) -> Any: if not code_config or not code_config.name: raise ValueError("Invalid CodeConfig.") + _validate_module_reference(code_config.name) module_path, obj_name = code_config.name.rsplit(".", 1) module = importlib.import_module(module_path) return getattr(module, obj_name) diff --git a/tests/unittests/agents/test_agent_config.py b/tests/unittests/agents/test_agent_config.py index 99ee1d8401..71e5190c20 100644 --- a/tests/unittests/agents/test_agent_config.py +++ b/tests/unittests/agents/test_agent_config.py @@ -465,3 +465,63 @@ def fake_from_config(path: str): ) assert result == "sentinel" assert recorded["path"] == expected_path + + +# --- Security tests: module blocklist for YAML agent config code references --- + + +def test_resolve_code_reference_blocks_os_when_enforced(): + """Verify resolve_code_reference blocks os module directly.""" + from google.adk.agents.common_configs import CodeConfig + + config_agent_utils._set_enforce_denylist(True) + try: + with pytest.raises(ValueError, match="Blocked module reference"): + config_agent_utils.resolve_code_reference( + CodeConfig(name="os.system") + ) + finally: + config_agent_utils._set_enforce_denylist(False) + + +def test_resolve_fully_qualified_name_blocks_subprocess_when_enforced(): + """Verify resolve_fully_qualified_name blocks subprocess module.""" + config_agent_utils._set_enforce_denylist(True) + try: + with pytest.raises(ValueError, match="Blocked module reference"): + config_agent_utils.resolve_fully_qualified_name("subprocess.Popen") + finally: + config_agent_utils._set_enforce_denylist(False) + + +def test_allowed_module_passes_when_enforced(tmp_path: Path): + """Verify that google.adk modules are NOT blocked by the module denylist.""" + config_agent_utils._set_enforce_denylist(True) + try: + # This should NOT raise — google.adk modules must remain allowed + result = config_agent_utils.resolve_fully_qualified_name( + "google.adk.agents.llm_agent.LlmAgent" + ) + assert result is LlmAgent + finally: + config_agent_utils._set_enforce_denylist(False) + + +@pytest.mark.parametrize( + "blocked_module", + [ + "os.system", + "subprocess.call", + "builtins.exec", + ], +) +def test_resolve_agent_code_reference_blocks_when_enforced( + blocked_module: str, +): + """Verify _resolve_agent_code_reference blocks dangerous modules.""" + config_agent_utils._set_enforce_denylist(True) + try: + with pytest.raises(ValueError, match="Blocked module reference"): + config_agent_utils._resolve_agent_code_reference(blocked_module) + finally: + config_agent_utils._set_enforce_denylist(False)