diff --git a/README.rst b/README.rst index c54962cb..ad36ff4e 100644 --- a/README.rst +++ b/README.rst @@ -53,8 +53,33 @@ it will try to figure out where the code to mutate is. You can stop the mutation run at any time and mutmut will restart where you -left off. It will continue where it left off, and re-test functions that were -modified since last run. +left off. + +Incremental Testing +~~~~~~~~~~~~~~~~~~~ + +Mutmut is designed for incremental workflows. It remembers which mutants have +been tested and their results, so subsequent runs skip already-tested mutants. + +**Function-level change detection:** Mutmut computes a hash of each function's +source code. When you modify a function, mutmut detects the change and +automatically re-tests all mutants in that function. Unchanged functions keep +their previous results. + +**Limitation:** Change detection only tracks direct function changes, not +transitive dependencies. If function A calls function B, and you modify B, +mutants in A are not automatically re-tested. For significant changes to +shared utilities, use ``mutmut run "module*"`` to re-test affected modules, +or delete the ``mutants/`` directory for a full re-run. + +This means you can: + +- Run ``mutmut run``, stop partway through, and continue later +- Modify your source code and re-run - only changed functions are re-tested +- Update your tests and use ``mutmut browse`` to selectively re-test mutants + +The mutation data is stored in the ``mutants/`` directory. Delete this +directory to start completely fresh. To work with the results, use `mutmut browse` where you can see the mutants, retest them when you've updated your tests. @@ -209,6 +234,25 @@ to failing tests. debug=true +Disable setproctitle (macOS) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Mutmut uses ``setproctitle`` to show the current mutant name in the process +list, which is helpful for monitoring long runs. However, ``setproctitle`` +uses CoreFoundation APIs on macOS that are not fork-safe, causing segfaults +in child processes. + +By default, mutmut automatically disables ``setproctitle`` on macOS and +enables it on other platforms. If you need to override this (e.g. to enable it on +macOS at your own risk, or to disable it on other platforms), set ``use_setproctitle``: + +.. code-block:: toml + + # pyproject.toml + [tool.mutmut] + use_setproctitle = false + + Whitelisting ~~~~~~~~~~~~ @@ -226,6 +270,92 @@ whitelist lines are: to continue, but it's slower. +Enum Classes and Metaclass Compatibility +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Mutmut 3.x fully supports mutating enum classes. Methods inside enum classes +(``Enum``, ``IntEnum``, ``Flag``, ``IntFlag``, ``StrEnum``) are automatically +mutated using an external injection pattern that avoids conflicts with the +enum metaclass. + +This means enums with methods like: + +.. code-block:: python + + from enum import Enum + + class Color(Enum): + RED = 1 + GREEN = 2 + + def describe(self): + return self.name.lower() + + @staticmethod + def count(): + return 3 + +...will have their methods mutated just like regular class methods. + +**Disabling Enum Mutation** + +If you prefer to skip enum mutation entirely, you can disable it in your config: + +.. code-block:: toml + + # pyproject.toml + [tool.mutmut] + mutate_enums = false + +Or skip a specific enum class using the pragma: + +.. code-block:: python + + class Color(Enum): # pragma: no mutate class + RED = 1 + GREEN = 2 + + def describe(self): + return f"Color is {self.name}" + +This tells mutmut to completely skip the class—no mutations will be created +for any methods. + +Both syntax styles are supported: + +- ``# pragma: no mutate class`` +- ``# pragma: no mutate: class`` + +**Note:** The regular ``# pragma: no mutate`` on a class line only prevents +mutations on that specific line. It does NOT prevent mutations inside methods. +Use ``# pragma: no mutate class`` to skip the entire class (kept for backward +compatibility with str: return "Hello from my-lib!" @@ -14,6 +19,13 @@ def badly_tested() -> str: def untested() -> str: return "Mutants for this method should survive" +def skip_this_function() -> int: # pragma: no mutate: function + return 1 + 2 * 3 + +def also_skip_this_function() -> str: # pragma: no mutate function + return "should" + " not" + " mutate" + + def make_greeter(name: Union[str, None]) -> Callable[[], str]: def hi(): if name: @@ -88,6 +100,30 @@ def from_coords(coords) -> 'Point': def coords(self): return self.x, self.y + @staticmethod + def skip_static_decorator_pragma(a: int, b: int) -> int: # pragma: no mutate: function + return a + b * 2 + + @classmethod + def skip_class_decorator_pragma(cls, value: int) -> "Point": # pragma: no mutate: function + return cls(value + 1, value * 2) + + def skip_instance_method_pragma(self) -> int: # pragma: no mutate: function + return self.x + self.y * 2 + + @staticmethod # pragma: no mutate: function + def pragma_on_staticmethod_decorator(a: int, b: int) -> int: + return a + b * 2 + + @classmethod # pragma: no mutate: function + def pragma_on_classmethod_decorator(cls, value: int) -> "Point": + return cls(value + 1, value * 2) + + @my_decorator + @classmethod + def skip_multi_decorator(cls, value: int) -> "Point": + return cls(value + 1, value * 2) + def escape_sequences(): return "foo" \ @@ -111,3 +147,42 @@ def func_with_star(a, /, b, *, c, **kwargs): def func_with_arbitrary_args_clone(*args, **kwargs): pass # pragma: no mutate def func_with_arbitrary_args(*args, **kwargs): return len(args) + len(kwargs) + + +class Color(Enum): + RED = 1 + GREEN = 2 + BLUE = 3 + + def is_primary(self) -> bool: + return self in (Color.RED, Color.GREEN, Color.BLUE) + + def darken(self) -> int: + return self.value - 1 + + @staticmethod + def from_name(name: str) -> "Color": + return Color[name.upper()] + + @classmethod + def default(cls) -> "Color": + return cls.RED + + +class SkipThisClass: # pragma: no mutate: class + def method_one(self) -> int: + return 1 + 2 + + def method_two(self) -> str: + return "hello" + " world" + + @staticmethod + def static_method() -> int: + return 3 * 4 + + +class AlsoSkipThisClass: # pragma: no mutate class + VALUE = 10 + 20 + + def compute(self) -> int: + return self.VALUE * 2 diff --git a/e2e_projects/my_lib/tests/test_my_lib.py b/e2e_projects/my_lib/tests/test_my_lib.py index 3febc7af..ad9323d7 100644 --- a/e2e_projects/my_lib/tests/test_my_lib.py +++ b/e2e_projects/my_lib/tests/test_my_lib.py @@ -31,6 +31,28 @@ def test_point(): def test_point_from_coords(): assert Point.from_coords((1, 2)).x == 1 + +def test_point_skip_static_decorator_pragma(): + assert Point.skip_static_decorator_pragma(3, 4) == 11 + + +def test_point_skip_class_decorator_pragma(): + p = Point.skip_class_decorator_pragma(5) + assert p.x == 6 + assert p.y == 10 + + +def test_point_skip_instance_method_pragma(): + p = Point(3, 4) + assert p.skip_instance_method_pragma() == 11 + + +def test_point_skip_multi_decorator(): + p = Point.skip_multi_decorator(5) + assert p.x == 6 + assert p.y == 10 + + def test_fibonacci(): assert fibonacci(1) == 1 assert cached_fibonacci(1) == 1 @@ -66,3 +88,60 @@ def test_signature_functions_are_callable(): def test_signature_is_coroutine(): assert asyncio.iscoroutinefunction(async_consumer) + + +# Tests for enum mutation +def test_color_enum_values(): + assert Color.RED.value == 1 + assert Color.GREEN.value == 2 + assert Color.BLUE.value == 3 + + +def test_color_is_primary(): + assert Color.RED.is_primary() is True + assert Color.GREEN.is_primary() is True + + +def test_color_darken(): + assert Color.GREEN.darken() == 1 + assert Color.BLUE.darken() == 2 + + +def test_color_from_name(): + assert Color.from_name("red") == Color.RED + assert Color.from_name("BLUE") == Color.BLUE + + +def test_color_default(): + assert Color.default() == Color.RED + + +def test_skip_this_function(): + assert skip_this_function() == 7 + + +def test_also_skip_this_function(): + assert also_skip_this_function() == "should not mutate" + + +def test_skip_this_class(): + obj = SkipThisClass() + assert obj.method_one() == 3 + assert obj.method_two() == "hello world" + assert SkipThisClass.static_method() == 12 + + +def test_also_skip_this_class(): + obj = AlsoSkipThisClass() + assert obj.VALUE == 30 + assert obj.compute() == 60 + + +def test_pragma_on_staticmethod_decorator(): + assert Point.pragma_on_staticmethod_decorator(3, 4) == 11 + + +def test_pragma_on_classmethod_decorator(): + p = Point.pragma_on_classmethod_decorator(5) + assert p.x == 6 + assert p.y == 10 diff --git a/scripts/run_tests.sh b/scripts/run_tests.sh index 1bc2086e..c91a9dee 100755 --- a/scripts/run_tests.sh +++ b/scripts/run_tests.sh @@ -1,5 +1,72 @@ #!/bin/bash set -e cd "$(dirname "$0")/.." -docker build -t mutmut -f ./docker/Dockerfile.test . -docker run --rm -t -v "$(pwd)":/mutmut mutmut "$@" + +usage() { + echo "Usage: $0 [--py 3.10,3.12,3.14] [--ff] [-- pytest args...]" + echo " --py Comma-separated Python versions to test." + echo " Default: 3.10" + echo " --ff Stop on first failure instead of running all versions." + echo "" + echo " Everything after '--' is forwarded to pytest." + exit 1 +} + +PY_VERSIONS="3.10" +FAIL_FAST=false + +while [[ $# -gt 0 ]]; do + case "$1" in + --py) + PY_VERSIONS="$2" + shift 2 + ;; + --ff) + FAIL_FAST=true + shift + ;; + -h|--help) + usage + ;; + --) + shift + break + ;; + *) + break + ;; + esac +done + +IFS=',' read -r -a VERSIONS <<< "$PY_VERSIONS" +RESULTS=() + +print_results() { + echo "" + echo "=== Results ===" + for RESULT in "${RESULTS[@]}"; do + echo " $RESULT" + done +} + +for VER in "${VERSIONS[@]}"; do + IMAGE_NAME="mutmut-test-${VER}" + docker build -t "$IMAGE_NAME" --build-arg "PYTHON_VERSION=$VER" -f ./docker/Dockerfile.test . + if docker run --rm -t -v "$(pwd)":/mutmut "$IMAGE_NAME" "$@"; then + RESULTS+=("Python $VER: PASSED") + else + RESULTS+=("Python $VER: FAILED") + if [[ "$FAIL_FAST" == true ]]; then + print_results + exit 1 + fi + fi +done + +print_results + +for RESULT in "${RESULTS[@]}"; do + if [[ "$RESULT" == *FAILED* ]]; then + exit 1 + fi +done diff --git a/src/mutmut/__init__.py b/src/mutmut/__init__.py index f77effc2..d50ba1b6 100644 --- a/src/mutmut/__init__.py +++ b/src/mutmut/__init__.py @@ -1,31 +1,42 @@ from __future__ import annotations import importlib.metadata +import warnings from collections import defaultdict -from typing import TYPE_CHECKING -if TYPE_CHECKING: - from mutmut.__main__ import Config +from mutmut.configuration import Config __version__ = importlib.metadata.version("mutmut") -duration_by_test: defaultdict[str, float] = defaultdict(float) stats_time: float | None = None -config: Config | None = None +duration_by_test: dict[str, float] = defaultdict(float) +tests_by_mangled_function_name: dict[str, set[str]] = defaultdict(set) _stats: set[str] = set() -tests_by_mangled_function_name: defaultdict[str, set[str]] = defaultdict(set) _covered_lines: dict[str, set[int]] | None = None +def __getattr__(name: str) -> object: + match name: + case "config": + warnings.warn( + "mutmut.config is deprecated as of 3.4.1, use mutmut.configuration.Config.get() instead", + FutureWarning, + stacklevel=2, + ) + return Config.get() + case _: + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") + + def _reset_globals() -> None: - global duration_by_test, stats_time, config, _stats, tests_by_mangled_function_name + global duration_by_test, stats_time, _stats, tests_by_mangled_function_name global _covered_lines duration_by_test.clear() stats_time = None - config = None + Config.reset() _stats = set() tests_by_mangled_function_name = defaultdict(set) _covered_lines = None diff --git a/src/mutmut/__main__.py b/src/mutmut/__main__.py index 2a710c87..5f0f07ce 100644 --- a/src/mutmut/__main__.py +++ b/src/mutmut/__main__.py @@ -7,8 +7,9 @@ from collections.abc import Iterator from typing import Any -from mutmut.type_checking import TypeCheckingError -from mutmut.type_checking import run_type_checker +from mutmut.utils.file_utils import change_cwd +from mutmut.utils.format_utils import get_mutant_name +from mutmut.utils.format_utils import strip_prefix if platform.system() == "Windows": print( @@ -23,15 +24,12 @@ import json import resource import shutil -import signal import subprocess import warnings from abc import ABC from collections import defaultdict -from configparser import ConfigParser -from configparser import NoOptionError -from configparser import NoSectionError -from contextlib import contextmanager +from collections.abc import Callable +from collections.abc import Sequence from dataclasses import dataclass from dataclasses import field from datetime import datetime @@ -40,7 +38,6 @@ from io import TextIOBase from json import JSONDecodeError from math import ceil -from multiprocessing import Lock from multiprocessing import Pool from multiprocessing import set_start_method from os import makedirs @@ -48,21 +45,24 @@ from os.path import isdir from os.path import isfile from pathlib import Path -from signal import SIGTERM from threading import Thread from time import process_time -from time import sleep +from types import TracebackType import click import libcst as cst from rich.text import Text -from setproctitle import setproctitle import mutmut from mutmut.code_coverage import gather_coverage from mutmut.code_coverage import get_covered_lines_for_file -from mutmut.file_mutation import mutate_file_contents -from mutmut.trampoline_templates import CLASS_NAME_SEPARATOR +from mutmut.configuration import Config +from mutmut.mutation.data import SourceFileMutationData +from mutmut.mutation.file_mutation import filter_mutants_with_type_checker +from mutmut.mutation.file_mutation import mutate_file_contents +from mutmut.mutation.trampoline_templates import CLASS_NAME_SEPARATOR +from mutmut.threading.timeout import register_timeout +from mutmut.utils.safe_setproctitle import safe_setproctitle as setproctitle # Document: surviving mutants are retested when you ask mutmut to retest them, interactively in the UI or via command line @@ -109,37 +109,11 @@ exit_code_to_emoji = {exit_code: emoji_by_status[status] for exit_code, status in status_by_exit_code.items()} -def guess_paths_to_mutate() -> list[Path]: - """Guess the path to source code to mutate""" - this_dir = os.getcwd().split(os.sep)[-1] - if isdir("lib"): - return [Path("lib")] - elif isdir("src"): - return [Path("src")] - elif isdir(this_dir): - return [Path(this_dir)] - elif isdir(this_dir.replace("-", "_")): - return [Path(this_dir.replace("-", "_"))] - elif isdir(this_dir.replace(" ", "_")): - return [Path(this_dir.replace(" ", "_"))] - elif isdir(this_dir.replace("-", "")): - return [Path(this_dir.replace("-", ""))] - elif isdir(this_dir.replace(" ", "")): - return [Path(this_dir.replace(" ", ""))] - if isfile(this_dir + ".py"): - return [Path(this_dir + ".py")] - raise FileNotFoundError( - "Could not figure out where the code to mutate is. " - 'Please specify it by adding "paths_to_mutate=code_dir" in setup.cfg to the [mutmut] section.' - ) - - def record_trampoline_hit(name: str) -> None: assert not name.startswith("src."), "Failed trampoline hit. Module name starts with `src.`, which is invalid" - assert mutmut.config is not None - if mutmut.config.max_stack_depth != -1: + if Config.get().max_stack_depth != -1: f = inspect.currentframe() - c = mutmut.config.max_stack_depth + c = Config.get().max_stack_depth while c and f: filename = f.f_code.co_filename if "pytest" in filename or "hammett" in filename or "unittest" in filename: @@ -153,9 +127,8 @@ def record_trampoline_hit(name: str) -> None: mutmut._stats.add(name) -def walk_all_files() -> Iterable[tuple[str, str]]: - assert mutmut.config is not None - for path in mutmut.config.paths_to_mutate: +def walk_all_files() -> Iterator[tuple[str, str]]: + for path in Config.get().paths_to_mutate: if not isdir(path): if isfile(path): yield "", str(path) @@ -165,7 +138,7 @@ def walk_all_files() -> Iterable[tuple[str, str]]: yield root, filename -def walk_source_files() -> Iterable[Path]: +def walk_source_files() -> Iterator[Path]: for root, filename in walk_all_files(): if filename.endswith(".py"): yield Path(root) / filename @@ -249,8 +222,8 @@ def create_file_mutants(path: Path) -> FileMutationResult: print(path) output_path = Path("mutants") / path makedirs(output_path.parent, exist_ok=True) - assert mutmut.config is not None - if mutmut.config.should_ignore_for_mutation(path): + + if Config.get().should_ignore_for_mutation(path): shutil.copy(path, output_path) return FileMutationResult(ignored=True) else: @@ -275,15 +248,13 @@ def setup_source_paths() -> None: def store_lines_covered_by_tests() -> None: - assert mutmut.config is not None - if mutmut.config.mutate_only_covered_lines: + if Config.get().mutate_only_covered_lines: mutmut._covered_lines = gather_coverage(PytestRunner(), list(walk_source_files())) def copy_also_copy_files() -> None: - assert mutmut.config is not None - assert isinstance(mutmut.config.also_copy, list) - for path in mutmut.config.also_copy: + assert isinstance(Config.get().also_copy, list) + for path in Config.get().also_copy: print(" also copying", path) path = Path(path) destination = Path("mutants") / path @@ -347,17 +318,7 @@ def create_mutants_for_file(filename: Path, output_path: Path) -> FileMutationRe return FileMutationResult(warnings=warnings) -def get_mutant_name(relative_source_path: Path, mutant_method_name: str) -> str: - module_name = str(relative_source_path)[: -len(relative_source_path.suffix)].replace(os.sep, ".") - module_name = strip_prefix(module_name, prefix="src.") - - # FYI, we currently use "mutant_name" inconsistently, for both the whole identifier including the path and only the mangled method name - mutant_name = f"{module_name}.{mutant_method_name}" - mutant_name = mutant_name.replace(".__init__.", ".") - return mutant_name - - -def write_all_mutants_to_file(*, out: Any, source: str, filename: str | Path) -> Any: +def write_all_mutants_to_file(*, out: TextIOBase, source: str, filename: Path) -> Sequence[str]: result, mutant_names = mutate_file_contents( str(filename), source, get_covered_lines_for_file(str(filename), mutmut._covered_lines) ) @@ -366,167 +327,10 @@ def write_all_mutants_to_file(*, out: Any, source: str, filename: str | Path) -> return mutant_names -class SourceFileMutationData: - def __init__(self, *, path: Path) -> None: - self.estimated_time_of_tests_by_mutant: dict[str, float] = {} - self.path = path - self.meta_path = Path("mutants") / (str(path) + ".meta") - self.key_by_pid: dict[int, str] = {} - self.exit_code_by_key: dict[str, int | None] = {} - self.durations_by_key: dict[str, float] = {} - self.type_check_error_by_key: dict[str, str] = {} - self.start_time_by_pid: dict[int, datetime] = {} - - def load(self) -> None: - try: - with open(self.meta_path) as f: - meta = json.load(f) - except FileNotFoundError: - return - - self.exit_code_by_key = meta.pop("exit_code_by_key") - self.durations_by_key = meta.pop("durations_by_key") - self.estimated_time_of_tests_by_mutant = meta.pop("estimated_durations_by_key") - self.type_check_error_by_key = meta.pop("type_check_error_by_key") - assert not meta, f"Meta file {self.meta_path} contains unexpected keys: {set(meta.keys())}" - - def register_pid(self, *, pid: int, key: str) -> None: - self.key_by_pid[pid] = key - with START_TIMES_BY_PID_LOCK: - self.start_time_by_pid[pid] = datetime.now() - - def register_result(self, *, pid: int, exit_code: int) -> None: - assert self.key_by_pid[pid] in self.exit_code_by_key - key = self.key_by_pid[pid] - self.exit_code_by_key[key] = exit_code - self.durations_by_key[key] = (datetime.now() - self.start_time_by_pid[pid]).total_seconds() - # TODO: maybe rate limit this? Saving on each result can slow down mutation testing a lot if the test run is fast. - del self.key_by_pid[pid] - with START_TIMES_BY_PID_LOCK: - del self.start_time_by_pid[pid] - self.save() - - def stop_children(self) -> None: - for pid in self.key_by_pid.keys(): - os.kill(pid, SIGTERM) - - def save(self) -> None: - with open(self.meta_path, "w") as f: - json.dump( - dict( - exit_code_by_key=self.exit_code_by_key, - durations_by_key=self.durations_by_key, - type_check_error_by_key=self.type_check_error_by_key, - estimated_durations_by_key=self.estimated_time_of_tests_by_mutant, - ), - f, - indent=4, - ) - - -def filter_mutants_with_type_checker() -> dict[str, FailedTypeCheckMutant]: - assert mutmut.config is not None - with change_cwd(Path("mutants")): - errors = run_type_checker(mutmut.config.type_check_command) - errors_by_path = group_by_path(errors) - - mutants_to_skip: dict[str, FailedTypeCheckMutant] = {} - - for path, errors_of_file in errors_by_path.items(): - with open(path, encoding="utf-8") as file: - source = file.read() - wrapper = cst.MetadataWrapper(cst.parse_module(source)) - visitor = MutatedMethodsCollector(path) - wrapper.visit(visitor) - mutated_methods = visitor.found_mutants - - for error in errors_of_file: - assert error.file_path == visitor.file - mutant = next( - (m for m in mutated_methods if m.line_number_start <= error.line_number <= m.line_number_end), None - ) - if mutant is None: - raise Exception( - f"Could not find mutant for type error {error.file_path}:{error.line_number} ({error.error_description}). " - "Probably, a code mutation influenced types in unexpected locations. " - "If your project normally has no type errors and uses mypy/pyrefly, please file an issue with steps to reproduce on github." - ) - - mutant_name = get_mutant_name(path.relative_to(Path(".").absolute()), mutant.function_name) - - mutants_to_skip[mutant_name] = FailedTypeCheckMutant( - method_location=mutant, - name=mutant_name, - error=error, - ) - - return mutants_to_skip - - -def group_by_path(errors: list[TypeCheckingError]) -> dict[Path, list[TypeCheckingError]]: - grouped: dict[Path, list[TypeCheckingError]] = defaultdict(list) - - for error in errors: - grouped[error.file_path].append(error) - - return grouped - - -@dataclass -class MutatedMethodLocation: - file: Path - function_name: str - line_number_start: int - line_number_end: int - - -@dataclass -class FailedTypeCheckMutant: - method_location: MutatedMethodLocation - name: str - error: TypeCheckingError - - -class MutatedMethodsCollector(cst.CSTVisitor): - METADATA_DEPENDENCIES = (cst.metadata.PositionProvider,) - - def __init__(self, file: Path): - self.file = file - self.found_mutants: list[MutatedMethodLocation] = [] - - def visit_FunctionDef(self, node: cst.FunctionDef) -> bool: - name = node.name.value - if is_mutated_method_name(name): - range = self.get_metadata(cst.metadata.PositionProvider, node) - self.found_mutants.append( - MutatedMethodLocation( - file=self.file, - function_name=name, - line_number_start=range.start.line, - line_number_end=range.end.line, - ) - ) - - # do not continue visting children of this function - # mutated methods are not nested within other methods - return False - - -def is_mutated_method_name(name: str) -> bool: - return name.startswith(("x_", "xǁ")) and "__mutmut" in name - - -def unused(*_: Any) -> None: +def unused(*_: object) -> None: pass -def strip_prefix(s: str, *, prefix: str, strict: bool = False) -> str: - if s.startswith(prefix): - return s[len(prefix) :] - assert strict is False, f"String '{s}' does not start with prefix '{prefix}'" - return s - - class TestRunner(ABC): def run_stats(self, *, tests: Iterable[str]) -> int: raise NotImplementedError() @@ -544,16 +348,6 @@ def list_all_tests(self) -> ListAllTestsResult: raise NotImplementedError() -@contextmanager -def change_cwd(path: str | Path) -> Iterator[None]: - old_cwd = os.path.abspath(os.getcwd()) - os.chdir(path) - try: - yield - finally: - os.chdir(old_cwd) - - def collected_test_names() -> set[str]: return set(mutmut.duration_by_test.keys()) @@ -583,25 +377,23 @@ def new_tests(self) -> set[str]: class PytestRunner(TestRunner): def __init__(self) -> None: - assert mutmut.config is not None - self._pytest_add_cli_args: list[str] = mutmut.config.pytest_add_cli_args - self._pytest_add_cli_args_test_selection: list[str] = mutmut.config.pytest_add_cli_args_test_selection + self._pytest_add_cli_args: list[str] = Config.get().pytest_add_cli_args + self._pytest_add_cli_args_test_selection: list[str] = Config.get().pytest_add_cli_args_test_selection # tests_dir is a special case of a test selection option, # so also use pytest_add_cli_args_test_selection for the implementation - self._pytest_add_cli_args_test_selection += mutmut.config.tests_dir + self._pytest_add_cli_args_test_selection += Config.get().tests_dir # noinspection PyMethodMayBeStatic def execute_pytest(self, params: list[str], **kwargs: Any) -> int: import pytest params = ["--rootdir=.", "--tb=native"] + params + self._pytest_add_cli_args - assert mutmut.config is not None - if mutmut.config.debug: + if Config.get().debug: params = ["-vv"] + params print("python -m pytest ", " ".join([f'"{param}"' for param in params])) exit_code = int(pytest.main(params, **kwargs)) - if mutmut.config.debug: + if Config.get().debug: print(" exit code", exit_code) if exit_code == 4: raise BadTestExecutionCommandsException(params) @@ -662,8 +454,7 @@ def pytest_deselected(self, items: Any) -> None: collector = TestsCollector() - assert mutmut.config is not None - tests_dir = mutmut.config.tests_dir # noqa: F841 + tests_dir = Config.get().tests_dir pytest_args = ["-x", "-q", "--collect-only"] + self._pytest_add_cli_args_test_selection with change_cwd("mutants"): @@ -746,7 +537,7 @@ def orig_function_and_class_names_from_key(mutant_name: str) -> tuple[str, str | spinner = itertools.cycle("⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏") -def status_printer() -> Any: +def status_printer() -> Callable[..., None]: """Manage the printing and in-place updating of a line of characters .. note:: @@ -842,10 +633,14 @@ def run_forced_fail_test(runner: TestRunner) -> None: class CatchOutput: - def __init__(self, callback: Any = lambda s: None, spinner_title: str | None = None) -> None: + def __init__( + self, + callback: Callable[[str], None] = lambda s: None, + spinner_title: str | None = None, + ) -> None: self.strings: list[str] = [] self.spinner_title = spinner_title or "" - if mutmut.config is not None and mutmut.config.debug: + if Config.get().debug: self.spinner_title += "\n" class StdOutRedirect(TextIOBase): @@ -871,8 +666,7 @@ def start(self) -> None: print_status(self.spinner_title) sys.stdout = self.redirect sys.stderr = self.redirect - assert mutmut.config is not None - if mutmut.config.debug: + if Config.get().debug: self.stop() def dump_output(self) -> None: @@ -885,112 +679,17 @@ def __enter__(self) -> CatchOutput: self.start() return self - def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: self.stop() if self.spinner_title: print() -@dataclass -class Config: - also_copy: list[Path] - do_not_mutate: list[str] - max_stack_depth: int - debug: bool - paths_to_mutate: list[Path] - pytest_add_cli_args: list[str] - pytest_add_cli_args_test_selection: list[str] - tests_dir: list[str] - mutate_only_covered_lines: bool - type_check_command: list[str] - - def should_ignore_for_mutation(self, path: Path | str) -> bool: - if not str(path).endswith(".py"): - return True - for p in self.do_not_mutate: - if fnmatch.fnmatch(str(path), p): - return True - return False - - -def config_reader() -> Any: - path = Path("pyproject.toml") - if path.exists(): - if sys.version_info >= (3, 11): - from tomllib import loads - else: - # noinspection PyPackageRequirements - from toml import loads - data = loads(path.read_text("utf-8")) - - try: - config = data["tool"]["mutmut"] - except KeyError: - pass - else: - - def _toml_reader(key: str, default: Any) -> Any: - try: - result = config[key] - except KeyError: - return default - return result - - return _toml_reader - - config_parser = ConfigParser() - config_parser.read("setup.cfg") - - def _cfg_reader(key: str, default: Any) -> Any: - try: - result: Any = config_parser.get("mutmut", key) - except (NoOptionError, NoSectionError): - return default - if isinstance(default, list): - if "\n" in result: - result = [x for x in result.split("\n") if x] - else: - result = [result] - elif isinstance(default, bool): - result = result.lower() in ("1", "t", "true") - elif isinstance(default, int): - result = int(result) - return result - - return _cfg_reader - - -def ensure_config_loaded() -> None: - if mutmut.config is None: - mutmut.config = load_config() - - -def load_config() -> Config: - s = config_reader() - - return Config( - do_not_mutate=s("do_not_mutate", []), - also_copy=[Path(y) for y in s("also_copy", [])] - + [ - Path("tests/"), - Path("test/"), - Path("setup.cfg"), - Path("pyproject.toml"), - Path("pytest.ini"), - Path(".gitignore"), - ] - + list(Path(".").glob("test*.py")), - max_stack_depth=s("max_stack_depth", -1), - debug=s("debug", False), - mutate_only_covered_lines=s("mutate_only_covered_lines", False), - paths_to_mutate=[Path(y) for y in s("paths_to_mutate", [])] or guess_paths_to_mutate(), - tests_dir=s("tests_dir", []), - pytest_add_cli_args=s("pytest_add_cli_args", []), - pytest_add_cli_args_test_selection=s("pytest_add_cli_args_test_selection", []), - type_check_command=s("type_check_command", []), - ) - - @click.group() @click.version_option() def cli() -> None: @@ -1017,8 +716,7 @@ def run_stats_collection(runner: TestRunner, tests: Iterable[str] | None = None) print( "Stopping early, because we could not find any test case for any mutant. It seems that the selected tests do not cover any code that we mutated." ) - assert mutmut.config is not None - if not mutmut.config.debug: + if not Config.get().debug: print("You can set debug=true to see the executed test names in the output above.") else: print("In the last pytest run above, you can see which tests we executed.") @@ -1117,13 +815,12 @@ def save_cicd_stats(source_file_mutation_data_by_path: dict[str, SourceFileMutat # exports CI/CD stats to block pull requests from merging if mutation score is too low, or used in other ways in CI/CD pipelines @cli.command() def export_cicd_stats() -> None: - ensure_config_loaded() - assert mutmut.config is not None + Config.ensure_loaded() source_file_mutation_data_by_path: dict[str, SourceFileMutationData] = {} for path in walk_source_files(): - if mutmut.config.should_ignore_for_mutation(path): + if Config.get().should_ignore_for_mutation(path): continue meta_path = Path("mutants") / (str(path) + ".meta") @@ -1147,12 +844,14 @@ def export_cicd_stats() -> None: def collect_source_file_mutation_data( *, mutant_names: tuple[str, ...] | list[str] -) -> tuple[list[tuple[SourceFileMutationData, str, int | None]], dict[str, SourceFileMutationData]]: - assert mutmut.config is not None +) -> tuple[ + list[tuple[SourceFileMutationData, str, int | None]], + dict[str, SourceFileMutationData], +]: source_file_mutation_data_by_path: dict[str, SourceFileMutationData] = {} for path in walk_source_files(): - if mutmut.config.should_ignore_for_mutation(path): + if Config.get().should_ignore_for_mutation(path): continue assert str(path) not in source_file_mutation_data_by_path m = SourceFileMutationData(path=path) @@ -1185,7 +884,7 @@ def estimated_worst_case_time(mutant_name: str) -> float: @click.argument("mutant_names", required=False, nargs=-1) def print_time_estimates(mutant_names: tuple[str, ...]) -> None: assert isinstance(mutant_names, (tuple, list)), mutant_names - ensure_config_loaded() + Config.ensure_loaded() runner = PytestRunner() runner.prepare_main_test_run() @@ -1205,12 +904,12 @@ def print_time_estimates(mutant_names: tuple[str, ...]) -> None: @cli.command() @click.argument("mutant_name", required=True, nargs=1) -def tests_for_mutant(mutant_name: str) -> None: +def tests_for_mutant(mutant_name: tuple[str, ...]) -> None: if not load_stats(): print("Failed to load stats. Please run mutmut first to collect stats.") exit(1) - tests = tests_for_mutant_names([mutant_name]) + tests = tests_for_mutant_names(mutant_name) for test in sorted(tests): print(test) @@ -1222,34 +921,12 @@ def stop_all_children(mutants: list[tuple[SourceFileMutationData, str, int | Non # used to copy the global mutmut.config to subprocesses set_start_method("fork") -START_TIMES_BY_PID_LOCK = Lock() - - -def timeout_checker(mutants: list[tuple[SourceFileMutationData, str, int | None]]) -> Any: - def inner_timeout_checker() -> None: - while True: - sleep(1) - - now = datetime.now() - for m, mutant_name, result in mutants: - # copy dict inside lock, so it is not modified by another process while we iterate it - with START_TIMES_BY_PID_LOCK: - start_times_by_pid = dict(m.start_time_by_pid) - for pid, start_time in start_times_by_pid.items(): - run_time = now - start_time - if run_time.total_seconds() > (m.estimated_time_of_tests_by_mutant[mutant_name] + 1) * 15: - try: - os.kill(pid, signal.SIGXCPU) - except ProcessLookupError: - pass - - return inner_timeout_checker @cli.command() @click.option("--max-children", type=int) @click.argument("mutant_names", required=False, nargs=-1) -def run(mutant_names: tuple[str, ...], *, max_children: int | None) -> None: +def run(mutant_names: tuple[str, ...] | list[str], *, max_children: int | None) -> None: assert isinstance(mutant_names, (tuple, list)), mutant_names _run(mutant_names, max_children) @@ -1259,8 +936,7 @@ def _run(mutant_names: tuple[str, ...] | list[str], max_children: int | None) -> # TODO: run no-ops once in a while to detect if we get false negatives # TODO: we should be able to get information on which tests killed mutants, which means we can get a list of tests and how many mutants each test kills. Those that kill zero mutants are redundant! os.environ["MUTANT_UNDER_TEST"] = "mutant_generation" - ensure_config_loaded() - assert mutmut.config is not None + Config.ensure_loaded() if max_children is None: max_children = os.cpu_count() or 4 @@ -1279,7 +955,7 @@ def _run(mutant_names: tuple[str, ...] | list[str], max_children: int | None) -> f" done in {round(time.total_seconds() * 1000)}ms ({stats.mutated} files mutated, {stats.ignored} ignored, {stats.unmodified} unmodified)", ) - if mutmut.config.type_check_command: + if Config.get().type_check_command: with CatchOutput(spinner_title="Filtering mutations with type checker"): mutants_caught_by_type_checker = filter_mutants_with_type_checker() else: @@ -1315,8 +991,7 @@ def _run(mutant_names: tuple[str, ...] | list[str], max_children: int | None) -> def read_one_child_exit_status() -> None: pid, wait_status = os.wait() exit_code = os.waitstatus_to_exitcode(wait_status) - assert mutmut.config is not None - if mutmut.config.debug: + if Config.get().debug: print(" worker exit code", exit_code) source_file_mutation_data_by_pid[pid].register_result(pid=pid, exit_code=exit_code) @@ -1326,48 +1001,37 @@ def read_one_child_exit_status() -> None: # Run estimated fast mutants first, calculated as the estimated time for a surviving mutant. mutants = sorted(mutants, key=lambda x: estimated_worst_case_time(x[1])) - - gc.freeze() - start = datetime.now() try: + gc.freeze() print("Running mutation testing") - # Calculate times of tests - for m, mutant_name, result in mutants: + # Now do mutation + for mutation_data, mutant_name, result in mutants: mutant_name = mutant_name.replace("__init__.", "") tests = mutmut.tests_by_mangled_function_name.get(mangled_name_from_mutant_name(mutant_name), set()) estimated_time_of_tests = sum(mutmut.duration_by_test[test_name] for test_name in tests) - m.estimated_time_of_tests_by_mutant[mutant_name] = estimated_time_of_tests - - Thread(target=timeout_checker(mutants), daemon=True).start() - - # Now do mutation - for m, mutant_name, result in mutants: + mutation_data.estimated_time_of_tests_by_mutant[mutant_name] = estimated_time_of_tests print_stats(source_file_mutation_data_by_path) - mutant_name = mutant_name.replace("__init__.", "") - # Rerun mutant if it's explicitly mentioned, but otherwise let the result stand if not mutant_names and result is not None: continue - tests = mutmut.tests_by_mangled_function_name.get(mangled_name_from_mutant_name(mutant_name), set()) - if not tests: - m.exit_code_by_key[mutant_name] = 33 - m.save() + mutation_data.exit_code_by_key[mutant_name] = 33 + mutation_data.save() continue failed_type_check_mutant = mutants_caught_by_type_checker.get(mutant_name) if failed_type_check_mutant: - m.exit_code_by_key[mutant_name] = 37 - m.type_check_error_by_key[mutant_name] = failed_type_check_mutant.error.error_description - m.save() + mutation_data.exit_code_by_key[mutant_name] = 37 + mutation_data.type_check_error_by_key[mutant_name] = failed_type_check_mutant.error.error_description + mutation_data.save() continue pid = os.fork() - if not pid: + if pid == 0: # In the child os.environ["MUTANT_UNDER_TEST"] = mutant_name setproctitle(f"mutmut: {mutant_name}") @@ -1377,21 +1041,22 @@ def read_one_child_exit_status() -> None: if not sorted_tests: os._exit(33) - estimated_time_of_tests = m.estimated_time_of_tests_by_mutant[mutant_name] - cpu_time_limit = ceil((estimated_time_of_tests + 1) * 30 + process_time()) + cpu_time_limit_s = ceil((estimated_time_of_tests + 1) * 30 + process_time()) # signal SIGXCPU after . One second later signal SIGKILL if it is still running - resource.setrlimit(resource.RLIMIT_CPU, (cpu_time_limit, cpu_time_limit + 1)) + resource.setrlimit(resource.RLIMIT_CPU, (cpu_time_limit_s, cpu_time_limit_s + 1)) with CatchOutput(): - test_result = runner.run_tests(mutant_name=mutant_name, tests=sorted_tests) + result = runner.run_tests(mutant_name=mutant_name, tests=sorted_tests) - if test_result != 0: + if result != 0: pass - os._exit(test_result) + os._exit(result) else: # in the parent - source_file_mutation_data_by_pid[pid] = m - m.register_pid(pid=pid, key=mutant_name) + wall_time_limit_s = (estimated_time_of_tests + 1) * 15 + register_timeout(pid=pid, timeout_s=wall_time_limit_s) + source_file_mutation_data_by_pid[pid] = mutation_data + mutation_data.register_pid(pid=pid, key=mutant_name) running_children += 1 if running_children >= max_children: @@ -1409,12 +1074,14 @@ def read_one_child_exit_status() -> None: except KeyboardInterrupt: print("Stopping...") stop_all_children(mutants) + finally: + gc.unfreeze() - t = datetime.now() - start + elapsed_time = datetime.now() - start print_stats(source_file_mutation_data_by_path, force_output=True) print() - print(f"{count_tried / t.total_seconds():.2f} mutations/second") + print(f"{count_tried / elapsed_time.total_seconds():.2f} mutations/second") if mutant_names: print() @@ -1446,7 +1113,7 @@ def tests_for_mutant_names(mutant_names: tuple[str, ...] | list[str]) -> set[str @cli.command() @click.option("--all", default=False) def results(all: bool) -> None: - ensure_config_loaded() + Config.ensure_loaded() for path in walk_source_files(): if not str(path).endswith(".py"): continue @@ -1502,9 +1169,8 @@ def read_mutant_function(module: cst.Module, mutant_name: str) -> cst.FunctionDe def find_mutant(mutant_name: str) -> SourceFileMutationData: - assert mutmut.config is not None for path in walk_source_files(): - if mutmut.config.should_ignore_for_mutation(path): + if Config.get().should_ignore_for_mutation(path): continue m = SourceFileMutationData(path=path) @@ -1515,7 +1181,11 @@ def find_mutant(mutant_name: str) -> SourceFileMutationData: raise FileNotFoundError(f"Could not find mutant {mutant_name}") -def get_diff_for_mutant(mutant_name: str, source: str | None = None, path: Path | None = None) -> str: +def get_diff_for_mutant( + mutant_name: str, + source: str | None = None, + path: Path | str | None = None, +) -> str: if path is None: m = find_mutant(mutant_name) path = m.path @@ -1546,7 +1216,7 @@ def get_diff_for_mutant(mutant_name: str, source: str | None = None, path: Path @cli.command() @click.argument("mutant_name") def show(mutant_name: str) -> None: - ensure_config_loaded() + Config.ensure_loaded() print(get_diff_for_mutant(mutant_name)) return @@ -1555,7 +1225,7 @@ def show(mutant_name: str) -> None: @click.argument("mutant_name") def apply(mutant_name: str) -> None: # try: - ensure_config_loaded() + Config.ensure_loaded() apply_mutant(mutant_name) # except FileNotFoundError as e: # print(e) @@ -1586,7 +1256,7 @@ def apply_mutant(mutant_name: str) -> None: @cli.command() @click.option("--show-killed", is_flag=True, default=False, help="Display mutants killed by tests and type checker.") def browse(show_killed: bool) -> None: - ensure_config_loaded() + Config.ensure_loaded() from rich.syntax import Syntax from textual.app import App @@ -1596,8 +1266,8 @@ def browse(show_killed: bool) -> None: from textual.widgets import Footer from textual.widgets import Static - class ResultBrowser(App): # type: ignore[type-arg] - loading_id: str | None = None + class ResultBrowser(App[None]): + loading_id = None CSS_PATH = "result_browser_layout.tcss" BINDINGS = [ ("q", "quit()", "Quit"), @@ -1614,6 +1284,7 @@ class ResultBrowser(App): # type: ignore[type-arg] cursor_type = "row" source_file_mutation_data_and_stat_by_path: dict[str, tuple[SourceFileMutationData, Stat]] = {} + path_by_name: dict[str, Path] = {} def compose(self) -> Iterable[Any]: with Container(classes="container"): @@ -1640,13 +1311,12 @@ def on_mount(self) -> None: self.populate_files_table() def read_data(self) -> None: - ensure_config_loaded() - assert mutmut.config is not None + Config.ensure_loaded() self.source_file_mutation_data_and_stat_by_path = {} self.path_by_name: dict[str, Path] = {} for p in walk_source_files(): - if mutmut.config.should_ignore_for_mutation(p): + if Config.get().should_ignore_for_mutation(p): continue source_file_mutation_data = SourceFileMutationData(path=p) source_file_mutation_data.load() @@ -1736,7 +1406,7 @@ def on_data_table_row_highlighted(self, event: Any) -> None: diff_view.update("") def load_thread() -> None: - ensure_config_loaded() + Config.ensure_loaded() try: d = get_diff_for_mutant(event.row_key.value, path=path) if event.row_key.value == self.loading_id: @@ -1747,10 +1417,14 @@ def load_thread() -> None: t = Thread(target=load_thread) t.start() - def retest(self, pattern: str) -> None: + def retest(self, pattern: str | None) -> None: + if pattern is None: + return self._run_subprocess_command("run", [pattern]) - def view_tests(self, mutant_name: str) -> None: + def view_tests(self, mutant_name: str | None) -> None: + if mutant_name is None: + return self._run_subprocess_command("tests-for-mutant", [mutant_name]) def _run_subprocess_command(self, command: str, args: list[str]) -> None: @@ -1769,14 +1443,13 @@ def get_mutant_name_from_selection(self) -> str | None: # noinspection PyTypeChecker mutants_table: DataTable[Any] = self.query_one("#mutants") # type: ignore[assignment] if mutants_table.cursor_row is None: - return + return None - return str(mutants_table.get_row_at(mutants_table.cursor_row)[0]) + result: str = mutants_table.get_row_at(mutants_table.cursor_row)[0] + return result def action_retest_mutant(self) -> None: - name = self.get_mutant_name_from_selection() - if name is not None: - self.retest(name) + self.retest(self.get_mutant_name_from_selection()) def action_retest_function(self) -> None: name = self.get_mutant_name_from_selection() @@ -1789,7 +1462,7 @@ def action_retest_module(self) -> None: self.retest(name.rpartition(".")[0] + ".*") def action_apply_mutant(self) -> None: - ensure_config_loaded() + Config.ensure_loaded() # noinspection PyTypeChecker mutants_table: DataTable[Any] = self.query_one("#mutants") # type: ignore[assignment] if mutants_table.cursor_row is None: diff --git a/src/mutmut/code_coverage.py b/src/mutmut/code_coverage.py index d8d69fbf..05a4f4dc 100644 --- a/src/mutmut/code_coverage.py +++ b/src/mutmut/code_coverage.py @@ -22,11 +22,11 @@ def get_covered_lines_for_file(filename: str, covered_lines: dict[str, set[int]] return None abs_filename = str((Path("mutants") / filename).absolute()) - lines = None + lines: set[int] = set() if abs_filename in covered_lines: - lines = covered_lines[abs_filename] + lines = set(covered_lines[abs_filename]) - return lines or set() + return lines # Gathers coverage for the given source files and @@ -54,11 +54,8 @@ def gather_coverage(runner: TestRunner, source_files: Iterable[Path]) -> dict[st for filename in source_files: abs_filename = str((mutants_path / filename).absolute()) - lines = coverage_data.lines(abs_filename) - if lines is None: - # file was not imported during test run, e.g. because test selection excluded this file - lines = [] - covered_lines[abs_filename] = set(lines) + lines = set(coverage_data.lines(abs_filename) or []) + covered_lines[abs_filename] = lines _unload_modules_not_in(modules) diff --git a/src/mutmut/configuration.py b/src/mutmut/configuration.py new file mode 100644 index 00000000..b8fa224a --- /dev/null +++ b/src/mutmut/configuration.py @@ -0,0 +1,162 @@ +from __future__ import annotations + +import fnmatch +import os +import platform +import sys +from collections.abc import Callable +from configparser import ConfigParser +from configparser import NoOptionError +from configparser import NoSectionError +from dataclasses import dataclass +from os.path import isdir +from os.path import isfile +from pathlib import Path +from typing import Any + + +def _config_reader() -> Callable[[str, Any], Any]: + path = Path("pyproject.toml") + if path.exists(): + if sys.version_info >= (3, 11): + from tomllib import loads + else: + # noinspection PyPackageRequirements + from toml import loads + data = loads(path.read_text("utf-8")) + + try: + config = data["tool"]["mutmut"] + except KeyError: + pass + else: + + def toml_conf(key: str, default: Any) -> Any: + try: + result = config[key] + except KeyError: + return default + return result + + return toml_conf + + config_parser = ConfigParser() + config_parser.read("setup.cfg") + + def setup_cfg_conf(key: str, default: Any) -> Any: + try: + result = config_parser.get("mutmut", key) + except (NoOptionError, NoSectionError): + return default + if isinstance(default, list): + if "\n" in result: + return [x for x in result.split("\n") if x] + else: + return [result] + elif isinstance(default, bool): + return result.lower() in ("1", "t", "true") + elif isinstance(default, int): + return int(result) + return result + + return setup_cfg_conf + + +def _guess_paths_to_mutate() -> list[str]: + """Guess the path to source code to mutate + + :rtype: str + """ + this_dir = os.getcwd().split(os.sep)[-1] + if isdir("lib"): + return ["lib"] + elif isdir("src"): + return ["src"] + elif isdir(this_dir): + return [this_dir] + elif isdir(this_dir.replace("-", "_")): + return [this_dir.replace("-", "_")] + elif isdir(this_dir.replace(" ", "_")): + return [this_dir.replace(" ", "_")] + elif isdir(this_dir.replace("-", "")): + return [this_dir.replace("-", "")] + elif isdir(this_dir.replace(" ", "")): + return [this_dir.replace(" ", "")] + if isfile(this_dir + ".py"): + return [this_dir + ".py"] + raise FileNotFoundError( + "Could not figure out where the code to mutate is. " + 'Please specify it by adding "paths_to_mutate=code_dir" in setup.cfg to the [mutmut] section.' + ) + + +def _load_config() -> Config: + s = _config_reader() + + return Config( + do_not_mutate=s("do_not_mutate", []), + also_copy=[Path(y) for y in s("also_copy", [])] + + [ + Path("tests/"), + Path("test/"), + Path("setup.cfg"), + Path("pyproject.toml"), + ] + + list(Path(".").glob("test*.py")), + max_stack_depth=s("max_stack_depth", -1), + debug=s("debug", False), + mutate_only_covered_lines=s("mutate_only_covered_lines", False), + paths_to_mutate=[Path(y) for y in s("paths_to_mutate", [])] or [Path(p) for p in _guess_paths_to_mutate()], + tests_dir=s("tests_dir", []), + pytest_add_cli_args=s("pytest_add_cli_args", []), + pytest_add_cli_args_test_selection=s("pytest_add_cli_args_test_selection", []), + type_check_command=s("type_check_command", []), + use_setproctitle=s( + "use_setproctitle", not platform.system() == "Darwin" + ), # False on Mac, true otherwise as default (https://github.com/boxed/mutmut/pull/450#issuecomment-4002571055) + ) + + +_config: Config | None = None + + +@dataclass +class Config: + also_copy: list[Path] + do_not_mutate: list[str] + max_stack_depth: int + debug: bool + paths_to_mutate: list[Path] + pytest_add_cli_args: list[str] + pytest_add_cli_args_test_selection: list[str] + tests_dir: list[str] + mutate_only_covered_lines: bool + type_check_command: list[str] + use_setproctitle: bool + + def should_ignore_for_mutation(self, path: Path | str) -> bool: + path_str = str(path) + if not path_str.endswith(".py"): + return True + for p in self.do_not_mutate: + if fnmatch.fnmatch(path_str, p): + return True + return False + + @staticmethod + def ensure_loaded() -> None: + global _config + if _config is None: + _config = _load_config() + + @staticmethod + def get() -> Config: + global _config + Config.ensure_loaded() + assert _config is not None + return _config + + @staticmethod + def reset() -> None: + global _config + _config = None diff --git a/src/mutmut/mutation/__init__.py b/src/mutmut/mutation/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/mutmut/mutation/data.py b/src/mutmut/mutation/data.py new file mode 100644 index 00000000..46a1f51b --- /dev/null +++ b/src/mutmut/mutation/data.py @@ -0,0 +1,61 @@ +import json +import os +import signal +from datetime import datetime +from pathlib import Path + + +class SourceFileMutationData: + def __init__(self, *, path: Path | str) -> None: + self.estimated_time_of_tests_by_mutant: dict[str, float] = {} + self.path = path + self.meta_path = Path("mutants") / (str(path) + ".meta") + self.key_by_pid: dict[int, str] = {} + self.exit_code_by_key: dict[str, int | None] = {} + self.durations_by_key: dict[str, float] = {} + self.start_time_by_pid: dict[int, datetime] = {} + self.type_check_error_by_key: dict[str, str | None] = {} + + def load(self) -> None: + try: + with open(self.meta_path) as f: + meta = json.load(f) + except FileNotFoundError: + return + + self.exit_code_by_key = meta.pop("exit_code_by_key") + self.type_check_error_by_key = meta.pop("type_check_error_by_key", {}) + self.durations_by_key = meta.pop("durations_by_key") + self.estimated_time_of_tests_by_mutant = meta.pop("estimated_durations_by_key") + assert not meta, f"Meta file {self.meta_path} constains unexpected keys: {set(meta.keys())}" + + def register_pid(self, *, pid: int, key: str) -> None: + self.key_by_pid[pid] = key + self.start_time_by_pid[pid] = datetime.now() + + def register_result(self, *, pid: int, exit_code: int) -> None: + assert self.key_by_pid[pid] in self.exit_code_by_key + key = self.key_by_pid[pid] + self.exit_code_by_key[key] = exit_code + self.durations_by_key[key] = (datetime.now() - self.start_time_by_pid[pid]).total_seconds() + # TODO: maybe rate limit this? Saving on each result can slow down mutation testing a lot if the test run is fast. + del self.key_by_pid[pid] + del self.start_time_by_pid[pid] + self.save() + + def stop_children(self) -> None: + for pid in self.key_by_pid.keys(): + os.kill(pid, signal.SIGTERM) + + def save(self) -> None: + with open(self.meta_path, "w") as f: + json.dump( + { + "exit_code_by_key": self.exit_code_by_key, + "type_check_error_by_key": self.type_check_error_by_key, + "durations_by_key": self.durations_by_key, + "estimated_durations_by_key": self.estimated_time_of_tests_by_mutant, + }, + f, + indent=4, + ) diff --git a/src/mutmut/mutation/enum_mutation.py b/src/mutmut/mutation/enum_mutation.py new file mode 100644 index 00000000..3b3c6317 --- /dev/null +++ b/src/mutmut/mutation/enum_mutation.py @@ -0,0 +1,34 @@ +"""Enum class detection and method type classification for mutation handling.""" + +import libcst as cst + +# Known enum base class names from the standard library +ENUM_BASE_CLASSES = frozenset({"Enum", "IntEnum", "Flag", "IntFlag", "StrEnum"}) + + +def is_enum_class(node: cst.ClassDef) -> bool: + """Check if a ClassDef inherits from any known enum base class. + + Works for: + - class Color(Enum): ... + - class Permission(Flag): ... + - class Status(enum.Enum): ... (Attribute access) + + Limitations: + - Cannot detect aliased imports: from enum import Enum as E + - Cannot detect custom enum base classes + """ + for base_arg in node.bases: + base = base_arg.value + + # Case 1: Simple name like `Enum`, `Flag`, `IntEnum` + if isinstance(base, cst.Name): + if base.value in ENUM_BASE_CLASSES: + return True + + # Case 2: Attribute access like `enum.Enum`, `enum.Flag` + elif isinstance(base, cst.Attribute): + if isinstance(base.attr, cst.Name) and base.attr.value in ENUM_BASE_CLASSES: + return True + + return False diff --git a/src/mutmut/file_mutation.py b/src/mutmut/mutation/file_mutation.py similarity index 50% rename from src/mutmut/file_mutation.py rename to src/mutmut/mutation/file_mutation.py index e3217282..ddeb80fa 100644 --- a/src/mutmut/file_mutation.py +++ b/src/mutmut/mutation/file_mutation.py @@ -5,18 +5,32 @@ from collections.abc import Mapping from collections.abc import Sequence from dataclasses import dataclass +from pathlib import Path from typing import Union +from typing import cast import libcst as cst import libcst.matchers as m from libcst.metadata import MetadataWrapper from libcst.metadata import PositionProvider -from mutmut.node_mutation import OPERATORS_TYPE -from mutmut.node_mutation import mutation_operators -from mutmut.trampoline_templates import create_trampoline_lookup -from mutmut.trampoline_templates import mangle_function_name -from mutmut.trampoline_templates import trampoline_impl +from mutmut.configuration import Config +from mutmut.mutation.enum_mutation import is_enum_class +from mutmut.mutation.mutators import OPERATORS_TYPE +from mutmut.mutation.mutators import MethodType +from mutmut.mutation.mutators import get_method_type +from mutmut.mutation.mutators import mutation_operators +from mutmut.mutation.pragma_handling import parse_pragma_lines +from mutmut.mutation.trampoline_templates import GENERATED_MARKER +from mutmut.mutation.trampoline_templates import build_enum_trampoline +from mutmut.mutation.trampoline_templates import build_function_trampoline +from mutmut.mutation.trampoline_templates import mangle_function_name +from mutmut.mutation.trampoline_templates import trampoline_impl +from mutmut.type_checking import TypeCheckingError +from mutmut.type_checking import run_type_checker +from mutmut.utils.file_utils import change_cwd +from mutmut.utils.format_utils import get_mutant_name +from mutmut.utils.format_utils import is_mutated_method_name NEVER_MUTATE_FUNCTION_NAMES = {"__getattribute__", "__setattr__", "__new__"} NEVER_MUTATE_FUNCTION_CALLS = {"len", "isinstance"} @@ -26,29 +40,45 @@ class Mutation: original_node: cst.CSTNode mutated_node: cst.CSTNode - contained_by_top_level_function: cst.CSTNode | None + contained_by_top_level_function: cst.FunctionDef | None -def mutate_file_contents(filename: str, code: str, covered_lines: set[int] | None = None) -> tuple[str, Sequence[str]]: +def mutate_file_contents( + filename: str, code: str, covered_lines: set[int] | None = None, mutate_enums: bool = True +) -> tuple[str, Sequence[str]]: """Create mutations for `code` and merge them to a single mutated file with trampolines. + :param mutate_enums: If True, enum classes will be mutated using external injection pattern. + If False, enum classes will be left unchanged. :return: A tuple of (mutated code, list of mutant function names)""" - module, mutations = create_mutations(code, covered_lines) + module, mutations, ignored_classes, ignored_functions = create_mutations(code, covered_lines) + + mutated_code, mutant_names = combine_mutations_to_source( + module, mutations, ignored_classes, ignored_functions, mutate_enums=mutate_enums + ) + + # TODO: implement function hashing to skip testing unchanged functions + + return mutated_code, mutant_names - return combine_mutations_to_source(module, mutations) +def create_mutations( + code: str, covered_lines: set[int] | None = None +) -> tuple[cst.Module, list[Mutation], set[str], set[str]]: + """Parse the code and create mutations. -def create_mutations(code: str, covered_lines: set[int] | None = None) -> tuple[cst.Module, list[Mutation]]: - """Parse the code and create mutations.""" - ignored_lines = pragma_no_mutate_lines(code) + :return: A tuple of (module, mutations, ignored_classes, ignored_functions)""" + ignored_lines, ignored_class_lines, ignored_function_lines = parse_pragma_lines(code) module = cst.parse_module(code) metadata_wrapper = MetadataWrapper(module) - visitor = MutationVisitor(mutation_operators, ignored_lines, covered_lines) + visitor = MutationVisitor( + mutation_operators, ignored_lines, covered_lines, ignored_class_lines, ignored_function_lines + ) module = metadata_wrapper.visit(visitor) - return module, visitor.mutations + return module, visitor.mutations, visitor.ignored_classes, visitor.ignored_functions class OuterFunctionProvider(cst.BatchableMetadataProvider[cst.CSTNode | None]): @@ -68,7 +98,7 @@ def bar(): def __init__(self) -> None: super().__init__() - def visit_Module(self, node: cst.Module) -> bool | None: + def visit_Module(self, node: cst.Module) -> bool: for child in node.body: if isinstance(child, cst.FunctionDef): # mark all nodes inside the function to belong to this function @@ -103,11 +133,22 @@ class MutationVisitor(cst.CSTVisitor): METADATA_DEPENDENCIES = (PositionProvider, OuterFunctionProvider) - def __init__(self, operators: OPERATORS_TYPE, ignore_lines: set[int], covered_lines: set[int] | None = None): + def __init__( + self, + operators: OPERATORS_TYPE, + ignore_lines: set[int], + covered_lines: set[int] | None = None, + ignored_class_lines: set[int] | None = None, + ignored_function_lines: set[int] | None = None, + ): self.mutations: list[Mutation] = [] self._operators = operators self._ignored_lines = ignore_lines self._covered_lines = covered_lines + self._ignored_class_lines = ignored_class_lines or set() + self._ignored_function_lines = ignored_function_lines or set() + self.ignored_classes: set[str] = set() + self.ignored_functions: set[str] = set() def on_visit(self, node: cst.CSTNode) -> bool: if self._skip_node_and_children(node): @@ -126,7 +167,7 @@ def _create_mutations(self, node: cst.CSTNode) -> None: mutation = Mutation( original_node=node, mutated_node=mutated_node, - contained_by_top_level_function=self.get_metadata(OuterFunctionProvider, node, None), + contained_by_top_level_function=self.get_metadata(OuterFunctionProvider, node, None), # type: ignore ) self.mutations.append(mutation) @@ -146,6 +187,20 @@ def _should_mutate_node(self, node: cst.CSTNode) -> bool: return True def _skip_node_and_children(self, node: cst.CSTNode) -> bool: + # Check if this is a class with pragma: no mutate class + if isinstance(node, cst.ClassDef): + position = self.get_metadata(PositionProvider, node, None) + if position and position.start.line in self._ignored_class_lines: + self.ignored_classes.add(node.name.value) + return True + + # Check if this is a function with pragma: no mutate function + if isinstance(node, cst.FunctionDef): + position = self.get_metadata(PositionProvider, node, None) + if position and position.start.line in self._ignored_function_lines: + self.ignored_functions.add(node.name.value) + return True + if ( isinstance(node, cst.Call) and isinstance(node.func, cst.Name) @@ -172,7 +227,14 @@ def _skip_node_and_children(self, node: cst.CSTNode) -> bool: # 1) copying them for the trampoline setup can cause side effects (e.g. multiple @app.post("/foo") definitions) # 2) decorators are executed when the function is defined, so we don't want to mutate their arguments and cause exceptions # 3) @property decorators break the trampoline signature assignment (which expects it to be a function) - if isinstance(node, (cst.FunctionDef, cst.ClassDef)) and len(node.decorators): + # Exception: @staticmethod and @classmethod are allowed because they are predictable and it's easy to set up trampolines for them + if isinstance(node, cst.FunctionDef) and len(node.decorators): + if len(node.decorators) == 1: + decorator = node.decorators[0].decorator + if isinstance(decorator, cst.Name) and decorator.value in ("staticmethod", "classmethod"): + return False + return True + if isinstance(node, cst.ClassDef) and len(node.decorators): return True return False @@ -185,12 +247,23 @@ def _skip_node_and_children(self, node: cst.CSTNode) -> bool: trampoline_impl_cst[-1] = trampoline_impl_cst[-1].with_changes(leading_lines=[cst.EmptyLine(), cst.EmptyLine()]) -def combine_mutations_to_source(module: cst.Module, mutations: Sequence[Mutation]) -> tuple[str, Sequence[str]]: +def combine_mutations_to_source( + module: cst.Module, + mutations: Sequence[Mutation], + ignored_classes: set[str] | None = None, + ignored_functions: set[str] | None = None, + mutate_enums: bool = True, +) -> tuple[str, Sequence[str]]: """Create mutated functions and trampolines for all mutations and compile them to a single source code. :param module: The original parsed module :param mutations: Mutations that should be applied. + :param ignored_classes: Class names to skip transformation for (e.g., enums with pragma: no mutate class) + :param ignored_functions: Function names to skip transformation for (pragma: no mutate function) + :param mutate_enums: Whether to mutate enum classes (True) or skip them entirely (False) :return: Mutated code and list of mutation names""" + ignored_classes = ignored_classes or set() + ignored_functions = ignored_functions or set() # copy start of the module (in particular __future__ imports) result: list[MODULE_STATEMENT] = get_statements_until_func_or_class(module.body) @@ -210,6 +283,10 @@ def combine_mutations_to_source(module: cst.Module, mutations: Sequence[Mutation for statement in remaining_statements: if isinstance(statement, cst.FunctionDef): func = statement + # Skip entire function if it has pragma: no mutate function + if func.name.value in ignored_functions: + result.append(func) + continue func_mutants = mutations_within_function.get(func) if not func_mutants: result.append(func) @@ -219,22 +296,48 @@ def combine_mutations_to_source(module: cst.Module, mutations: Sequence[Mutation mutation_names.extend(mutant_names) elif isinstance(statement, cst.ClassDef): cls = statement + # Skip entire class if it has pragma: no mutate class + if cls.name.value in ignored_classes: + result.append(cls) + continue if not isinstance(cls.body, cst.IndentedBlock): # we don't mutate single-line classes, e.g. `class A: a = 1; b = 2` result.append(cls) + elif is_enum_class(cls): + if not mutate_enums: + result.append(cls) + continue + external_nodes, modified_cls, enum_mutant_names = enum_trampoline_arrangement( + cls, mutations_within_function + ) + result.extend(external_nodes) + result.append(modified_cls) + mutation_names.extend(enum_mutant_names) else: + external_nodes_for_class: list[MODULE_STATEMENT] = [] mutated_body = [] for method in cls.body.body: method_mutants = mutations_within_function.get(method) if not isinstance(method, cst.FunctionDef) or not method_mutants: mutated_body.append(method) continue - nodes, mutant_names = function_trampoline_arrangement( - method, method_mutants, class_name=cls.name.value - ) - mutated_body.extend(nodes) - mutation_names.extend(mutant_names) + method_type = get_method_type(method) + if method_type in (MethodType.STATICMETHOD, MethodType.CLASSMETHOD): + ext_nodes, assignment, method_mutant_names = _external_method_injection( + method, method_mutants, cls.name.value, method_type + ) + external_nodes_for_class.extend(ext_nodes) + mutated_body.append(assignment) + mutation_names.extend(method_mutant_names) + else: + nodes, mutant_names = function_trampoline_arrangement( + method, method_mutants, class_name=cls.name.value + ) + mutated_body.extend(nodes) + mutation_names.extend(mutant_names) + + result.extend(external_nodes_for_class) result.append(cls.with_changes(body=cls.body.with_changes(body=mutated_body))) else: result.append(statement) @@ -243,6 +346,55 @@ def combine_mutations_to_source(module: cst.Module, mutations: Sequence[Mutation return mutated_module.code, mutation_names +def _external_method_injection( + method: cst.FunctionDef, mutants: Sequence[Mutation], class_name: str, method_type: MethodType +) -> tuple[Sequence[MODULE_STATEMENT], cst.SimpleStatementLine, Sequence[str]]: + """Create external trampoline for a method using external injection pattern. + + This moves mutation code outside the class and uses a simple assignment + inside the class body. Works for staticmethod, classmethod, and instance methods. + + :param method: The method to create external trampoline for + :param mutants: The mutations for this method + :param class_name: The containing class name + :param method_type: MethodType.STATICMETHOD, MethodType.CLASSMETHOD, or MethodType.INSTANCE + :return: A tuple of (external_nodes, class_body_assignment, mutant_names) + """ + external_nodes: list[MODULE_STATEMENT] = [] + mutant_names: list[str] = [] + method_name = method.name.value + prefix = f"_{class_name}_{method_name}" + mangled_name = mangle_function_name(name=method_name, class_name=class_name) + "__mutmut" + + orig_func = method.with_changes(name=cst.Name(f"{prefix}_orig"), decorators=[]) + external_nodes.append(orig_func) + + for i, mutant in enumerate(mutants): + mutant_func_name = f"{prefix}_mutant_{i + 1}" + full_mutant_name = f"{mangled_name}_{i + 1}" + mutant_names.append(full_mutant_name) + + mutated = method.with_changes(name=cst.Name(mutant_func_name), decorators=[]) + mutated = cast(cst.FunctionDef, deep_replace(mutated, mutant.original_node, mutant.mutated_node)) + external_nodes.append(mutated) + trampoline_code = build_enum_trampoline( + class_name=class_name, method_name=method_name, mutant_names=mutant_names, method_type=method_type + ) + trampoline_nodes = list(cst.parse_module(trampoline_code).body) + external_nodes.extend(trampoline_nodes) + + if method_type == MethodType.STATICMETHOD: + assignment_code = f"{method_name} = staticmethod({prefix}_trampoline)" + elif method_type == MethodType.CLASSMETHOD: + assignment_code = f"{method_name} = classmethod({prefix}_trampoline)" + else: + assignment_code = f"{method_name} = {prefix}_trampoline" + + assignment = cast(cst.SimpleStatementLine, cst.parse_statement(assignment_code)) + + return external_nodes, assignment, mutant_names + + def function_trampoline_arrangement( function: cst.FunctionDef, mutants: Iterable[Mutation], class_name: str | None ) -> tuple[Sequence[MODULE_STATEMENT], Sequence[str]]: @@ -266,16 +418,26 @@ def function_trampoline_arrangement( for i, mutant in enumerate(mutants): mutant_name = f"{mangled_name}_{i + 1}" mutant_names.append(mutant_name) - mutated_method_base = function.with_changes(name=cst.Name(mutant_name)) - mutated_method_result = deep_replace(mutated_method_base, mutant.original_node, mutant.mutated_node) - nodes.append(mutated_method_result) # type: ignore[arg-type] - - mutants_dict = list( - cst.parse_module(create_trampoline_lookup(orig_name=name, mutants=mutant_names, class_name=class_name)).body + mutated_method = function.with_changes(name=cst.Name(mutant_name)) + mutated_method = cast(cst.FunctionDef, deep_replace(mutated_method, mutant.original_node, mutant.mutated_node)) + nodes.append(mutated_method) + + # trampoline that forwards the calls + is_async = function.asynchronous is not None + trampoline = list( + cst.parse_module( + build_function_trampoline( + orig_name=name, + mutants=mutant_names, + class_name=class_name, + is_async=is_async, + is_async_generator=is_async and _is_generator(function), + ) + ).body ) - mutants_dict[0] = mutants_dict[0].with_changes(leading_lines=[cst.EmptyLine()]) + trampoline[0] = trampoline[0].with_changes(leading_lines=[cst.EmptyLine()]) - nodes.extend(mutants_dict) + nodes.extend(trampoline) return nodes, mutant_names @@ -324,13 +486,13 @@ def _get_local_name(func_name: str) -> cst.BaseExpression: ], ) # for non-async functions, simply return the value or generator - result_statement = cst.SimpleStatementLine([cst.Return(result)]) + result_statement: cst.BaseStatement = cst.SimpleStatementLine([cst.Return(result)]) if function.asynchronous: is_generator = _is_generator(function) if is_generator: # async for i in _mutmut_trampoline(...): yield i - result_statement = cst.For( # type: ignore[assignment] + result_statement = cst.For( target=cst.Name("i"), iter=result, body=cst.IndentedBlock([cst.SimpleStatementLine([cst.Expr(cst.Yield(cst.Name("i")))])]), @@ -354,6 +516,53 @@ def _get_local_name(func_name: str) -> cst.BaseExpression: ) +def enum_trampoline_arrangement( + cls: cst.ClassDef, mutations_by_method: Mapping[cst.CSTNode, Sequence[Mutation]] +) -> tuple[Sequence[MODULE_STATEMENT], cst.ClassDef, Sequence[str]]: + """Create external functions and minimal enum class for enum mutation. + + This pattern moves all mutation-related code OUTSIDE the enum class body, + avoiding the enum metaclass conflict that occurs when class-level attributes + are added. The enum class only contains simple method assignments. + + :param cls: The enum class definition + :param mutations_by_method: Mapping of method nodes to their mutations + :return: A tuple of (external_nodes, modified_class, mutant_names) + """ + external_nodes: list[MODULE_STATEMENT] = [] + mutant_names: list[str] = [] + new_body: list[cst.BaseStatement | cst.BaseSmallStatement] = [] + class_name = cls.name.value + + for item in cls.body.body: + if not isinstance(item, cst.FunctionDef): + new_body.append(item) + continue + + method = item + method_mutants = mutations_by_method.get(method) + + if not method_mutants: + new_body.append(method) + continue + + method_type = get_method_type(method) + if method_type is None: + new_body.append(method) + continue + + ext_nodes, assignment, method_mutant_names = _external_method_injection( + method, method_mutants, class_name, method_type + ) + external_nodes.extend(ext_nodes) + new_body.append(assignment) + mutant_names.extend(method_mutant_names) + + modified_cls = cls.with_changes(body=cls.body.with_changes(body=new_body)) + + return external_nodes, modified_cls, mutant_names + + def get_statements_until_func_or_class(statements: Sequence[MODULE_STATEMENT]) -> list[MODULE_STATEMENT]: """Get all statements until we encounter the first function or class definition""" result: list[MODULE_STATEMENT] = [] @@ -383,9 +592,11 @@ def pragma_no_mutate_lines(source: str) -> set[int]: } -def deep_replace(tree: cst.CSTNode, old_node: cst.CSTNode, new_node: cst.CSTNode) -> cst.CSTNode: +def deep_replace( + tree: cst.CSTNode, old_node: cst.CSTNode, new_node: cst.CSTNode +) -> cst.CSTNode | cst.RemovalSentinel | cst.FlattenSentinel[cst.CSTNode]: """Like the CSTNode.deep_replace method, except that we only replace up to one occurrence of old_node.""" - return tree.visit(ChildReplacementTransformer(old_node, new_node)) # type: ignore[return-value] + return tree.visit(ChildReplacementTransformer(old_node, new_node)) class ChildReplacementTransformer(cst.CSTTransformer): @@ -423,7 +634,6 @@ def __init__(self, original_function: cst.FunctionDef): self.original_function: cst.FunctionDef = original_function def visit_FunctionDef(self, node: cst.FunctionDef) -> bool | None: - # do not recurse into inner function definitions if self.original_function != node: return False return None @@ -431,3 +641,94 @@ def visit_FunctionDef(self, node: cst.FunctionDef) -> bool | None: def visit_Yield(self, node: cst.Yield) -> bool: self.is_generator = True return False + + +@dataclass +class MutatedMethodLocation: + file: Path + function_name: str + line_number_start: int + line_number_end: int + + +@dataclass +class FailedTypeCheckMutant: + method_location: MutatedMethodLocation + name: str + error: TypeCheckingError + + +class MutatedMethodsCollector(cst.CSTVisitor): + METADATA_DEPENDENCIES = (cst.metadata.PositionProvider,) + + def __init__(self, file: Path): + self.file = file + self.found_mutants: list[MutatedMethodLocation] = [] + + def visit_FunctionDef(self, node: cst.FunctionDef) -> bool: + name = node.name.value + if is_mutated_method_name(name): + range = self.get_metadata(cst.metadata.PositionProvider, node) + self.found_mutants.append( + MutatedMethodLocation( + file=self.file, + function_name=name, + line_number_start=range.start.line, + line_number_end=range.end.line, + ) + ) + + # do not continue visting children of this function + # mutated methods are not nested within other methods + return False + + +def group_by_path(errors: list[TypeCheckingError]) -> dict[Path, list[TypeCheckingError]]: + grouped: dict[Path, list[TypeCheckingError]] = defaultdict(list) + + for error in errors: + grouped[error.file_path].append(error) + + return grouped + + +def filter_mutants_with_type_checker() -> dict[str, FailedTypeCheckMutant]: + with change_cwd(Path("mutants")): + errors = run_type_checker(Config.get().type_check_command) + errors_by_path = group_by_path(errors) + + mutants_to_skip: dict[str, FailedTypeCheckMutant] = {} + + for path, errors_of_file in errors_by_path.items(): + with open(path, encoding="utf-8") as file: + source = file.read() + wrapper = cst.MetadataWrapper(cst.parse_module(source)) + visitor = MutatedMethodsCollector(path) + wrapper.visit(visitor) + mutated_methods = visitor.found_mutants + + for error in errors_of_file: + assert error.file_path == visitor.file + mutant = next( + (m for m in mutated_methods if m.line_number_start <= error.line_number <= m.line_number_end), None + ) + if mutant is None: + source_lines = source.splitlines() + error_line = source_lines[error.line_number - 1] if error.line_number <= len(source_lines) else "" + if GENERATED_MARKER in error_line: + continue + raise Exception( + f"Could not find mutant for type error {error.file_path}:{error.line_number} ({error.error_description}). " + "Probably, a code mutation influenced types in unexpected locations. " + "If your project normally has no type errors and uses mypy/pyrefly, please file an issue with steps to reproduce on github." + ) + + mutant_name = get_mutant_name(path.relative_to(Path(".").absolute()), mutant.function_name) + + mutants_to_skip[mutant_name] = FailedTypeCheckMutant( + method_location=mutant, + name=mutant_name, + error=error, + ) + + return mutants_to_skip diff --git a/src/mutmut/node_mutation.py b/src/mutmut/mutation/mutators.py similarity index 88% rename from src/mutmut/node_mutation.py rename to src/mutmut/mutation/mutators.py index bf4b1f1d..0429976f 100644 --- a/src/mutmut/node_mutation.py +++ b/src/mutmut/mutation/mutators.py @@ -4,6 +4,7 @@ from collections.abc import Callable from collections.abc import Iterable from collections.abc import Sequence +from enum import Enum from typing import Any from typing import cast @@ -12,7 +13,7 @@ OPERATORS_TYPE = Sequence[ tuple[ - type[cst.CSTNode], + type, Callable[[Any], Iterable[cst.CSTNode]], ] ] @@ -21,6 +22,41 @@ NON_ESCAPE_SEQUENCE = re.compile(r"((? MethodType | None: + """Determine the method type based on decorators. + + Returns: + MethodType.STATICMETHOD - for @staticmethod + MethodType.CLASSMETHOD - for @classmethod + MethodType.INSTANCE - for no decorators (regular instance method) + None - for other/multiple decorators (should be skipped) + """ + if not method.decorators: + return MethodType.INSTANCE + + if len(method.decorators) != 1: + # Multiple decorators - skip + return None + + decorator = method.decorators[0].decorator + if isinstance(decorator, cst.Name): + if decorator.value == "staticmethod": + return MethodType.STATICMETHOD + elif decorator.value == "classmethod": + return MethodType.CLASSMETHOD + + # Other decorator - skip + return None + + def operator_number(node: cst.BaseNumber) -> Iterable[cst.BaseNumber]: if isinstance(node, cst.Integer | cst.Float): yield node.with_changes(value=repr(node.evaluated_value + 1)) @@ -233,11 +269,10 @@ def operator_assignment( if not node.value: # do not mutate `a: sometype` to an assignment `a: sometype = ""` return - mutated_value: cst.BaseExpression if m.matches(node.value, m.Name("None")): mutated_value = cst.SimpleString('""') else: - mutated_value = cst.Name("None") + mutated_value = cst.Name("None") # type: ignore[assignment] yield node.with_changes(value=mutated_value) @@ -251,8 +286,8 @@ def operator_match(node: cst.Match) -> Iterable[cst.CSTNode]: # Operators that should be called on specific node types mutation_operators: OPERATORS_TYPE = [ - (cst.BaseNumber, operator_number), # type: ignore[type-abstract] - (cst.BaseString, operator_string), # type: ignore[type-abstract] + (cst.BaseNumber, operator_number), + (cst.BaseString, operator_string), (cst.Name, operator_name), (cst.Assign, operator_assignment), (cst.AnnAssign, operator_assignment), @@ -263,8 +298,8 @@ def operator_match(node: cst.Match) -> Iterable[cst.CSTNode]: (cst.Call, operator_symmetric_string_methods_swap), (cst.Call, operator_unsymmetrical_string_methods_swap), (cst.Lambda, operator_lambda), - (cst.CSTNode, operator_keywords), # type: ignore[type-abstract] - (cst.CSTNode, operator_swap_op), # type: ignore[type-abstract] + (cst.CSTNode, operator_keywords), + (cst.CSTNode, operator_swap_op), (cst.Match, operator_match), ] @@ -279,3 +314,5 @@ def _simple_mutation_mapping( # TODO: detect regexes and mutate them in nasty ways? Maybe mutate all strings as if they are regexes + +# TODO: implement removal of inner decorators diff --git a/src/mutmut/mutation/pragma_handling.py b/src/mutmut/mutation/pragma_handling.py new file mode 100644 index 00000000..76d7137d --- /dev/null +++ b/src/mutmut/mutation/pragma_handling.py @@ -0,0 +1,41 @@ +"""Pragma comment parsing for mutation control.""" + + +def parse_pragma_lines(source: str) -> tuple[set[int], set[int], set[int]]: + """Parse all pragma: no mutate variants. + + Each set is mutually exclusive. + + Supported pragmas: + - ``# pragma: no mutate`` - skip this line only + - ``# pragma: no mutate class`` - skip entire class + - ``# pragma: no mutate: class`` - skip entire class (alternative syntax) + - ``# pragma: no mutate function`` - skip entire function + - ``# pragma: no mutate: function`` - skip entire function (alternative syntax) + + :return: A tuple of (no_mutate_lines, class_lines, function_lines) + """ + no_mutate_lines: set[int] = set() + class_lines: set[int] = set() + function_lines: set[int] = set() + + for i, line in enumerate(source.split("\n")): + if "# pragma:" not in line: + continue + + pragma_content = line.partition("# pragma:")[-1] + line_num = i + 1 + + if "no mutate" not in pragma_content: + continue + + # Check for specific variants first (more specific matches) + if "no mutate class" in pragma_content or "no mutate: class" in pragma_content: + class_lines.add(line_num) + elif "no mutate function" in pragma_content or "no mutate: function" in pragma_content: + function_lines.add(line_num) + else: + # Generic "no mutate" (not class or function) + no_mutate_lines.add(line_num) + + return no_mutate_lines, class_lines, function_lines diff --git a/src/mutmut/mutation/trampoline_templates.py b/src/mutmut/mutation/trampoline_templates.py new file mode 100644 index 00000000..280e0cf8 --- /dev/null +++ b/src/mutmut/mutation/trampoline_templates.py @@ -0,0 +1,183 @@ +from mutmut.mutation.mutators import MethodType + +CLASS_NAME_SEPARATOR = "ǁ" + +GENERATED_MARKER = "# mutmut: generated" + + +def _mark_generated(code: str) -> str: + """Append the generated marker comment to every code line in a block.""" + lines = [] + for line in code.splitlines(): + stripped = line.strip() + if stripped and not stripped.startswith("#"): + line = f"{line} {GENERATED_MARKER}" + lines.append(line) + return "\n".join(lines) + + +def mangle_function_name(*, name: str, class_name: str | None) -> str: + assert CLASS_NAME_SEPARATOR not in name + if class_name: + assert CLASS_NAME_SEPARATOR not in class_name + prefix = f"x{CLASS_NAME_SEPARATOR}{class_name}{CLASS_NAME_SEPARATOR}" + else: + prefix = "x_" + return f"{prefix}{name}" + + +def build_function_trampoline( + *, + orig_name: str, + mutants: list[str], + class_name: str | None, + is_async: bool = False, + is_async_generator: bool = False, +) -> str: + mangled_name = mangle_function_name(name=orig_name, class_name=class_name) + + type_annotation = "ClassVar[MutantDict]" if class_name is not None else "MutantDict" + mutants_dict = ( + f"{mangled_name}__mutmut_mutants : {type_annotation} = {{\n" + + ", \n ".join(f"{repr(m)}: {m}" for m in mutants) + + "\n}" + ) + access_prefix = "" + access_suffix = "" + self_arg = "" + if class_name is not None: + access_prefix = 'object.__getattribute__(self, "' + access_suffix = '")' + self_arg = ", self" + + trampoline_name = "_mutmut_trampoline" + trampoline_call = f"{trampoline_name}({access_prefix}{mangled_name}__mutmut_orig{access_suffix}, {access_prefix}{mangled_name}__mutmut_mutants{access_suffix}, args, kwargs{self_arg})" + self_prefix = "self, " if class_name is not None else "" + + if is_async_generator: + body = f"""\ +async def {orig_name}({self_prefix}*args, **kwargs): + async for i in {trampoline_call}: + yield i""" + elif is_async: + body = f"""\ +async def {orig_name}({self_prefix}*args, **kwargs): + result = await {trampoline_call} + return result""" + else: + body = f"""\ +def {orig_name}({self_prefix}*args, **kwargs): + result = {trampoline_call} + return result""" + + return _mark_generated(f""" +{mutants_dict} + +{body} + +{orig_name} = _mutmut_wraps({mangled_name}__mutmut_orig)({orig_name}) +{mangled_name}__mutmut_orig.__name__ = '{mangled_name}' +""") + + +def build_enum_trampoline( + *, class_name: str, method_name: str, mutant_names: list[str], method_type: MethodType +) -> str: + """Generate external trampoline code for enum methods. + + This pattern moves all mutation-related code OUTSIDE the enum class body, + avoiding the enum metaclass conflict. The enum class only contains a simple + assignment like `method_name = _ClassName_method_trampoline`. + + :param class_name: The enum class name + :param method_name: The method being mutated + :param mutant_names: List of mutant function names (mangled) + :param method_type: 'instance', 'static', or 'classmethod' + :return: String containing the external functions and mutants dict + """ + prefix = f"_{class_name}_{method_name}" + mangled_name = mangle_function_name(name=method_name, class_name=class_name) + + # Build mutants dict + mutants_dict_entries = ", ".join(f"{repr(m)}: {prefix}_mutant_{i + 1}" for i, m in enumerate(mutant_names)) + mutants_dict = f"{prefix}_mutants = {{{mutants_dict_entries}}}" + + orig_name_fix = f"{prefix}_orig.__name__ = '{mangled_name}'" + + # Build trampoline based on method type + if method_type == MethodType.STATICMETHOD: + trampoline = f""" +def {prefix}_trampoline(*args, **kwargs): + return _mutmut_trampoline({prefix}_orig, {prefix}_mutants, args, kwargs) + +{prefix}_trampoline.__name__ = '{method_name}' +""" + elif method_type == MethodType.CLASSMETHOD: + trampoline = f""" +def {prefix}_trampoline(cls, *args, **kwargs): + return _mutmut_trampoline({prefix}_orig, {prefix}_mutants, args, kwargs, cls) + +{prefix}_trampoline.__name__ = '{method_name}' +""" + else: # instance method + trampoline = f""" +def {prefix}_trampoline(self, *args, **kwargs): + return _mutmut_trampoline({prefix}_orig, {prefix}_mutants, args, kwargs, self) + +{prefix}_trampoline.__name__ = '{method_name}' +""" + + return _mark_generated(f"{mutants_dict}\n{orig_name_fix}\n{trampoline}") + + +# noinspection PyUnresolvedReferences +# language=python +trampoline_impl = _mark_generated(""" +from functools import wraps as _mutmut_wraps +from typing import Annotated +from typing import Callable +from typing import ClassVar + + +MutantDict = Annotated[dict[str, Callable], "Mutant"] # type: ignore + + +def _mutmut_trampoline(orig, mutants, call_args, call_kwargs, self_arg = None): # type: ignore + \"""Forward call to original or mutated function, depending on the environment\""" + import os # type: ignore + mutant_under_test = os.environ.get('MUTANT_UNDER_TEST', '') # type: ignore + if not mutant_under_test: + # No mutant being tested - call original function + if self_arg is not None and not hasattr(orig, '__self__'): + return orig(self_arg, *call_args, **call_kwargs) + else: + return orig(*call_args, **call_kwargs) + if mutant_under_test == 'fail': # type: ignore + from mutmut.__main__ import MutmutProgrammaticFailException # type: ignore + raise MutmutProgrammaticFailException('Failed programmatically') # type: ignore + elif mutant_under_test == 'stats': # type: ignore + from mutmut.__main__ import record_trampoline_hit # type: ignore + record_trampoline_hit(orig.__module__ + '.' + orig.__name__) # type: ignore + # Check if orig is a bound method (has __self__) or plain function + if self_arg is not None and not hasattr(orig, '__self__'): # type: ignore + result = orig(self_arg, *call_args, **call_kwargs) # type: ignore + else: + result = orig(*call_args, **call_kwargs) # type: ignore + return result # type: ignore + prefix = orig.__module__ + '.' + orig.__name__ + '__mutmut_' # type: ignore + if not mutant_under_test.startswith(prefix): # type: ignore + # Check if orig is a bound method (has __self__) or plain function + if self_arg is not None and not hasattr(orig, '__self__'): # type: ignore + result = orig(self_arg, *call_args, **call_kwargs) # type: ignore + else: + result = orig(*call_args, **call_kwargs) # type: ignore + return result # type: ignore + mutant_name = mutant_under_test.rpartition('.')[-1] # type: ignore + if self_arg is not None: # type: ignore + # call to a class method where self is not bound + result = mutants[mutant_name](self_arg, *call_args, **call_kwargs) # type: ignore + else: + result = mutants[mutant_name](*call_args, **call_kwargs) # type: ignore + return result # type: ignore + +""") diff --git a/src/mutmut/threading/__init__.py b/src/mutmut/threading/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/mutmut/threading/timeout.py b/src/mutmut/threading/timeout.py new file mode 100644 index 00000000..a8acf4f3 --- /dev/null +++ b/src/mutmut/threading/timeout.py @@ -0,0 +1,56 @@ +import heapq +import os +import signal +import time +from threading import Condition +from threading import Thread + +_timeout_heap: list[tuple[float, int]] = [] # (timeout_timestamp, pid) +_heap_lock = Condition() +_checker_started = False + + +def register_timeout(pid: int, timeout_s: float) -> None: + """Register a timeout for a given PID. + + Starts the timeout checker thread if not already started. + + On timeout sends SIGXCPU to the process. + + Args: + pid: The process ID to register the timeout for. + timeout_s: The number of seconds until the timeout occurs. + """ + global _checker_started + if not _checker_started: + _checker_started = True + Thread(target=_timeout_checker_thread, name=f"{os.getpid()}-mutmut-timeout-checker", daemon=True).start() + + deadline = time.time() + timeout_s + with _heap_lock: + heapq.heappush(_timeout_heap, (deadline, pid)) + _heap_lock.notify() + + +def _timeout_checker_thread() -> None: + """Thread function that checks for timeouts and terminates processes. + + We make a trade-off here in the name of simplicity by not exposing a + mechanism to cancel timeouts, which saves us an O(n) operation on each + timeout we would cancel. Instead, we let expired entries for already-terminated + processes remain in the heap until they reach the top and are popped off. + The downside is a bit of memory bloat but each tuple is ~72 bytes so + even with 10,000 backed up timeouts it's less than 1MB. + """ + while True: + with _heap_lock: + while not _timeout_heap: + _heap_lock.wait() + now = time.time() + while _timeout_heap and _timeout_heap[0][0] <= now: + _, pid = heapq.heappop(_timeout_heap) + try: + os.kill(pid, signal.SIGXCPU) + except ProcessLookupError: + pass # Process already terminated + time.sleep(1) diff --git a/src/mutmut/trampoline_templates.py b/src/mutmut/trampoline_templates.py deleted file mode 100644 index 0b051938..00000000 --- a/src/mutmut/trampoline_templates.py +++ /dev/null @@ -1,63 +0,0 @@ -CLASS_NAME_SEPARATOR = "ǁ" - - -def create_trampoline_lookup(*, orig_name: str, mutants: list[str], class_name: str | None) -> str: - mangled_name = mangle_function_name(name=orig_name, class_name=class_name) - - mutants_dict = ( - f"{mangled_name}__mutmut_mutants : ClassVar[MutantDict] = {{ # type: ignore\n" - + ", \n ".join(f"{repr(m)}: {m}" for m in mutants) - + "\n}" - ) - return f""" -{mutants_dict} -{mangled_name}__mutmut_orig.__name__ = '{mangled_name}' -""" - - -def mangle_function_name(*, name: str, class_name: str | None) -> str: - assert CLASS_NAME_SEPARATOR not in name - if class_name: - assert CLASS_NAME_SEPARATOR not in class_name - prefix = f"x{CLASS_NAME_SEPARATOR}{class_name}{CLASS_NAME_SEPARATOR}" - else: - prefix = "x_" - return f"{prefix}{name}" - - -# noinspection PyUnresolvedReferences -# language=python -trampoline_impl = """ -from typing import Annotated -from typing import Callable -from typing import ClassVar - -MutantDict = Annotated[dict[str, Callable], "Mutant"] # type: ignore - - -def _mutmut_trampoline(orig, mutants, call_args, call_kwargs, self_arg = None): # type: ignore - \"""Forward call to original or mutated function, depending on the environment\""" - import os # type: ignore - mutant_under_test = os.environ['MUTANT_UNDER_TEST'] # type: ignore - if mutant_under_test == 'fail': # type: ignore - from mutmut.__main__ import MutmutProgrammaticFailException # type: ignore - raise MutmutProgrammaticFailException('Failed programmatically') # type: ignore - elif mutant_under_test == 'stats': # type: ignore - from mutmut.__main__ import record_trampoline_hit # type: ignore - record_trampoline_hit(orig.__module__ + '.' + orig.__name__) # type: ignore - # (for class methods, orig is bound and thus does not need the explicit self argument) - result = orig(*call_args, **call_kwargs) # type: ignore - return result # type: ignore - prefix = orig.__module__ + '.' + orig.__name__ + '__mutmut_' # type: ignore - if not mutant_under_test.startswith(prefix): # type: ignore - result = orig(*call_args, **call_kwargs) # type: ignore - return result # type: ignore - mutant_name = mutant_under_test.rpartition('.')[-1] # type: ignore - if self_arg is not None: # type: ignore - # call to a class method where self is not bound - result = mutants[mutant_name](self_arg, *call_args, **call_kwargs) # type: ignore - else: - result = mutants[mutant_name](*call_args, **call_kwargs) # type: ignore - return result # type: ignore - -""" diff --git a/src/mutmut/utils/__init__.py b/src/mutmut/utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/mutmut/utils/file_utils.py b/src/mutmut/utils/file_utils.py new file mode 100644 index 00000000..d3e73be8 --- /dev/null +++ b/src/mutmut/utils/file_utils.py @@ -0,0 +1,14 @@ +import os +from collections.abc import Iterator +from contextlib import contextmanager +from pathlib import Path + + +@contextmanager +def change_cwd(path: Path | str) -> Iterator[None]: + old_cwd = Path(os.getcwd()).resolve() + os.chdir(path) + try: + yield + finally: + os.chdir(old_cwd) diff --git a/src/mutmut/utils/format_utils.py b/src/mutmut/utils/format_utils.py new file mode 100644 index 00000000..a1228719 --- /dev/null +++ b/src/mutmut/utils/format_utils.py @@ -0,0 +1,55 @@ +"""Utility functions for mutmut name formatting and key generation.""" + +import os +from pathlib import Path + +from mutmut.mutation.trampoline_templates import CLASS_NAME_SEPARATOR + + +def make_mutant_key(func_name: str, class_name: str | None = None) -> str: + """Create a consistent key for identifying a function/method for mutation tracking. + + :param func_name: The function or method name + :param class_name: The containing class name, or None for top-level functions + :return: A key string like "xǁMyClassǁmethod" for methods or "x_foo" for functions + """ + if class_name: + return f"x{CLASS_NAME_SEPARATOR}{class_name}{CLASS_NAME_SEPARATOR}{func_name}" + else: + return f"x_{func_name}" + + +def parse_mutant_key(key: str) -> tuple[str, str | None]: + """Parse a mutant key back into function name and optional class name. + + :param key: A key string like "xǁMyClassǁmethod" or "x_foo" + :return: A tuple of (func_name, class_name) where class_name is None for top-level functions + """ + if CLASS_NAME_SEPARATOR in key: + class_name = key[key.index(CLASS_NAME_SEPARATOR) + 1 : key.rindex(CLASS_NAME_SEPARATOR)] + func_name = key[key.rindex(CLASS_NAME_SEPARATOR) + 1 :] + return func_name, class_name + else: + assert key.startswith("x_"), f"Invalid key format: {key}" + return key[2:], None + + +def is_mutated_method_name(name: str) -> bool: + return name.startswith(("x_", "xǁ")) and "__mutmut" in name + + +def strip_prefix(s: str, *, prefix: str, strict: bool = False) -> str: + if s.startswith(prefix): + return s[len(prefix) :] + assert strict is False, f"String '{s}' does not start with prefix '{prefix}'" + return s + + +def get_mutant_name(relative_source_path: Path, mutant_method_name: str) -> str: + module_name = str(relative_source_path)[: -len(relative_source_path.suffix)].replace(os.sep, ".") + module_name = strip_prefix(module_name, prefix="src.") + + # FYI, we currently use "mutant_name" inconsistently, for both the whole identifier including the path and only the mangled method name + mutant_name = f"{module_name}.{mutant_method_name}" + mutant_name = mutant_name.replace(".__init__.", ".") + return mutant_name diff --git a/src/mutmut/utils/safe_setproctitle.py b/src/mutmut/utils/safe_setproctitle.py new file mode 100644 index 00000000..427ba09a --- /dev/null +++ b/src/mutmut/utils/safe_setproctitle.py @@ -0,0 +1,24 @@ +"""Safe wrapper for setproctitle that handles fork-unsafe behavior on macOS. + +setproctitle uses CoreFoundation APIs on macOS which aren't fork-safe. +Calling setproctitle after fork() causes segfaults in the child process. + +This module provides a safe_setproctitle() function that: +- Works normally on Linux +- Is a no-op on macOS to avoid crashes after fork() + +Related: https://github.com/boxed/mutmut/pull/450#issuecomment-4002571055 +""" + +from mutmut.configuration import Config + +if Config.get().use_setproctitle: + from setproctitle import setproctitle as _setproctitle + + def safe_setproctitle(title: str) -> None: + """Set the process title.""" + _setproctitle(title) +else: + + def safe_setproctitle(title: str) -> None: + """No-op on macOS where setproctitle crashes after fork.""" diff --git a/tests/data/test_generation/__init__.py b/tests/data/test_generation/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/e2e/e2e_utils.py b/tests/e2e/e2e_utils.py index 30becf80..c7a6ae71 100644 --- a/tests/e2e/e2e_utils.py +++ b/tests/e2e/e2e_utils.py @@ -8,8 +8,8 @@ import mutmut from mutmut.__main__ import SourceFileMutationData from mutmut.__main__ import _run -from mutmut.__main__ import ensure_config_loaded from mutmut.__main__ import walk_source_files +from mutmut.configuration import Config @contextmanager @@ -25,11 +25,12 @@ def change_cwd(path): def read_all_stats_for_project(project_path: Path) -> dict[str, dict]: """Create a single dict from all mutant results in *.meta files""" with change_cwd(project_path): - ensure_config_loaded() + Config.reset() + Config.ensure_loaded() stats = {} for p in walk_source_files(): - if mutmut.config.should_ignore_for_mutation(p): # type: ignore + if Config.get().should_ignore_for_mutation(p): continue data = SourceFileMutationData(path=p) data.load() @@ -46,19 +47,25 @@ def read_json_file(path: Path): def write_json_file(path: Path, data: Any): with open(path, "w") as file: json.dump(data, file, indent=2) + file.write("\n") # ensure newline at end of file for POSIX compliance + + +REPO_ROOT = Path(__file__).resolve().parent.parent.parent +E2E_PROJECTS = REPO_ROOT / "e2e_projects" def run_mutmut_on_project(project: str) -> dict: """Runs mutmut on this project and verifies that the results stay the same for all mutations.""" mutmut._reset_globals() - project_path = Path("..").parent / "e2e_projects" / project + project_path = E2E_PROJECTS / project mutants_path = project_path / "mutants" shutil.rmtree(mutants_path, ignore_errors=True) # mutmut run with change_cwd(project_path): + mutmut._reset_globals() _run([], None) return read_all_stats_for_project(project_path) diff --git a/tests/e2e/test_e2e_my_lib.py b/tests/e2e/test_e2e_my_lib.py index c2aac7e1..5883697e 100644 --- a/tests/e2e/test_e2e_my_lib.py +++ b/tests/e2e/test_e2e_my_lib.py @@ -64,6 +64,23 @@ def test_my_lib_result_snapshot(): "my_lib.xǁPointǁto_origin__mutmut_3": 0, "my_lib.xǁPointǁto_origin__mutmut_4": 0, "my_lib.xǁPointǁ__len____mutmut_1": 33, + "my_lib.xǁPointǁfrom_coords__mutmut_1": 1, + "my_lib.xǁPointǁfrom_coords__mutmut_2": 0, + "my_lib.xǁPointǁfrom_coords__mutmut_3": 1, + "my_lib.xǁPointǁfrom_coords__mutmut_4": 1, + "my_lib.xǁPointǁfrom_coords__mutmut_5": 1, + "my_lib.xǁPointǁfrom_coords__mutmut_6": 1, + "my_lib.xǁPointǁpragma_on_staticmethod_decorator__mutmut_1": 1, + "my_lib.xǁPointǁpragma_on_staticmethod_decorator__mutmut_2": 1, + "my_lib.xǁPointǁpragma_on_staticmethod_decorator__mutmut_3": 1, + "my_lib.xǁPointǁpragma_on_classmethod_decorator__mutmut_1": 1, + "my_lib.xǁPointǁpragma_on_classmethod_decorator__mutmut_2": 1, + "my_lib.xǁPointǁpragma_on_classmethod_decorator__mutmut_3": 1, + "my_lib.xǁPointǁpragma_on_classmethod_decorator__mutmut_4": 1, + "my_lib.xǁPointǁpragma_on_classmethod_decorator__mutmut_5": 1, + "my_lib.xǁPointǁpragma_on_classmethod_decorator__mutmut_6": 1, + "my_lib.xǁPointǁpragma_on_classmethod_decorator__mutmut_7": 1, + "my_lib.xǁPointǁpragma_on_classmethod_decorator__mutmut_8": 1, "my_lib.x_escape_sequences__mutmut_1": 1, "my_lib.x_escape_sequences__mutmut_2": 0, "my_lib.x_escape_sequences__mutmut_3": 1, @@ -79,6 +96,10 @@ def test_my_lib_result_snapshot(): "my_lib.x_func_with_star__mutmut_2": 1, "my_lib.x_func_with_star__mutmut_3": 1, "my_lib.x_func_with_arbitrary_args__mutmut_1": 1, + "my_lib.xǁColorǁis_primary__mutmut_1": 1, + "my_lib.xǁColorǁdarken__mutmut_1": 1, + "my_lib.xǁColorǁdarken__mutmut_2": 1, + "my_lib.xǁColorǁfrom_name__mutmut_1": 1, } } ) diff --git a/tests/e2e/test_e2e_type_checking.py b/tests/e2e/test_e2e_type_checking.py index e3aa1c7f..f2c4d0c8 100644 --- a/tests/e2e/test_e2e_type_checking.py +++ b/tests/e2e/test_e2e_type_checking.py @@ -16,8 +16,8 @@ def test_type_checking_result_snapshot(): "type_checking.xǁPersonǁset_name__mutmut_1": 37, "type_checking.x_mutate_me__mutmut_1": 37, "type_checking.x_mutate_me__mutmut_2": 37, - "type_checking.x_mutate_me__mutmut_3": 1, - "type_checking.x_mutate_me__mutmut_4": 1, + "type_checking.x_mutate_me__mutmut_3": 37, + "type_checking.x_mutate_me__mutmut_4": 37, "type_checking.x_mutate_me__mutmut_5": 37, } } diff --git a/tests/mutation/test_enum_handling.py b/tests/mutation/test_enum_handling.py new file mode 100644 index 00000000..3b30b77e --- /dev/null +++ b/tests/mutation/test_enum_handling.py @@ -0,0 +1,132 @@ +"""Tests for enum class detection and handling.""" + +import libcst as cst + +from mutmut.mutation.enum_mutation import ENUM_BASE_CLASSES +from mutmut.mutation.enum_mutation import is_enum_class +from mutmut.mutation.mutators import MethodType +from mutmut.mutation.mutators import get_method_type + + +class TestEnumBaseClasses: + """Tests for ENUM_BASE_CLASSES constant.""" + + def test_contains_standard_enum_types(self): + assert "Enum" in ENUM_BASE_CLASSES + assert "IntEnum" in ENUM_BASE_CLASSES + assert "Flag" in ENUM_BASE_CLASSES + assert "IntFlag" in ENUM_BASE_CLASSES + assert "StrEnum" in ENUM_BASE_CLASSES + + def test_is_frozenset(self): + assert isinstance(ENUM_BASE_CLASSES, frozenset) + + +class TestIsEnumClass: + """Tests for is_enum_class function.""" + + def test_simple_enum(self): + code = "class Color(Enum): pass" + module = cst.parse_module(code) + cls = module.body[0] + assert is_enum_class(cls) + + def test_int_enum(self): + code = "class Priority(IntEnum): pass" + module = cst.parse_module(code) + cls = module.body[0] + assert is_enum_class(cls) + + def test_flag_enum(self): + code = "class Permission(Flag): pass" + module = cst.parse_module(code) + cls = module.body[0] + assert is_enum_class(cls) + + def test_attribute_access_enum(self): + code = "class Status(enum.Enum): pass" + module = cst.parse_module(code) + cls = module.body[0] + assert is_enum_class(cls) + + def test_regular_class(self): + code = "class MyClass: pass" + module = cst.parse_module(code) + cls = module.body[0] + assert not is_enum_class(cls) + + def test_class_with_other_base(self): + code = "class MyClass(SomeBase): pass" + module = cst.parse_module(code) + cls = module.body[0] + assert not is_enum_class(cls) + + def test_class_with_multiple_bases(self): + code = "class MyClass(Base1, Enum): pass" + module = cst.parse_module(code) + cls = module.body[0] + assert is_enum_class(cls) + + +class TestGetMethodType: + """Tests for get_method_type function.""" + + def test_instance_method(self): + code = """ +class Foo: + def method(self): + pass +""" + module = cst.parse_module(code) + cls = module.body[0] + method = cls.body.body[0] + assert get_method_type(method) == MethodType.INSTANCE + + def test_staticmethod(self): + code = """ +class Foo: + @staticmethod + def method(): + pass +""" + module = cst.parse_module(code) + cls = module.body[0] + method = cls.body.body[0] + assert get_method_type(method) == MethodType.STATICMETHOD + + def test_classmethod(self): + code = """ +class Foo: + @classmethod + def method(cls): + pass +""" + module = cst.parse_module(code) + cls = module.body[0] + method = cls.body.body[0] + assert get_method_type(method) == MethodType.CLASSMETHOD + + def test_other_single_decorator(self): + code = """ +class Foo: + @property + def method(self): + pass +""" + module = cst.parse_module(code) + cls = module.body[0] + method = cls.body.body[0] + assert get_method_type(method) is None + + def test_multiple_decorators(self): + code = """ +class Foo: + @staticmethod + @some_decorator + def method(): + pass +""" + module = cst.parse_module(code) + cls = module.body[0] + method = cls.body.body[0] + assert get_method_type(method) is None diff --git a/tests/test_mutation.py b/tests/mutation/test_mutation.py similarity index 65% rename from tests/test_mutation.py rename to tests/mutation/test_mutation.py index 7c9dd4d9..9ef05c7e 100644 --- a/tests/test_mutation.py +++ b/tests/mutation/test_mutation.py @@ -5,20 +5,19 @@ import libcst as cst import pytest -import mutmut -from mutmut.__main__ import CLASS_NAME_SEPARATOR from mutmut.__main__ import CatchOutput from mutmut.__main__ import MutmutProgrammaticFailException from mutmut.__main__ import get_diff_for_mutant from mutmut.__main__ import orig_function_and_class_names_from_key from mutmut.__main__ import run_forced_fail_test -from mutmut.file_mutation import create_mutations -from mutmut.file_mutation import mutate_file_contents -from mutmut.trampoline_templates import mangle_function_name +from mutmut.mutation.file_mutation import create_mutations +from mutmut.mutation.file_mutation import mutate_file_contents +from mutmut.mutation.trampoline_templates import CLASS_NAME_SEPARATOR +from mutmut.mutation.trampoline_templates import mangle_function_name def mutants_for_source(source: str, covered_lines: set[int] | None = None) -> list[str]: - module, mutated_nodes = create_mutations(source, covered_lines) + module, mutated_nodes, _, _ = create_mutations(source, covered_lines) mutants: list[str] = [module.deep_replace(m.original_node, m.mutated_node).code for m in mutated_nodes] # type: ignore return mutants @@ -432,6 +431,383 @@ def foo(): # pragma: no mutate assert mutants +def test_pragma_no_mutate_class(): + """Test that pragma: no mutate class skips entire class from mutation.""" + source = """ +class Foo: # pragma: no mutate class + def method(self): + return 1 + 1 +""".strip() + mutated_code = mutated_module(source) + # Should not have any mutant versions or trampoline attributes + assert "xǁFooǁmethod__mutmut" not in mutated_code + # Original class should be preserved unchanged + assert "def method(self):" in mutated_code + assert "return 1 + 1" in mutated_code + + +def test_pragma_no_mutate_class_with_colon(): + """Test that pragma: no mutate: class also works (alternative syntax).""" + source = """ +class Bar: # pragma: no mutate: class + def method(self): + return 2 + 2 +""".strip() + mutated_code = mutated_module(source) + assert "xǁBarǁmethod__mutmut" not in mutated_code + assert "def method(self):" in mutated_code + + +def test_pragma_no_mutate_class_does_not_affect_other_classes(): + """Test that pragma: no mutate class only affects the annotated class.""" + source = """ +class Skipped: # pragma: no mutate class + def method(self): + return 1 + +class Mutated: + def method(self): + return 1 +""".strip() + mutated_code = mutated_module(source) + # Skipped class should not have mutants + assert "xǁSkippedǁmethod__mutmut" not in mutated_code + # Mutated class should have mutants + assert "xǁMutatedǁmethod__mutmut_orig" in mutated_code + + +def test_pragma_no_mutate_vs_no_mutate_class(): + """Test that regular pragma: no mutate does NOT skip entire class (only that line).""" + source = """ +class Foo: # pragma: no mutate + def method(self): + return 1 + 1 +""".strip() + mutated_code = mutated_module(source) + # Regular pragma should NOT skip the class - methods should still be mutated + assert "xǁFooǁmethod__mutmut" in mutated_code + + +def test_pragma_no_mutate_class_for_enum(): + """Test the enum use case - pragma prevents trampoline attribute injection.""" + source = """ +from enum import Enum + +class Color(Enum): # pragma: no mutate class + RED = 1 + GREEN = 2 + + def describe(self): + return self.name.lower() +""".strip() + mutated_code = mutated_module(source) + # No mutant attributes should be added to the enum class + assert "__mutmut_mutants" not in mutated_code + assert "xǁColorǁdescribe__mutmut" not in mutated_code + # Original enum should be preserved + assert "class Color(Enum):" in mutated_code + assert "RED = 1" in mutated_code + + +def test_pragma_no_mutate_function(): + """Test that pragma: no mutate function skips entire function from mutation.""" + source = """ +def foo(): # pragma: no mutate function + return 1 + 1 +""".strip() + mutated_code = mutated_module(source) + # Should not have any mutant versions or trampoline + assert "x_foo__mutmut" not in mutated_code + assert "__mutmut_mutants" not in mutated_code + # Original function should be preserved + assert "def foo():" in mutated_code + assert "return 1 + 1" in mutated_code + + +def test_pragma_no_mutate_function_with_colon(): + """Test that pragma: no mutate: function also works (alternative syntax).""" + source = """ +def bar(): # pragma: no mutate: function + return 2 + 2 +""".strip() + mutated_code = mutated_module(source) + assert "x_bar__mutmut" not in mutated_code + assert "def bar():" in mutated_code + + +def test_pragma_no_mutate_function_does_not_affect_other_functions(): + """Test that pragma: no mutate function only affects the annotated function.""" + source = """ +def skipped(): # pragma: no mutate function + return 1 + +def mutated(): + return 1 +""".strip() + mutated_code = mutated_module(source) + # Skipped function should not have mutants + assert "x_skipped__mutmut" not in mutated_code + # Mutated function should have mutants + assert "x_mutated__mutmut_orig" in mutated_code + + +def test_pragma_no_mutate_vs_no_mutate_function(): + """Test that regular pragma: no mutate does NOT skip entire function.""" + source = """ +def foo(): # pragma: no mutate + return 1 + 1 +""".strip() + mutated_code = mutated_module(source) + # Regular pragma should NOT skip the function - it should still be mutated + assert "x_foo__mutmut" in mutated_code + + +def test_enum_mutation_uses_external_injection(): + """Test that enum classes use external injection pattern to avoid metaclass conflicts.""" + source = """ +from enum import Enum + +class Color(Enum): + RED = 1 + GREEN = 2 + + def describe(self): + return self.name.lower() +""".strip() + mutated_code = mutated_module(source) + # Should NOT have mutant attributes injected INTO the class body (breaks enums) + # The mutant dict should be OUTSIDE the class (before the class definition) + assert "_Color_describe_mutants" in mutated_code + # External trampoline function should exist + assert "_Color_describe_trampoline" in mutated_code + # The method inside the class should be a simple assignment + assert "describe = _Color_describe_trampoline" in mutated_code + # Ensure no ClassVar inside the class (which would break enum) + # Split to get just the class body + class_start = mutated_code.find("class Color(Enum):") + assert class_start > mutated_code.find("_Color_describe_mutants") # mutants dict is BEFORE class + + +def test_enum_mutation_with_staticmethod(): + """Test that @staticmethod in enum classes works correctly.""" + source = """ +from enum import Enum + +class Color(Enum): + RED = 1 + + @staticmethod + def helper(): + return 1 + 1 +""".strip() + mutated_code = mutated_module(source) + # Should have external trampoline + assert "_Color_helper_trampoline" in mutated_code + # Assignment should use staticmethod wrapper + assert "helper = staticmethod(_Color_helper_trampoline)" in mutated_code + + +def test_enum_mutation_with_classmethod(): + """Test that @classmethod in enum classes works correctly.""" + source = """ +from enum import Enum + +class Color(Enum): + RED = 1 + + @classmethod + def from_string(cls): + return 1 + 1 +""".strip() + mutated_code = mutated_module(source) + # Should have external trampoline + assert "_Color_from_string_trampoline" in mutated_code + # Assignment should use classmethod wrapper + assert "from_string = classmethod(_Color_from_string_trampoline)" in mutated_code + + +def test_enum_mutation_preserves_enum_members(): + """Test that enum members are preserved when methods are mutated.""" + source = """ +from enum import Enum + +class Status(Enum): + PENDING = 'pending' + ACTIVE = 'active' + DONE = 'done' + + def is_active(self): + return self == Status.ACTIVE +""".strip() + mutated_code = mutated_module(source) + # Enum members should be unchanged + assert "PENDING = 'pending'" in mutated_code + assert "ACTIVE = 'active'" in mutated_code + assert "DONE = 'done'" in mutated_code + # But method should be mutated externally + assert "_Status_is_active_trampoline" in mutated_code + + +def test_enum_mutation_disabled(): + """Test that mutate_enums=False skips enum mutation entirely.""" + source = """ +from enum import Enum + +class Color(Enum): + RED = 1 + + def describe(self): + return self.name.lower() +""".strip() + mutated_code, _ = mutate_file_contents("", source, mutate_enums=False) + # No mutation code should be added + assert "__mutmut_mutants" not in mutated_code + assert "_Color_describe_trampoline" not in mutated_code + # Original enum should be preserved as-is + assert "class Color(Enum):" in mutated_code + assert "def describe(self):" in mutated_code + + +def test_enum_mutation_runtime_execution(): + """Test that mutated enum code can actually be executed and mutants activated.""" + import os + + source = """ +from enum import Enum + +class Color(Enum): + RED = 1 + GREEN = 2 + + def describe(self): + return self.name.lower() +""".strip() + + mutated_code, mutant_names = mutate_file_contents("test.py", source) + assert len(mutant_names) > 0, "Should have at least one mutant" + + # Test original behavior + old_env = os.environ.get("MUTANT_UNDER_TEST") + try: + os.environ["MUTANT_UNDER_TEST"] = "none" + namespace = {"__name__": "test_module"} + exec(mutated_code, namespace) + Color = namespace["Color"] + + # Verify enum members work + assert Color.RED.value == 1 + assert Color.GREEN.value == 2 + + # Verify original method works + assert Color.RED.describe() == "red" + + # Test mutant activation + mutant_name = "test_module." + mutant_names[0] + os.environ["MUTANT_UNDER_TEST"] = mutant_name + + # Mutant should change lower() to upper() + assert Color.RED.describe() == "RED" + finally: + if old_env is not None: + os.environ["MUTANT_UNDER_TEST"] = old_env + elif "MUTANT_UNDER_TEST" in os.environ: + del os.environ["MUTANT_UNDER_TEST"] + + +def test_regular_class_staticmethod_mutation(): + """Test that @staticmethod in regular classes is now mutated using external injection.""" + source = """ +class Calculator: + @staticmethod + def add(a, b): + return a + b +""".strip() + mutated_code = mutated_module(source) + # Should use external injection pattern + assert "_Calculator_add_trampoline" in mutated_code + assert "_Calculator_add_orig" in mutated_code + # Assignment should use staticmethod wrapper + assert "add = staticmethod(_Calculator_add_trampoline)" in mutated_code + + +def test_regular_class_classmethod_mutation(): + """Test that @classmethod in regular classes is now mutated using external injection.""" + source = """ +class Factory: + @classmethod + def create(cls, value): + return value + 1 +""".strip() + mutated_code = mutated_module(source) + # Should use external injection pattern + assert "_Factory_create_trampoline" in mutated_code + assert "_Factory_create_orig" in mutated_code + # Assignment should use classmethod wrapper + assert "create = classmethod(_Factory_create_trampoline)" in mutated_code + + +def test_regular_class_mixed_methods(): + """Test that regular classes correctly handle mix of instance, static, and class methods.""" + source = """ +class MyClass: + def instance_method(self): + return 1 + 1 + + @staticmethod + def static_method(): + return 2 + 2 + + @classmethod + def class_method(cls): + return 3 + 3 +""".strip() + mutated_code = mutated_module(source) + # Instance method uses internal trampoline (inside class) + assert "xǁMyClassǁinstance_method__mutmut_orig" in mutated_code + # Static and class methods use external injection + assert "_MyClass_static_method_trampoline" in mutated_code + assert "_MyClass_class_method_trampoline" in mutated_code + assert "static_method = staticmethod(_MyClass_static_method_trampoline)" in mutated_code + assert "class_method = classmethod(_MyClass_class_method_trampoline)" in mutated_code + + +def test_regular_class_staticmethod_runtime(): + """Test that staticmethod mutation in regular classes works at runtime.""" + import os + + source = """ +class Calculator: + @staticmethod + def add(a, b): + return a + b +""".strip() + + mutated_code, mutant_names = mutate_file_contents("test.py", source) + assert len(mutant_names) > 0, "Should have at least one mutant" + + old_env = os.environ.get("MUTANT_UNDER_TEST") + try: + os.environ["MUTANT_UNDER_TEST"] = "none" + namespace = {"__name__": "test_module"} + exec(mutated_code, namespace) + Calculator = namespace["Calculator"] + + # Verify original works + assert Calculator.add(2, 3) == 5 + + # Test mutant activation (a + b -> a - b) + mutant_name = "test_module." + mutant_names[0] + os.environ["MUTANT_UNDER_TEST"] = mutant_name + + # Mutant should change + to - + assert Calculator.add(5, 3) == 2 + finally: + if old_env is not None: + os.environ["MUTANT_UNDER_TEST"] = old_env + elif "MUTANT_UNDER_TEST" in os.environ: + del os.environ["MUTANT_UNDER_TEST"] + + def test_mutate_only_covered_lines_none(): source = """def foo():\n return 1+1\n""".strip() mutants = mutants_for_source(source, covered_lines=set()) @@ -644,7 +1020,6 @@ def foo(): @patch.object(CatchOutput, "stop") @patch.object(CatchOutput, "start") def test_run_forced_fail_test_with_failing_test(_start, _stop, _dump_output, capfd): - mutmut._reset_globals() runner = _mocked_runner_run_forced_failed(return_value=1) run_forced_fail_test(runner) @@ -663,7 +1038,6 @@ def test_run_forced_fail_test_with_failing_test(_start, _stop, _dump_output, cap @patch.object(CatchOutput, "stop") @patch.object(CatchOutput, "start") def test_run_forced_fail_test_with_mutmut_programmatic_fail_exception(_start, _stop, _dump_output, capfd): - mutmut._reset_globals() runner = _mocked_runner_run_forced_failed(side_effect=MutmutProgrammaticFailException()) run_forced_fail_test(runner) @@ -678,7 +1052,6 @@ def test_run_forced_fail_test_with_mutmut_programmatic_fail_exception(_start, _s @patch.object(CatchOutput, "stop") @patch.object(CatchOutput, "start") def test_run_forced_fail_test_with_all_tests_passing(_start, _stop, _dump_output, capfd): - mutmut._reset_globals() runner = _mocked_runner_run_forced_failed(return_value=0) with pytest.raises(SystemExit) as error: diff --git a/tests/mutation/test_pragma_handling.py b/tests/mutation/test_pragma_handling.py new file mode 100644 index 00000000..1df6c21e --- /dev/null +++ b/tests/mutation/test_pragma_handling.py @@ -0,0 +1,97 @@ +"""Tests for pragma comment parsing.""" + +from mutmut.mutation.pragma_handling import parse_pragma_lines + + +class TestParsePragmaLines: + """Tests for parse_pragma_lines function.""" + + def test_no_pragmas(self): + source = """def foo(): + return 1 + 1 +""" + no_mutate, class_lines, function_lines = parse_pragma_lines(source) + assert no_mutate == set() + assert class_lines == set() + assert function_lines == set() + + def test_simple_no_mutate(self): + source = """def foo(): + return 1 + 1 # pragma: no mutate +""" + no_mutate, class_lines, function_lines = parse_pragma_lines(source) + assert no_mutate == {2} + assert class_lines == set() + assert function_lines == set() + + def test_no_mutate_class(self): + source = """class Foo: # pragma: no mutate class + def method(self): + return 1 + 1 +""" + no_mutate, class_lines, function_lines = parse_pragma_lines(source) + assert no_mutate == set() + assert class_lines == {1} + assert function_lines == set() + + def test_no_mutate_class_with_colon(self): + source = """class Foo: # pragma: no mutate: class + def method(self): + return 1 + 1 +""" + no_mutate, class_lines, function_lines = parse_pragma_lines(source) + assert no_mutate == set() + assert class_lines == {1} + assert function_lines == set() + + def test_no_mutate_function(self): + source = """def foo(): # pragma: no mutate function + return 1 + 1 +""" + no_mutate, class_lines, function_lines = parse_pragma_lines(source) + assert no_mutate == set() + assert class_lines == set() + assert function_lines == {1} + + def test_no_mutate_function_with_colon(self): + source = """def foo(): # pragma: no mutate: function + return 1 + 1 +""" + no_mutate, class_lines, function_lines = parse_pragma_lines(source) + assert no_mutate == set() + assert class_lines == set() + assert function_lines == {1} + + def test_mixed_pragmas(self): + source = """class Skipped: # pragma: no mutate class + def method(self): + return 1 + 1 + +def skipped_func(): # pragma: no mutate function + return 2 + 2 + +def mutated(): + return 3 + 3 # pragma: no mutate +""" + no_mutate, class_lines, function_lines = parse_pragma_lines(source) + assert no_mutate == {9} + assert class_lines == {1} + assert function_lines == {5} + + def test_pragma_no_cover_with_no_mutate(self): + source = """def foo(): + return 1 + 1 # pragma: no cover, no mutate +""" + no_mutate, class_lines, function_lines = parse_pragma_lines(source) + assert no_mutate == {2} + assert class_lines == set() + assert function_lines == set() + + def test_other_pragma_ignored(self): + source = """def foo(): + return 1 + 1 # pragma: no cover +""" + no_mutate, class_lines, function_lines = parse_pragma_lines(source) + assert no_mutate == set() + assert class_lines == set() + assert function_lines == set() diff --git a/tests/test_configuration.py b/tests/test_configuration.py new file mode 100644 index 00000000..041798b4 --- /dev/null +++ b/tests/test_configuration.py @@ -0,0 +1,347 @@ +from pathlib import Path + +import pytest + +from mutmut.configuration import Config +from mutmut.configuration import _config_reader +from mutmut.configuration import _guess_paths_to_mutate +from mutmut.configuration import _load_config + + +@pytest.fixture(autouse=True) +def reset_config(): + """Reset config singleton before and after each test.""" + Config.reset() + yield + Config.reset() + + +@pytest.fixture +def in_tmp_dir(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + """Change to a temporary directory for the duration of the test.""" + monkeypatch.chdir(tmp_path) + return tmp_path + + +class TestConfigSingleton: + def test_get_loads_config(self, in_tmp_dir: Path): + (in_tmp_dir / "src").mkdir() + config = Config.get() + assert config is not None + assert isinstance(config, Config) + + def test_get_returns_same_instance(self, in_tmp_dir: Path): + (in_tmp_dir / "src").mkdir() + config1 = Config.get() + config2 = Config.get() + assert config1 is config2 + + def test_reset_clears_singleton(self, in_tmp_dir: Path): + (in_tmp_dir / "src").mkdir() + config1 = Config.get() + Config.reset() + config2 = Config.get() + assert config1 is not config2 + + def test_ensure_loaded_is_idempotent(self, in_tmp_dir: Path): + (in_tmp_dir / "src").mkdir() + Config.ensure_loaded() + config1 = Config.get() + Config.ensure_loaded() + config2 = Config.get() + assert config1 is config2 + + +class TestShouldIgnoreForMutation: + def test_ignores_non_python_files(self): + config = Config( + also_copy=[], + do_not_mutate=[], + max_stack_depth=-1, + debug=False, + paths_to_mutate=[], + pytest_add_cli_args=[], + pytest_add_cli_args_test_selection=[], + tests_dir=[], + mutate_only_covered_lines=False, + type_check_command=[], + use_setproctitle=False, + ) + assert config.should_ignore_for_mutation("foo.txt") is True + assert config.should_ignore_for_mutation("foo.js") is True + assert config.should_ignore_for_mutation("foo") is True + + def test_does_not_ignore_python_files(self): + config = Config( + also_copy=[], + do_not_mutate=[], + max_stack_depth=-1, + debug=False, + paths_to_mutate=[], + pytest_add_cli_args=[], + pytest_add_cli_args_test_selection=[], + tests_dir=[], + mutate_only_covered_lines=False, + type_check_command=[], + use_setproctitle=False, + ) + assert config.should_ignore_for_mutation("foo.py") is False + assert config.should_ignore_for_mutation("src/foo.py") is False + + def test_respects_do_not_mutate_exact_match(self): + config = Config( + also_copy=[], + do_not_mutate=["foo.py"], + max_stack_depth=-1, + debug=False, + paths_to_mutate=[], + pytest_add_cli_args=[], + pytest_add_cli_args_test_selection=[], + tests_dir=[], + mutate_only_covered_lines=False, + type_check_command=[], + use_setproctitle=False, + ) + assert config.should_ignore_for_mutation("foo.py") is True + assert config.should_ignore_for_mutation("bar.py") is False + + def test_respects_do_not_mutate_glob_pattern(self): + config = Config( + also_copy=[], + do_not_mutate=["**/test_*.py", "src/ignore_*.py"], + max_stack_depth=-1, + debug=False, + paths_to_mutate=[], + pytest_add_cli_args=[], + pytest_add_cli_args_test_selection=[], + tests_dir=[], + mutate_only_covered_lines=False, + type_check_command=[], + use_setproctitle=False, + ) + assert config.should_ignore_for_mutation("tests/test_foo.py") is True + assert config.should_ignore_for_mutation("src/ignore_me.py") is True + assert config.should_ignore_for_mutation("src/keep_me.py") is False + + def test_accepts_path_objects(self): + config = Config( + also_copy=[], + do_not_mutate=["foo.py"], + max_stack_depth=-1, + debug=False, + paths_to_mutate=[], + pytest_add_cli_args=[], + pytest_add_cli_args_test_selection=[], + tests_dir=[], + mutate_only_covered_lines=False, + type_check_command=[], + use_setproctitle=False, + ) + assert config.should_ignore_for_mutation(Path("foo.py")) is True + assert config.should_ignore_for_mutation(Path("bar.py")) is False + + +class TestConfigReaderPyprojectToml: + def test_reads_from_pyproject_toml(self, in_tmp_dir: Path): + (in_tmp_dir / "pyproject.toml").write_text(""" +[tool.mutmut] +debug = true +max_stack_depth = 10 +paths_to_mutate = ["src", "lib"] +do_not_mutate = ["**/migrations/*"] +""") + reader = _config_reader() + assert reader("debug", False) is True + assert reader("max_stack_depth", -1) == 10 + assert reader("paths_to_mutate", []) == ["src", "lib"] + assert reader("do_not_mutate", []) == ["**/migrations/*"] + + def test_returns_default_for_missing_key(self, in_tmp_dir: Path): + (in_tmp_dir / "pyproject.toml").write_text(""" +[tool.mutmut] +debug = true +""") + reader = _config_reader() + assert reader("nonexistent", "default_value") == "default_value" + assert reader("max_stack_depth", -1) == -1 + + def test_handles_missing_mutmut_section(self, in_tmp_dir: Path): + (in_tmp_dir / "pyproject.toml").write_text(""" +[tool.other] +foo = "bar" +""") + # Should fall through to setup.cfg reader + reader = _config_reader() + assert reader("debug", False) is False + + +class TestConfigReaderSetupCfg: + def test_reads_from_setup_cfg(self, in_tmp_dir: Path): + (in_tmp_dir / "setup.cfg").write_text(""" +[mutmut] +debug = true +max_stack_depth = 5 +paths_to_mutate = src +""") + reader = _config_reader() + assert reader("debug", False) is True + assert reader("max_stack_depth", -1) == 5 + assert reader("paths_to_mutate", []) == ["src"] + + def test_parses_multiline_list(self, in_tmp_dir: Path): + (in_tmp_dir / "setup.cfg").write_text(""" +[mutmut] +do_not_mutate = + **/migrations/* + **/test_*.py + src/generated.py +""") + reader = _config_reader() + assert reader("do_not_mutate", []) == [ + "**/migrations/*", + "**/test_*.py", + "src/generated.py", + ] + + def test_parses_bool_values(self, in_tmp_dir: Path): + (in_tmp_dir / "setup.cfg").write_text(""" +[mutmut] +debug = true +""") + reader = _config_reader() + assert reader("debug", False) is True + + (in_tmp_dir / "setup.cfg").write_text(""" +[mutmut] +debug = 1 +""") + reader = _config_reader() + assert reader("debug", False) is True + + (in_tmp_dir / "setup.cfg").write_text(""" +[mutmut] +debug = false +""") + reader = _config_reader() + assert reader("debug", False) is False + + def test_parses_int_values(self, in_tmp_dir: Path): + (in_tmp_dir / "setup.cfg").write_text(""" +[mutmut] +max_stack_depth = 42 +""") + reader = _config_reader() + assert reader("max_stack_depth", -1) == 42 + + def test_returns_default_for_missing_section(self, in_tmp_dir: Path): + (in_tmp_dir / "setup.cfg").write_text(""" +[other] +foo = bar +""") + reader = _config_reader() + assert reader("debug", False) is False + + def test_returns_default_for_missing_key(self, in_tmp_dir: Path): + (in_tmp_dir / "setup.cfg").write_text(""" +[mutmut] +debug = true +""") + reader = _config_reader() + assert reader("nonexistent", "default") == "default" + + +class TestConfigReaderPriority: + def test_pyproject_toml_takes_precedence(self, in_tmp_dir: Path): + (in_tmp_dir / "pyproject.toml").write_text(""" +[tool.mutmut] +debug = true +""") + (in_tmp_dir / "setup.cfg").write_text(""" +[mutmut] +debug = false +""") + reader = _config_reader() + assert reader("debug", False) is True + + +class TestGuessPathsToMutate: + def test_guesses_lib_directory(self, in_tmp_dir: Path): + (in_tmp_dir / "lib").mkdir() + assert _guess_paths_to_mutate() == ["lib"] + + def test_guesses_src_directory(self, in_tmp_dir: Path): + (in_tmp_dir / "src").mkdir() + assert _guess_paths_to_mutate() == ["src"] + + def test_prefers_lib_over_src(self, in_tmp_dir: Path): + (in_tmp_dir / "lib").mkdir() + (in_tmp_dir / "src").mkdir() + assert _guess_paths_to_mutate() == ["lib"] + + def test_guesses_directory_matching_cwd_name(self, in_tmp_dir: Path): + # tmp_path has a random name, create a subdir matching it + dir_name = in_tmp_dir.name + (in_tmp_dir / dir_name).mkdir() + assert _guess_paths_to_mutate() == [dir_name] + + def test_guesses_py_file_matching_cwd_name(self, in_tmp_dir: Path): + dir_name = in_tmp_dir.name + (in_tmp_dir / f"{dir_name}.py").touch() + assert _guess_paths_to_mutate() == [f"{dir_name}.py"] + + def test_raises_when_cannot_guess(self, in_tmp_dir: Path): + with pytest.raises(FileNotFoundError, match="Could not figure out"): + _guess_paths_to_mutate() + + +class TestLoadConfig: + def test_loads_all_config_values(self, in_tmp_dir: Path): + (in_tmp_dir / "pyproject.toml").write_text(""" +[tool.mutmut] +debug = true +max_stack_depth = 10 +paths_to_mutate = ["src"] +do_not_mutate = ["**/test_*.py"] +tests_dir = ["tests/unit"] +pytest_add_cli_args = ["-x", "--tb=short"] +pytest_add_cli_args_test_selection = ["--no-header"] +also_copy = ["fixtures"] +mutate_only_covered_lines = true +type_check_command = ["mypy", "--strict"] +""") + (in_tmp_dir / "src").mkdir() + + config = _load_config() + + assert config.debug is True + assert config.max_stack_depth == 10 + assert config.paths_to_mutate == [Path("src")] + assert config.do_not_mutate == ["**/test_*.py"] + assert config.tests_dir == ["tests/unit"] + assert config.pytest_add_cli_args == ["-x", "--tb=short"] + assert config.pytest_add_cli_args_test_selection == ["--no-header"] + assert Path("fixtures") in config.also_copy + assert config.mutate_only_covered_lines is True + assert config.type_check_command == ["mypy", "--strict"] + + def test_uses_defaults_when_no_config(self, in_tmp_dir: Path): + (in_tmp_dir / "src").mkdir() + + config = _load_config() + + assert config.debug is False + assert config.max_stack_depth == -1 + assert config.paths_to_mutate == [Path("src")] + assert config.do_not_mutate == [] + assert config.mutate_only_covered_lines is False + assert config.type_check_command == [] + + def test_also_copy_includes_defaults(self, in_tmp_dir: Path): + (in_tmp_dir / "src").mkdir() + + config = _load_config() + + assert Path("tests/") in config.also_copy + assert Path("test/") in config.also_copy + assert Path("setup.cfg") in config.also_copy + assert Path("pyproject.toml") in config.also_copy diff --git a/tests/test_generation_error_handling.py b/tests/test_generation_error_handling.py index 2c601f31..f556e392 100644 --- a/tests/test_generation_error_handling.py +++ b/tests/test_generation_error_handling.py @@ -6,19 +6,14 @@ import mutmut.__main__ from mutmut.__main__ import InvalidGeneratedSyntaxException from mutmut.__main__ import create_mutants +from mutmut.configuration import Config source_dir = Path(__file__).parent / "data" / "test_generation" source_dir = source_dir.relative_to(Path.cwd()) -class MockConfig: - def should_ignore_for_mutation(self, path: Path) -> bool: - return False - - def test_mutant_generation_raises_exception_on_invalid_syntax(monkeypatch): mutmut._reset_globals() - mutmut.config = MockConfig() shutil.rmtree("mutants", ignore_errors=True) @@ -30,7 +25,7 @@ def test_mutant_generation_raises_exception_on_invalid_syntax(monkeypatch): source_dir / "invalid_syntax.py", ] monkeypatch.setattr(mutmut.__main__, "walk_source_files", lambda: source_files) - monkeypatch.setattr("mutmut.config.should_ignore_for_mutation", lambda _path: False) + monkeypatch.setattr(Config.get(), "should_ignore_for_mutation", lambda _path: False) # should raise an exception, because we copy the invalid_syntax.py file and then verify # if it is valid syntax diff --git a/tests/test_mutation regression.py b/tests/test_mutation regression.py index 6d567025..15eb6db2 100644 --- a/tests/test_mutation regression.py +++ b/tests/test_mutation regression.py @@ -1,8 +1,8 @@ import libcst as cst from inline_snapshot import snapshot -from mutmut.file_mutation import create_trampoline_wrapper -from mutmut.file_mutation import mutate_file_contents +from mutmut.mutation.file_mutation import create_trampoline_wrapper +from mutmut.mutation.file_mutation import mutate_file_contents def _get_trampoline_wrapper(source: str, mangled_name: str, class_name: str | None = None) -> str: @@ -90,37 +90,52 @@ def add(self, value): import lib lib.foo() -from typing import Annotated -from typing import Callable -from typing import ClassVar - -MutantDict = Annotated[dict[str, Callable], "Mutant"] # type: ignore - - -def _mutmut_trampoline(orig, mutants, call_args, call_kwargs, self_arg = None): # type: ignore - """Forward call to original or mutated function, depending on the environment""" - import os # type: ignore - mutant_under_test = os.environ['MUTANT_UNDER_TEST'] # type: ignore - if mutant_under_test == 'fail': # type: ignore - from mutmut.__main__ import MutmutProgrammaticFailException # type: ignore - raise MutmutProgrammaticFailException('Failed programmatically') # type: ignore - elif mutant_under_test == 'stats': # type: ignore - from mutmut.__main__ import record_trampoline_hit # type: ignore - record_trampoline_hit(orig.__module__ + '.' + orig.__name__) # type: ignore - # (for class methods, orig is bound and thus does not need the explicit self argument) - result = orig(*call_args, **call_kwargs) # type: ignore - return result # type: ignore - prefix = orig.__module__ + '.' + orig.__name__ + '__mutmut_' # type: ignore - if not mutant_under_test.startswith(prefix): # type: ignore - result = orig(*call_args, **call_kwargs) # type: ignore - return result # type: ignore - mutant_name = mutant_under_test.rpartition('.')[-1] # type: ignore - if self_arg is not None: # type: ignore +from functools import wraps as _mutmut_wraps # mutmut: generated +from typing import Annotated # mutmut: generated +from typing import Callable # mutmut: generated +from typing import ClassVar # mutmut: generated + + +MutantDict = Annotated[dict[str, Callable], "Mutant"] # type: ignore # mutmut: generated + + +def _mutmut_trampoline(orig, mutants, call_args, call_kwargs, self_arg = None): # type: ignore # mutmut: generated + """Forward call to original or mutated function, depending on the environment""" # mutmut: generated + import os # type: ignore # mutmut: generated + mutant_under_test = os.environ.get('MUTANT_UNDER_TEST', '') # type: ignore # mutmut: generated + if not mutant_under_test: # mutmut: generated + # No mutant being tested - call original function + if self_arg is not None and not hasattr(orig, '__self__'): # mutmut: generated + return orig(self_arg, *call_args, **call_kwargs) # mutmut: generated + else: # mutmut: generated + return orig(*call_args, **call_kwargs) # mutmut: generated + if mutant_under_test == 'fail': # type: ignore # mutmut: generated + from mutmut.__main__ import MutmutProgrammaticFailException # type: ignore # mutmut: generated + raise MutmutProgrammaticFailException('Failed programmatically') # type: ignore # mutmut: generated + elif mutant_under_test == 'stats': # type: ignore # mutmut: generated + from mutmut.__main__ import record_trampoline_hit # type: ignore # mutmut: generated + record_trampoline_hit(orig.__module__ + '.' + orig.__name__) # type: ignore # mutmut: generated + # Check if orig is a bound method (has __self__) or plain function + if self_arg is not None and not hasattr(orig, '__self__'): # type: ignore # mutmut: generated + result = orig(self_arg, *call_args, **call_kwargs) # type: ignore # mutmut: generated + else: # mutmut: generated + result = orig(*call_args, **call_kwargs) # type: ignore # mutmut: generated + return result # type: ignore # mutmut: generated + prefix = orig.__module__ + '.' + orig.__name__ + '__mutmut_' # type: ignore # mutmut: generated + if not mutant_under_test.startswith(prefix): # type: ignore # mutmut: generated + # Check if orig is a bound method (has __self__) or plain function + if self_arg is not None and not hasattr(orig, '__self__'): # type: ignore # mutmut: generated + result = orig(self_arg, *call_args, **call_kwargs) # type: ignore # mutmut: generated + else: # mutmut: generated + result = orig(*call_args, **call_kwargs) # type: ignore # mutmut: generated + return result # type: ignore # mutmut: generated + mutant_name = mutant_under_test.rpartition('.')[-1] # type: ignore # mutmut: generated + if self_arg is not None: # type: ignore # mutmut: generated # call to a class method where self is not bound - result = mutants[mutant_name](self_arg, *call_args, **call_kwargs) # type: ignore - else: - result = mutants[mutant_name](*call_args, **call_kwargs) # type: ignore - return result # type: ignore + result = mutants[mutant_name](self_arg, *call_args, **call_kwargs) # type: ignore # mutmut: generated + else: # mutmut: generated + result = mutants[mutant_name](*call_args, **call_kwargs) # type: ignore # mutmut: generated + return result # type: ignore # mutmut: generated def foo(a: list[int], b): args = [a, b]# type: ignore @@ -136,11 +151,17 @@ def x_foo__mutmut_1(a: list[int], b): def x_foo__mutmut_2(a: list[int], b): return a[0] >= b -x_foo__mutmut_mutants : ClassVar[MutantDict] = { # type: ignore -'x_foo__mutmut_1': x_foo__mutmut_1, \n\ - 'x_foo__mutmut_2': x_foo__mutmut_2 -} -x_foo__mutmut_orig.__name__ = 'x_foo' +x_foo__mutmut_mutants : MutantDict = { # mutmut: generated +'x_foo__mutmut_1': x_foo__mutmut_1, # mutmut: generated + 'x_foo__mutmut_2': x_foo__mutmut_2 # mutmut: generated +} # mutmut: generated + +def foo(*args, **kwargs): # mutmut: generated + result = _mutmut_trampoline(x_foo__mutmut_orig, x_foo__mutmut_mutants, args, kwargs) # mutmut: generated + return result # mutmut: generated + +foo = _mutmut_wraps(x_foo__mutmut_orig)(foo) # mutmut: generated +x_foo__mutmut_orig.__name__ = 'x_foo' # mutmut: generated def bar(): args = []# type: ignore @@ -153,10 +174,16 @@ def x_bar__mutmut_orig(): def x_bar__mutmut_1(): yield 2 -x_bar__mutmut_mutants : ClassVar[MutantDict] = { # type: ignore -'x_bar__mutmut_1': x_bar__mutmut_1 -} -x_bar__mutmut_orig.__name__ = 'x_bar' +x_bar__mutmut_mutants : MutantDict = { # mutmut: generated +'x_bar__mutmut_1': x_bar__mutmut_1 # mutmut: generated +} # mutmut: generated + +def bar(*args, **kwargs): # mutmut: generated + result = _mutmut_trampoline(x_bar__mutmut_orig, x_bar__mutmut_mutants, args, kwargs) # mutmut: generated + return result # mutmut: generated + +bar = _mutmut_wraps(x_bar__mutmut_orig)(bar) # mutmut: generated +x_bar__mutmut_orig.__name__ = 'x_bar' # mutmut: generated class Adder: def __init__(self, amount): @@ -168,10 +195,16 @@ def xǁAdderǁ__init____mutmut_orig(self, amount): def xǁAdderǁ__init____mutmut_1(self, amount): self.amount = None \n\ - xǁAdderǁ__init____mutmut_mutants : ClassVar[MutantDict] = { # type: ignore - 'xǁAdderǁ__init____mutmut_1': xǁAdderǁ__init____mutmut_1 - } - xǁAdderǁ__init____mutmut_orig.__name__ = 'xǁAdderǁ__init__' + xǁAdderǁ__init____mutmut_mutants : ClassVar[MutantDict] = { # mutmut: generated + 'xǁAdderǁ__init____mutmut_1': xǁAdderǁ__init____mutmut_1 # mutmut: generated + } # mutmut: generated + \n\ + def __init__(self, *args, **kwargs): # mutmut: generated + result = _mutmut_trampoline(object.__getattribute__(self, "xǁAdderǁ__init____mutmut_orig"), object.__getattribute__(self, "xǁAdderǁ__init____mutmut_mutants"), args, kwargs, self) # mutmut: generated + return result # mutmut: generated + \n\ + __init__ = _mutmut_wraps(xǁAdderǁ__init____mutmut_orig)(__init__) # mutmut: generated + xǁAdderǁ__init____mutmut_orig.__name__ = 'xǁAdderǁ__init__' # mutmut: generated def add(self, value): args = [value]# type: ignore @@ -184,10 +217,16 @@ def xǁAdderǁadd__mutmut_orig(self, value): def xǁAdderǁadd__mutmut_1(self, value): return self.amount - value \n\ - xǁAdderǁadd__mutmut_mutants : ClassVar[MutantDict] = { # type: ignore - 'xǁAdderǁadd__mutmut_1': xǁAdderǁadd__mutmut_1 - } - xǁAdderǁadd__mutmut_orig.__name__ = 'xǁAdderǁadd' + xǁAdderǁadd__mutmut_mutants : ClassVar[MutantDict] = { # mutmut: generated + 'xǁAdderǁadd__mutmut_1': xǁAdderǁadd__mutmut_1 # mutmut: generated + } # mutmut: generated + \n\ + def add(self, *args, **kwargs): # mutmut: generated + result = _mutmut_trampoline(object.__getattribute__(self, "xǁAdderǁadd__mutmut_orig"), object.__getattribute__(self, "xǁAdderǁadd__mutmut_mutants"), args, kwargs, self) # mutmut: generated + return result # mutmut: generated + \n\ + add = _mutmut_wraps(xǁAdderǁadd__mutmut_orig)(add) # mutmut: generated + xǁAdderǁadd__mutmut_orig.__name__ = 'xǁAdderǁadd' # mutmut: generated print(Adder(1).add(2))\ ''') diff --git a/tests/utils/__init__.py b/tests/utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/utils/test_safe_setproctitle.py b/tests/utils/test_safe_setproctitle.py new file mode 100644 index 00000000..7b528c38 --- /dev/null +++ b/tests/utils/test_safe_setproctitle.py @@ -0,0 +1,86 @@ +"""Tests for safe_setproctitle module. + +This test verifies that setproctitle crashes on macOS Python 3.14+ after fork +when CoreFoundation has been loaded (which happens during normal mutmut operation). +When setproctitle is fixed upstream, this test will fail and we can remove the workaround. +""" + +import os +import platform +import signal + +import pytest +from setproctitle import setproctitle + +from mutmut.utils.safe_setproctitle import safe_setproctitle + +# Only run this test on macOS +IS_MACOS = platform.system() == "Darwin" + + +@pytest.mark.skipif(not IS_MACOS, reason="setproctitle only crashes after fork on macOS") +def test_setproctitle_crashes_after_fork_with_corefoundation_loaded(): + """Verify setproctitle segfaults after fork when CoreFoundation is loaded. + + This test exists to detect when setproctitle is fixed upstream. + If this test FAILS, it means setproctitle no longer crashes and we can + remove the workaround in safe_setproctitle.py. + + The crash only happens when CoreFoundation has been loaded before fork. + We trigger this by calling setproctitle once in the parent before forking. + """ + # Import and call setproctitle in the parent first - this loads CoreFoundation + + setproctitle("parent process") + + pid = os.fork() + + if pid == 0: + # Child process - call setproctitle again + try: + setproctitle("child process") + # If we get here, setproctitle didn't crash - exit with success + os._exit(0) + except Exception: + # If it raises a Python exception instead of segfaulting + os._exit(1) + else: + # Parent process - wait for child and check exit status + _, status = os.waitpid(pid, 0) + + if os.WIFSIGNALED(status): + exit_signal = os.WTERMSIG(status) + assert exit_signal == signal.SIGSEGV, ( + f"Expected SIGSEGV (11), got signal {exit_signal}. setproctitle crash behavior may have changed." + ) + else: + exit_code = os.WEXITSTATUS(status) + pytest.fail( + f"setproctitle did NOT crash (exit code {exit_code}). " + "The library may have been fixed! Consider removing the " + "workaround in safe_setproctitle.py" + ) + + +@pytest.mark.skipif(not IS_MACOS, reason="safe_setproctitle workaround only applies to macOS") +def test_safe_setproctitle_does_not_crash_after_fork(): + """Verify our safe_setproctitle wrapper doesn't crash after fork.""" + pid = os.fork() + + if pid == 0: + # Child process - use our safe wrapper + try: + safe_setproctitle("test title") + os._exit(0) # Success + except Exception: + os._exit(1) # Failed with exception + else: + # Parent process + _, status = os.waitpid(pid, 0) + + if os.WIFSIGNALED(status): + exit_signal = os.WTERMSIG(status) + pytest.fail(f"safe_setproctitle crashed with signal {exit_signal}! The workaround is not working.") + else: + exit_code = os.WEXITSTATUS(status) + assert exit_code == 0, f"safe_setproctitle failed with exit code {exit_code}"