Coverage for src/sopsy/utils.py: 100%

74 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-24 08:38 +0000

1"""SOPSy utils.""" 

2 

3from __future__ import annotations 

4 

5import json 

6import logging 

7import subprocess 

8from pathlib import Path 

9from typing import Any 

10 

11import yaml 

12 

13from sopsy.errors import SopsyCommandFailedError 

14from sopsy.errors import SopsyConfigNotFoundError 

15from sopsy.errors import SopsyUnparsableOutpoutTypeError 

16 

# Config file name used by build_config when the caller passes no path, and
# the default argument of find_sops_config.
17DEFAULT_CONFIG_FILE = Path(".sops.yaml") 

# Module-level logger named after this module.
18logger = logging.getLogger(__name__) 

19 

20 

def build_config(
    config_path: Path | None, config_dict: dict[str, Any] | None
) -> dict[str, Any]:
    """Merge config file content and python config dict.

    The config file (``config_path``, or ``DEFAULT_CONFIG_FILE`` when no path
    is given) is located with ``find_sops_config`` and loaded as YAML. Entries
    from ``config_dict`` are then merged on top of it, key by key:

    * key absent from the file: the python value is added as is;
    * python value is a dict: the file value is updated with it;
    * python value is a list: its items are prepended to the file value;
    * anything else: the python value replaces the file value.

    Args:
        config_path: Path of the config file, or a falsy value to use the
            default one.
        config_dict: Extra configuration to merge, or a falsy value for none.

    Returns:
        The merged configuration.

    Raises:
        SopsyConfigNotFoundError: Propagated from ``find_sops_config``.
    """
    config: dict[str, Any] = {}
    if not config_path:
        config_path = DEFAULT_CONFIG_FILE
    config_path = find_sops_config(Path(config_path))
    if config_path:
        loaded = yaml.safe_load(config_path.read_text())
        # An empty YAML document loads as None; keep the empty dict in that
        # case so the merge below does not fail on ``key not in None``.
        if loaded is not None:
            config = loaded
    if config_dict:
        for key, value in config_dict.items():
            if key not in config:
                config[key] = value
            elif isinstance(value, dict):
                config[key].update(value)
            elif isinstance(value, list):
                # Prepend every item, preserving order. ``list.insert`` only
                # accepts a single object, so unpacking the list into it
                # raised TypeError for any length other than one.
                config[key][:0] = value
            else:
                config[key] = value
    logger.debug("config: %s", config)
    return config

43 

44 

def find_sops_config(config_path: Path = DEFAULT_CONFIG_FILE) -> Path | None:
    """Locate the configuration file, walking up to the filesystem root.

    An absolute ``config_path`` is only checked for existence. A relative one
    is joined to the current working directory and then to each of its
    ancestors in turn; the first existing candidate is returned.

    Returns:
        The path of the config file that was found, or ``None`` when the
        default config file does not exist anywhere on the way up (a warning
        is logged in that case).

    Raises:
        SopsyConfigNotFoundError: If an absolute path does not exist, or if a
            non-default relative path is not found in any visited directory.
    """
    if config_path.is_absolute():
        if not config_path.exists():
            msg = f"config path {config_path} does not exists"
            raise SopsyConfigNotFoundError(msg)
        return config_path

    start = Path.cwd()
    # ``parents`` ends at the filesystem root, so the root itself is checked.
    for directory in (start, *start.parents):
        candidate = directory.joinpath(config_path)
        if candidate.exists():
            return candidate

    msg = f"config file does not exists '{config_path}'"
    if config_path != DEFAULT_CONFIG_FILE:
        raise SopsyConfigNotFoundError(msg)
    # do not fail on default config file absence
    logger.warning("default %s", msg)
    return None

67 

68 

def get_dict(data: bytes | str) -> dict[str, Any]:
    """Parse JSON or YAML text and return the resulting object.

    Bytes are decoded first. Text starting with ``{`` is parsed as JSON,
    anything else is handed to the YAML safe loader.

    Raises:
        SopsyUnparsableOutpoutTypeError: If the selected parser rejects the
            text; the parser error is chained as the cause.
    """
    text = data.decode() if isinstance(data, bytes) else data

    if not text.startswith("{"):
        try:
            return yaml.safe_load(text)
        except yaml.YAMLError as yaml_err:
            raise SopsyUnparsableOutpoutTypeError from yaml_err

    try:
        return json.loads(text)
    except json.JSONDecodeError as json_err:
        raise SopsyUnparsableOutpoutTypeError from json_err

88 

89 

def run_cmd(cmd: list[str], *, to_dict: bool) -> bytes | dict[str, Any] | None:
    """Execute a SOPS command line and hand back its output.

    Args:
        cmd: The full command line, one argument per item.
        to_dict: When true, parse the captured stdout with ``get_dict``
            instead of returning the raw bytes.

    Returns:
        ``None`` when ``cmd`` contains ``-i``, ``--in-place`` or ``--output``;
        otherwise the captured stdout, parsed or raw depending on ``to_dict``.

    Raises:
        SopsyCommandFailedError: If the command exits with a non-zero status;
            the message is the decoded stderr of the process.
    """
    logger.debug("run_cmd: %s", cmd)
    try:
        completed = subprocess.run(cmd, capture_output=True, check=True)  # noqa: S603
    except subprocess.CalledProcessError as proc_err:
        raise SopsyCommandFailedError(proc_err.stderr.decode()) from proc_err

    # With any of these flags present, stdout is not returned to the caller.
    file_output_flags = {"-i", "--in-place", "--output"}
    if not file_output_flags.isdisjoint(cmd):
        return None

    return get_dict(completed.stdout) if to_dict else completed.stdout