diff --git a/frameworks/libs/distributeddb/storage/src/storage_engine.cpp b/frameworks/libs/distributeddb/storage/src/storage_engine.cpp index a463f32aafce0fc226f87eaf257a6eaab0ed4a7d..1f55997aab8743c0850c8082a16bf19eba2596cc 100644 --- a/frameworks/libs/distributeddb/storage/src/storage_engine.cpp +++ b/frameworks/libs/distributeddb/storage/src/storage_engine.cpp @@ -38,6 +38,8 @@ StorageEngine::StorageEngine() perm_(OperatePerm::NORMAL_PERM), operateAbort_(false), isExistConnection_(false), + readPendingCount_(0), + externalReadPendingCount_(0), engineState_(EngineState::INVALID) {} @@ -202,42 +204,88 @@ StorageExecutor *StorageEngine::FindWriteExecutor(OperatePerm perm, int &errCode StorageExecutor *StorageEngine::FindReadExecutor(OperatePerm perm, int &errCode, int waitTime, bool isExternal) { - std::unique_lock lock(readMutex_); - errCode = -E_BUSY; - if (perm_ == OperatePerm::DISABLE_PERM || perm_ != perm) { - LOGI("Not permitted to get the executor[%u]", static_cast(perm_)); - return nullptr; - } - - std::list &readUsingList = isExternal ? externalReadUsingList_ : readUsingList_; - std::list &readIdleList = isExternal ? externalReadIdleList_ : readIdleList_; - if (waitTime <= 0) { // non-blocking. - if (readIdleList.empty() && - readIdleList.size() + readUsingList.size() == engineAttr_.maxReadNum) { + auto &pendingCount = isExternal ? externalReadPendingCount_ : readPendingCount_; + { + std::unique_lock lock(readMutex_); + errCode = -E_BUSY; + if (perm_ == OperatePerm::DISABLE_PERM || perm_ != perm) { + LOGI("Not permitted to get the executor[%u]", static_cast(perm_)); return nullptr; } - return FetchStorageExecutor(false, readIdleList, readUsingList, errCode, isExternal); + std::list &readUsingList = isExternal ? externalReadUsingList_ : readUsingList_; + std::list &readIdleList = isExternal ? externalReadIdleList_ : readIdleList_; + if (waitTime <= 0) { // non-blocking. + if (readIdleList.empty() && + readIdleList.size() + readUsingList.size() + pendingCount == engineAttr_.maxReadNum) { + return nullptr; + } + } else { + // Not prohibited and there is an available handle + uint32_t maxReadHandleNum = isExternal ? 1 : engineAttr_.maxReadNum; + bool result = readCondition_.wait_for(lock, std::chrono::seconds(waitTime), + [this, &perm, &readUsingList, &readIdleList, &maxReadHandleNum, &pendingCount]() { + return (perm_ == OperatePerm::NORMAL_PERM || perm_ == perm) && + (!readIdleList.empty() || + (readIdleList.size() + readUsingList.size() + pendingCount < maxReadHandleNum) || + operateAbort_); + }); + if (operateAbort_) { + LOGI("Abort find read executor and busy for operate!"); + return nullptr; + } + if (!result) { + LOGI("Get read handle result[%d], permissType[%u], operType[%u], read[%zu-%zu-%" PRIu32 "]" + "pending count[%d]", result, static_cast(perm_), static_cast(perm), + readIdleList.size(), readUsingList.size(), engineAttr_.maxReadNum, pendingCount.load()); + return nullptr; + } + } + pendingCount++; } + auto executor = FetchReadStorageExecutor(errCode, isExternal); + pendingCount--; + readCondition_.notify_all(); + return executor; +} - // Not prohibited and there is an available handle - uint32_t maxReadHandleNum = isExternal ? 1 : engineAttr_.maxReadNum; - bool result = readCondition_.wait_for(lock, std::chrono::seconds(waitTime), - [this, &perm, &readUsingList, &readIdleList, &maxReadHandleNum]() { - return (perm_ == OperatePerm::NORMAL_PERM || perm_ == perm) && - (!readIdleList.empty() || (readIdleList.size() + readUsingList.size() < maxReadHandleNum) || - operateAbort_); - }); - if (operateAbort_) { - LOGI("Abort find read executor and busy for operate!"); - return nullptr; +StorageExecutor *StorageEngine::FetchReadStorageExecutor(int &errCode, bool isExternal) +{ + bool isNeedCreate; + { + std::unique_lock lock(readMutex_); + auto &idleList = isExternal ? externalReadIdleList_ : readIdleList_; + isNeedCreate = idleList.empty(); } - if (!result) { - LOGI("Get read handle result[%d], permissType[%u], operType[%u], read[%zu-%zu-%" PRIu32 "]", result, - static_cast(perm_), static_cast(perm), readIdleList.size(), readUsingList.size(), - engineAttr_.maxReadNum); - return nullptr; + StorageExecutor *handle = nullptr; + if (isNeedCreate) { + errCode = CreateNewExecutor(false, handle); + } + std::unique_lock lock(readMutex_); + if (isNeedCreate) { + auto &usingList = isExternal ? externalReadUsingList_ : readUsingList_; + if ((errCode != E_OK) || (handle == nullptr)) { + if (errCode != -E_EKEYREVOKED) { + return nullptr; + } + LOGE("Key revoked status, couldn't create the new executor"); + if (!usingList.empty()) { + LOGE("Can't create new executor for revoked"); + errCode = -E_BUSY; + } + return nullptr; + } + AddStorageExecutor(handle, isExternal); + } + auto &usingList = isExternal ? externalReadUsingList_ : readUsingList_; + auto &idleList = isExternal ? externalReadIdleList_ : readIdleList_; + auto item = idleList.front(); + usingList.push_back(item); + idleList.remove(item); + if (!isEnhance_) { + LOGD("Get executor[%d] from [%.3s]", false, hashIdentifier_.c_str()); } - return FetchStorageExecutor(false, readIdleList, readUsingList, errCode, isExternal); + errCode = E_OK; + return item; } void StorageEngine::Recycle(StorageExecutor *&handle, bool isExternal) @@ -266,21 +314,25 @@ void StorageEngine::Recycle(StorageExecutor *&handle, bool isExternal) idleCondition_.notify_all(); } } else { - std::unique_lock lock(readMutex_); - std::list &readUsingList = isExternal ? externalReadUsingList_ : readUsingList_; - std::list &readIdleList = isExternal ? externalReadIdleList_ : readIdleList_; - auto iter = std::find(readUsingList.begin(), readUsingList.end(), handle); - if (iter != readUsingList.end()) { - readUsingList.remove(handle); - if (!readIdleList.empty()) { - delete handle; - handle = nullptr; - return; + StorageExecutor *releaseHandle = nullptr; + { + std::unique_lock lock(readMutex_); + std::list &readUsingList = isExternal ? externalReadUsingList_ : readUsingList_; + std::list &readIdleList = isExternal ? externalReadIdleList_ : readIdleList_; + auto iter = std::find(readUsingList.begin(), readUsingList.end(), handle); + if (iter != readUsingList.end()) { + readUsingList.remove(handle); + if (!readIdleList.empty()) { + releaseHandle = handle; + handle = nullptr; + } else { + handle->Reset(); + readIdleList.push_back(handle); + readCondition_.notify_one(); + } } - handle->Reset(); - readIdleList.push_back(handle); - readCondition_.notify_one(); } + delete releaseHandle; } handle = nullptr; } diff --git a/frameworks/libs/distributeddb/storage/src/storage_engine.h b/frameworks/libs/distributeddb/storage/src/storage_engine.h index 8ad5b2df219b962a2e90f97bf00606b3675325c5..a762db38029ecf5c73b3bada3de987fdd50f31cc 100644 --- a/frameworks/libs/distributeddb/storage/src/storage_engine.h +++ b/frameworks/libs/distributeddb/storage/src/storage_engine.h @@ -138,6 +138,8 @@ private: StorageExecutor *FindWriteExecutor(OperatePerm perm, int &errCode, int waitTime, bool isExternal = false); StorageExecutor *FindReadExecutor(OperatePerm perm, int &errCode, int waitTime, bool isExternal = false); + StorageExecutor *FetchReadStorageExecutor(int &errCode, bool isExternal = false); + virtual void ClearCorruptedFlag(); void PrintDbFileMsg(bool isOpen); @@ -169,6 +171,9 @@ private: std::mutex idleMutex_; std::condition_variable idleCondition_; + std::atomic readPendingCount_; + std::atomic externalReadPendingCount_; + EngineState engineState_; }; } // namespace DistributedDB