Coverage for src/sopsy/utils.py: 100%
74 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-24 08:38 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-24 08:38 +0000
1"""SOPSy utils."""
3from __future__ import annotations
5import json
6import logging
7import subprocess
8from pathlib import Path
9from typing import Any
11import yaml
13from sopsy.errors import SopsyCommandFailedError
14from sopsy.errors import SopsyConfigNotFoundError
15from sopsy.errors import SopsyUnparsableOutpoutTypeError
17DEFAULT_CONFIG_FILE = Path(".sops.yaml")
18logger = logging.getLogger(__name__)
21def build_config(
22 config_path: Path | None, config_dict: dict[str, Any] | None
23) -> dict[str, Any]:
24 """Merge config file content and python config dict."""
25 config: dict[str, Any] = {}
26 if not config_path:
27 config_path = DEFAULT_CONFIG_FILE
28 config_path = find_sops_config(Path(config_path))
29 if config_path:
30 config = yaml.safe_load(config_path.read_text())
31 if config_dict:
32 for k in config_dict:
33 if k not in config:
34 config[k] = config_dict[k]
35 elif isinstance(config_dict[k], dict):
36 config[k].update(config_dict[k])
37 elif isinstance(config_dict[k], list):
38 config[k].insert(0, *config_dict[k])
39 else:
40 config[k] = config_dict[k]
41 logger.debug("config: %s", config)
42 return config
45def find_sops_config(config_path: Path = DEFAULT_CONFIG_FILE) -> Path | None:
46 """Try to find the configuration file until the filesystem root."""
47 if config_path.is_absolute():
48 if config_path.exists():
49 return config_path
50 msg = f"config path {config_path} does not exists"
51 raise SopsyConfigNotFoundError(msg)
53 cwd = Path.cwd()
54 while True:
55 cwd_config = cwd.joinpath(config_path)
56 if cwd_config.exists():
57 return cwd_config
58 cwd_parent = cwd.parent
59 if cwd == cwd_parent:
60 msg = f"config file does not exists '{config_path}'"
61 if config_path == DEFAULT_CONFIG_FILE:
62 # do not fail on default config file absence
63 logger.warning("default %s", msg)
64 return None
65 raise SopsyConfigNotFoundError(msg)
66 cwd = cwd.parent
69def get_dict(data: bytes | str) -> dict[str, Any]:
70 """Parse data and return a dict from it."""
71 out = {}
73 if isinstance(data, bytes):
74 data = data.decode()
76 if data.startswith("{"):
77 try:
78 out = json.loads(data)
79 except json.JSONDecodeError as json_err:
80 raise SopsyUnparsableOutpoutTypeError from json_err
81 else:
82 try:
83 out = yaml.safe_load(data)
84 except yaml.YAMLError as yaml_err:
85 raise SopsyUnparsableOutpoutTypeError from yaml_err
87 return out
90def run_cmd(cmd: list[str], *, to_dict: bool) -> bytes | dict[str, Any] | None:
91 """Run the given SOPS command."""
92 try:
93 logger.debug("run_cmd: %s", cmd)
94 proc = subprocess.run(cmd, capture_output=True, check=True) # noqa: S603
95 except subprocess.CalledProcessError as proc_err:
96 raise SopsyCommandFailedError(proc_err.stderr.decode()) from proc_err
97 if {"-i", "--in-place", "--output"}.intersection(cmd):
98 return None
99 if to_dict:
100 return get_dict(proc.stdout)
101 return proc.stdout