HEX
Server: Apache
System: Linux pdx1-shared-a1-38 6.6.104-grsec-jammy+ #3 SMP Tue Sep 16 00:28:11 UTC 2025 x86_64
User: mmickelson (3396398)
PHP: 8.1.31
Disabled: NONE
Upload Files
File: //usr/local/bin/package-installs-squash
#!/usr/bin/env python3

# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import argparse
import collections
import functools
import json
import logging
import os
import re
import sys
import yaml

from diskimage_builder import logging_config

# Module-level logger named after this module. main() calls
# logging_config.setup() before it emits any log message.
logger = logging.getLogger(__name__)


def get_element_installtype(element_name):
    """Return the install type configured for an element.

    The per-element environment variable DIB_INSTALLTYPE_<name> (with every
    '-' in the element name replaced by '_') takes precedence.  When it is
    not set, DIB_DEFAULT_INSTALLTYPE is used, and when that is not set
    either the result is "source".
    """
    override_key = "DIB_INSTALLTYPE_%s" % element_name.replace('-', '_')
    if override_key in os.environ:
        return os.environ[override_key]
    return os.environ.get("DIB_DEFAULT_INSTALLTYPE", "source")


def _is_arch_in_list(strlist):
    """Check whether os.environ['ARCH'] appears in a comma separated list.

    Each entry of ``strlist`` has its surrounding whitespace removed before
    the comparison, so "arm64, amd64" and "arm64,amd64" are equivalent.

    Raises KeyError if the ARCH environment variable is not set.
    """
    # A bare map(str.strip, ...) call is lazy under Python 3 and its result
    # was previously discarded, so entries were never actually stripped.
    # Build the stripped list explicitly instead.
    arches = [entry.strip() for entry in strlist.split(',')]
    return os.environ['ARCH'] in arches


def _valid_for_arch(pkg_name, arch, not_arch):
    """Decide whether a package entry applies to the current ARCH.

    ``arch`` and ``not_arch`` are the comma separated architecture lists
    from the package entry (or None when absent):

    * neither given: the entry always applies;
    * both given: configuration error, a message is printed and the
      process exits with status 1;
    * ``arch`` given: the current ARCH must be in that list;
    * otherwise: the current ARCH must not be in ``not_arch``.
    """
    if arch is None and not_arch is None:
        # No architecture restriction at all.
        return True

    if arch:
        if not_arch:
            # An allow-list and a deny-list together are contradictory.
            print("package-installs configuration error: arch and not_arch "
                  "given for package [%s]" % pkg_name)
            sys.exit(1)
        # Allow-list: install only when our ARCH is named.
        return _is_arch_in_list(arch)

    # Deny-list: install unless our ARCH is named.
    return not _is_arch_in_list(not_arch)


def _when(statements):
    '''Evaluate the when: condition(s) of a package entry.

    Each statement has the form

     when: ENVIRONMENT_VARIABLE[!]=value

    and compares the current value of the environment variable with
    ``value`` for equality (=) or inequality (!=).  ``statements`` may be
    a single string or a list/tuple of strings; every statement is
    evaluated and all of them must hold.

    Returns True if the package should be installed, False otherwise.
    No statement at all (None) means install.

    Raises RuntimeError if a referenced environment variable is unset.
    A statement that cannot be parsed prints a message and exits with
    status 1.
    '''
    if statements is None:
        return True

    if isinstance(statements, (list, tuple)):
        conditions = statements
    else:
        conditions = [statements]

    outcomes = []
    for condition in conditions:
        # Shape is "var op val", with optional whitespace around the op.
        parsed = re.match(
            r"(?P<var>[\w]+)(\s*)(?P<op>=|!=)(\s*)(?P<val>.*)", condition)
        if parsed is None:
            print("Malformed when line: <%s>" % condition)
            sys.exit(1)
        var, op, val = parsed.group('var', 'op', 'val')

        if var not in os.environ:
            raise RuntimeError("The variable <%s> is not set" % var)
        actual = os.environ[var]

        logger.debug("... when eval %s%s%s against <%s>" %
                     (var, op, val, actual))

        if op == '=':
            outcomes.append(val == actual)
        elif op == '!=':
            outcomes.append(val != actual)
        else:
            # The pattern above only admits '=' and '!='; this is purely a
            # safety net should the pattern ever be extended.
            print("Malformed when op: %s" % op)
            sys.exit(1)

    return all(outcomes)


