diff --git a/package/package.sh b/package/package.sh index 59fdc5d0b912f32ef04cd09628a0967bcb89b8b8..6ac751843618cc28794a10059ba054f9ed1f1adc 100644 --- a/package/package.sh +++ b/package/package.sh @@ -398,6 +398,7 @@ function target_file_copy() sed -i '/gs_lcctl/d' binfile sed -i '/gs_wsr/d' binfile sed -i '/gs_gucZenith/d' binfile + sed -i '/gs_expansion/d' binfile bin_script=$(cat binfile) rm binfile script_file cd $BUILD_DIR diff --git a/src/manager/om/script/gs_expansion b/src/manager/om/script/gs_expansion new file mode 100644 index 0000000000000000000000000000000000000000..ee2d8648b20ffa0a060c8ecc94dfb0349b8bc3c6 --- /dev/null +++ b/src/manager/om/script/gs_expansion @@ -0,0 +1,245 @@ +#!/usr/bin/env python3 +# -*- coding:utf-8 -*- +############################################################################# +# Copyright (c) 2020 Huawei Technologies Co.,Ltd. +# +# openGauss is licensed under Mulan PSL v2. +# You can use this software according to the terms +# and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# +# http://license.coscl.org.cn/MulanPSL2 +# +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, +# WITHOUT WARRANTIES OF ANY KIND, +# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +# See the Mulan PSL v2 for more details. 
# ----------------------------------------------------------------------------
# Description : gs_expansion is a utility to expansion standby node databases
#############################################################################

import os
import pwd
import sys
import threading
import uuid
import subprocess
import weakref

sys.path.append(sys.path[0])
from gspylib.common.DbClusterInfo import dbClusterInfo, \
    readOneClusterConfigItem, initParserXMLFile, dbNodeInfo, checkPathVaild
from gspylib.common.GaussLog import GaussLog
from gspylib.common.Common import DefaultValue
from gspylib.common.ErrorCode import ErrorCode
from gspylib.common.ParallelBaseOM import ParallelBaseOM
from gspylib.common.ParameterParsecheck import Parameter
from impl.preinstall.OLAP.PreinstallImplOLAP import PreinstallImplOLAP
from gspylib.threads.SshTool import SshTool
from impl.expansion.ExpansionImpl import ExpansionImpl

# Environment variables relevant to an openGauss installation.
# NOTE(review): the original list repeated "PATH" and "LD_LIBRARY_PATH";
# the duplicates are dropped here, first-occurrence order preserved.
ENV_LIST = ["MPPDB_ENV_SEPARATE_PATH", "GPHOME", "PATH",
            "LD_LIBRARY_PATH", "GAUSS_WARNING_TYPE",
            "GAUSSHOME", "S3_CLIENT_CRT_FILE", "GAUSS_VERSION", "PGHOST",
            "GS_CLUSTER_NAME", "GAUSSLOG", "GAUSS_ENV", "umask"]


