From c03a2ff81ff0dfeba665a97899ea1fabb9995d13 Mon Sep 17 00:00:00 2001 From: zhuruigan Date: Fri, 8 Aug 2025 15:32:18 +0800 Subject: [PATCH] fix PeriodicTask Signed-off-by: zhuruigan Change-Id: I6def1a261423c258fb90a9a3d10e4fcdaff13db9 --- js_concurrent_module/taskpool/task.cpp | 92 +++++++++++------- js_concurrent_module/taskpool/task.h | 7 +- js_concurrent_module/taskpool/taskpool.cpp | 103 ++++++++++++--------- js_concurrent_module/taskpool/taskpool.h | 3 +- 4 files changed, 123 insertions(+), 82 deletions(-) diff --git a/js_concurrent_module/taskpool/task.cpp b/js_concurrent_module/taskpool/task.cpp index 77e380c6..c04ad658 100644 --- a/js_concurrent_module/taskpool/task.cpp +++ b/js_concurrent_module/taskpool/task.cpp @@ -264,22 +264,11 @@ napi_value Task::GetTaskInfoPromise(napi_env env, napi_value task, TaskType task TaskInfo* Task::GetTaskInfo(napi_env env, napi_value napiTask, Priority priority) { - napi_value func = NapiHelper::GetNameProperty(env, napiTask, FUNCTION_STR); - napi_value args = NapiHelper::GetNameProperty(env, napiTask, ARGUMENTS_STR); + auto [func, args, transferList, cloneList] = GetSerializeParams(env, napiTask); if (func == nullptr || args == nullptr) { - std::string errMessage = "taskpool:: task value is error"; - HILOG_ERROR("%{public}s", errMessage.c_str()); - ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str()); return nullptr; } - napi_value transferList = NapiHelper::GetUndefinedValue(env); - if (NapiHelper::HasNameProperty(env, napiTask, TRANSFERLIST_STR)) { - transferList = NapiHelper::GetNameProperty(env, napiTask, TRANSFERLIST_STR); - } - napi_value cloneList = NapiHelper::GetUndefinedValue(env); - if (NapiHelper::HasNameProperty(env, napiTask, CLONE_LIST_STR)) { - cloneList = NapiHelper::GetNameProperty(env, napiTask, CLONE_LIST_STR); - } + TaskInfo* pendingInfo = GenerateTaskInfo(env, func, args, transferList, cloneList, priority, defaultTransfer_, defaultCloneSendable_); if (pendingInfo == nullptr) { @@ -1077,29 +1066,13 @@ TaskInfo* Task::GenerateTaskInfo(napi_env env, napi_value func, napi_value args, bool defaultTransfer, bool defaultCloneSendable) { HILOG_DEBUG("taskpool:: task GenerateTaskInfo"); - napi_value undefined = NapiHelper::GetUndefinedValue(env); - void* serializationFunction = nullptr; - std::string errString = ""; - napi_status status = napi_serialize_inner_with_error(env, func, undefined, undefined, defaultTransfer, - defaultCloneSendable, &serializationFunction, errString); - std::string errMessage = ""; - if (status != napi_ok || serializationFunction == nullptr) { - errMessage = "taskpool: failed to serialize function.\nSerialize error: " + errString; - HILOG_ERROR("%{public}s", errMessage.c_str()); - ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_CONCURRENT_FUNCTION, errMessage.c_str()); - return nullptr; - } - void* serializationArguments = nullptr; - errMessage = ""; - status = napi_serialize_inner_with_error(env, args, transferList, cloneList, defaultTransfer, - defaultCloneSendable, &serializationArguments, errString); - if (status != napi_ok || serializationArguments == nullptr) { - errMessage = "taskpool: failed to serialize arguments.\nSerialize error: " + errString; - HILOG_ERROR("%{public}s", errMessage.c_str()); - ErrorHelper::ThrowError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str()); + std::tuple params = { + transferList, cloneList, defaultTransfer, defaultCloneSendable + }; + auto [serializationFunction, serializationArguments] = GetSerializeResult(env, func, args, params); + if (serializationFunction == nullptr || serializationArguments == nullptr) { return nullptr; } - TaskInfo* taskInfo = new TaskInfo(); taskInfo->serializationFunction = serializationFunction; taskInfo->serializationArguments = serializationArguments; @@ -1126,7 +1099,6 @@ bool Task::IsReadyToHandle() const void Task::NotifyPendingTask() { HILOG_DEBUG("taskpool:: task:%{public}s NotifyPendingTask", std::to_string(taskId_).c_str()); - TaskManager::GetInstance().NotifyDependencyTaskInfo(taskId_); std::lock_guard lock(taskMutex_); delete currentTaskInfo_; if (pendingTaskInfos_.empty()) { @@ -1959,4 +1931,54 @@ bool Task::UpdateTaskStateToEnding() } return false; } + +std::tuple Task::GetSerializeParams(napi_env env, napi_value napiTask) +{ + napi_value func = NapiHelper::GetNameProperty(env, napiTask, FUNCTION_STR); + napi_value args = NapiHelper::GetNameProperty(env, napiTask, ARGUMENTS_STR); + if (func == nullptr || args == nullptr) { + std::string errMessage = "taskpool:: task value is error"; + HILOG_ERROR("%{public}s", errMessage.c_str()); + ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str()); + return {nullptr, nullptr, nullptr, nullptr}; + } + napi_value transferList = NapiHelper::GetUndefinedValue(env); + if (NapiHelper::HasNameProperty(env, napiTask, TRANSFERLIST_STR)) { + transferList = NapiHelper::GetNameProperty(env, napiTask, TRANSFERLIST_STR); + } + napi_value cloneList = NapiHelper::GetUndefinedValue(env); + if (NapiHelper::HasNameProperty(env, napiTask, CLONE_LIST_STR)) { + cloneList = NapiHelper::GetNameProperty(env, napiTask, CLONE_LIST_STR); + } + return {func, args, transferList, cloneList}; +} + +std::tuple Task::GetSerializeResult(napi_env env, napi_value func, napi_value args, + std::tuple transferAndCloneParams) +{ + auto [transferList, cloneList, defaultTransfer, defaultCloneSendable] = transferAndCloneParams; + napi_value undefined = NapiHelper::GetUndefinedValue(env); + void* serializationFunction = nullptr; + std::string errString = ""; + napi_status status = napi_serialize_inner_with_error(env, func, undefined, undefined, defaultTransfer, + defaultCloneSendable, &serializationFunction, errString); + std::string errMessage = ""; + if (status != napi_ok || serializationFunction == nullptr) { + errMessage = "taskpool: failed to serialize function.\nSerialize error: " + errString; + HILOG_ERROR("%{public}s", errMessage.c_str()); + ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_CONCURRENT_FUNCTION, errMessage.c_str()); + return {nullptr, nullptr}; + } + void* serializationArguments = nullptr; + errMessage = ""; + status = napi_serialize_inner_with_error(env, args, transferList, cloneList, defaultTransfer, + defaultCloneSendable, &serializationArguments, errString); + if (status != napi_ok || serializationArguments == nullptr) { + errMessage = "taskpool: failed to serialize arguments.\nSerialize error: " + errString; + HILOG_ERROR("%{public}s", errMessage.c_str()); + ErrorHelper::ThrowError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str()); + return {nullptr, nullptr}; + } + return {serializationFunction, serializationArguments}; +} } // namespace Commonlibrary::Concurrent::TaskPoolModule \ No newline at end of file diff --git a/js_concurrent_module/taskpool/task.h b/js_concurrent_module/taskpool/task.h index 6fcefab3..c4807403 100644 --- a/js_concurrent_module/taskpool/task.h +++ b/js_concurrent_module/taskpool/task.h @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -207,6 +208,10 @@ public: bool UpdateTaskStateToFinished(); bool UpdateTaskStateToDelayed(); bool UpdateTaskStateToEnding(); + static std::tuple GetSerializeParams(napi_env env, + napi_value napiTask); + static std::tuple GetSerializeResult(napi_env env, napi_value func, napi_value args, + std::tuple transferAndCloneParams); private: Task(const Task &) = delete; @@ -252,8 +257,6 @@ public: // for periodic task bool isPeriodicTask_ {false}; - // periodic task first Generate TaskInfo - std::atomic isFirstTaskInfo_ {false}; uv_timer_t* timer_ {nullptr}; Priority periodicTaskPriority_ {Priority::DEFAULT}; diff --git a/js_concurrent_module/taskpool/taskpool.cpp b/js_concurrent_module/taskpool/taskpool.cpp index 29989ad4..4efa4702 100644 --- a/js_concurrent_module/taskpool/taskpool.cpp +++ b/js_concurrent_module/taskpool/taskpool.cpp @@ -411,25 +411,9 @@ void TaskPool::HandleTaskResultInner(Task* task) napi_status status = napi_deserialize(task->env_, task->result_, &napiTaskResult); napi_delete_serialization_data(task->env_, task->result_); - // tag for trace parse: Task PerformTask End - std::string strTrace = "Task PerformTask End: taskId : " + std::to_string(task->taskId_); - std::string taskLog = "Task PerformTask End: " + std::to_string(task->taskId_); - if (task->taskState_ == ExecuteState::CANCELED) { - strTrace += ", performResult : IsCanceled"; - napiTaskResult = task->IsAsyncRunnerTask() ? TaskManager::GetInstance().CancelError(task->env_, - ErrorHelper::ERR_ASYNCRUNNER_TASK_CANCELED, nullptr, napiTaskResult, task->success_) : - TaskManager::GetInstance().CancelError(task->env_, 0, nullptr, napiTaskResult, task->success_); - } else if (status != napi_ok) { - strTrace += ", performResult : DeserializeFailed"; - taskLog += ", DeserializeFailed"; - } else if (task->success_) { - strTrace += ", performResult : Successful"; - } else { - strTrace += ", performResult : Unsuccessful"; - taskLog += ", Unsuccessful"; - } - HITRACE_HELPER_METER_NAME(strTrace); - HILOG_TASK_INFO("taskpool:: %{public}s", taskLog.c_str()); + bool isCancel = false; + RecordTaskResultLog(task, status, napiTaskResult, isCancel); + if (napiTaskResult == nullptr) { napi_get_undefined(task->env_, &napiTaskResult); } @@ -454,10 +438,10 @@ void TaskPool::HandleTaskResultInner(Task* task) } } NAPI_CALL_RETURN_VOID(task->env_, napi_close_handle_scope(task->env_, scope)); - TriggerTask(task); + TriggerTask(task, isCancel); } -void TaskPool::TriggerTask(Task* task) +void TaskPool::TriggerTask(Task* task, bool isCancel) { HILOG_DEBUG("taskpool:: task:%{public}s TriggerTask", std::to_string(task->taskId_).c_str()); if (task->IsGroupTask()) { @@ -472,6 +456,9 @@ void TaskPool::TriggerTask(Task* task) std::to_string(task->taskId_).c_str(), std::to_string(task->seqRunnerId_).c_str()); } } else if (task->IsCommonTask()) { + if (!isCancel) { + TaskManager::GetInstance().NotifyDependencyTaskInfo(task->taskId_); + } task->NotifyPendingTask(); } else if (task->IsAsyncRunnerTask()) { if (!AsyncRunnerManager::GetInstance().TriggerAsyncRunner(task->env_, task)) { @@ -643,30 +630,29 @@ void TaskPool::PeriodicTaskCallback(uv_timer_t* handle) HILOG_DEBUG("taskpool:: the current task is not a periodic task"); return; } else if (task->taskState_ == ExecuteState::CANCELED) { - HILOG_DEBUG("taskpool:: the periodic task has been canceled"); - napi_reference_unref(task->env_, task->taskRef_, nullptr); - task->CancelPendingTask(task->env_); - uv_timer_stop(task->timer_); - ConcurrentHelper::UvHandleClose(task->timer_); + if (task->currentTaskInfo_ == nullptr) { + HILOG_DEBUG("taskpool:: the periodic task has been canceled"); + napi_reference_unref(task->env_, task->taskRef_, nullptr); + task->CancelPendingTask(task->env_); + uv_timer_stop(task->timer_); + ConcurrentHelper::UvHandleClose(task->timer_); + } return; } TaskManager::GetInstance().IncreaseSendDataRefCount(task->taskId_); - if (!task->isFirstTaskInfo_) { - napi_status status = napi_ok; - HandleScope scope(task->env_, status); - if (status != napi_ok) { - HILOG_ERROR("taskpool:: napi_open_handle_scope failed"); - return; - } - napi_value napiTask = NapiHelper::GetReferenceValue(task->env_, task->taskRef_); - TaskInfo* taskInfo = task->GetTaskInfo(task->env_, napiTask, task->periodicTaskPriority_); - if (taskInfo == nullptr) { - HILOG_DEBUG("taskpool:: the periodic task taskInfo is nullptr"); - return; - } + napi_status status = napi_ok; + HandleScope scope(task->env_, status); + if (status != napi_ok) { + HILOG_ERROR("taskpool:: napi_open_handle_scope failed"); + return; + } + napi_value napiTask = NapiHelper::GetReferenceValue(task->env_, task->taskRef_); + TaskInfo* taskInfo = task->GetTaskInfo(task->env_, napiTask, task->periodicTaskPriority_); + if (taskInfo == nullptr) { + HILOG_DEBUG("taskpool:: the periodic task taskInfo is nullptr"); + return; } - task->isFirstTaskInfo_ = false; task->IncreaseRefCount(); HILOG_INFO("taskpool:: PeriodicTaskCallback taskId %{public}s", std::to_string(task->taskId_).c_str()); @@ -691,12 +677,17 @@ napi_value TaskPool::ExecutePeriodically(napi_env env, napi_callback_info cbinfo periodicTask->periodicTaskPriority_ = static_cast(priority); napi_value napiTask = NapiHelper::GetReferenceValue(env, periodicTask->taskRef_); - TaskInfo* taskInfo = periodicTask->GetTaskInfo(env, napiTask, periodicTask->periodicTaskPriority_); - if (taskInfo == nullptr) { + auto [func, args, transferList, cloneList] = Task::GetSerializeParams(env, napiTask); + if (func == nullptr || args == nullptr) { + return nullptr; + } + std::tuple params = { + transferList, cloneList, periodicTask->defaultTransfer_, periodicTask->defaultCloneSendable_ + }; + auto [serFunction, serArguments] = Task::GetSerializeResult(env, func, args, params); + if (serFunction == nullptr || serArguments == nullptr) { return nullptr; } - - periodicTask->isFirstTaskInfo_ = true; // periodic task first Generate TaskInfo TriggerTimer(env, periodicTask, period); return nullptr; @@ -811,4 +802,28 @@ bool TaskPool::CheckPeriodicallyParams(napi_env env, napi_callback_info cbinfo, return true; } + +void TaskPool::RecordTaskResultLog(Task* task, napi_status status, napi_value& napiTaskResult, bool& isCancel) +{ + // tag for trace parse: Task PerformTask End + std::string strTrace = "Task PerformTask End: taskId : " + std::to_string(task->taskId_); + std::string taskLog = "Task PerformTask End: " + std::to_string(task->taskId_); + if (task->taskState_ == ExecuteState::CANCELED) { + strTrace += ", performResult : IsCanceled"; + napiTaskResult = task->IsAsyncRunnerTask() ? TaskManager::GetInstance().CancelError(task->env_, + ErrorHelper::ERR_ASYNCRUNNER_TASK_CANCELED, nullptr, napiTaskResult, task->success_) : + TaskManager::GetInstance().CancelError(task->env_, 0, nullptr, napiTaskResult, task->success_); + isCancel = true; + } else if (status != napi_ok) { + strTrace += ", performResult : DeserializeFailed"; + taskLog += ", DeserializeFailed"; + } else if (task->success_) { + strTrace += ", performResult : Successful"; + } else { + strTrace += ", performResult : Unsuccessful"; + taskLog += ", Unsuccessful"; + } + HITRACE_HELPER_METER_NAME(strTrace); + HILOG_TASK_INFO("taskpool:: %{public}s", taskLog.c_str()); +} } // namespace Commonlibrary::Concurrent::TaskPoolModule diff --git a/js_concurrent_module/taskpool/taskpool.h b/js_concurrent_module/taskpool/taskpool.h index 2223d8e8..80fa6c6c 100644 --- a/js_concurrent_module/taskpool/taskpool.h +++ b/js_concurrent_module/taskpool/taskpool.h @@ -59,13 +59,14 @@ private: static void ExecuteTask(napi_env env, Task* task, Priority priority = Priority::DEFAULT); static napi_value ExecuteGroup(napi_env env, napi_value taskGroup, Priority priority); - static void TriggerTask(Task* task); + static void TriggerTask(Task* task, bool isCancel); static void TriggerTimer(napi_env env, Task* task, int32_t period); static bool CheckDelayedParams(napi_env env, napi_callback_info cbinfo, uint32_t& priority, int32_t& delayTime, Task*& task); static bool CheckPeriodicallyParams(napi_env env, napi_callback_info cbinfo, int32_t& period, uint32_t& priority, Task*& task); static void ExecuteOnReceiveDataCallback(CallbackInfo* callbackInfo, TaskResultInfo* resultInfo); + static void RecordTaskResultLog(Task* task, napi_status status, napi_value& napiTaskResult, bool& isCancel); friend class TaskManager; friend class NativeEngineTest; }; -- Gitee