Module taskcat._cfn.threaded
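Thread-pool helpers for creating, importing, inspecting and deleting CloudFormation stacks in parallel across tests, clients and regions.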
View Source
import logging
import uuid
from functools import partial
from multiprocessing.dummy import Pool as ThreadPool
from typing import Dict, List
import boto3
from taskcat._cfn.stack import Stack, Stacks, StackStatus
from taskcat._client_factory import Boto3Cache
from taskcat._common_utils import merge_dicts
from taskcat._dataclasses import Tag, TestObj, TestRegion
from taskcat.exceptions import TaskCatException
LOG = logging.getLogger(__name__)
def fan_out(func, partial_kwargs, payload, threads):
    """Apply ``func`` to every item of ``payload`` using a pool of threads.

    Args:
        func: Callable invoked once per payload item; the item is passed as
            the first positional argument.
        partial_kwargs: Optional dict of keyword arguments bound to ``func``
            for every call. Skipped when falsy (``None`` or empty).
        payload: Iterable of items to process.
        threads: Number of worker threads in the pool.

    Returns:
        A list with one result per payload item, in payload order.

    Raises:
        Whatever ``func`` raises; the pool is shut down before the exception
        propagates.
    """
    if partial_kwargs:
        func = partial(func, **partial_kwargs)
    pool = ThreadPool(threads)
    try:
        return pool.map(func, payload)
    finally:
        # Release the worker threads on the failure path too; previously an
        # exception from pool.map skipped close()/join() and leaked the pool.
        pool.close()
        pool.join()
class Stacker:
    """Create, import, inspect and delete CloudFormation stacks for a set of tests.

    Every operation is parallelised with ``fan_out``: an outer thread pool
    iterates over tests (or over groups of stacks that share a client) and an
    inner pool iterates over the regions or stacks inside each item.
    """

    # Sentinel default for ``uid``; passing it makes __init__ generate a new id.
    NULL_UUID = uuid.UUID(int=0)

    def __init__(
        self,
        project_name: str,
        tests: Dict[str, TestObj],
        uid: uuid.UUID = NULL_UUID,
        stack_name_prefix: str = "tCaT",
        shorten_stack_name: bool = False,
        tags: list = None,
    ):
        """Store the run configuration and start with an empty stack collection.

        Args:
            project_name: Value written to the ``taskcat-project-name`` tag.
            tests: Mapping of test name to test object.
            uid: Identifier of the run; a random ``uuid4`` is generated when
                left at ``NULL_UUID``.
            stack_name_prefix: Stored on the instance.
            shorten_stack_name: Stored on the instance.
            tags: Extra tags applied to created stacks; ``None`` or an empty
                list means no extra tags.
        """
        self.tests = tests
        self.project_name = project_name
        self.stack_name_prefix = stack_name_prefix
        self.shorten_stack_name = shorten_stack_name
        self.tags = tags if tags else []
        self.uid = uuid.uuid4() if uid == Stacker.NULL_UUID else uid
        self.stacks: Stacks = Stacks()

    @staticmethod
    def _tests_to_list(tests: Dict[str, TestObj]):
        """Return the test objects of ``tests`` as a list."""
        return list(tests.values())

    def create_stacks(self, threads: int = 8):
        """Create the stacks of every test, one worker thread per test.

        Args:
            threads: Size of the per-test thread pool.

        Raises:
            TaskCatException: If this instance already holds stack objects.
        """
        if self.stacks:
            raise TaskCatException("Stacker already initialised with stack objects")
        tests = self._tests_to_list(self.tests)
        tags = [Tag({"Key": "taskcat-id", "Value": self.uid.hex})]
        # User tags may not override the keys taskcat manages itself.
        tags += [
            Tag(t)
            for t in self.tags
            if t.key not in ["taskcat-project-name", "taskcat-test-name", "taskcat-id"]
        ]
        fan_out(self._create_stacks_for_test, {"tags": tags}, tests, threads)

    def _create_stacks_for_test(self, test, tags, threads: int = 32):
        """Create the stacks of one test in all of its regions.

        Args:
            test: Test object providing ``stack_name``, ``name``, ``template``,
                ``tags`` and ``regions``.
            tags: Base tags common to every test of the run; not modified.
            threads: Size of the per-region thread pool.
        """
        stack_name = test.stack_name
        # ``tags`` is the very same list object for every test (it is bound
        # once via functools.partial in create_stacks), so build a per-test
        # copy instead of appending in place; otherwise the name and tags of
        # one test would leak into the stacks of the others.
        test_tags = list(tags)
        test_tags.append(
            Tag({"Key": "taskcat-project-name", "Value": self.project_name})
        )
        test_tags.append(Tag({"Key": "taskcat-test-name", "Value": test.name}))
        test_tags += test.tags
        partial_kwargs = {
            "stack_name": stack_name,
            "template": test.template,
            "tags": test_tags,
            "test_name": test.name,
        }
        stacks = fan_out(Stack.create, partial_kwargs, test.regions, threads)
        self.stacks += stacks

    # Not used by tCat at present
    def update_stacks(self):
        """Placeholder; always raises ``NotImplementedError``."""
        raise NotImplementedError()

    def delete_stacks(self, criteria: dict = None, deep=False, threads=32):
        """Delete the stacks selected by ``criteria``.

        Args:
            criteria: Passed unchanged to ``self.stacks.filter``.
            deep: Not supported; a truthy value raises.
            threads: Size of the per-client thread pool.

        Raises:
            NotImplementedError: If ``deep`` is truthy.
        """
        if deep:
            raise NotImplementedError("deep delete not yet implemented")
        fan_out(
            self._delete_stacks_per_client,
            None,
            self._group_stacks(self.stacks.filter(criteria)),
            threads,
        )

    def _delete_stacks_per_client(self, stacks, threads=8):
        """Delete every stack of one client group (see ``_group_stacks``)."""
        fan_out(self._delete_stack, None, stacks["Stacks"], threads)

    @staticmethod
    def _delete_stack(stack: Stack):
        """Request deletion of ``stack`` and then refresh it."""
        stack.delete(stack_id=stack.id, client=stack.client)
        stack.refresh()

    def status(self, recurse: bool = False, threads: int = 32, **kwargs):
        """Summarise the status of the stacks matching ``kwargs``.

        Args:
            recurse: Not supported; a truthy value raises.
            threads: Size of the per-client thread pool.
            **kwargs: Filter criteria passed to ``self.stacks.filter``.

        Returns:
            A dict with the keys ``"IN_PROGRESS"``, ``"COMPLETE"`` and
            ``"FAILED"``, each mapping stack id to the stack's status reason.

        Raises:
            NotImplementedError: If ``recurse`` is truthy.
        """
        if recurse:
            raise NotImplementedError("recurse not implemented")
        stacks = self.stacks.filter(kwargs)
        per_region_stacks = self._group_stacks(stacks)
        results = fan_out(self._status_per_client, None, per_region_stacks, threads)
        statuses: Dict[str, dict] = {"IN_PROGRESS": {}, "COMPLETE": {}, "FAILED": {}}
        for region in results:
            # Each entry is the (stack id, status group, reason) tuple
            # produced by _status.
            for status in region:
                statuses[status[1]][status[0]] = status[2]
        return statuses

    def _status_per_client(self, stacks, threads: int = 8):
        """Return the ``_status`` tuple of every stack in one client group."""
        return fan_out(self._status, None, stacks["Stacks"], threads)

    @staticmethod
    def _status(stack: Stack):
        """Classify ``stack`` into a status group.

        Returns:
            A ``(stack id, status group, status reason)`` tuple.

        Raises:
            TaskCatException: If the stack's status belongs to none of the
                ``COMPLETE``, ``IN_PROGRESS`` or ``FAILED`` groups of
                ``StackStatus``.
        """
        for status_group in ["COMPLETE", "IN_PROGRESS", "FAILED"]:
            if stack.status in getattr(StackStatus, status_group):
                return stack.id, status_group, stack.status_reason
        raise TaskCatException(f"Invalid stack {stack}")

    def events(self, recurse=False, threads: int = 32, **kwargs):
        """Collect the events of all stacks, filtered by ``kwargs``.

        Returns:
            The per-client ``{stack id: filtered events}`` dicts combined
            with ``merge_dicts``.

        Raises:
            NotImplementedError: If ``recurse`` is truthy.
        """
        if recurse:
            raise NotImplementedError("recurse not implemented")
        per_region_stacks = self._group_stacks(self.stacks)
        results = fan_out(
            self._events_per_client, {"criteria": kwargs}, per_region_stacks, threads
        )
        return merge_dicts(results)

    def _events_per_client(self, stacks, criteria, threads: int = 8):
        """Collect the filtered events of every stack in one client group."""
        results = fan_out(
            self._describe_stack_events,
            {"criteria": criteria},
            stacks["Stacks"],
            threads,
        )
        return merge_dicts(results)

    @staticmethod
    def _describe_stack_events(stack: Stack, criteria):
        """Return ``{stack id: events of the stack filtered by criteria}``."""
        return {stack.id: stack.events().filter(criteria)}

    def resources(self, recurse=False, threads: int = 32, **kwargs):
        """Collect the resources of all stacks, filtered by ``kwargs``.

        Returns:
            The per-client ``{stack id: filtered resources}`` dicts combined
            with ``merge_dicts``.

        Raises:
            NotImplementedError: If ``recurse`` is truthy.
        """
        if recurse:
            raise NotImplementedError("recurse not implemented")
        results = fan_out(
            self._resources_per_client,
            {"criteria": kwargs},
            self._group_stacks(self.stacks),
            threads,
        )
        return merge_dicts(results)

    def _resources_per_client(self, stacks, criteria, threads: int = 8):
        """Collect the filtered resources of every stack in one client group."""
        results = fan_out(
            self._resources, {"criteria": criteria}, stacks["Stacks"], threads
        )
        return merge_dicts(results)

    @staticmethod
    def _resources(stack: Stack, criteria):
        """Return ``{stack id: resources of the stack filtered by criteria}``."""
        return {stack.id: stack.resources().filter(criteria)}

    @classmethod
    def from_existing(
        cls,
        uid: uuid.UUID,
        project_name: str,
        tests: Dict[str, TestObj],
        include_deleted=False,
        recurse=False,
        threads=32,
    ):
        """Build a ``Stacker`` from stacks that already exist.

        The CloudFormation client of every test region is queried and stacks
        whose tags match ``uid``, ``project_name`` and one of ``tests`` are
        imported.

        Args:
            uid: Run identifier to match against the ``taskcat-id`` tag.
            project_name: Value to match against ``taskcat-project-name``.
            tests: Mapping of test name to test object.
            include_deleted: Not supported; a truthy value raises.
            recurse: Not supported; a truthy value raises.
            threads: Size of the per-client thread pool.

        Returns:
            A ``Stacker`` whose ``stacks`` holds the imported stacks.

        Raises:
            NotImplementedError: If ``include_deleted`` or ``recurse`` is truthy.
        """
        if include_deleted:
            raise NotImplementedError("including deleted stacks not implemented")
        if recurse:
            raise NotImplementedError("recurse not implemented")
        # Group regions by client so that each client is paginated only once.
        clients: Dict[boto3.client, List[TestRegion]] = {}
        for test in tests.values():
            for region in test.regions:
                client = region.client("cloudformation")
                if client not in clients:
                    clients[client] = []
                clients[client].append(region)
        results = fan_out(
            Stacker._import_stacks_per_client,
            {"uid": uid, "project_name": project_name, "tests": tests},
            clients.items(),
            threads,
        )
        stacker = Stacker(project_name, tests, uid)
        stacker.stacks = Stacks([item for sublist in results for item in sublist])
        return stacker

    @staticmethod
    def _import_stacks_per_client(clients, uid, project_name, tests):
        """Import the matching stacks visible to one client.

        Args:
            clients: A ``(client, regions)`` pair as yielded by the
                ``dict.items()`` call in ``from_existing``.
            uid: Run identifier to match against the ``taskcat-id`` tag.
            project_name: Value to match against ``taskcat-project-name``.
            tests: Mapping of test name to test object.

        Returns:
            A ``Stacks`` collection of the imported stacks.
        """
        # pylint: disable=too-many-locals
        stacks = Stacks()
        client, regions = clients
        for page in client.get_paginator("describe_stacks").paginate():
            for stack_props in page["Stacks"]:
                # Stacks that report a ParentId are skipped.
                if stack_props.get("ParentId"):
                    continue
                match = False
                project = ""
                test = ""
                for tag in stack_props["Tags"]:
                    k, v = (tag["Key"], tag["Value"])
                    if k == "taskcat-id" and v == uid.hex:
                        match = True
                    elif k == "taskcat-test-name" and v in tests:
                        test = v
                    elif k == "taskcat-project-name" and v == project_name:
                        project = v
                # All three taskcat tags must agree before a stack is imported.
                if match and test and project:
                    stack = Stack.import_existing(
                        stack_props,
                        tests[test].template,
                        regions[0],
                        test,
                        uid,
                    )
                    stacks.append(stack)
        return stacks

    @staticmethod
    def _group_stacks(stacks: Stacks) -> List[dict]:
        """Group ``stacks`` by their client.

        Returns:
            A list of ``{"Client": client, "Stacks": [stack, ...]}`` dicts,
            one per distinct client, in first-seen order.
        """
        stacks_by_client: dict = {}
        for stack in stacks:
            client = stack.client
            if client not in stacks_by_client:
                stacks_by_client[client] = {"Client": client, "Stacks": []}
            stacks_by_client[client]["Stacks"].append(stack)
        return list(stacks_by_client.values())

    @staticmethod
    def list_stacks(profiles, regions):
        """List the taskcat stacks of every profile in every region.

        Args:
            profiles: Iterable of profile names.
            regions: Sequence of region names, queried for each profile.

        Returns:
            A flat list of the dicts built by ``_get_taskcat_stacks``.
        """
        stacks = fan_out(
            Stacker._list_per_profile,
            {"regions": regions, "boto_cache": Boto3Cache()},
            profiles,
            threads=8,
        )
        return [stack for sublist in stacks for stack in sublist]

    @staticmethod
    def _list_per_profile(profile, regions, boto_cache):
        """List the taskcat stacks of one profile, one thread per region."""
        stacks = fan_out(
            Stacker._get_taskcat_stacks,
            {"boto_cache": boto_cache, "profile": profile},
            regions,
            # ThreadPool rejects a size below 1, so an empty region list still
            # gets a one-thread pool and simply yields no stacks.
            threads=max(1, len(regions)),
        )
        return [stack for sublist in stacks for stack in sublist]

    @staticmethod
    def _get_taskcat_stacks(region, boto_cache: Boto3Cache, profile: str):
        """Describe the stacks of one region/profile that carry a ``taskcat-id`` tag.

        Any exception is logged as a warning and the stacks gathered up to
        that point are returned.

        Returns:
            A list of dicts with the keys ``"region"``, ``"profile"``,
            ``"stack-id"`` and ``"stack-name"`` plus one entry per tag whose
            key starts with ``"taskcat-"``; ``"taskcat-id"`` is converted to
            a ``uuid.UUID``.
        """
        stacks = []
        try:
            cfn = boto_cache.client("cloudformation", profile=profile, region=region)
            for page in cfn.get_paginator("describe_stacks").paginate():
                for stack_props in page["Stacks"]:
                    # Stacks that report a ParentId are skipped.
                    if stack_props.get("ParentId"):
                        continue
                    stack_id = stack_props["StackId"]
                    # The name is the second "/"-separated field of the id.
                    stack_name = stack_id.split("/")[1]
                    stack = {
                        "region": region,
                        "profile": profile,
                        "stack-id": stack_id,
                        "stack-name": stack_name,
                    }
                    for tag in stack_props["Tags"]:
                        k, v = (tag["Key"], tag["Value"])
                        if k.startswith("taskcat-"):
                            stack[k] = v
                    if stack.get("taskcat-id"):
                        stack["taskcat-id"] = uuid.UUID(stack["taskcat-id"])
                        stacks.append(stack)
        except Exception as e:  # pylint: disable=broad-except
            LOG.warning(
                f"Failed to fetch stacks for region {region} using profile "
                f"{profile} {type(e)} {e}"
            )
            LOG.debug("Traceback:", exc_info=True)
        return stacks
Variables
LOG
Functions
fan_out
def fan_out(
func,
partial_kwargs,
payload,
threads
)
View Source
def fan_out(func, partial_kwargs, payload, threads):
    """Apply ``func`` to every item of ``payload`` using a pool of threads.

    Args:
        func: Callable invoked once per payload item; the item is passed as
            the first positional argument.
        partial_kwargs: Optional dict of keyword arguments bound to ``func``
            for every call. Skipped when falsy (``None`` or empty).
        payload: Iterable of items to process.
        threads: Number of worker threads in the pool.

    Returns:
        A list with one result per payload item, in payload order.

    Raises:
        Whatever ``func`` raises; the pool is shut down before the exception
        propagates.
    """
    if partial_kwargs:
        func = partial(func, **partial_kwargs)
    pool = ThreadPool(threads)
    try:
        return pool.map(func, payload)
    finally:
        # Release the worker threads on the failure path too; previously an
        # exception from pool.map skipped close()/join() and leaked the pool.
        pool.close()
        pool.join()
Classes
Stacker
class Stacker(
project_name: str,
tests: Dict[str, taskcat._dataclasses.TestObj],
uid: uuid.UUID = UUID('00000000-0000-0000-0000-000000000000'),
stack_name_prefix: str = 'tCaT',
shorten_stack_name: bool = False,
tags: list = None
)
View Source
class Stacker:
    """Create, import, inspect and delete CloudFormation stacks for a set of tests.

    Every operation is parallelised with ``fan_out``: an outer thread pool
    iterates over tests (or over groups of stacks that share a client) and an
    inner pool iterates over the regions or stacks inside each item.
    """

    # Sentinel default for ``uid``; passing it makes __init__ generate a new id.
    NULL_UUID = uuid.UUID(int=0)

    def __init__(
        self,
        project_name: str,
        tests: Dict[str, TestObj],
        uid: uuid.UUID = NULL_UUID,
        stack_name_prefix: str = "tCaT",
        shorten_stack_name: bool = False,
        tags: list = None,
    ):
        """Store the run configuration and start with an empty stack collection.

        Args:
            project_name: Value written to the ``taskcat-project-name`` tag.
            tests: Mapping of test name to test object.
            uid: Identifier of the run; a random ``uuid4`` is generated when
                left at ``NULL_UUID``.
            stack_name_prefix: Stored on the instance.
            shorten_stack_name: Stored on the instance.
            tags: Extra tags applied to created stacks; ``None`` or an empty
                list means no extra tags.
        """
        self.tests = tests
        self.project_name = project_name
        self.stack_name_prefix = stack_name_prefix
        self.shorten_stack_name = shorten_stack_name
        self.tags = tags if tags else []
        self.uid = uuid.uuid4() if uid == Stacker.NULL_UUID else uid
        self.stacks: Stacks = Stacks()

    @staticmethod
    def _tests_to_list(tests: Dict[str, TestObj]):
        """Return the test objects of ``tests`` as a list."""
        return list(tests.values())

    def create_stacks(self, threads: int = 8):
        """Create the stacks of every test, one worker thread per test.

        Args:
            threads: Size of the per-test thread pool.

        Raises:
            TaskCatException: If this instance already holds stack objects.
        """
        if self.stacks:
            raise TaskCatException("Stacker already initialised with stack objects")
        tests = self._tests_to_list(self.tests)
        tags = [Tag({"Key": "taskcat-id", "Value": self.uid.hex})]
        # User tags may not override the keys taskcat manages itself.
        tags += [
            Tag(t)
            for t in self.tags
            if t.key not in ["taskcat-project-name", "taskcat-test-name", "taskcat-id"]
        ]
        fan_out(self._create_stacks_for_test, {"tags": tags}, tests, threads)

    def _create_stacks_for_test(self, test, tags, threads: int = 32):
        """Create the stacks of one test in all of its regions.

        Args:
            test: Test object providing ``stack_name``, ``name``, ``template``,
                ``tags`` and ``regions``.
            tags: Base tags common to every test of the run; not modified.
            threads: Size of the per-region thread pool.
        """
        stack_name = test.stack_name
        # ``tags`` is the very same list object for every test (it is bound
        # once via functools.partial in create_stacks), so build a per-test
        # copy instead of appending in place; otherwise the name and tags of
        # one test would leak into the stacks of the others.
        test_tags = list(tags)
        test_tags.append(
            Tag({"Key": "taskcat-project-name", "Value": self.project_name})
        )
        test_tags.append(Tag({"Key": "taskcat-test-name", "Value": test.name}))
        test_tags += test.tags
        partial_kwargs = {
            "stack_name": stack_name,
            "template": test.template,
            "tags": test_tags,
            "test_name": test.name,
        }
        stacks = fan_out(Stack.create, partial_kwargs, test.regions, threads)
        self.stacks += stacks

    # Not used by tCat at present
    def update_stacks(self):
        """Placeholder; always raises ``NotImplementedError``."""
        raise NotImplementedError()

    def delete_stacks(self, criteria: dict = None, deep=False, threads=32):
        """Delete the stacks selected by ``criteria``.

        Args:
            criteria: Passed unchanged to ``self.stacks.filter``.
            deep: Not supported; a truthy value raises.
            threads: Size of the per-client thread pool.

        Raises:
            NotImplementedError: If ``deep`` is truthy.
        """
        if deep:
            raise NotImplementedError("deep delete not yet implemented")
        fan_out(
            self._delete_stacks_per_client,
            None,
            self._group_stacks(self.stacks.filter(criteria)),
            threads,
        )

    def _delete_stacks_per_client(self, stacks, threads=8):
        """Delete every stack of one client group (see ``_group_stacks``)."""
        fan_out(self._delete_stack, None, stacks["Stacks"], threads)

    @staticmethod
    def _delete_stack(stack: Stack):
        """Request deletion of ``stack`` and then refresh it."""
        stack.delete(stack_id=stack.id, client=stack.client)
        stack.refresh()

    def status(self, recurse: bool = False, threads: int = 32, **kwargs):
        """Summarise the status of the stacks matching ``kwargs``.

        Args:
            recurse: Not supported; a truthy value raises.
            threads: Size of the per-client thread pool.
            **kwargs: Filter criteria passed to ``self.stacks.filter``.

        Returns:
            A dict with the keys ``"IN_PROGRESS"``, ``"COMPLETE"`` and
            ``"FAILED"``, each mapping stack id to the stack's status reason.

        Raises:
            NotImplementedError: If ``recurse`` is truthy.
        """
        if recurse:
            raise NotImplementedError("recurse not implemented")
        stacks = self.stacks.filter(kwargs)
        per_region_stacks = self._group_stacks(stacks)
        results = fan_out(self._status_per_client, None, per_region_stacks, threads)
        statuses: Dict[str, dict] = {"IN_PROGRESS": {}, "COMPLETE": {}, "FAILED": {}}
        for region in results:
            # Each entry is the (stack id, status group, reason) tuple
            # produced by _status.
            for status in region:
                statuses[status[1]][status[0]] = status[2]
        return statuses

    def _status_per_client(self, stacks, threads: int = 8):
        """Return the ``_status`` tuple of every stack in one client group."""
        return fan_out(self._status, None, stacks["Stacks"], threads)

    @staticmethod
    def _status(stack: Stack):
        """Classify ``stack`` into a status group.

        Returns:
            A ``(stack id, status group, status reason)`` tuple.

        Raises:
            TaskCatException: If the stack's status belongs to none of the
                ``COMPLETE``, ``IN_PROGRESS`` or ``FAILED`` groups of
                ``StackStatus``.
        """
        for status_group in ["COMPLETE", "IN_PROGRESS", "FAILED"]:
            if stack.status in getattr(StackStatus, status_group):
                return stack.id, status_group, stack.status_reason
        raise TaskCatException(f"Invalid stack {stack}")

    def events(self, recurse=False, threads: int = 32, **kwargs):
        """Collect the events of all stacks, filtered by ``kwargs``.

        Returns:
            The per-client ``{stack id: filtered events}`` dicts combined
            with ``merge_dicts``.

        Raises:
            NotImplementedError: If ``recurse`` is truthy.
        """
        if recurse:
            raise NotImplementedError("recurse not implemented")
        per_region_stacks = self._group_stacks(self.stacks)
        results = fan_out(
            self._events_per_client, {"criteria": kwargs}, per_region_stacks, threads
        )
        return merge_dicts(results)

    def _events_per_client(self, stacks, criteria, threads: int = 8):
        """Collect the filtered events of every stack in one client group."""
        results = fan_out(
            self._describe_stack_events,
            {"criteria": criteria},
            stacks["Stacks"],
            threads,
        )
        return merge_dicts(results)

    @staticmethod
    def _describe_stack_events(stack: Stack, criteria):
        """Return ``{stack id: events of the stack filtered by criteria}``."""
        return {stack.id: stack.events().filter(criteria)}

    def resources(self, recurse=False, threads: int = 32, **kwargs):
        """Collect the resources of all stacks, filtered by ``kwargs``.

        Returns:
            The per-client ``{stack id: filtered resources}`` dicts combined
            with ``merge_dicts``.

        Raises:
            NotImplementedError: If ``recurse`` is truthy.
        """
        if recurse:
            raise NotImplementedError("recurse not implemented")
        results = fan_out(
            self._resources_per_client,
            {"criteria": kwargs},
            self._group_stacks(self.stacks),
            threads,
        )
        return merge_dicts(results)

    def _resources_per_client(self, stacks, criteria, threads: int = 8):
        """Collect the filtered resources of every stack in one client group."""
        results = fan_out(
            self._resources, {"criteria": criteria}, stacks["Stacks"], threads
        )
        return merge_dicts(results)

    @staticmethod
    def _resources(stack: Stack, criteria):
        """Return ``{stack id: resources of the stack filtered by criteria}``."""
        return {stack.id: stack.resources().filter(criteria)}

    @classmethod
    def from_existing(
        cls,
        uid: uuid.UUID,
        project_name: str,
        tests: Dict[str, TestObj],
        include_deleted=False,
        recurse=False,
        threads=32,
    ):
        """Build a ``Stacker`` from stacks that already exist.

        The CloudFormation client of every test region is queried and stacks
        whose tags match ``uid``, ``project_name`` and one of ``tests`` are
        imported.

        Args:
            uid: Run identifier to match against the ``taskcat-id`` tag.
            project_name: Value to match against ``taskcat-project-name``.
            tests: Mapping of test name to test object.
            include_deleted: Not supported; a truthy value raises.
            recurse: Not supported; a truthy value raises.
            threads: Size of the per-client thread pool.

        Returns:
            A ``Stacker`` whose ``stacks`` holds the imported stacks.

        Raises:
            NotImplementedError: If ``include_deleted`` or ``recurse`` is truthy.
        """
        if include_deleted:
            raise NotImplementedError("including deleted stacks not implemented")
        if recurse:
            raise NotImplementedError("recurse not implemented")
        # Group regions by client so that each client is paginated only once.
        clients: Dict[boto3.client, List[TestRegion]] = {}
        for test in tests.values():
            for region in test.regions:
                client = region.client("cloudformation")
                if client not in clients:
                    clients[client] = []
                clients[client].append(region)
        results = fan_out(
            Stacker._import_stacks_per_client,
            {"uid": uid, "project_name": project_name, "tests": tests},
            clients.items(),
            threads,
        )
        stacker = Stacker(project_name, tests, uid)
        stacker.stacks = Stacks([item for sublist in results for item in sublist])
        return stacker

    @staticmethod
    def _import_stacks_per_client(clients, uid, project_name, tests):
        """Import the matching stacks visible to one client.

        Args:
            clients: A ``(client, regions)`` pair as yielded by the
                ``dict.items()`` call in ``from_existing``.
            uid: Run identifier to match against the ``taskcat-id`` tag.
            project_name: Value to match against ``taskcat-project-name``.
            tests: Mapping of test name to test object.

        Returns:
            A ``Stacks`` collection of the imported stacks.
        """
        # pylint: disable=too-many-locals
        stacks = Stacks()
        client, regions = clients
        for page in client.get_paginator("describe_stacks").paginate():
            for stack_props in page["Stacks"]:
                # Stacks that report a ParentId are skipped.
                if stack_props.get("ParentId"):
                    continue
                match = False
                project = ""
                test = ""
                for tag in stack_props["Tags"]:
                    k, v = (tag["Key"], tag["Value"])
                    if k == "taskcat-id" and v == uid.hex:
                        match = True
                    elif k == "taskcat-test-name" and v in tests:
                        test = v
                    elif k == "taskcat-project-name" and v == project_name:
                        project = v
                # All three taskcat tags must agree before a stack is imported.
                if match and test and project:
                    stack = Stack.import_existing(
                        stack_props,
                        tests[test].template,
                        regions[0],
                        test,
                        uid,
                    )
                    stacks.append(stack)
        return stacks

    @staticmethod
    def _group_stacks(stacks: Stacks) -> List[dict]:
        """Group ``stacks`` by their client.

        Returns:
            A list of ``{"Client": client, "Stacks": [stack, ...]}`` dicts,
            one per distinct client, in first-seen order.
        """
        stacks_by_client: dict = {}
        for stack in stacks:
            client = stack.client
            if client not in stacks_by_client:
                stacks_by_client[client] = {"Client": client, "Stacks": []}
            stacks_by_client[client]["Stacks"].append(stack)
        return list(stacks_by_client.values())

    @staticmethod
    def list_stacks(profiles, regions):
        """List the taskcat stacks of every profile in every region.

        Args:
            profiles: Iterable of profile names.
            regions: Sequence of region names, queried for each profile.

        Returns:
            A flat list of the dicts built by ``_get_taskcat_stacks``.
        """
        stacks = fan_out(
            Stacker._list_per_profile,
            {"regions": regions, "boto_cache": Boto3Cache()},
            profiles,
            threads=8,
        )
        return [stack for sublist in stacks for stack in sublist]

    @staticmethod
    def _list_per_profile(profile, regions, boto_cache):
        """List the taskcat stacks of one profile, one thread per region."""
        stacks = fan_out(
            Stacker._get_taskcat_stacks,
            {"boto_cache": boto_cache, "profile": profile},
            regions,
            # ThreadPool rejects a size below 1, so an empty region list still
            # gets a one-thread pool and simply yields no stacks.
            threads=max(1, len(regions)),
        )
        return [stack for sublist in stacks for stack in sublist]

    @staticmethod
    def _get_taskcat_stacks(region, boto_cache: Boto3Cache, profile: str):
        """Describe the stacks of one region/profile that carry a ``taskcat-id`` tag.

        Any exception is logged as a warning and the stacks gathered up to
        that point are returned.

        Returns:
            A list of dicts with the keys ``"region"``, ``"profile"``,
            ``"stack-id"`` and ``"stack-name"`` plus one entry per tag whose
            key starts with ``"taskcat-"``; ``"taskcat-id"`` is converted to
            a ``uuid.UUID``.
        """
        stacks = []
        try:
            cfn = boto_cache.client("cloudformation", profile=profile, region=region)
            for page in cfn.get_paginator("describe_stacks").paginate():
                for stack_props in page["Stacks"]:
                    # Stacks that report a ParentId are skipped.
                    if stack_props.get("ParentId"):
                        continue
                    stack_id = stack_props["StackId"]
                    # The name is the second "/"-separated field of the id.
                    stack_name = stack_id.split("/")[1]
                    stack = {
                        "region": region,
                        "profile": profile,
                        "stack-id": stack_id,
                        "stack-name": stack_name,
                    }
                    for tag in stack_props["Tags"]:
                        k, v = (tag["Key"], tag["Value"])
                        if k.startswith("taskcat-"):
                            stack[k] = v
                    if stack.get("taskcat-id"):
                        stack["taskcat-id"] = uuid.UUID(stack["taskcat-id"])
                        stacks.append(stack)
        except Exception as e:  # pylint: disable=broad-except
            LOG.warning(
                f"Failed to fetch stacks for region {region} using profile "
                f"{profile} {type(e)} {e}"
            )
            LOG.debug("Traceback:", exc_info=True)
        return stacks
Class variables
NULL_UUID
Static methods
from_existing
def from_existing(
uid: uuid.UUID,
project_name: str,
tests: Dict[str, taskcat._dataclasses.TestObj],
include_deleted=False,
recurse=False,
threads=32
)
View Source
@classmethod
def from_existing(
    cls,
    uid: uuid.UUID,
    project_name: str,
    tests: Dict[str, TestObj],
    include_deleted=False,
    recurse=False,
    threads=32,
):
    """Build a ``Stacker`` populated with stacks that already exist.

    The CloudFormation client of every region of every test is collected,
    each distinct client is scanned on its own worker thread, and the stacks
    found are stored on a new ``Stacker`` created with the given ``uid``.

    Raises:
        NotImplementedError: If ``include_deleted`` or ``recurse`` is truthy.
    """
    if include_deleted:
        raise NotImplementedError("including deleted stacks not implemented")
    if recurse:
        raise NotImplementedError("recurse not implemented")

    # Map each distinct client to the regions that produced it.
    regions_by_client: Dict[boto3.client, List[TestRegion]] = {}
    for test_obj in tests.values():
        for test_region in test_obj.regions:
            cfn_client = test_region.client("cloudformation")
            regions_by_client.setdefault(cfn_client, []).append(test_region)

    import_kwargs = {"uid": uid, "project_name": project_name, "tests": tests}
    imported = fan_out(
        Stacker._import_stacks_per_client,
        import_kwargs,
        regions_by_client.items(),
        threads,
    )

    stacker = Stacker(project_name, tests, uid)
    # Flatten the per-client results into a single collection.
    stacker.stacks = Stacks(
        [stack for client_stacks in imported for stack in client_stacks]
    )
    return stacker
list_stacks
def list_stacks(
profiles,
regions
)
View Source
@staticmethod
def list_stacks(profiles, regions):
    """Return one flat list with the stacks found for every profile.

    Each profile is handled by ``Stacker._list_per_profile`` on a pool of
    eight threads; all profiles share a single ``Boto3Cache`` instance.
    """
    shared_kwargs = {"regions": regions, "boto_cache": Boto3Cache()}
    per_profile = fan_out(
        Stacker._list_per_profile, shared_kwargs, profiles, threads=8
    )
    flattened = []
    for profile_stacks in per_profile:
        flattened.extend(profile_stacks)
    return flattened
Methods
create_stacks
def create_stacks(
self,
threads: int = 8
)
View Source
def create_stacks(self, threads: int = 8):
    """Create the stacks of every test, one worker thread per test.

    The base tag list starts with a ``taskcat-id`` tag carrying the run id,
    followed by the instance's own tags except those using a key that
    taskcat manages itself.

    Raises:
        TaskCatException: If the instance already holds stack objects.
    """
    if self.stacks:
        raise TaskCatException("Stacker already initialised with stack objects")
    test_list = self._tests_to_list(self.tests)
    reserved_keys = ["taskcat-project-name", "taskcat-test-name", "taskcat-id"]
    base_tags = [Tag({"Key": "taskcat-id", "Value": self.uid.hex})]
    for user_tag in self.tags:
        if user_tag.key in reserved_keys:
            continue
        base_tags.append(Tag(user_tag))
    fan_out(self._create_stacks_for_test, {"tags": base_tags}, test_list, threads)
delete_stacks
def delete_stacks(
self,
criteria: dict = None,
deep=False,
threads=32
)
View Source
def delete_stacks(self, criteria: dict = None, deep=False, threads=32):
    """Delete the stacks selected by ``criteria``, one thread per client group.

    Raises:
        NotImplementedError: If ``deep`` is truthy.
    """
    if deep:
        raise NotImplementedError("deep delete not yet implemented")
    selected = self.stacks.filter(criteria)
    grouped = self._group_stacks(selected)
    fan_out(self._delete_stacks_per_client, None, grouped, threads)
events
def events(
self,
recurse=False,
threads: int = 32,
**kwargs
)
View Source
def events(self, recurse=False, threads: int = 32, **kwargs):
    """Gather the events of all stacks, using ``kwargs`` as filter criteria.

    Returns:
        The per-client result dicts combined with ``merge_dicts``.

    Raises:
        NotImplementedError: If ``recurse`` is truthy.
    """
    if recurse:
        raise NotImplementedError("recurse not implemented")
    grouped = self._group_stacks(self.stacks)
    filter_kwargs = {"criteria": kwargs}
    return merge_dicts(
        fan_out(self._events_per_client, filter_kwargs, grouped, threads)
    )
resources
def resources(
self,
recurse=False,
threads: int = 32,
**kwargs
)
View Source
def resources(self, recurse=False, threads: int = 32, **kwargs):
    """Gather the resources of all stacks, using ``kwargs`` as filter criteria.

    Returns:
        The per-client result dicts combined with ``merge_dicts``.

    Raises:
        NotImplementedError: If ``recurse`` is truthy.
    """
    if recurse:
        raise NotImplementedError("recurse not implemented")
    grouped = self._group_stacks(self.stacks)
    per_client = fan_out(
        self._resources_per_client, {"criteria": kwargs}, grouped, threads
    )
    return merge_dicts(per_client)
status
def status(
self,
recurse: bool = False,
threads: int = 32,
**kwargs
)
View Source
def status(self, recurse: bool = False, threads: int = 32, **kwargs):
    """Bucket the stacks matching ``kwargs`` by status group.

    Returns:
        A dict with the keys ``"IN_PROGRESS"``, ``"COMPLETE"`` and
        ``"FAILED"``; each maps the first element of a per-stack result
        tuple to its third element, filed under the group named by the
        second element.

    Raises:
        NotImplementedError: If ``recurse`` is truthy.
    """
    if recurse:
        raise NotImplementedError("recurse not implemented")
    grouped = self._group_stacks(self.stacks.filter(kwargs))
    per_client = fan_out(self._status_per_client, None, grouped, threads)
    statuses: Dict[str, dict] = {"IN_PROGRESS": {}, "COMPLETE": {}, "FAILED": {}}
    for client_statuses in per_client:
        for entry in client_statuses:
            stack_key, group, detail = entry[0], entry[1], entry[2]
            statuses[group][stack_key] = detail
    return statuses
update_stacks
def update_stacks(
self
)
View Source
def update_stacks(self):
    """Placeholder for stack updates; always raises ``NotImplementedError``."""
    raise NotImplementedError