HEX
Server: Apache
System: Linux pdx1-shared-a1-38 6.6.104-grsec-jammy+ #3 SMP Tue Sep 16 00:28:11 UTC 2025 x86_64
User: mmickelson (3396398)
PHP: 8.1.31
Disabled: NONE
Upload Files
File: //usr/lib/python3/dist-packages/uaclient/cli/enable.py
import json
import logging
from typing import Any, Dict, List, NamedTuple, Optional

from uaclient import (
    api,
    config,
    contract,
    entitlements,
    event_logger,
    exceptions,
    lock,
    messages,
    status,
    util,
)
from uaclient.api.u.pro.services.dependencies.v1 import (
    ServiceWithDependencies,
    _dependencies,
)
from uaclient.api.u.pro.services.enable.v1 import (
    EnableOptions,
    EnableResult,
    _enable,
)
from uaclient.api.u.pro.status.enabled_services.v1 import (
    EnabledService,
    _enabled_services,
)
from uaclient.api.u.pro.status.is_attached.v1 import _is_attached
from uaclient.cli import cli_util
from uaclient.cli.commands import ProArgument, ProArgumentGroup, ProCommand
from uaclient.cli.parser import HelpCategory

LOG = logging.getLogger(util.replace_top_level_logger_name(__name__))

_EnableOneServiceResult = NamedTuple(
    "_EnableOneServiceResult",
    [
        ("success", bool),
        ("needs_reboot", bool),
        ("error", Optional[Dict[str, Any]]),
    ],
)


def _enable_landscape(
    cfg: config.UAConfig,
    access_only: bool,
    extra_args,
    progress_object: Optional[api.AbstractProgress] = None,
):
    """
    Landscape gets special treatment because it currently not supported by our
    enable API. This function is a temporary workaround until we have a proper
    API for enabling landscape, which will happen after Landscape is fully
    integrated with the contracts backend.
    """
    progress = api.ProgressWrapper(progress_object)
    landscape = entitlements.LandscapeEntitlement(
        cfg,
        called_name="landscape",
        access_only=access_only,
        extra_args=extra_args,
    )
    success = False
    fail_reason = None

    try:
        with lock.RetryLock(
            lock_holder="cli.enable._enable_landscape",
        ):
            success, fail_reason = landscape.enable(progress=progress)
    except Exception as e:
        lock.clear_lock_file_if_present()
        raise e

    if not success:
        if fail_reason is not None and fail_reason.message is not None:
            reason = fail_reason.message
        else:
            reason = messages.GENERIC_UNKNOWN_ISSUE
        raise exceptions.EntitlementNotEnabledError(
            service="landscape", reason=reason
        )
    return EnableResult(
        enabled=["landscape"], disabled=[], reboot_required=False, messages=[]
    )


def prompt_for_dependency_handling(
    cfg: config.UAConfig,
    service: str,
    all_dependencies: List[ServiceWithDependencies],
    enabled_services: List[EnabledService],
    called_name: str,
    variant: str,
    service_title: str,
):
    incompatible_services = []
    required_services = []
    enabled_service_names = [s.name for s in enabled_services]

    dependencies = next(
        (s for s in all_dependencies if s.name == service), None
    )
    if dependencies is not None:
        incompatible_services = [
            s.name
            for s in dependencies.incompatible_with
            if s.name in enabled_service_names
        ]
        required_services = [
            s.name
            for s in dependencies.depends_on
            if s.name not in enabled_service_names
        ]

    for incompatible_service in incompatible_services:
        cfg_block_disable_on_enable = util.is_config_value_true(
            config=cfg.cfg,
            path_to_value="features.block_disable_on_enable",
        )
        incompatible_service_title = entitlements.get_title(
            cfg, incompatible_service
        )
        user_msg = messages.INCOMPATIBLE_SERVICE.format(
            service_being_enabled=service_title,
            incompatible_service=incompatible_service_title,
        )
        if cfg_block_disable_on_enable or not util.prompt_for_confirmation(
            msg=user_msg
        ):
            raise exceptions.IncompatibleServiceStopsEnable(
                service_being_enabled=service_title,
                incompatible_service=incompatible_service_title,
            )

    for required_service in required_services:
        required_service_title = entitlements.get_title(cfg, required_service)
        user_msg = messages.REQUIRED_SERVICE.format(
            service_being_enabled=service_title,
            required_service=required_service_title,
        )
        if not util.prompt_for_confirmation(msg=user_msg):
            raise exceptions.RequiredServiceStopsEnable(
                service_being_enabled=service_title,
                required_service=required_service_title,
            )

    variant_enabled = next(
        (
            s
            for s in enabled_services
            if s.name == service
            and s.variant_enabled
            and s.variant_name != variant
        ),
        None,
    )
    if variant_enabled is not None and variant is not None:
        to_be_enabled_title = entitlements.get_title(cfg, service, variant)
        enabled_variant_title = entitlements.get_title(
            cfg, service, variant_enabled.variant_name
        )
        cfg_block_disable_on_enable = util.is_config_value_true(
            config=cfg.cfg,
            path_to_value="features.block_disable_on_enable",
        )
        user_msg = messages.INCOMPATIBLE_SERVICE.format(
            service_being_enabled=to_be_enabled_title,
            incompatible_service=enabled_variant_title,
        )
        if cfg_block_disable_on_enable or not util.prompt_for_confirmation(
            msg=user_msg
        ):
            raise exceptions.IncompatibleServiceStopsEnable(
                service_being_enabled=to_be_enabled_title,
                incompatible_service=enabled_variant_title,
            )


