Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import shutil
import threading
import time
from collections.abc import Callable, Sequence
from collections.abc import Callable, Iterator, Sequence
from concurrent.futures import ThreadPoolExecutor
from contextlib import suppress
from copy import copy
Expand Down Expand Up @@ -483,15 +483,66 @@ def _display_mount_path(mount_path: str) -> str:
return f"/input/{mount_path}"


def _iter_real_entries(root: Path) -> Iterator[Path]:
"""Walk ``root`` recursively, yielding directories and regular files only.

``Path.rglob`` follows directory symlinks by default, which combined with
``Path.is_file()`` / ``shutil.copy2`` (all follow symlinks) would expose
paths outside the configured input tree if the source tree is
attacker-controlled. This walker mirrors the safe behaviour by checking
``is_symlink()`` at every directory level and never descending through one.

Non-regular files (sockets, FIFOs, devices) are also filtered out so the
signature mirrors exactly what ``_copy_path`` actually stages.
"""
stack: list[Path] = [root]
while stack:
current = stack.pop()
try:
children = list(current.iterdir())
except OSError:
continue
for child in children:
try:
if child.is_symlink():
continue
if child.is_dir():
stack.append(child)
yield child
elif child.is_file():
yield child
# Non-regular files (sockets/FIFOs/devices) are skipped to
# match ``_copy_path``'s staging behaviour.
except OSError:
Comment thread
eavanvalkenburg marked this conversation as resolved.
continue


def _path_tree_signature(path: Path) -> tuple[tuple[str, int, int], ...]:
"""Return a stable signature of the real (non-symlink) file tree under ``path``.

If ``path`` itself is a symlink, it is resolved first so the signature
reflects the real target's contents. This matches the public construction
flow (``_resolve_workspace_root`` / ``_normalize_file_mount_input`` already
resolve roots up front) and acts as defense in depth for any direct caller
that builds a ``_RunConfig`` without going through the constructor.

Symlinks encountered inside the walked tree are skipped, and ``lstat()`` is
used so size/mtime are read from the entry itself, never through a
target. The result mirrors what ``_copy_path`` actually stages.
"""
if path.is_symlink():
Comment thread
eavanvalkenburg marked this conversation as resolved.
try:
path = path.resolve(strict=True)
except OSError:
return ()
if path.is_file():
stat = path.stat()
stat = path.lstat()
return ((path.name, int(stat.st_size), int(stat.st_mtime_ns)),)

entries: list[tuple[str, int, int]] = []
for candidate in sorted(path.rglob("*"), key=lambda value: value.as_posix()):
for candidate in sorted(_iter_real_entries(path), key=lambda value: value.as_posix()):
try:
stat = candidate.stat()
stat = candidate.lstat()
except FileNotFoundError:
continue
relative_path = candidate.relative_to(path).as_posix()
Expand All @@ -501,14 +552,37 @@ def _path_tree_signature(path: Path) -> tuple[tuple[str, int, int], ...]:


def _copy_path(source: Path, destination: Path) -> None:
"""Stage ``source`` into ``destination`` without following symlinks.

Symlinks (file or directory) found in the source tree are skipped entirely
so a sandbox input tree can only contain real entries that physically live
under the configured ``workspace_root`` or a ``file_mounts`` host path.
``Path.is_dir()``, ``Path.is_file()`` and ``shutil.copy2`` all follow
symlinks by default, which is unsafe for symlinks planted in the source
tree at rest.

This helper does not attempt to make the copy atomic with respect to
concurrent mutation of the source tree. Callers that need protection from
an adversary modifying the workspace mid-stage should pass in an
immutable / snapshotted directory.
"""
# Detect symlinks before doing anything else - ``is_symlink()`` does not
# follow the link, unlike ``is_dir()`` / ``is_file()``.
if source.is_symlink():
Comment thread
eavanvalkenburg marked this conversation as resolved.
return

if source.is_dir():
destination.mkdir(parents=True, exist_ok=True)
for child in sorted(source.iterdir(), key=lambda value: value.name):
_copy_path(child, destination / child.name)
return

if not source.is_file():
# Non-regular files (sockets, FIFOs, devices) are intentionally skipped.
return

destination.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(source, destination)
shutil.copy2(source, destination, follow_symlinks=False)


def _populate_input_dir(*, config: _RunConfig, input_root: Path) -> None:
Expand Down
169 changes: 169 additions & 0 deletions python/packages/hyperlight/tests/hyperlight/test_hyperlight_codeact.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,175 @@ async def test_execute_code_tool_populates_input_dir_with_workspace_and_file_mou
assert (input_root / "data" / "input.txt").read_text(encoding="utf-8") == "hello from mount"


def _build_run_config(
*,
workspace_root: Path | None = None,
file_mounts: tuple = (),
) -> Any:
"""Build a minimal _RunConfig for tests that exercise _populate_input_dir directly."""
return execute_code_module._RunConfig(
backend="wasm",
module="python_guest.path",
module_path=None,
approval_mode="never_require",
tools=(),
workspace_root=workspace_root,
workspace_signature=(),
file_mounts=file_mounts,
allowed_domains=(),
)


def _symlinks_supported(tmp: Path) -> bool:
"""Return True if the current platform/environment supports symlinks.

Mirrors python/packages/core/tests/core/test_skills.py so the symlink
regression tests are skipped on restricted Windows CI runners instead of
failing on ``OSError`` / ``NotImplementedError`` during creation.
"""
test_target = tmp / "_symlink_test_target"
test_link = tmp / "_symlink_test_link"
try:
test_target.write_text("test", encoding="utf-8")
test_link.symlink_to(test_target)
return True
except (OSError, NotImplementedError):
return False
finally:
test_link.unlink(missing_ok=True)
test_target.unlink(missing_ok=True)


