From fa67d7101cbe6be3db807987d95104c549ca883b Mon Sep 17 00:00:00 2001 From: zhuruigan Date: Tue, 24 Jun 2025 19:19:17 +0800 Subject: [PATCH] optimize workersMutex use Signed-off-by: zhuruigan Change-Id: Ib9916cfcafca31f15288e4aa4756562024576f95 --- .../taskpool/task_manager.cpp | 7 ++++- js_concurrent_module/taskpool/task_manager.h | 1 + js_concurrent_module/taskpool/test/test.cpp | 28 +++++++++++++++++++ js_concurrent_module/taskpool/test/test.h | 1 + .../taskpool/test/test_taskpool.cpp | 18 ++++++++++++ js_concurrent_module/taskpool/worker.cpp | 9 +++--- 6 files changed, 58 insertions(+), 6 deletions(-) diff --git a/js_concurrent_module/taskpool/task_manager.cpp b/js_concurrent_module/taskpool/task_manager.cpp index e000d82c..15d7fd63 100644 --- a/js_concurrent_module/taskpool/task_manager.cpp +++ b/js_concurrent_module/taskpool/task_manager.cpp @@ -137,6 +137,11 @@ TaskManager::~TaskManager() void TaskManager::CountTraceForWorker() { std::lock_guard lock(workersMutex_); + CountTraceForWorkerWithoutLock(); +} + +void TaskManager::CountTraceForWorkerWithoutLock() +{ int64_t threadNum = static_cast(workers_.size()); int64_t idleWorkers = static_cast(idleWorkers_.size()); int64_t timeoutWorkers = static_cast(timeoutWorkers_.size()); @@ -744,7 +749,7 @@ void TaskManager::NotifyWorkerRunning(Worker* worker) { std::lock_guard lock(workersMutex_); idleWorkers_.erase(worker); - CountTraceForWorker(); + CountTraceForWorkerWithoutLock(); } uint32_t TaskManager::GetRunningWorkers() diff --git a/js_concurrent_module/taskpool/task_manager.h b/js_concurrent_module/taskpool/task_manager.h index 97f9524c..d76b4ed3 100644 --- a/js_concurrent_module/taskpool/task_manager.h +++ b/js_concurrent_module/taskpool/task_manager.h @@ -98,6 +98,7 @@ public: // for countTrace for worker void CountTraceForWorker(); + void CountTraceForWorkerWithoutLock(); void RegisterCallback(napi_env env, uint32_t taskId, std::shared_ptr callbackInfo); void IncreaseSendDataRefCount(uint32_t taskId); diff --git a/js_concurrent_module/taskpool/test/test.cpp b/js_concurrent_module/taskpool/test/test.cpp index cd2aeffb..0370dc10 100755 --- a/js_concurrent_module/taskpool/test/test.cpp +++ b/js_concurrent_module/taskpool/test/test.cpp @@ -990,4 +990,32 @@ bool NativeEngineTest::FindTaskId(Worker* worker, uint32_t taskId) auto& container = worker->currentTaskId_; return std::find(container.begin(), container.end(), taskId) != container.end(); } + +void NativeEngineTest::PerformTask(napi_env env, void* data) +{ + ExceptionScope scope(env); + TaskManager& taskManager = TaskManager::GetInstance(); + uint32_t id = 0; + for (size_t i = 0; i < taskManager.taskQueues_.size(); i++) { + id = taskManager.taskQueues_[i]->DequeueTaskId(); + while (id != 0) { + id = taskManager.taskQueues_[i]->DequeueTaskId(); + } + } + Worker* worker = reinterpret_cast(WorkerConstructor(env)); + napi_env workerEnv = nullptr; + napi_create_runtime(env, &workerEnv); + worker->workerEnv_ = workerEnv; + + Task* task = reinterpret_cast(data); + taskManager.StoreTask(task); + Priority priority = Priority::DEFAULT; + auto& mediumTaskQueue = taskManager.taskQueues_[priority]; + mediumTaskQueue->EnqueueTaskId(task->taskId_); + + uv_async_t* req = new uv_async_t; + req->data = worker; + Worker::PerformTask(req); + usleep(100000); // 100000: is sleep 100ms +} } // namespace Commonlibrary::Concurrent::TaskPoolModule \ No newline at end of file diff --git a/js_concurrent_module/taskpool/test/test.h b/js_concurrent_module/taskpool/test/test.h index dc25e230..d4a13135 100644 --- a/js_concurrent_module/taskpool/test/test.h +++ b/js_concurrent_module/taskpool/test/test.h @@ -75,6 +75,7 @@ public: static void ReleaseWorkerHandles(napi_env env); static void DebuggerOnPostTask(napi_env env); static void PerformTask(napi_env env); + static void PerformTask(napi_env env, void* data); static void NotifyHandleTaskResult(napi_env env); static void TaskResultCallback(napi_env env); static void HandleFunctionResult(napi_env env); diff --git a/js_concurrent_module/taskpool/test/test_taskpool.cpp b/js_concurrent_module/taskpool/test/test_taskpool.cpp index 6f7c8d32..75c8cb36 100644 --- a/js_concurrent_module/taskpool/test/test_taskpool.cpp +++ b/js_concurrent_module/taskpool/test/test_taskpool.cpp @@ -579,6 +579,7 @@ HWTEST_F(NativeEngineTest, TaskpoolTest037, testing::ext::TestSize.Level0) auto callbackInfo = std::make_shared(env, 1, nullptr); taskManager.RegisterCallback(env, taskId, callbackInfo); auto res = callbackInfo->refCount; + taskManager.DecreaseSendDataRefCount(env, taskId); ASSERT_EQ(res, 1); } @@ -2043,6 +2044,7 @@ HWTEST_F(NativeEngineTest, TaskpoolTest131, testing::ext::TestSize.Level0) TaskManager &taskManager = TaskManager::GetInstance(); taskManager.RegisterCallback(env, taskId, nullptr); + taskManager.DecreaseSendDataRefCount(env, taskId); ASSERT_TRUE(true); } @@ -2074,6 +2076,7 @@ HWTEST_F(NativeEngineTest, TaskpoolTest132, testing::ext::TestSize.Level0) napi_ref callbackRef = NapiHelper::CreateReference(env, argv[0], 1); std::shared_ptr callbackInfo = std::make_shared(env, 1, callbackRef); taskManager.RegisterCallback(env, taskId, callbackInfo); + taskManager.DecreaseSendDataRefCount(env, taskId, task); ASSERT_TRUE(true); } @@ -2115,6 +2118,7 @@ HWTEST_F(NativeEngineTest, TaskpoolTest133, testing::ext::TestSize.Level0) napi_ref callbackRef = NapiHelper::CreateReference(env, argv1[0], 1); std::shared_ptr callbackInfo = std::make_shared(env, 1, callbackRef); taskManager.RegisterCallback(env, taskId, callbackInfo); + taskManager.DecreaseSendDataRefCount(env, taskId, task); ASSERT_TRUE(true); } @@ -6373,4 +6377,18 @@ HWTEST_F(NativeEngineTest, TaskpoolTest313, testing::ext::TestSize.Level0) napi_set_named_property(env, obj, "error", result2); error = TaskManager::GetInstance().CancelError(env, 0, nullptr, obj); ASSERT_TRUE(NapiHelper::IsNotUndefined(env, error)); +} + +HWTEST_F(NativeEngineTest, TaskpoolTest314, testing::ext::TestSize.Level0) +{ + napi_env env = (napi_env)engine_; + ExceptionScope scope(env); + TaskManager &taskManager = TaskManager::GetInstance(); + Task* task = new Task(); + uint32_t taskId = taskManager.CalculateTaskId(reinterpret_cast(task)); + task->taskId_ = taskId; + task->isValid_ = false; + void* data = reinterpret_cast(task); + NativeEngineTest::PerformTask(env, data); + ASSERT_TRUE(true); } \ No newline at end of file diff --git a/js_concurrent_module/taskpool/worker.cpp b/js_concurrent_module/taskpool/worker.cpp index c2274a4e..82184bdb 100644 --- a/js_concurrent_module/taskpool/worker.cpp +++ b/js_concurrent_module/taskpool/worker.cpp @@ -412,16 +412,15 @@ bool Worker::UpdateWorkerState(WorkerState expect, WorkerState desired) void Worker::PerformTask(const uv_async_t* req) { + auto taskInfo = TaskManager::GetInstance().DequeueTaskId(); + if (taskInfo.first == 0) { + return; + } uint64_t startTime = ConcurrentHelper::GetMilliseconds(); auto worker = static_cast(req->data); worker->UpdateWorkerWakeUpTime(); napi_env env = worker->workerEnv_; TaskManager::GetInstance().NotifyWorkerRunning(worker); - auto taskInfo = TaskManager::GetInstance().DequeueTaskId(); - if (taskInfo.first == 0) { - worker->NotifyIdle(); - return; - } RunningScope runningScope(worker); WorkerRunningScope workerRunningScope(env); PriorityScope priorityScope(worker, taskInfo.second); -- Gitee