Skip to content

Module taskcat.iam_policy.policy

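Generates an IAM policy document covering the CloudFormation resource types used by a taskcat project's templates.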

View Source
import json

import logging

from pathlib import Path

from typing import List

import pkg_resources

from taskcat._config import Config

LOG = logging.getLogger(__name__)


class CFNPolicyGenerator:
    """Build an IAM policy from the resource types used in a project's templates.

    Every template returned by the config (plus its descendents) is scanned for
    resource ``Type`` values. Each type is looked up in the JSON data file
    located via ``pkg_resources`` (``cfg/cfn_resource_iam_policy.json``); types
    missing from that file fall back to a ``<service>:*`` wildcard placeholder.
    """

    # Action categories that are always considered, in the order their
    # statements are emitted. Statement Sids are derived from these names.
    _ACTION_CATEGORIES = ("create", "read", "update", "delete")

    def __init__(self, config: Config, output_file: str):
        """Remember the config and output path and locate the policy data file.

        Args:
            config: Object whose ``get_templates()`` mapping is walked by
                :meth:`generate_policy`.
            output_file: Path the generated policy JSON is written to.
        """
        self._config = config
        self._data_file_path = pkg_resources.resource_filename(
            "taskcat", "/cfg/cfn_resource_iam_policy.json"
        )
        self._output_file = output_file

    def generate_policy(self):
        """Collect resource types from all templates and write the policy file.

        The policy is serialised as indented JSON with sorted keys and written,
        UTF-8 encoded, to the resolved ``output_file`` path.
        """
        LOG.warning("This is an ALPHA feature. Use with caution")

        templates = []
        for template in self._config.get_templates().values():
            templates.append(template)
            templates.extend(template.descendents)

        resource_types = set()
        for template in templates:
            # A template may have no "Resources" section, and an entry may lack
            # "Type" (e.g. a section-level "Fn::Transform"); skip those instead
            # of failing with KeyError.
            resources = template.template.get("Resources") or {}
            for resource in resources.values():
                if isinstance(resource, dict) and "Type" in resource:
                    resource_types.add(resource["Type"])

        policy = self._policy_from_resource_types(list(resource_types))
        with open(Path(self._output_file).resolve(), "w", encoding="utf-8") as _f:
            _f.write(json.dumps(policy, indent=4, sort_keys=True))

    @staticmethod
    def _generate_placeholder(resource_type_name):
        """Return wildcard actions for a resource type absent from the data file.

        The second ``::``-separated segment of the type name, lower-cased, is
        used as the action prefix for every standard category.

        NOTE(review): when that segment is not an IAM service prefix (for
        example ``Custom::Thing`` yields ``thing:*``) the result is not a real
        IAM action -- confirm how such types should be handled.

        Args:
            resource_type_name: Resource type such as ``AWS::S3::Bucket``.

        Returns:
            Dict mapping each standard category to ``["<service>:*"]``.

        Raises:
            IndexError: If the name contains no ``::`` separator.
        """
        svc_name = resource_type_name.split("::")[1].lower()
        return {
            category: [f"{svc_name}:*"]
            for category in CFNPolicyGenerator._ACTION_CATEGORIES
        }

    def _policy_from_resource_types(self, resource_types: List[str]):
        """Merge the per-resource actions into a single IAM policy document.

        Args:
            resource_types: Resource type names to cover.

        Returns:
            Dict with ``Version`` and ``Statement`` keys. One ``Allow``
            statement is emitted per non-empty action category: the standard
            four first, then any additional categories found in the data file
            in sorted order. Actions inside a statement are sorted.
        """
        with open(self._data_file_path, encoding="utf-8") as _f:
            data = json.load(_f)

        actions_by_category: dict = {
            category: set() for category in self._ACTION_CATEGORIES
        }
        for resource in resource_types:
            # Only synthesise the wildcard placeholder when the type is really
            # missing, so a known type is never rejected for its name shape.
            if resource in data:
                resource_actions = data[resource]
            else:
                resource_actions = self._generate_placeholder(resource)
            for category, actions in resource_actions.items():
                # setdefault tolerates categories beyond the standard four
                # instead of raising KeyError.
                actions_by_category.setdefault(category, set()).update(actions)

        # Sort the extra categories so the statement order does not depend on
        # the order in which resource types were visited.
        extra_categories = sorted(
            set(actions_by_category) - set(self._ACTION_CATEGORIES)
        )
        statements = []
        for category in (*self._ACTION_CATEGORIES, *extra_categories):
            actions = actions_by_category[category]
            if not actions:
                # IAM rejects a statement whose Action list is empty.
                continue
            statements.append(
                {
                    "Sid": f"{category.upper()}Actions",
                    "Effect": "Allow",
                    "Action": sorted(actions),
                    "Resource": "*",
                }
            )
        _policy = {"Version": "2012-10-17", "Statement": statements}

        LOG.warning(
            "NOTE: The generated IAM policy will contain <service>:* IAM Actions where a"
            " coverage gap exists within the CloudFormation Resource Spec"
        )
        LOG.warning(
            "Provide feedback to the CloudFormation team via: "
            "https://github.com/aws-cloudformation/cloudformation-coverage-roadmap "
        )
        return _policy

Variables

LOG

Classes

CFNPolicyGenerator

class CFNPolicyGenerator(
    config: taskcat._config.Config,
    output_file: str
)
View Source
class CFNPolicyGenerator:
    """Build an IAM policy from the resource types used in a project's templates.

    Every template returned by the config (plus its descendents) is scanned for
    resource ``Type`` values. Each type is looked up in the JSON data file
    located via ``pkg_resources`` (``cfg/cfn_resource_iam_policy.json``); types
    missing from that file fall back to a ``<service>:*`` wildcard placeholder.
    """

    # Action categories that are always considered, in the order their
    # statements are emitted. Statement Sids are derived from these names.
    _ACTION_CATEGORIES = ("create", "read", "update", "delete")

    def __init__(self, config: Config, output_file: str):
        """Remember the config and output path and locate the policy data file.

        Args:
            config: Object whose ``get_templates()`` mapping is walked by
                :meth:`generate_policy`.
            output_file: Path the generated policy JSON is written to.
        """
        self._config = config
        self._data_file_path = pkg_resources.resource_filename(
            "taskcat", "/cfg/cfn_resource_iam_policy.json"
        )
        self._output_file = output_file

    def generate_policy(self):
        """Collect resource types from all templates and write the policy file.

        The policy is serialised as indented JSON with sorted keys and written,
        UTF-8 encoded, to the resolved ``output_file`` path.
        """
        LOG.warning("This is an ALPHA feature. Use with caution")

        templates = []
        for template in self._config.get_templates().values():
            templates.append(template)
            templates.extend(template.descendents)

        resource_types = set()
        for template in templates:
            # A template may have no "Resources" section, and an entry may lack
            # "Type" (e.g. a section-level "Fn::Transform"); skip those instead
            # of failing with KeyError.
            resources = template.template.get("Resources") or {}
            for resource in resources.values():
                if isinstance(resource, dict) and "Type" in resource:
                    resource_types.add(resource["Type"])

        policy = self._policy_from_resource_types(list(resource_types))
        with open(Path(self._output_file).resolve(), "w", encoding="utf-8") as _f:
            _f.write(json.dumps(policy, indent=4, sort_keys=True))

    @staticmethod
    def _generate_placeholder(resource_type_name):
        """Return wildcard actions for a resource type absent from the data file.

        The second ``::``-separated segment of the type name, lower-cased, is
        used as the action prefix for every standard category.

        NOTE(review): when that segment is not an IAM service prefix (for
        example ``Custom::Thing`` yields ``thing:*``) the result is not a real
        IAM action -- confirm how such types should be handled.

        Args:
            resource_type_name: Resource type such as ``AWS::S3::Bucket``.

        Returns:
            Dict mapping each standard category to ``["<service>:*"]``.

        Raises:
            IndexError: If the name contains no ``::`` separator.
        """
        svc_name = resource_type_name.split("::")[1].lower()
        return {
            category: [f"{svc_name}:*"]
            for category in CFNPolicyGenerator._ACTION_CATEGORIES
        }

    def _policy_from_resource_types(self, resource_types: List[str]):
        """Merge the per-resource actions into a single IAM policy document.

        Args:
            resource_types: Resource type names to cover.

        Returns:
            Dict with ``Version`` and ``Statement`` keys. One ``Allow``
            statement is emitted per non-empty action category: the standard
            four first, then any additional categories found in the data file
            in sorted order. Actions inside a statement are sorted.
        """
        with open(self._data_file_path, encoding="utf-8") as _f:
            data = json.load(_f)

        actions_by_category: dict = {
            category: set() for category in self._ACTION_CATEGORIES
        }
        for resource in resource_types:
            # Only synthesise the wildcard placeholder when the type is really
            # missing, so a known type is never rejected for its name shape.
            if resource in data:
                resource_actions = data[resource]
            else:
                resource_actions = self._generate_placeholder(resource)
            for category, actions in resource_actions.items():
                # setdefault tolerates categories beyond the standard four
                # instead of raising KeyError.
                actions_by_category.setdefault(category, set()).update(actions)

        # Sort the extra categories so the statement order does not depend on
        # the order in which resource types were visited.
        extra_categories = sorted(
            set(actions_by_category) - set(self._ACTION_CATEGORIES)
        )
        statements = []
        for category in (*self._ACTION_CATEGORIES, *extra_categories):
            actions = actions_by_category[category]
            if not actions:
                # IAM rejects a statement whose Action list is empty.
                continue
            statements.append(
                {
                    "Sid": f"{category.upper()}Actions",
                    "Effect": "Allow",
                    "Action": sorted(actions),
                    "Resource": "*",
                }
            )
        _policy = {"Version": "2012-10-17", "Statement": statements}

        LOG.warning(
            "NOTE: The generated IAM policy will contain <service>:* IAM Actions where a"
            " coverage gap exists within the CloudFormation Resource Spec"
        )
        LOG.warning(
            "Provide feedback to the CloudFormation team via: "
            "https://github.com/aws-cloudformation/cloudformation-coverage-roadmap "
        )
        return _policy

Methods

generate_policy

def generate_policy(
    self
)
View Source
    def generate_policy(self):
        """Collect resource types from all templates and write the policy file.

        Each template returned by ``self._config.get_templates()`` and each of
        its ``descendents`` is scanned for resource ``Type`` values. The set of
        types is passed to ``_policy_from_resource_types`` and the resulting
        policy is written to ``self._output_file`` as indented, key-sorted,
        UTF-8 encoded JSON.
        """
        LOG.warning("This is an ALPHA feature. Use with caution")

        templates = []
        for template in self._config.get_templates().values():
            templates.append(template)
            templates.extend(template.descendents)

        resource_types = set()
        for template in templates:
            # A template may have no "Resources" section, and an entry may lack
            # "Type" (e.g. a section-level "Fn::Transform"); skip those instead
            # of failing with KeyError.
            resources = template.template.get("Resources") or {}
            for resource in resources.values():
                if isinstance(resource, dict) and "Type" in resource:
                    resource_types.add(resource["Type"])

        policy = self._policy_from_resource_types(list(resource_types))
        with open(Path(self._output_file).resolve(), "w", encoding="utf-8") as _f:
            _f.write(json.dumps(policy, indent=4, sort_keys=True))