Coverage for cfnStack/determine_actions.py: 80%

123 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-17 23:03 +0000

1#!/usr/bin/env python3 

2 

3import logging 

4import os.path 

5import importlib.resources 

6 

7import yaml 

8import jinja2 

9import boto3 

10 

11import cfnStack 

12 

13class ActionParser: 

14 

15 _default_region = "us-east-1" 

16 

17 def __init__(self, 

18 only_profiles: list[str] = [], 

19 stackname: list[str] = [], 

20 confirm: bool = False, 

21 category: str|None=None, 

22 config: str|None=None, 

23 stack: str|None=None, 

24 description: str = "No Description Given", 

25 regions: list[str] = [], 

26 capabilities: list[str] = [], 

27 dynamic_tags: list[str|dict] = [], 

28 parameters: list[str|dict] = [], 

29 profiles: list[str] = [], 

30 delete: bool = False, 

31 **kwargs): 

32 

33 self.logger = logging.getLogger(__name__) 

34 

35 self.stackname = stackname 

36 

37 self.kwargs = kwargs 

38 self.errors = dict() 

39 self.delete = delete 

40 self.description = description 

41 self.capabilities = capabilities 

42 self.regions = regions 

43 self.parameters = parameters 

44 self.only_profiles = only_profiles 

45 self.profiles = profiles 

46 self.dynamic_tags = dynamic_tags 

47 self.live_add = self.parse_params() 

48 self.category_dir = category 

49 self.all_category_configs = self.determine_category_configs(category, config, stack) 

50 

51 self.wanted_stacks = self.filter_wanted_stacks() 

52 

53 self.action_stacks = self.filter_actions() 

54 

55 

56 def parse_params(self): 

57 

58 live_add = dict(parameters={}) 

59 

60 for param in self.parameters: 

61 if isinstance(param, str): 

62 param_key, param_val = param.split(":", 1) 

63 live_add["parameters"][param_key] = param_val 

64 elif isinstance(param, dict): 

65 live_add["parameters"][param["Key"]] = param["Value"] 

66 

67 return live_add 

68 

69 

70 

71 def determine_category_configs(self, category, config, stack) -> dict: 

72 

73 category_configs: dict = dict() 

74 

75 if category is not None: 

76 if os.path.isdir(category) is False: 

77 self.logger.error("Unable to Find Cateogry Directory : {}".format(category)) 

78 self.errors["category"] = "Unable to Find Cateogry Directory : {}".format(category) 

79 else: 

80 self.logger.debug("Working in Category: {}".format(category)) 

81 config_file = os.path.join(category, "config.yaml") 

82 

83 with open(config_file) as config_fobj: 

84 category_configs = yaml.safe_load(config_fobj) 

85 

86 elif config is not None: 

87 if os.path.isfile(config) is False: 

88 self.logger.error("Unable to Find Configuration File : {}".format(config)) 

89 self.errors["config"] = "Unable to Find Configuration File : {}".format(config) 

90 else: 

91 self.logger.debug("Categories not in Use in a direct yaml config : {}.".format(config)) 

92 config_file = config 

93 with open(config_file) as config_fobj: 

94 category_configs = yaml.safe_load(config_fobj) 

95 

96 elif stack is not None: 

97 if os.path.isfile(stack) is False: 

98 self.logger.error("Cannot Find Direct Stack Configuration : {}".format(stack)) 

99 self.errors["stack"] = "Cannot Find Direct Stack Configuration : {}".format(stack) 

100 else: 

101 self.logger.debug("Using default configuration for direct stack.") 

102 

103 render_data = dict(filename=stack, 

104 description=self.description, 

105 capabilities=self.capabilities, 

106 parameters= 

107 ["{}:{}".format(k, v) for k, v in self.live_add["parameters"].items()], 

108 tags=self.dynamic_tags) 

109 

110 if len(self.profiles) == 0: 

111 render_data["profiles"] = ["default"] 

112 else: 

113 render_data["profiles"] = self.profiles 

114 

115 if len(self.regions) == 0: 

116 render_data["regions"] = [self._default_region] 

117 else: 

118 render_data["regions"] = self.regions 

119 

120 with importlib.resources.path(cfnStack, "default_stack.yaml.jinja") as stack_template_path: 

121 with open(stack_template_path, "r") as stack_template_fobj: 

122 stack_template_str = stack_template_fobj.read() 

123 

