From b55f41b64dc91c373aa5744808ef5c405f2a2bf9 Mon Sep 17 00:00:00 2001 From: liuzifeng Date: Tue, 19 Nov 2024 10:04:20 +0800 Subject: [PATCH] fix set var --- storage/ctc/ctc_ddl_rewriter_plugin.cc | 210 +++++++++++++++++++++---- storage/ctc/ctc_meta_data.cc | 69 ++++---- storage/ctc/ctc_meta_data.h | 2 +- storage/ctc/ctc_mysql_proxy.cc | 39 ++++- storage/ctc/ctc_srv.h | 28 ++++ storage/ctc/ctc_srv_mq_module.cc | 9 ++ storage/ctc/ctc_srv_mq_stub.cc | 35 +++++ storage/ctc/ha_ctc_ddl.h | 8 + storage/ctc/srv_mq_msg.h | 13 ++ 9 files changed, 350 insertions(+), 63 deletions(-) diff --git a/storage/ctc/ctc_ddl_rewriter_plugin.cc b/storage/ctc/ctc_ddl_rewriter_plugin.cc index 08cfb42..1c67d9d 100644 --- a/storage/ctc/ctc_ddl_rewriter_plugin.cc +++ b/storage/ctc/ctc_ddl_rewriter_plugin.cc @@ -56,9 +56,8 @@ #include "sql/auth/sql_auth_cache.h" #include "sql/auth/auth_internal.h" #include "sql/sql_parse.h" -#ifdef FEATURE_X_FOR_MYSQL_32 #include "sql/sys_vars_shared.h" // intern_find_sys_var -#endif +#include "sql/sql_show.h" // get_one_variable using namespace std; @@ -437,7 +436,7 @@ static int ctc_check_flush(string &, MYSQL_THD thd, bool &need_forward) { } static uint32_t ctc_set_var_option(bool is_null_value, bool is_set_default_value, - set_var *setvar) { + enum_var_type type) { uint32_t options = 0; if (is_null_value) { options |= CTC_SET_VARIABLE_TO_NULL; @@ -445,44 +444,60 @@ static uint32_t ctc_set_var_option(bool is_null_value, bool is_set_default_value if (is_set_default_value) { options |= CTC_SET_VARIABLE_TO_DEFAULT; } - if (setvar->type == OPT_PERSIST_ONLY) { + if (type == OPT_PERSIST_ONLY) { options |= CTC_SET_VARIABLE_PERSIST_ONLY; } - if (setvar->type == OPT_PERSIST) { + if (type == OPT_PERSIST) { options |= CTC_SET_VARIABLE_PERSIST; } return options; } -static int ctc_set_var_meta(MYSQL_THD thd, uint32_t options, const char* base_name, - string var_name, string var_value, bool var_real_type) { +static int ctc_set_or_unset_var_meta(MYSQL_THD thd, std::list variables_info, + ctc_set_opt_request *set_opt_request, bool is_unset) { ctc_handler_t tch; tch.inst_id = ctc_instance_id; handlerton* hton = get_ctc_hton(); CTC_RETURN_IF_NOT_ZERO(get_tch_in_handler_data(hton, thd, tch)); - ctc_ddl_broadcast_request broadcast_req {{0}, {0}, {0}, {0}, 0, 0, 0, 0, {0}}; - broadcast_req.options |= CTC_NOT_NEED_CANTIAN_EXECUTE; - broadcast_req.options |= (thd->lex->contains_plaintext_password ? CTC_CURRENT_SQL_CONTAIN_PLAINTEXT_PASSWORD : 0); - string sql = string(thd->query().str).substr(0, thd->query().length); - if (var_real_type) { - // actual value of the variable type int - broadcast_req.user_ip[0] |= 1; - } - // user_name存变量名,user_ip存变量值 - FILL_BROADCAST_BASE_REQ(broadcast_req, var_value.c_str(), var_name.c_str(), - broadcast_req.user_ip, ctc_instance_id, SQLCOM_SET_OPTION); - if(base_name != nullptr) { - strncpy(broadcast_req.db_name, base_name, SMALL_RECORD_SIZE - 1); - } - broadcast_req.options |= options; - int ret = ctc_execute_mysql_ddl_sql(&tch, &broadcast_req, true); - if (ret != 0 && broadcast_req.err_code != 0) { - string err_msg = broadcast_req.err_msg; - my_printf_error(broadcast_req.err_code, "%s", MYF(0), err_msg.c_str()); - return ret; + set_opt_request->mysql_inst_id = ctc_instance_id; + set_opt_request->opt_num = variables_info.size(); + set_opt_request->is_exist_success = false; + for (int i = 0; i < CTC_MAX_INST_NUM; i++) { + set_opt_request->fail_inst_id[i] = UINT32_MAX; + } + set_opt_request->is_unset = is_unset; + set_opt_request->success_offset = 0; + set_opt_request->fail_offset = 0; + if (!is_unset) { + for (int i = 0; i < CTC_MAX_INST_NUM; i++) { + set_opt_request->success_inst_id[i] = UINT32_MAX; + } + } + set_opt_request->set_opt_info = (set_opt_info_t *)my_malloc(PSI_NOT_INSTRUMENTED, variables_info.size() * sizeof(set_opt_info_t), MYF(MY_WME)); + set_opt_info_t *set_opt_info_begin = set_opt_request->set_opt_info; + for (auto it = variables_info.begin(); it != variables_info.end(); ++it) { + auto var_info = *it; + strncpy(set_opt_info_begin->var_name, var_info.name.c_str(), SMALL_RECORD_SIZE - 1); + strncpy(set_opt_info_begin->var_value, var_info.value.c_str(), MAX_DDL_SQL_LEN - 1); + set_opt_info_begin->options |= CTC_NOT_NEED_CANTIAN_EXECUTE; + set_opt_info_begin->options |= (thd->lex->contains_plaintext_password ? CTC_CURRENT_SQL_CONTAIN_PLAINTEXT_PASSWORD : 0); + set_opt_info_begin->options |= var_info.options; + if (var_info.var_real_type) { + // actual value of the variable type int + set_opt_info_begin->var_real_type = 1; + } + memset(set_opt_info_begin->base_name, 0, SMALL_RECORD_SIZE); + if(var_info.base_name != "") { + strncpy(set_opt_info_begin->base_name, var_info.base_name, SMALL_RECORD_SIZE - 1); + } + set_opt_info_begin += 1; } + bool allow_fail = is_unset ? false : true; + int ret = ctc_execute_set_opt(&tch, set_opt_request, allow_fail); + my_free(set_opt_request->set_opt_info); + set_opt_request->set_opt_info = nullptr; update_sess_ctx_by_tch(tch, hton, thd); return ret; } @@ -579,6 +594,80 @@ static int ctc_set_user_var_flag(MYSQL_THD thd, string name, string value) { return 0; } +static void ctc_set_var_info(set_var_info *var_info, const char *base_name, string name, + string value, uint32_t options, bool var_real_type) { + if (base_name != nullptr) { + var_info->base_name = string(base_name, strlen(base_name)); + } + var_info->name = name; + var_info->value = value; + var_info->options = options; + var_info->var_real_type = var_real_type; +} + +static const char* get_old_var_value(MYSQL_THD thd, sys_var *old_var) { + char show_var_buffer[sizeof(SHOW_VAR)]; + SHOW_VAR *show = (SHOW_VAR *)show_var_buffer; + show->name = old_var->name.str; + show->value = (char *)old_var; + show->type = SHOW_SYS; + const CHARSET_INFO *cs; + char val_buf[1024]; + size_t val_length; + return get_one_variable(thd, show, OPT_GLOBAL, show->type, nullptr, &cs, + val_buf, &val_length); +} + +static Item_result get_sys_var_result_type(sys_var *var) { + switch (var->show_type()) { + case SHOW_BOOL: + case SHOW_MY_BOOL: + case SHOW_INT: + case SHOW_LONG: + case SHOW_LONGLONG: + case SHOW_SIGNED_INT: + case SHOW_SIGNED_LONG: + case SHOW_SIGNED_LONGLONG: + case SHOW_HA_ROWS: + return INT_RESULT; + case SHOW_CHAR: + case SHOW_CHAR_PTR: + case SHOW_LEX_STRING: + return STRING_RESULT; + case SHOW_DOUBLE: + return REAL_RESULT; + default: + my_error(ER_VAR_CANT_BE_READ, MYF(0), var->name.str); + return STRING_RESULT; // keep the compiler happy + } +} + +static int ctc_fetch_old_variables(set_var_info *old_var_info, MYSQL_THD thd, set_var *setvar, string var_name) { + sys_var *old_var = intern_find_sys_var(var_name.c_str(), var_name.length()); + if (old_var == nullptr) { + my_error(ER_UNKNOWN_SYSTEM_VARIABLE, MYF(0), var_name.c_str()); + ctc_log_error("[check_system_var]:sysvar is nullptr and var_name : %s", var_name.c_str()); + return ER_UNKNOWN_SYSTEM_VARIABLE; + } + bool is_set_default_value = false; + bool is_null_value = false; + string old_var_name(old_var->name.str, old_var->name.length); + mysql_mutex_lock(&LOCK_global_system_variables); + const char *old_value_str = get_old_var_value(thd, old_var); + mysql_mutex_unlock(&LOCK_global_system_variables); + string old_var_value; + if (old_value_str == nullptr || strcmp(old_value_str, "null") == 0) { + old_var_value = "null"; + is_null_value = true; + } else { + old_var_value = string(old_value_str, strlen(old_value_str)); + } + bool var_real_type = get_sys_var_result_type(old_var) == INT_RESULT ? true : false; + uint32_t old_options = ctc_set_var_option(is_null_value, is_set_default_value, setvar->type); + ctc_set_var_info(old_var_info, nullptr, old_var_name, old_var_value, old_options, var_real_type); + return 0; +} + static int check_non_system_var(set_var_base *var, bool& need_forward, MYSQL_THD thd) { need_forward = false; if (typeid(*var) != typeid(set_var_user)) { @@ -617,13 +706,17 @@ static int check_non_system_var(set_var_base *var, bool& need_forward, MYSQL_THD } static int check_system_var(set_var_base *var, string &sql_str, MYSQL_THD thd, - bool& need_forward, bool& contain_subselect) { + bool &need_forward, bool &contain_subselect, + std::list &old_variables_info, + std::list &new_variables_info) { set_var *setvar = dynamic_cast(var); bool is_set_default_value = false; bool is_null_value = false; int ret = 0; string name_str; string val_str; + set_var_info old_var_info = {"", "", "", 0, false}; + set_var_info new_var_info = {"", "", "", 0, false}; #ifdef FEATURE_X_FOR_MYSQL_32 if (setvar) { std::function f = [&thd, &need_forward, setvar] @@ -665,13 +758,16 @@ static int check_system_var(set_var_base *var, string &sql_str, MYSQL_THD thd, if (setvar->value && setvar->value->result_type() == INT_RESULT) { var_real_type = true; } - uint32_t options = ctc_set_var_option(is_null_value, is_set_default_value, setvar); + uint32_t options = ctc_set_var_option(is_null_value, is_set_default_value, setvar->type); #ifdef FEATURE_X_FOR_MYSQL_26 - ret = ctc_set_var_meta(thd, options, setvar->base.str, name_str, val_str, var_real_type); + const char *base_name = setvar->base.str; #elif defined(FEATURE_X_FOR_MYSQL_32) - ret = ctc_set_var_meta(thd, options, setvar->m_var_tracker.get_var_name(), - name_str, val_str, var_real_type); + const char *base_name = setvar->m_var_tracker.get_var_name(); #endif + ctc_set_var_info(&new_var_info, base_name, name_str, val_str, options, var_real_type); + new_variables_info.emplace_back(new_var_info); + ret |= ctc_fetch_old_variables(&old_var_info, thd, setvar, name_str); + old_variables_info.emplace_back(old_var_info); } else { thd->clear_error(); need_forward = false; // 值校验失败, ctc不进行广播并返回成功, 后续报错由MySQL完成 @@ -679,6 +775,17 @@ static int check_system_var(set_var_base *var, string &sql_str, MYSQL_THD thd, } return ret; } +void get_execute_fail_inst_str(ctc_set_opt_request set_opt_request, string &str) { + uint32_t *inst_id = set_opt_request.fail_inst_id; + for (int i = 0; i < CTC_MAX_INST_NUM; i++) { + if (inst_id[i] != UINT32_MAX) { + if (!str.empty()) { + str += ","; + } + str += to_string(inst_id[i]); + } + } +} /* 参考set_var.cc: sql_set_variables */ static int ctc_check_set_opt(string &sql_str, MYSQL_THD thd, bool &need_forward) { @@ -693,14 +800,51 @@ static int ctc_check_set_opt(string &sql_str, MYSQL_THD thd, bool &need_forward) contain_subselect = true; } var_it.rewind(); + std::list old_variables_info; + std::list new_variables_info; while ((var = var_it++)) { if (typeid(*var) != typeid(set_var)) { ret = check_non_system_var(var, need_forward, thd); + if (ret != 0) { + ctc_log_error("check non system var failed, set option %s", sql_str.c_str()); + return -1; + } } else { - ret = check_system_var(var, sql_str, thd, need_forward, contain_subselect); + ret = check_system_var(var, sql_str, thd, need_forward, contain_subselect, + old_variables_info, new_variables_info); + if (ret != 0) { + ctc_log_error("check system var failed, set option %s", sql_str.c_str()); + return -1; + } } ctc_log_debug("set option %s, need_forward: %d", sql_str.c_str(), need_forward); } + if (!new_variables_info.empty()) { + ctc_set_opt_request set_opt_request; + ret = ctc_set_or_unset_var_meta(thd, new_variables_info, &set_opt_request, false); + if (ret != 0 && set_opt_request.err_code != 0) { + string err_msg = set_opt_request.err_msg; + if (!set_opt_request.is_exist_success) { + // all node execute failed + my_printf_error(set_opt_request.err_code, "all node execute set opt failed, error_message:%s", MYF(0), err_msg.c_str()); + return ret; + } + string execute_fail_inst = ""; + get_execute_fail_inst_str(set_opt_request, execute_fail_inst); + ctc_set_opt_request unset_opt_request; + memcpy(unset_opt_request.success_inst_id, set_opt_request.success_inst_id, CTC_MAX_INST_NUM); + ret = ctc_set_or_unset_var_meta(thd, old_variables_info, &unset_opt_request, true); + if (ret != 0 && set_opt_request.err_code != 0) { + my_printf_error(set_opt_request.err_code, "execute set opt failed, fail inst:%s. try restore other node fail, " + "error_message: %s", MYF(0), execute_fail_inst.c_str(), err_msg.c_str()); + return ret; + } else { + my_printf_error(set_opt_request.err_code, "execute set opt failed, fail inst:%s. try restore other node success, " + "error_message: %s", MYF(0), execute_fail_inst.c_str(), err_msg.c_str()); + return ret; + } + } + } if (IS_METADATA_NORMALIZATION() && !contain_subselect) { need_forward = false; } diff --git a/storage/ctc/ctc_meta_data.cc b/storage/ctc/ctc_meta_data.cc index 5909e3a..74647f2 100644 --- a/storage/ctc/ctc_meta_data.cc +++ b/storage/ctc/ctc_meta_data.cc @@ -864,28 +864,21 @@ static void ctc_init_thd_priv(THD** thd, Sctx_ptr *ctx) { (*thd) = new_thd; } -int ctc_set_sys_var(ctc_ddl_broadcast_request *broadcast_req) { - Sctx_ptr ctx; - THD *new_thd = nullptr; - ctc_init_thd_priv(&new_thd, &ctx); - - Item *res = nullptr; +int ctc_get_sys_var(THD *new_thd, ctc_set_opt_request *broadcast_req, set_opt_info_t *set_opt_info, List &tmp_var_list, Item *res) { set_var *var = nullptr; sys_var *sysvar = nullptr; - string base_name_src = broadcast_req->db_name; - string var_name = broadcast_req->user_name; + string base_name_src = set_opt_info->base_name; + string var_name = set_opt_info->var_name; // variable value is too long and exceeds the range of user_ip - string var_value = broadcast_req->sql_str; - List tmp_var_list; - - bool is_default_value = ((broadcast_req->options) & (CTC_SET_VARIABLE_TO_DEFAULT)) > 0; - bool is_null_value = ((broadcast_req->options) & (CTC_SET_VARIABLE_TO_NULL)) > 0; - bool var_is_int = ((broadcast_req->user_ip[0] & 1) != 0); + string var_value = set_opt_info->var_value; + bool is_default_value = ((set_opt_info->options) & (CTC_SET_VARIABLE_TO_DEFAULT)) > 0; + bool is_null_value = ((set_opt_info->options) & (CTC_SET_VARIABLE_TO_NULL)) > 0; + bool var_is_int = set_opt_info->var_real_type; enum_var_type type = OPT_GLOBAL; - if ((broadcast_req->options & CTC_SET_VARIABLE_PERSIST) > 0) { + if ((set_opt_info->options & CTC_SET_VARIABLE_PERSIST) > 0) { type = OPT_PERSIST; } - if ((broadcast_req->options & CTC_SET_VARIABLE_PERSIST_ONLY) > 0) { + if ((set_opt_info->options & CTC_SET_VARIABLE_PERSIST_ONLY) > 0) { type = OPT_PERSIST_ONLY; } LEX_CSTRING base_name = {nullptr, 0}; @@ -910,26 +903,46 @@ int ctc_set_sys_var(ctc_ddl_broadcast_request *broadcast_req) { } #ifdef FEATURE_X_FOR_MYSQL_26 var = new (new_thd->mem_root) set_var(type, sysvar, base_name, res); - ctc_set_var_type(broadcast_req->options, var); + ctc_set_var_type(set_opt_info->options, var); #elif defined(FEATURE_X_FOR_MYSQL_32) System_variable_tracker var_tracker = System_variable_tracker::make_tracker(var_name.c_str()); var = new (new_thd->mem_root) set_var(type, var_tracker, res); #endif tmp_var_list.push_back(var); - int ret = sql_set_variables(new_thd, &tmp_var_list, false); - if (ret != 0) { - uint err_code = new_thd->get_stmt_da()->mysql_errno(); - strncpy(broadcast_req->err_msg, new_thd->get_stmt_da()->message_text(), ERROR_MESSAGE_LEN - 1); - ctc_log_error("[ctc_set_sys_var]:set global opt fail; err_code: %u, var_name: %s, var_value: %s", - err_code, var_name.c_str(), var_value.c_str()); - return err_code; - } + return 0; +} + +int ctc_set_sys_var(ctc_set_opt_request *broadcast_req) { + Sctx_ptr ctx; + THD *new_thd = nullptr; + ctc_init_thd_priv(&new_thd, &ctx); - tmp_var_list.clear(); - if (res) { - res->cleanup(); + List tmp_var_list; + Item *res = nullptr; + set_opt_info_t *set_opt_info = broadcast_req->set_opt_info; + int ret = 0; + for (uint32_t i = 0; i < broadcast_req->opt_num; i++) { + ret = ctc_get_sys_var(new_thd, broadcast_req, set_opt_info, tmp_var_list, res); + if (ret != 0) { + return ret; + } + ret = sql_set_variables(new_thd, &tmp_var_list, false); + if (ret != 0) { + uint err_code = new_thd->get_stmt_da()->mysql_errno(); + strncpy(broadcast_req->err_msg, new_thd->get_stmt_da()->message_text(), ERROR_MESSAGE_LEN - 1); + ctc_log_error("[ctc_set_sys_var]:set global opt fail; err_code: %u, var_name: %s, var_value: %s", + err_code, set_opt_info->var_name, set_opt_info->var_value); + return err_code; + } + tmp_var_list.clear(); + if (res) { + res->cleanup(); + } + res = nullptr; + set_opt_info += 1; } + new_thd->free_items(); lex_end(new_thd->lex); new_thd->release_resources(); diff --git a/storage/ctc/ctc_meta_data.h b/storage/ctc/ctc_meta_data.h index 92fd986..2b607fb 100644 --- a/storage/ctc/ctc_meta_data.h +++ b/storage/ctc/ctc_meta_data.h @@ -28,7 +28,7 @@ int close_ctc_mdl_thd(uint32_t thd_id, uint32_t mysql_inst_id); int ctc_mdl_lock_thd(ctc_handler_t *tch, ctc_lock_table_info *lock_info, int *err_code); void ctc_mdl_unlock_thd(ctc_handler_t *tch, ctc_lock_table_info *lock_info); -int ctc_set_sys_var(ctc_ddl_broadcast_request *broadcast_req); +int ctc_set_sys_var(ctc_set_opt_request *broadcast_req); int ctc_ddl_execute_lock_tables_by_req(ctc_handler_t *tch, ctc_lock_table_info *lock_info, int *err_code); void ctc_mdl_unlock_tables_thd(ctc_handler_t *tch); diff --git a/storage/ctc/ctc_mysql_proxy.cc b/storage/ctc/ctc_mysql_proxy.cc index 8141c5f..8ee2050 100644 --- a/storage/ctc/ctc_mysql_proxy.cc +++ b/storage/ctc/ctc_mysql_proxy.cc @@ -373,7 +373,7 @@ __attribute__((visibility("default"))) int ctc_ddl_execute_update(uint32_t thd_i return 0; } else if (is_meta_normalization && broadcast_req->sql_command == SQLCOM_SET_OPTION && (broadcast_req->options & CTC_SET_VARIABLE_WITH_SUBSELECT) == 0){ - return ctc_set_sys_var(broadcast_req); + return 0; } bool use_proxy = ctc_use_proxy(broadcast_req->sql_command); @@ -438,6 +438,43 @@ __attribute__((visibility("default"))) int ctc_ddl_execute_update(uint32_t thd_i return 0; } +__attribute__((visibility("default"))) int ctc_ddl_execute_set_opt(ctc_set_opt_request *broadcast_req, bool allow_fail) { + // 相同节点不用执行 + if (broadcast_req->mysql_inst_id == ctc_instance_id) { + ctc_log_note("ctc_ddl_execute_set_opt curnode not need execute, mysql_inst_id:%u", broadcast_req->mysql_inst_id); + return 0; + } + + // 重置时仅在成功实例执行restore + if (broadcast_req->is_unset) { + for (int i = 0; i < CTC_MAX_INST_NUM; i++) { + if (broadcast_req->success_inst_id[i] == ctc_instance_id) { + break; + } + if (i == CTC_MAX_INST_NUM - 1) { + ctc_log_note("ctc_ddl_execute_set_opt curnode not need execute, mysql_inst_id:%u, allow_fail:%d", + broadcast_req->mysql_inst_id, allow_fail); + return 0; + } + } + } + + int ret = ctc_set_sys_var(broadcast_req); + if (ret != 0) { + // 记录失败的实例ID + broadcast_req->fail_inst_id[broadcast_req->fail_offset++] = ctc_instance_id; + } else { + // 如果ret == 0, 设置 is_exist_success + broadcast_req->is_exist_success = true; + + // 非重置时记录成功的实例ID + if (!broadcast_req->is_unset) { + broadcast_req->success_inst_id[broadcast_req->success_offset++] = ctc_instance_id; + } + } + return ret; +} + static int ctc_ddl_get_lock(MYSQL *curr_conn, const uint64_t &conn_map_key, const char *lock_name, int *err_code) { uchar digest[MD5_HASH_SIZE]; compute_md5_hash(pointer_cast(digest), lock_name, strlen(lock_name)); diff --git a/storage/ctc/ctc_srv.h b/storage/ctc/ctc_srv.h index 7a4de58..8def04d 100644 --- a/storage/ctc/ctc_srv.h +++ b/storage/ctc/ctc_srv.h @@ -72,6 +72,8 @@ extern "C" { #define CTC_AUTOINC_NEW_STYLE_LOCKING 1 #define CTC_AUTOINC_NO_LOCKING 2 +#define CTC_MAX_INST_NUM 6 + typedef int64_t date_t; typedef struct { @@ -202,6 +204,28 @@ typedef struct { char err_msg[ERROR_MESSAGE_LEN]; } ctc_ddl_broadcast_request; +typedef struct { + char base_name[SMALL_RECORD_SIZE]; + char var_name[SMALL_RECORD_SIZE]; + char var_value[MAX_DDL_SQL_LEN]; + uint32_t options; + bool var_real_type; +} set_opt_info_t; + +typedef struct { + set_opt_info_t *set_opt_info; + uint32_t mysql_inst_id; + uint32_t success_inst_id[CTC_MAX_INST_NUM]; + uint32_t success_offset; + uint32_t fail_inst_id[CTC_MAX_INST_NUM]; + uint32_t fail_offset; + bool is_exist_success; + uint32_t opt_num; + bool is_unset; + int err_code; + char err_msg[ERROR_MESSAGE_LEN]; +} ctc_set_opt_request; + typedef struct { uint8_t type; char first[SMALL_RECORD_SIZE]; @@ -315,6 +339,7 @@ enum CTC_FUNC_TYPE { CTC_FUNC_TYPE_GET_SERIAL_VALUE, CTC_FUNC_TYPE_DROP_TABLE, CTC_FUNC_TYPE_EXCUTE_MYSQL_DDL_SQL, + CTC_FUNC_TYPE_SET_OPT, CTC_FUNC_TYPE_BROADCAST_REWRITE_SQL, CTC_FUNC_TYPE_CREATE_TABLESPACE, CTC_FUNC_TYPE_ALTER_TABLESPACE, @@ -341,6 +366,7 @@ enum CTC_FUNC_TYPE { CTC_FUNC_TYPE_WAIT_CONNETOR_STARTUPED, /* for duplex channel */ CTC_FUNC_TYPE_MYSQL_EXECUTE_UPDATE, + CTC_FUNC_TYPE_MYSQL_EXECUTE_SET_OPT, CTC_FUNC_TYPE_CLOSE_MYSQL_CONNECTION, CTC_FUNC_TYPE_LOCK_TABLES, CTC_FUNC_TYPE_UNLOCK_TABLES, @@ -608,7 +634,9 @@ int close_mysql_connection(uint32_t thd_id, uint32_t mysql_inst_id); int ctc_ddl_execute_lock_tables(ctc_handler_t *tch, char *db_name, ctc_lock_table_info *lock_info, int *err_code); int ctc_ddl_execute_unlock_tables(ctc_handler_t *tch, uint32_t mysql_inst_id, ctc_lock_table_info *lock_info); EXTER_ATTACK int ctc_ddl_execute_update(uint32_t thd_id, ctc_ddl_broadcast_request *broadcast_req, bool *allow_fail); +EXTER_ATTACK int ctc_ddl_execute_set_opt(ctc_set_opt_request *broadcast_req, bool allow_fail); EXTER_ATTACK int ctc_execute_mysql_ddl_sql(ctc_handler_t *tch, ctc_ddl_broadcast_request *broadcast_req, bool allow_fail); +EXTER_ATTACK int ctc_execute_set_opt(ctc_handler_t *tch, ctc_set_opt_request *broadcast_req, bool allow_fail); int ctc_execute_rewrite_open_conn(uint32_t thd_id, ctc_ddl_broadcast_request *broadcast_req); int ctc_broadcast_rewrite_sql(ctc_handler_t *tch, ctc_ddl_broadcast_request *broadcast_req, bool allow_fail); diff --git a/storage/ctc/ctc_srv_mq_module.cc b/storage/ctc/ctc_srv_mq_module.cc index 80a0706..1f1d5be 100644 --- a/storage/ctc/ctc_srv_mq_module.cc +++ b/storage/ctc/ctc_srv_mq_module.cc @@ -73,6 +73,15 @@ static void* mq_msg_handler(void *arg) { CTC_IGNORE_ERROR_WHEN_MYSQL_SHUTDOWN(req, "ctc_ddl_execute_update"); break; } + case CTC_FUNC_TYPE_MYSQL_EXECUTE_SET_OPT: { + execute_mysql_set_opt_request *req = + (execute_mysql_set_opt_request *)message_block->seg_buf[0]; + req->result = ctc_ddl_execute_set_opt(&req->broadcast_req, req->allow_fail); + ctc_log_system("[Disaster Recovery] execute_mysql_set_opt : result:%d", req->result); + ctc_log_note("execute_mysql_set_opt : result:%d", req->result); + CTC_IGNORE_ERROR_WHEN_MYSQL_SHUTDOWN(req, "ctc_ddl_execute_set_opt"); + break; + } case CTC_FUNC_TYPE_CLOSE_MYSQL_CONNECTION: { struct close_mysql_connection_request *req = (struct close_mysql_connection_request *)message_block->seg_buf[0]; req->result = close_mysql_connection(req->thd_id, req->inst_id); diff --git a/storage/ctc/ctc_srv_mq_stub.cc b/storage/ctc/ctc_srv_mq_stub.cc index 4d56307..7f838dc 100644 --- a/storage/ctc/ctc_srv_mq_stub.cc +++ b/storage/ctc/ctc_srv_mq_stub.cc @@ -1354,6 +1354,41 @@ int ctc_execute_mysql_ddl_sql(ctc_handler_t *tch, ctc_ddl_broadcast_request *bro return result; } +int ctc_execute_set_opt(ctc_handler_t *tch, ctc_set_opt_request *broadcast_req, bool allow_fail) { + void *shm_inst_req = get_one_shm_inst(tch); + void *shm_inst_info = get_one_shm_inst(tch); + execute_set_opt_request *req = (execute_set_opt_request *)alloc_share_mem(shm_inst_req, sizeof(execute_set_opt_request)); + DBUG_EXECUTE_IF("excute_general_ddl_shm_oom", { req = NULL; }); + if (req == NULL) { + ctc_log_error("alloc shm mem error, shm_inst_req(%p), size(%lu)", shm_inst_req, sizeof(execute_set_opt_request)); + return ERR_ALLOC_MEMORY; + } + memcpy(&req->broadcast_req, broadcast_req, sizeof(ctc_set_opt_request)); + req->broadcast_req.set_opt_info = (set_opt_info_t *)alloc_share_mem(shm_inst_info, broadcast_req->opt_num * sizeof(set_opt_info_t)); + if (req->broadcast_req.set_opt_info == NULL) { + ctc_log_error("alloc shm mem error, shm_inst_info(%p), size(%lu)", shm_inst_info, broadcast_req->opt_num * sizeof(set_opt_info_t)); + free_share_mem(shm_inst_req, req); + return ERR_ALLOC_MEMORY; + } + memcpy(req->broadcast_req.set_opt_info, broadcast_req->set_opt_info, broadcast_req->opt_num * sizeof(set_opt_info_t)); + req->tch = *tch; + req->allow_fail = allow_fail; + int result = ERR_CONNECTION_FAILED; + int ret = ctc_mq_deal_func(shm_inst_req, CTC_FUNC_TYPE_SET_OPT, req, tch->msg_buf); + *tch = req->tch; + broadcast_req->err_code = req->broadcast_req.err_code; + broadcast_req->is_exist_success = req->broadcast_req.is_exist_success; + strncpy(broadcast_req->err_msg, req->broadcast_req.err_msg, ERROR_MESSAGE_LEN - 1); + memcpy(broadcast_req->success_inst_id, req->broadcast_req.success_inst_id, sizeof(broadcast_req->success_inst_id)); + memcpy(broadcast_req->fail_inst_id, req->broadcast_req.fail_inst_id, sizeof(broadcast_req->fail_inst_id)); + if (ret == CT_SUCCESS) { + result = req->result; + } + free_share_mem(shm_inst_info, req->broadcast_req.set_opt_info); + free_share_mem(shm_inst_req, req); + return result; +} + int ctc_broadcast_mysql_dd_invalidate(ctc_handler_t *tch, ctc_invalidate_broadcast_request *broadcast_req) { void *shm_inst = get_one_shm_inst(tch); invalidate_mysql_dd_request *req = (invalidate_mysql_dd_request *)alloc_share_mem(shm_inst, sizeof(invalidate_mysql_dd_request)); diff --git a/storage/ctc/ha_ctc_ddl.h b/storage/ctc/ha_ctc_ddl.h index 910279f..9a6f33c 100644 --- a/storage/ctc/ha_ctc_ddl.h +++ b/storage/ctc/ha_ctc_ddl.h @@ -170,6 +170,14 @@ static map mysql_collate_num_to_ctc_type = { {76, COLLATE_UTF8_TOLOWER_CI}, }; +typedef struct { + string base_name; + string name; + string value; + uint32_t options; + bool var_real_type; +} set_var_info; + static map g_ctc_alter_tablespace_map = { {ALTER_TABLESPACE_ADD_FILE, CTC_ALTSPACE_ADD_DATAFILE}, {ALTER_TABLESPACE_DROP_FILE, CTC_ALTSPACE_DROP_DATAFILE}, diff --git a/storage/ctc/srv_mq_msg.h b/storage/ctc/srv_mq_msg.h index 1338647..8c64727 100644 --- a/storage/ctc/srv_mq_msg.h +++ b/storage/ctc/srv_mq_msg.h @@ -426,6 +426,19 @@ struct execute_mysql_ddl_sql_request { bool allow_fail; }; +struct execute_mysql_set_opt_request { + ctc_set_opt_request broadcast_req; + int result; + bool allow_fail; +}; + +struct execute_set_opt_request { + ctc_set_opt_request broadcast_req; + ctc_handler_t tch; + int result; + bool allow_fail; +}; + struct lock_instance_request { bool is_mysqld_starting; ctc_lock_table_mode_t lock_type; -- Gitee