Skip to content

Module taskcat._cli_modules.test

None

None

View Source
# pylint: disable=duplicate-code

# noqa: B950,F841

import inspect

import logging

import os

import sys

import tempfile

from pathlib import Path

import boto3

import yaml

from taskcat._common_utils import determine_profile_for_region

from taskcat._config import Config

from taskcat._tui import TerminalPrinter

from taskcat.testing import CFNTest

from .delete import Delete

from .list import List

LOG = logging.getLogger(__name__)

class Test:

    """

    Performs functional tests on CloudFormation templates.

    """

    # pylint: disable=too-many-locals

    @staticmethod

    def retry(

        region: str,

        stack_name: str,

        resource_name: str,

        config_file: str = "./.taskcat.yml",

        project_root: str = "./",

        no_delete: bool = False,

        keep_failed: bool = False,

        minimal_output: bool = False,

        dont_wait_for_delete: bool = False,

    ):

        """[ALPHA] re-launches a child stack using the same parameters as previous

        launch

        :param region: region stack is in

        :param stack_name: name of parent stack

        :param resource_name: logical id of child stack that will be re-launched

        :param config_file: path to either a taskat project config file or a

        CloudFormation template

        :param project_root: root path of the project relative to input_file

        :param no_delete: don't delete stacks after test is complete

        :param keep_failed: do not delete failed stacks

        :param minimal_output: Reduces output during test runs

        :param dont_wait_for_delete: Exits immediately after calling stack_delete

        """

        LOG.warning("test retry is in alpha feature, use with caution")

        project_root_path: Path = Path(project_root).expanduser().resolve()

        input_file_path: Path = project_root_path / config_file

        config = Config.create(

            project_root=project_root_path, project_config_path=input_file_path

        )

        profile = determine_profile_for_region(config.config.general.auth, region)

        cfn = boto3.Session(profile_name=profile).client(

            "cloudformation", region_name=region

        )

        events = cfn.describe_stack_events(StackName=stack_name)["StackEvents"]

        resource = [i for i in events if i["LogicalResourceId"] == resource_name][0]

        properties = yaml.safe_load(resource["ResourceProperties"])

        with open(str(input_file_path), "r", encoding="utf-8") as filepointer:

            config_yaml = yaml.safe_load(filepointer)

        config_yaml["project"]["regions"] = [region]

        config_yaml["project"]["parameters"] = properties["Parameters"]

        config_yaml["project"]["template"] = "/".join(

            properties["TemplateURL"].split("/")[4:]

        )

        config_yaml["tests"] = {"default": {}}

        tmpdir = tempfile.mkdtemp()

        name = ".taskcat.yml.temp"

        umask = os.umask(0o77)

        file_path = os.path.join(tmpdir, name)

        try:

            with open(file_path, "w", encoding="utf-8") as filepointer:  # nosec

                yaml.safe_dump(config_yaml, filepointer)

            if resource["PhysicalResourceId"]:

                cfn.delete_stack(StackName=resource["PhysicalResourceId"])

                LOG.info("waiting for old stack to delete...")

                cfn.get_waiter("stack_delete_complete").wait(

                    StackName=resource["PhysicalResourceId"]

                )

            Test.run(

                input_file=file_path,  # nosec

                project_root=project_root,

                lint_disable=True,

                no_delete=no_delete,

                keep_failed=keep_failed,

                minimal_output=minimal_output,

                dont_wait_for_delete=dont_wait_for_delete,

            )

        except IOError:

            LOG.error("IOError when retrying Test Run")

            sys.exit(1)

        else:

            os.remove(file_path)

        finally:

            os.umask(umask)

            os.rmdir(tmpdir)

    @staticmethod

    # pylint: disable=too-many-arguments,W0613,line-too-long

    def run(  # noqa: C901

        test_names: str = "ALL",

        regions: str = "ALL",

        input_file: str = "./.taskcat.yml",

        project_root: str = "./",

        no_delete: bool = False,

        lint_disable: bool = False,

        enable_sig_v2: bool = False,

        keep_failed: bool = False,

        output_directory: str = "./taskcat_outputs",

        minimal_output: bool = False,

        dont_wait_for_delete: bool = False,

        skip_upload: bool = False,

        _extra_tags: List = None,

    ):

        """tests whether CloudFormation templates are able to successfully launch

        :param test_names: comma separated list of tests to run

        :param regions: comma separated list of regions to test in

        :param input_file: path to either a taskat project config file or a CloudFormation template

        :param project_root: root path of the project relative to input_file

        :param no_delete: don't delete stacks after test is complete

        :param lint_disable: disable cfn-lint checks

        :param enable_sig_v2: enable legacy sigv2 requests for auto-created buckets

        :param keep_failed: do not delete failed stacks

        :param output_directory: Where to store generated logfiles

        :param minimal_output: Reduces output during test runs

        :param dont_wait_for_delete: Exits immediately after calling stack_delete

        :param skip_upload: Use templates in an existing cloudformation bucket.

        """  # noqa: B950

        test = CFNTest.from_file(

            project_root=project_root,

            input_file=input_file,

            regions=regions,

            enable_sig_v2=enable_sig_v2,

        )

        # This code is temporary and should be removed once its easier

        # to create a config object

        frame = inspect.currentframe()

        if frame is not None:

            args, _, _, values = inspect.getargvalues(frame)

            for i in args:

                if hasattr(test, i):

                    setattr(test, i, values[i])

        terminal_printer = TerminalPrinter(minimalist=minimal_output)

        test.printer = terminal_printer

        # Runs here

        with test:

            test.report(output_directory)

    def resume(self, run_id):  # pylint: disable=no-self-use

        """resumes a monitoring of a previously started test run"""

        # do some stuff

        raise NotImplementedError()

    @staticmethod

    def list(profiles: str = "default", regions="ALL"):

        """

        :param profiles: comma separated list of aws profiles to search

        :param regions: comma separated list of regions to search, default is to check

        all commercial regions

        """

        List(profiles=profiles, regions=regions, stack_type="test")

    @staticmethod

    def clean(project: str, aws_profile: str = "default", region="ALL"):

        """

        :param project: project to delete, can be an name or uuid, or ALL to clean all

        tests

        :param aws_profile: aws profile to use for deletion

        :param region: region to delete from, default will scan all regions

        """

        Delete(

            project=project, aws_profile=aws_profile, region=region, stack_type="test"

        )

