From 172e2f888bdfbc9d543fd5696be2495d3dcd65c7 Mon Sep 17 00:00:00 2001 From: Jiachen1018 Date: Fri, 23 Feb 2024 17:59:37 +0800 Subject: [PATCH] read only --- storage/tianchi/ha_tse.cc | 45 +++++++++++++++++------------- storage/tianchi/srv_mq_msg.h | 2 ++ storage/tianchi/tse_srv.h | 2 +- storage/tianchi/tse_srv_mq_stub.cc | 5 ++-- storage/tianchi/tse_util.cc | 30 ++++++++++++++++++++ storage/tianchi/tse_util.h | 1 + 6 files changed, 62 insertions(+), 23 deletions(-) diff --git a/storage/tianchi/ha_tse.cc b/storage/tianchi/ha_tse.cc index 43450a9..f8c23bf 100644 --- a/storage/tianchi/ha_tse.cc +++ b/storage/tianchi/ha_tse.cc @@ -1606,7 +1606,7 @@ static int tse_commit(handlerton *hton, THD *thd, bool commit_trx) { thd_sess_ctx_s *sess_ctx = (thd_sess_ctx_s *)thd_get_ha_data(thd, hton); assert(sess_ctx != nullptr); - if (will_commit) { + if (will_commit && (sess_ctx->is_tse_trx_begin || is_ddl_sql_cmd(thd->lex->sql_command))) { commit_preprocess(thd, &tch); attachable_trx_update_pre_addr(tse_hton, thd, &tch, true); @@ -1676,7 +1676,7 @@ static int tse_rollback(handlerton *hton, THD *thd, bool rollback_trx) { thd_sess_ctx_s *sess_ctx = (thd_sess_ctx_s *)thd_get_ha_data(thd, hton); assert(sess_ctx != nullptr); - if (will_rollback) { + if (will_rollback && (sess_ctx->is_tse_trx_begin || is_ddl_sql_cmd(thd->lex->sql_command))) { int32_t total_csize = sess_ctx->cursors_map->size(); if (sess_ctx->invalid_cursors != nullptr) { total_csize += sess_ctx->invalid_cursors->size(); @@ -1733,7 +1733,16 @@ static int tse_close_connect(handlerton *hton, THD *thd) { local_tch.thd_id = tch.thd_id; local_tch.is_broadcast = tch.is_broadcast; - int ret = tse_close_session(&local_tch); + assert(sess_ctx != nullptr); + int32 total_csize = sess_ctx->cursors_map->size(); + if (sess_ctx->invalid_cursors != nullptr) { + total_csize += sess_ctx->invalid_cursors->size(); + } + uint64_t *cursors = (uint64_t *)tse_alloc_buf(&tch, sizeof(uint64_t) * total_csize); + assert((total_csize == 0) ^ (cursors != nullptr)); + ctc_copy_cursors_to_free(sess_ctx, cursors, 0); + int ret = tse_close_session(&local_tch, cursors, total_csize); + tse_free_buf(&tch, (uint8_t *)cursors); release_sess_ctx(sess_ctx, hton, thd); return convert_tse_error_code_to_mysql((ct_errno_t)ret); } @@ -4103,25 +4112,21 @@ int ha_tse::start_stmt(THD *thd, thr_lock_type) { return 0; } - uint32_t lock_wait_timeout = THDVAR(thd, lock_wait_timeout); uint32_t autocommit = !thd->in_multi_stmt_transaction_mode(); - int isolation_level = isolation_level_to_cantian(thd_get_trx_isolation(thd)); - - tianchi_trx_context_t trx_context = {isolation_level, autocommit, lock_wait_timeout, m_select_lock == lock_mode::EXCLUSIVE_LOCK}; - - bool is_mysql_local = user_var_set(thd, "ctc_ddl_local_enabled"); - ct_errno_t ret = (ct_errno_t)tse_trx_begin(&m_tch, trx_context, is_mysql_local); - - check_error_code_to_mysql(ha_thd(), &ret); - - update_sess_ctx_by_tch(m_tch, tse_hton, thd); - - if (ret != CT_SUCCESS) { - tse_log_error("start trx failed with error code: %d", ret); - return convert_tse_error_code_to_mysql(ret); + if (!(m_select_lock != lock_mode::EXCLUSIVE_LOCK && tse_command_type_read(thd->query_plan.get_command()))) { + uint32_t lock_wait_timeout = THDVAR(thd, lock_wait_timeout); + int isolation_level = isolation_level_to_cantian(thd_get_trx_isolation(thd)); + tianchi_trx_context_t trx_context = {isolation_level, autocommit, lock_wait_timeout, m_select_lock == lock_mode::EXCLUSIVE_LOCK}; + bool is_mysql_local = user_var_set(thd, "ctc_ddl_local_enabled"); + ct_errno_t ret = (ct_errno_t)tse_trx_begin(&m_tch, trx_context, is_mysql_local); + check_error_code_to_mysql(ha_thd(), &ret); + update_sess_ctx_by_tch(m_tch, tse_hton, thd); + if (ret != CT_SUCCESS) { + tse_log_error("start trx failed with error code: %d", ret); + return convert_tse_error_code_to_mysql(ret); + } + sess_ctx->is_tse_trx_begin = 1; } - - sess_ctx->is_tse_trx_begin = 1; if (!autocommit) { trans_register_ha(thd, true, ht, nullptr); } diff --git a/storage/tianchi/srv_mq_msg.h b/storage/tianchi/srv_mq_msg.h index 657d652..076ef38 100644 --- a/storage/tianchi/srv_mq_msg.h +++ b/storage/tianchi/srv_mq_msg.h @@ -47,6 +47,8 @@ struct register_instance_request { struct close_session_request { tianchi_handler_t tch; int result; + int32_t csize; + uint64_t *cursors; }; struct open_table_request { diff --git a/storage/tianchi/tse_srv.h b/storage/tianchi/tse_srv.h index 4779a5e..d53438d 100644 --- a/storage/tianchi/tse_srv.h +++ b/storage/tianchi/tse_srv.h @@ -502,7 +502,7 @@ int tse_release_inst_id(uint32_t inst_id); int tse_open_table(tianchi_handler_t *tch, const char *table_name, const char *user_name); int tse_close_table(tianchi_handler_t *tch); -int tse_close_session(tianchi_handler_t *tch); +int tse_close_session(tianchi_handler_t *tch, uint64_t *cursors, int32_t csize); void tse_kill_session(tianchi_handler_t *tch); uint8_t *tse_alloc_buf(tianchi_handler_t *tch, uint32_t buf_size); diff --git a/storage/tianchi/tse_srv_mq_stub.cc b/storage/tianchi/tse_srv_mq_stub.cc index 310fb47..0623f67 100644 --- a/storage/tianchi/tse_srv_mq_stub.cc +++ b/storage/tianchi/tse_srv_mq_stub.cc @@ -70,7 +70,7 @@ int tse_open_table(tianchi_handler_t *tch, const char *table_name, const char *u return result; } -int tse_close_session(tianchi_handler_t *tch) { +int tse_close_session(tianchi_handler_t *tch, uint64_t *cursors, int32_t csize) { tse_log_note("close session"); void *shm_inst = get_one_shm_inst(tch); close_session_request *req = (close_session_request*)alloc_share_mem(shm_inst, sizeof(close_session_request)); @@ -79,7 +79,8 @@ int tse_close_session(tianchi_handler_t *tch) { return ERR_ALLOC_MEMORY; } req->tch = *tch; - + req->csize = csize; + req->cursors = cursors; int result = ERR_CONNECTION_FAILED; int ret = tse_mq_deal_func(shm_inst, TSE_FUNC_TYPE_CLOSE_SESSION, req, tch->msg_buf); *tch = req->tch; diff --git a/storage/tianchi/tse_util.cc b/storage/tianchi/tse_util.cc index 2c1eb38..6733a03 100644 --- a/storage/tianchi/tse_util.cc +++ b/storage/tianchi/tse_util.cc @@ -707,4 +707,34 @@ int tse_check_unlock_instance(MYSQL_THD thd) { tse_unlock_instance(&is_mysqld_starting, &tch); tse_log_system("[TSE_UNLOCK_INSTANCE]: SUCCESS. tse_inst:%u, conn_id:%u", tch.inst_id, tch.thd_id); return 0; +} + +bool tse_command_type_read(enum_sql_command cmd) { + switch (cmd) { + case SQLCOM_SELECT: + case SQLCOM_CHECK: + case SQLCOM_SHOW_DATABASES: + case SQLCOM_SHOW_TABLES: + case SQLCOM_SHOW_FIELDS: + case SQLCOM_SHOW_KEYS: + case SQLCOM_SHOW_VARIABLES: + case SQLCOM_SHOW_STATUS: + case SQLCOM_SHOW_CREATE: + case SQLCOM_SHOW_CHARSETS: + case SQLCOM_SHOW_COLLATIONS: + case SQLCOM_SHOW_CREATE_DB: + case SQLCOM_SHOW_TABLE_STATUS: + case SQLCOM_SHOW_TRIGGERS: + case SQLCOM_SHOW_CREATE_PROC: + case SQLCOM_SHOW_CREATE_FUNC: + case SQLCOM_SHOW_STATUS_PROC: + case SQLCOM_SHOW_STATUS_FUNC: + case SQLCOM_SHOW_CREATE_EVENT: + case SQLCOM_SHOW_EVENTS: + case SQLCOM_SHOW_CREATE_TRIGGER: + case SQLCOM_SHOW_CREATE_USER: + return CT_TRUE; + default: + return CT_FALSE; + } } \ No newline at end of file diff --git a/storage/tianchi/tse_util.h b/storage/tianchi/tse_util.h index 3cad7f2..e3e1d00 100644 --- a/storage/tianchi/tse_util.h +++ b/storage/tianchi/tse_util.h @@ -36,6 +36,7 @@ static unordered_set mysql_system_db{"information_schema", "mysql", "per #define TSE_GET_THD_DB_NAME(thd) (thd->db().str == NULL) ? nullptr : const_cast(thd->db().str) +bool tse_command_type_read(enum_sql_command cmd); void tse_split_normalized_name(const char *file_name, char db[], size_t db_buf_len, char name[], size_t name_buf_len, bool *is_tmp_table); void tse_copy_name(char to_name[], const char from_name[], size_t to_buf_len); -- Gitee