|
| 1 | +#!/usr/bin/env python3 |
| 2 | + |
| 3 | +import argparse |
| 4 | +import shlex |
| 5 | +import subprocess |
| 6 | +import sys |
| 7 | +from collections.abc import Sequence |
| 8 | +from functools import cached_property |
| 9 | +from typing import Any |
| 10 | + |
| 11 | + |
| 12 | +class CLIError(Exception): |
| 13 | + pass |
| 14 | + |
| 15 | + |
| 16 | +class YKAll: |
| 17 | + def __call__(self) -> None: |
| 18 | + try: |
| 19 | + self.args.command_func() |
| 20 | + except CLIError as e: |
| 21 | + print(f"Error: {e}", file=sys.stderr) |
| 22 | + sys.exit(1) |
| 23 | + |
| 24 | + def command_default( |
| 25 | + self, custom_args: Sequence[str, str] | None = None |
| 26 | + ) -> None: |
| 27 | + self._run_ykman(self.extra_args) |
| 28 | + |
| 29 | + def command_addtotp(self) -> None: |
| 30 | + self._run_ykman( |
| 31 | + ["oath", "accounts", "add", "-t", "-f"] + self.extra_args |
| 32 | + ) |
| 33 | + |
| 34 | + def command_deltotp(self) -> None: |
| 35 | + self._run_ykman(["oath", "accounts", "delete"] + self.extra_args) |
| 36 | + |
| 37 | + def _run_ykman(self, cmd_args: Sequence[str, str]) -> None: |
| 38 | + self._run(["ykman", "list"]) |
| 39 | + if not cmd_args: |
| 40 | + return |
| 41 | + for serial in sorted(self.serial_numbers): |
| 42 | + try: |
| 43 | + self._run(["ykman", "--device", str(serial)] + cmd_args) |
| 44 | + except subprocess.CalledProcessError as e: |
| 45 | + sys.exit(e.returncode) |
| 46 | + if {"-h", "--help"} & set(cmd_args): |
| 47 | + break |
| 48 | + |
| 49 | + def _run(self, cmd: Sequence[str], *args: Any, **kwargs: Any) -> Any: |
| 50 | + kwargs.setdefault("check", True) |
| 51 | + print("+", " ".join(shlex.quote(c) for c in cmd), file=sys.stderr) |
| 52 | + return subprocess.run(cmd, *args, **kwargs) |
| 53 | + |
| 54 | + @cached_property |
| 55 | + def args(self) -> argparse.Namespace: |
| 56 | + return self._args[0] |
| 57 | + |
| 58 | + @cached_property |
| 59 | + def extra_args(self) -> dict[str, str]: |
| 60 | + return self._args[1] |
| 61 | + |
| 62 | + @cached_property |
| 63 | + def _args(self) -> tuple[argparse.Namespace, dict[str, str]]: |
| 64 | + ap = argparse.ArgumentParser( |
| 65 | + description="ykman helpers", add_help=False |
| 66 | + ) |
| 67 | + ap.add_argument( |
| 68 | + "-H", action="help", help="Show this help message and exit." |
| 69 | + ) |
| 70 | + ap.set_defaults(command_func=self.command_default) |
| 71 | + subp = ap.add_subparsers( |
| 72 | + title="Commands", metavar="command", required=False |
| 73 | + ) |
| 74 | + default_p = subp.add_parser("default", help="Default") |
| 75 | + default_p.set_defaults(command_func=self.command_default) |
| 76 | + addtotp_p = subp.add_parser("addtotp", help="Add TOTP account") |
| 77 | + addtotp_p.set_defaults(command_func=self.command_addtotp) |
| 78 | + deltotp_p = subp.add_parser("deltotp", help="Add TOTP account") |
| 79 | + deltotp_p.set_defaults(command_func=self.command_deltotp) |
| 80 | + if len(sys.argv) > 1 and not hasattr(self, f"command_{sys.argv[1]}"): |
| 81 | + sys.argv.insert(1, "default") |
| 82 | + return ap.parse_known_args() |
| 83 | + |
| 84 | + @cached_property |
| 85 | + def serial_numbers(self) -> set[int]: |
| 86 | + output = subprocess.check_output(["ykman", "list", "-s"], text=True) |
| 87 | + return {int(s) for s in output.strip().splitlines()} |
| 88 | + |
| 89 | + |
| 90 | +if __name__ == "__main__": |
| 91 | + YKAll()() |
0 commit comments