def _print_json_output(
    json_output: bool,
    json_response: Dict[str, Any],
    processed_services: List[str],
    failed_services: List[str],
    errors: List[Dict[str, Any]],
    warnings: List[Dict[str, Any]],
    success: bool,
):
    if json_output:
        processed_services.sort()
        failed_services.sort()

        json_response["result"] = "success" if success else "failure"
        json_response["processed_services"] = processed_services
        json_response["failed_services"] = failed_services
        json_response["errors"] = errors
        json_response["warnings"] = warnings

        print(
            json.dumps(
                json_response,
                cls=util.DatetimeAwareJSONEncoder,
                sort_keys=True,
            )
        )


def _enable_one_service(
    cfg: config.UAConfig,
    ent_name: str,
    variant: str,
    access_only: bool,
    assume_yes: bool,
    json_output: bool,
    extra_args: Optional[List[str]],
    enabled_services: List[EnabledService],
    all_dependencies: List[ServiceWithDependencies],
) -> _EnableOneServiceResult:
    interactive_only_print = cli_util.create_interactive_only_print_function(
        json_output
    )
    ent = entitlements.entitlement_factory(
        cfg,
        ent_name,
        variant=variant,
        access_only=access_only,
        extra_args=extra_args,
    )
    real_name = ent.name
    ent_title = ent.title

    already_enabled = next(
        (
            s
            for s in enabled_services
            if s.name == real_name
            and (
                not variant
                or (s.variant_enabled and s.variant_name == variant)
            )
        ),
        None,
    )
    if already_enabled is not None:
        msg = messages.ALREADY_ENABLED.format(title=ent_title)
        interactive_only_print(msg.msg)
        interactive_only_print(messages.ENABLE_FAILED.format(title=ent_title))
        return _EnableOneServiceResult(
            success=False,
            needs_reboot=False,
            error={
                "type": "service",
                "service": ent_name,
                "message": msg.msg,
                "message_code": msg.name,
            },
        )

    if not assume_yes:
        # this never happens for json output because we assert earlier that
        # assume_yes must be True for json output
        try:
            prompt_for_dependency_handling(
                cfg,
                real_name,
                all_dependencies,
                enabled_services,
                called_name=ent_name,
                variant=variant,
                service_title=ent_title,
            )
        except exceptions.UbuntuProError as e:
            LOG.exception(e)
            interactive_only_print(e.msg)
            interactive_only_print(
                messages.ENABLE_FAILED.format(title=ent_title)
            )
            return _EnableOneServiceResult(
                success=False,
                needs_reboot=False,
                error=None,
            )

    try:
        if json_output:
            progress = None
        else:
            progress = cli_util.CLIEnableDisableProgress(assume_yes=assume_yes)

        if real_name == "landscape":
            enable_result = _enable_landscape(
                cfg,
                access_only,
                extra_args=extra_args,
                progress_object=progress,
            )
        else:
            enable_result = _enable(
                EnableOptions(
                    service=ent_name,
                    variant=variant,
                    access_only=access_only,
                ),
                cfg,
                progress_object=progress,
            )

        status.status(cfg=cfg)  # Update the status cache

        if access_only:
            interactive_only_print(
                messages.ACCESS_ENABLED_TMPL.format(title=ent_title)
            )
        else:
            interactive_only_print(
                messages.ENABLED_TMPL.format(title=ent_title)
            )

        needs_reboot = enable_result.reboot_required
        if needs_reboot:
            interactive_only_print(
                messages.ENABLE_REBOOT_REQUIRED_TMPL.format(
                    operation="install"
                )
            )

        for message in enable_result.messages:
            interactive_only_print(message)

        return _EnableOneServiceResult(
            success=True,
            needs_reboot=needs_reboot,
            error=None,
        )

    except exceptions.EntitlementNotEnabledError as e:
        reason = e.additional_info["reason"]
        err_code = reason["code"]
        err_msg = reason["title"]
        err_info = reason["additional_info"]
        interactive_only_print(err_msg)
        interactive_only_print(messages.ENABLE_FAILED.format(title=ent_title))
        return _EnableOneServiceResult(
            success=False,
            needs_reboot=False,
            error={
                "type": "service",
                "service": ent_name,
                "message": err_msg,
                "message_code": err_code,
                "additional_info": err_info,
            },
        )
    except exceptions.UbuntuProError as e:
        interactive_only_print(e.msg)
        interactive_only_print(messages.ENABLE_FAILED.format(title=ent_title))
        return _EnableOneServiceResult(
            success=False,
            needs_reboot=False,
            error={
                "type": "service",
                "service": ent_name,
                "message": e.msg,
                "message_code": e.msg_code,
                "additional_info": e.additional_info,
            },
        )


