File: //usr/lib/python3/dist-packages/sos/upload/targets/__init__.py
# Copyright 2024 Red Hat, Inc. Jose Castillo <jcastillo@redhat.com>
# This file is part of the sos project: https://github.com/sosreport/sos
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# version 2 of the GNU General Public License.
#
# See the LICENSE file in the source distribution for further information.
import os
import re
import logging
from getpass import getpass
from sos import _sos as _
from sos.utilities import is_executable, TIMEOUT_DEFAULT
try:
    import requests
    REQUESTS_LOADED = True
except ImportError:
    REQUESTS_LOADED = False
try:
    import boto3
    BOTO3_LOADED = True
except ImportError:
    BOTO3_LOADED = False
class UploadTarget():
    """
    This class is designed to upload files to a distribution
    defined location. These files can be either sos reports,
    sos collections, or other kind of files like: vmcores,
    application cores, logs, etc.
    """
    desc = """
            Upload a file (can be an sos report, a must-gather, or others) to
             a distribution defined remote location
            """
    # _ prefixed class attrs are used for storing any vendor-defined defaults
    # the non-prefixed attrs are used by the upload methods, and will be set
    # to the cmdline/config file values, if provided. If not provided, then
    # those attrs will be set to the _ prefixed values as a fallback.
    # TL;DR Use _upload_* for target default values, use upload_* when wanting
    # to actual use the value in a method/override
    upload_target_name = "Generic Upload"
    upload_target_id = "generic"
    _upload_file = None
    _upload_url = None
    _upload_directory = '/'
    _upload_user = None
    _upload_password = None
    _upload_method = None
    _upload_s3_endpoint = 'https://s3.amazonaws.com'
    _upload_s3_bucket = None
    _upload_s3_access_key = None
    _upload_s3_secret_key = None
    _upload_s3_region = None
    _upload_s3_object_prefix = ''
    upload_url = None
    upload_user = None
    upload_password = None
    upload_s3_endpoint = None
    upload_s3_bucket = None
    upload_s3_access_key = None
    upload_s3_secret_key = None
    upload_s3_region = None
    upload_s3_object_prefix = None
    upload_target = None
    arg_defaults = {
        'upload_file': '',
        'case_id': '',
        'low_priority': False,
        'profiles': [],
        'upload_url': None,
        'upload_directory': None,
        'upload_user': None,
        'upload_pass': None,
        'upload_method': 'auto',
        'upload_no_ssl_verify': False,
        'upload_protocol': 'auto',
        'upload_s3_endpoint': None,
        'upload_s3_region': None,
        'upload_s3_bucket': None,
        'upload_s3_access_key': None,
        'upload_s3_secret_key': None,
        'upload_s3_object_prefix': None,
        'upload_target': None,
    }
    def __init__(self, parser=None, args=None, cmdline=None):
        self.ui_log = logging.getLogger('sos_ui')
        self.parser = parser
        self.cmdline = cmdline
        self.args = args
    def check_distribution(self):
        """This should be overridden by upload targets
        This is called by sos upload on each target type that exists, and
        is meant to return True when the upload target matches a criteria
        that indicates that is the local upload target that should be used.
        Only the first upload target to determine a match is selected"""
        return False
    def get_target_id(self):
        return self.upload_target_id
    @classmethod
    def name(cls):
        """Returns the upload target's name as a string."""
        if cls.upload_target_name:
            return cls.upload_target_name
        return cls.__name__.lower()
    def get_commons(self):
        return {
            'cmdlineopts': self.hook_commons['cmdlineopts'],
            'policy': self.hook_commons['policy'],
            'case_id': self.hook_commons['cmdlineopts'].case_id,
            'upload_directory': self.hook_commons['cmdlineopts']
            .upload_directory
        }
    def set_commons(self, commons):
        """Set common host data for the Upload targets
            to reference
        """
        self.commons = commons
    def pre_work(self, hook_commons):
        self.hook_commons = hook_commons
        self.commons = self.get_commons()
        cmdline_opts = self.commons['cmdlineopts']
        policy = self.commons['policy']
        if cmdline_opts.low_priority:
            policy._configure_low_priority()
        # Set the cmdline settings to the class attrs that are referenced later
        # The target default '_' prefixed versions of these are untouched to
        # allow fallback
        self.upload_url = cmdline_opts.upload_url
        self.upload_user = cmdline_opts.upload_user
        self.upload_directory = cmdline_opts.upload_directory
        self.upload_password = cmdline_opts.upload_pass
        self.upload_archive_name = ''
        self.upload_s3_endpoint = cmdline_opts.upload_s3_endpoint
        self.upload_s3_region = cmdline_opts.upload_s3_region
        self.upload_s3_access_key = cmdline_opts.upload_s3_access_key
        self.upload_s3_bucket = cmdline_opts.upload_s3_bucket
        self.upload_s3_object_prefix = cmdline_opts.upload_s3_object_prefix
        self.upload_s3_secret_key = cmdline_opts.upload_s3_secret_key
        # set or query for upload credentials; this needs to be done after
        # setting case id, as below methods might rely on detection of it
        if not cmdline_opts.batch and not \
                cmdline_opts.quiet:
            # Targets will need to handle the prompts for user information
            if self.get_upload_url() and \
                    not cmdline_opts.upload_protocol == 's3':
                self.prompt_for_upload_user()
                self.prompt_for_upload_password()
            elif cmdline_opts.upload_protocol == 's3':
                self.prompt_for_upload_s3_bucket()
                self.prompt_for_upload_s3_endpoint()
                self.prompt_for_upload_s3_access_key()
                self.prompt_for_upload_s3_secret_key()
            self.ui_log.info('')
    def prompt_for_upload_s3_access_key(self):
        """Should be overridden by targets to determine if an access key needs
        to be provided for upload or not
        """
        if not self.get_upload_s3_access_key():
            msg = (
                "Please provide the upload access key for bucket"
                f" {self.get_upload_s3_bucket()} via endpoint"
                f" {self.get_upload_s3_endpoint()}: "
            )
            self.upload_s3_access_key = input(_(msg))
    def prompt_for_upload_s3_secret_key(self):
        """Should be overridden by targets to determine if a secret key needs
        to be provided for upload or not
        """
        if not self.get_upload_s3_secret_key():
            msg = (
                "Please provide the upload secret key for bucket"
                f" {self.get_upload_s3_bucket()} via endpoint"
                f" {self.get_upload_s3_endpoint()}: "
            )
            self.upload_s3_secret_key = getpass(msg)
    def prompt_for_upload_s3_bucket(self):
        """Should be overridden by targets to determine if a bucket needs to
        be provided for upload or not
        """
        if not self.upload_s3_bucket:
            if self.upload_url and self.upload_url.startswith('s3://'):
                self.upload_s3_bucket = self.upload_url[5:]
            else:
                user_input = input(_("Please provide the upload bucket: "))
                self.upload_s3_bucket = user_input.strip('/')
        return self.upload_s3_bucket
    def prompt_for_upload_s3_endpoint(self):
        """Should be overridden by targets to determine if an endpoint needs
        to be provided for upload or not
        """
        default_endpoint = self._upload_s3_endpoint
        if not self.upload_s3_endpoint:
            msg = (
                "Please provide the upload endpoint for bucket"
                f" {self.get_upload_s3_bucket()}"
                f" (default: {default_endpoint}): "
            )
            user_input = input(_(msg))
            self.upload_s3_endpoint = user_input or default_endpoint
        return self.upload_s3_endpoint
    def prompt_for_upload_user(self):
        """Should be overridden by targets to determine if a user needs to
        be provided or not
        """
        if not self.get_upload_user():
            msg = f"Please provide upload user for {self.get_upload_url()}: "
            self.upload_user = input(_(msg))
    def prompt_for_upload_password(self):
        """Should be overridden by targets to determine if a password needs to
        be provided for upload or not
        """
        if not self.get_upload_password() and (self.get_upload_user() !=
                                               self._upload_user):
            msg = ("Please provide the upload password for "
                   f"{self.get_upload_user()}: ")
            self.upload_password = getpass(msg)
    def upload_archive(self, archive):
        """
        Entry point for sos attempts to upload the generated archive to a
        target or user specified location.
        Currently there is support for HTTPS, SFTP, and FTP. HTTPS uploads are
        preferred for target-defined defaults.
        Targets that need to override uploading methods should override the
        respective upload_https(), upload_sftp(), and/or upload_ftp() methods
        and should NOT override this method.
        :param archive: The archive filepath to use for upload
        :type archive: ``str``
        In order to enable this for a target, that target needs to implement
        the following:
        Required Class Attrs
        :_upload_url:     The default location to use. Note these MUST include
                          protocol header
        :_upload_user:    Default username, if any else None
        :_upload_password: Default password, if any else None
        The following Class Attrs may optionally be overidden by the Target
        :_upload_directory:     Default FTP server directory, if any
        The following methods may be overridden by ``Target`` as needed
        `prompt_for_upload_user()`
            Determines if sos should prompt for a username or not.
        `get_upload_user()`
            Determines if the default or a different username should be used
        `get_upload_https_auth()`
            Format authentication data for HTTPS uploads
        `get_upload_url_string()`
            Print a more human-friendly string than vendor URLs
        """
        self.upload_archive_name = archive
        if not self.upload_url:
            self.upload_url = self.get_upload_url()
        if not self.upload_url:
            raise Exception("No upload destination provided by upload target"
                            " or by --upload-url")
        upload_func = self._determine_upload_type()
        self.ui_log.info(
            _(f"Attempting upload to {self.get_upload_url_string()}")
        )
        return upload_func()
    def _determine_upload_type(self):
        """Based on the url provided, determine what type of upload to attempt.
        Note that this requires users to provide a FQDN address, such as
        https://myvendor.com/api or ftp://myvendor.com instead of
        myvendor.com/api or myvendor.com
        """
        prots = {
            'ftp': self.upload_ftp,
            'sftp': self.upload_sftp,
            'https': self.upload_https,
            's3': self.upload_s3
        }
        if self.commons['cmdlineopts'].upload_protocol in prots:
            return prots[self.commons['cmdlineopts'].upload_protocol]
        if '://' not in self.upload_url:
            raise Exception("Must provide protocol in upload URL")
        prot, _ = self.upload_url.split('://')
        if prot not in prots:
            raise Exception(f"Unsupported or unrecognized protocol: {prot}")
        return prots[prot]
    def get_upload_https_auth(self, user=None, password=None):
        """Formats the user/password credentials using basic auth
        :param user: The username for upload
        :type user: ``str``
        :param password: Password for `user` to use for upload
        :type password: ``str``
        :returns: The user/password auth suitable for use in requests calls
        :rtype: ``requests.auth.HTTPBasicAuth()``
        """
        if not user:
            user = self.get_upload_user()
        if not password:
            password = self.get_upload_password()
        return requests.auth.HTTPBasicAuth(user, password)
    def get_upload_s3_access_key(self):
        """Helper function to determine if we should use the target default
        upload access key or one provided by the user
        :returns: The access_key to use for upload
        :rtype: ``str``
        """
        return (os.getenv('SOSUPLOADS3ACCESSKEY', None) or
                self.upload_s3_access_key or
                self._upload_s3_access_key)
    def get_upload_s3_endpoint(self):
        """Helper function to determine if we should use the target default
        upload endpoint or one provided by the user
        :returns: The S3 Endpoint to use for upload
        :rtype: ``str``
        """
        if not self.upload_s3_endpoint:
            self.prompt_for_upload_s3_endpoint()
        return self.upload_s3_endpoint
    def get_upload_s3_region(self):
        """Helper function to determine if we should use the target default
        upload region or one provided by the user
        :returns: The S3 region to use for upload
        :rtype: ``str``
        """
        return self.upload_s3_region or self._upload_s3_region
    def get_upload_s3_bucket(self):
        """Helper function to determine if we should use the target default
        upload bucket or one provided by the user
        :returns: The S3 bucket to use for upload
        :rtype: ``str``
        """
        if self.upload_url and self.upload_url.startswith('s3://'):
            bucket_and_prefix = self.upload_url[5:].split('/', 1)
            self.upload_s3_bucket = bucket_and_prefix[0]
            if len(bucket_and_prefix) > 1:
                self.upload_s3_object_prefix = bucket_and_prefix[1]
        if not self.upload_s3_bucket:
            self.prompt_for_upload_s3_bucket()
        return self.upload_s3_bucket or self._upload_s3_bucket
    def get_upload_s3_object_prefix(self):
        """Helper function to determine if we should use the target default
        upload object prefix or one provided by the user
        :returns: The S3 object prefix to use for upload
        :rtype: ``str``
        """
        return self.upload_s3_object_prefix or self._upload_s3_object_prefix
    def get_upload_s3_secret_key(self):
        """Helper function to determine if we should use the target default
        upload secret key or one provided by the user
        :returns: The S3 secret key to use for upload
        :rtype: ``str``
        """
        return (os.getenv('SOSUPLOADS3SECRETKEY', None) or
                self.upload_s3_secret_key or
                self._upload_s3_secret_key)
    def get_upload_url(self):
        """Helper function to determine if we should use the target default
        upload url or one provided by the user
        :returns: The URL to use for upload
        :rtype: ``str``
        """
        if not self.upload_url and (
            self.upload_s3_bucket and
            self.upload_s3_access_key and
            self.upload_s3_secret_key
        ):
            bucket = self.get_upload_s3_bucket()
            prefix = self.get_upload_s3_object_prefix()
            self._upload_url = f"s3://{bucket}/{prefix}"
        return self.upload_url or self._upload_url
    def _get_obfuscated_upload_url(self, url):
        pattern = r"([^:]+://[^:]+:)([^@]+)(@.+)"
        obfuscated_url = re.sub(pattern, r'\1********\3', url)
        return obfuscated_url
    def get_upload_url_string(self):
        """Used by upload targets to potentially change the string used to
        report upload location from the URL to a more human-friendly string
        """
        return self._get_obfuscated_upload_url(self.get_upload_url())
    def get_upload_user(self):
        """Helper function to determine if we should use the target default
        upload user or one provided by the user
        :returns: The username to use for upload
        :rtype: ``str``
        """
        return (os.getenv('SOSUPLOADUSER', None) or
                self.upload_user or
                self._upload_user)
    def get_upload_password(self):
        """Helper function to determine if we should use the target default
        upload password or one provided by the user
        A user provided password, either via option or the 'SOSUPLOADPASSWORD'
        environment variable will have precendent over any target value
        :returns: The password to use for upload
        :rtype: ``str``
        """
        return (os.getenv('SOSUPLOADPASSWORD', None) or
                self.upload_password or
                self._upload_password)
    def upload_sftp(self, user=None, password=None):
        """Attempts to upload the archive to an SFTP location.
        Due to the lack of well maintained, secure, and generally widespread
        python libraries for SFTP, sos will shell-out to the system's local ssh
        installation in order to handle these uploads.
        Do not override this method with one that uses python-paramiko, as the
        upstream sos team will reject any PR that includes that dependency.
        """
        # if we somehow don't have sftp available locally, fail early
        if not is_executable('sftp'):
            raise Exception('SFTP is not locally supported')
        # soft dependency on python3-pexpect, which we need to use to control
        # sftp login since as of this writing we don't have a viable solution
        # via ssh python bindings commonly available among downstreams
        try:
            import pexpect
        except ImportError as err:
            raise Exception('SFTP upload requires python3-pexpect, which is '
                            'not currently installed') from err
        sftp_connected = False
        if not user:
            user = self.get_upload_user()
        if not password:
            password = self.get_upload_password()
        # need to strip the protocol prefix here
        sftp_url = self.get_upload_url().replace('sftp://', '')
        sftp_cmd = f"sftp -oStrictHostKeyChecking=no {user}@{sftp_url}"
        ret = pexpect.spawn(sftp_cmd, encoding='utf-8')
        sftp_expects = [
            'sftp>',
            'password:',
            'Connection refused',
            pexpect.TIMEOUT,
            pexpect.EOF
        ]
        idx = ret.expect(sftp_expects, timeout=15)
        if idx == 0:
            sftp_connected = True
        elif idx == 1:
            ret.sendline(password)
            pass_expects = [
                'sftp>',
                'Permission denied',
                pexpect.TIMEOUT,
                pexpect.EOF
            ]
            sftp_connected = ret.expect(pass_expects, timeout=10) == 0
            if not sftp_connected:
                ret.close()
                raise Exception("Incorrect username or password for "
                                f"{self.get_upload_url_string()}")
        elif idx == 2:
            raise Exception("Connection refused by "
                            f"{self.get_upload_url_string()}. Incorrect port?")
        elif idx == 3:
            raise Exception("Timeout hit trying to connect to "
                            f"{self.get_upload_url_string()}")
        elif idx == 4:
            raise Exception("Unexpected error trying to connect to sftp: "
                            f"{ret.before}")
        if not sftp_connected:
            ret.close()
            raise Exception("Unable to connect via SFTP to "
                            f"{self.get_upload_url_string()}")
        put_cmd = (f'put {self.upload_archive_name} '
                   f'{self._get_sftp_upload_name()}')
        ret.sendline(put_cmd)
        put_expects = [
            '100%',
            pexpect.TIMEOUT,
            pexpect.EOF,
            'No such file or directory'
        ]
        put_success = ret.expect(put_expects, timeout=180)
        if put_success == 0:
            ret.sendline('bye')
            return True
        if put_success == 1:
            raise Exception("Timeout expired while uploading")
        if put_success == 2:
            raise Exception(f"Unknown error during upload: {ret.before}")
        if put_success == 3:
            raise Exception("Unable to write archive to destination")
        raise Exception(f"Unexpected response from server: {ret.before}")
    def _get_sftp_upload_name(self):
        """If a specific file name pattern is required by the SFTP server,
        override this method in the relevant Upload Target. Otherwise the
        archive's name on disk will be used
        :returns:       Filename as it will exist on the SFTP server
        :rtype:         ``str``
        """
        fname = self.upload_archive_name.split('/')[-1]
        if self.upload_directory:
            fname = os.path.join(self.upload_directory, fname)
        return fname
    def _upload_https_put(self, archive, verify=True):
        """If upload_https() needs to use requests.put(), use this method.
        Targets should override this method instead of the base upload_https()
        :param archive:     The open archive file object
        """
        return requests.put(self.get_upload_url(), data=archive,
                            auth=self.get_upload_https_auth(),
                            verify=verify, timeout=TIMEOUT_DEFAULT)
    def _get_upload_headers(self):
        """Define any needed headers to be passed with the POST request here
        """
        return {}
    def _upload_https_post(self, archive, verify=True):
        """If upload_https() needs to use requests.post(), use this method.
        Targets should override this method instead of the base upload_https()
        :param archive:     The open archive file object
        """
        files = {
            'file': (archive.name.split('/')[-1], archive,
                     self._get_upload_headers())
        }
        return requests.post(self.get_upload_url(), files=files,
                             auth=self.get_upload_https_auth(),
                             verify=verify, timeout=TIMEOUT_DEFAULT)
    def upload_https(self):
        """Attempts to upload the archive to an HTTPS location.
        :returns: ``True`` if upload is successful
        :rtype: ``bool``
        :raises: ``Exception`` if upload was unsuccessful
        """
        if not REQUESTS_LOADED:
            raise Exception("Unable to upload due to missing python requests "
                            "library")
        with open(self.upload_archive_name, 'rb') as arc:
            if self.commons['cmdlineopts'].upload_method == 'auto':
                method = self._upload_method
            else:
                method = self.commons['cmdlineopts'].upload_method
            verify = self.commons['cmdlineopts'].upload_no_ssl_verify is False
            if method == 'put':
                r = self._upload_https_put(arc, verify)
            else:
                r = self._upload_https_post(arc, verify)
            if r.status_code not in (200, 201):
                if r.status_code == 401:
                    raise Exception(
                        "Authentication failed: invalid user credentials"
                    )
                raise Exception(f"POST request returned {r.status_code}: "
                                f"{r.reason}")
            return True
    def upload_ftp(self, url=None, directory=None, user=None, password=None):
        """Attempts to upload the archive to either the target defined or user
        provided FTP location.
        :param url: The URL to upload to
        :type url: ``str``
        :param directory: The directory on the FTP server to write to
        :type directory: ``str`` or ``None``
        :param user: The user to authenticate with
        :type user: ``str``
        :param password: The password to use for `user`
        :type password: ``str``
        :returns: ``True`` if upload is successful
        :rtype: ``bool``
        :raises: ``Exception`` if upload in unsuccessful
        """
        import ftplib
        import socket
        if not url:
            url = self.get_upload_url()
        if url is None:
            raise Exception("no FTP server specified by upload target, "
                            "use --upload-url to specify a location")
        url = url.replace('ftp://', '')
        if not user:
            user = self.get_upload_user()
        if not password:
            password = self.get_upload_password()
        if not directory:
            directory = self.upload_directory or self._upload_directory
        try:
            session = ftplib.FTP(url, user, password, timeout=15)
            if not session:
                raise Exception("connection failed, did you set a user and "
                                "password?")
            session.cwd(directory)
        except socket.timeout as err:
            raise Exception(f"timeout hit while connecting to {url}") from err
        except socket.gaierror as err:
            raise Exception(f"unable to connect to {url}") from err
        except ftplib.error_perm as err:
            errno = str(err).split()[0]
            if errno == '503':
                raise Exception(f"could not login as '{user}'") from err
            if errno == '530':
                raise Exception(f"invalid password for user '{user}'") from err
            if errno == '550':
                raise Exception("could not set upload directory to "
                                f"{directory}") from err
            raise Exception(f"error trying to establish session: {str(err)}") \
                from err
        with open(self.upload_archive_name, 'rb') as _arcfile:
            session.storbinary(
                f"STOR {self.upload_archive_name.split('/')[-1]}", _arcfile
                )
        session.quit()
        return True
    def upload_s3(self, endpoint=None, region=None, bucket=None, prefix=None,
                  access_key=None, secret_key=None):
        """Attempts to upload the archive to an S3 bucket.
        :param endpoint: The S3 endpoint to upload to
        :type endpoint: str
        :param region: The S3 region to upload to
        :type region: str
        :param bucket: The name of the S3 bucket to upload to
        :type bucket: str
        :param prefix: The prefix for the S3 object/key
        :type prefix: str
        :param access_key: The access key for the S3 bucket
        :type access_key: str
        :param secret_key: The secret key for the S3 bucket
        :type secret_key: str
        :returns: True if upload is successful
        :rtype: bool
        :raises: Exception if upload is unsuccessful
        """
        if not BOTO3_LOADED:
            raise Exception("Unable to upload due to missing python boto3 "
                            "library")
        if not endpoint:
            endpoint = self.get_upload_s3_endpoint()
        if not region:
            region = self.get_upload_s3_region()
        if not bucket:
            bucket = self.get_upload_s3_bucket().strip('/')
        if not prefix:
            prefix = self.get_upload_s3_object_prefix()
            if prefix != '' and prefix.startswith('/'):
                prefix = prefix[1:]
            if prefix != '' and not prefix.endswith('/'):
                prefix = f'{prefix}/' if prefix else ''
        if not access_key:
            access_key = self.get_upload_s3_access_key()
        if not secret_key:
            secret_key = self.get_upload_s3_secret_key()
        s3_client = boto3.client('s3', endpoint_url=endpoint,
                                 region_name=region,
                                 aws_access_key_id=access_key,
                                 aws_secret_access_key=secret_key)
        try:
            key = prefix + self.upload_archive_name.split('/')[-1]
            s3_client.upload_file(self.upload_archive_name,
                                  bucket, key)
            return True
        except Exception as e:
            raise Exception(f"Failed to upload to S3: {str(e)}") from e
# vim: set et ts=4 sw=4 :