Coverage for src/sopsy/utils.py: 100%

79 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2025-07-02 12:01 +0000

1"""SOPSy utils.""" 

2 

3from __future__ import annotations 

4 

5import json 

6import logging 

7import subprocess 

8from pathlib import Path 

9from typing import Any 

10 

11import yaml 

12 

13from sopsy.errors import SopsyCommandFailedError 

14from sopsy.errors import SopsyConfigNotFoundError 

15from sopsy.errors import SopsyUnparsableOutpoutTypeError 

16 

# Relative config file name used when the caller gives no explicit path.
17DEFAULT_CONFIG_FILE = Path(".sops.yaml") 

# Module-level logger, named after this module.
18logger = logging.getLogger(__name__) 

19 

20 

def build_config(
    config_path: Path | None, config_dict: dict[str, Any] | None
) -> dict[str, Any]:
    """Merge config file content and python config dict.

    The configuration file (``config_path`` or, when it is falsy, the default
    config file) provides the base values. Entries of ``config_dict`` are then
    applied on top of it, key by key:

    * a key missing from the file is added as-is;
    * when both values are dicts, the file dict is updated with the given one;
    * when both values are lists, the given items are prepended, in order,
      to the file list;
    * in any other case the given value replaces the file value.

    Args:
        config_path: path of the configuration file, or ``None`` to look for
            the default one.
        config_dict: python-side configuration overrides, or ``None``.

    Returns:
        The merged configuration.

    Raises:
        SopsyConfigNotFoundError: propagated from ``find_sops_config`` when an
            explicitly given configuration file cannot be found.
    """
    config: dict[str, Any] = {}
    if not config_path:
        config_path = DEFAULT_CONFIG_FILE
    config_path = find_sops_config(Path(config_path))
    if config_path:
        # An empty YAML document parses to None; keep working with a dict.
        config = yaml.safe_load(config_path.read_text()) or {}

    for key, value in (config_dict or {}).items():
        current = config.get(key)
        if isinstance(value, dict) and isinstance(current, dict):
            current.update(value)
        elif isinstance(value, list) and isinstance(current, list):
            # Slice assignment prepends any number of items while keeping
            # their order (list.insert only accepts a single item).
            current[:0] = value
        else:
            # Missing key, scalar value or mismatching types: override.
            config[key] = value

    logger.debug("config: %s", config)
    return config

43 

44 

def find_sops_config(config_path: Path = DEFAULT_CONFIG_FILE) -> Path | None:
    """Locate the configuration file, searching up to the filesystem root.

    An absolute ``config_path`` is only checked for existence. A relative one
    is resolved against the current working directory, then against each of
    its parent directories in turn; the first existing candidate is returned.

    Returns:
        The path of the configuration file, or ``None`` when the default
        configuration file is not found anywhere (a warning is logged).

    Raises:
        SopsyConfigNotFoundError: when an absolute path does not exist, or
            when a non-default relative path is not found in any directory.
    """
    if config_path.is_absolute():
        if not config_path.exists():
            msg = f"config path {config_path} does not exists"
            raise SopsyConfigNotFoundError(msg)
        return config_path

    start_dir = Path.cwd()
    # The working directory first, then every ancestor up to the root.
    for directory in (start_dir, *start_dir.parents):
        candidate = directory.joinpath(config_path)
        if candidate.exists():
            return candidate

    msg = f"config file does not exists '{config_path}'"
    if config_path != DEFAULT_CONFIG_FILE:
        raise SopsyConfigNotFoundError(msg)
    # do not fail on default config file absence
    logger.warning("default %s", msg)
    return None

67 

68 

def get_dict(data: bytes | str) -> dict[str, Any]:
    """Parse data and return a dict from it.

    ``bytes`` input is decoded first. Data starting with ``{`` is parsed as
    JSON, anything else as YAML. Empty or whitespace-only data, as well as a
    YAML document that parses to ``None``, yields an empty dict so that the
    caller always receives a mapping.

    Args:
        data: the raw JSON or YAML document.

    Returns:
        The parsed content.

    Raises:
        SopsyUnparsableOutpoutTypeError: when the data is neither valid JSON
            (for data starting with ``{``) nor valid YAML.
    """
    if isinstance(data, bytes):
        data = data.decode()

    # Nothing to parse: return an empty mapping rather than None.
    if not data.strip():
        return {}

    if data.startswith("{"):
        try:
            return json.loads(data)
        except json.JSONDecodeError as json_err:
            raise SopsyUnparsableOutpoutTypeError from json_err

    try:
        parsed = yaml.safe_load(data)
    except yaml.YAMLError as yaml_err:
        raise SopsyUnparsableOutpoutTypeError from yaml_err
    # A comment-only or null document parses to None.
    return {} if parsed is None else parsed

88 

89 

def run_cmd(
    cmd: list[str], *, to_dict: bool, input_data: str | bytes | None = None
) -> str | bytes | dict[str, Any] | None:
    """Execute the given SOPS command and return its output.

    Args:
        cmd: the command line, one argument per list item.
        to_dict: parse the captured standard output with ``get_dict``.
        input_data: optional data sent to the command's standard input; the
            process runs in text mode only when this is a ``str``.

    Returns:
        ``None`` when the command contains ``-i``, ``--in-place`` or
        ``--output``; otherwise the parsed standard output when ``to_dict``
        is true, or the raw captured standard output.

    Raises:
        SopsyCommandFailedError: when the command exits with a non-zero
            status; the message is the command's standard error.
    """
    logger.debug("run_cmd: %s", cmd)
    logger.debug("to_dict: %s", to_dict)
    logger.debug("input_data: %s", input_data)

    text_mode = isinstance(input_data, str)
    try:
        completed = subprocess.run(  # noqa: S603
            cmd,
            input=input_data,
            text=text_mode,
            capture_output=True,
            check=True,
        )
    except subprocess.CalledProcessError as proc_err:
        stderr = proc_err.stderr
        reason = stderr.decode() if isinstance(stderr, bytes) else stderr
        raise SopsyCommandFailedError(reason) from proc_err

    # These flags are treated as "no output to return".
    has_output_flag = not {"-i", "--in-place", "--output"}.isdisjoint(cmd)
    if has_output_flag:
        return None
    return get_dict(completed.stdout) if to_dict else completed.stdout