Module taskcat._cfn.stack_url_helper
Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
View Source
"""
Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of this
software and associated documentation files (the "Software"), to deal in the Software
without restriction, including without limitation the rights to use, copy, modify,
merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
import json
import logging
import os
from pathlib import Path
from urllib.parse import urlparse
LOG = logging.getLogger(__name__)
class StackURLHelper:
MAX_DEPTH = 20 # Handle at most 20 levels of nesting in TemplateURL expressions
# TODO: Allow user to inject this
# SUBSTITUTION = {
# "QSS3BucketName": "aws-quickstart",
# "QSS3KeyPrefix": "QSS3KeyPrefix/",
# "qss3KeyPrefix": "qss3KeyPrefix/",
# "AWS::Region": "us-east-1",
# "AWS::AccountId": "8888XXXX9999",
# }
SUBSTITUTION = {
"AWS::Region": "us-east-1",
"AWS::URLSuffix": "amazonaws.com",
"AWS::AccountId": "8888XXXX9999",
}
def __init__(
self,
template_mappings=None,
template_parameters=None,
parameter_values=None,
):
if template_mappings:
self.mappings = template_mappings
else:
self.mappings = {}
if template_parameters:
self.template_parameters = template_parameters
else:
self.template_parameters = {}
if parameter_values:
self.parameter_values = parameter_values
else:
self.parameter_values = {}
default_parameters: dict = {}
for parameter in self.template_parameters:
properties = self.template_parameters.get(parameter)
if "Default" in properties.keys():
default_parameters[parameter] = properties["Default"]
self.SUBSTITUTION.update(default_parameters)
self.SUBSTITUTION.update(self.parameter_values)
def rewrite_vars(self, original_string, depth=1):
"""Replace the ${var} placeholders with ##var##"""
parts = original_string.split("${")
parts = parts[1].split("}")
rep_text = "${" + parts[0] + "}"
rep_with = "##" + parts[0] + "##"
result = original_string.replace(rep_text, rep_with)
if len(result.split("${")) > 1:
result = self.rewrite_vars(result, depth=(depth + 1))
return result
def rewrite_sub_vars(self, original_string, depth=1):
"""Replace the '##var##' placeholders with 'var'"""
if "##" not in original_string:
return original_string
parts = original_string.split("##")
parts = parts[1].split("##")
rep_text = "##" + parts[0] + "##"
rep_with = "" + parts[0] + ""
result = original_string.replace(rep_text, rep_with)
if "##" in result: # Recurse if we have more variables
result = self.rewrite_sub_vars(result, depth=(depth + 1))
return result
@staticmethod
def rewrite_sub_vars_with_values(expression, values):
"""Rewrite sub vars with actual variable values"""
result = expression
# replace each key we have a value for
for key in values:
rep_text = "##" + key + "##"
rep_with = "" + str(values[key]) + ""
result = result.replace(rep_text, rep_with)
return result
@staticmethod
def values_to_dict(values):
"""Rewrite sub vars with actual variable values"""
# Create dictionary of values
values_dict_string = values.replace("(", "{")
values_dict_string = values_dict_string.replace(")", "}")
values_dict_string = values_dict_string.replace("'", '"')
# for values or keys not quoted
# Split by :
values_split_string = values_dict_string
# Trim stuff so we can get the key values
values_split_string = values_split_string.replace(" ", "")
values_split_string = values_split_string.replace("{", "")
values_split_string = values_split_string.replace("}", "")
values_split = values_split_string.split(",")
values_split_final = []
for value in values_split:
values = value.split(":")
values_split_final.extend(values)
for value in values_split_final:
if value[0] != "'" and value[-1] != "'":
if value[0] != '"' and value[-1] != '"':
values_dict_string = values_dict_string.replace(
value, '"' + value + '"'
)
values_dict = json.loads(values_dict_string)
return values_dict
def evaluate_fn_sub(self, expression):
"""Return expression with values replaced"""
results = []
# Builtins - Fudge some defaults here since we don't have runtime info
# ${AWS::Region} ${AWS::AccountId}
expression = self.rewrite_sub_vars_with_values(expression, self.SUBSTITUTION)
# Handle Sub of form [ StringToSub, { "key" : "value", "key": "value" }]
if "[" in expression:
temp_expression = expression.split("[")[1].split(",")[0]
values = expression.split("[")[1].split("(")[1].split(")")[0]
values = self.values_to_dict("(" + values + ")")
temp_expression = self.rewrite_sub_vars_with_values(temp_expression, values)
else:
temp_expression = expression.split("': '")[1].split("'")[0]
# if we still have them we just use their values (ie: Parameters)
result = self.rewrite_sub_vars(temp_expression)
results.append(result)
return results
@staticmethod
def evaluate_fn_join(expression):
"""Return the joined stuff"""
results = []
new_values_list = []
temp = expression.split("[")[1]
delimiter = temp.split(",")[0].strip("'")
values = expression.split("[")[2]
values = values.split("]]")[0]
values_list = values.split(", ")
for value in values_list:
new_values_list.append(value.strip("'"))
result = delimiter.join(new_values_list)
results.append(result)
return results
@staticmethod
def evaluate_fn_if(expression):
"""Return both possible parts of the expression"""
results = []
value_true = expression.split(",")[1].strip()
value_false = expression.split(",")[2].strip().strip("]")
# if we don't have '' this can break things
results.append("'" + value_true.strip("'") + "'")
results.append("'" + value_false.strip("'") + "'")
return results
def evaluate_fn_ref(self, expression):
"""Since this is runtime data the best we can do is the name in place"""
# TODO: Allow user to inject RunTime values for these
results = []
temp = expression.split(": ")[1]
# pylint: disable=consider-iterating-dictionary
if temp.strip("'") in self.SUBSTITUTION.keys():
temp = self.SUBSTITUTION[temp.strip("'")]
temp = "'" + temp + "'"
results.append(temp)
return results
def find_in_map_lookup(self, mappings_map, first_key, final_key):
step1 = self.mappings[mappings_map.strip("'")]
step2 = step1[first_key.strip("'")]
result = step2[final_key.strip("'")]
return result
def evaluate_fn_findinmap(self, expression):
result = []
mappings_map = expression.split("[")[1].split("]")[0].split(",")[0].strip()
first_key = expression.split("[")[1].split("]")[0].split(",")[1].strip()
final_key = expression.split("[")[1].split("]")[0].split(",")[2].strip()
result.append(
"'" + self.find_in_map_lookup(mappings_map, first_key, final_key) + "'"
)
return result
@staticmethod
def evaluate_fn_getatt(expression):
raise Exception("Fn::GetAtt: not supported")
@staticmethod
def evaluate_fn_split(expression):
raise Exception("Fn::Split: not supported")
def evaluate_expression_controller(self, expression):
"""Figure out what type of expression and pass off to handler"""
results = []
if "Fn::If" in expression:
results = self.evaluate_fn_if(expression)
elif "Fn::Sub" in expression:
results = self.evaluate_fn_sub(expression)
elif "Fn::Join" in expression:
results = self.evaluate_fn_join(expression)
elif "Ref" in expression:
results = self.evaluate_fn_ref(expression)
elif "Fn::FindInMap" in expression:
results = self.evaluate_fn_findinmap(expression)
elif "Fn::GetAtt" in expression:
results = self.evaluate_fn_getatt(expression)
elif "Fn::Split" in expression:
results = self.evaluate_fn_split(expression)
else:
# This is a NON expression repl { and } with ( and ) to break recursion
results.append("(" + expression + ")")
return results
def evaluate_string(self, template_url, depth=0):
"""Recursively find expressions in the URL and send them to be evaluated"""
# Recursion bail out
if depth > self.MAX_DEPTH:
raise Exception(
"Template URL contains more than {} levels or nesting".format(
self.MAX_DEPTH
)
)
template_urls = []
# Evaluate expressions
if "{" in template_url:
parts = template_url.split("{")
parts = parts[-1].split("}") # Last open bracket
# This function will handle Fn::Sub Fn::If etc.
replacements = self.evaluate_expression_controller(
parts[0]
) # First closed bracket after
for replacement in replacements:
template_url_temp = template_url
template_url_temp = template_url_temp.replace(
"{" + parts[0] + "}", replacement
)
evaluated_strings = self.evaluate_string(
template_url_temp, depth=(depth + 1)
)
for evaluated_string in evaluated_strings:
template_urls.append(evaluated_string)
else:
template_urls.append(template_url)
return template_urls
def _flatten_template_controller(self, template_url):
"""Recursively evaluate subs/ifs"""
url_list = []
# Replace ${SOMEVAR} with ##SOMEVAR## so finding actual "expressions" is easier
template_url_string = str(template_url)
parts = template_url_string.split("${")
if len(parts) > 1:
template_url_string = self.rewrite_vars(template_url_string)
# Evaluate expressions recursively
if "{" in template_url_string:
replacements = self.evaluate_string(
template_url_string
) # first closed bracket
for replacement in replacements:
url_list.append(replacement)
else:
url_list.append(template_url)
return url_list
def flatten_template_url(self, template_url):
"""Flatten template_url and return all permutations"""
path_list = []
url_list = self._flatten_template_controller(template_url)
# Extract the path portion from the URL
for url in url_list:
# TODO: figure where the ' is coming from
output = urlparse(str(url.strip("'")))
path_list.append(output.path)
path_list = list(dict.fromkeys(path_list))
# print(url_list)
# print(path_list)
return path_list
@staticmethod
def _remove_one_level(path_string):
result = path_string
result = result.find("/", 0)
result = path_string[result + 1 : len(path_string)]
return result
def find_local_child_template(self, parent_template_path, child_template_path):
final_template_path = ""
# Start where the Parent template is
project_root = Path(os.path.dirname(parent_template_path))
# Get rid of any "//"
child_template_path_tmp = os.path.normpath(child_template_path)
# Take the path piece by piece and try in current folder
while "/" in str(child_template_path_tmp):
child_template_path_tmp = self._remove_one_level(child_template_path_tmp)
final_template_path = Path(
"/".join([str(project_root), str(child_template_path_tmp)])
)
if final_template_path.exists() and final_template_path.is_file():
return str(final_template_path)
# Take the path piece by piece and try in one folder up folder
project_root = Path(
os.path.normpath(os.path.dirname(parent_template_path) + "/../")
)
# Get rid of any "//"
child_template_path_tmp = os.path.normpath(child_template_path)
while "/" in str(child_template_path_tmp):
child_template_path_tmp = self._remove_one_level(child_template_path_tmp)
final_template_path = Path(
"/".join([str(project_root), str(child_template_path_tmp)])
)
if final_template_path.exists() and final_template_path.is_file():
return str(final_template_path)
return ""
def template_url_to_path(
self,
current_template_path,
template_url,
):
child_local_paths = []
child_template_paths = self.flatten_template_url(template_url)
# TODO: Add logic to try for S3 paths
for child_template_path in child_template_paths:
child_local_paths.append(
self.find_local_child_template(
current_template_path, child_template_path
)
)
return child_local_paths
Variables
LOG
Classes
StackURLHelper
class StackURLHelper(
template_mappings=None,
template_parameters=None,
parameter_values=None
)
View Source
class StackURLHelper:
MAX_DEPTH = 20 # Handle at most 20 levels of nesting in TemplateURL expressions
# TODO: Allow user to inject this
# SUBSTITUTION = {
# "QSS3BucketName": "aws-quickstart",
# "QSS3KeyPrefix": "QSS3KeyPrefix/",
# "qss3KeyPrefix": "qss3KeyPrefix/",
# "AWS::Region": "us-east-1",
# "AWS::AccountId": "8888XXXX9999",
# }
SUBSTITUTION = {
"AWS::Region": "us-east-1",
"AWS::URLSuffix": "amazonaws.com",
"AWS::AccountId": "8888XXXX9999",
}
def __init__(
self,
template_mappings=None,
template_parameters=None,
parameter_values=None,
):
if template_mappings:
self.mappings = template_mappings
else:
self.mappings = {}
if template_parameters:
self.template_parameters = template_parameters
else:
self.template_parameters = {}
if parameter_values:
self.parameter_values = parameter_values
else:
self.parameter_values = {}
default_parameters: dict = {}
for parameter in self.template_parameters:
properties = self.template_parameters.get(parameter)
if "Default" in properties.keys():
default_parameters[parameter] = properties["Default"]
self.SUBSTITUTION.update(default_parameters)
self.SUBSTITUTION.update(self.parameter_values)
def rewrite_vars(self, original_string, depth=1):
"""Replace the ${var} placeholders with ##var##"""
parts = original_string.split("${")
parts = parts[1].split("}")
rep_text = "${" + parts[0] + "}"
rep_with = "##" + parts[0] + "##"
result = original_string.replace(rep_text, rep_with)
if len(result.split("${")) > 1:
result = self.rewrite_vars(result, depth=(depth + 1))
return result
def rewrite_sub_vars(self, original_string, depth=1):
"""Replace the '##var##' placeholders with 'var'"""
if "##" not in original_string:
return original_string
parts = original_string.split("##")
parts = parts[1].split("##")
rep_text = "##" + parts[0] + "##"
rep_with = "" + parts[0] + ""
result = original_string.replace(rep_text, rep_with)
if "##" in result: # Recurse if we have more variables
result = self.rewrite_sub_vars(result, depth=(depth + 1))
return result
@staticmethod
def rewrite_sub_vars_with_values(expression, values):
"""Rewrite sub vars with actual variable values"""
result = expression
# replace each key we have a value for
for key in values:
rep_text = "##" + key + "##"
rep_with = "" + str(values[key]) + ""
result = result.replace(rep_text, rep_with)
return result
@staticmethod
def values_to_dict(values):
"""Rewrite sub vars with actual variable values"""
# Create dictionary of values
values_dict_string = values.replace("(", "{")
values_dict_string = values_dict_string.replace(")", "}")
values_dict_string = values_dict_string.replace("'", '"')
# for values or keys not quoted
# Split by :
values_split_string = values_dict_string
# Trim stuff so we can get the key values
values_split_string = values_split_string.replace(" ", "")
values_split_string = values_split_string.replace("{", "")
values_split_string = values_split_string.replace("}", "")
values_split = values_split_string.split(",")
values_split_final = []
for value in values_split:
values = value.split(":")
values_split_final.extend(values)
for value in values_split_final:
if value[0] != "'" and value[-1] != "'":
if value[0] != '"' and value[-1] != '"':
values_dict_string = values_dict_string.replace(
value, '"' + value + '"'
)
values_dict = json.loads(values_dict_string)
return values_dict
def evaluate_fn_sub(self, expression):
"""Return expression with values replaced"""
results = []
# Builtins - Fudge some defaults here since we don't have runtime info
# ${AWS::Region} ${AWS::AccountId}
expression = self.rewrite_sub_vars_with_values(expression, self.SUBSTITUTION)
# Handle Sub of form [ StringToSub, { "key" : "value", "key": "value" }]
if "[" in expression:
temp_expression = expression.split("[")[1].split(",")[0]
values = expression.split("[")[1].split("(")[1].split(")")[0]
values = self.values_to_dict("(" + values + ")")
temp_expression = self.rewrite_sub_vars_with_values(temp_expression, values)
else:
temp_expression = expression.split("': '")[1].split("'")[0]
# if we still have them we just use their values (ie: Parameters)
result = self.rewrite_sub_vars(temp_expression)
results.append(result)
return results
@staticmethod
def evaluate_fn_join(expression):
"""Return the joined stuff"""
results = []
new_values_list = []
temp = expression.split("[")[1]
delimiter = temp.split(",")[0].strip("'")
values = expression.split("[")[2]
values = values.split("]]")[0]
values_list = values.split(", ")
for value in values_list:
new_values_list.append(value.strip("'"))
result = delimiter.join(new_values_list)
results.append(result)
return results
@staticmethod
def evaluate_fn_if(expression):
"""Return both possible parts of the expression"""
results = []
value_true = expression.split(",")[1].strip()
value_false = expression.split(",")[2].strip().strip("]")
# if we don't have '' this can break things
results.append("'" + value_true.strip("'") + "'")
results.append("'" + value_false.strip("'") + "'")
return results
def evaluate_fn_ref(self, expression):
"""Since this is runtime data the best we can do is the name in place"""
# TODO: Allow user to inject RunTime values for these
results = []
temp = expression.split(": ")[1]
# pylint: disable=consider-iterating-dictionary
if temp.strip("'") in self.SUBSTITUTION.keys():
temp = self.SUBSTITUTION[temp.strip("'")]
temp = "'" + temp + "'"
results.append(temp)
return results
def find_in_map_lookup(self, mappings_map, first_key, final_key):
step1 = self.mappings[mappings_map.strip("'")]
step2 = step1[first_key.strip("'")]
result = step2[final_key.strip("'")]
return result
def evaluate_fn_findinmap(self, expression):
result = []
mappings_map = expression.split("[")[1].split("]")[0].split(",")[0].strip()
first_key = expression.split("[")[1].split("]")[0].split(",")[1].strip()
final_key = expression.split("[")[1].split("]")[0].split(",")[2].strip()
result.append(
"'" + self.find_in_map_lookup(mappings_map, first_key, final_key) + "'"
)
return result
@staticmethod
def evaluate_fn_getatt(expression):
raise Exception("Fn::GetAtt: not supported")
@staticmethod
def evaluate_fn_split(expression):
raise Exception("Fn::Split: not supported")
def evaluate_expression_controller(self, expression):
"""Figure out what type of expression and pass off to handler"""
results = []
if "Fn::If" in expression:
results = self.evaluate_fn_if(expression)
elif "Fn::Sub" in expression:
results = self.evaluate_fn_sub(expression)
elif "Fn::Join" in expression:
results = self.evaluate_fn_join(expression)
elif "Ref" in expression:
results = self.evaluate_fn_ref(expression)
elif "Fn::FindInMap" in expression:
results = self.evaluate_fn_findinmap(expression)
elif "Fn::GetAtt" in expression:
results = self.evaluate_fn_getatt(expression)
elif "Fn::Split" in expression:
results = self.evaluate_fn_split(expression)
else:
# This is a NON expression repl { and } with ( and ) to break recursion
results.append("(" + expression + ")")
return results
def evaluate_string(self, template_url, depth=0):
"""Recursively find expressions in the URL and send them to be evaluated"""
# Recursion bail out
if depth > self.MAX_DEPTH:
raise Exception(
"Template URL contains more than {} levels or nesting".format(
self.MAX_DEPTH
)
)
template_urls = []
# Evaluate expressions
if "{" in template_url:
parts = template_url.split("{")
parts = parts[-1].split("}") # Last open bracket
# This function will handle Fn::Sub Fn::If etc.
replacements = self.evaluate_expression_controller(
parts[0]
) # First closed bracket after
for replacement in replacements:
template_url_temp = template_url
template_url_temp = template_url_temp.replace(
"{" + parts[0] + "}", replacement
)
evaluated_strings = self.evaluate_string(
template_url_temp, depth=(depth + 1)
)
for evaluated_string in evaluated_strings:
template_urls.append(evaluated_string)
else:
template_urls.append(template_url)
return template_urls
def _flatten_template_controller(self, template_url):
"""Recursively evaluate subs/ifs"""
url_list = []
# Replace ${SOMEVAR} with ##SOMEVAR## so finding actual "expressions" is easier
template_url_string = str(template_url)
parts = template_url_string.split("${")
if len(parts) > 1:
template_url_string = self.rewrite_vars(template_url_string)
# Evaluate expressions recursively
if "{" in template_url_string:
replacements = self.evaluate_string(
template_url_string
) # first closed bracket
for replacement in replacements:
url_list.append(replacement)
else:
url_list.append(template_url)
return url_list
def flatten_template_url(self, template_url):
"""Flatten template_url and return all permutations"""
path_list = []
url_list = self._flatten_template_controller(template_url)
# Extract the path portion from the URL
for url in url_list:
# TODO: figure where the ' is coming from
output = urlparse(str(url.strip("'")))
path_list.append(output.path)
path_list = list(dict.fromkeys(path_list))
# print(url_list)
# print(path_list)
return path_list
@staticmethod
def _remove_one_level(path_string):
result = path_string
result = result.find("/", 0)
result = path_string[result + 1 : len(path_string)]
return result
def find_local_child_template(self, parent_template_path, child_template_path):
final_template_path = ""
# Start where the Parent template is
project_root = Path(os.path.dirname(parent_template_path))
# Get rid of any "//"
child_template_path_tmp = os.path.normpath(child_template_path)
# Take the path piece by piece and try in current folder
while "/" in str(child_template_path_tmp):
child_template_path_tmp = self._remove_one_level(child_template_path_tmp)
final_template_path = Path(
"/".join([str(project_root), str(child_template_path_tmp)])
)
if final_template_path.exists() and final_template_path.is_file():
return str(final_template_path)
# Take the path piece by piece and try in one folder up folder
project_root = Path(
os.path.normpath(os.path.dirname(parent_template_path) + "/../")
)
# Get rid of any "//"
child_template_path_tmp = os.path.normpath(child_template_path)
while "/" in str(child_template_path_tmp):
child_template_path_tmp = self._remove_one_level(child_template_path_tmp)
final_template_path = Path(
"/".join([str(project_root), str(child_template_path_tmp)])
)
if final_template_path.exists() and final_template_path.is_file():
return str(final_template_path)
return ""
def template_url_to_path(
self,
current_template_path,
template_url,
):
child_local_paths = []
child_template_paths = self.flatten_template_url(template_url)
# TODO: Add logic to try for S3 paths
for child_template_path in child_template_paths:
child_local_paths.append(
self.find_local_child_template(
current_template_path, child_template_path
)
)
return child_local_paths
Class variables
MAX_DEPTH
SUBSTITUTION
Static methods
evaluate_fn_getatt
def evaluate_fn_getatt(
expression
)
View Source
@staticmethod
def evaluate_fn_getatt(expression):
raise Exception("Fn::GetAtt: not supported")
evaluate_fn_if
def evaluate_fn_if(
expression
)
Return both possible parts of the expression
View Source
@staticmethod
def evaluate_fn_if(expression):
"""Return both possible parts of the expression"""
results = []
value_true = expression.split(",")[1].strip()
value_false = expression.split(",")[2].strip().strip("]")
# if we don't have '' this can break things
results.append("'" + value_true.strip("'") + "'")
results.append("'" + value_false.strip("'") + "'")
return results
evaluate_fn_join
def evaluate_fn_join(
expression
)
Return the joined stuff
View Source
@staticmethod
def evaluate_fn_join(expression):
"""Return the joined stuff"""
results = []
new_values_list = []
temp = expression.split("[")[1]
delimiter = temp.split(",")[0].strip("'")
values = expression.split("[")[2]
values = values.split("]]")[0]
values_list = values.split(", ")
for value in values_list:
new_values_list.append(value.strip("'"))
result = delimiter.join(new_values_list)
results.append(result)
return results
evaluate_fn_split
def evaluate_fn_split(
expression
)
View Source
@staticmethod
def evaluate_fn_split(expression):
raise Exception("Fn::Split: not supported")
rewrite_sub_vars_with_values
def rewrite_sub_vars_with_values(
expression,
values
)
Rewrite sub vars with actual variable values
View Source
@staticmethod
def rewrite_sub_vars_with_values(expression, values):
"""Rewrite sub vars with actual variable values"""
result = expression
# replace each key we have a value for
for key in values:
rep_text = "##" + key + "##"
rep_with = "" + str(values[key]) + ""
result = result.replace(rep_text, rep_with)
return result
values_to_dict
def values_to_dict(
values
)
Rewrite sub vars with actual variable values
View Source
@staticmethod
def values_to_dict(values):
"""Rewrite sub vars with actual variable values"""
# Create dictionary of values
values_dict_string = values.replace("(", "{")
values_dict_string = values_dict_string.replace(")", "}")
values_dict_string = values_dict_string.replace("'", '"')
# for values or keys not quoted
# Split by :
values_split_string = values_dict_string
# Trim stuff so we can get the key values
values_split_string = values_split_string.replace(" ", "")
values_split_string = values_split_string.replace("{", "")
values_split_string = values_split_string.replace("}", "")
values_split = values_split_string.split(",")
values_split_final = []
for value in values_split:
values = value.split(":")
values_split_final.extend(values)
for value in values_split_final:
if value[0] != "'" and value[-1] != "'":
if value[0] != '"' and value[-1] != '"':
values_dict_string = values_dict_string.replace(
value, '"' + value + '"'
)
values_dict = json.loads(values_dict_string)
return values_dict
Methods
evaluate_expression_controller
def evaluate_expression_controller(
self,
expression
)
Figure out what type of expression and pass off to handler
View Source
def evaluate_expression_controller(self, expression):
"""Figure out what type of expression and pass off to handler"""
results = []
if "Fn::If" in expression:
results = self.evaluate_fn_if(expression)
elif "Fn::Sub" in expression:
results = self.evaluate_fn_sub(expression)
elif "Fn::Join" in expression:
results = self.evaluate_fn_join(expression)
elif "Ref" in expression:
results = self.evaluate_fn_ref(expression)
elif "Fn::FindInMap" in expression:
results = self.evaluate_fn_findinmap(expression)
elif "Fn::GetAtt" in expression:
results = self.evaluate_fn_getatt(expression)
elif "Fn::Split" in expression:
results = self.evaluate_fn_split(expression)
else:
# This is a NON expression repl { and } with ( and ) to break recursion
results.append("(" + expression + ")")
return results
evaluate_fn_findinmap
def evaluate_fn_findinmap(
self,
expression
)
View Source
def evaluate_fn_findinmap(self, expression):
result = []
mappings_map = expression.split("[")[1].split("]")[0].split(",")[0].strip()
first_key = expression.split("[")[1].split("]")[0].split(",")[1].strip()
final_key = expression.split("[")[1].split("]")[0].split(",")[2].strip()
result.append(
"'" + self.find_in_map_lookup(mappings_map, first_key, final_key) + "'"
)
return result
evaluate_fn_ref
def evaluate_fn_ref(
self,
expression
)
Since this is runtime data the best we can do is the name in place
View Source
def evaluate_fn_ref(self, expression):
"""Since this is runtime data the best we can do is the name in place"""
# TODO: Allow user to inject RunTime values for these
results = []
temp = expression.split(": ")[1]
# pylint: disable=consider-iterating-dictionary
if temp.strip("'") in self.SUBSTITUTION.keys():
temp = self.SUBSTITUTION[temp.strip("'")]
temp = "'" + temp + "'"
results.append(temp)
return results
evaluate_fn_sub
def evaluate_fn_sub(
self,
expression
)
Return expression with values replaced
View Source
def evaluate_fn_sub(self, expression):
"""Return expression with values replaced"""
results = []
# Builtins - Fudge some defaults here since we don't have runtime info
# ${AWS::Region} ${AWS::AccountId}
expression = self.rewrite_sub_vars_with_values(expression, self.SUBSTITUTION)
# Handle Sub of form [ StringToSub, { "key" : "value", "key": "value" }]
if "[" in expression:
temp_expression = expression.split("[")[1].split(",")[0]
values = expression.split("[")[1].split("(")[1].split(")")[0]
values = self.values_to_dict("(" + values + ")")
temp_expression = self.rewrite_sub_vars_with_values(temp_expression, values)
else:
temp_expression = expression.split("': '")[1].split("'")[0]
# if we still have them we just use their values (ie: Parameters)
result = self.rewrite_sub_vars(temp_expression)
results.append(result)
return results
evaluate_string
def evaluate_string(
self,
template_url,
depth=0
)
Recursively find expressions in the URL and send them to be evaluated
View Source
def evaluate_string(self, template_url, depth=0):
"""Recursively find expressions in the URL and send them to be evaluated"""
# Recursion bail out
if depth > self.MAX_DEPTH:
raise Exception(
"Template URL contains more than {} levels or nesting".format(
self.MAX_DEPTH
)
)
template_urls = []
# Evaluate expressions
if "{" in template_url:
parts = template_url.split("{")
parts = parts[-1].split("}") # Last open bracket
# This function will handle Fn::Sub Fn::If etc.
replacements = self.evaluate_expression_controller(
parts[0]
) # First closed bracket after
for replacement in replacements:
template_url_temp = template_url
template_url_temp = template_url_temp.replace(
"{" + parts[0] + "}", replacement
)
evaluated_strings = self.evaluate_string(
template_url_temp, depth=(depth + 1)
)
for evaluated_string in evaluated_strings:
template_urls.append(evaluated_string)
else:
template_urls.append(template_url)
return template_urls
find_in_map_lookup
def find_in_map_lookup(
self,
mappings_map,
first_key,
final_key
)
View Source
def find_in_map_lookup(self, mappings_map, first_key, final_key):
step1 = self.mappings[mappings_map.strip("'")]
step2 = step1[first_key.strip("'")]
result = step2[final_key.strip("'")]
return result
find_local_child_template
def find_local_child_template(
self,
parent_template_path,
child_template_path
)
View Source
def find_local_child_template(self, parent_template_path, child_template_path):
final_template_path = ""
# Start where the Parent template is
project_root = Path(os.path.dirname(parent_template_path))
# Get rid of any "//"
child_template_path_tmp = os.path.normpath(child_template_path)
# Take the path piece by piece and try in current folder
while "/" in str(child_template_path_tmp):
child_template_path_tmp = self._remove_one_level(child_template_path_tmp)
final_template_path = Path(
"/".join([str(project_root), str(child_template_path_tmp)])
)
if final_template_path.exists() and final_template_path.is_file():
return str(final_template_path)
# Take the path piece by piece and try in one folder up folder
project_root = Path(
os.path.normpath(os.path.dirname(parent_template_path) + "/../")
)
# Get rid of any "//"
child_template_path_tmp = os.path.normpath(child_template_path)
while "/" in str(child_template_path_tmp):
child_template_path_tmp = self._remove_one_level(child_template_path_tmp)
final_template_path = Path(
"/".join([str(project_root), str(child_template_path_tmp)])
)
if final_template_path.exists() and final_template_path.is_file():
return str(final_template_path)
return ""
flatten_template_url
def flatten_template_url(
self,
template_url
)
Flatten template_url and return all permutations
View Source
def flatten_template_url(self, template_url):
"""Flatten template_url and return all permutations"""
path_list = []
url_list = self._flatten_template_controller(template_url)
# Extract the path portion from the URL
for url in url_list:
# TODO: figure where the ' is coming from
output = urlparse(str(url.strip("'")))
path_list.append(output.path)
path_list = list(dict.fromkeys(path_list))
# print(url_list)
# print(path_list)
return path_list
rewrite_sub_vars
def rewrite_sub_vars(
self,
original_string,
depth=1
)
Replace the '##var##' placeholders with 'var'
View Source
def rewrite_sub_vars(self, original_string, depth=1):
"""Replace the '##var##' placeholders with 'var'"""
if "##" not in original_string:
return original_string
parts = original_string.split("##")
parts = parts[1].split("##")
rep_text = "##" + parts[0] + "##"
rep_with = "" + parts[0] + ""
result = original_string.replace(rep_text, rep_with)
if "##" in result: # Recurse if we have more variables
result = self.rewrite_sub_vars(result, depth=(depth + 1))
return result
rewrite_vars
def rewrite_vars(
self,
original_string,
depth=1
)
Replace the ${var} placeholders with ##var##
View Source
def rewrite_vars(self, original_string, depth=1):
"""Replace the ${var} placeholders with ##var##"""
parts = original_string.split("${")
parts = parts[1].split("}")
rep_text = "${" + parts[0] + "}"
rep_with = "##" + parts[0] + "##"
result = original_string.replace(rep_text, rep_with)
if len(result.split("${")) > 1:
result = self.rewrite_vars(result, depth=(depth + 1))
return result
template_url_to_path
def template_url_to_path(
self,
current_template_path,
template_url
)
View Source
def template_url_to_path(
self,
current_template_path,
template_url,
):
child_local_paths = []
child_template_paths = self.flatten_template_url(template_url)
# TODO: Add logic to try for S3 paths
for child_template_path in child_template_paths:
child_local_paths.append(
self.find_local_child_template(
current_template_path, child_template_path
)
)
return child_local_paths