diff --git a/python/packages/hyperlight/agent_framework_hyperlight/_execute_code_tool.py b/python/packages/hyperlight/agent_framework_hyperlight/_execute_code_tool.py index 582cbece22..738b080183 100644 --- a/python/packages/hyperlight/agent_framework_hyperlight/_execute_code_tool.py +++ b/python/packages/hyperlight/agent_framework_hyperlight/_execute_code_tool.py @@ -7,7 +7,7 @@ import shutil import threading import time -from collections.abc import Callable, Sequence +from collections.abc import Callable, Iterator, Sequence from concurrent.futures import ThreadPoolExecutor from contextlib import suppress from copy import copy @@ -483,15 +483,66 @@ def _display_mount_path(mount_path: str) -> str: return f"/input/{mount_path}" +def _iter_real_entries(root: Path) -> Iterator[Path]: + """Walk ``root`` recursively, yielding directories and regular files only. + + ``Path.rglob`` follows directory symlinks by default, which combined with + ``Path.is_file()`` / ``shutil.copy2`` (all follow symlinks) would expose + paths outside the configured input tree if the source tree is + attacker-controlled. This walker mirrors the safe behaviour by checking + ``is_symlink()`` at every directory level and never descending through one. + + Non-regular files (sockets, FIFOs, devices) are also filtered out so the + signature mirrors exactly what ``_copy_path`` actually stages. + """ + stack: list[Path] = [root] + while stack: + current = stack.pop() + try: + children = list(current.iterdir()) + except OSError: + continue + for child in children: + try: + if child.is_symlink(): + continue + if child.is_dir(): + stack.append(child) + yield child + elif child.is_file(): + yield child + # Non-regular files (sockets/FIFOs/devices) are skipped to + # match ``_copy_path``'s staging behaviour. + except OSError: + continue + + def _path_tree_signature(path: Path) -> tuple[tuple[str, int, int], ...]: + """Return a stable signature of the real (non-symlink) file tree under ``path``. + + If ``path`` itself is a symlink, it is resolved first so the signature + reflects the real target's contents. This matches the public construction + flow (``_resolve_workspace_root`` / ``_normalize_file_mount_input`` already + resolve roots up front) and acts as defense in depth for any direct caller + that builds a ``_RunConfig`` without going through the constructor. + + Symlinks encountered inside the walked tree are skipped, and ``lstat()`` is + used so size/mtime are read from the entry itself, never through a + target. The result mirrors what ``_copy_path`` actually stages. + """ + if path.is_symlink(): + try: + path = path.resolve(strict=True) + except OSError: + return () if path.is_file(): - stat = path.stat() + stat = path.lstat() return ((path.name, int(stat.st_size), int(stat.st_mtime_ns)),) entries: list[tuple[str, int, int]] = [] - for candidate in sorted(path.rglob("*"), key=lambda value: value.as_posix()): + for candidate in sorted(_iter_real_entries(path), key=lambda value: value.as_posix()): try: - stat = candidate.stat() + stat = candidate.lstat() except FileNotFoundError: continue relative_path = candidate.relative_to(path).as_posix() @@ -501,14 +552,37 @@ def _path_tree_signature(path: Path) -> tuple[tuple[str, int, int], ...]: def _copy_path(source: Path, destination: Path) -> None: + """Stage ``source`` into ``destination`` without following symlinks. + + Symlinks (file or directory) found in the source tree are skipped entirely + so a sandbox input tree can only contain real entries that physically live + under the configured ``workspace_root`` or a ``file_mounts`` host path. + ``Path.is_dir()``, ``Path.is_file()`` and ``shutil.copy2`` all follow + symlinks by default, which is unsafe for symlinks planted in the source + tree at rest. + + This helper does not attempt to make the copy atomic with respect to + concurrent mutation of the source tree. Callers that need protection from + an adversary modifying the workspace mid-stage should pass in an + immutable / snapshotted directory. + """ + # Detect symlinks before doing anything else - ``is_symlink()`` does not + # follow the link, unlike ``is_dir()`` / ``is_file()``. + if source.is_symlink(): + return + if source.is_dir(): destination.mkdir(parents=True, exist_ok=True) for child in sorted(source.iterdir(), key=lambda value: value.name): _copy_path(child, destination / child.name) return + if not source.is_file(): + # Non-regular files (sockets, FIFOs, devices) are intentionally skipped. + return + destination.parent.mkdir(parents=True, exist_ok=True) - shutil.copy2(source, destination) + shutil.copy2(source, destination, follow_symlinks=False) def _populate_input_dir(*, config: _RunConfig, input_root: Path) -> None: diff --git a/python/packages/hyperlight/tests/hyperlight/test_hyperlight_codeact.py b/python/packages/hyperlight/tests/hyperlight/test_hyperlight_codeact.py index 9611978744..03e3c2269c 100644 --- a/python/packages/hyperlight/tests/hyperlight/test_hyperlight_codeact.py +++ b/python/packages/hyperlight/tests/hyperlight/test_hyperlight_codeact.py @@ -425,6 +425,175 @@ async def test_execute_code_tool_populates_input_dir_with_workspace_and_file_mou assert (input_root / "data" / "input.txt").read_text(encoding="utf-8") == "hello from mount" +def _build_run_config( + *, + workspace_root: Path | None = None, + file_mounts: tuple = (), +) -> Any: + """Build a minimal _RunConfig for tests that exercise _populate_input_dir directly.""" + return execute_code_module._RunConfig( + backend="wasm", + module="python_guest.path", + module_path=None, + approval_mode="never_require", + tools=(), + workspace_root=workspace_root, + workspace_signature=(), + file_mounts=file_mounts, + allowed_domains=(), + ) + + +def _symlinks_supported(tmp: Path) -> bool: + """Return True if the current platform/environment supports symlinks. + + Mirrors python/packages/core/tests/core/test_skills.py so the symlink + regression tests are skipped on restricted Windows CI runners instead of + failing on ``OSError`` / ``NotImplementedError`` during creation. + """ + test_target = tmp / "_symlink_test_target" + test_link = tmp / "_symlink_test_link" + try: + test_target.write_text("test", encoding="utf-8") + test_link.symlink_to(test_target) + return True + except (OSError, NotImplementedError): + return False + finally: + test_link.unlink(missing_ok=True) + test_target.unlink(missing_ok=True) + + +def test_populate_input_dir_skips_symlink_to_file_outside_workspace(tmp_path: Path) -> None: + if not _symlinks_supported(tmp_path): + pytest.skip("Symlinks not supported on this platform/environment") + workspace = tmp_path / "workspace" + workspace.mkdir() + outside = tmp_path / "outside.txt" + outside.write_text("outside-content", encoding="utf-8") + (workspace / "real.txt").write_text("real-content", encoding="utf-8") + (workspace / "link.txt").symlink_to(outside) + + input_root = tmp_path / "input" + input_root.mkdir() + + execute_code_module._populate_input_dir( + config=_build_run_config(workspace_root=workspace), + input_root=input_root, + ) + + # Real file copied; symlink and its target are absent. + assert (input_root / "real.txt").read_text(encoding="utf-8") == "real-content" + assert not (input_root / "link.txt").exists() + assert not (input_root / "link.txt").is_symlink() + # Sanity: no outside-content anywhere in the input tree. + leaked = [ + path + for path in input_root.rglob("*") + if path.is_file() and path.read_text(encoding="utf-8") == "outside-content" + ] + assert leaked == [] + + +def test_populate_input_dir_skips_symlinked_directory_outside_workspace(tmp_path: Path) -> None: + if not _symlinks_supported(tmp_path): + pytest.skip("Symlinks not supported on this platform/environment") + workspace = tmp_path / "workspace" + workspace.mkdir() + outside_dir = tmp_path / "outside_dir" + outside_dir.mkdir() + (outside_dir / "deep.txt").write_text("deep-content", encoding="utf-8") + (workspace / "linked_dir").symlink_to(outside_dir, target_is_directory=True) + + input_root = tmp_path / "input" + input_root.mkdir() + + execute_code_module._populate_input_dir( + config=_build_run_config(workspace_root=workspace), + input_root=input_root, + ) + + # Neither the symlink itself nor anything under the symlinked target leaks. + assert not (input_root / "linked_dir").exists() + leaked = [ + path for path in input_root.rglob("*") if path.is_file() and path.read_text(encoding="utf-8") == "deep-content" + ] + assert leaked == [] + + +def test_populate_input_dir_skips_nested_symlinks(tmp_path: Path) -> None: + """A symlink several levels deep inside a real subdir must also be skipped.""" + if not _symlinks_supported(tmp_path): + pytest.skip("Symlinks not supported on this platform/environment") + workspace = tmp_path / "workspace" + (workspace / "real_sub").mkdir(parents=True) + (workspace / "real_sub" / "ok.txt").write_text("ok", encoding="utf-8") + outside = tmp_path / "outside.txt" + outside.write_text("outside-content", encoding="utf-8") + (workspace / "real_sub" / "link.txt").symlink_to(outside) + + input_root = tmp_path / "input" + input_root.mkdir() + + execute_code_module._populate_input_dir( + config=_build_run_config(workspace_root=workspace), + input_root=input_root, + ) + + assert (input_root / "real_sub" / "ok.txt").read_text(encoding="utf-8") == "ok" + assert not (input_root / "real_sub" / "link.txt").exists() + + +def test_path_tree_signature_does_not_follow_symlinks(tmp_path: Path) -> None: + """The cache-key signature must reflect only real files (mirrors the staged tree).""" + if not _symlinks_supported(tmp_path): + pytest.skip("Symlinks not supported on this platform/environment") + workspace = tmp_path / "workspace" + workspace.mkdir() + real = workspace / "real.txt" + real.write_text("real-content", encoding="utf-8") + outside = tmp_path / "outside.txt" + outside.write_text("outside-content", encoding="utf-8") + (workspace / "link.txt").symlink_to(outside) + + signature = execute_code_module._path_tree_signature(workspace) + + names = [entry[0] for entry in signature] + assert "real.txt" in names + assert "link.txt" not in names + + +def test_path_tree_signature_walks_through_symlinked_root(tmp_path: Path) -> None: + """A symlinked workspace root must produce a real signature, not an empty one. + + Defends against the cache never invalidating when a caller passes a + symlinked workspace and the underlying real directory's contents change. + """ + if not _symlinks_supported(tmp_path): + pytest.skip("Symlinks not supported on this platform/environment") + + real_workspace = tmp_path / "real_workspace" + real_workspace.mkdir() + target = real_workspace / "data.txt" + target.write_text("v1", encoding="utf-8") + + linked_workspace = tmp_path / "linked_workspace" + linked_workspace.symlink_to(real_workspace, target_is_directory=True) + + signature_v1 = execute_code_module._path_tree_signature(linked_workspace) + names = [entry[0] for entry in signature_v1] + assert "data.txt" in names, f"signature should include the target's contents, got {signature_v1!r}" + + # Mutate the real contents; the symlinked-root signature must reflect the change + # so the cache key invalidates. + import time + + time.sleep(0.01) # ensure mtime_ns moves on filesystems with coarse granularity + target.write_text("v2-content-larger", encoding="utf-8") + signature_v2 = execute_code_module._path_tree_signature(linked_workspace) + assert signature_v1 != signature_v2, "signature should change when symlinked target contents change" + + def test_execute_code_tool_allowed_domains_use_structured_entries_and_replace_by_target() -> None: execute_code = HyperlightExecuteCodeTool(_registry=_FakeRuntime())