HEX
Server: Apache
System: Linux pdx1-shared-a1-38 6.6.104-grsec-jammy+ #3 SMP Tue Sep 16 00:28:11 UTC 2025 x86_64
User: mmickelson (3396398)
PHP: 8.1.31
Disabled: NONE
Upload Files
File: //usr/lib/python3/dist-packages/awscli/customizations/eks/kubeconfig.py
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

import os
import yaml
import logging
import errno
from botocore.compat import OrderedDict

from awscli.customizations.eks.exceptions import EKSError
from awscli.customizations.eks.ordered_yaml import (ordered_yaml_load,
                                                    ordered_yaml_dump)


class KubeconfigError(EKSError):
    """
    Base class for all kubeconfig errors.

    The more specific kubeconfig exceptions in this module derive from it,
    so catching this type catches all of them.
    """


class KubeconfigCorruptedError(KubeconfigError):
    """
    Raised when a kubeconfig cannot be parsed, or when its parsed content
    does not have the expected structure.
    """


class KubeconfigInaccessableError(KubeconfigError):
    """
    Raised when a kubeconfig cannot be opened for reading or writing,
    or when the directory that should contain it cannot be created.
    """


def _get_new_kubeconfig_content():
    """
    Build the content of an empty kubeconfig.

    A brand new mapping (holding brand new lists) is created on every
    call, so the result can be mutated without affecting other callers.

    :return: The skeleton content of a kubeconfig with no entries
    :rtype: OrderedDict
    """
    skeleton = OrderedDict()
    # Insertion order is the order the keys are emitted in when dumped.
    skeleton["apiVersion"] = "v1"
    skeleton["clusters"] = []
    skeleton["contexts"] = []
    skeleton["current-context"] = ""
    skeleton["kind"] = "Config"
    skeleton["preferences"] = OrderedDict()
    skeleton["users"] = []
    return skeleton


class Kubeconfig(object):
    """
    In-memory representation of a kubeconfig: a file path together with
    the (parsed) content associated with that path.
    """
    def __init__(self, path, content=None):
        """
        :param path: The path of the kubeconfig file this object stands for
        :type path: string

        :param content: The parsed kubeconfig content. When None, the
        content of a new, empty kubeconfig is used instead.
        :type content: dict
        """
        self.path = path
        if content is None:
            content = _get_new_kubeconfig_content()
        self.content = content

    def dump_content(self):
        """ Return the stored content in yaml format. """
        return ordered_yaml_dump(self.content)

    def has_cluster(self, name):
        """
        Return true if this kubeconfig contains an entry
        For the passed cluster name.

        A missing or null ``clusters`` key is treated as "no clusters",
        and cluster entries without a ``name`` are ignored.

        :param name: The cluster name to look for
        :type name: string

        :rtype: bool
        """
        if 'clusters' not in self.content:
            return False
        clusters = self.content['clusters']
        # KubeconfigValidator accepts a None value for any key, so a
        # validated kubeconfig can still reach this point with
        # "clusters" set to None; iterating it would raise TypeError.
        if clusters is None:
            return False
        # Nameless entries are skipped rather than raising KeyError.
        return name in [cluster['name']
                        for cluster in clusters
                        if 'name' in cluster]