def collect_data(data, objs, element_name):
    """Merge one element's package-installs entries into ``data``.

    ``objs`` maps a package name to its parameters: nothing, a single
    mapping, or a list/tuple of mappings.  Each mapping is filtered, in
    order, by installtype, arch / not-arch, dib_python_version and when:.
    Entries that pass are appended as ``(pkg_name, element_name)`` to
    ``data[phase][action]``, where phase defaults to 'install.d' and the
    actions are:

    * 'install' by default;
    * 'uninstall' when the mapping has an 'uninstall' key;
    * both when the mapping has a 'build-only' key.

    ``data`` is modified in place and also returned.
    """
    for pkg_name, raw_params in objs.items():
        # Normalise the parameters to a sequence of mappings.
        if not raw_params:
            entries = [{}]
        elif isinstance(raw_params, (list, tuple)):
            entries = raw_params
        else:
            entries = [raw_params]

        for entry in entries:
            logger.debug("Considering %s/%s param:%s" %
                         (element_name, pkg_name, entry))
            phase = entry.get('phase', 'install.d')

            # 'build-only' wins over 'uninstall' when both are present.
            if 'build-only' in entry:
                actions = ("install", "uninstall")
            elif 'uninstall' in entry:
                actions = ("uninstall",)
            else:
                actions = ("install",)

            # Entry restricted to an installtype other than the element's.
            wanted_installtype = entry.get('installtype', None)
            current_installtype = get_element_installtype(element_name)
            if (wanted_installtype is not None and
                    wanted_installtype != current_installtype):
                logger.debug("... skipping due to installtype")
                continue

            if not _valid_for_arch(pkg_name, entry.get('arch', None),
                                   entry.get('not-arch', None)):
                logger.debug("... skipping due to arch match")
                continue

            # An empty requirement matches any DIB_PYTHON_VERSION.
            wanted_py = str(entry.get('dib_python_version', ''))
            current_py = os.environ.get('DIB_PYTHON_VERSION', '')
            if wanted_py != '' and wanted_py != current_py:
                logger.debug("... skipping due to python version")
                continue

            # True means install, false skip
            if not _when(entry.get('when', None)):
                logger.debug("... skipped due to when: failures")
                continue

            for action in actions:
                logger.debug("... installing for '%s'" % action)
                data[phase][action].append((pkg_name, element_name))

    return data


def main():
    """Squash the package-installs files of the given elements into one file.

    Command line:
      --elements  whitespace separated element names to squash
      --path      ':' separated list of directories searched for elements
      outfile     location of the JSON file to write

    For every (directory, element) pair that exists, the element's
    package-installs.json and package-installs.yaml (when present) are
    parsed and merged via collect_data(); the result is written to
    ``outfile`` as JSON.
    """
    parser = argparse.ArgumentParser(
        description="Produce a single packages-installs file from all of"
                    " the available package-installs files")
    parser.add_argument('--elements', required=True,
                        help="Which elements to squash")
    parser.add_argument('--path', required=True,
                        help="Elements path to search for elements")
    parser.add_argument('outfile', help="Location of the output file")
    args = parser.parse_args()

    logging_config.setup()

    # Replicate the logic of finding the first element, because we can't
    # operate on the post-copied hooks dir, since we lose element context
    # NOTE(review): every matching directory on the path is recorded, not
    # only the first one -- confirm whether that is intended.
    element_dirs = list()
    for element_name in args.elements.split():
        for elements_dir in args.path.split(':'):
            potential_path = os.path.join(elements_dir, element_name)
            if os.path.exists(potential_path):
                element_dirs.append((elements_dir, element_name))

    logger.debug("element_dirs -> %s" % element_dirs)

    # Collect the merge of all of the existing install files in the elements
    # that are the first on the ELEMENT_PATH
    final_dict = collections.defaultdict(
        functools.partial(collections.defaultdict, list))
    for (elements_dir, element_name) in element_dirs:
        for file_type in ('json', 'yaml'):
            target_file = os.path.join(
                elements_dir, element_name, "package-installs.%s" % file_type)
            if not os.path.exists(target_file):
                continue
            logger.info("Squashing install file: %s" % target_file)

            # Read the file once and close it promptly; the same text is
            # then offered to the JSON parser and, failing that, to YAML.
            with open(target_file) as install_file:
                content = install_file.read()
            try:
                objs = json.loads(content)
            except ValueError:
                objs = yaml.safe_load(content)

            # An empty document parses to None: nothing to merge.
            if objs is None:
                logger.debug("... no entries in %s" % target_file)
                continue

            final_dict = collect_data(final_dict, objs, element_name)

    logger.debug("final_dict -> %s" % final_dict)

    # Write the resulting file
    with open(args.outfile, 'w') as outfile:
        json.dump(
            final_dict, outfile,
            indent=True, separators=(',', ': '), sort_keys=False)


# Run main() only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()