def __init__(
    self,
    project: str,
    aws_profile: str = "default",
    region="ALL",
    no_verify: bool = False,
    stack_type: str = "ALL",
    wait_for_delete: bool = False,
):
    """
    :param project: installed project to delete, can be an install name, uuid, or project name
    :param aws_profile: aws profile to use for deletion
    :param region: region(s) to delete from, by default, will delete all applicable\
    stacks, supply a csv "us-east-1,us-west-1" to override this default
    :param no_verify: ignore region verification, delete will not error if an invalid\
    region is detected
    :param stack_type: type of stacks to delete, allowable options are ["project","test","ALL"]
    :param wait_for_delete: if True, wait for CloudFormation stacks to be deleted before continuing,
    allowable options are ["True", "False"]
    """
    # Overview: resolve the target regions, list the stacks found there for
    # `aws_profile`, keep the ones matching `project` / `stack_type`, then
    # delete the matches concurrently and log one line per stack.
    #
    # NOTE(review): the docstring above is kept verbatim; its ":param" lines
    # with backslash continuations look machine-parsed (e.g. for CLI help) --
    # confirm before rewording them.
    regions: list = []
    boto3_cache = Boto3Cache()
    if region == "default":
        default_region = boto3_cache.get_default_region(aws_profile)
        # Every other branch yields a list of region names; wrap a bare
        # string so it is not treated as a sequence of single characters
        # by code that iterates over `regions`.
        regions = (
            [default_region] if isinstance(default_region, str) else default_region
        )
    elif region == "ALL":
        # pylint: disable=duplicate-code
        # Round-trip through a set to drop any duplicate region names.
        regions = list(
            set(
                boto3.Session(profile_name=aws_profile).get_available_regions(
                    "cloudformation"
                )
            )
        )
    elif isinstance(region, str):
        # Comma-separated list supplied by the caller; validation is skipped
        # when `no_verify` is set.
        regions = (
            self._validate_regions(region) if not no_verify else region.split(",")
        )
    # Any non-string `region` falls through and leaves `regions` empty.

    stacks = Stacker.list_stacks([aws_profile], regions)
    jobs = []
    for stack in stacks:
        # Prefer the installer name; fall back to the project name.
        name = stack.get("taskcat-installer", stack["taskcat-project-name"])
        job = {
            "name": name,
            "project_name": stack["taskcat-project-name"],
            "test_name": stack["taskcat-test-name"],
            "taskcat_id": stack["taskcat-id"].hex,
            "region": stack["region"],
            "stack_id": stack["stack-id"],
        }
        matches_project = stack_type in ["project", "ALL"] and project in [
            job["name"],
            job["taskcat_id"],
            "ALL",
        ]
        matches_test = stack_type in ["test", "ALL"] and project in [
            job["project_name"],
            "ALL",
        ]
        # Queue each stack at most once. A stack can satisfy both predicates
        # (always the case for project="ALL" with stack_type="ALL"), and
        # queuing it twice would submit two concurrent deletes for it.
        if matches_project or matches_test:
            jobs.append(job)

    with ThreadPoolExecutor() as executor:
        # Map each future back to the (name, region) pair used for logging.
        stack_futures = {
            executor.submit(
                self._delete_stack,
                boto3_cache=boto3_cache,
                job=job,
                aws_profile=aws_profile,
                wait_for_delete=wait_for_delete,
            ): (job["name"], job["region"])
            for job in jobs
        }
        for stack_future in as_completed(stack_futures):
            name, stack_region = stack_futures[stack_future]
            try:
                stack_future.result()
            # Best effort: one failed delete must not stop the remaining ones.
            # pylint: disable=broad-except
            except Exception:
                LOG.error(f"{name} failed in {stack_region}")
                # Keep the cause available for troubleshooting without
                # changing the error-level output.
                LOG.debug("Traceback:", exc_info=True)
            else:
                LOG.info(f"{name} deleted in {stack_region}")