Variables

LOG

Classes

Test

class Test(
    /,
    *args,
    **kwargs
)
View Source
class Test:

    """

    Performs functional tests on CloudFormation templates.

    """

    # pylint: disable=too-many-locals

    @staticmethod

    def retry(

        region: str,

        stack_name: str,

        resource_name: str,

        config_file: str = "./.taskcat.yml",

        project_root: str = "./",

        no_delete: bool = False,

        keep_failed: bool = False,

        minimal_output: bool = False,

        dont_wait_for_delete: bool = False,

    ):

        """[ALPHA] re-launches a child stack using the same parameters as previous

        launch

        :param region: region stack is in

        :param stack_name: name of parent stack

        :param resource_name: logical id of child stack that will be re-launched

        :param config_file: path to either a taskat project config file or a

        CloudFormation template

        :param project_root: root path of the project relative to input_file

        :param no_delete: don't delete stacks after test is complete

        :param keep_failed: do not delete failed stacks

        :param minimal_output: Reduces output during test runs

        :param dont_wait_for_delete: Exits immediately after calling stack_delete

        """

        LOG.warning("test retry is in alpha feature, use with caution")

        project_root_path: Path = Path(project_root).expanduser().resolve()

        input_file_path: Path = project_root_path / config_file

        config = Config.create(

            project_root=project_root_path, project_config_path=input_file_path

        )

        profile = determine_profile_for_region(config.config.general.auth, region)

        cfn = boto3.Session(profile_name=profile).client(

            "cloudformation", region_name=region

        )

        events = cfn.describe_stack_events(StackName=stack_name)["StackEvents"]

        resource = [i for i in events if i["LogicalResourceId"] == resource_name][0]

        properties = yaml.safe_load(resource["ResourceProperties"])

        with open(str(input_file_path), "r", encoding="utf-8") as filepointer:

            config_yaml = yaml.safe_load(filepointer)

        config_yaml["project"]["regions"] = [region]

        config_yaml["project"]["parameters"] = properties["Parameters"]

        config_yaml["project"]["template"] = "/".join(

            properties["TemplateURL"].split("/")[4:]

        )

        config_yaml["tests"] = {"default": {}}

        tmpdir = tempfile.mkdtemp()

        name = ".taskcat.yml.temp"

        umask = os.umask(0o77)

        file_path = os.path.join(tmpdir, name)

        try:

            with open(file_path, "w", encoding="utf-8") as filepointer:  # nosec

                yaml.safe_dump(config_yaml, filepointer)

            if resource["PhysicalResourceId"]:

                cfn.delete_stack(StackName=resource["PhysicalResourceId"])

                LOG.info("waiting for old stack to delete...")

                cfn.get_waiter("stack_delete_complete").wait(

                    StackName=resource["PhysicalResourceId"]

                )

            Test.run(

                input_file=file_path,  # nosec

                project_root=project_root,

                lint_disable=True,

                no_delete=no_delete,

                keep_failed=keep_failed,

                minimal_output=minimal_output,

                dont_wait_for_delete=dont_wait_for_delete,

            )

        except IOError:

            LOG.error("IOError when retrying Test Run")

            sys.exit(1)

        else:

            os.remove(file_path)

        finally:

            os.umask(umask)

            os.rmdir(tmpdir)

    @staticmethod

    # pylint: disable=too-many-arguments,W0613,line-too-long

    def run(  # noqa: C901

        test_names: str = "ALL",

        regions: str = "ALL",

        input_file: str = "./.taskcat.yml",

        project_root: str = "./",

        no_delete: bool = False,

        lint_disable: bool = False,

        enable_sig_v2: bool = False,

        keep_failed: bool = False,

        output_directory: str = "./taskcat_outputs",

        minimal_output: bool = False,

        dont_wait_for_delete: bool = False,

        skip_upload: bool = False,

        _extra_tags: List = None,

    ):

        """tests whether CloudFormation templates are able to successfully launch

        :param test_names: comma separated list of tests to run

        :param regions: comma separated list of regions to test in

        :param input_file: path to either a taskat project config file or a CloudFormation template

        :param project_root: root path of the project relative to input_file

        :param no_delete: don't delete stacks after test is complete

        :param lint_disable: disable cfn-lint checks

        :param enable_sig_v2: enable legacy sigv2 requests for auto-created buckets

        :param keep_failed: do not delete failed stacks

        :param output_directory: Where to store generated logfiles

        :param minimal_output: Reduces output during test runs

        :param dont_wait_for_delete: Exits immediately after calling stack_delete

        :param skip_upload: Use templates in an existing cloudformation bucket.

        """  # noqa: B950

        test = CFNTest.from_file(

            project_root=project_root,

            input_file=input_file,

            regions=regions,

            enable_sig_v2=enable_sig_v2,

        )

        # This code is temporary and should be removed once its easier

        # to create a config object

        frame = inspect.currentframe()

        if frame is not None:

            args, _, _, values = inspect.getargvalues(frame)

            for i in args:

                if hasattr(test, i):

                    setattr(test, i, values[i])

        terminal_printer = TerminalPrinter(minimalist=minimal_output)

        test.printer = terminal_printer

        # Runs here

        with test:

            test.report(output_directory)

    def resume(self, run_id):  # pylint: disable=no-self-use

        """resumes a monitoring of a previously started test run"""

        # do some stuff

        raise NotImplementedError()

    @staticmethod

    def list(profiles: str = "default", regions="ALL"):

        """

        :param profiles: comma separated list of aws profiles to search

        :param regions: comma separated list of regions to search, default is to check

        all commercial regions

        """

        List(profiles=profiles, regions=regions, stack_type="test")

    @staticmethod

    def clean(project: str, aws_profile: str = "default", region="ALL"):

        """

        :param project: project to delete, can be an name or uuid, or ALL to clean all

        tests

        :param aws_profile: aws profile to use for deletion

        :param region: region to delete from, default will scan all regions

        """

        Delete(

            project=project, aws_profile=aws_profile, region=region, stack_type="test"

        )