class KubeconfigValidator(object):
    """
    Checks that the content of a Kubeconfig has the expected structure,
    using the content of an empty Kubeconfig as the reference for which
    type each top-level entry should have.
    """
    def __init__(self):
        # Validation_content is an empty Kubeconfig
        # It is used as a way to know what types different entries should be
        self._validation_content = Kubeconfig(None, None).content

    def validate_config(self, config):
        """
        Raises KubeconfigCorruptedError if the passed content is invalid

        :param config: The config to validate
        :type config: Kubeconfig

        :raises KubeconfigCorruptedError: if config is not a Kubeconfig,
        or if its content fails either of the type checks
        """
        if not isinstance(config, Kubeconfig):
            raise KubeconfigCorruptedError("Internal error: "
                                           "Not a Kubeconfig object.")
        # Order matters: the list-entry check relies on the content
        # already being known to be a dictionary.
        self._validate_config_types(config)
        self._validate_list_entry_types(config)

    def _validate_config_types(self, config):
        """
        Raises KubeconfigCorruptedError if any of the entries in config
        are the wrong type

        Only keys present in the reference content are checked; keys that
        are absent or whose value is None are accepted.

        :param config: The config to validate
        :type config: Kubeconfig
        """
        if not isinstance(config.content, dict):
            raise KubeconfigCorruptedError("Content not a dictionary.")
        for key, expected in self._validation_content.items():
            actual = config.content.get(key)
            # None covers both "key absent" and "key present but null";
            # neither is treated as an error.
            if actual is None:
                continue
            if not isinstance(actual, type(expected)):
                raise KubeconfigCorruptedError(
                    "{0} is wrong type:{1} "
                    "(Should be {2})".format(
                        key,
                        type(actual),
                        type(expected)
                    )
                )

    def _validate_list_entry_types(self, config):
        """
        Raises KubeconfigCorruptedError if any lists in config contain objects
        which are not dictionaries

        Only keys present in the reference content are inspected.

        :param config: The config to validate
        :type config: Kubeconfig
        """
        for key in self._validation_content:
            entries = config.content.get(key)
            # isinstance (rather than an exact type comparison) so that
            # list subclasses have their entries validated as well.
            if not isinstance(entries, list):
                continue
            for element in entries:
                if not isinstance(element, OrderedDict):
                    raise KubeconfigCorruptedError(
                        "Entry in {0} not a dictionary.".format(key))


class KubeconfigLoader(object):
    """
    Reads kubeconfig files from disk and hands back validated
    Kubeconfig objects.
    """
    def __init__(self, validator=None):
        """
        :param validator: The validator applied to every loaded kubeconfig.
        A new KubeconfigValidator is created when None is passed.
        :type validator: KubeconfigValidator
        """
        self._validator = (validator if validator is not None
                           else KubeconfigValidator())

    def load_kubeconfig(self, path):
        """
        Read, parse and validate the kubeconfig stored at path.

        A missing file is not an error: in that case the returned
        Kubeconfig carries the content of a new, empty kubeconfig so that
        it can be written back later. Any other failure to open the file,
        a yaml parsing failure, or a failed validation raises.

        :param path: The path to load a kubeconfig from
        :type path: string

        :raises KubeconfigInaccessableError: if the kubeconfig can't be opened
        :raises KubeconfigCorruptedError: if the kubeconfig is invalid

        :return: The loaded kubeconfig
        :rtype: Kubeconfig
        """
        # Stays None when the file does not exist, which makes Kubeconfig
        # fall back to empty content.
        parsed = None
        try:
            with open(path, "r") as handle:
                parsed = ordered_yaml_load(handle)
        except IOError as io_error:
            # Only "no such file" is tolerated; everything else is fatal.
            if io_error.errno != errno.ENOENT:
                raise KubeconfigInaccessableError(
                    "Can't open kubeconfig for reading: {0}".format(io_error))
        except yaml.YAMLError as yaml_error:
            raise KubeconfigCorruptedError(
                "YamlError while loading kubeconfig: {0}".format(yaml_error))

        kubeconfig = Kubeconfig(path, parsed)
        self._validator.validate_config(kubeconfig)
        return kubeconfig