124 config_template = jinja2.Environment(loader=jinja2.BaseLoader, 

125 autoescape=jinja2.select_autoescape( 

126 enabled_extensions=('html', 'xml'), 

127 default_for_string=False 

128 )).from_string(stack_template_str) 

129 

130 config_rendered = config_template.render(**render_data) 

131 

132 self.logger.debug(render_data) 

133 self.logger.debug("Live Rendered: {}".format(config_rendered)) 

134 

135 category_configs = yaml.safe_load(config_rendered) 

136 

137 self.logger.debug("configs: {}".format(category_configs)) 

138 

139 return category_configs 

140 

141 def filter_wanted_stacks(self): 

142 

143 if len(self.stackname) == 0: 

144 # All Stacks 

145 wanted_stacks = list(self.all_category_configs.keys()) 

146 else: 

147 

148 wanted_stacks = [stack for stack in self.stackname if stack in self.all_category_configs.keys()] 

149 

150 missing_stacks = [mstack for mstack in self.stackname if mstack not in self.category_configs.keys()] 

151 

152 if len(missing_stacks) > 0: 

153 self.logger.error( 

154 "Requested Stack(s) {} not requested configured in category {}".format(",".join(missing_stacks), 

155 self.category)) 

156 

157 self.errors["stacks"] = "Missing Stack(s): {}".format(" , ".join(missing_stacks)) 

158 

159 return wanted_stacks 

160 

161 

162 def filter_actions(self) -> list[dict]: 

163 

164 action_tuples = list() 

165 

166 for wstack in self.wanted_stacks: 

167 

168 this_config = self.all_category_configs[wstack] 

169 

170 if self.category_dir is not None: 

171 # Front with Path 

172 this_config_file = os.path.join(self.category_dir, this_config["file"]) 

173 else: 

174 # Assume Path to File Given 

175 this_config_file = os.path.join(this_config["file"]) 

176 

177 with open(this_config_file, "r") as stack_config_file_obj: 

178 stack_config_json = stack_config_file_obj.read() 

179 

180 if len(self.only_profiles) == 0: 

181 

182 wprofiles = self.all_category_configs[wstack]["profiles"] 

183 else: 

184 

185 wprofiles = [prof for prof in self.only_profiles if prof in self.all_category_configs[wstack]["profiles"]] 

186 

187 mprofiles = [prof for prof in self.only_profiles if prof not in self.all_category_configs[wstack]["profiles"]] 

188 

189 if len(mprofiles) > 0: 

190 self.logger.error("{} missing requested profiles {}.".format(wstack, ",".join(mprofiles))) 

191 self.errors["mprofile"] = "{} missing requested profiles {}.".format(wstack, ",".join(mprofiles)) 

192 

193 wregions = this_config.get("regions", [self._default_region]) 

194 

195 if isinstance(wregions, str) and wregions.startswith("all"): 

196 wregions = [wregions] 

197 

198 for wprofile in wprofiles: 

199 

200 self.logger.info("Profiles {}".format(wprofiles)) 

201 self.logger.info("This Profiles {}".format(wprofile)) 

202 

203 this_wregions = wregions 

204 

205 if len(wregions) == 1 and wregions[0].startswith("all"): 

206 # Get and Wrap All Regions 

207 fast_service_name="cloudformation" 

208 

209 if ":" in wregions[0]: 

210 fast_service_name = wregions[0].split(":")[1] 

211 

212 self.logger.warning("Deployment requested to all regions for service: {}".format(fast_service_name)) 

213 fast_session_args = dict() 

214 

215 if wprofile != "default": 

216 fast_session_args["profile_name"] = wprofile 

217 

218 fast_session = boto3.session.Session(**fast_session_args) 

219 this_wregions = fast_session.get_available_regions(service_name=fast_service_name) 

220 self.logger.info("Expanding to {} regions for profile {}".format(len(this_wregions), wprofile)) 

221 self.logger.debug("Wanted Regions for Profile {}, {}".format(wprofile, ", ".join(this_wregions))) 

222 

223 for wregion in this_wregions: 

224 

225 action_tuples.append({ 

226 "stack": wstack, 

227 "stack_cfg": this_config, 

228 "region": wregion, 

229 "profile": wprofile, 

230 "stack_config_json": stack_config_json, 

231 "delete": self.delete 

232 }) 

233 

234 return action_tuples 

235 

236 

237