def test_populate_input_dir_skips_symlink_to_file_outside_workspace(tmp_path: Path) -> None:
if not _symlinks_supported(tmp_path):
pytest.skip("Symlinks not supported on this platform/environment")
workspace = tmp_path / "workspace"
workspace.mkdir()
outside = tmp_path / "outside.txt"
outside.write_text("outside-content", encoding="utf-8")
(workspace / "real.txt").write_text("real-content", encoding="utf-8")
(workspace / "link.txt").symlink_to(outside)

Comment thread
eavanvalkenburg marked this conversation as resolved.
input_root = tmp_path / "input"
input_root.mkdir()

execute_code_module._populate_input_dir(
config=_build_run_config(workspace_root=workspace),
input_root=input_root,
)

# Real file copied; symlink and its target are absent.
assert (input_root / "real.txt").read_text(encoding="utf-8") == "real-content"
assert not (input_root / "link.txt").exists()
assert not (input_root / "link.txt").is_symlink()
# Sanity: no outside-content anywhere in the input tree.
leaked = [
path
for path in input_root.rglob("*")
if path.is_file() and path.read_text(encoding="utf-8") == "outside-content"
]
assert leaked == []


def test_populate_input_dir_skips_symlinked_directory_outside_workspace(tmp_path: Path) -> None:
if not _symlinks_supported(tmp_path):
pytest.skip("Symlinks not supported on this platform/environment")
workspace = tmp_path / "workspace"
workspace.mkdir()
outside_dir = tmp_path / "outside_dir"
outside_dir.mkdir()
(outside_dir / "deep.txt").write_text("deep-content", encoding="utf-8")
(workspace / "linked_dir").symlink_to(outside_dir, target_is_directory=True)

input_root = tmp_path / "input"
input_root.mkdir()

execute_code_module._populate_input_dir(
config=_build_run_config(workspace_root=workspace),
input_root=input_root,
)

# Neither the symlink itself nor anything under the symlinked target leaks.
assert not (input_root / "linked_dir").exists()
leaked = [
path for path in input_root.rglob("*") if path.is_file() and path.read_text(encoding="utf-8") == "deep-content"
]
assert leaked == []


def test_populate_input_dir_skips_nested_symlinks(tmp_path: Path) -> None:
"""A symlink several levels deep inside a real subdir must also be skipped."""
if not _symlinks_supported(tmp_path):
pytest.skip("Symlinks not supported on this platform/environment")
workspace = tmp_path / "workspace"
(workspace / "real_sub").mkdir(parents=True)
(workspace / "real_sub" / "ok.txt").write_text("ok", encoding="utf-8")
outside = tmp_path / "outside.txt"
outside.write_text("outside-content", encoding="utf-8")
(workspace / "real_sub" / "link.txt").symlink_to(outside)

input_root = tmp_path / "input"
input_root.mkdir()

execute_code_module._populate_input_dir(
config=_build_run_config(workspace_root=workspace),
input_root=input_root,
)

assert (input_root / "real_sub" / "ok.txt").read_text(encoding="utf-8") == "ok"
assert not (input_root / "real_sub" / "link.txt").exists()


def test_path_tree_signature_does_not_follow_symlinks(tmp_path: Path) -> None:
"""The cache-key signature must reflect only real files (mirrors the staged tree)."""
if not _symlinks_supported(tmp_path):
pytest.skip("Symlinks not supported on this platform/environment")
workspace = tmp_path / "workspace"
workspace.mkdir()
real = workspace / "real.txt"
real.write_text("real-content", encoding="utf-8")
outside = tmp_path / "outside.txt"
outside.write_text("outside-content", encoding="utf-8")
(workspace / "link.txt").symlink_to(outside)

signature = execute_code_module._path_tree_signature(workspace)

names = [entry[0] for entry in signature]
assert "real.txt" in names
assert "link.txt" not in names


def test_path_tree_signature_walks_through_symlinked_root(tmp_path: Path) -> None:
"""A symlinked workspace root must produce a real signature, not an empty one.

Defends against the cache never invalidating when a caller passes a
symlinked workspace and the underlying real directory's contents change.
"""
if not _symlinks_supported(tmp_path):
pytest.skip("Symlinks not supported on this platform/environment")

real_workspace = tmp_path / "real_workspace"
real_workspace.mkdir()
target = real_workspace / "data.txt"
target.write_text("v1", encoding="utf-8")

linked_workspace = tmp_path / "linked_workspace"
linked_workspace.symlink_to(real_workspace, target_is_directory=True)

signature_v1 = execute_code_module._path_tree_signature(linked_workspace)
names = [entry[0] for entry in signature_v1]
assert "data.txt" in names, f"signature should include the target's contents, got {signature_v1!r}"

# Mutate the real contents; the symlinked-root signature must reflect the change
# so the cache key invalidates.
import time

time.sleep(0.01) # ensure mtime_ns moves on filesystems with coarse granularity
target.write_text("v2-content-larger", encoding="utf-8")
signature_v2 = execute_code_module._path_tree_signature(linked_workspace)
assert signature_v1 != signature_v2, "signature should change when symlinked target contents change"


def test_execute_code_tool_allowed_domains_use_structured_entries_and_replace_by_target() -> None:
execute_code = HyperlightExecuteCodeTool(_registry=_FakeRuntime())

Expand Down
Loading