class KubeconfigWriter(object):
    """ Writes Kubeconfig objects to disk in yaml format. """
    def write_kubeconfig(self, config):
        """
        Write config to disk.
        OK if the file doesn't exist.

        The directory containing config.path is created first when it is
        missing. The file is truncated if it already exists; when it has
        to be created, it is created with mode 0o600.

        :param config: The kubeconfig to write
        :type config: Kubeconfig

        :raises KubeconfigInaccessableError: if the containing directory
        can't be created or the kubeconfig can't be opened for writing
        """
        directory = os.path.dirname(config.path)

        # A bare filename (e.g. "kubeconfig") has an empty directory
        # component. os.makedirs('') fails with ENOENT, which would wrongly
        # reject a write into the current working directory, so there is
        # nothing to create in that case.
        if directory:
            try:
                os.makedirs(directory)
            except OSError as e:
                # An already existing directory is the common, fine case.
                if e.errno != errno.EEXIST:
                    raise KubeconfigInaccessableError(
                        "Can't create directory for writing: {0}".format(e))
        try:
            # os.open is used instead of open() so that the permission
            # bits can be supplied at creation time.
            with os.fdopen(
                    os.open(
                        config.path,
                        os.O_CREAT | os.O_RDWR | os.O_TRUNC,
                        0o600),
                    "w+") as stream:
                ordered_yaml_dump(config.content, stream)
        except (IOError, OSError) as e:
            raise KubeconfigInaccessableError(
                "Can't open kubeconfig for writing: {0}".format(e))


class KubeconfigAppender(object):
    """ Adds cluster, user and context entries to a Kubeconfig. """
    def insert_entry(self, config, key, entry):
        """
        Insert entry into the array at content[key]
        Overwrite an existing entry if they share the same name

        A missing key, or a key whose value is None, is treated as an
        empty array. Every existing entry with the same name as the new
        entry is replaced; if none matches, the entry is appended.

        :param config: The kubeconfig to insert an entry into
        :type config: Kubeconfig

        :param key: The top-level key whose array receives the entry
        :type key: string

        :param entry: The entry to insert
        :type entry: dict

        :raises KubeconfigError: if content[key] holds something other
        than a list or None

        :return: The passed config, modified in place
        :rtype: Kubeconfig
        """
        array = config.content.get(key)
        # A null value (e.g. a bare "clusters:" line) is accepted by
        # KubeconfigValidator, so it must be usable here as well.
        if array is None:
            array = []
            config.content[key] = array
        if not isinstance(array, list):
            raise KubeconfigError("Tried to insert into {0}, "
                                  "which is a {1} "
                                  "not a {2}".format(key,
                                                     type(array),
                                                     list))
        found = False
        # No early exit: all entries sharing the name are overwritten.
        for counter, existing_entry in enumerate(array):
            if ("name" in existing_entry and
                    "name" in entry and
                    existing_entry["name"] == entry["name"]):
                array[counter] = entry
                found = True

        if not found:
            array.append(entry)

        return config

    def _make_context(self, cluster, user, alias=None):
        """
        Generate a context to associate cluster and user with a given alias.

        :param cluster: the cluster entry; only its name is used
        :type cluster: OrderedDict

        :param user: the user entry; only its name is used
        :type user: OrderedDict

        :param alias: the name of the context; a falsy alias falls back to
        the user entry name
        :type alias: str

        :return: The generated context
        :rtype: OrderedDict
        """
        return OrderedDict([
            ("context", OrderedDict([
                ("cluster", cluster["name"]),
                ("user", user["name"])
            ])),
            ("name", alias or user["name"])
        ])

    def insert_cluster_user_pair(self, config, cluster, user, alias=None):
        """
        Insert the passed cluster entry and user entry,
        then make a context to associate them
        and set current-context to be the new context.
        Returns the new context

        :param config: the Kubeconfig to insert the pair into
        :type config: Kubeconfig

        :param cluster: the cluster entry
        :type cluster: OrderedDict

        :param user: the user entry
        :type user: OrderedDict

        :param alias: the alias for the context; defaults to user entry name
        :type alias: str

        :return: The generated context
        :rtype: OrderedDict
        """
        context = self._make_context(cluster, user, alias=alias)
        self.insert_entry(config, "clusters", cluster)
        self.insert_entry(config, "users", user)
        self.insert_entry(config, "contexts", context)

        config.content["current-context"] = context["name"]

        return context