From 599eb69e76c66dfefc4ab11e723e4d22665bb586 Mon Sep 17 00:00:00 2001 From: hezijian Date: Thu, 1 Aug 2024 10:12:55 +0000 Subject: [PATCH 1/2] =?UTF-8?q?!7=20=E5=8D=95=E8=BF=9B=E7=A8=8B=E9=80=82?= =?UTF-8?q?=E9=85=8D=20*=20=E5=88=A0=E9=99=A4=E6=97=A0=E7=94=A8=E4=BB=A3?= =?UTF-8?q?=E7=A0=81=20*=20safe=20copy=20*=20safe=20copy=20*=20=E6=97=A0?= =?UTF-8?q?=E7=94=A8=E5=8F=98=E9=87=8F=E9=9A=94=E7=A6=BB=20*=20=E5=BD=92?= =?UTF-8?q?=E4=B8=80=E9=9D=9E=E5=BD=92=E4=B8=80=E4=BB=A3=E7=A0=81=E9=9A=94?= =?UTF-8?q?=E7=A6=BB=20*=20fix=20unused=20variable=20*=20=E4=BF=AE?= =?UTF-8?q?=E6=94=B9=E8=BF=94=E5=9B=9E=E5=80=BC=20*=20=E4=BF=AE=E6=94=B9?= =?UTF-8?q?=E6=8B=89=E8=B5=B7=E6=A8=A1=E5=BC=8F=20*=20tse->ctc=20*=20?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=8A=A5=E9=94=99=E8=BF=94=E5=9B=9E=E5=80=BC?= =?UTF-8?q?=20*=20=E5=8D=95=E8=BF=9B=E7=A8=8B=E6=97=A5=E5=BF=97=E9=9A=94?= =?UTF-8?q?=E7=A6=BB=20*=20=E5=85=B1=E4=BA=AB=E5=86=85=E5=AD=98=E5=8A=A0?= =?UTF-8?q?=E5=9B=BA=EF=BC=8C=E6=97=A5=E5=BF=97=E5=8C=BA=E5=88=86=20*=20?= =?UTF-8?q?=E5=85=B1=E4=BA=AB=E5=8D=95=E8=BF=9B=E7=A8=8B=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=E5=87=BD=E6=95=B0=20*=20=E5=BD=92=E4=B8=80=E9=9D=9E=E5=BD=92?= =?UTF-8?q?=E4=B8=80=E6=97=A0=E5=8C=BA=E5=88=AB=20*=20=E5=8D=95=E8=BF=9B?= =?UTF-8?q?=E7=A8=8B=E9=83=BD=E7=9B=B4=E6=8E=A5=E8=B5=B7=E5=8F=82=E5=A4=A9?= =?UTF-8?q?=E6=8F=92=E4=BB=B6=20*=20=E4=BF=AE=E8=A1=A5=E9=9D=9E=E5=BD=92?= =?UTF-8?q?=E4=B8=80=20*=20=E5=8C=85=E5=90=ABmetadataswitch=20*=20?= =?UTF-8?q?=E5=BD=92=E4=B8=80=E6=B5=81=E7=A8=8B=E9=80=82=E9=85=8D=E9=9D=9E?= =?UTF-8?q?=E5=BD=92=E4=B8=80=20*=20=E5=90=8C=E6=97=B6=E9=80=82=E9=85=8D?= =?UTF-8?q?=E5=BD=92=E4=B8=80=E5=92=8C=E9=9D=9E=E5=BD=92=E4=B8=80=E6=B5=81?= =?UTF-8?q?=E7=A8=8B=20*=20=E5=8D=95=E8=BF=9B=E7=A8=8B=E4=BF=AE=E6=94=B9?= =?UTF-8?q?=E7=9A=84=E4=BB=A3=E7=A0=81=E6=95=B4=E7=90=86=20*=20=E7=A7=80?= =?UTF-8?q?=E7=BB=99=E9=80=82=E9=85=8D=20*=20=E5=90=AF=E5=8A=A8=E9=98=B6?= =?UTF-8?q?=E6=AE=B5=E4=BB=A5open=E5=90=AF=E5=8A=A8=20*=20Revert=20"?= =?UTF-8?q?=E9=80=82=E9=85=8Dopen=E6=A8=A1=E5=BC=8F"=20*=20=E9=80=82?= =?UTF-8?q?=E9=85=8Dopen=E6=A8=A1=E5=BC=8F=20*=20=E4=BF=AE=E6=94=B9plugin?= =?UTF-8?q?=E6=B5=81=E7=A8=8B=E4=BB=A5=E9=83=A8=E7=BD=B2=E5=8D=95=E8=BF=9B?= =?UTF-8?q?=E7=A8=8B=20*=20=E4=BF=AE=E6=94=B9=E9=9D=9E=E5=BD=92=E4=B8=80?= =?UTF-8?q?=E5=88=9D=E5=A7=8B=E5=8C=96=E6=B5=81=E7=A8=8B=20*=20=E5=BD=92?= =?UTF-8?q?=E4=B8=80=E4=BB=A3=E7=A0=81=E9=9A=94=E7=A6=BB=20*=20=E4=BF=AE?= =?UTF-8?q?=E6=94=B9=E6=9C=AA=E7=94=A8=E5=8F=98=E9=87=8F=20*=20=E6=A0=87?= =?UTF-8?q?=E6=B3=A8=E6=97=A0=E7=94=A8=E5=8F=98=E9=87=8F=20*=20=E5=A4=84?= =?UTF-8?q?=E7=90=86=E6=9C=AA=E7=94=A8=E5=8F=98=E9=87=8F=20*=20=E6=A0=87?= =?UTF-8?q?=E6=B3=A8=E6=97=A0=E7=94=A8=E5=8F=98=E9=87=8F=20*=20=E4=BF=AE?= =?UTF-8?q?=E6=94=B9=E7=8E=AF=E5=A2=83=E5=8F=98=E9=87=8F=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=20*=20=E7=BC=96=E8=AF=91=E9=9A=94=E7=A6=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- storage/tianchi/datatype_cnvrtr.cc | 4 + storage/tianchi/ha_tse.cc | 209 +++++++++++++++------ storage/tianchi/ha_tse.h | 7 +- storage/tianchi/ha_tsepart.cc | 43 ++++- storage/tianchi/ha_tsepart.h | 3 +- storage/tianchi/mysql_daac_plugin.cc | 36 ++-- storage/tianchi/tse_ddl_rewriter_plugin.cc | 107 ++++++----- storage/tianchi/tse_mysql_proxy.cc | 46 +++-- storage/tianchi/tse_srv_mq_module.cc | 4 + 9 files changed, 316 insertions(+), 143 deletions(-) diff --git a/storage/tianchi/datatype_cnvrtr.cc b/storage/tianchi/datatype_cnvrtr.cc index 85705d9..a4df381 100644 --- a/storage/tianchi/datatype_cnvrtr.cc +++ b/storage/tianchi/datatype_cnvrtr.cc @@ -1627,6 +1627,10 @@ void copy_column_data_to_mysql(field_info_t *field_info, const field_cnvrt_aux_t if (is_index_only) { uint32_t blob_len = field_info->field_len; char *blob_buf = (char *)my_malloc(PSI_NOT_INSTRUMENTED, blob_len * sizeof(char), MYF(MY_WME)); + if (blob_buf == nullptr) { + tse_log_error("[cantian2mysql]Apply for blob buf:%u Failed", blob_len); + return; + } memcpy(blob_buf, field_info->cantian_cur_field, blob_len); memcpy(field_info->mysql_cur_field, &blob_buf, sizeof(char *)); bitmap_set_bit(field_info->field->table->read_set, field_info->field->field_index()); diff --git a/storage/tianchi/ha_tse.cc b/storage/tianchi/ha_tse.cc index 6944984..1528725 100644 --- a/storage/tianchi/ha_tse.cc +++ b/storage/tianchi/ha_tse.cc @@ -368,17 +368,20 @@ bool is_ctc_mdl_thd(THD* thd) { // 是否为元数据归一的初始化流程 bool is_meta_version_initialize() { - bool is_meta_normalization = CHECK_HAS_MEMBER(handlerton, get_metadata_switch); - if (is_meta_normalization && is_initialize()) { - return true; - } +#ifdef METADATA_NORMALIZED + return is_initialize(); +#else return false; +#endif } // 是否为--upgrade=FORCE bool is_meta_version_upgrading_force() { - bool is_meta_normalization = CHECK_HAS_MEMBER(handlerton, get_metadata_switch); - return is_meta_normalization && (opt_upgrade_mode == UPGRADE_FORCE); +#ifdef METADATA_NORMALIZED + return (opt_upgrade_mode == UPGRADE_FORCE); +#else + return false; +#endif } bool is_alter_table_scan(bool m_error_if_not_empty) { @@ -397,7 +400,7 @@ bool engine_skip_ddl(MYSQL_THD thd) { bool engine_ddl_passthru(MYSQL_THD thd) { // 元数据归一初始化场景,接口流程需要走到参天 - if (is_meta_version_initialize() || is_meta_version_upgrading_force()) { + if (is_initialize() || is_meta_version_upgrading_force()) { return false; } bool is_mysql_local = user_var_set(thd, "ctc_ddl_local_enabled"); @@ -801,11 +804,12 @@ bool ha_tse::check_unsupported_operation(THD *thd, HA_CREATE_INFO *create_info) my_printf_error(ER_DISALLOWED_OPERATION, "%s", MYF(0), "The current operation is not supported."); return true; } - - if (create_info != nullptr && (create_info->options & HA_LEX_CREATE_TMP_TABLE) && !IS_METADATA_NORMALIZATION()) { +#ifndef METADATA_NORMALIZED + if (create_info != nullptr && (create_info->options & HA_LEX_CREATE_TMP_TABLE)) { my_error(ER_NOT_ALLOWED_COMMAND, MYF(0)); return HA_ERR_UNSUPPORTED; } +#endif if (create_info != nullptr && create_info->index_file_name) { my_error(ER_ILLEGAL_HA, MYF(0), table_share != nullptr ? table_share->table_name.str : " "); return true; @@ -950,7 +954,7 @@ static handler *tse_create_handler(handlerton *hton, TABLE_SHARE *table, bool pa return file; } - +#ifdef METADATA_NORMALIZED static bool tse_check_if_log_table(const char* db_name, const char* table_name) { LEX_CSTRING cstr_db_name = {db_name, strlen(db_name)}; LEX_CSTRING cstr_table_name = {table_name, strlen(table_name)}; @@ -964,7 +968,7 @@ static bool tse_check_if_log_table(const char* db_name, const char* table_name) } return false; } - +#endif /** @brief Check if the given db.tablename is a system table for this SE. @@ -979,11 +983,12 @@ static bool tse_check_if_log_table(const char* db_name, const char* table_name) static bool tse_is_supported_system_table(const char *db MY_ATTRIBUTE((unused)), const char *table_name MY_ATTRIBUTE((unused)), bool is_sql_layer_system_table MY_ATTRIBUTE((unused))) { - if (IS_METADATA_NORMALIZATION()) { - return true; - } - + +#ifdef METADATA_NORMALIZED + return true; +#else return false; +#endif } /** @@ -1259,6 +1264,7 @@ thd_sess_ctx_s *get_or_init_sess_ctx(handlerton *hton, THD *thd) { sess_ctx = (thd_sess_ctx_s *)my_malloc(PSI_NOT_INSTRUMENTED, sizeof(thd_sess_ctx_s), MYF(MY_WME)); if (sess_ctx == nullptr) { + tse_log_error("my_malloc error for sess_ctx"); return nullptr; } @@ -1361,6 +1367,9 @@ void update_sess_ctx_cursor_by_tch(tianchi_handler_t &tch, handlerton *hton, THD if (total_csize >= SESSION_CURSOR_NUM) { uint32_t free_csize = sess_ctx->invalid_cursors->size(); uint64_t *cursors = (uint64_t *)tse_alloc_buf(&tch, sizeof(uint64_t) * free_csize); + if (cursors == nullptr) { + tse_log_error("tse_alloc_buf for cursors in update_sess_ctx_cursor_by_tch failed"); + } assert((total_csize == 0) ^ (cursors != nullptr)); ctc_copy_cursors_to_free(sess_ctx, cursors, 1); assert(sess_ctx->invalid_cursors->empty()); @@ -1627,6 +1636,9 @@ static void tse_free_cursors_no_autocommit(THD *thd, tianchi_handler_t *tch, thd } uint64_t *cursors = (uint64_t *)tse_alloc_buf(tch, sizeof(uint64_t) * total_csize); + if (cursors == nullptr) { + tse_log_error("tse_alloc_buf for cursors in tse_free_cursors_no_autocommit failed"); + } assert((total_csize == 0) ^ (cursors != nullptr)); ctc_copy_cursors_to_free(sess_ctx, cursors, 0); tse_free_session_cursors(tch, cursors, total_csize); @@ -1667,6 +1679,8 @@ static int tse_commit(handlerton *hton, THD *thd, bool commit_trx) { total_csize += sess_ctx->invalid_cursors->size(); } uint64_t *cursors = (uint64_t *)tse_alloc_buf(&tch, sizeof(uint64_t) * total_csize); + if (cursors == nullptr) { + } assert((total_csize == 0) ^ (cursors != nullptr)); ctc_copy_cursors_to_free(sess_ctx, cursors, 0); ret = (ct_errno_t)tse_trx_commit(&tch, cursors, total_csize, &is_ddl_commit); @@ -1734,6 +1748,9 @@ static int tse_rollback(handlerton *hton, THD *thd, bool rollback_trx) { total_csize += sess_ctx->invalid_cursors->size(); } uint64_t *cursors = (uint64_t *)tse_alloc_buf(&tch, sizeof(uint64_t) * total_csize); + if (cursors == nullptr) { + tse_log_error("tse_alloc_buf for cursors in tse_rollback failed"); + } assert((total_csize == 0) ^ (cursors != nullptr)); ctc_copy_cursors_to_free(sess_ctx, cursors, 0); ret = (ct_errno_t)tse_trx_rollback(&tch, cursors, total_csize); @@ -1751,6 +1768,9 @@ static int tse_rollback(handlerton *hton, THD *thd, bool rollback_trx) { total_csize += sess_ctx->invalid_cursors->size(); } uint64_t *cursors = (uint64_t *)tse_alloc_buf(&tch, sizeof(uint64_t) * total_csize); + if (cursors == nullptr) { + tse_log_error("tse_alloc_buf for cursors in tse_rollback failed"); + } assert((total_csize == 0) ^ (cursors != nullptr)); ctc_copy_cursors_to_free(sess_ctx, cursors, 0); (void)tse_srv_rollback_savepoint(&tch, cursors, total_csize, TSE_SQL_START_INTERNAL_SAVEPOINT); @@ -1983,32 +2003,38 @@ static bool tse_notify_exclusive_mdl(THD *thd, const MDL_key *mdl_key, we can not check sql length while using prepare statement, so we need to check the sql length before ddl sql again */ - if (!IS_METADATA_NORMALIZATION() && thd->query().str && tse_check_ddl_sql_length(thd->query().str)) { +#ifndef METADATA_NORMALIZED + int query_len = thd->query().length; + if (query_len > MAX_DDL_SQL_LEN_CONTEXT) { + string err_msg = "`" + string(thd->query().str).substr(0, 100) + "...` Is Large Than " + to_string(MAX_DDL_SQL_LEN_CONTEXT); + my_printf_error(ER_DISALLOWED_OPERATION, "%s", MYF(0), err_msg.c_str()); return true; } +#endif if (engine_ddl_passthru(thd)) { return false; } - if (!IS_METADATA_NORMALIZATION()) { - if (engine_skip_ddl(thd)) { - tse_log_warning("[CTC_NOMETA_SQL]:record sql str only generate metadata. sql:%s", thd->query().str); - return false; - } - if (!ddl_enabled_normal(thd)) { - my_printf_error(ER_DISALLOWED_OPERATION, "%s", MYF(0), "DDL not allowed in this mode, Please check the value of @@ctc_concurrent_ddl."); - return true; - } +#ifndef METADATA_NORMALIZED + if (engine_skip_ddl(thd)) { + tse_log_warning("[CTC_NOMETA_SQL]:record sql str only generate metadata. sql:%s", thd->query().str); + return false; + } - if (thd->lex->query_tables == nullptr && mdl_key->mdl_namespace() != MDL_key::SCHEMA) { - return false; - } + if (!ddl_enabled_normal(thd)) { + my_printf_error(ER_DISALLOWED_OPERATION, "%s", MYF(0), "DDL not allowed in this mode, Please check the value of @@ctc_concurrent_ddl."); + return true; + } + + if (thd->lex->query_tables == nullptr && mdl_key->mdl_namespace() != MDL_key::SCHEMA) { + return false; + } - if (mysql_system_db.find(mdl_key->db_name()) != mysql_system_db.end()) { - return false; - } + if (mysql_system_db.find(mdl_key->db_name()) != mysql_system_db.end()) { + return false; } +#endif int ret = 0; tianchi_handler_t tch; @@ -2037,7 +2063,8 @@ static bool tse_notify_exclusive_mdl(THD *thd, const MDL_key *mdl_key, static bool tse_notify_alter_table(THD *thd, const MDL_key *mdl_key, ha_notification_type notification_type) { vector ticket_list; - if (IS_METADATA_NORMALIZATION() && notification_type == HA_NOTIFY_PRE_EVENT) { +#ifdef METADATA_NORMALIZED + if (notification_type == HA_NOTIFY_PRE_EVENT) { int pre_lock_ret = tse_lock_table_pre(thd, ticket_list); if (pre_lock_ret != 0) { tse_lock_table_post(thd, ticket_list); @@ -2045,12 +2072,13 @@ static bool tse_notify_alter_table(THD *thd, const MDL_key *mdl_key, return true; } } - +#endif bool ret = tse_notify_exclusive_mdl(thd, mdl_key, notification_type, nullptr); - - if (IS_METADATA_NORMALIZATION() && notification_type == HA_NOTIFY_PRE_EVENT) { +#ifdef METADATA_NORMALIZED + if (notification_type == HA_NOTIFY_PRE_EVENT) { tse_lock_table_post(thd, ticket_list); } +#endif return ret; } @@ -2103,6 +2131,9 @@ static int tse_rollback_savepoint(handlerton *hton, THD *thd, void *savepoint) { total_csize += sess_ctx->invalid_cursors->size(); } uint64_t *cursors = (uint64_t *)tse_alloc_buf(&tch, sizeof(uint64_t) * total_csize); + if (cursors == nullptr) { + tse_log_error("tse_alloc_buf for cursors in tse_rollback_savepoint failed"); + } assert((total_csize == 0) ^ (cursors != nullptr)); ctc_copy_cursors_to_free(sess_ctx, cursors, 0); ct_errno_t ret = (ct_errno_t)tse_srv_rollback_savepoint(&tch, cursors, total_csize, name); @@ -4114,12 +4145,12 @@ int ha_tse::external_lock(THD *thd, int lock_type) { out they lock meaning. */ DBUG_TRACE; - - if (IS_METADATA_NORMALIZATION() && - tse_check_if_log_table(table_share->db.str, table_share->table_name.str)) { +#ifdef METADATA_NORMALIZED + if (tse_check_if_log_table(table_share->db.str, table_share->table_name.str)) { is_log_table = true; return 0; } +#endif is_log_table = false; if (engine_ddl_passthru(thd) && (is_create_table_check(thd) || is_alter_table_copy(thd))) { @@ -4710,18 +4741,16 @@ static int tse_init_func(void *p) { // 元数据归一流程初始化下发参天 // 主干非initialize_insecure模式,需要注册共享内存接收线程并等待参天启动完成 - if (!opt_initialize_insecure || CHECK_HAS_MEMBER(handlerton, get_inst_id)) { - ret = srv_wait_instance_startuped(); - if (ret != 0) { - tse_log_error("wait cantian instance startuped failed:%d", ret); - return HA_ERR_INITIALIZATION; - } - - ret = tse_reg_instance(); - if (ret != 0) { - tse_log_error("[CTC_INIT]:ctc_reg_instance failed:%d", ret); - return HA_ERR_INITIALIZATION; - } + ret = srv_wait_instance_startuped(); + if (ret != 0) { + tse_log_error("wait cantian instance startuped failed:%d", ret); + return HA_ERR_INITIALIZATION; + } + + ret = tse_reg_instance(); + if (ret != 0) { + tse_log_error("[CTC_INIT]:ctc_reg_instance failed:%d", ret); + return HA_ERR_INITIALIZATION; } ret = tse_check_tx_isolation(); @@ -5259,18 +5288,44 @@ int ha_tse::initialize_cbo_stats() } m_share->cbo_stats = (tianchi_cbo_stats_t*)my_malloc(PSI_NOT_INSTRUMENTED, sizeof(tianchi_cbo_stats_t), MYF(MY_WME)); if (m_share->cbo_stats == nullptr) { +#ifdef WITH_DAAC tse_log_error("alloc shm mem failed, m_share->cbo_stats size(%lu)", sizeof(tianchi_cbo_stats_t)); +#else + tse_log_error("alloc mem failed, m_share->cbo_stats size(%lu)", sizeof(tianchi_cbo_stats_t)); +#endif return ERR_ALLOC_MEMORY; } *m_share->cbo_stats = {0, 0, 0, 0, 0, 0, nullptr, nullptr}; m_share->cbo_stats->tse_cbo_stats_table = (tse_cbo_stats_table_t*)my_malloc(PSI_NOT_INSTRUMENTED, sizeof(tse_cbo_stats_table_t), MYF(MY_WME)); + if (m_share->cbo_stats->tse_cbo_stats_table == nullptr) { +#ifdef WITH_DAAC + tse_log_error("alloc mem failed, m_share->cbo_stats->tse_cbo_stats_table(%lu)", sizeof(tse_cbo_stats_table_t)); +#else + tse_log_error("alloc shm mem failed, m_share->cbo_stats->tse_cbo_stats_table(%lu)", sizeof(tse_cbo_stats_table_t)); +#endif + return ERR_ALLOC_MEMORY; + } m_share->cbo_stats->tse_cbo_stats_table->columns = (tse_cbo_stats_column_t*)my_malloc(PSI_NOT_INSTRUMENTED, table->s->fields * sizeof(tse_cbo_stats_column_t), MYF(MY_WME)); - + if (m_share->cbo_stats->tse_cbo_stats_table->columns == nullptr) { +#ifdef WITH_DAAC + tse_log_error("alloc mem failed, m_share->cbo_stats->tse_cbo_stats_table->columns size(%lu)", table->s->fields * sizeof(tse_cbo_stats_column_t)); +#else + tse_log_error("alloc shm mem failed, m_share->cbo_stats->tse_cbo_stats_table->columns size(%lu)", table->s->fields * sizeof(tse_cbo_stats_column_t)); +#endif + return ERR_ALLOC_MEMORY; + } m_share->cbo_stats->ndv_keys = (uint32_t*)my_malloc(PSI_NOT_INSTRUMENTED, table->s->keys * sizeof(uint32_t) * MAX_KEY_COLUMNS, MYF(MY_WME)); - + if (m_share->cbo_stats->ndv_keys == nullptr) { +#ifdef WITH_DAAC + tse_log_error("alloc mem failed, m_share->cbo_stats->ndv_keys size(%lu)", table->s->keys * sizeof(uint32_t) * MAX_KEY_COLUMNS); +#else + tse_log_error("alloc shm mem failed, m_share->cbo_stats->ndv_keys size(%lu)", table->s->keys * sizeof(uint32_t) * MAX_KEY_COLUMNS); +#endif + return ERR_ALLOC_MEMORY; + } m_share->cbo_stats->msg_len = table->s->fields * sizeof(tse_cbo_stats_column_t); m_share->cbo_stats->key_len = table->s->keys * sizeof(uint32_t) * MAX_KEY_COLUMNS; return CT_SUCCESS; @@ -5355,7 +5410,11 @@ const Item *ha_tse::cond_push(const Item *cond, bool other_tbls_ok MY_ATTRIBUTE( m_cond = (tse_conds *)tse_alloc_buf(&m_tch, sizeof(tse_conds)); if (m_cond == nullptr) { - tse_log_warning("alloc shm mem failed, m_cond size(%lu), pushdown cond is null.", sizeof(tse_conds)); +#ifdef WITH_DAAC + tse_log_error("alloc mem failed, m_cond size(%lu), pushdown cond is null.", sizeof(tse_conds)); +#else + tse_log_error("alloc shm mem failed, m_cond size(%lu), pushdown cond is null.", sizeof(tse_conds)); +#endif return remainder; } @@ -5433,7 +5492,11 @@ int ha_tse::engine_push(AQP::Table_access *table_aqp) m_cond = (tse_conds *)tse_alloc_buf(&m_tch, sizeof(tse_conds)); if (m_cond == nullptr) { - tse_log_warning("alloc shm mem failed, m_cond size(%lu), pushdown cond is null.", sizeof(tse_conds)); +#ifdef WITH_DAAC + tse_log_error("alloc mem failed, m_cond size(%lu), pushdown cond is null.", sizeof(tse_conds)); +#else + tse_log_error("alloc shm mem failed, m_cond size(%lu), pushdown cond is null.", sizeof(tse_conds)); +#endif return 0; } @@ -5454,3 +5517,41 @@ int ha_tse::engine_push(AQP::Table_access *table_aqp) table_aqp->set_condition(const_cast(m_remainder_conds)); return 0; } + +int is_cantian_run_mode_single() { + // 获取环境变量 RUN_MODE 的值 + const char* run_mode = getenv("RUN_MODE"); + + // 定义有效的单进程模式 + const char* valid_modes[] = { + "cantiand_with_mysql", + "cantiand_with_mysql_st", + "cantiand_with_mysql_in_cluster" + }; + + if (run_mode != NULL) { + // 标志变量,判断是否找到匹配模式 + int found = 0; + + // 遍历有效模式数组,检查 run_mode 是否在其中 + for (size_t i = 0; i < sizeof(valid_modes) / sizeof(valid_modes[0]); i++) { + if (strcmp(run_mode, valid_modes[i]) == 0) { + found = 1; + break; + } + } + + // 如果找到匹配模式,则设置 cantian_cluster_ready 为 true 并返回 SUCCESS + if (found) { + tse_log_system("RUN_MODE %s is single process", run_mode); + return 1; + } else { + tse_log_system("RUN_MODE %s is not single process", run_mode); + return 0; + } + } else { + tse_log_system("RUN_MODE not set"); + return 0; + } + return 0; +} \ No newline at end of file diff --git a/storage/tianchi/ha_tse.h b/storage/tianchi/ha_tse.h index b1f2690..57ab0ec 100644 --- a/storage/tianchi/ha_tse.h +++ b/storage/tianchi/ha_tse.h @@ -477,9 +477,7 @@ public: #ifdef METADATA_NORMALIZED int write_row(uchar *buf, bool write_through = false) override; -#endif - -#ifndef METADATA_NORMALIZED +#else int write_row(uchar *buf) override; #endif @@ -991,6 +989,7 @@ bool tse_is_temporary(const dd::Table *table_def); int32_t tse_get_cluster_role(); void tse_set_mysql_read_only(); void tse_reset_mysql_read_only(); +int is_cantian_run_mode_single(); #pragma GCC visibility pop -#endif +#endif \ No newline at end of file diff --git a/storage/tianchi/ha_tsepart.cc b/storage/tianchi/ha_tsepart.cc index 7f46d04..dfac8a3 100644 --- a/storage/tianchi/ha_tsepart.cc +++ b/storage/tianchi/ha_tsepart.cc @@ -1005,7 +1005,11 @@ int ha_tsepart::initialize_cbo_stats() { m_part_share->cbo_stats = (tianchi_cbo_stats_t*)my_malloc(PSI_NOT_INSTRUMENTED, sizeof(tianchi_cbo_stats_t), MYF(MY_WME)); if (m_part_share->cbo_stats == nullptr) { +#ifdef WITH_DAAC + tse_log_error("alloc mem failed, m_part_share->cbo_stats size(%lu)", sizeof(tianchi_cbo_stats_t)); +#else tse_log_error("alloc shm mem failed, m_part_share->cbo_stats size(%lu)", sizeof(tianchi_cbo_stats_t)); +#endif return ERR_ALLOC_MEMORY; } *m_part_share->cbo_stats = {0, 0, 0, 0, 0, 0, nullptr, nullptr}; @@ -1014,11 +1018,35 @@ int ha_tsepart::initialize_cbo_stats() { m_part_share->cbo_stats->tse_cbo_stats_table = (tse_cbo_stats_table_t*)my_malloc(PSI_NOT_INSTRUMENTED, part_num * sizeof(tse_cbo_stats_table_t), MYF(MY_WME)); + if (m_part_share->cbo_stats->tse_cbo_stats_table == nullptr) { +#ifdef WITH_DAAC + tse_log_error("alloc mem failed, m_part_share->cbo_stats->tse_cbo_stats_table size(%lu)", part_num * sizeof(tse_cbo_stats_table_t)); +#else + tse_log_error("alloc shm mem failed, m_part_share->cbo_stats->tse_cbo_stats_table size(%lu)", part_num * sizeof(tse_cbo_stats_table_t)); +#endif + return ERR_ALLOC_MEMORY; + } m_part_share->cbo_stats->ndv_keys = (uint32_t*)my_malloc(PSI_NOT_INSTRUMENTED, table->s->keys * sizeof(uint32_t) * MAX_KEY_COLUMNS, MYF(MY_WME)); + if (m_part_share->cbo_stats->ndv_keys == nullptr) { +#ifdef WITH_DAAC + tse_log_error("alloc mem failed, m_part_share->cbo_stats->ndv_keys size(%lu)", table->s->keys * sizeof(uint32_t) * MAX_KEY_COLUMNS); +#else + tse_log_error("alloc shm mem failed, m_part_share->cbo_stats->ndv_keys size(%lu)", table->s->keys * sizeof(uint32_t) * MAX_KEY_COLUMNS); +#endif + return ERR_ALLOC_MEMORY; + } for (uint i = 0; i < part_num; i++) { m_part_share->cbo_stats->tse_cbo_stats_table[i].columns = (tse_cbo_stats_column_t*)my_malloc(PSI_NOT_INSTRUMENTED, table->s->fields * sizeof(tse_cbo_stats_column_t), MYF(MY_WME)); + if (m_part_share->cbo_stats->tse_cbo_stats_table[i].columns == nullptr) { +#ifdef WITH_DAAC + tse_log_error("alloc mem failed, m_part_share->cbo_stats->tse_cbo_stats_table size(%lu)", table->s->fields * sizeof(tse_cbo_stats_column_t)); +#else + tse_log_error("alloc shm mem failed, m_part_share->cbo_stats->tse_cbo_stats_table size(%lu)", table->s->fields * sizeof(tse_cbo_stats_column_t)); +#endif + return ERR_ALLOC_MEMORY; + } } m_part_share->cbo_stats->msg_len = table->s->fields * sizeof(tse_cbo_stats_column_t); m_part_share->cbo_stats->key_len = table->s->keys * sizeof(uint32_t) * MAX_KEY_COLUMNS; @@ -1117,13 +1145,14 @@ int ha_tsepart::repair(THD *thd, HA_CHECK_OPT *) } broadcast_req.options |= TSE_NOT_NEED_CANTIAN_EXECUTE; - if (IS_METADATA_NORMALIZATION()) { - ct_errno_t ret = (ct_errno_t)ctc_record_sql_for_cantian(&tch, &broadcast_req, false); - assert (ret == CT_SUCCESS); - } else { - ct_errno_t ret = (ct_errno_t)tse_execute_mysql_ddl_sql(&tch, &broadcast_req, false); - assert (ret == CT_SUCCESS); - } + ct_errno_t ret = CT_SUCCESS; +#ifdef METADATA_NORMALIZED + ret = (ct_errno_t)ctc_record_sql_for_cantian(&tch, &broadcast_req, false); + assert (ret == CT_SUCCESS); +#else + ret = (ct_errno_t)tse_execute_mysql_ddl_sql(&tch, &broadcast_req, false); + assert (ret == CT_SUCCESS); +#endif return HA_ADMIN_OK; } diff --git a/storage/tianchi/ha_tsepart.h b/storage/tianchi/ha_tsepart.h index 5b6e2e9..9a73c72 100644 --- a/storage/tianchi/ha_tsepart.h +++ b/storage/tianchi/ha_tsepart.h @@ -250,8 +250,7 @@ class ha_tsepart : public ha_tse, void part_autoinc_has_expl_non_null_value_update_row(uchar *new_data); #ifdef METADATA_NORMALIZED int write_row(uchar *record, bool write_through MY_ATTRIBUTE((unused)) = false) override { -#endif -#ifndef METADATA_NORMALIZED +#else int write_row(uchar *record) override { #endif if (table->next_number_field) { diff --git a/storage/tianchi/mysql_daac_plugin.cc b/storage/tianchi/mysql_daac_plugin.cc index 91c6b10..bad822d 100644 --- a/storage/tianchi/mysql_daac_plugin.cc +++ b/storage/tianchi/mysql_daac_plugin.cc @@ -38,6 +38,7 @@ #include "sql/sql_plugin.h" // st_plugin_int #include "sql/sql_initialize.h" // opt_initialize_insecure #include "tse_log.h" +#include "ha_tse.h" struct mysql_daac_context { my_thread_handle daac_startup_thread; @@ -59,6 +60,7 @@ static std::string get_cantiand_mode() { if (tmp_mode != NULL && strlen(tmp_mode) > 0) { mode = tmp_mode; } + return mode; } @@ -68,6 +70,7 @@ static std::string get_cantiand_home_dir() { if (tmp_home_dir != NULL && strlen(tmp_home_dir) > 0) { home_dir = tmp_home_dir; } + tse_log_system("get cantiand home_dir:%s", home_dir.c_str()); return home_dir; } @@ -99,18 +102,22 @@ static void *mysql_daac_startup_thread(void *p) { struct mysql_daac_context *daac_context = NULL; int daemon_daac_plugin_init() { DBUG_TRACE; - if (opt_initialize_insecure) { - tse_log_debug("initialize-insecure mode no need start the daac startup thread."); - return 0; - } - const char *se_name = "ctc_ddl_rewriter"; - const LEX_CSTRING name = {se_name, strlen(se_name)}; - if (!plugin_is_ready(name, MYSQL_AUDIT_PLUGIN)) { - tse_log_error("tse_ddl_rewriter plugin install failed."); - return -1; + // skip this part if single process + if (!is_cantian_run_mode_single()) { + if (opt_initialize_insecure) { + tse_log_warning("initialize-insecure mode no need start the daac startup thread."); + return 0; + } + + const char *se_name = "ctc_ddl_rewriter"; + const LEX_CSTRING name = {se_name, strlen(se_name)}; + if (!plugin_is_ready(name, MYSQL_AUDIT_PLUGIN)) { + tse_log_error("tse_ddl_rewriter plugin install failed."); + return -1; + } } - + if (daac_context != NULL) { tse_log_error("daemon_daac_plugin_init daac_context:%p not NULL", daac_context); return 0; @@ -119,7 +126,14 @@ int daemon_daac_plugin_init() { daac_context = (struct mysql_daac_context *)my_malloc( PSI_NOT_INSTRUMENTED, sizeof(struct mysql_daac_context), MYF(0)); - + if (daac_context == nullptr) { +#ifdef WITH_DAAC + tse_log_error("alloc mem failed, daac_context size(%lu)", sizeof(struct mysql_daac_context)); +#else + tse_log_error("alloc shm mem failed, daac_context size(%lu)", sizeof(struct mysql_daac_context)); +#endif + return -1; + } my_thread_attr_t startup_attr; /* Thread attributes */ my_thread_attr_init(&startup_attr); my_thread_attr_setdetachstate(&startup_attr, MY_THREAD_CREATE_JOINABLE); diff --git a/storage/tianchi/tse_ddl_rewriter_plugin.cc b/storage/tianchi/tse_ddl_rewriter_plugin.cc index ee61f00..549d916 100644 --- a/storage/tianchi/tse_ddl_rewriter_plugin.cc +++ b/storage/tianchi/tse_ddl_rewriter_plugin.cc @@ -445,7 +445,7 @@ static bool tse_check_ddl_local_enable(string sql_str, bool &need_forwar) { return false; } - +#ifdef METADATA_NORMALIZED static uint32_t tse_set_var_option(bool is_null_value, bool is_set_default_value, set_var *setvar) { uint32_t options = 0; @@ -463,7 +463,8 @@ static uint32_t tse_set_var_option(bool is_null_value, bool is_set_default_value } return options; } - +#endif +#ifdef METADATA_NORMALIZED static int tse_set_var_meta(MYSQL_THD thd, uint32_t options, const char* base_name, string var_name, string var_value) { tianchi_handler_t tch; @@ -489,6 +490,7 @@ static int tse_set_var_meta(MYSQL_THD thd, uint32_t options, const char* base_na return ret; } +#endif static int tse_get_variables_value_string(MYSQL_THD thd, string &sql_str, set_var* setvar, string& val_str, bool& is_null_value, bool &need_forward) { @@ -574,7 +576,9 @@ static int tse_check_set_opt(string &sql_str, MYSQL_THD thd, bool &need_forward) var_it.rewind(); while ((var = var_it++)) { set_var *setvar = dynamic_cast(var); +#ifdef METADATA_NORMALIZED bool is_set_default_value = false; +#endif bool is_null_value = false; if (setvar && setvar->var) { need_forward = !setvar->var->is_readonly() && setvar->is_global_persist(); @@ -582,7 +586,9 @@ static int tse_check_set_opt(string &sql_str, MYSQL_THD thd, bool &need_forward) if (!contain_subselect) { /* get user value (@xxxxx) as string */ if (!setvar->value) { +#ifdef METADATA_NORMALIZED is_set_default_value = true; +#endif val_str = ""; } else { ret = tse_get_variables_value_string(thd, sql_str, setvar, val_str, is_null_value, need_forward); @@ -598,8 +604,8 @@ static int tse_check_set_opt(string &sql_str, MYSQL_THD thd, bool &need_forward) my_printf_error(ER_DISALLOWED_OPERATION, "%s", MYF(0), "Set global variable query is not allowed (ctc_setopt_disabled = true)"); return -1; } - - if(IS_METADATA_NORMALIZATION() && !contain_subselect && need_forward && setvar) { +#ifdef METADATA_NORMALIZED + if(!contain_subselect && need_forward && setvar) { if (setvar->check(thd) == 0) { uint32_t options = tse_set_var_option(is_null_value, is_set_default_value, setvar); ret = tse_set_var_meta(thd, options, setvar->base.str, name_str, val_str); @@ -608,15 +614,17 @@ static int tse_check_set_opt(string &sql_str, MYSQL_THD thd, bool &need_forward) need_forward = false; // 值校验失败, ctc不进行广播并返回成功, 后续报错由MySQL完成 } } - +#endif tse_log_system("set option %s, need_forward: %d", sql_str.c_str(), need_forward); } - if (IS_METADATA_NORMALIZATION() && !contain_subselect) { +#ifdef METADATA_NORMALIZED + if (!contain_subselect) { need_forward = false; } +#endif return ret; } - +#ifndef METADATA_NORMALIZED static int is_system_db(const char *ddl_db) { if (mysql_system_db.find(ddl_db) != mysql_system_db.end()) { my_printf_error(ER_DISALLOWED_OPERATION, "%s", MYF(0), @@ -626,6 +634,7 @@ static int is_system_db(const char *ddl_db) { return 0; } +#endif static int tse_check_ddl_engine(string &, MYSQL_THD thd, bool &need_forward) { need_forward = false; // broadcast by storage engine @@ -649,18 +658,18 @@ static int tse_check_ddl_engine(string &, MYSQL_THD thd, bool &need_forward) { return -1; } - if (!IS_METADATA_NORMALIZATION()) { - // create like table 检查是否是系统库 - if (thd->lex->query_tables != nullptr && - thd->lex->query_tables->next_global != nullptr && - thd->lex->create_info != nullptr && - thd->lex->create_info->options & HA_LEX_CREATE_TABLE_LIKE && - !(thd->lex->create_info->options & HA_LEX_CREATE_TMP_TABLE) && - !thd->lex->drop_temporary) { - const char *ddl_db = thd->lex->query_tables->next_global->db; - return is_system_db(ddl_db); - } +#ifndef METADATA_NORMALIZED + // create like table 检查是否是系统库 + if (thd->lex->query_tables != nullptr && + thd->lex->query_tables->next_global != nullptr && + thd->lex->create_info != nullptr && + thd->lex->create_info->options & HA_LEX_CREATE_TABLE_LIKE && + !(thd->lex->create_info->options & HA_LEX_CREATE_TMP_TABLE) && + !thd->lex->drop_temporary) { + const char *ddl_db = thd->lex->query_tables->next_global->db; + return is_system_db(ddl_db); } +#endif // create tablespace 检查是否为engine=Innodb情况 if (thd->lex->sql_command == SQLCOM_ALTER_TABLESPACE) { @@ -674,15 +683,15 @@ static int tse_check_ddl_engine(string &, MYSQL_THD thd, bool &need_forward) { } } - if (!IS_METADATA_NORMALIZATION()) { - // create表 && drop表/库 (检查是否是系统库上ddl) - if (thd->lex->query_tables != nullptr && - (thd->lex->create_info != nullptr && !(thd->lex->create_info->options & HA_LEX_CREATE_TMP_TABLE)) && - !thd->lex->drop_temporary) { - const char *ddl_db = thd->lex->query_tables->db; - return is_system_db(ddl_db); - } +#ifndef METADATA_NORMALIZED + // create表 && drop表/库 (检查是否是系统库上ddl) + if (thd->lex->query_tables != nullptr && + (thd->lex->create_info != nullptr && !(thd->lex->create_info->options & HA_LEX_CREATE_TMP_TABLE)) && + !thd->lex->drop_temporary) { + const char *ddl_db = thd->lex->query_tables->db; + return is_system_db(ddl_db); } +#endif return 0; } @@ -758,7 +767,9 @@ static int tse_lock_tables_ddl(string &, MYSQL_THD thd, bool &) { (int32_t)TL_UNLOCK}; FILL_USER_INFO_WITH_THD(lock_info, thd); strncpy(lock_info.db_name, table->db, SMALL_RECORD_SIZE); + lock_info.db_name[SMALL_RECORD_SIZE - 1] = '\0'; strncpy(lock_info.table_name, table->table_name, SMALL_RECORD_SIZE); + lock_info.table_name[SMALL_RECORD_SIZE - 1] = '\0'; ret = tse_unlock_table(&tch, ctc_instance_id, &lock_info); if (ret != 0) { tse_log_error("[TSE_DDL_REWRITE]:unlock table failed, table:%s.%s", lock_info.db_name, lock_info.table_name); @@ -1123,31 +1134,31 @@ bool plugin_ddl_block(MYSQL_THD thd, if (!need_forward) { return false; } - - if (IS_METADATA_NORMALIZATION() && !is_dcl_sql_cmd(thd->lex->sql_command)) { +#ifdef METADATA_NORMALIZED + if (!is_dcl_sql_cmd(thd->lex->sql_command)) { if (ctc_record_sql(thd, broadcast_cmd.need_select_db)) { tse_log_error("[CTC_META_SQL]:record sql str failed. sql:%s", query_str.c_str()); return true; } } +#endif - if (!IS_METADATA_NORMALIZATION()) { - if (engine_skip_ddl(thd)) { - tse_log_warning("[CTC_NOMETA_SQL]:record sql str only generate metadata. sql:%s", query_str.c_str()); - return false; - } - // disallow ddl query if ctc_concurrent_ddl=OFF and tse_enable_ddl not set - if (!ddl_enabled_normal(thd)) { - my_printf_error(ER_DISALLOWED_OPERATION, "%s", MYF(0), "DDL not allowed in this mode, Please check the value of @@ctc_concurrent_ddl."); - return true; - } - - return check_agent_connection(thd); +#ifndef METADATA_NORMALIZED + if (engine_skip_ddl(thd)) { + tse_log_warning("[CTC_NOMETA_SQL]:record sql str only generate metadata. sql:%s", query_str.c_str()); + return false; + } + // disallow ddl query if ctc_concurrent_ddl=OFF and tse_enable_ddl not set + if (!ddl_enabled_normal(thd)) { + my_printf_error(ER_DISALLOWED_OPERATION, "%s", MYF(0), "DDL not allowed in this mode, Please check the value of @@ctc_concurrent_ddl."); + return true; } + return check_agent_connection(thd); +#endif return false; } - +#ifndef METADATA_NORMALIZED // due to MDL_key::BACKUP_LOCK`s MDL_INTENTION_EXCLUSIVE comflicts with MDL_key::BACKUP_LOCK`s MDL_SHARED (user execute STMT `lock instance for backup`) static bool tse_is_instance_locked_by_backup(MYSQL_THD thd) { MDL_request mdl_request; @@ -1166,7 +1177,9 @@ static bool tse_is_instance_locked_by_backup(MYSQL_THD thd) { return false; } } +#endif +#ifndef METADATA_NORMALIZED static bool tse_is_have_global_read_lock(MYSQL_THD thd) { // check if current connetion hold global read lock, let it go if (thd->global_read_lock.is_acquired()) { @@ -1180,11 +1193,11 @@ static bool tse_is_have_global_read_lock(MYSQL_THD thd) { return false; } - +#endif static inline bool tse_is_broadcast_by_storage_engine(ddl_broadcast_cmd broadcast_cmd) { return broadcast_cmd.pre_func == tse_check_ddl || broadcast_cmd.pre_func == tse_check_ddl_engine; } - +#ifndef METADATA_NORMALIZED static bool tse_is_set_session_var(MYSQL_THD thd, string &query_str) { if (thd->lex->sql_command != SQLCOM_SET_OPTION) { return false; @@ -1211,6 +1224,7 @@ static bool tse_is_set_session_var(MYSQL_THD thd, string &query_str) { return false; } +#endif static int tse_check_metadata_switch() { metadata_switchs metadata_switch = (metadata_switchs)tse_get_metadata_switch(); @@ -1275,7 +1289,10 @@ static int tse_ddl_rewrite(MYSQL_THD thd, mysql_event_class_t event_class, } } else if (sql_cmd == SQLCOM_UNLOCK_INSTANCE) { tse_check_unlock_instance(thd); - } else if (!IS_METADATA_NORMALIZATION() && (need_forward || tse_is_broadcast_by_storage_engine(it->second))) { +#ifdef METADATA_NORMALIZED + } +#else + } else if ((need_forward || tse_is_broadcast_by_storage_engine(it->second))) { // block ddl when instance has exclusive backup lock (LOCK INSTANCE FOR BACKUP), ref sql_backup_lock.cc if (tse_is_instance_locked_by_backup(thd)) { @@ -1295,7 +1312,7 @@ static int tse_ddl_rewrite(MYSQL_THD thd, mysql_event_class_t event_class, return -1; } } - +#endif ddl_broadcast_cmd broadcast_cmd = it->second; return need_forward && ddl_broadcast_and_wait(thd, query_str, (uint8_t)sql_cmd, broadcast_cmd); // 0: success other: fail } diff --git a/storage/tianchi/tse_mysql_proxy.cc b/storage/tianchi/tse_mysql_proxy.cc index 3bc2e38..0937dde 100644 --- a/storage/tianchi/tse_mysql_proxy.cc +++ b/storage/tianchi/tse_mysql_proxy.cc @@ -367,8 +367,12 @@ __attribute__((visibility("default"))) int tse_ddl_execute_update(uint32_t thd_i tse_log_note("tse_ddl_execute_update curnode not need execute,mysql_inst_id:%u", broadcast_req->mysql_inst_id); return 0; } +#ifdef METADATA_NORMALIZED + bool is_meta_normalization = true; +#else + bool is_meta_normalization = false; +#endif - bool is_meta_normalization = IS_METADATA_NORMALIZATION(); if (is_meta_normalization && broadcast_req->sql_command != SQLCOM_SET_OPTION) { return 0; } else if (is_meta_normalization && broadcast_req->sql_command == SQLCOM_SET_OPTION @@ -506,17 +510,16 @@ int32_t tse_check_table_exist(MYSQL *curr_conn_proxy, const char *db_name, const } __attribute__((visibility("default"))) int tse_ddl_execute_lock_tables(tianchi_handler_t *tch, char *db_name, tse_lock_table_info *lock_info, int *err_code) { - - if (IS_METADATA_NORMALIZATION()) { - if (lock_info->sql_type == SQLCOM_LOCK_TABLES) { - if (tse_ddl_execute_lock_tables_by_req(tch, lock_info, err_code)) { - return *err_code; - } - } else if (tse_mdl_lock_thd(tch, lock_info, err_code)) { +#ifdef METADATA_NORMALIZED + if (lock_info->sql_type == SQLCOM_LOCK_TABLES) { + if (tse_ddl_execute_lock_tables_by_req(tch, lock_info, err_code)) { return *err_code; } - return 0; + } else if (tse_mdl_lock_thd(tch, lock_info, err_code)) { + return *err_code; } + return 0; +#endif bool is_same_node = (tch->inst_id == ctc_instance_id); uint64_t conn_map_key = tse_get_conn_key(tch->inst_id, tch->thd_id, !is_same_node); @@ -601,14 +604,17 @@ __attribute__((visibility("default"))) int tse_ddl_execute_lock_tables(tianchi_h __attribute__((visibility("default"))) int tse_ddl_execute_unlock_tables(tianchi_handler_t *tch, uint32_t mysql_inst_id, tse_lock_table_info *lock_info) { - if (IS_METADATA_NORMALIZATION()) { - UNUSED_PARAM(mysql_inst_id); - if (lock_info->sql_type == SQLCOM_UNLOCK_TABLES) { - tse_mdl_unlock_tables_thd(tch); - } - tse_mdl_unlock_thd(tch, lock_info); - return 0; +#ifdef METADATA_NORMALIZED + UNUSED_PARAM(mysql_inst_id); + if (lock_info->sql_type == SQLCOM_UNLOCK_TABLES) { + tse_mdl_unlock_tables_thd(tch); } + tse_mdl_unlock_thd(tch, lock_info); + return 0; +#else + (void)mysql_inst_id; + (void)lock_info; +#endif bool is_same_node = (tch->inst_id == ctc_instance_id); uint64_t conn_map_key = tse_get_conn_key(tch->inst_id, tch->thd_id, !is_same_node); @@ -672,10 +678,10 @@ __attribute__((visibility("default"))) int tse_ddl_execute_unlock_tables(tianchi * 低16位全为1代表整个参天节点故障,清理与参天实例id相关的资源 */ __attribute__((visibility("default"))) int close_mysql_connection(uint32_t thd_id, uint32_t mysql_inst_id) { - if (IS_METADATA_NORMALIZATION()) { - close_tse_mdl_thd(thd_id, mysql_inst_id); - return 0; - } +#ifdef METADATA_NORMALIZED + close_tse_mdl_thd(thd_id, mysql_inst_id); + return 0; +#endif if (thd_id == 0) { if ((uint16_t)mysql_inst_id == (uint16_t)CANTIAN_DOWN_MASK) { diff --git a/storage/tianchi/tse_srv_mq_module.cc b/storage/tianchi/tse_srv_mq_module.cc index de6548e..a3ef3a2 100644 --- a/storage/tianchi/tse_srv_mq_module.cc +++ b/storage/tianchi/tse_srv_mq_module.cc @@ -298,7 +298,11 @@ static void tse_log_reg_error_by_code(int error_code) { switch(error_code) { case ERR_CONNECTION_FAILED: +#ifdef WITH_DAAC + tse_log_error("connection failed"); +#else tse_log_error("shm connection failed"); +#endif break; case REG_MISMATCH_CTC_VERSION: tse_log_error("CTC client version mismatch server!"); -- Gitee From 47b5308828edac4cb0b2482bd00d71c7ed83a351 Mon Sep 17 00:00:00 2001 From: hezijian Date: Thu, 1 Aug 2024 10:15:16 +0000 Subject: [PATCH 2/2] =?UTF-8?q?=E5=9B=9E=E9=80=80=20'Pull=20Request=20!7?= =?UTF-8?q?=20:=20=E5=8D=95=E8=BF=9B=E7=A8=8B=E9=80=82=E9=85=8D'?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- storage/tianchi/datatype_cnvrtr.cc | 4 - storage/tianchi/ha_tse.cc | 209 ++++++--------------- storage/tianchi/ha_tse.h | 7 +- storage/tianchi/ha_tsepart.cc | 43 +---- storage/tianchi/ha_tsepart.h | 3 +- storage/tianchi/mysql_daac_plugin.cc | 36 ++-- storage/tianchi/tse_ddl_rewriter_plugin.cc | 107 +++++------ storage/tianchi/tse_mysql_proxy.cc | 46 ++--- storage/tianchi/tse_srv_mq_module.cc | 4 - 9 files changed, 143 insertions(+), 316 deletions(-) diff --git a/storage/tianchi/datatype_cnvrtr.cc b/storage/tianchi/datatype_cnvrtr.cc index a4df381..85705d9 100644 --- a/storage/tianchi/datatype_cnvrtr.cc +++ b/storage/tianchi/datatype_cnvrtr.cc @@ -1627,10 +1627,6 @@ void copy_column_data_to_mysql(field_info_t *field_info, const field_cnvrt_aux_t if (is_index_only) { uint32_t blob_len = field_info->field_len; char *blob_buf = (char *)my_malloc(PSI_NOT_INSTRUMENTED, blob_len * sizeof(char), MYF(MY_WME)); - if (blob_buf == nullptr) { - tse_log_error("[cantian2mysql]Apply for blob buf:%u Failed", blob_len); - return; - } memcpy(blob_buf, field_info->cantian_cur_field, blob_len); memcpy(field_info->mysql_cur_field, &blob_buf, sizeof(char *)); bitmap_set_bit(field_info->field->table->read_set, field_info->field->field_index()); diff --git a/storage/tianchi/ha_tse.cc b/storage/tianchi/ha_tse.cc index 1528725..6944984 100644 --- a/storage/tianchi/ha_tse.cc +++ b/storage/tianchi/ha_tse.cc @@ -368,20 +368,17 @@ bool is_ctc_mdl_thd(THD* thd) { // 是否为元数据归一的初始化流程 bool is_meta_version_initialize() { -#ifdef METADATA_NORMALIZED - return is_initialize(); -#else + bool is_meta_normalization = CHECK_HAS_MEMBER(handlerton, get_metadata_switch); + if (is_meta_normalization && is_initialize()) { + return true; + } return false; -#endif } // 是否为--upgrade=FORCE bool is_meta_version_upgrading_force() { -#ifdef METADATA_NORMALIZED - return (opt_upgrade_mode == UPGRADE_FORCE); -#else - return false; -#endif + bool is_meta_normalization = CHECK_HAS_MEMBER(handlerton, get_metadata_switch); + return is_meta_normalization && (opt_upgrade_mode == UPGRADE_FORCE); } bool is_alter_table_scan(bool m_error_if_not_empty) { @@ -400,7 +397,7 @@ bool engine_skip_ddl(MYSQL_THD thd) { bool engine_ddl_passthru(MYSQL_THD thd) { // 元数据归一初始化场景,接口流程需要走到参天 - if (is_initialize() || is_meta_version_upgrading_force()) { + if (is_meta_version_initialize() || is_meta_version_upgrading_force()) { return false; } bool is_mysql_local = user_var_set(thd, "ctc_ddl_local_enabled"); @@ -804,12 +801,11 @@ bool ha_tse::check_unsupported_operation(THD *thd, HA_CREATE_INFO *create_info) my_printf_error(ER_DISALLOWED_OPERATION, "%s", MYF(0), "The current operation is not supported."); return true; } -#ifndef METADATA_NORMALIZED - if (create_info != nullptr && (create_info->options & HA_LEX_CREATE_TMP_TABLE)) { + + if (create_info != nullptr && (create_info->options & HA_LEX_CREATE_TMP_TABLE) && !IS_METADATA_NORMALIZATION()) { my_error(ER_NOT_ALLOWED_COMMAND, MYF(0)); return HA_ERR_UNSUPPORTED; } -#endif if (create_info != nullptr && create_info->index_file_name) { my_error(ER_ILLEGAL_HA, MYF(0), table_share != nullptr ? table_share->table_name.str : " "); return true; @@ -954,7 +950,7 @@ static handler *tse_create_handler(handlerton *hton, TABLE_SHARE *table, bool pa return file; } -#ifdef METADATA_NORMALIZED + static bool tse_check_if_log_table(const char* db_name, const char* table_name) { LEX_CSTRING cstr_db_name = {db_name, strlen(db_name)}; LEX_CSTRING cstr_table_name = {table_name, strlen(table_name)}; @@ -968,7 +964,7 @@ static bool tse_check_if_log_table(const char* db_name, const char* table_name) } return false; } -#endif + /** @brief Check if the given db.tablename is a system table for this SE. @@ -983,12 +979,11 @@ static bool tse_check_if_log_table(const char* db_name, const char* table_name) static bool tse_is_supported_system_table(const char *db MY_ATTRIBUTE((unused)), const char *table_name MY_ATTRIBUTE((unused)), bool is_sql_layer_system_table MY_ATTRIBUTE((unused))) { - -#ifdef METADATA_NORMALIZED - return true; -#else + if (IS_METADATA_NORMALIZATION()) { + return true; + } + return false; -#endif } /** @@ -1264,7 +1259,6 @@ thd_sess_ctx_s *get_or_init_sess_ctx(handlerton *hton, THD *thd) { sess_ctx = (thd_sess_ctx_s *)my_malloc(PSI_NOT_INSTRUMENTED, sizeof(thd_sess_ctx_s), MYF(MY_WME)); if (sess_ctx == nullptr) { - tse_log_error("my_malloc error for sess_ctx"); return nullptr; } @@ -1367,9 +1361,6 @@ void update_sess_ctx_cursor_by_tch(tianchi_handler_t &tch, handlerton *hton, THD if (total_csize >= SESSION_CURSOR_NUM) { uint32_t free_csize = sess_ctx->invalid_cursors->size(); uint64_t *cursors = (uint64_t *)tse_alloc_buf(&tch, sizeof(uint64_t) * free_csize); - if (cursors == nullptr) { - tse_log_error("tse_alloc_buf for cursors in update_sess_ctx_cursor_by_tch failed"); - } assert((total_csize == 0) ^ (cursors != nullptr)); ctc_copy_cursors_to_free(sess_ctx, cursors, 1); assert(sess_ctx->invalid_cursors->empty()); @@ -1636,9 +1627,6 @@ static void tse_free_cursors_no_autocommit(THD *thd, tianchi_handler_t *tch, thd } uint64_t *cursors = (uint64_t *)tse_alloc_buf(tch, sizeof(uint64_t) * total_csize); - if (cursors == nullptr) { - tse_log_error("tse_alloc_buf for cursors in tse_free_cursors_no_autocommit failed"); - } assert((total_csize == 0) ^ (cursors != nullptr)); ctc_copy_cursors_to_free(sess_ctx, cursors, 0); tse_free_session_cursors(tch, cursors, total_csize); @@ -1679,8 +1667,6 @@ static int tse_commit(handlerton *hton, THD *thd, bool commit_trx) { total_csize += sess_ctx->invalid_cursors->size(); } uint64_t *cursors = (uint64_t *)tse_alloc_buf(&tch, sizeof(uint64_t) * total_csize); - if (cursors == nullptr) { - } assert((total_csize == 0) ^ (cursors != nullptr)); ctc_copy_cursors_to_free(sess_ctx, cursors, 0); ret = (ct_errno_t)tse_trx_commit(&tch, cursors, total_csize, &is_ddl_commit); @@ -1748,9 +1734,6 @@ static int tse_rollback(handlerton *hton, THD *thd, bool rollback_trx) { total_csize += sess_ctx->invalid_cursors->size(); } uint64_t *cursors = (uint64_t *)tse_alloc_buf(&tch, sizeof(uint64_t) * total_csize); - if (cursors == nullptr) { - tse_log_error("tse_alloc_buf for cursors in tse_rollback failed"); - } assert((total_csize == 0) ^ (cursors != nullptr)); ctc_copy_cursors_to_free(sess_ctx, cursors, 0); ret = (ct_errno_t)tse_trx_rollback(&tch, cursors, total_csize); @@ -1768,9 +1751,6 @@ static int tse_rollback(handlerton *hton, THD *thd, bool rollback_trx) { total_csize += sess_ctx->invalid_cursors->size(); } uint64_t *cursors = (uint64_t *)tse_alloc_buf(&tch, sizeof(uint64_t) * total_csize); - if (cursors == nullptr) { - tse_log_error("tse_alloc_buf for cursors in tse_rollback failed"); - } assert((total_csize == 0) ^ (cursors != nullptr)); ctc_copy_cursors_to_free(sess_ctx, cursors, 0); (void)tse_srv_rollback_savepoint(&tch, cursors, total_csize, TSE_SQL_START_INTERNAL_SAVEPOINT); @@ -2003,38 +1983,32 @@ static bool tse_notify_exclusive_mdl(THD *thd, const MDL_key *mdl_key, we can not check sql length while using prepare statement, so we need to check the sql length before ddl sql again */ -#ifndef METADATA_NORMALIZED - int query_len = thd->query().length; - if (query_len > MAX_DDL_SQL_LEN_CONTEXT) { - string err_msg = "`" + string(thd->query().str).substr(0, 100) + "...` Is Large Than " + to_string(MAX_DDL_SQL_LEN_CONTEXT); - my_printf_error(ER_DISALLOWED_OPERATION, "%s", MYF(0), err_msg.c_str()); + if (!IS_METADATA_NORMALIZATION() && thd->query().str && tse_check_ddl_sql_length(thd->query().str)) { return true; } -#endif if (engine_ddl_passthru(thd)) { return false; } -#ifndef METADATA_NORMALIZED - if (engine_skip_ddl(thd)) { - tse_log_warning("[CTC_NOMETA_SQL]:record sql str only generate metadata. sql:%s", thd->query().str); - return false; - } - - if (!ddl_enabled_normal(thd)) { - my_printf_error(ER_DISALLOWED_OPERATION, "%s", MYF(0), "DDL not allowed in this mode, Please check the value of @@ctc_concurrent_ddl."); - return true; - } + if (!IS_METADATA_NORMALIZATION()) { + if (engine_skip_ddl(thd)) { + tse_log_warning("[CTC_NOMETA_SQL]:record sql str only generate metadata. sql:%s", thd->query().str); + return false; + } + if (!ddl_enabled_normal(thd)) { + my_printf_error(ER_DISALLOWED_OPERATION, "%s", MYF(0), "DDL not allowed in this mode, Please check the value of @@ctc_concurrent_ddl."); + return true; + } - if (thd->lex->query_tables == nullptr && mdl_key->mdl_namespace() != MDL_key::SCHEMA) { - return false; - } + if (thd->lex->query_tables == nullptr && mdl_key->mdl_namespace() != MDL_key::SCHEMA) { + return false; + } - if (mysql_system_db.find(mdl_key->db_name()) != mysql_system_db.end()) { - return false; + if (mysql_system_db.find(mdl_key->db_name()) != mysql_system_db.end()) { + return false; + } } -#endif int ret = 0; tianchi_handler_t tch; @@ -2063,8 +2037,7 @@ static bool tse_notify_exclusive_mdl(THD *thd, const MDL_key *mdl_key, static bool tse_notify_alter_table(THD *thd, const MDL_key *mdl_key, ha_notification_type notification_type) { vector ticket_list; -#ifdef METADATA_NORMALIZED - if (notification_type == HA_NOTIFY_PRE_EVENT) { + if (IS_METADATA_NORMALIZATION() && notification_type == HA_NOTIFY_PRE_EVENT) { int pre_lock_ret = tse_lock_table_pre(thd, ticket_list); if (pre_lock_ret != 0) { tse_lock_table_post(thd, ticket_list); @@ -2072,13 +2045,12 @@ static bool tse_notify_alter_table(THD *thd, const MDL_key *mdl_key, return true; } } -#endif + bool ret = tse_notify_exclusive_mdl(thd, mdl_key, notification_type, nullptr); -#ifdef METADATA_NORMALIZED - if (notification_type == HA_NOTIFY_PRE_EVENT) { + + if (IS_METADATA_NORMALIZATION() && notification_type == HA_NOTIFY_PRE_EVENT) { tse_lock_table_post(thd, ticket_list); } -#endif return ret; } @@ -2131,9 +2103,6 @@ static int tse_rollback_savepoint(handlerton *hton, THD *thd, void *savepoint) { total_csize += sess_ctx->invalid_cursors->size(); } uint64_t *cursors = (uint64_t *)tse_alloc_buf(&tch, sizeof(uint64_t) * total_csize); - if (cursors == nullptr) { - tse_log_error("tse_alloc_buf for cursors in tse_rollback_savepoint failed"); - } assert((total_csize == 0) ^ (cursors != nullptr)); ctc_copy_cursors_to_free(sess_ctx, cursors, 0); ct_errno_t ret = (ct_errno_t)tse_srv_rollback_savepoint(&tch, cursors, total_csize, name); @@ -4145,12 +4114,12 @@ int ha_tse::external_lock(THD *thd, int lock_type) { out they lock meaning. */ DBUG_TRACE; -#ifdef METADATA_NORMALIZED - if (tse_check_if_log_table(table_share->db.str, table_share->table_name.str)) { + + if (IS_METADATA_NORMALIZATION() && + tse_check_if_log_table(table_share->db.str, table_share->table_name.str)) { is_log_table = true; return 0; } -#endif is_log_table = false; if (engine_ddl_passthru(thd) && (is_create_table_check(thd) || is_alter_table_copy(thd))) { @@ -4741,16 +4710,18 @@ static int tse_init_func(void *p) { // 元数据归一流程初始化下发参天 // 主干非initialize_insecure模式,需要注册共享内存接收线程并等待参天启动完成 - ret = srv_wait_instance_startuped(); - if (ret != 0) { - tse_log_error("wait cantian instance startuped failed:%d", ret); - return HA_ERR_INITIALIZATION; - } - - ret = tse_reg_instance(); - if (ret != 0) { - tse_log_error("[CTC_INIT]:ctc_reg_instance failed:%d", ret); - return HA_ERR_INITIALIZATION; + if (!opt_initialize_insecure || CHECK_HAS_MEMBER(handlerton, get_inst_id)) { + ret = srv_wait_instance_startuped(); + if (ret != 0) { + tse_log_error("wait cantian instance startuped failed:%d", ret); + return HA_ERR_INITIALIZATION; + } + + ret = tse_reg_instance(); + if (ret != 0) { + tse_log_error("[CTC_INIT]:ctc_reg_instance failed:%d", ret); + return HA_ERR_INITIALIZATION; + } } ret = tse_check_tx_isolation(); @@ -5288,44 +5259,18 @@ int ha_tse::initialize_cbo_stats() } m_share->cbo_stats = (tianchi_cbo_stats_t*)my_malloc(PSI_NOT_INSTRUMENTED, sizeof(tianchi_cbo_stats_t), MYF(MY_WME)); if (m_share->cbo_stats == nullptr) { -#ifdef WITH_DAAC tse_log_error("alloc shm mem failed, m_share->cbo_stats size(%lu)", sizeof(tianchi_cbo_stats_t)); -#else - tse_log_error("alloc mem failed, m_share->cbo_stats size(%lu)", sizeof(tianchi_cbo_stats_t)); -#endif return ERR_ALLOC_MEMORY; } *m_share->cbo_stats = {0, 0, 0, 0, 0, 0, nullptr, nullptr}; m_share->cbo_stats->tse_cbo_stats_table = (tse_cbo_stats_table_t*)my_malloc(PSI_NOT_INSTRUMENTED, sizeof(tse_cbo_stats_table_t), MYF(MY_WME)); - if (m_share->cbo_stats->tse_cbo_stats_table == nullptr) { -#ifdef WITH_DAAC - tse_log_error("alloc mem failed, m_share->cbo_stats->tse_cbo_stats_table(%lu)", sizeof(tse_cbo_stats_table_t)); -#else - tse_log_error("alloc shm mem failed, m_share->cbo_stats->tse_cbo_stats_table(%lu)", sizeof(tse_cbo_stats_table_t)); -#endif - return ERR_ALLOC_MEMORY; - } m_share->cbo_stats->tse_cbo_stats_table->columns = (tse_cbo_stats_column_t*)my_malloc(PSI_NOT_INSTRUMENTED, table->s->fields * sizeof(tse_cbo_stats_column_t), MYF(MY_WME)); - if (m_share->cbo_stats->tse_cbo_stats_table->columns == nullptr) { -#ifdef WITH_DAAC - tse_log_error("alloc mem failed, m_share->cbo_stats->tse_cbo_stats_table->columns size(%lu)", table->s->fields * sizeof(tse_cbo_stats_column_t)); -#else - tse_log_error("alloc shm mem failed, m_share->cbo_stats->tse_cbo_stats_table->columns size(%lu)", table->s->fields * sizeof(tse_cbo_stats_column_t)); -#endif - return ERR_ALLOC_MEMORY; - } + m_share->cbo_stats->ndv_keys = (uint32_t*)my_malloc(PSI_NOT_INSTRUMENTED, table->s->keys * sizeof(uint32_t) * MAX_KEY_COLUMNS, MYF(MY_WME)); - if (m_share->cbo_stats->ndv_keys == nullptr) { -#ifdef WITH_DAAC - tse_log_error("alloc mem failed, m_share->cbo_stats->ndv_keys size(%lu)", table->s->keys * sizeof(uint32_t) * MAX_KEY_COLUMNS); -#else - tse_log_error("alloc shm mem failed, m_share->cbo_stats->ndv_keys size(%lu)", table->s->keys * sizeof(uint32_t) * MAX_KEY_COLUMNS); -#endif - return ERR_ALLOC_MEMORY; - } + m_share->cbo_stats->msg_len = table->s->fields * sizeof(tse_cbo_stats_column_t); m_share->cbo_stats->key_len = table->s->keys * sizeof(uint32_t) * MAX_KEY_COLUMNS; return CT_SUCCESS; @@ -5410,11 +5355,7 @@ const Item *ha_tse::cond_push(const Item *cond, bool other_tbls_ok MY_ATTRIBUTE( m_cond = (tse_conds *)tse_alloc_buf(&m_tch, sizeof(tse_conds)); if (m_cond == nullptr) { -#ifdef WITH_DAAC - tse_log_error("alloc mem failed, m_cond size(%lu), pushdown cond is null.", sizeof(tse_conds)); -#else - tse_log_error("alloc shm mem failed, m_cond size(%lu), pushdown cond is null.", sizeof(tse_conds)); -#endif + tse_log_warning("alloc shm mem failed, m_cond size(%lu), pushdown cond is null.", sizeof(tse_conds)); return remainder; } @@ -5492,11 +5433,7 @@ int ha_tse::engine_push(AQP::Table_access *table_aqp) m_cond = (tse_conds *)tse_alloc_buf(&m_tch, sizeof(tse_conds)); if (m_cond == nullptr) { -#ifdef WITH_DAAC - tse_log_error("alloc mem failed, m_cond size(%lu), pushdown cond is null.", sizeof(tse_conds)); -#else - tse_log_error("alloc shm mem failed, m_cond size(%lu), pushdown cond is null.", sizeof(tse_conds)); -#endif + tse_log_warning("alloc shm mem failed, m_cond size(%lu), pushdown cond is null.", sizeof(tse_conds)); return 0; } @@ -5517,41 +5454,3 @@ int ha_tse::engine_push(AQP::Table_access *table_aqp) table_aqp->set_condition(const_cast(m_remainder_conds)); return 0; } - -int is_cantian_run_mode_single() { - // 获取环境变量 RUN_MODE 的值 - const char* run_mode = getenv("RUN_MODE"); - - // 定义有效的单进程模式 - const char* valid_modes[] = { - "cantiand_with_mysql", - "cantiand_with_mysql_st", - "cantiand_with_mysql_in_cluster" - }; - - if (run_mode != NULL) { - // 标志变量,判断是否找到匹配模式 - int found = 0; - - // 遍历有效模式数组,检查 run_mode 是否在其中 - for (size_t i = 0; i < sizeof(valid_modes) / sizeof(valid_modes[0]); i++) { - if (strcmp(run_mode, valid_modes[i]) == 0) { - found = 1; - break; - } - } - - // 如果找到匹配模式,则设置 cantian_cluster_ready 为 true 并返回 SUCCESS - if (found) { - tse_log_system("RUN_MODE %s is single process", run_mode); - return 1; - } else { - tse_log_system("RUN_MODE %s is not single process", run_mode); - return 0; - } - } else { - tse_log_system("RUN_MODE not set"); - return 0; - } - return 0; -} \ No newline at end of file diff --git a/storage/tianchi/ha_tse.h b/storage/tianchi/ha_tse.h index 57ab0ec..b1f2690 100644 --- a/storage/tianchi/ha_tse.h +++ b/storage/tianchi/ha_tse.h @@ -477,7 +477,9 @@ public: #ifdef METADATA_NORMALIZED int write_row(uchar *buf, bool write_through = false) override; -#else +#endif + +#ifndef METADATA_NORMALIZED int write_row(uchar *buf) override; #endif @@ -989,7 +991,6 @@ bool tse_is_temporary(const dd::Table *table_def); int32_t tse_get_cluster_role(); void tse_set_mysql_read_only(); void tse_reset_mysql_read_only(); -int is_cantian_run_mode_single(); #pragma GCC visibility pop -#endif \ No newline at end of file +#endif diff --git a/storage/tianchi/ha_tsepart.cc b/storage/tianchi/ha_tsepart.cc index dfac8a3..7f46d04 100644 --- a/storage/tianchi/ha_tsepart.cc +++ b/storage/tianchi/ha_tsepart.cc @@ -1005,11 +1005,7 @@ int ha_tsepart::initialize_cbo_stats() { m_part_share->cbo_stats = (tianchi_cbo_stats_t*)my_malloc(PSI_NOT_INSTRUMENTED, sizeof(tianchi_cbo_stats_t), MYF(MY_WME)); if (m_part_share->cbo_stats == nullptr) { -#ifdef WITH_DAAC - tse_log_error("alloc mem failed, m_part_share->cbo_stats size(%lu)", sizeof(tianchi_cbo_stats_t)); -#else tse_log_error("alloc shm mem failed, m_part_share->cbo_stats size(%lu)", sizeof(tianchi_cbo_stats_t)); -#endif return ERR_ALLOC_MEMORY; } *m_part_share->cbo_stats = {0, 0, 0, 0, 0, 0, nullptr, nullptr}; @@ -1018,35 +1014,11 @@ int ha_tsepart::initialize_cbo_stats() { m_part_share->cbo_stats->tse_cbo_stats_table = (tse_cbo_stats_table_t*)my_malloc(PSI_NOT_INSTRUMENTED, part_num * sizeof(tse_cbo_stats_table_t), MYF(MY_WME)); - if (m_part_share->cbo_stats->tse_cbo_stats_table == nullptr) { -#ifdef WITH_DAAC - tse_log_error("alloc mem failed, m_part_share->cbo_stats->tse_cbo_stats_table size(%lu)", part_num * sizeof(tse_cbo_stats_table_t)); -#else - tse_log_error("alloc shm mem failed, m_part_share->cbo_stats->tse_cbo_stats_table size(%lu)", part_num * sizeof(tse_cbo_stats_table_t)); -#endif - return ERR_ALLOC_MEMORY; - } m_part_share->cbo_stats->ndv_keys = (uint32_t*)my_malloc(PSI_NOT_INSTRUMENTED, table->s->keys * sizeof(uint32_t) * MAX_KEY_COLUMNS, MYF(MY_WME)); - if (m_part_share->cbo_stats->ndv_keys == nullptr) { -#ifdef WITH_DAAC - tse_log_error("alloc mem failed, m_part_share->cbo_stats->ndv_keys size(%lu)", table->s->keys * sizeof(uint32_t) * MAX_KEY_COLUMNS); -#else - tse_log_error("alloc shm mem failed, m_part_share->cbo_stats->ndv_keys size(%lu)", table->s->keys * sizeof(uint32_t) * MAX_KEY_COLUMNS); -#endif - return ERR_ALLOC_MEMORY; - } for (uint i = 0; i < part_num; i++) { m_part_share->cbo_stats->tse_cbo_stats_table[i].columns = (tse_cbo_stats_column_t*)my_malloc(PSI_NOT_INSTRUMENTED, table->s->fields * sizeof(tse_cbo_stats_column_t), MYF(MY_WME)); - if (m_part_share->cbo_stats->tse_cbo_stats_table[i].columns == nullptr) { -#ifdef WITH_DAAC - tse_log_error("alloc mem failed, m_part_share->cbo_stats->tse_cbo_stats_table size(%lu)", table->s->fields * sizeof(tse_cbo_stats_column_t)); -#else - tse_log_error("alloc shm mem failed, m_part_share->cbo_stats->tse_cbo_stats_table size(%lu)", table->s->fields * sizeof(tse_cbo_stats_column_t)); -#endif - return ERR_ALLOC_MEMORY; - } } m_part_share->cbo_stats->msg_len = table->s->fields * sizeof(tse_cbo_stats_column_t); m_part_share->cbo_stats->key_len = table->s->keys * sizeof(uint32_t) * MAX_KEY_COLUMNS; @@ -1145,14 +1117,13 @@ int ha_tsepart::repair(THD *thd, HA_CHECK_OPT *) } broadcast_req.options |= TSE_NOT_NEED_CANTIAN_EXECUTE; - ct_errno_t ret = CT_SUCCESS; -#ifdef METADATA_NORMALIZED - ret = (ct_errno_t)ctc_record_sql_for_cantian(&tch, &broadcast_req, false); - assert (ret == CT_SUCCESS); -#else - ret = (ct_errno_t)tse_execute_mysql_ddl_sql(&tch, &broadcast_req, false); - assert (ret == CT_SUCCESS); -#endif + if (IS_METADATA_NORMALIZATION()) { + ct_errno_t ret = (ct_errno_t)ctc_record_sql_for_cantian(&tch, &broadcast_req, false); + assert (ret == CT_SUCCESS); + } else { + ct_errno_t ret = (ct_errno_t)tse_execute_mysql_ddl_sql(&tch, &broadcast_req, false); + assert (ret == CT_SUCCESS); + } return HA_ADMIN_OK; } diff --git a/storage/tianchi/ha_tsepart.h b/storage/tianchi/ha_tsepart.h index 9a73c72..5b6e2e9 100644 --- a/storage/tianchi/ha_tsepart.h +++ b/storage/tianchi/ha_tsepart.h @@ -250,7 +250,8 @@ class ha_tsepart : public ha_tse, void part_autoinc_has_expl_non_null_value_update_row(uchar *new_data); #ifdef METADATA_NORMALIZED int write_row(uchar *record, bool write_through MY_ATTRIBUTE((unused)) = false) override { -#else +#endif +#ifndef METADATA_NORMALIZED int write_row(uchar *record) override { #endif if (table->next_number_field) { diff --git a/storage/tianchi/mysql_daac_plugin.cc b/storage/tianchi/mysql_daac_plugin.cc index bad822d..91c6b10 100644 --- a/storage/tianchi/mysql_daac_plugin.cc +++ b/storage/tianchi/mysql_daac_plugin.cc @@ -38,7 +38,6 @@ #include "sql/sql_plugin.h" // st_plugin_int #include "sql/sql_initialize.h" // opt_initialize_insecure #include "tse_log.h" -#include "ha_tse.h" struct mysql_daac_context { my_thread_handle daac_startup_thread; @@ -60,7 +59,6 @@ static std::string get_cantiand_mode() { if (tmp_mode != NULL && strlen(tmp_mode) > 0) { mode = tmp_mode; } - return mode; } @@ -70,7 +68,6 @@ static std::string get_cantiand_home_dir() { if (tmp_home_dir != NULL && strlen(tmp_home_dir) > 0) { home_dir = tmp_home_dir; } - tse_log_system("get cantiand home_dir:%s", home_dir.c_str()); return home_dir; } @@ -102,22 +99,18 @@ static void *mysql_daac_startup_thread(void *p) { struct mysql_daac_context *daac_context = NULL; int daemon_daac_plugin_init() { DBUG_TRACE; + if (opt_initialize_insecure) { + tse_log_debug("initialize-insecure mode no need start the daac startup thread."); + return 0; + } - // skip this part if single process - if (!is_cantian_run_mode_single()) { - if (opt_initialize_insecure) { - tse_log_warning("initialize-insecure mode no need start the daac startup thread."); - return 0; - } - - const char *se_name = "ctc_ddl_rewriter"; - const LEX_CSTRING name = {se_name, strlen(se_name)}; - if (!plugin_is_ready(name, MYSQL_AUDIT_PLUGIN)) { - tse_log_error("tse_ddl_rewriter plugin install failed."); - return -1; - } + const char *se_name = "ctc_ddl_rewriter"; + const LEX_CSTRING name = {se_name, strlen(se_name)}; + if (!plugin_is_ready(name, MYSQL_AUDIT_PLUGIN)) { + tse_log_error("tse_ddl_rewriter plugin install failed."); + return -1; } - + if (daac_context != NULL) { tse_log_error("daemon_daac_plugin_init daac_context:%p not NULL", daac_context); return 0; @@ -126,14 +119,7 @@ int daemon_daac_plugin_init() { daac_context = (struct mysql_daac_context *)my_malloc( PSI_NOT_INSTRUMENTED, sizeof(struct mysql_daac_context), MYF(0)); - if (daac_context == nullptr) { -#ifdef WITH_DAAC - tse_log_error("alloc mem failed, daac_context size(%lu)", sizeof(struct mysql_daac_context)); -#else - tse_log_error("alloc shm mem failed, daac_context size(%lu)", sizeof(struct mysql_daac_context)); -#endif - return -1; - } + my_thread_attr_t startup_attr; /* Thread attributes */ my_thread_attr_init(&startup_attr); my_thread_attr_setdetachstate(&startup_attr, MY_THREAD_CREATE_JOINABLE); diff --git a/storage/tianchi/tse_ddl_rewriter_plugin.cc b/storage/tianchi/tse_ddl_rewriter_plugin.cc index 549d916..ee61f00 100644 --- a/storage/tianchi/tse_ddl_rewriter_plugin.cc +++ b/storage/tianchi/tse_ddl_rewriter_plugin.cc @@ -445,7 +445,7 @@ static bool tse_check_ddl_local_enable(string sql_str, bool &need_forwar) { return false; } -#ifdef METADATA_NORMALIZED + static uint32_t tse_set_var_option(bool is_null_value, bool is_set_default_value, set_var *setvar) { uint32_t options = 0; @@ -463,8 +463,7 @@ static uint32_t tse_set_var_option(bool is_null_value, bool is_set_default_value } return options; } -#endif -#ifdef METADATA_NORMALIZED + static int tse_set_var_meta(MYSQL_THD thd, uint32_t options, const char* base_name, string var_name, string var_value) { tianchi_handler_t tch; @@ -490,7 +489,6 @@ static int tse_set_var_meta(MYSQL_THD thd, uint32_t options, const char* base_na return ret; } -#endif static int tse_get_variables_value_string(MYSQL_THD thd, string &sql_str, set_var* setvar, string& val_str, bool& is_null_value, bool &need_forward) { @@ -576,9 +574,7 @@ static int tse_check_set_opt(string &sql_str, MYSQL_THD thd, bool &need_forward) var_it.rewind(); while ((var = var_it++)) { set_var *setvar = dynamic_cast(var); -#ifdef METADATA_NORMALIZED bool is_set_default_value = false; -#endif bool is_null_value = false; if (setvar && setvar->var) { need_forward = !setvar->var->is_readonly() && setvar->is_global_persist(); @@ -586,9 +582,7 @@ static int tse_check_set_opt(string &sql_str, MYSQL_THD thd, bool &need_forward) if (!contain_subselect) { /* get user value (@xxxxx) as string */ if (!setvar->value) { -#ifdef METADATA_NORMALIZED is_set_default_value = true; -#endif val_str = ""; } else { ret = tse_get_variables_value_string(thd, sql_str, setvar, val_str, is_null_value, need_forward); @@ -604,8 +598,8 @@ static int tse_check_set_opt(string &sql_str, MYSQL_THD thd, bool &need_forward) my_printf_error(ER_DISALLOWED_OPERATION, "%s", MYF(0), "Set global variable query is not allowed (ctc_setopt_disabled = true)"); return -1; } -#ifdef METADATA_NORMALIZED - if(!contain_subselect && need_forward && setvar) { + + if(IS_METADATA_NORMALIZATION() && !contain_subselect && need_forward && setvar) { if (setvar->check(thd) == 0) { uint32_t options = tse_set_var_option(is_null_value, is_set_default_value, setvar); ret = tse_set_var_meta(thd, options, setvar->base.str, name_str, val_str); @@ -614,17 +608,15 @@ static int tse_check_set_opt(string &sql_str, MYSQL_THD thd, bool &need_forward) need_forward = false; // 值校验失败, ctc不进行广播并返回成功, 后续报错由MySQL完成 } } -#endif + tse_log_system("set option %s, need_forward: %d", sql_str.c_str(), need_forward); } -#ifdef METADATA_NORMALIZED - if (!contain_subselect) { + if (IS_METADATA_NORMALIZATION() && !contain_subselect) { need_forward = false; } -#endif return ret; } -#ifndef METADATA_NORMALIZED + static int is_system_db(const char *ddl_db) { if (mysql_system_db.find(ddl_db) != mysql_system_db.end()) { my_printf_error(ER_DISALLOWED_OPERATION, "%s", MYF(0), @@ -634,7 +626,6 @@ static int is_system_db(const char *ddl_db) { return 0; } -#endif static int tse_check_ddl_engine(string &, MYSQL_THD thd, bool &need_forward) { need_forward = false; // broadcast by storage engine @@ -658,18 +649,18 @@ static int tse_check_ddl_engine(string &, MYSQL_THD thd, bool &need_forward) { return -1; } -#ifndef METADATA_NORMALIZED - // create like table 检查是否是系统库 - if (thd->lex->query_tables != nullptr && - thd->lex->query_tables->next_global != nullptr && - thd->lex->create_info != nullptr && - thd->lex->create_info->options & HA_LEX_CREATE_TABLE_LIKE && - !(thd->lex->create_info->options & HA_LEX_CREATE_TMP_TABLE) && - !thd->lex->drop_temporary) { - const char *ddl_db = thd->lex->query_tables->next_global->db; - return is_system_db(ddl_db); + if (!IS_METADATA_NORMALIZATION()) { + // create like table 检查是否是系统库 + if (thd->lex->query_tables != nullptr && + thd->lex->query_tables->next_global != nullptr && + thd->lex->create_info != nullptr && + thd->lex->create_info->options & HA_LEX_CREATE_TABLE_LIKE && + !(thd->lex->create_info->options & HA_LEX_CREATE_TMP_TABLE) && + !thd->lex->drop_temporary) { + const char *ddl_db = thd->lex->query_tables->next_global->db; + return is_system_db(ddl_db); + } } -#endif // create tablespace 检查是否为engine=Innodb情况 if (thd->lex->sql_command == SQLCOM_ALTER_TABLESPACE) { @@ -683,15 +674,15 @@ static int tse_check_ddl_engine(string &, MYSQL_THD thd, bool &need_forward) { } } -#ifndef METADATA_NORMALIZED - // create表 && drop表/库 (检查是否是系统库上ddl) - if (thd->lex->query_tables != nullptr && - (thd->lex->create_info != nullptr && !(thd->lex->create_info->options & HA_LEX_CREATE_TMP_TABLE)) && - !thd->lex->drop_temporary) { - const char *ddl_db = thd->lex->query_tables->db; - return is_system_db(ddl_db); + if (!IS_METADATA_NORMALIZATION()) { + // create表 && drop表/库 (检查是否是系统库上ddl) + if (thd->lex->query_tables != nullptr && + (thd->lex->create_info != nullptr && !(thd->lex->create_info->options & HA_LEX_CREATE_TMP_TABLE)) && + !thd->lex->drop_temporary) { + const char *ddl_db = thd->lex->query_tables->db; + return is_system_db(ddl_db); + } } -#endif return 0; } @@ -767,9 +758,7 @@ static int tse_lock_tables_ddl(string &, MYSQL_THD thd, bool &) { (int32_t)TL_UNLOCK}; FILL_USER_INFO_WITH_THD(lock_info, thd); strncpy(lock_info.db_name, table->db, SMALL_RECORD_SIZE); - lock_info.db_name[SMALL_RECORD_SIZE - 1] = '\0'; strncpy(lock_info.table_name, table->table_name, SMALL_RECORD_SIZE); - lock_info.table_name[SMALL_RECORD_SIZE - 1] = '\0'; ret = tse_unlock_table(&tch, ctc_instance_id, &lock_info); if (ret != 0) { tse_log_error("[TSE_DDL_REWRITE]:unlock table failed, table:%s.%s", lock_info.db_name, lock_info.table_name); @@ -1134,31 +1123,31 @@ bool plugin_ddl_block(MYSQL_THD thd, if (!need_forward) { return false; } -#ifdef METADATA_NORMALIZED - if (!is_dcl_sql_cmd(thd->lex->sql_command)) { + + if (IS_METADATA_NORMALIZATION() && !is_dcl_sql_cmd(thd->lex->sql_command)) { if (ctc_record_sql(thd, broadcast_cmd.need_select_db)) { tse_log_error("[CTC_META_SQL]:record sql str failed. sql:%s", query_str.c_str()); return true; } } -#endif -#ifndef METADATA_NORMALIZED - if (engine_skip_ddl(thd)) { - tse_log_warning("[CTC_NOMETA_SQL]:record sql str only generate metadata. sql:%s", query_str.c_str()); - return false; - } - // disallow ddl query if ctc_concurrent_ddl=OFF and tse_enable_ddl not set - if (!ddl_enabled_normal(thd)) { - my_printf_error(ER_DISALLOWED_OPERATION, "%s", MYF(0), "DDL not allowed in this mode, Please check the value of @@ctc_concurrent_ddl."); - return true; + if (!IS_METADATA_NORMALIZATION()) { + if (engine_skip_ddl(thd)) { + tse_log_warning("[CTC_NOMETA_SQL]:record sql str only generate metadata. sql:%s", query_str.c_str()); + return false; + } + // disallow ddl query if ctc_concurrent_ddl=OFF and tse_enable_ddl not set + if (!ddl_enabled_normal(thd)) { + my_printf_error(ER_DISALLOWED_OPERATION, "%s", MYF(0), "DDL not allowed in this mode, Please check the value of @@ctc_concurrent_ddl."); + return true; + } + + return check_agent_connection(thd); } - return check_agent_connection(thd); -#endif return false; } -#ifndef METADATA_NORMALIZED + // due to MDL_key::BACKUP_LOCK`s MDL_INTENTION_EXCLUSIVE comflicts with MDL_key::BACKUP_LOCK`s MDL_SHARED (user execute STMT `lock instance for backup`) static bool tse_is_instance_locked_by_backup(MYSQL_THD thd) { MDL_request mdl_request; @@ -1177,9 +1166,7 @@ static bool tse_is_instance_locked_by_backup(MYSQL_THD thd) { return false; } } -#endif -#ifndef METADATA_NORMALIZED static bool tse_is_have_global_read_lock(MYSQL_THD thd) { // check if current connetion hold global read lock, let it go if (thd->global_read_lock.is_acquired()) { @@ -1193,11 +1180,11 @@ static bool tse_is_have_global_read_lock(MYSQL_THD thd) { return false; } -#endif + static inline bool tse_is_broadcast_by_storage_engine(ddl_broadcast_cmd broadcast_cmd) { return broadcast_cmd.pre_func == tse_check_ddl || broadcast_cmd.pre_func == tse_check_ddl_engine; } -#ifndef METADATA_NORMALIZED + static bool tse_is_set_session_var(MYSQL_THD thd, string &query_str) { if (thd->lex->sql_command != SQLCOM_SET_OPTION) { return false; @@ -1224,7 +1211,6 @@ static bool tse_is_set_session_var(MYSQL_THD thd, string &query_str) { return false; } -#endif static int tse_check_metadata_switch() { metadata_switchs metadata_switch = (metadata_switchs)tse_get_metadata_switch(); @@ -1289,10 +1275,7 @@ static int tse_ddl_rewrite(MYSQL_THD thd, mysql_event_class_t event_class, } } else if (sql_cmd == SQLCOM_UNLOCK_INSTANCE) { tse_check_unlock_instance(thd); -#ifdef METADATA_NORMALIZED - } -#else - } else if ((need_forward || tse_is_broadcast_by_storage_engine(it->second))) { + } else if (!IS_METADATA_NORMALIZATION() && (need_forward || tse_is_broadcast_by_storage_engine(it->second))) { // block ddl when instance has exclusive backup lock (LOCK INSTANCE FOR BACKUP), ref sql_backup_lock.cc if (tse_is_instance_locked_by_backup(thd)) { @@ -1312,7 +1295,7 @@ static int tse_ddl_rewrite(MYSQL_THD thd, mysql_event_class_t event_class, return -1; } } -#endif + ddl_broadcast_cmd broadcast_cmd = it->second; return need_forward && ddl_broadcast_and_wait(thd, query_str, (uint8_t)sql_cmd, broadcast_cmd); // 0: success other: fail } diff --git a/storage/tianchi/tse_mysql_proxy.cc b/storage/tianchi/tse_mysql_proxy.cc index 0937dde..3bc2e38 100644 --- a/storage/tianchi/tse_mysql_proxy.cc +++ b/storage/tianchi/tse_mysql_proxy.cc @@ -367,12 +367,8 @@ __attribute__((visibility("default"))) int tse_ddl_execute_update(uint32_t thd_i tse_log_note("tse_ddl_execute_update curnode not need execute,mysql_inst_id:%u", broadcast_req->mysql_inst_id); return 0; } -#ifdef METADATA_NORMALIZED - bool is_meta_normalization = true; -#else - bool is_meta_normalization = false; -#endif + bool is_meta_normalization = IS_METADATA_NORMALIZATION(); if (is_meta_normalization && broadcast_req->sql_command != SQLCOM_SET_OPTION) { return 0; } else if (is_meta_normalization && broadcast_req->sql_command == SQLCOM_SET_OPTION @@ -510,16 +506,17 @@ int32_t tse_check_table_exist(MYSQL *curr_conn_proxy, const char *db_name, const } __attribute__((visibility("default"))) int tse_ddl_execute_lock_tables(tianchi_handler_t *tch, char *db_name, tse_lock_table_info *lock_info, int *err_code) { -#ifdef METADATA_NORMALIZED - if (lock_info->sql_type == SQLCOM_LOCK_TABLES) { - if (tse_ddl_execute_lock_tables_by_req(tch, lock_info, err_code)) { + + if (IS_METADATA_NORMALIZATION()) { + if (lock_info->sql_type == SQLCOM_LOCK_TABLES) { + if (tse_ddl_execute_lock_tables_by_req(tch, lock_info, err_code)) { + return *err_code; + } + } else if (tse_mdl_lock_thd(tch, lock_info, err_code)) { return *err_code; } - } else if (tse_mdl_lock_thd(tch, lock_info, err_code)) { - return *err_code; + return 0; } - return 0; -#endif bool is_same_node = (tch->inst_id == ctc_instance_id); uint64_t conn_map_key = tse_get_conn_key(tch->inst_id, tch->thd_id, !is_same_node); @@ -604,17 +601,14 @@ __attribute__((visibility("default"))) int tse_ddl_execute_lock_tables(tianchi_h __attribute__((visibility("default"))) int tse_ddl_execute_unlock_tables(tianchi_handler_t *tch, uint32_t mysql_inst_id, tse_lock_table_info *lock_info) { -#ifdef METADATA_NORMALIZED - UNUSED_PARAM(mysql_inst_id); - if (lock_info->sql_type == SQLCOM_UNLOCK_TABLES) { - tse_mdl_unlock_tables_thd(tch); + if (IS_METADATA_NORMALIZATION()) { + UNUSED_PARAM(mysql_inst_id); + if (lock_info->sql_type == SQLCOM_UNLOCK_TABLES) { + tse_mdl_unlock_tables_thd(tch); + } + tse_mdl_unlock_thd(tch, lock_info); + return 0; } - tse_mdl_unlock_thd(tch, lock_info); - return 0; -#else - (void)mysql_inst_id; - (void)lock_info; -#endif bool is_same_node = (tch->inst_id == ctc_instance_id); uint64_t conn_map_key = tse_get_conn_key(tch->inst_id, tch->thd_id, !is_same_node); @@ -678,10 +672,10 @@ __attribute__((visibility("default"))) int tse_ddl_execute_unlock_tables(tianchi * 低16位全为1代表整个参天节点故障,清理与参天实例id相关的资源 */ __attribute__((visibility("default"))) int close_mysql_connection(uint32_t thd_id, uint32_t mysql_inst_id) { -#ifdef METADATA_NORMALIZED - close_tse_mdl_thd(thd_id, mysql_inst_id); - return 0; -#endif + if (IS_METADATA_NORMALIZATION()) { + close_tse_mdl_thd(thd_id, mysql_inst_id); + return 0; + } if (thd_id == 0) { if ((uint16_t)mysql_inst_id == (uint16_t)CANTIAN_DOWN_MASK) { diff --git a/storage/tianchi/tse_srv_mq_module.cc b/storage/tianchi/tse_srv_mq_module.cc index a3ef3a2..de6548e 100644 --- a/storage/tianchi/tse_srv_mq_module.cc +++ b/storage/tianchi/tse_srv_mq_module.cc @@ -298,11 +298,7 @@ static void tse_log_reg_error_by_code(int error_code) { switch(error_code) { case ERR_CONNECTION_FAILED: -#ifdef WITH_DAAC - tse_log_error("connection failed"); -#else tse_log_error("shm connection failed"); -#endif break; case REG_MISMATCH_CTC_VERSION: tse_log_error("CTC client version mismatch server!"); -- Gitee