class Expansion(ParallelBaseOM):
    """
    Command-line front end of gs_expansion: parses and validates the
    arguments, loads the cluster topology from the XML configuration
    and prepares the logger before handing over to ExpansionImpl.
    """

    def __init__(self):
        """
        Initialize default state; real values are filled in later by
        parseCommandLine() and checkParameters().
        """
        ParallelBaseOM.__init__(self)
        # new added standby node backip list
        self.newHostList = []
        # hostname -> {backIp, sshIp, port, dataNode, ...} built from the XML
        self.clusterInfoDict = {}
        # back ip -> hostname
        self.backIpNameMap = {}
        # root of the unpacked om package (two levels above this script)
        self.packagepath = os.path.realpath(
            os.path.join(os.path.realpath(__file__), "../../"))
        # True when -L was given: standby DBs were already installed locally
        self.standbyLocalMode = False

    def usage(self):
        """
gs_expansion is a utility to expansion standby node for a cluster.

Usage:
    gs_expansion -? | --help
    gs_expansion -V | --version
    gs_expansion -U USER -G GROUP -X XMLFILE -h nodeList [-L]
General options:
    -U                 Cluster user.
    -G                 Group of the cluster user.
    -X                 Path of the XML configuration file.
    -h                 New standby node node backip list.
                       Separate multiple nodes with commas (,).
                       such as '-h 192.168.0.1,192.168.0.2'
    -L                 The standby database installed with
                       local mode.
    -?, --help         Show help information for this
                       utility, and exit the command line mode.
    -V, --version      Show version information.
        """
        print(self.usage.__doc__)

    def parseCommandLine(self):
        """
        parse parameter from command line
        """
        ParaObj = Parameter()
        ParaDict = ParaObj.ParameterCommandLine("expansion")

        # parameter -h or -?
        if "helpFlag" in ParaDict:
            self.usage()
            sys.exit(0)
        # Resolves command line arguments
        # parameter -U
        if "user" in ParaDict:
            self.user = ParaDict.get("user")
            DefaultValue.checkPathVaild(self.user)
        # parameter -G
        if "group" in ParaDict:
            self.group = ParaDict.get("group")
        # parameter -X
        if "confFile" in ParaDict:
            self.xmlFile = ParaDict.get("confFile")
        # parameter -L
        if "localMode" in ParaDict:
            self.localMode = ParaDict.get("localMode")
            self.standbyLocalMode = ParaDict.get("localMode")
        # parameter -l
        if "logFile" in ParaDict:
            self.logFile = ParaDict.get("logFile")
        # parameter -h
        if "nodename" in ParaDict:
            self.newHostList = ParaDict.get("nodename")

    def checkParameters(self):
        """
        function: Check parameter from command line
        input: NA
        output: NA
        """
        # check user | group | xmlfile | node
        if len(self.user) == 0:
            GaussLog.exitWithError(ErrorCode.GAUSS_357["GAUSS_35701"] % "-U")
        if len(self.group) == 0:
            GaussLog.exitWithError(ErrorCode.GAUSS_357["GAUSS_35701"] % "-G")
        if len(self.xmlFile) == 0:
            GaussLog.exitWithError(ErrorCode.GAUSS_357["GAUSS_35701"] % "-X")
        if len(self.newHostList) == 0:
            GaussLog.exitWithError(ErrorCode.GAUSS_357["GAUSS_35701"] % "-h")

        clusterInfo = ExpansipnClusterInfo()
        hostNameIpDict = clusterInfo.initFromXml(self.xmlFile)
        clusterDict = clusterInfo.getClusterDirectorys()
        backIpList = clusterInfo.getClusterBackIps()
        nodeNameList = clusterInfo.getClusterNodeNames()

        self.nodeNameList = nodeNameList
        self.backIpNameMap = {}
        for backip in backIpList:
            self.backIpNameMap[backip] = \
                clusterInfo.getNodeNameByBackIp(backip)

        # check parameter node must in xml config file
        for nodeid in self.newHostList:
            if nodeid not in backIpList:
                GaussLog.exitWithError(ErrorCode.GAUSS_357["GAUSS_35702"] %
                                       nodeid)

        # get corepath and toolpath from xml file
        corePath = clusterInfo.readClustercorePath(self.xmlFile)
        toolPath = clusterInfo.getToolPath(self.xmlFile)
        # parse xml file and cache node info
        clusterInfoDict = {}
        clusterInfoDict["appPath"] = clusterDict["appPath"][0]
        clusterInfoDict["logPath"] = clusterDict["logPath"][0]
        clusterInfoDict["corePath"] = corePath
        clusterInfoDict["toolPath"] = toolPath
        for nodeName in nodeNameList:
            hostInfo = hostNameIpDict[nodeName]
            ipList = hostInfo[0]
            portList = hostInfo[1]
            backIp = ""
            sshIp = ""
            # one ip => it serves as both back and ssh ip
            if len(ipList) == 1:
                backIp = sshIp = ipList[0]
            elif len(ipList) == 2:
                backIp = ipList[0]
                sshIp = ipList[1]
            port = portList[0]
            cluster = clusterDict[nodeName]
            dataNode = cluster[2]
            # derived ports follow the om convention: port+1/+3/+4
            clusterInfoDict[nodeName] = {
                "backIp": backIp,
                "sshIp": sshIp,
                "port": port,
                "localport": int(port) + 1,
                "localservice": int(port) + 4,
                "heartBeatPort": int(port) + 3,
                "dataNode": dataNode,
                "instanceType": -1
            }

        nodeIdList = clusterInfo.getClusterNodeIds()
        # renamed loop variable: "id" shadowed the builtin
        for nodeId in nodeIdList:
            insType = clusterInfo.getdataNodeInstanceType(nodeId)
            hostName = clusterInfo.getHostNameByNodeId(nodeId)
            clusterInfoDict[hostName]["instanceType"] = insType
        self.clusterInfoDict = clusterInfoDict

    def initLogs(self):
        """
        init log file
        """
        # if no log file
        if self.logFile == "":
            self.logFile = DefaultValue.getOMLogPath(
                DefaultValue.EXPANSION_LOG_FILE, self.user, "",
                self.xmlFile)
        # if not absolute path
        if not os.path.isabs(self.logFile):
            GaussLog.exitWithError(ErrorCode.GAUSS_502["GAUSS_50213"] % "log")
        # (the initLogger call that concludes this method continues in the
        #  following chunk)
