File: //usr/local/bin/dhwp/dhwp/core/wordpress.py
import os
import re
import sys
import tempfile
from cement import shell
from cement.utils.misc import minimal_logger
from dhwp.core.curl import fetch
LOG = minimal_logger(__name__)
class Wordpress:
def __init__(self, path, user=None):
self.path = path
self.user = user
def cli(self, cmd, **kwargs):
out, err, code = shell.cmd('which wp')
if code > 0:
LOG.fatal("cannot find wpcli, please install it first")
sys.exit(1)
args = [('--path=%s' % self.path), cmd]
for key, value in kwargs.items():
key = re.sub(r"__", '-', key)
if isinstance(value, bool):
if value:
args.append("--%s" % key)
else:
args.append("--%s=%s" % (key, value))
command = "wp --no-color " + " ".join(args)
if (self.user):
command = "cd " + self.path + "; sudo -u {user} {command}".format(user=self.user, command=command)
LOG.info("wpcli command : %s" % command)
out, err, code = shell.cmd(command)
out = out.decode("utf-8")
err = err.decode("utf-8")
LOG.info("wpcli output\n%s" % out)
if code > 0:
LOG.fatal("wpcli failed with error\n%s" % err)
sys.exit(1)
return out
def db_import(self, sql_file):
return self.cli('db import ' + sql_file)
def db_export(self, sql_file=None):
if not sql_file:
_, sql_file = tempfile.mkstemp()
self.cli('db export --set-gtid-purged=OFF ' + sql_file)
return sql_file
def site_url(self):
# TODO look at wp config file first then here??
site_url = self.cli('option get siteurl --skip-plugins --skip-themes').rstrip('\n')
return site_url
def blogname(self):
blogname = self.cli('option get blogname --skip-plugins --skip-themes').rstrip('\n')
return blogname
def db_prefix(self):
db_prefix = self.cli('config get table_prefix').rstrip('\n')
return db_prefix
def is_installed(self):
command = "wp --quiet --path=%s core is-installed > /dev/null 2>&1" % self.path
code = shell.cmd(command, capture=False)
if code > 0:
return False
else:
return True
def wp_import(self):
pass
def rename(self, old_url, new_url):
# self.app.log.info("Updating WP DB backup for staging")
# with fileinput.FileInput(stage_db_file, inplace=True, backup='.bak') as file:
# for line in file:
# print(line.replace(live_prefix, stage_prefix), end='')
self.cli("search-replace %s %s --skip-plugins --skip-themes --all-tables-with-prefix" % (old_url, new_url))
def download_source(self, location):
dest_file = self.path + '/wordpress.zip'
fetch(location, dest_file)
extract_cmd = """unzip -q {wpfile} -d {path} &&
rm {wpfile} """.format(
wpfile=dest_file,
path=self.path
)
out, err, code = shell.cmd(extract_cmd)
if code > 0:
self.app.log.fatal("error extracting %s : %s" %
(dest_file, err))
sys.exit(1)
move_cmd = """mv {path}/wordpress/* {path} &&
rm -rf {path}/wordpress""".format(
path=self.path
)
out, err, code = shell.cmd(move_cmd)
if code > 0:
self.app.log.fatal("error moving %s : %s" % (dest_file, err))
sys.exit(1)
def download_package(self, location):
dest_file = None
unlink_file_after = False
if os.path.exists(location):
dest_file = location
else:
_, dest_file = tempfile.mkstemp()
fetch(location, dest_file)
unlink_file_after = True
extract_cmd = """cd {path} && tar zxvf {dest_file}""".format(
path=self.path,
dest_file=dest_file
)
out, err, code = shell.cmd(extract_cmd)
if code > 0:
self.app.log.fatal(
"error extracting package %s : %s" % (dest_file, err))
sys.exit(1)
if unlink_file_after:
os.unlink(dest_file)
def download_extra(self, location, rel_dir):
dest_file = None
unlink_file_after = False
if os.path.exists(location):
dest_file = location
else:
_, dest_file = tempfile.mkstemp()
LOG.info("Downloading extra from {location}".format(location=location))
fetch(location, dest_file)
unlink_file_after = True
extract_cmd = """cd {path}{subpath} && unzip -o {dest_file}""".format(
path=self.path,
subpath=rel_dir,
dest_file=dest_file
)
LOG.info("Installing extra to {path}{subpath}".format(
path=self.path,
subpath=rel_dir))
out, err, code = shell.cmd(extract_cmd)
if code > 0:
LOG.fatal(
"error extracting extra %s to %s : %s" % (dest_file, rel_dir, err))
sys.exit(1)
if unlink_file_after:
os.unlink(dest_file)