diff --git a/js_concurrent_module/taskpool/task.cpp b/js_concurrent_module/taskpool/task.cpp index 333040f10137f0c4be14ecc61e8426b409d920e3..0985ff2b0df3ac31549efb6b7c86aae05a731c93 100644 --- a/js_concurrent_module/taskpool/task.cpp +++ b/js_concurrent_module/taskpool/task.cpp @@ -145,7 +145,6 @@ void Task::CleanupHookFunc(void* arg) { std::lock_guard lock(task->taskMutex_); ConcurrentHelper::UvHandleClose(task->onStartCancelSignal_); - ConcurrentHelper::UvHandleClose(task->onStartExecutionSignal_); ConcurrentHelper::UvHandleClose(task->onStartDiscardSignal_); if (task->IsFunctionTask()) { task->SetValid(false); @@ -505,7 +504,7 @@ napi_value Task::SendData(napi_env env, napi_callback_info cbinfo) } TaskResultInfo* resultInfo = new TaskResultInfo(env, task->GetTaskId(), serializationArgs); - TaskManager::GetInstance().ExecuteSendData(env, resultInfo, task); + TaskManager::GetInstance().ExecuteSendData(env, resultInfo, task->GetTaskId()); return nullptr; } @@ -841,17 +840,6 @@ napi_value Task::OnStartExecution(napi_env env, napi_callback_info cbinfo) napi_ref callbackRef = Helper::NapiHelper::CreateReference(env, args[0], 1); task->onStartExecutionCallBackInfo_ = new ListenerCallBackInfo(env, callbackRef, nullptr); task->onStartExecutionCallBackInfo_->type_ = "onStartExecution"; -#if defined(ENABLE_TASKPOOL_EVENTHANDLER) - if (!task->IsMainThreadTask()) { - auto loop = NapiHelper::GetLibUV(env); - ConcurrentHelper::UvHandleInit(loop, task->onStartExecutionSignal_, - Task::StartExecutionCallback, task->onStartExecutionCallBackInfo_); - } -#else - auto loop = NapiHelper::GetLibUV(env); - ConcurrentHelper::UvHandleInit(loop, task->onStartExecutionSignal_, - Task::StartExecutionCallback, task->onStartExecutionCallBackInfo_); -#endif return nullptr; } @@ -1550,28 +1538,10 @@ bool Task::CheckStartExecution(Priority priority) Task::StartExecutionTask(task->onStartExecutionCallBackInfo_); }; TaskManager::GetInstance().PostTask(onStartExecutionTask, "TaskPoolOnStartExecutionTask", priority); - } else { - if (onStartExecutionSignal_ == nullptr) { - return true; - } - std::lock_guard lock(taskMutex_); - if (!IsValid()) { - return false; - } - ConcurrentHelper::UvCheckAndAsyncSend(onStartExecutionSignal_); - } - return true; -#else - if (onStartExecutionSignal_ == nullptr) { return true; } - std::lock_guard lock(taskMutex_); - if (!IsValid()) { - return false; - } - ConcurrentHelper::UvCheckAndAsyncSend(onStartExecutionSignal_); - return true; #endif + return TaskManager::GetInstance().ExecuteTaskStartExecution(taskId_, priority); } void Task::SetValid(bool isValid) diff --git a/js_concurrent_module/taskpool/task.h b/js_concurrent_module/taskpool/task.h index b415d7725621aab96096313d6f498e6832da3d8a..4ea7428df25f83146e9078ffe02624aa0528b548 100644 --- a/js_concurrent_module/taskpool/task.h +++ b/js_concurrent_module/taskpool/task.h @@ -249,7 +249,6 @@ public: bool defaultCloneSendable_ {false}; std::atomic isValid_ {true}; std::atomic lifecycleCount_ {0}; // when lifecycleCount_ is 0, the task pointer can be deleted - uv_async_t* onStartExecutionSignal_ = nullptr; uv_async_t* onStartCancelSignal_ = nullptr; uv_async_t* onStartDiscardSignal_ = nullptr; ListenerCallBackInfo* onEnqueuedCallBackInfo_ = nullptr; diff --git a/js_concurrent_module/taskpool/task_manager.cpp b/js_concurrent_module/taskpool/task_manager.cpp index f1e40caf5c21d398d1cb2b8cd6bfade6dcb745d5..fc051e5515caa777560ccddeed8ef826a46b26e8 100644 --- a/js_concurrent_module/taskpool/task_manager.cpp +++ b/js_concurrent_module/taskpool/task_manager.cpp @@ -56,6 +56,7 @@ static constexpr uint32_t TRIGGER_EXPAND_INTERVAL = 10; // 10: ms, trigger reche [[maybe_unused]] static constexpr uint32_t IDLE_THRESHOLD = 2; // 2: 2 intervals later will release the thread static constexpr char ON_CALLBACK_STR[] = "TaskPoolOnCallbackTask"; static constexpr char ON_ENQUEUE_STR[] = "TaskPoolOnEnqueueTask"; +static constexpr char ON_START_STR[] = "TaskPoolOnStartTask"; static constexpr uint32_t UNEXECUTE_TASK_TIME = 60000; // 60000: 1min #if defined(ENABLE_TASKPOOL_EVENTHANDLER) @@ -1062,10 +1063,24 @@ void TaskManager::DecreaseSendDataRefCount(napi_env env, uint32_t taskId, Task* } } -void TaskManager::ExecuteSendData(napi_env env, TaskResultInfo* resultInfo, Task* task) +void TaskManager::ExecuteSendData(napi_env env, TaskResultInfo* resultInfo, uint32_t taskId) { + auto task = GetTask(taskId); + if (task == nullptr) { + HILOG_ERROR("taskpool:: the task is nullptr"); + delete resultInfo; + return; + } + napi_env hostEnv = nullptr; + napi_event_priority priority = napi_eprio_high; + { + std::lock_guard lock(task->taskMutex_); + auto worker = task->GetWorker(); + hostEnv = task->GetEnv(); + priority = g_napiPriorityMap.at(worker->GetPriority()); + } std::lock_guard lock(callbackMutex_); - auto iter = callbackTable_.find(task->GetTaskId()); + auto iter = callbackTable_.find(taskId); if (iter == callbackTable_.end() || iter->second == nullptr) { HILOG_ERROR("taskpool:: the callback in SendData is not registered on the host side"); ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_REGISTERED); @@ -1086,9 +1101,6 @@ void TaskManager::ExecuteSendData(napi_env env, TaskResultInfo* resultInfo, Task } TaskPool::ExecuteOnReceiveDataCallback(callbackInfo.get(), resultInfo); }; - auto worker = task->GetWorker(); - auto hostEnv = task->GetEnv(); - auto priority = g_napiPriorityMap.at(worker->GetPriority()); uint64_t handleId = 0; napi_status status = napi_send_cancelable_event(hostEnv, onCallbackTask, nullptr, priority, &handleId, ON_CALLBACK_STR); @@ -1439,26 +1451,6 @@ void TaskManager::ReleaseCallBackInfo(Task* task) delete task->onExecutionSucceededCallBackInfo_; task->onExecutionSucceededCallBackInfo_ = nullptr; } - -#if defined(ENABLE_TASKPOOL_EVENTHANDLER) - if (!task->IsMainThreadTask() && task->onStartExecutionSignal_ != nullptr) { - if (!ConcurrentHelper::IsUvClosing(task->onStartExecutionSignal_)) { - ConcurrentHelper::UvHandleClose(task->onStartExecutionSignal_); - } else { - delete task->onStartExecutionSignal_; - task->onStartExecutionSignal_ = nullptr; - } - } -#else - if (task->onStartExecutionSignal_ != nullptr) { - if (!ConcurrentHelper::IsUvClosing(task->onStartExecutionSignal_)) { - ConcurrentHelper::UvHandleClose(task->onStartExecutionSignal_); - } else { - delete task->onStartExecutionSignal_; - task->onStartExecutionSignal_ = nullptr; - } - } -#endif } void TaskManager::StoreTask(Task* task) @@ -1770,4 +1762,38 @@ void TaskManager::AddCountTraceForWorkerLog(bool needLog, int64_t threadNum, int std::to_string(idleThreadNum).c_str(), std::to_string(timeoutThreadNum).c_str()); } } + +bool TaskManager::ExecuteTaskStartExecution(uint32_t taskId, Priority priority) +{ + Task* task = GetTask(taskId); + if (task == nullptr) { + return false; + } + std::lock_guard lock(task->taskMutex_); + if (!task->IsValid()) { + return false; + } + if (task->onStartExecutionCallBackInfo_ == nullptr) { + return true; + } + auto hostEnv = task->GetEnv(); + auto workerEngine = reinterpret_cast(hostEnv); + workerEngine->IncreaseListeningCounter(); + auto onStartTask = [taskId]([[maybe_unused]] void* data) { + Task* task = TaskManager::GetInstance().GetTask(taskId); + if (task == nullptr || task->onStartExecutionCallBackInfo_ == nullptr) { + return; + } + Task::StartExecutionTask(task->onStartExecutionCallBackInfo_); + }; + auto napiPrio = g_napiPriorityMap.at(priority); + uint64_t handleId = 0; + napi_status status = napi_send_cancelable_event(hostEnv, onStartTask, nullptr, napiPrio, + &handleId, ON_START_STR); + if (status != napi_ok) { // LOCV_EXCL_BR_LINE + HILOG_ERROR("taskpool:: failed to send event to the host side"); + workerEngine->DecreaseListeningCounter(); + } + return true; +} } // namespace Commonlibrary::Concurrent::TaskPoolModule \ No newline at end of file diff --git a/js_concurrent_module/taskpool/task_manager.h b/js_concurrent_module/taskpool/task_manager.h index 3776d03c5725d95f143453f6806a9d36dbc16c31..a8037fe2831aeece93607461bcdea085a219ecdc 100644 --- a/js_concurrent_module/taskpool/task_manager.h +++ b/js_concurrent_module/taskpool/task_manager.h @@ -104,7 +104,7 @@ public: const std::string& type); void IncreaseSendDataRefCount(uint32_t taskId); void DecreaseSendDataRefCount(napi_env env, uint32_t taskId, Task* task = nullptr); - void ExecuteSendData(napi_env env, TaskResultInfo* resultInfo, Task* task); + void ExecuteSendData(napi_env env, TaskResultInfo* resultInfo, uint32_t taskId); // for task dependency bool IsDependendByTaskId(uint32_t taskId); @@ -158,6 +158,7 @@ public: bool IsPerformIdle() const; uint32_t GetNonIdleTaskNum(); uint32_t GetTotalTaskNum() const; + bool ExecuteTaskStartExecution(uint32_t taskId, Priority priority); private: TaskManager(); diff --git a/js_concurrent_module/taskpool/test/test.cpp b/js_concurrent_module/taskpool/test/test.cpp index ca5d1b550ef201df0ea3762d0384749d07ab7eec..875c8e8ef90bc6bf3065c71490b6bcebde2d39a2 100755 --- a/js_concurrent_module/taskpool/test/test.cpp +++ b/js_concurrent_module/taskpool/test/test.cpp @@ -584,9 +584,6 @@ void NativeEngineTest::ReleaseTaskData(napi_env env) taskManager.ReleaseCallBackInfo(task2); task2->isMainThreadTask_ = false; taskManager.ReleaseCallBackInfo(task2); - auto loop = NapiHelper::GetLibUV(env); - ConcurrentHelper::UvHandleInit(loop, task->onStartExecutionSignal_, NativeEngineTest::foo, task2); - taskManager.ReleaseCallBackInfo(task2); } void NativeEngineTest::CheckTask(napi_env env) diff --git a/js_concurrent_module/taskpool/test/test_taskpool.cpp b/js_concurrent_module/taskpool/test/test_taskpool.cpp index 7fc7aead9938619e0638da0c6a01d71761cd678a..475328f2976ce784b5e989459d2d1a965edc3e7b 100644 --- a/js_concurrent_module/taskpool/test/test_taskpool.cpp +++ b/js_concurrent_module/taskpool/test/test_taskpool.cpp @@ -4671,17 +4671,12 @@ HWTEST_F(NativeEngineTest, TaskpoolTest229, testing::ext::TestSize.Level0) napi_env env = (napi_env)engine_; ExceptionScope scope(env); Task* task = new Task(); - uint32_t taskId = TaskManager::GetInstance().CalculateTaskId(reinterpret_cast(task)); - task->taskId_ = taskId; - napi_value thisValue = NapiHelper::CreateObject(env); - napi_ref ref = NapiHelper::CreateReference(env, thisValue, 0); - - CallbackInfo* cbInfo = new CallbackInfo(env, 1, ref); - + TaskManager::GetInstance().StoreTask(task); Worker* worker = reinterpret_cast(NativeEngineTest::WorkerConstructor(env)); + task->worker_ = worker; void* args = nullptr; - TaskResultInfo* resultInfo = new TaskResultInfo(env, taskId, args); - TaskManager::GetInstance().ExecuteSendData(env, resultInfo, task); + TaskResultInfo* resultInfo = new TaskResultInfo(env, task->taskId_, args); + TaskManager::GetInstance().ExecuteSendData(env, resultInfo, task->taskId_); napi_value exception = nullptr; napi_get_and_clear_last_exception(env, &exception); ASSERT_TRUE(exception != nullptr); @@ -4855,8 +4850,6 @@ HWTEST_F(NativeEngineTest, TaskpoolTest240, testing::ext::TestSize.Level0) Task* task = new Task(); uint32_t taskId = TaskManager::GetInstance().CalculateTaskId(reinterpret_cast(task)); task->taskId_ = taskId; - auto loop = NapiHelper::GetLibUV(env); - ConcurrentHelper::UvHandleInit(loop, task->onStartExecutionSignal_, NativeEngineTest::foo, task); task->taskType_ = TaskType::FUNCTION_TASK; Task::CleanupHookFunc(task); napi_value exception = nullptr; @@ -4932,14 +4925,22 @@ HWTEST_F(NativeEngineTest, TaskpoolTest245, testing::ext::TestSize.Level0) task->taskId_ = taskId; task->env_ = env; task->isMainThreadTask_ = false; - task->onStartExecutionSignal_ = nullptr; - task->CheckStartExecution(Priority::DEFAULT); - auto loop = NapiHelper::GetLibUV(env); - ConcurrentHelper::UvHandleInit(loop, task->onStartExecutionSignal_, NativeEngineTest::foo, task); - task->SetValid(false); - task->CheckStartExecution(Priority::DEFAULT); - task->SetValid(true); task->CheckStartExecution(Priority::DEFAULT); + Task* task2 = new Task(); + TaskManager::GetInstance().StoreTask(task2); + task2->env_ = env; + task2->isMainThreadTask_ = false; + task2->SetValid(false); + task2->CheckStartExecution(Priority::DEFAULT); + task2->SetValid(true); + task2->CheckStartExecution(Priority::DEFAULT); + + Worker* worker = reinterpret_cast(NativeEngineTest::WorkerConstructor(env)); + task2->worker_ = worker; + napi_value obj = NapiHelper::CreateObject(env); + napi_ref callbackRef = NapiHelper::CreateReference(env, obj, 1); + task2->onStartExecutionCallBackInfo_ = new ListenerCallBackInfo(env, callbackRef, nullptr); + task2->CheckStartExecution(Priority::DEFAULT); napi_value exception = nullptr; napi_get_and_clear_last_exception(env, &exception); ASSERT_TRUE(exception == nullptr); @@ -4968,8 +4969,6 @@ HWTEST_F(NativeEngineTest, TaskpoolTest247, testing::ext::TestSize.Level0) task->taskId_ = taskId; task->env_ = env; task->isMainThreadTask_ = false; - auto loop = NapiHelper::GetLibUV(env); - ConcurrentHelper::UvHandleInit(loop, task->onStartExecutionSignal_, NativeEngineTest::foo, task); TaskManager::GetInstance().ReleaseCallBackInfo(task); napi_value exception = nullptr; napi_get_and_clear_last_exception(env, &exception); @@ -6996,4 +6995,70 @@ HWTEST_F(NativeEngineTest, TaskpoolTest337, testing::ext::TestSize.Level0) taskManager.EnqueueTaskId(task4->taskId_); napi_get_and_clear_last_exception(env, &exception); ASSERT_TRUE(exception == nullptr); +} + +HWTEST_F(NativeEngineTest, TaskpoolTest338, testing::ext::TestSize.Level0) +{ + napi_env env = (napi_env)engine_; + ExceptionScope scope(env); + TaskManager &taskManager = TaskManager::GetInstance(); + taskManager.ExecuteSendData(env, nullptr, 338); + napi_value exception = nullptr; + napi_get_and_clear_last_exception(env, &exception); + ASSERT_TRUE(exception == nullptr); + + Task* task = new Task(); + taskManager.StoreTask(task); + Worker* worker = reinterpret_cast(NativeEngineTest::WorkerConstructor(env)); + task->worker_ = worker; + taskManager.RegisterCallback(env, task->taskId_, nullptr, "OnReceiveData"); + taskManager.ExecuteSendData(env, nullptr, task->taskId_); + exception = nullptr; + napi_get_and_clear_last_exception(env, &exception); + ASSERT_TRUE(exception != nullptr); + + napi_value thisValue = NapiHelper::CreateObject(env); + auto callbackInfo = std::make_shared(env, 1, nullptr); + taskManager.RegisterCallback(env, task->taskId_, callbackInfo, "TaskpoolTest338"); + void* args = nullptr; + TaskResultInfo* resultInfo = new TaskResultInfo(env, task->taskId_, args); + TaskManager::GetInstance().ExecuteSendData(env, resultInfo, task->taskId_); + exception = nullptr; + napi_get_and_clear_last_exception(env, &exception); + ASSERT_TRUE(exception == nullptr); +} + +HWTEST_F(NativeEngineTest, TaskpoolTest339, testing::ext::TestSize.Level0) +{ + napi_env env = (napi_env)engine_; + Task* task = new Task(); + TaskManager::GetInstance().StoreTask(task); + task->env_ = env; + Worker* worker = reinterpret_cast(NativeEngineTest::WorkerConstructor(env)); + task->worker_ = worker; + napi_value obj = NapiHelper::CreateObject(env); + napi_ref callbackRef = NapiHelper::CreateReference(env, obj, 1); + task->onStartExecutionCallBackInfo_ = new ListenerCallBackInfo(env, callbackRef, nullptr); + TaskManager::GetInstance().ExecuteTaskStartExecution(task->taskId_, Priority::DEFAULT); + napi_value exception = nullptr; + napi_get_and_clear_last_exception(env, &exception); + ASSERT_TRUE(exception == nullptr); +} + +HWTEST_F(NativeEngineTest, TaskpoolTest340, testing::ext::TestSize.Level0) +{ + napi_env env = (napi_env)engine_; + Task* task = new Task(); + TaskManager::GetInstance().StoreTask(task); + task->env_ = env; + Worker* worker = reinterpret_cast(NativeEngineTest::WorkerConstructor(env)); + task->worker_ = worker; + napi_value obj = NapiHelper::CreateObject(env); + napi_ref callbackRef = NapiHelper::CreateReference(env, obj, 1); + task->onStartExecutionCallBackInfo_ = new ListenerCallBackInfo(env, callbackRef, nullptr); + TaskManager::GetInstance().ExecuteTaskStartExecution(task->taskId_, Priority::DEFAULT); + task->onStartExecutionCallBackInfo_ = nullptr; + napi_value exception = nullptr; + napi_get_and_clear_last_exception(env, &exception); + ASSERT_TRUE(exception == nullptr); } \ No newline at end of file