Static methods

clean

def clean(
    project: str,
    aws_profile: str = 'default',
    region='ALL'
)

Parameters:

Name Type Description Default
project None project to delete, can be an name or uuid, or ALL to clean all
tests None
aws_profile None aws profile to use for deletion None
region None region to delete from, default will scan all regions None
View Source
    @staticmethod

    def clean(project: str, aws_profile: str = "default", region="ALL"):

        """

        :param project: project to delete, can be an name or uuid, or ALL to clean all

        tests

        :param aws_profile: aws profile to use for deletion

        :param region: region to delete from, default will scan all regions

        """

        Delete(

            project=project, aws_profile=aws_profile, region=region, stack_type="test"

        )

list

def list(
    profiles: str = 'default',
    regions='ALL'
)

Parameters:

Name Type Description Default
profiles None comma separated list of aws profiles to search None
regions None comma separated list of regions to search, default is to check
all commercial regions None
View Source
    @staticmethod

    def list(profiles: str = "default", regions="ALL"):

        """

        :param profiles: comma separated list of aws profiles to search

        :param regions: comma separated list of regions to search, default is to check

        all commercial regions

        """

        List(profiles=profiles, regions=regions, stack_type="test")

retry

def retry(
    region: str,
    stack_name: str,
    resource_name: str,
    config_file: str = './.taskcat.yml',
    project_root: str = './',
    no_delete: bool = False,
    keep_failed: bool = False,
    minimal_output: bool = False,
    dont_wait_for_delete: bool = False
)

