Coverage for src/sopsy/sopsy.py: 100%

80 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-08 20:24 +0000

1"""SOPSy, a Python wrapper around SOPS.""" 

2 

3from __future__ import annotations 

4 

5import shutil 

6import tempfile 

7from enum import Enum 

8from pathlib import Path 

9from typing import Any 

10 

11import yaml 

12 

13from sopsy.errors import SopsyCommandNotFoundError 

14from sopsy.errors import SopsyError 

15from sopsy.utils import build_config 

16from sopsy.utils import run_cmd 

17 

18 

class SopsyInOutType(Enum):
    """File formats SOPS can read and write.

    Members are meant to be given to the `input_type` and `output_type`
    arguments of `Sops()`.

    Attributes:
        BINARY (str): Binary type.
        DOTENV (str): DotEnv type.
        JSON (str): JSON type.
        YAML (str): YAML type.
    """

    BINARY = "binary"
    DOTENV = "dotenv"
    JSON = "json"
    YAML = "yaml"

    def __str__(self) -> str:
        """Render the member as its raw value (e.g. ``"yaml"``)."""
        return str(self.value)

39 

40 

class SopsyInputSource(Enum):
    """Where the data handed to SOPS originates.

    Tells `Sops` whether the input comes from a file on disk or is fed
    through stdin.

    Attributes:
        FILE (str): From an on-disk file.
        STDIN (str): From stdin.
    """

    FILE = "file"
    STDIN = "stdin"

53 

54 

