Coverage for src/sopsy/utils.py: 100%

58 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-09-23 04:59 +0000

1"""SOPSy utils.""" 

2 

3from __future__ import annotations 

4 

5import subprocess 

6from pathlib import Path 

7from typing import Any 

8 

9import yaml 

10 

11from sopsy.errors import SopsyCommandFailedError 

12from sopsy.errors import SopsyConfigNotFoundError 

13from sopsy.errors import SopsyUnparsableOutpoutTypeError 

14 

15DEFAULT_CONFIG_FILE = Path(".sops.yaml") 

16 

17 

def build_config(
    config_path: Path | None, config_dict: dict[str, Any] | None
) -> dict[str, Any]:
    """Merge config file content and python config dict.

    The configuration file is located with ``find_sops_config`` (falling back
    to ``DEFAULT_CONFIG_FILE`` when no path is given) and loaded as YAML.
    Entries of ``config_dict`` are then merged on top of it, key by key:

    * a key missing from the file is added as-is;
    * a dict value updates the mapping loaded from the file;
    * a list value is prepended, in order, to the list loaded from the file;
    * any other value replaces the one loaded from the file.

    Args:
        config_path: Path of the configuration file, or a falsy value to use
            the default file name.
        config_dict: Extra configuration to merge over the file content, or
            ``None``/empty to return the file content unchanged.

    Returns:
        The merged configuration.
    """
    resolved_path = find_sops_config(Path(config_path or DEFAULT_CONFIG_FILE))
    # An empty YAML document loads as None; fall back to an empty mapping so
    # the membership tests below keep working.
    config: dict[str, Any] = yaml.safe_load(resolved_path.read_text()) or {}

    for key, value in (config_dict or {}).items():
        if key not in config:
            config[key] = value
        elif isinstance(value, dict):
            config[key].update(value)
        elif isinstance(value, list):
            # Slice assignment prepends every item while keeping their order.
            # ``list.insert`` accepts a single element only, so unpacking the
            # list into it fails unless it holds exactly one item.
            config[key][:0] = value
        else:
            config[key] = value
    return config

38 

39 

def find_sops_config(config_path: Path = DEFAULT_CONFIG_FILE) -> Path:
    """Locate the configuration file, walking up to the filesystem root.

    An absolute path is only checked for existence. A relative path is
    resolved against the current working directory first, then against each
    of its parent directories in turn, the root included.

    Raises:
        SopsyConfigNotFoundError: if no candidate path exists.
    """
    if config_path.is_absolute():
        candidates = [config_path]
    else:
        start_dir = Path.cwd()
        candidates = [
            folder.joinpath(config_path)
            for folder in (start_dir, *start_dir.parents)
        ]

    for candidate in candidates:
        if candidate.exists():
            return candidate

    msg = f"config path {config_path} does not exists"
    raise SopsyConfigNotFoundError(msg)

58 

59 

def get_dict(data: bytes | str) -> dict[str, Any]:
    """Parse YAML or JSON content and return the loaded data.

    Raises:
        SopsyUnparsableOutpoutTypeError: if the content cannot be parsed.
    """
    try:
        # A single loader is enough: pyyaml accepts JSON as well as YAML.
        return yaml.safe_load(data)
    except yaml.YAMLError as parse_err:
        raise SopsyUnparsableOutpoutTypeError from parse_err

71 

72 

def run_cmd(cmd: list[str], *, to_dict: bool) -> bytes | dict[str, Any] | None:
    """Run the given SOPS command.

    Args:
        cmd: Full argument vector to execute, executable included.
        to_dict: When true, parse the captured standard output with
            ``get_dict`` instead of returning the raw bytes.

    Returns:
        ``None`` when ``cmd`` contains ``-i``, ``--in-place`` or ``--output``;
        otherwise the parsed output if ``to_dict`` is true, else the raw
        standard output bytes.

    Raises:
        SopsyCommandFailedError: if the command exits with a non-zero status;
            the message is the decoded standard error of the command.
    """
    try:
        proc = subprocess.run(cmd, capture_output=True, check=True)  # noqa: S603
    except subprocess.CalledProcessError as proc_err:
        # Decode leniently: a strict decode would raise UnicodeDecodeError on
        # non-UTF-8 output and hide the actual command failure.
        stderr_text = proc_err.stderr.decode(errors="replace")
        raise SopsyCommandFailedError(stderr_text) from proc_err

    # With one of these flags present in the command, nothing is returned.
    if {"-i", "--in-place", "--output"}.intersection(cmd):
        return None
    if to_dict:
        return get_dict(proc.stdout)
    return proc.stdout