[ALPHA] re-launches a child stack using the same parameters as previous

launch

Parameters:

Name Type Description Default
region None region stack is in None
stack_name None name of parent stack None
resource_name None logical id of child stack that will be re-launched None
config_file None path to either a taskat project config file or a
CloudFormation template None
project_root None root path of the project relative to input_file None
no_delete None don't delete stacks after test is complete None
keep_failed None do not delete failed stacks None
minimal_output None Reduces output during test runs None
dont_wait_for_delete None Exits immediately after calling stack_delete None
View Source
    @staticmethod

    def retry(

        region: str,

        stack_name: str,

        resource_name: str,

        config_file: str = "./.taskcat.yml",

        project_root: str = "./",

        no_delete: bool = False,

        keep_failed: bool = False,

        minimal_output: bool = False,

        dont_wait_for_delete: bool = False,

    ):

        """[ALPHA] re-launches a child stack using the same parameters as previous

        launch

        :param region: region stack is in

        :param stack_name: name of parent stack

        :param resource_name: logical id of child stack that will be re-launched

        :param config_file: path to either a taskat project config file or a

        CloudFormation template

        :param project_root: root path of the project relative to input_file

        :param no_delete: don't delete stacks after test is complete

        :param keep_failed: do not delete failed stacks

        :param minimal_output: Reduces output during test runs

        :param dont_wait_for_delete: Exits immediately after calling stack_delete

        """

        LOG.warning("test retry is in alpha feature, use with caution")

        project_root_path: Path = Path(project_root).expanduser().resolve()

        input_file_path: Path = project_root_path / config_file

        config = Config.create(

            project_root=project_root_path, project_config_path=input_file_path

        )

        profile = determine_profile_for_region(config.config.general.auth, region)

        cfn = boto3.Session(profile_name=profile).client(

            "cloudformation", region_name=region

        )

        events = cfn.describe_stack_events(StackName=stack_name)["StackEvents"]

        resource = [i for i in events if i["LogicalResourceId"] == resource_name][0]

        properties = yaml.safe_load(resource["ResourceProperties"])

        with open(str(input_file_path), "r", encoding="utf-8") as filepointer:

            config_yaml = yaml.safe_load(filepointer)

        config_yaml["project"]["regions"] = [region]

        config_yaml["project"]["parameters"] = properties["Parameters"]

        config_yaml["project"]["template"] = "/".join(

            properties["TemplateURL"].split("/")[4:]

        )

        config_yaml["tests"] = {"default": {}}

        tmpdir = tempfile.mkdtemp()

        name = ".taskcat.yml.temp"

        umask = os.umask(0o77)

        file_path = os.path.join(tmpdir, name)

        try:

            with open(file_path, "w", encoding="utf-8") as filepointer:  # nosec

                yaml.safe_dump(config_yaml, filepointer)

            if resource["PhysicalResourceId"]:

                cfn.delete_stack(StackName=resource["PhysicalResourceId"])

                LOG.info("waiting for old stack to delete...")

                cfn.get_waiter("stack_delete_complete").wait(

                    StackName=resource["PhysicalResourceId"]

                )

            Test.run(

                input_file=file_path,  # nosec

                project_root=project_root,

                lint_disable=True,

                no_delete=no_delete,

                keep_failed=keep_failed,

                minimal_output=minimal_output,

                dont_wait_for_delete=dont_wait_for_delete,

            )

        except IOError:

            LOG.error("IOError when retrying Test Run")

            sys.exit(1)

        else:

            os.remove(file_path)

        finally:

            os.umask(umask)

            os.rmdir(tmpdir)