self.initLogger("gs_expansion") + self.logger.ignoreErr = True + +class ExpansipnClusterInfo(dbClusterInfo): + + def __init__(self): + dbClusterInfo.__init__(self) + + def getToolPath(self, xmlFile): + """ + function : Read tool path from default xml file + input : String + output : String + """ + self.setDefaultXmlFile(xmlFile) + # read gaussdb tool path from xml file + (retStatus, retValue) = readOneClusterConfigItem( + initParserXMLFile(xmlFile), "gaussdbToolPath", "cluster") + if retStatus != 0: + raise Exception(ErrorCode.GAUSS_512["GAUSS_51200"] + % "gaussdbToolPath" + " Error: \n%s" % retValue) + toolPath = os.path.normpath(retValue) + checkPathVaild(toolPath) + return toolPath + +if __name__ == "__main__": + """ + """ + expansion = Expansion() + expansion.parseCommandLine() + expansion.checkParameters() + expansion.initLogs() + expImpl = ExpansionImpl(expansion) + expImpl.run() + diff --git a/src/manager/om/script/gspylib/common/Common.py b/src/manager/om/script/gspylib/common/Common.py index f56c6243c281d76c618bb135f207cafa51dceb33..7a60f822aed55d93acffa64bc8c5b81c359cbfe3 100644 --- a/src/manager/om/script/gspylib/common/Common.py +++ b/src/manager/om/script/gspylib/common/Common.py @@ -309,6 +309,7 @@ class DefaultValue(): LCCTL_LOG_FILE = "gs_lcctl.log" RESIZE_LOG_FILE = "gs_resize.log" HOTPATCH_LOG_FILE = "gs_hotpatch.log" + EXPANSION_LOG_FILE = "gs_expansion.log" # hotpatch action HOTPATCH_ACTION_LIST = ["load", "unload", "active", "deactive", "info", "list"] diff --git a/src/manager/om/script/gspylib/common/ErrorCode.py b/src/manager/om/script/gspylib/common/ErrorCode.py index e6725c18e4451ea2cbab15ff4aa6c1c6ec947eec..fcf19ba94cf86dc47b1870695b73caeea7f057d0 100644 --- a/src/manager/om/script/gspylib/common/ErrorCode.py +++ b/src/manager/om/script/gspylib/common/ErrorCode.py @@ -1092,6 +1092,24 @@ class ErrorCode(): 'GAUSS_53612': "[GAUSS-53612]: Can not find any catalog in database %s" } + 
########################################################################## + # gs_expansion + # [GAUSS-537] : gs_expansion failed + ########################################################################## + GAUSS_357 = { + "GAUSS_35700": "[GAUSS-35700] Expansion standby node failed.", + "GAUSS_35701": "[GAUSS-35701] Empty parameter. The %s parameter is" + "missing in the command.", + "GAUSS_35702": "[GAUSS-35702] Unrecognized parameter, standby host " + "backip %s is not in the " + "XML configuration file", + "GAUSS_35703": "[GAUSS-35703] Check standby database Failed. The " + "database on node is abnormal. \n" + "node [%s], user [%s], dataNode [%s]. \n" + "You can use command \"gs_ctl query -D %s\" for more " + "detail." + } + class OmError(BaseException): """ diff --git a/src/manager/om/script/gspylib/common/ParameterParsecheck.py b/src/manager/om/script/gspylib/common/ParameterParsecheck.py index 717890d84fc888e19477100ed8cd12b8e02e7819..89397c06b9e3c604aa08589ef0179b4641964634 100644 --- a/src/manager/om/script/gspylib/common/ParameterParsecheck.py +++ b/src/manager/om/script/gspylib/common/ParameterParsecheck.py @@ -93,6 +93,8 @@ gs_ssh = ["-?", "--help", "-V", "--version", "-c:"] gs_checkos = ["-?", "--help", "-V", "--version", "-h:", "-f:", "-o:", "-i:", "--detail", "-l:", "-X:"] +gs_expansion = ["-?", "--help", "-V", "--version", "-U:", "-G:", "-L", + "-X:", "-h:", "--sep-env-file="] # gs_om child branch gs_om_start = ["-t:", "-?", "--help", "-V", "--version", "-h:", "-I:", @@ -153,7 +155,8 @@ ParameterDict = {"preinstall": gs_preinstall, "postuninstall": gs_postuninstall, "view": gs_om_view, "query": gs_om_query, - "refreshconf": gs_om_refreshconf + "refreshconf": gs_om_refreshconf, + "expansion": gs_expansion } # List of scripts with the -t parameter diff --git a/src/manager/om/script/impl/expansion/ExpansionImpl.py b/src/manager/om/script/impl/expansion/ExpansionImpl.py new file mode 100644 index 
0000000000000000000000000000000000000000..95e21dd771767817242b54637f330612e1befd4c --- /dev/null +++ b/src/manager/om/script/impl/expansion/ExpansionImpl.py @@ -0,0 +1,829 @@ +# -*- coding:utf-8 -*- +############################################################################# +# Copyright (c) 2020 Huawei Technologies Co.,Ltd. +# +# openGauss is licensed under Mulan PSL v2. +# You can use this software according to the terms +# and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# +# http://license.coscl.org.cn/MulanPSL2 +# +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, +# WITHOUT WARRANTIES OF ANY KIND, +# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +# See the Mulan PSL v2 for more details. +# ---------------------------------------------------------------------------- +# Description : ExpansionImpl.py +############################################################################# + +import subprocess +import sys +import re +import os +import getpass +import pwd +import datetime +from random import sample +import time +from multiprocessing import Process, Value + +sys.path.append(sys.path[0] + "/../../../../") +from gspylib.common.DbClusterInfo import dbClusterInfo, queryCmd +from gspylib.threads.SshTool import SshTool +from gspylib.common.DbClusterStatus import DbClusterStatus +from gspylib.common.ErrorCode import ErrorCode +from gspylib.common.Common import DefaultValue +from gspylib.common.GaussLog import GaussLog + +sys.path.append(sys.path[0] + "/../../../lib/") +DefaultValue.doConfigForParamiko() +import paramiko + + +#mode +MODE_PRIMARY = "primary" +MODE_STANDBY = "standby" +MODE_NORMAL = "normal" + +#db state +STAT_NORMAL = "normal" + +# master +MASTER_INSTANCE = 0 +# standby +STANDBY_INSTANCE = 1 + +# statu failed +STATUS_FAIL = "Failure" + +class ExpansionImpl(): + """ + class for expansion standby node. + step: + 1. 
preinstall database on new standby node + 2. install as single-node database + 3. establish primary-standby relationship of all node + """ + + def __init__(self, expansion): + """ + """ + self.context = expansion + + self.user = self.context.user + self.group = self.context.group + + self.logger = self.context.logger + + self.envFile = DefaultValue.getEnv("MPPDB_ENV_SEPARATE_PATH") + + currentTime = str(datetime.datetime.now()).replace(" ", "_").replace( + ".", "_") + + self.commonGsCtl = GsCtlCommon(expansion) + self.tempFileDir = "/tmp/gs_expansion_%s" % (currentTime) + self.logger.debug("tmp expansion dir is %s ." % self.tempFileDir) + + + def sendSoftToHosts(self): + """ + create software dir and send it on each nodes + """ + self.logger.debug("Start to send software to each standby nodes.\n") + hostNames = self.context.newHostList + hostList = hostNames + + sshTool = SshTool(hostNames) + + srcFile = self.context.packagepath + targetDir = os.path.realpath( + os.path.join(srcFile, "../")) + + ## mkdir package dir and send package to remote nodes. 
+ sshTool.executeCommand("mkdir -p %s" % srcFile , "", DefaultValue.SUCCESS, + hostList) + sshTool.scpFiles(srcFile, targetDir, hostList) + + ## change mode of package dir to set privileges for users + tPathList = os.path.split(targetDir) + path2ChangeMode = targetDir + if len(tPathList) > 2: + path2ChangeMode = os.path.join(tPathList[0],tPathList[1]) + changeModCmd = "chmod -R a+x {srcFile}".format(user=self.user, + group=self.group,srcFile=path2ChangeMode) + sshTool.executeCommand(changeModCmd, "", DefaultValue.SUCCESS, + hostList) + self.logger.debug("End to send software to each standby nodes.\n") + + def generateAndSendXmlFile(self): + """ + """ + self.logger.debug("Start to generateAndSend XML file.\n") + + tempXmlFile = "%s/clusterconfig.xml" % self.tempFileDir + cmd = "mkdir -p %s; touch %s; cat /dev/null > %s" % \ + (self.tempFileDir, tempXmlFile, tempXmlFile) + (status, output) = subprocess.getstatusoutput(cmd) + + cmd = "chown -R %s:%s %s" % (self.user, self.group, self.tempFileDir) + (status, output) = subprocess.getstatusoutput(cmd) + + newHosts = self.context.newHostList + for host in newHosts: + # create single deploy xml file for each standby node + xmlContent = self.__generateXml(host) + fo = open("%s" % tempXmlFile, "w") + fo.write( xmlContent ) + fo.close() + # send single deploy xml file to each standby node + sshTool = SshTool(host) + retmap, output = sshTool.getSshStatusOutput("mkdir -p %s" % + self.tempFileDir , [host], self.envFile) + retmap, output = sshTool.getSshStatusOutput("chown %s:%s %s" % + (self.user, self.group, self.tempFileDir), [host], self.envFile) + sshTool.scpFiles("%s" % tempXmlFile, "%s" % + tempXmlFile, [host], self.envFile) + + self.logger.debug("End to generateAndSend XML file.\n") + + def __generateXml(self, backIp): + """ + """ + nodeName = self.context.backIpNameMap[backIp] + nodeInfo = self.context.clusterInfoDict[nodeName] + + backIp = nodeInfo["backIp"] + sshIp = nodeInfo["sshIp"] + port = nodeInfo["port"] + 
dataNode = nodeInfo["dataNode"] + + appPath = self.context.clusterInfoDict["appPath"] + logPath = self.context.clusterInfoDict["logPath"] + corePath = self.context.clusterInfoDict["corePath"] + toolPath = self.context.clusterInfoDict["toolPath"] + + xmlConfig = """\ + + + + + + + + + + + + + + + + + + + + + + + + + + + """.format(nodeName=nodeName,backIp=backIp,appPath=appPath, + logPath=logPath,toolPath=toolPath,corePath=corePath, + sshIp=sshIp,port=port,dataNode=dataNode) + return xmlConfig + + def changeUser(self): + user = self.user + try: + pw_record = pwd.getpwnam(user) + except Exception: + GaussLog.exitWithError(ErrorCode.GAUSS_503["GAUSS_50300"] % user) + + user_name = pw_record.pw_name + user_uid = pw_record.pw_uid + user_gid = pw_record.pw_gid + env = os.environ.copy() + os.setgid(user_gid) + os.setuid(user_uid) + + def initSshConnect(self, host, user='root'): + + try: + getPwdStr = "Please enter the password of user [%s] on node [%s]: " \ + % (user, host) + passwd = getpass.getpass(getPwdStr) + self.sshClient = paramiko.SSHClient() + self.sshClient.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + self.sshClient.connect(host, 22, user, passwd) + except paramiko.ssh_exception.AuthenticationException as e : + self.logger.log("Authentication failed.") + self.initSshConnect(host, user) + + def installDatabaseOnHosts(self): + """ + install database on each standby node + """ + hostList = self.context.newHostList + envfile = DefaultValue.getEnv(DefaultValue.MPPRC_FILE_ENV) + tempXmlFile = "%s/clusterconfig.xml" % self.tempFileDir + installCmd = "source {envfile} ; gs_install -X {xmlfile} \ + 2>&1".format(envfile=envfile,xmlfile=tempXmlFile) + + statusArr = [] + + for newHost in hostList: + + self.logger.log("\ninstalling database on node %s:" % newHost) + self.logger.debug(installCmd) + + hostName = self.context.backIpNameMap[newHost] + sshIp = self.context.clusterInfoDict[hostName]["sshIp"] + self.initSshConnect(sshIp, self.user) + + stdin, stdout, 
stderr = self.sshClient.exec_command(installCmd, + get_pty=True) + channel = stdout.channel + echannel = stderr.channel + + while not channel.exit_status_ready(): + try: + recvOut = channel.recv(1024) + outDecode = recvOut.decode("utf-8"); + outStr = outDecode.strip() + if(len(outStr) == 0): + continue + if(outDecode.endswith("\r\n")): + self.logger.log(outStr) + else: + value = "" + if re.match(r".*yes.*no.*", outStr): + value = input(outStr) + while True: + # check the input + if ( + value.upper() != "YES" + and value.upper() != "NO" + and value.upper() != "Y" + and value.upper() != "N"): + value = input("Please type 'yes' or 'no': ") + continue + break + else: + value = getpass.getpass(outStr) + stdin.channel.send("%s\r\n" %value) + stdin.flush() + stdout.flush() + except Exception as e: + sys.exit(1) + pass + if channel.exit_status_ready() and \ + not channel.recv_stderr_ready() and \ + not channel.recv_ready(): + channel.close() + break + + stdout.close() + stderr.close() + status = channel.recv_exit_status() + statusArr.append(status) + + isBothSuccess = True + for status in statusArr: + if status != 0: + isBothSuccess = False + break + if isBothSuccess: + self.logger.log("\nSuccessfully install database on node %s" % + hostList) + else: + sys.exit(1) + + def preInstallOnHosts(self): + """ + execute preinstall step + """ + self.logger.debug("Start to preinstall database step.\n") + newBackIps = self.context.newHostList + newHostNames = [] + for host in newBackIps: + newHostNames.append(self.context.backIpNameMap[host]) + envfile = self.envFile + tempXmlFile = "%s/clusterconfig.xml" % self.tempFileDir + + preinstallCmd = "{softpath}/script/gs_preinstall -U {user} -G {group} \ + -X {xmlfile} --sep-env-file={envfile} \ + --non-interactive 2>&1\ + ".format(softpath=self.context.packagepath,user=self.user, + group=self.group,xmlfile=tempXmlFile,envfile=envfile) + + sshTool = SshTool(newHostNames) + + status, output = sshTool.getSshStatusOutput(preinstallCmd , [], 
envfile) + statusValues = status.values() + if STATUS_FAIL in statusValues: + GaussLog.exitWithError(output) + + self.logger.debug("End to preinstall database step.\n") + + + def buildStandbyRelation(self): + """ + func: after install single database on standby nodes. + build the relation with primary and standby nodes. + step: + 1. restart primary node with Primary Mode + (only used to Single-Node instance) + 2. set guc config to primary node + 3. restart standby node with Standby Mode + 4. set guc config to standby node + 5. generate cluster static file and send to each node. + """ + self.queryPrimaryClusterDetail() + self.setPrimaryGUCConfig() + self.setStandbyGUCConfig() + self.buildStandbyHosts() + self.generateClusterStaticFile() + + def queryPrimaryClusterDetail(self): + """ + get current cluster type. + single-node or primary-standby + """ + self.logger.debug("Query primary database instance mode.\n") + self.isSingleNodeInstance = True + primaryHost = self.getPrimaryHostName() + result = self.commonGsCtl.queryOmCluster(primaryHost, self.envFile) + instance = re.findall(r"node\s+node_ip\s+instance\s+state", result) + if len(instance) > 1: + self.isSingleNodeInstance = False + self.logger.debug("Original instance mode is %s" % + self.isSingleNodeInstance) + + def setPrimaryGUCConfig(self): + """ + """ + self.logger.debug("Start to set primary node GUC config.\n") + primaryHost = self.getPrimaryHostName() + dataNode = self.context.clusterInfoDict[primaryHost]["dataNode"] + + self.setGUCOnClusterHosts([primaryHost]) + self.addStandbyIpInPrimaryConf() + insType, dbStat = self.commonGsCtl.queryInstanceStatus(primaryHost, + dataNode,self.envFile) + if insType != MODE_PRIMARY: + self.commonGsCtl.stopInstance(primaryHost, dataNode, self.envFile) + self.commonGsCtl.startInstanceWithMode(primaryHost, dataNode, + MODE_PRIMARY,self.envFile) + + # start db to primary state for three times max + start_retry_num = 1 + while start_retry_num <= 3: + insType, dbStat = 
self.commonGsCtl.queryInstanceStatus(primaryHost, + dataNode, self.envFile) + if insType == MODE_PRIMARY: + break + self.logger.debug("Start database as Primary mode failed, \ +retry for %s times" % start_retry_num) + self.commonGsCtl.startInstanceWithMode(primaryHost, dataNode, + MODE_PRIMARY, self.envFile) + start_retry_num = start_retry_num + 1 + + def setStandbyGUCConfig(self): + """ + """ + self.logger.debug("Start to set standby node GUC config.\n") + standbyHosts = self.context.newHostList + standbyHostNames = [] + for host in standbyHosts: + hostName = self.context.backIpNameMap[host] + standbyHostNames.append(hostName) + self.setGUCOnClusterHosts(standbyHostNames) + + def addStandbyIpInPrimaryConf(self): + """ + add standby hosts ip in primary node pg_hba.conf + """ + + standbyHosts = self.context.newHostList + primaryHost = self.getPrimaryHostName() + command = '' + for host in standbyHosts: + hostName = self.context.backIpNameMap[host] + dataNode = self.context.clusterInfoDict[hostName]["dataNode"] + command += "gs_guc set -D %s -h 'host all all %s/32 \ + trust';" % (dataNode, host) + self.logger.debug(command) + sshTool = SshTool([primaryHost]) + resultMap, outputCollect = sshTool.getSshStatusOutput(command, + [primaryHost], self.envFile) + self.logger.debug(outputCollect) + + def reloadPrimaryConf(self): + """ + """ + primaryHost = self.getPrimaryHostName() + dataNode = self.context.clusterInfoDict[primaryHost]["dataNode"] + command = "gs_ctl reload -D %s " % dataNode + sshTool = SshTool([primaryHost]) + self.logger.debug(command) + resultMap, outputCollect = sshTool.getSshStatusOutput(command, + [primaryHost], self.envFile) + self.logger.debug(outputCollect) + + def getPrimaryHostName(self): + """ + """ + primaryHost = "" + for nodeName in self.context.nodeNameList: + if self.context.clusterInfoDict[nodeName]["instanceType"] \ + == MASTER_INSTANCE: + primaryHost = nodeName + break + return primaryHost + + + def buildStandbyHosts(self): + """ + stop 
the new standby host`s database and build it as standby mode + """ + self.logger.debug("start to build standby node...\n") + + standbyHosts = self.context.newHostList + + for host in standbyHosts: + hostName = self.context.backIpNameMap[host] + dataNode = self.context.clusterInfoDict[hostName]["dataNode"] + self.commonGsCtl.stopInstance(hostName, dataNode, self.envFile) + self.commonGsCtl.startInstanceWithMode(hostName, dataNode, + MODE_STANDBY, self.envFile) + + # start standby as standby mode for three times max. + start_retry_num = 1 + while start_retry_num <= 3: + insType, dbStat = self.commonGsCtl.queryInstanceStatus(hostName, + dataNode, self.envFile) + if insType != MODE_STANDBY: + self.logger.debug("Start databasse as Standby mode failed, \ +retry for %s times" % start_retry_num) + self.setGUCOnClusterHosts([]) + self.addStandbyIpInPrimaryConf() + self.reloadPrimaryConf() + self.commonGsCtl.startInstanceWithMode(hostName, dataNode, + MODE_STANDBY, self.envFile) + start_retry_num = start_retry_num + 1 + else: + break + + # build standby node + self.addStandbyIpInPrimaryConf() + self.reloadPrimaryConf() + self.commonGsCtl.buildInstance(hostName, dataNode, MODE_STANDBY, + self.envFile) + + # if build failed first time. retry for three times. 
+ start_retry_num = 1 + while start_retry_num <= 3: + insType, dbStat = self.commonGsCtl.queryInstanceStatus(hostName, + dataNode, self.envFile) + if dbStat != STAT_NORMAL: + self.logger.debug("Build standby instance failed, \ +retry for %s times" % start_retry_num) + self.addStandbyIpInPrimaryConf() + self.reloadPrimaryConf() + self.commonGsCtl.buildInstance(hostName, dataNode, + MODE_STANDBY, self.envFile) + start_retry_num = start_retry_num + 1 + else: + break + + + def generateClusterStaticFile(self): + """ + generate static_config_files and send to all hosts + """ + self.logger.debug("Start to generate and send cluster static file.\n") + + primaryHosts = self.getPrimaryHostName() + command = "gs_om -t generateconf -X %s" % self.context.xmlFile + sshTool = SshTool([primaryHosts]) + resultMap, outputCollect = sshTool.getSshStatusOutput(command, + [primaryHosts], self.envFile) + self.logger.debug(outputCollect) + + nodeNameList = self.context.nodeNameList + + for hostName in nodeNameList: + hostSsh = SshTool([hostName]) + toolPath = self.context.clusterInfoDict["toolPath"] + appPath = self.context.clusterInfoDict["appPath"] + srcFile = "%s/script/static_config_files/cluster_static_config_%s" \ + % (toolPath, hostName) + targetFile = "%s/bin/cluster_static_config" % appPath + hostSsh.scpFiles(srcFile, targetFile, [hostName], self.envFile) + + self.logger.debug("End to generate and send cluster static file.\n") + time.sleep(10) + + # Single-node database need start cluster after expansion + if self.isSingleNodeInstance: + self.logger.debug("Single-Node instance need restart.\n") + self.commonGsCtl.queryOmCluster(primaryHosts, self.envFile) + + # if primary database not normal, restart it + primaryHost = self.getPrimaryHostName() + dataNode = self.context.clusterInfoDict[primaryHost]["dataNode"] + insType, dbStat = self.commonGsCtl.queryInstanceStatus(primaryHost, + dataNode, self.envFile) + if insType != MODE_PRIMARY: + 
self.commonGsCtl.startInstanceWithMode(primaryHost, dataNode, + MODE_PRIMARY, self.envFile) + # if stat if not normal,rebuild standby database + standbyHosts = self.context.newHostList + for host in standbyHosts: + hostName = self.context.backIpNameMap[host] + dataNode = self.context.clusterInfoDict[hostName]["dataNode"] + insType, dbStat = self.commonGsCtl.queryInstanceStatus(hostName, + dataNode, self.envFile) + if dbStat != STAT_NORMAL: + self.commonGsCtl.buildInstance(hostName, dataNode, + MODE_STANDBY, self.envFile) + + self.commonGsCtl.startOmCluster(primaryHosts, self.envFile) + + def setGUCOnClusterHosts(self, hostNames=[]): + """ + guc config on all hosts + """ + + gucDict = self.getGUCConfig() + + tempShFile = "%s/guc.sh" % self.tempFileDir + + if len(hostNames) == 0: + hostNames = self.context.nodeNameList + + for host in hostNames: + + command = "source %s ; " % self.envFile + gucDict[host] + + self.logger.debug(command) + + sshTool = SshTool([host]) + + # create temporary dir to save guc command bashfile. + mkdirCmd = "mkdir -m a+x -p %s; chown %s:%s %s" % \ + (self.tempFileDir,self.user,self.group,self.tempFileDir) + retmap, output = sshTool.getSshStatusOutput(mkdirCmd, [host], self.envFile) + + subprocess.getstatusoutput("mkdir -m a+x -p %s; touch %s; \ + cat /dev/null > %s" % \ + (self.tempFileDir, tempShFile, tempShFile)) + fo = open("%s" % tempShFile, "w") + fo.write("#bash\n") + fo.write( command ) + fo.close() + + # send guc command bashfile to each host and execute it. 
+ sshTool.scpFiles("%s" % tempShFile, "%s" % tempShFile, [host], + self.envFile) + + resultMap, outputCollect = sshTool.getSshStatusOutput("sh %s" % \ + tempShFile, [host], self.envFile) + + self.logger.debug(outputCollect) + + def getGUCConfig(self): + """ + get guc config of each node: + replconninfo[index] + remote_read_mode + replication_type + """ + nodeDict = self.context.clusterInfoDict + hostNames = self.context.nodeNameList + + gucDict = {} + + for hostName in hostNames: + + localeHostInfo = nodeDict[hostName] + index = 1 + guc_tempate_str = "" + for remoteHost in hostNames: + if(remoteHost == hostName): + continue + remoteHostInfo = nodeDict[remoteHost] + + guc_repl_template = """\ + gs_guc set -D {dn} -c "replconninfo{index}=\ + 'localhost={localhost} localport={localport} \ + localheartbeatport={localeHeartPort} \ + localservice={localservice} \ + remotehost={remoteNode} \ + remoteport={remotePort} \ + remoteheartbeatport={remoteHeartPort} \ + remoteservice={remoteservice}'" + """.format(dn=localeHostInfo["dataNode"], + index=index, + localhost=localeHostInfo["sshIp"], + localport=localeHostInfo["localport"], + localeHeartPort=localeHostInfo["heartBeatPort"], + localservice=localeHostInfo["localservice"], + remoteNode=remoteHostInfo["sshIp"], + remotePort=remoteHostInfo["localport"], + remoteHeartPort=remoteHostInfo["heartBeatPort"], + remoteservice=remoteHostInfo["localservice"]) + + guc_tempate_str += guc_repl_template + + index += 1 + + guc_mode_type = """ + gs_guc set -D {dn} -c 'remote_read_mode=off'; + gs_guc set -D {dn} -c 'replication_type=1'; + """.format(dn=localeHostInfo["dataNode"]) + guc_tempate_str += guc_mode_type + + gucDict[hostName] = guc_tempate_str + return gucDict + + def checkLocalModeOnStandbyHosts(self): + """ + """ + standbyHosts = self.context.newHostList + self.logger.log("Checking the database with locale mode.") + for host in standbyHosts: + hostName = self.context.backIpNameMap[host] + dataNode = 
self.context.clusterInfoDict[hostName]["dataNode"] + insType, dbStat = self.commonGsCtl.queryInstanceStatus(hostName, + dataNode, self.envFile) + if insType not in (MODE_PRIMARY, MODE_STANDBY, MODE_NORMAL): + GaussLog.exitWithError(ErrorCode.GAUSS_357["GAUSS_35703"] % + (hostName, self.user, dataNode, dataNode)) + + self.logger.log("Successfully checked the database with locale mode.") + + def preInstall(self): + """ + preinstall on new hosts. + """ + self.logger.log("Start to preinstall database on the new \ +standby nodes.") + self.sendSoftToHosts() + self.generateAndSendXmlFile() + self.preInstallOnHosts() + self.logger.log("Successfully preinstall database on the new \ +standby nodes.") + + + def clearTmpFile(self): + """ + clear temporary file after expansion success + """ + self.logger.debug("start to delete temporary file") + hostNames = self.context.nodeNameList + sshTool = SshTool(hostNames) + clearCmd = "source %s ; rm -rf %s" % (self.envFile, self.tempFileDir) + result, output = sshTool.getSshStatusOutput(clearCmd, + hostNames, self.envFile) + self.logger.debug(output) + + def installAndExpansion(self): + """ + install database and expansion standby node with db om user + """ + pvalue = Value('i', 0) + proc = Process(target=self.installProcess, args=(pvalue,)) + proc.start() + proc.join() + if not pvalue.value: + sys.exit(1) + else: + proc.terminate() + + def installProcess(self, pvalue): + # change to db manager user. the below steps run with db manager user. + self.changeUser() + + if not self.context.standbyLocalMode: + self.logger.log("\nStart to install database on the new \ +standby nodes.") + self.installDatabaseOnHosts() + else: + self.logger.log("\nStandby nodes is installed with locale mode.") + self.checkLocalModeOnStandbyHosts() + + self.logger.log("\nDatabase on standby nodes installed finished. 
\ +Start to establish the primary-standby relationship.") + self.buildStandbyRelation() + # process success + pvalue.value = 1 + + def run(self): + """ + start expansion + """ + # preinstall on standby nodes with root user. + if not self.context.standbyLocalMode: + self.preInstall() + + self.installAndExpansion() + + self.clearTmpFile() + + self.logger.log("\nSuccess to expansion standby nodes.") + + +class GsCtlCommon: + + def __init__(self, expansion): + """ + """ + self.logger = expansion.logger + + def queryInstanceStatus(self, host, datanode, env): + """ + """ + command = "source %s ; gs_ctl query -D %s" % (env, datanode) + sshTool = SshTool([datanode]) + resultMap, outputCollect = sshTool.getSshStatusOutput(command, + [host], env) + self.logger.debug(outputCollect) + localRole = re.findall(r"local_role.*: (.*?)\n", outputCollect) + db_state = re.findall(r"db_state.*: (.*?)\n", outputCollect) + + insType = "" + + if(len(localRole)) == 0: + insType = "" + else: + insType = localRole[0] + + dbStatus = "" + if(len(db_state)) == 0: + dbStatus = "" + else: + dbStatus = db_state[0] + + return insType.strip().lower(), dbStatus.strip().lower() + + def stopInstance(self, host, datanode, env): + """ + """ + command = "source %s ; gs_ctl stop -D %s" % (env, datanode) + sshTool = SshTool([host]) + resultMap, outputCollect = sshTool.getSshStatusOutput(command, + [host], env) + self.logger.debug(host) + self.logger.debug(outputCollect) + + def startInstanceWithMode(self, host, datanode, mode, env): + """ + """ + command = "source %s ; gs_ctl start -D %s -M %s" % (env, datanode, mode) + self.logger.debug(command) + sshTool = SshTool([host]) + resultMap, outputCollect = sshTool.getSshStatusOutput(command, + [host], env) + self.logger.debug(host) + self.logger.debug(outputCollect) + + def buildInstance(self, host, datanode, mode, env): + command = "source %s ; gs_ctl build -D %s -M %s" % (env, datanode, mode) + self.logger.debug(command) + sshTool = SshTool([host]) + 
resultMap, outputCollect = sshTool.getSshStatusOutput(command, + [host], env) + self.logger.debug(host) + self.logger.debug(outputCollect) + + def startOmCluster(self, host, env): + """ + om tool start cluster + """ + command = "source %s ; gs_om -t start" % env + self.logger.debug(command) + sshTool = SshTool([host]) + resultMap, outputCollect = sshTool.getSshStatusOutput(command, + [host], env) + self.logger.debug(host) + self.logger.debug(outputCollect) + + def queryOmCluster(self, host, env): + """ + query om cluster detail with command: + gs_om -t status --detail + """ + command = "source %s ; gs_om -t status --detail" % env + sshTool = SshTool([host]) + resultMap, outputCollect = sshTool.getSshStatusOutput(command, + [host], env) + self.logger.debug(host) + self.logger.debug(outputCollect) + return outputCollect + + diff --git a/src/manager/om/script/impl/expansion/__init__.py b/src/manager/om/script/impl/expansion/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391