class Sops:
    """SOPS file object.

    Attributes:
        bin: Path to the SOPS binary.
        file: Path to the SOPS file or content to encrypt/decrypt.
        global_args: The list of arguments that will be passed to the `sops` shell
            command. It can be used to customize it. Use it only if you know what you
            are doing.
        input_source: Whether input data come from a file or stdin.
        input_type: The input type given at construction time (may be `None`).
        config: The `--config <path>` arguments pointing at the temporary config
            file generated for this instance.
    """

    def __init__(  # noqa: C901
        self,
        file: str | Path | bytes,
        *,
        binary_path: str | Path | None = None,
        config: str | Path | None = None,
        config_dict: dict[str, Any] | None = None,
        extract: str | None = None,
        in_place: bool = False,
        input_type: str | SopsyInOutType | None = None,
        output: str | Path | None = None,
        output_type: str | SopsyInOutType | None = None,
        input_source: SopsyInputSource = SopsyInputSource.FILE,
    ) -> None:
        """Initialize SOPS object.

        Examples:
            >>> from pathlib import Path
            >>> from sopsy import Sops
            >>> sops = Sops(
            ...     binary_path="/app/bin/my_custom_sops",
            ...     config=Path(".config/sops.yml"),
            ...     file=Path("secrets.json"),
            ... )

        Args:
            file: Path to the SOPS file or content to encrypt/decrypt.
            config: Path to a custom SOPS config file.
            config_dict: Allow to pass SOPS config as a python dict.
            extract: Extract a specific key or branch from the input document.
            in_place: Write output back to the same file instead of stdout.
            input_type: If not set, sops will use the file's extension to determine
                the type.
            output: Save the output after encryption or decryption to the file
                specified.
            output_type: If not set, sops will use the input file's extension to
                determine the output format.
            binary_path: Path to the SOPS binary. If not defined it will search for it
                in the PATH environment variable.
            input_source: Whether input data come from a file or stdin.

        Raises:
            SopsyError: If stdin is the input source and `input_type` is not set,
                or if `file` is a `Path` while stdin is the input source.
            SopsyCommandNotFoundError: If the SOPS binary cannot be located.
        """
        # Function-scope import keeps this block self-contained.
        import weakref

        self.bin: Path = Path(binary_path) if binary_path else Path("sops")
        self.file: str | Path | bytes = file
        self.global_args: list[str] = []
        self.input_source: SopsyInputSource = input_source
        # Set unconditionally so the attribute exists even when no type is given.
        self.input_type: str | SopsyInOutType | None = input_type

        if extract:
            self.global_args.extend(["--extract", extract])
        if in_place:
            self.global_args.extend(["--in-place"])
        if input_type:
            self.global_args.extend(["--input-type", str(input_type)])
        if output:
            self.global_args.extend(["--output", str(output)])
        if output_type:
            self.global_args.extend(["--output-type", str(output_type)])

        # Validate everything before writing to disk, so that a constructor
        # which raises does not leave a temporary config file behind.
        if input_source == SopsyInputSource.STDIN and not input_type:
            msg = "When using stdin source, input MUST be specified"
            raise SopsyError(msg)

        if input_source == SopsyInputSource.STDIN and isinstance(file, Path):
            msg = "Path type cannot be used with stdin input source."
            raise SopsyError(msg)

        if not shutil.which(self.bin):
            msg = (
                f"{self.bin} command not found, "
                "you may need to install it and/or add it to your PATH"
            )
            raise SopsyCommandNotFoundError(msg)

        if isinstance(config, str):
            config = Path(config)
        if config_dict is None:
            config_dict = {}
        config_dict = build_config(config_path=config, config_dict=config_dict)
        with tempfile.NamedTemporaryFile(mode="w", delete=False) as fp:
            config_tmp = fp.name
            # The file must outlive this `with` block (it is read later by the
            # sops subprocess), hence delete=False. Tie its removal to the
            # lifetime of this object instead; registering before the dump
            # also covers a failure while serializing.
            weakref.finalize(self, Sops._discard_file, config_tmp)
            yaml.dump(config_dict, fp)
        self.config: list[str] = ["--config", config_tmp]

    @staticmethod
    def _discard_file(path: str) -> None:
        """Best-effort removal of the temporary config file at `path`."""
        try:
            Path(path).unlink()
        except OSError:
            # Cleanup only: the file may already be gone or not removable,
            # and neither case should surface as an error at teardown.
            pass

    def _run_on_input(
        self, action: str, *, to_dict: bool
    ) -> str | bytes | dict[str, Any] | None:
        """Run the `action` sub-command against the configured file or stdin data."""
        cmd = [str(self.bin), *self.config, action, *self.global_args]
        if self.input_source == SopsyInputSource.STDIN:
            # No real file name exists for stdin data; pass a placeholder name
            # whose extension is the declared input type.
            cmd.extend(["--filename-override", f"dummy.{self.input_type}"])
            input_data = self.file
            # Narrowing for type checkers; __init__ already rejects Path + stdin.
            assert not isinstance(input_data, Path)  # noqa: S101
        else:
            cmd.append(str(self.file))
            input_data = None
        return run_cmd(cmd, to_dict=to_dict, input_data=input_data)

    def decrypt(self, *, to_dict: bool = True) -> str | bytes | dict[str, Any] | None:
        """Decrypt SOPS file.

        Examples:
            >>> from sopsy import Sops, SopsyInOutType
            >>> sops = Sops("secrets.json", output_type=SopsyInOutType.YAML)
            >>> sops.decrypt(to_dict=False)
            hello: world

        Args:
            to_dict: Return the output as a Python dict.

        Returns:
            The output of the sops command.
        """
        return self._run_on_input("decrypt", to_dict=to_dict)

    def encrypt(self, *, to_dict: bool = True) -> str | bytes | dict[str, Any] | None:
        """Encrypt SOPS file.

        Examples:
            >>> import json
            >>> from pathlib import Path
            >>> from sopsy import Sops
            >>> secrets = Path("secrets.json")
            >>> secrets.write_text(json.dumps({"hello": "world"}))
            >>> sops = Sops(secrets, in_place=True)
            >>> sops.encrypt()

        Args:
            to_dict: Return the output as a Python dict.

        Returns:
            The output of the sops command.
        """
        return self._run_on_input("encrypt", to_dict=to_dict)

    def get(self, key: str, *, default: Any = None) -> Any:  # noqa: ANN401
        """Get a specific key from a SOPS encrypted file.

        Examples:
            >>> from sopsy import Sops
            >>> sops = Sops("secrets.json")
            >>> sops.get("hello")
            b'world'
            >>> sops.get("nonexistent", default="DefaultValue")
            'DefaultValue'

        Args:
            key: The key to fetch in the SOPS file content.
            default: A default value in case the key does not exist or is empty.

        Returns:
            The value of the given key, or the default value when the key is
            missing, `None`, or an empty string/bytes/container. Falsy but
            meaningful values such as `0` or `False` are returned as stored.
        """
        data: dict[Any, Any] = self.decrypt()  # pyright: ignore[reportAssignmentType]
        value = data.get(key)
        # A plain `value or default` would also throw away 0 and False.
        sized_types = (str, bytes, list, tuple, dict, set)
        is_empty = value is None or (isinstance(value, sized_types) and not value)
        return default if is_empty else value

    def rotate(self, *, to_dict: bool = True) -> str | bytes | dict[str, Any] | None:
        """Rotate encryption keys and re-encrypt values from SOPS file.

        Examples:
            >>> from sopsy import Sops
            >>> sops = Sops("secrets.json", in_place=True)
            >>> sops.rotate()

        Args:
            to_dict: Return the output as a Python dict.

        Returns:
            The output of the sops command.
        """
        cmd = [str(self.bin), *self.config, "rotate", *self.global_args, str(self.file)]
        return run_cmd(cmd, to_dict=to_dict)