run

def run(
    test_names: str = 'ALL',
    regions: str = 'ALL',
    input_file: str = './.taskcat.yml',
    project_root: str = './',
    no_delete: bool = False,
    lint_disable: bool = False,
    enable_sig_v2: bool = False,
    keep_failed: bool = False,
    output_directory: str = './taskcat_outputs',
    minimal_output: bool = False,
    dont_wait_for_delete: bool = False,
    skip_upload: bool = False,
    _extra_tags: taskcat._cli_modules.list.List = None
)

tests whether CloudFormation templates are able to successfully launch

Parameters:

Name Type Description Default
test_names None comma separated list of tests to run None
regions None comma separated list of regions to test in None
input_file None path to either a taskat project config file or a CloudFormation template None
project_root None root path of the project relative to input_file None
no_delete None don't delete stacks after test is complete None
lint_disable None disable cfn-lint checks None
enable_sig_v2 None enable legacy sigv2 requests for auto-created buckets None
keep_failed None do not delete failed stacks None
output_directory None Where to store generated logfiles None
minimal_output None Reduces output during test runs None
dont_wait_for_delete None Exits immediately after calling stack_delete None
skip_upload None Use templates in an existing cloudformation bucket. None
View Source
    @staticmethod

    # pylint: disable=too-many-arguments,W0613,line-too-long

    def run(  # noqa: C901

        test_names: str = "ALL",

        regions: str = "ALL",

        input_file: str = "./.taskcat.yml",

        project_root: str = "./",

        no_delete: bool = False,

        lint_disable: bool = False,

        enable_sig_v2: bool = False,

        keep_failed: bool = False,

        output_directory: str = "./taskcat_outputs",

        minimal_output: bool = False,

        dont_wait_for_delete: bool = False,

        skip_upload: bool = False,

        _extra_tags: List = None,

    ):

        """tests whether CloudFormation templates are able to successfully launch

        :param test_names: comma separated list of tests to run

        :param regions: comma separated list of regions to test in

        :param input_file: path to either a taskat project config file or a CloudFormation template

        :param project_root: root path of the project relative to input_file

        :param no_delete: don't delete stacks after test is complete

        :param lint_disable: disable cfn-lint checks

        :param enable_sig_v2: enable legacy sigv2 requests for auto-created buckets

        :param keep_failed: do not delete failed stacks

        :param output_directory: Where to store generated logfiles

        :param minimal_output: Reduces output during test runs

        :param dont_wait_for_delete: Exits immediately after calling stack_delete

        :param skip_upload: Use templates in an existing cloudformation bucket.

        """  # noqa: B950

        test = CFNTest.from_file(

            project_root=project_root,

            input_file=input_file,

            regions=regions,

            enable_sig_v2=enable_sig_v2,

        )

        # This code is temporary and should be removed once its easier

        # to create a config object

        frame = inspect.currentframe()

        if frame is not None:

            args, _, _, values = inspect.getargvalues(frame)

            for i in args:

                if hasattr(test, i):

                    setattr(test, i, values[i])

        terminal_printer = TerminalPrinter(minimalist=minimal_output)

        test.printer = terminal_printer

        # Runs here

        with test:

            test.report(output_directory)

Methods

resume

def resume(
    self,
    run_id
)

resumes a monitoring of a previously started test run

View Source
    def resume(self, run_id):  # pylint: disable=no-self-use

        """resumes a monitoring of a previously started test run"""

        # do some stuff

        raise NotImplementedError()