Module taskcat._cfn.template
None
None
View Source
import logging
import re
from pathlib import Path
from time import sleep
from typing import Dict, List, Union
from yaml.scanner import ScannerError
import cfnlint
from taskcat._cfn.stack_url_helper import StackURLHelper
from taskcat.exceptions import TaskCatException
LOG = logging.getLogger(__name__)
class TemplateCache:
def __init__(self, store: dict = None):
self._templates = store if store else {}
self._lock: Dict[str, bool] = {}
def get(self, template_path: str) -> cfnlint.Template:
while self._lock.get(template_path):
sleep(0.1)
if template_path not in self._templates:
try:
self._lock[template_path] = True
try:
self._templates[template_path] = cfnlint.decode.cfn_yaml.load(
template_path
)
except ScannerError as e:
LOG.error(
f"Failed to parse template {template_path} {e.problem} at "
f"{e.problem_mark}"
)
raise
self._lock[template_path] = False
except Exception: # pylint: disable=broad-except
self._lock[template_path] = False
raise
return self._templates[template_path]
template_cache_store: Dict[str, cfnlint.Template] = {}
tcat_template_cache = TemplateCache(template_cache_store) # pylint: disable=C0103
class Template:
def __init__(
self,
template_path: Union[str, Path],
project_root: Union[str, Path] = "",
url: str = "",
s3_key_prefix: str = "",
template_cache: TemplateCache = tcat_template_cache,
):
self.template_cache = template_cache
self.template_path: Path = Path(template_path).expanduser().resolve()
self.template = self.template_cache.get(str(self.template_path))
with open(template_path, "r", encoding="utf-8") as file_handle:
self.raw_template = file_handle.read()
project_root = (
project_root if project_root else self.template_path.parent.parent
)
self.project_root = Path(project_root).expanduser().resolve()
self.url = url
self._s3_key_prefix = s3_key_prefix
self.children: List[Template] = []
self._find_children()
def __str__(self):
return str(self.template)
def __repr__(self):
return f"<Template {self.template_path} at {hex(id(self))}>"
@property
def s3_key(self):
suffix = str(self.template_path.relative_to(self.project_root).as_posix())
return self._s3_key_prefix + suffix
@property
def s3_key_prefix(self):
return self._s3_key_prefix
@property
def linesplit(self):
return self.raw_template.split("\n")
def write(self):
"""writes raw_template back to file, and reloads decoded template, useful if
the template has been modified"""
with open(str(self.template_path), "w", encoding="utf-8") as file_handle:
file_handle.write(self.raw_template)
self.template = cfnlint.decode.cfn_yaml.load(self.template_path)
self._find_children()
def _template_url_to_path(
self,
template_url,
template_mappings=None,
):
try:
helper = StackURLHelper(
template_mappings=template_mappings,
template_parameters=self.template.get("Parameters"),
)
urls = helper.template_url_to_path(
current_template_path=self.template_path, template_url=template_url
)
if len(urls) > 0:
return urls[0]
except Exception as e: # pylint: disable=broad-except
LOG.debug("Traceback:", exc_info=True)
LOG.error("TemplateURL parsing error: %s " % str(e))
LOG.warning(
"Failed to discover path for %s, path %s does not exist",
template_url,
None,
)
return ""
def _get_relative_url(self, path: str) -> str:
suffix = str(path).replace(str(self.project_root), "")
url = self.url_prefix() + suffix
return url
def url_prefix(self) -> str:
if not self.url:
return ""
regionless_url = re.sub(
r"\.s3\.(.*)\.amazonaws\.com",
".s3.amazonaws.com",
self.url,
)
suffix = str(self.template_path).replace(str(self.project_root), "")
suffix_length = len(suffix.lstrip("/").split("/"))
url_prefix = "/".join(regionless_url.split("/")[0:-suffix_length])
return url_prefix
def _find_children(self) -> None: # noqa: C901
children = set()
if "Resources" not in self.template:
raise TaskCatException(
f"did not receive a valid template: {self.template_path} does not "
f"have a Resources section"
)
for resource in self.template["Resources"].keys():
resource = self.template["Resources"][resource]
if resource["Type"] == "AWS::CloudFormation::Stack":
child_name = self._template_url_to_path(
template_url=resource["Properties"]["TemplateURL"],
)
# print(child_name)
if child_name:
# for child_url in child_name:
children.add(child_name)
for child in children:
child_template_instance = None
for descendent in self.descendents:
if str(descendent.template_path) == str(child):
child_template_instance = descendent
if not child_template_instance:
try:
child_template_instance = Template(
child,
self.project_root,
self._get_relative_url(child),
self._s3_key_prefix,
tcat_template_cache,
)
except Exception: # pylint: disable=broad-except
LOG.debug("Traceback:", exc_info=True)
LOG.error(f"Failed to add child template {child}")
if isinstance(child_template_instance, Template):
self.children.append(child_template_instance)
@property
def descendents(self) -> List["Template"]:
desc_map = {}
def recurse(template):
for child in template.children:
desc_map[str(child.template_path)] = child
recurse(child)
recurse(self)
return list(desc_map.values())
def parameters(
self,
) -> Dict[str, Union[None, str, int, bool, List[Union[int, str]]]]:
parameters = {}
for param_key, param in self.template.get("Parameters", {}).items():
parameters[param_key] = param.get("Default")
return parameters
Variables
LOG
tcat_template_cache
template_cache_store
Classes
Template
class Template(
template_path: Union[str, pathlib.Path],
project_root: Union[str, pathlib.Path] = '',
url: str = '',
s3_key_prefix: str = '',
template_cache: taskcat._cfn.template.TemplateCache = <taskcat._cfn.template.TemplateCache object at 0x7fc5662911c0>
)
View Source
class Template:
def __init__(
self,
template_path: Union[str, Path],
project_root: Union[str, Path] = "",
url: str = "",
s3_key_prefix: str = "",
template_cache: TemplateCache = tcat_template_cache,
):
self.template_cache = template_cache
self.template_path: Path = Path(template_path).expanduser().resolve()
self.template = self.template_cache.get(str(self.template_path))
with open(template_path, "r", encoding="utf-8") as file_handle:
self.raw_template = file_handle.read()
project_root = (
project_root if project_root else self.template_path.parent.parent
)
self.project_root = Path(project_root).expanduser().resolve()
self.url = url
self._s3_key_prefix = s3_key_prefix
self.children: List[Template] = []
self._find_children()
def __str__(self):
return str(self.template)
def __repr__(self):
return f"<Template {self.template_path} at {hex(id(self))}>"
@property
def s3_key(self):
suffix = str(self.template_path.relative_to(self.project_root).as_posix())
return self._s3_key_prefix + suffix
@property
def s3_key_prefix(self):
return self._s3_key_prefix
@property
def linesplit(self):
return self.raw_template.split("\n")
def write(self):
"""writes raw_template back to file, and reloads decoded template, useful if
the template has been modified"""
with open(str(self.template_path), "w", encoding="utf-8") as file_handle:
file_handle.write(self.raw_template)
self.template = cfnlint.decode.cfn_yaml.load(self.template_path)
self._find_children()
def _template_url_to_path(
self,
template_url,
template_mappings=None,
):
try:
helper = StackURLHelper(
template_mappings=template_mappings,
template_parameters=self.template.get("Parameters"),
)
urls = helper.template_url_to_path(
current_template_path=self.template_path, template_url=template_url
)
if len(urls) > 0:
return urls[0]
except Exception as e: # pylint: disable=broad-except
LOG.debug("Traceback:", exc_info=True)
LOG.error("TemplateURL parsing error: %s " % str(e))
LOG.warning(
"Failed to discover path for %s, path %s does not exist",
template_url,
None,
)
return ""
def _get_relative_url(self, path: str) -> str:
suffix = str(path).replace(str(self.project_root), "")
url = self.url_prefix() + suffix
return url
def url_prefix(self) -> str:
if not self.url:
return ""
regionless_url = re.sub(
r"\.s3\.(.*)\.amazonaws\.com",
".s3.amazonaws.com",
self.url,
)
suffix = str(self.template_path).replace(str(self.project_root), "")
suffix_length = len(suffix.lstrip("/").split("/"))
url_prefix = "/".join(regionless_url.split("/")[0:-suffix_length])
return url_prefix
def _find_children(self) -> None: # noqa: C901
children = set()
if "Resources" not in self.template:
raise TaskCatException(
f"did not receive a valid template: {self.template_path} does not "
f"have a Resources section"
)
for resource in self.template["Resources"].keys():
resource = self.template["Resources"][resource]
if resource["Type"] == "AWS::CloudFormation::Stack":
child_name = self._template_url_to_path(
template_url=resource["Properties"]["TemplateURL"],
)
# print(child_name)
if child_name:
# for child_url in child_name:
children.add(child_name)
for child in children:
child_template_instance = None
for descendent in self.descendents:
if str(descendent.template_path) == str(child):
child_template_instance = descendent
if not child_template_instance:
try:
child_template_instance = Template(
child,
self.project_root,
self._get_relative_url(child),
self._s3_key_prefix,
tcat_template_cache,
)
except Exception: # pylint: disable=broad-except
LOG.debug("Traceback:", exc_info=True)
LOG.error(f"Failed to add child template {child}")
if isinstance(child_template_instance, Template):
self.children.append(child_template_instance)
@property
def descendents(self) -> List["Template"]:
desc_map = {}
def recurse(template):
for child in template.children:
desc_map[str(child.template_path)] = child
recurse(child)
recurse(self)
return list(desc_map.values())
def parameters(
self,
) -> Dict[str, Union[None, str, int, bool, List[Union[int, str]]]]:
parameters = {}
for param_key, param in self.template.get("Parameters", {}).items():
parameters[param_key] = param.get("Default")
return parameters
Instance variables
descendents
linesplit
s3_key
s3_key_prefix
Methods
parameters
def parameters(
self
) -> Dict[str, Union[NoneType, str, int, bool, List[Union[str, int]]]]
View Source
def parameters(
self,
) -> Dict[str, Union[None, str, int, bool, List[Union[int, str]]]]:
parameters = {}
for param_key, param in self.template.get("Parameters", {}).items():
parameters[param_key] = param.get("Default")
return parameters
url_prefix
def url_prefix(
self
) -> str
View Source
def url_prefix(self) -> str:
if not self.url:
return ""
regionless_url = re.sub(
r"\.s3\.(.*)\.amazonaws\.com",
".s3.amazonaws.com",
self.url,
)
suffix = str(self.template_path).replace(str(self.project_root), "")
suffix_length = len(suffix.lstrip("/").split("/"))
url_prefix = "/".join(regionless_url.split("/")[0:-suffix_length])
return url_prefix
write
def write(
self
)
writes raw_template back to file, and reloads decoded template, useful if
the template has been modified
View Source
def write(self):
"""writes raw_template back to file, and reloads decoded template, useful if
the template has been modified"""
with open(str(self.template_path), "w", encoding="utf-8") as file_handle:
file_handle.write(self.raw_template)
self.template = cfnlint.decode.cfn_yaml.load(self.template_path)
self._find_children()
TemplateCache
class TemplateCache(
store: dict = None
)
View Source
class TemplateCache:
def __init__(self, store: dict = None):
self._templates = store if store else {}
self._lock: Dict[str, bool] = {}
def get(self, template_path: str) -> cfnlint.Template:
while self._lock.get(template_path):
sleep(0.1)
if template_path not in self._templates:
try:
self._lock[template_path] = True
try:
self._templates[template_path] = cfnlint.decode.cfn_yaml.load(
template_path
)
except ScannerError as e:
LOG.error(
f"Failed to parse template {template_path} {e.problem} at "
f"{e.problem_mark}"
)
raise
self._lock[template_path] = False
except Exception: # pylint: disable=broad-except
self._lock[template_path] = False
raise
return self._templates[template_path]
Methods
get
def get(
self,
template_path: str
) -> cfnlint.decorators.refactored.refactored.<locals>.cls_wrapper.<locals>.Wrapped
View Source
def get(self, template_path: str) -> cfnlint.Template:
while self._lock.get(template_path):
sleep(0.1)
if template_path not in self._templates:
try:
self._lock[template_path] = True
try:
self._templates[template_path] = cfnlint.decode.cfn_yaml.load(
template_path
)
except ScannerError as e:
LOG.error(
f"Failed to parse template {template_path} {e.problem} at "
f"{e.problem_mark}"
)
raise
self._lock[template_path] = False
except Exception: # pylint: disable=broad-except
self._lock[template_path] = False
raise
return self._templates[template_path]