Coverage for src/sopsy/utils.py: 100%
58 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-23 04:59 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-23 04:59 +0000
1"""SOPSy utils."""
3from __future__ import annotations
5import subprocess
6from pathlib import Path
7from typing import Any
9import yaml
11from sopsy.errors import SopsyCommandFailedError
12from sopsy.errors import SopsyConfigNotFoundError
13from sopsy.errors import SopsyUnparsableOutpoutTypeError
15DEFAULT_CONFIG_FILE = Path(".sops.yaml")
def build_config(
    config_path: Path | None, config_dict: dict[str, Any] | None
) -> dict[str, Any]:
    """Merge config file content and python config dict.

    The file is located with ``find_sops_config`` (using
    ``DEFAULT_CONFIG_FILE`` when no path is given) and parsed as YAML. The
    entries of ``config_dict`` are then merged on top of it, key by key:

    * a key missing from the file content is added as-is;
    * a dict value updates the value loaded from the file;
    * a list value is prepended, in order, to the value loaded from the file;
    * any other value replaces the one loaded from the file.

    Args:
        config_path: Path of the configuration file, or ``None`` to use
            ``DEFAULT_CONFIG_FILE``.
        config_dict: Extra configuration merged over the file content. When
            ``None`` or empty, the file content is returned unchanged.

    Returns:
        The merged configuration.

    Raises:
        SopsyConfigNotFoundError: Propagated from ``find_sops_config`` when
            the configuration file cannot be found.
    """
    if not config_path:
        config_path = DEFAULT_CONFIG_FILE
    config_path = find_sops_config(Path(config_path))
    # An empty YAML document loads as None; normalise it to an empty mapping
    # so the merge below (and the caller) always works with a dict.
    config = yaml.safe_load(config_path.read_text()) or {}
    if not config_dict:
        return config
    for key, value in config_dict.items():
        if key not in config:
            config[key] = value
        elif isinstance(value, dict):
            config[key].update(value)
        elif isinstance(value, list):
            # list.insert() accepts exactly one item, so unpacking the list
            # into it raises TypeError unless it holds a single element.
            # Slice assignment prepends every item and keeps their order.
            config[key][:0] = value
        else:
            config[key] = value
    return config
def find_sops_config(config_path: Path = DEFAULT_CONFIG_FILE) -> Path:
    """Locate the configuration file, searching upwards from the current directory.

    An absolute ``config_path`` is only checked for existence. A relative one
    is looked up in the current working directory and then in each parent
    directory, up to and including the filesystem root.

    Args:
        config_path: Absolute or relative path of the configuration file.

    Returns:
        The path of the first existing match.

    Raises:
        SopsyConfigNotFoundError: If no matching file exists.
    """
    if config_path.is_absolute():
        if not config_path.exists():
            msg = f"config path {config_path} does not exists"
            raise SopsyConfigNotFoundError(msg)
        return config_path

    start_dir = Path.cwd()
    # ``parents`` lists every ancestor and ends with the filesystem root.
    for directory in (start_dir, *start_dir.parents):
        candidate = directory / config_path
        if candidate.exists():
            return candidate

    msg = f"config path {config_path} does not exists"
    raise SopsyConfigNotFoundError(msg)
def get_dict(data: bytes | str) -> dict[str, Any]:
    """Parse ``data`` with the YAML loader and return the result.

    Args:
        data: Raw content to parse.

    Returns:
        Whatever ``yaml.safe_load`` produces for ``data``.

    Raises:
        SopsyUnparsableOutpoutTypeError: If the YAML loader rejects ``data``.
    """
    # A single loader is enough: pyyaml can load either yaml or json content.
    try:
        return yaml.safe_load(data)
    except yaml.YAMLError as parse_error:
        raise SopsyUnparsableOutpoutTypeError from parse_error
def run_cmd(cmd: list[str], *, to_dict: bool) -> bytes | dict[str, Any] | None:
    """Run the given SOPS command and return its output.

    Args:
        cmd: Full command line, one list item per argument.
        to_dict: When true, parse the captured stdout with ``get_dict``
            instead of returning the raw bytes.

    Returns:
        ``None`` when ``cmd`` contains ``-i``, ``--in-place`` or ``--output``;
        otherwise the captured stdout, parsed or raw depending on ``to_dict``.

    Raises:
        SopsyCommandFailedError: If the command exits with a non-zero status;
            the message is the decoded stderr of the process.
    """
    try:
        completed = subprocess.run(cmd, capture_output=True, check=True)  # noqa: S603
    except subprocess.CalledProcessError as failure:
        stderr_text = failure.stderr.decode()
        raise SopsyCommandFailedError(stderr_text) from failure

    # With any of these flags present, the captured stdout is not returned.
    redirect_flags = {"-i", "--in-place", "--output"}
    if not redirect_flags.isdisjoint(cmd):
        return None

    return get_dict(completed.stdout) if to_dict else completed.stdout