@cli_util.verify_json_format_args
@cli_util.assert_root
@cli_util.assert_attached(cli_util._raise_enable_disable_unattached_error)
def action_enable(args, *, cfg, **kwargs) -> int:
    """Perform the enable action on a named entitlement.

    @return: 0 on success, 1 otherwise
    """
    processed_services = []  # type: List[str]
    failed_services = []  # type: List[str]
    errors = []  # type: List[Dict[str, Any]]
    warnings = []

    json_response = {
        "_schema_version": event_logger.JSON_SCHEMA_VERSION,
        "needs_reboot": False,
    }

    json_output = args.format == "json"
    # HACK NOTICE: interactive_only_print here will be a no-op "null_print"
    # function defined above if args.format == "json". We use this function
    # throughout enable for things that should get printed in the normal
    # interactive output so that they don't get printed for the json output.
    interactive_only_print = cli_util.create_interactive_only_print_function(
        json_output
    )

    variant = getattr(args, "variant", "")
    access_only = args.access_only
    assume_yes = args.assume_yes

    if variant and access_only:
        raise exceptions.InvalidOptionCombination(
            option1="--access-only", option2="--variant"
        )

    interactive_only_print(messages.REFRESH_CONTRACT_ENABLE)
    try:
        contract.refresh(cfg)
    except (exceptions.ConnectivityError, exceptions.UbuntuProError):
        # Inability to refresh is not a critical issue during enable
        LOG.warning("Failed to refresh contract", exc_info=True)
        warnings.append(
            {
                "type": "system",
                "message": messages.E_REFRESH_CONTRACT_FAILURE.msg,
                "message_code": messages.E_REFRESH_CONTRACT_FAILURE.name,
            }
        )

    if not _is_attached(cfg).is_attached_and_contract_valid:
        expired_err = exceptions.ContractExpiredError()
        interactive_only_print(expired_err.msg)
        errors.append(
            {
                "type": "system",
                "message": expired_err.msg,
                "message_code": expired_err.msg_code,
            }
        )
        _print_json_output(
            json_output,
            json_response,
            processed_services,
            failed_services,
            errors,
            warnings,
            success=False,
        )
        return 1

    names = getattr(args, "service", [])
    (
        entitlements_found,
        entitlements_not_found,
    ) = entitlements.get_valid_entitlement_names(names, cfg)
    enabled_services = _enabled_services(cfg).enabled_services
    all_dependencies = _dependencies(cfg).services

    ret = True
    for ent_name in entitlements.order_entitlements_for_enabling(
        cfg, entitlements_found
    ):
        result = _enable_one_service(
            cfg,
            ent_name,
            variant,
            access_only,
            assume_yes,
            json_output,
            kwargs.get("extra_args"),
            enabled_services,
            all_dependencies,
        )
        if result.success:
            processed_services.append(ent_name)
            if result.needs_reboot:
                json_response["needs_reboot"] = True
        else:
            ret = False
            failed_services.append(ent_name)
            if result.error is not None:
                errors.append(result.error)

    if entitlements_not_found:
        ret = False
        failed_services += entitlements_not_found
        err = entitlements.create_enable_entitlements_not_found_error(
            entitlements_not_found, cfg=cfg
        )
        interactive_only_print(err.msg)
        errors.append(
            {
                "type": "system",
                "service": None,
                "message": err.msg,
                "message_code": err.msg_code,
                "additional_info": err.additional_info,
            }
        )

    contract_client = contract.UAContractClient(cfg)
    contract_client.update_activity_token()

    _print_json_output(
        json_output,
        json_response,
        processed_services,
        failed_services,
        errors,
        warnings,
        success=ret,
    )

    return 0 if ret else 1


enable_command = ProCommand(
    "enable",
    help=messages.CLI_ROOT_ENABLE,
    description=messages.CLI_ENABLE_DESC,
    action=action_enable,
    help_category=HelpCategory.QUICKSTART,
    help_position=3,
    argument_groups=[
        ProArgumentGroup(
            arguments=[
                ProArgument(
                    "service",
                    help=messages.CLI_ENABLE_SERVICE.format(
                        options=", ".join(
                            entitlements.valid_services(cfg=config.UAConfig())
                        )
                    ),
                    action="store",
                    nargs="+",
                ),
                ProArgument(
                    "--assume-yes",
                    help=messages.CLI_ASSUME_YES.format(command="enable"),
                    action="store_true",
                ),
                ProArgument(
                    "--access-only",
                    help=messages.CLI_ENABLE_ACCESS_ONLY,
                    action="store_true",
                ),
                ProArgument(
                    "--beta",
                    help=messages.CLI_ENABLE_BETA,
                    action="store_true",
                ),
                ProArgument(
                    "--format",
                    help=messages.CLI_FORMAT_DESC.format(default="cli"),
                    action="store",
                    choices=["cli", "json"],
                    default="cli",
                ),
                ProArgument(
                    "--variant",
                    help=messages.CLI_ENABLE_VARIANT,
                    action="store",
                ),
            ]
        )
    ],
)