HEX
Server: Apache
System: Linux pdx1-shared-a1-38 6.6.104-grsec-jammy+ #3 SMP Tue Sep 16 00:28:11 UTC 2025 x86_64
User: mmickelson (3396398)
PHP: 8.1.31
Disabled: NONE
Upload Files
File: //usr/local/bin/dhwp/dhwp/core/wordpress.py
import os
import re
import sys
import tempfile
from cement import shell
from cement.utils.misc import minimal_logger
from dhwp.core.curl import fetch
LOG = minimal_logger(__name__)


class Wordpress:
    """Wrapper around the ``wp`` (WP-CLI) command for a single WordPress tree.

    ``path`` is passed to ``wp --path=`` and is also the directory into which
    downloaded archives are unpacked.  ``user`` is optional; when set, ``cli``
    runs ``wp`` through ``sudo -u <user>`` after changing into ``path``.

    Failures of external commands are logged via the module logger and then
    terminate the process with ``sys.exit(1)``.

    NOTE(review): command lines are assembled by plain string concatenation
    and handed to ``shell.cmd``; arguments are not shell-quoted here, so
    callers must pass values that are safe to embed in a command string.
    """

    def __init__(self, path, user=None):
        self.path = path
        self.user = user

    @staticmethod
    def _to_text(data):
        """Return *data* as ``str``, decoding ``bytes`` as UTF-8.

        Undecodable bytes are replaced instead of raising, so a stray byte in
        a command's output cannot mask the real error being reported.
        """
        if isinstance(data, bytes):
            return data.decode("utf-8", errors="replace")
        return data

    @staticmethod
    def _new_temp_path():
        """Create an empty temporary file and return its path.

        ``tempfile.mkstemp`` also returns an open file descriptor; it is
        closed here because callers only need the path.
        """
        fd, temp_path = tempfile.mkstemp()
        os.close(fd)
        return temp_path

    def _command_line(self, cmd, **kwargs):
        """Build the shell command string that ``cli`` executes.

        Keyword arguments become ``wp`` options: ``__`` in a name is turned
        into ``-``; a ``True`` value yields a bare ``--flag``, a ``False``
        value is omitted, and any other value yields ``--name=value``.
        """
        args = ['--path=%s' % self.path, cmd]

        for key, value in kwargs.items():
            option = key.replace("__", "-")
            if isinstance(value, bool):
                if value:
                    args.append("--%s" % option)
            else:
                args.append("--%s=%s" % (option, value))

        command = "wp --no-color " + " ".join(args)

        if self.user:
            command = "cd " + self.path + "; sudo -u {user} {command}".format(
                user=self.user, command=command)

        return command

    def cli(self, cmd, **kwargs):
        """Run a WP-CLI command and return its standard output as ``str``.

        ``cmd`` is the sub-command text (e.g. ``'option get siteurl'``);
        ``kwargs`` are translated to options as described in
        ``_command_line``.  Exits the process with status 1 when ``wp`` is
        not found or when the command returns a non-zero exit code.
        """
        out, err, code = shell.cmd('which wp')

        if code > 0:
            LOG.fatal("cannot find wpcli, please install it first")
            sys.exit(1)

        command = self._command_line(cmd, **kwargs)

        LOG.info("wpcli command : %s" % command)

        out, err, code = shell.cmd(command)
        out = self._to_text(out)
        err = self._to_text(err)

        LOG.info("wpcli output\n%s" % out)

        if code > 0:
            LOG.fatal("wpcli failed with error\n%s" % err)
            sys.exit(1)

        return out

    def db_import(self, sql_file):
        """Run ``wp db import`` on ``sql_file`` and return the command output."""
        return self.cli('db import ' + sql_file)

    def db_export(self, sql_file=None):
        """Run ``wp db export`` and return the path of the dump file.

        When ``sql_file`` is not given, a new temporary file is created and
        used as the destination; the caller is responsible for removing it.
        """
        if not sql_file:
            sql_file = self._new_temp_path()

        self.cli('db export --set-gtid-purged=OFF ' + sql_file)
        return sql_file

    def site_url(self):
        """Return the ``siteurl`` option with trailing newlines stripped."""
        # TODO look at wp config file first then here??
        return self.cli(
            'option get siteurl --skip-plugins --skip-themes').rstrip('\n')

    def blogname(self):
        """Return the ``blogname`` option with trailing newlines stripped."""
        return self.cli(
            'option get blogname --skip-plugins --skip-themes').rstrip('\n')

    def db_prefix(self):
        """Return the ``table_prefix`` config value, trailing newlines stripped."""
        return self.cli('config get table_prefix').rstrip('\n')

    def is_installed(self):
        """Return ``True`` when ``wp core is-installed`` exits with status 0.

        The command is run without output capture, in which case
        ``shell.cmd`` hands back only the exit code.  Unlike ``cli`` this
        does not go through ``sudo`` even when ``user`` is set.
        """
        command = "wp --quiet --path=%s core is-installed > /dev/null 2>&1" % self.path
        code = shell.cmd(command, capture=False)
        return not (code > 0)

    def wp_import(self):
        """Placeholder; currently does nothing."""
        pass

    def rename(self, old_url, new_url):
        """Run ``wp search-replace`` to swap ``old_url`` for ``new_url``.

        The replacement is applied with ``--all-tables-with-prefix`` and with
        plugins and themes skipped.
        """
        self.cli("search-replace %s %s  --skip-plugins --skip-themes --all-tables-with-prefix" % (old_url, new_url))

    def download_source(self, location):
        """Fetch a WordPress zip from ``location`` and unpack it into ``path``.

        The archive is saved as ``<path>/wordpress.zip``, extracted, removed,
        and the contents of the extracted ``wordpress/`` directory are moved
        up into ``path``.  Exits the process with status 1 if either shell
        step fails.
        """
        dest_file = self.path + '/wordpress.zip'
        fetch(location, dest_file)

        extract_cmd = "unzip -q {wpfile} -d {path} && rm {wpfile}".format(
            wpfile=dest_file,
            path=self.path
        )

        out, err, code = shell.cmd(extract_cmd)
        if code > 0:
            # The class has no ``app`` attribute; report through the module
            # logger like every other method does.
            LOG.fatal("error extracting %s : %s" %
                      (dest_file, self._to_text(err)))
            sys.exit(1)

        move_cmd = "mv {path}/wordpress/* {path} && rm -rf {path}/wordpress".format(
            path=self.path
        )

        out, err, code = shell.cmd(move_cmd)
        if code > 0:
            LOG.fatal("error moving %s : %s" % (dest_file, self._to_text(err)))
            sys.exit(1)

    def download_package(self, location):
        """Unpack a gzipped tar package into ``path``.

        ``location`` is used directly when it names an existing local file;
        otherwise it is fetched into a temporary file that is removed
        afterwards, whether or not extraction succeeds.  Exits the process
        with status 1 if extraction fails.
        """
        unlink_file_after = not os.path.exists(location)
        dest_file = self._new_temp_path() if unlink_file_after else location

        try:
            if unlink_file_after:
                fetch(location, dest_file)

            extract_cmd = """cd {path} && tar zxvf {dest_file}""".format(
                path=self.path,
                dest_file=dest_file
            )
            out, err, code = shell.cmd(extract_cmd)
            if code > 0:
                LOG.fatal(
                    "error extracting package %s : %s" % (dest_file, self._to_text(err)))
                sys.exit(1)
        finally:
            # Runs on success, on fetch errors and on sys.exit alike, so the
            # temporary download is never left behind.
            if unlink_file_after:
                os.unlink(dest_file)

    def download_extra(self, location, rel_dir):
        """Unzip an extra archive into ``path`` + ``rel_dir``.

        ``location`` is used directly when it names an existing local file;
        otherwise it is fetched into a temporary file that is removed
        afterwards, whether or not extraction succeeds.  Existing files are
        overwritten (``unzip -o``).  Exits the process with status 1 if
        extraction fails.
        """
        unlink_file_after = not os.path.exists(location)
        dest_file = self._new_temp_path() if unlink_file_after else location

        try:
            if unlink_file_after:
                LOG.info("Downloading extra from {location}".format(location=location))
                fetch(location, dest_file)

            extract_cmd = """cd {path}{subpath} && unzip -o {dest_file}""".format(
                path=self.path,
                subpath=rel_dir,
                dest_file=dest_file
            )
            LOG.info("Installing extra to {path}{subpath}".format(
                path=self.path,
                subpath=rel_dir))
            out, err, code = shell.cmd(extract_cmd)
            if code > 0:
                LOG.fatal(
                    "error extracting extra %s to %s : %s" % (dest_file, rel_dir, self._to_text(err)))
                sys.exit(1)
        finally:
            # Runs on success, on fetch errors and on sys.exit alike, so the
            # temporary download is never left behind.
            if unlink_file_after:
                os.unlink(dest_file)