diff --git a/storage/ctc/ctc_mysql_proxy.cc b/storage/ctc/ctc_mysql_proxy.cc index 92a568a66178dcc3c92625dc309ba44f9c8d1d8e..a7546fc28569bdbdd1e8199d4814b36386404dd6 100644 --- a/storage/ctc/ctc_mysql_proxy.cc +++ b/storage/ctc/ctc_mysql_proxy.cc @@ -52,6 +52,8 @@ using namespace std; +__attribute__((visibility("default"))) mutex m_ctc_cluster_role_mutex; +__attribute__((visibility("default"))) int32_t ctc_cluster_role = (int32_t)dis_cluster_role::DEFAULT; struct ctc_mysql_conn_info { MYSQL* conn; set> table_lock_info; // 连接上已存在的表锁 (db, table) @@ -455,6 +457,34 @@ __attribute__((visibility("default"))) int ctc_ddl_execute_set_opt(uint32_t thd_ return ret; } +__attribute__((visibility("default"))) void ctc_set_mysql_read_only() { + ctc_log_system("[Disaster Recovecy] starting or initializing"); + super_read_only = true; + read_only = true; + opt_readonly = true; + ctc_log_system("[Disaster Recovery] set super_read_only = true."); +} + +__attribute__((visibility("default"))) void ctc_reset_mysql_read_only() { + ctc_log_system("[Disaster Recovecy] starting or initializing"); + super_read_only = false; + read_only = false; + opt_readonly = false; + ctc_log_system("[Disaster Recovery] set super_read_only = false."); +} + +__attribute__((visibility("default"))) int ctc_set_cluster_role_by_cantian(bool is_slave) { + lock_guard lock(m_ctc_mysql_proxy_mutex); + if (is_slave) { + ctc_cluster_role = (int32_t)dis_cluster_role::STANDBY; + ctc_set_mysql_read_only(); + } else { + ctc_cluster_role = (int32_t)dis_cluster_role::PRIMARY; + ctc_reset_mysql_read_only(); + } + return 0; +} + static int ctc_ddl_get_lock(MYSQL *curr_conn, const uint64_t &conn_map_key, const char *lock_name, int *err_code) { uchar digest[MD5_HASH_SIZE]; compute_md5_hash(pointer_cast(digest), lock_name, strlen(lock_name)); diff --git a/storage/ctc/ctc_srv.h b/storage/ctc/ctc_srv.h index 9918bd9821a2a304205ead94befc0f4880a463a4..87e72ba37b3e14ab3b47f18982ab7cb6f93d4933 100644 --- a/storage/ctc/ctc_srv.h +++ b/storage/ctc/ctc_srv.h @@ -20,7 +20,7 @@ #include #include - +#include "sql/tztime.h" #ifdef __cplusplus extern "C" { #endif @@ -38,6 +38,8 @@ extern "C" { #define ERROR_MESSAGE_LEN 512 #define MAX_DDL_SQL_LEN_CONTEXT (63488) // 62kb, 预留2kb #define MAX_DDL_SQL_LEN (MAX_DDL_SQL_LEN_CONTEXT + 30) // ddl sql语句的长度 不能超过64kb, 超过了会报错 +#define MAX_DML_SQL_LEN_CONTEXT (8192) +#define MAX_DML_SQL_LEN (MAX_DML_SQL_LEN_CONTEXT + 30) // dml sql语句的长度 不能超过8000, 超过了会截断 #define DD_BROADCAST_RECORD_LENGTH (3072) #define LOCK_TABLE_SQL_FMT_LEN 20 #define MAX_LOCK_TABLE_NAME (MAX_DDL_SQL_LEN - LOCK_TABLE_SQL_FMT_LEN) @@ -308,6 +310,8 @@ enum CTC_FUNC_TYPE { CTC_FUNC_TYPE_SCAN_RECORDS, CTC_FUNC_TYPE_TRX_COMMIT, CTC_FUNC_TYPE_TRX_ROLLBACK, + CTC_FUNC_TYPE_STATISTIC_BEGIN, + CTC_FUNC_TYPE_STATISTIC_COMMIT, CTC_FUNC_TYPE_TRX_BEGIN, CTC_FUNC_TYPE_LOCK_TABLE, CTC_FUNC_TYPE_UNLOCK_TABLE, @@ -651,8 +655,10 @@ int ctc_general_prefetch(ctc_handler_t *tch, uint8_t *records, uint16_t *record_ int ctc_free_session_cursors(ctc_handler_t *tch, uint64_t *cursors, int32_t csize); /* Transaction Related Interface */ -int ctc_trx_begin(ctc_handler_t *tch, ctc_trx_context_t trx_context, bool is_mysql_local); -int ctc_trx_commit(ctc_handler_t *tch, uint64_t *cursors, int32_t csize, bool *is_ddl_commit); +int ctc_trx_begin(ctc_handler_t *tch, ctc_trx_context_t trx_context, bool is_mysql_local, struct timeval begin_time, bool *enable_stat); +int ctc_statistic_begin(ctc_handler_t *tch,struct timeval begin_time, bool *enable_stat); +int ctc_trx_commit(ctc_handler_t *tch, uint64_t *cursors, int32_t csize, bool *is_ddl_commit, char *sql_str); +int ctc_statistic_commit(ctc_handler_t *tch,char *sql_str); int ctc_trx_rollback(ctc_handler_t *tch, uint64_t *cursors, int32_t csize); int ctc_srv_set_savepoint(ctc_handler_t *tch, const char *name); @@ -719,6 +725,8 @@ int ctc_broadcast_mysql_dd_invalidate(ctc_handler_t *tch, ctc_invalidate_broadca /* Disaster Recovery Related Interface*/ int ctc_set_cluster_role_by_cantian(bool is_slave); +void ctc_set_mysql_read_only(); +void ctc_reset_mysql_read_only(); int ctc_record_sql_for_cantian(ctc_handler_t *tch, ctc_ddl_broadcast_request *broadcast_req, bool allow_fail); diff --git a/storage/ctc/ctc_srv_mq_stub.cc b/storage/ctc/ctc_srv_mq_stub.cc index cce5a05e8bc18fb645356a0ff85f7db473826d47..7a90b4203bf5fb9f0175b1b994aeee032c6e08fd 100644 --- a/storage/ctc/ctc_srv_mq_stub.cc +++ b/storage/ctc/ctc_srv_mq_stub.cc @@ -526,7 +526,7 @@ int ctc_index_read(ctc_handler_t *tch, record_info_t *record_info, index_key_inf return result; } -int ctc_trx_begin(ctc_handler_t *tch, ctc_trx_context_t trx_context, bool is_mysql_local) { +int ctc_trx_begin(ctc_handler_t *tch, ctc_trx_context_t trx_context, bool is_mysql_local, struct timeval begin_time, bool *enable_stat) { void *shm_inst = get_one_shm_inst(tch); trx_begin_request *req = (trx_begin_request*)alloc_share_mem(shm_inst, sizeof(trx_begin_request)); if (req == NULL) { @@ -536,7 +536,8 @@ int ctc_trx_begin(ctc_handler_t *tch, ctc_trx_context_t trx_context, bool is_mys req->tch = *tch; req->trx_context = trx_context; req->is_mysql_local = is_mysql_local; - + req->begin_time = begin_time; + req->enable_stat = *enable_stat; int result = ERR_CONNECTION_FAILED; int ret = ctc_mq_deal_func(shm_inst, CTC_FUNC_TYPE_TRX_BEGIN, req, tch->msg_buf); *tch = req->tch; // 此处不管参天处理成功与否,都需要拷贝一次,避免session泄漏 @@ -546,8 +547,27 @@ int ctc_trx_begin(ctc_handler_t *tch, ctc_trx_context_t trx_context, bool is_mys free_share_mem(shm_inst, req); return result; } +int ctc_statistic_begin(ctc_handler_t *tch,struct timeval begin_time, bool *enable_stat) { + void *shm_inst = get_one_shm_inst(tch); + trx_begin_request *req = (trx_begin_request*)alloc_share_mem(shm_inst, sizeof(trx_begin_request)); + if (req == NULL) { + ctc_log_error("alloc shm mem error, shm_inst(%p), size(%lu)", shm_inst, sizeof(trx_begin_request)); + return ERR_ALLOC_MEMORY; + } + req->tch = *tch; + req->begin_time = begin_time; + req->enable_stat = *enable_stat; + int result = ERR_CONNECTION_FAILED; + int ret = ctc_mq_deal_func(shm_inst, CTC_FUNC_TYPE_STATISTIC_BEGIN, req, tch->msg_buf); + *tch = req->tch; // 此处不管参天处理成功与否,都需要拷贝一次,避免session泄漏 + if (ret == CT_SUCCESS) { + result = req->result; + } + free_share_mem(shm_inst, req); + return result; +} -int ctc_trx_commit(ctc_handler_t *tch, uint64_t *cursors, int32_t csize, bool *is_ddl_commit) { +int ctc_trx_commit(ctc_handler_t *tch, uint64_t *cursors, int32_t csize, bool *is_ddl_commit, char *sql_str) { void *shm_inst = get_one_shm_inst(tch); trx_commit_request *req = (trx_commit_request*)alloc_share_mem(shm_inst, sizeof(trx_commit_request)); if (req == NULL) { @@ -557,7 +577,7 @@ int ctc_trx_commit(ctc_handler_t *tch, uint64_t *cursors, int32_t csize, bool *i req->tch = *tch; req->csize = csize; req->cursors = cursors; - + strncpy(req->sql, sql_str, MAX_DML_SQL_LEN - 1); int result = ERR_CONNECTION_FAILED; int ret = ctc_mq_deal_func(shm_inst, CTC_FUNC_TYPE_TRX_COMMIT, req, tch->msg_buf); *is_ddl_commit = req->is_ddl_commit; @@ -567,7 +587,23 @@ int ctc_trx_commit(ctc_handler_t *tch, uint64_t *cursors, int32_t csize, bool *i free_share_mem(shm_inst, req); return result; } - +int ctc_statistic_commit(ctc_handler_t *tch,char *sql_str) { + void *shm_inst = get_one_shm_inst(tch); + trx_commit_request *req = (trx_commit_request*)alloc_share_mem(shm_inst, sizeof(trx_commit_request)); + if (req == NULL) { + ctc_log_error("alloc shm mem error, shm_inst(%p), size(%lu)", shm_inst, sizeof(trx_commit_request)); + return ERR_ALLOC_MEMORY; + } + req->tch = *tch; + strncpy(req->sql, sql_str, MAX_DML_SQL_LEN - 1); + int result = ERR_CONNECTION_FAILED; + int ret = ctc_mq_deal_func(shm_inst, CTC_FUNC_TYPE_STATISTIC_COMMIT, req, tch->msg_buf); + if (ret == CT_SUCCESS) { + result = req->result; + } + free_share_mem(shm_inst, req); + return result; +} int ctc_trx_rollback(ctc_handler_t *tch, uint64_t *cursors, int32_t csize) { void *shm_inst = get_one_shm_inst(tch); trx_rollback_request *req = (trx_rollback_request*)alloc_share_mem(shm_inst, sizeof(trx_rollback_request)); @@ -1857,7 +1893,7 @@ int ctc_get_sample_size(uint32_t *sample_size) ctc_log_error("ctc_mq_deal_func CTC_FUNC_TYPE_GET_SAMPLE_SIZE failed"); } *sample_size = *req; - ctc_log_error("[ctc_get_sample_size] size(%u)", *sample_size); + ctc_log_system("[ctc_get_sample_size] size(%u)", *sample_size); free_share_mem(shm_inst, req); return res; diff --git a/storage/ctc/ha_ctc.cc b/storage/ctc/ha_ctc.cc index 722771d9ad94a7b397819b2247236bc256607491..8fc41a6e9b4e4563042fc041ee0bfe39475eda05 100644 --- a/storage/ctc/ha_ctc.cc +++ b/storage/ctc/ha_ctc.cc @@ -72,12 +72,12 @@ Happy coding!
-Brian */ - +#include "my_systime.h" +#include "my_time.h" #include "ha_ctc.h" #include "ha_ctc_ddl.h" #include "ha_ctcpart.h" #include "ha_ctc_pq.h" - #include #include #include @@ -143,10 +143,10 @@ * SYSTEM VARIABLES CAN BE DISPLAYED AS: * mysql> SHOW GLOBAL VARIABLES like'%ctc%' */ - #define CTC_MAX_SAMPLE_SIZE (4096) // MB #define CTC_MIN_SAMPLE_SIZE (32) // MB #define CTC_DEFAULT_SAMPLE_SIZE (128) // MB +bool enable_stat = true; static void ctc_statistics_enabled_update(THD * thd, SYS_VAR *, void *var_ptr, const void *save) { bool enabled = *static_cast(var_ptr) = *static_cast(save); @@ -180,8 +180,8 @@ int32_t ctc_metadata_normalization = (int32_t)metadata_switchs::DEFAULT; static MYSQL_SYSVAR_INT(metadata_normalization, ctc_metadata_normalization, PLUGIN_VAR_READONLY, "Option for Mysql-Cantian metadata normalization.", nullptr, nullptr, -1, -1, 3, 0); -static mutex m_ctc_cluster_role_mutex; -int32_t ctc_cluster_role = (int32_t)dis_cluster_role::DEFAULT; +extern int32_t ctc_cluster_role; +extern mutex m_ctc_cluster_role_mutex; static MYSQL_SYSVAR_INT(cluster_role, ctc_cluster_role, PLUGIN_VAR_READONLY, "flag for Disaster Recovery Cluster Role.", nullptr, nullptr, -1, -1, 2, 0); @@ -1331,7 +1331,9 @@ static int ctc_start_trx_and_assign_scn( uint32_t lock_wait_timeout = THDVAR(thd, lock_wait_timeout); ctc_trx_context_t trx_context = {isolation_level, autocommit, lock_wait_timeout, false}; bool is_mysql_local = (sess_ctx->set_flag & CTC_DDL_LOCAL_ENABLED); - ct_errno_t ret = (ct_errno_t)ctc_trx_begin(&tch, trx_context, is_mysql_local); + struct timeval begin_time; + gettimeofday(&begin_time, NULL); + ct_errno_t ret = (ct_errno_t)ctc_trx_begin(&tch, trx_context, is_mysql_local, begin_time, &enable_stat); update_sess_ctx_by_tch(tch, hton, thd); if (ret != CT_SUCCESS) { ctc_log_error("start trx failed with error code: %d", ret); @@ -1475,6 +1477,33 @@ static void ctc_free_cursors_no_autocommit(THD *thd, ctc_handler_t *tch, thd_ses ctc_free_buf(tch, (uint8_t *)cursors); } +// bool is_dml_sql_cmd(enum_sql_command sql_cmd) { + +// if (sql_cmd == SQLCOM_SELECT || sql_cmd == SQLCOM_UPDATE || +// sql_cmd == SQLCOM_INSERT || sql_cmd == SQLCOM_INSERT_SELECT || +// sql_cmd == SQLCOM_DELETE || sql_cmd == SQLCOM_REPLACE || +// sql_cmd == SQLCOM_REPLACE_SELECT || sql_cmd == SQLCOM_DO +// ) { +// return true; +// } + +// return false; +// } + +bool isDMLCommand(const String& sql) { + if (sql.length() < 6) { + return false; + } + String prefix(sql.ptr(), 6, sql.charset()); + for (size_t i = 0; i < 6; ++i) { + prefix[i] = tolower(prefix[i]); + } + return strncmp(prefix.ptr(), "select", 6) == 0 || + strncmp(prefix.ptr(), "insert", 6) == 0 || + strncmp(prefix.ptr(), "delete", 6) == 0 || + strncmp(prefix.ptr(), "update", 6) == 0; +} + /** Commits a transaction in an ctc database or marks an SQL statement ended. @param: hton in, ctc handlerton @@ -1500,7 +1529,13 @@ static int ctc_commit(handlerton *hton, THD *thd, bool commit_trx) { ct_errno_t ret = CT_SUCCESS; thd_sess_ctx_s *sess_ctx = (thd_sess_ctx_s *)thd_get_ha_data(thd, hton); assert(sess_ctx != nullptr); - + // bool is_dmlsql = is_dml_sql_cmd(thd->lex->sql_command); + bool is_dmlsql = false; + if (thd->query().str != NULL && thd->query().length > 0) { + string dml_sql = string(thd->query().str).substr(0, thd->query().length); + String dml_sql_string(dml_sql.c_str(), dml_sql.length(), thd->charset()); + is_dmlsql = isDMLCommand(dml_sql_string); + } if (will_commit) { commit_preprocess(thd, &tch); attachable_trx_update_pre_addr(thd, &tch, true); @@ -1515,7 +1550,11 @@ static int ctc_commit(handlerton *hton, THD *thd, bool commit_trx) { } assert((total_csize == 0) ^ (cursors != nullptr)); ctc_copy_cursors_to_free(sess_ctx, cursors, 0); - ret = (ct_errno_t)ctc_trx_commit(&tch, cursors, total_csize, &is_ddl_commit); + char sql_str[MAX_DML_SQL_LEN] = ""; + if (is_dmlsql && enable_stat) { + strncpy(sql_str, thd->query().str, MAX_DML_SQL_LEN - 1); + } + ret = (ct_errno_t)ctc_trx_commit(&tch, cursors, total_csize, &is_ddl_commit, sql_str); ctc_free_buf(&tch, (uint8_t *)cursors); if (ret != CT_SUCCESS) { ctc_log_error("commit atomic ddl failed with error code: %d", ret); @@ -1540,6 +1579,16 @@ static int ctc_commit(handlerton *hton, THD *thd, bool commit_trx) { } sess_ctx->is_ctc_trx_begin = 0; } else { + char sql_str[MAX_DML_SQL_LEN] = ""; + if (is_dmlsql && enable_stat) { + strncpy(sql_str, thd->query().str, MAX_DML_SQL_LEN - 1); + ret = (ct_errno_t)ctc_statistic_commit(&tch,sql_str); + if (ret != CT_SUCCESS) { + ctc_log_error("commit statistic failed with error code: %d", ret); + END_RECORD_STATS(EVENT_TYPE_COMMIT) + return convert_ctc_error_code_to_mysql(ret); + } + } ctc_free_cursors_no_autocommit(thd, &tch, sess_ctx); } @@ -4203,7 +4252,8 @@ int ha_ctc::external_lock(THD *thd, int lock_type) { */ int ha_ctc::start_stmt(THD *thd, thr_lock_type) { DBUG_TRACE; - + struct timeval begin_time; + gettimeofday(&begin_time, NULL); trans_register_ha(thd, false, ht, nullptr); // register trans to STMT update_member_tch(m_tch, ctc_hton, thd, false); @@ -4226,6 +4276,11 @@ int ha_ctc::start_stmt(THD *thd, thr_lock_type) { if (sess_ctx->is_ctc_trx_begin) { assert(m_tch.sess_addr != INVALID_VALUE64); assert(m_tch.thd_id == thd->thread_id()); + ct_errno_t ret = (ct_errno_t)ctc_statistic_begin(&m_tch,begin_time, &enable_stat); + if (ret != CT_SUCCESS) { + ctc_log_error("start statistic failed with error code: %d", ret); + return convert_ctc_error_code_to_mysql(ret); + } return 0; } @@ -4234,10 +4289,8 @@ int ha_ctc::start_stmt(THD *thd, thr_lock_type) { int isolation_level = isolation_level_to_cantian(thd_get_trx_isolation(thd)); ctc_trx_context_t trx_context = {isolation_level, autocommit, lock_wait_timeout, m_select_lock == lock_mode::EXCLUSIVE_LOCK}; - bool is_mysql_local = (sess_ctx->set_flag & CTC_DDL_LOCAL_ENABLED); - ct_errno_t ret = (ct_errno_t)ctc_trx_begin(&m_tch, trx_context, is_mysql_local); - + ct_errno_t ret = (ct_errno_t)ctc_trx_begin(&m_tch, trx_context, is_mysql_local, begin_time, &enable_stat); check_error_code_to_mysql(ha_thd(), &ret); update_sess_ctx_by_tch(m_tch, ctc_hton, thd); @@ -4398,34 +4451,6 @@ static bool ctc_show_status(handlerton *, THD *thd, stat_print_fn *stat_print, e return false; } -void ctc_set_mysql_read_only() { - ctc_log_system("[Disaster Recovecy] starting or initializing"); - super_read_only = true; - read_only = true; - opt_readonly = true; - ctc_log_system("[Disaster Recovery] set super_read_only = true."); -} - -void ctc_reset_mysql_read_only() { - ctc_log_system("[Disaster Recovecy] starting or initializing"); - super_read_only = false; - read_only = false; - opt_readonly = false; - ctc_log_system("[Disaster Recovery] set super_read_only = false."); -} - -__attribute__((visibility("default"))) int ctc_set_cluster_role_by_cantian(bool is_slave) { - lock_guard lock(m_ctc_cluster_role_mutex); - if (is_slave) { - ctc_cluster_role = (int32_t)dis_cluster_role::STANDBY; - ctc_set_mysql_read_only(); - } else { - ctc_cluster_role = (int32_t)dis_cluster_role::PRIMARY; - ctc_reset_mysql_read_only(); - } - return 0; -} - bool is_single_run_mode() { #ifndef WITH_CANTIAN diff --git a/storage/ctc/ha_ctc.h b/storage/ctc/ha_ctc.h index 141126fed0c6cf9f019c5c55f5c12d2317e87c15..d3e693a3ac333f996c555a0a91902740e7adf425 100644 --- a/storage/ctc/ha_ctc.h +++ b/storage/ctc/ha_ctc.h @@ -22,6 +22,7 @@ #include #include +#include "sql/tztime.h" #include "my_inttypes.h" #include "sql/handler.h" #include "sql/table.h" @@ -1106,8 +1107,6 @@ bool is_starting(); bool ctc_is_temporary(const dd::Table *table_def); int32_t ctc_get_cluster_role(); -void ctc_set_mysql_read_only(); -void ctc_reset_mysql_read_only(); int alloc_str_mysql_mem(ctc_cbo_stats_t *cbo_stats, uint32_t part_num, TABLE *table); void free_columns_cbo_stats(ctc_cbo_stats_column_t *ctc_cbo_stats_columns, bool *is_str_first_addr, TABLE *table); diff --git a/storage/ctc/srv_mq_msg.h b/storage/ctc/srv_mq_msg.h index 60ad2787dc020ede1bcbc025a8cae59a28f5ce20..d08876cf534c4d77c2aa538982ddb29012f37652 100644 --- a/storage/ctc/srv_mq_msg.h +++ b/storage/ctc/srv_mq_msg.h @@ -168,6 +168,8 @@ struct trx_begin_request { int result; ctc_trx_context_t trx_context; bool is_mysql_local; + struct timeval begin_time; + bool enable_stat; }; struct trx_commit_request { @@ -176,6 +178,7 @@ struct trx_commit_request { bool is_ddl_commit; int32_t csize; uint64_t *cursors; + char sql[MAX_DML_SQL_LEN]; }; struct trx_rollback_request {