diff --git a/frameworks/js/napi/socket/async_work/include/socket_async_work.h b/frameworks/js/napi/socket/async_work/include/socket_async_work.h index 7c037a6c166e5ed9f557feab148f60c28bb5489c..181e34bfe85850345aff912010a7f72b86c4041c 100644 --- a/frameworks/js/napi/socket/async_work/include/socket_async_work.h +++ b/frameworks/js/napi/socket/async_work/include/socket_async_work.h @@ -25,10 +25,12 @@ public: ACE_DISALLOW_COPY_AND_MOVE(SocketAsyncWork); /* executor */ - static void ExecBind(napi_env env, void *data); + static void ExecUdpBind(napi_env env, void *data); static void ExecUdpSend(napi_env env, void *data); + static void ExecTcpBind(napi_env env, void *data); + static void ExecConnect(napi_env env, void *data); static void ExecTcpSend(napi_env env, void *data); diff --git a/frameworks/js/napi/socket/async_work/src/socket_async_work.cpp b/frameworks/js/napi/socket/async_work/src/socket_async_work.cpp index 65f85713a1d7c47f82eb87d8ae1ad2075ab4a496..5d35d4490dfab42a78adbbaeab34bf81e4d937fe 100644 --- a/frameworks/js/napi/socket/async_work/src/socket_async_work.cpp +++ b/frameworks/js/napi/socket/async_work/src/socket_async_work.cpp @@ -19,9 +19,9 @@ #include "socket_exec.h" namespace OHOS::NetStack { -void SocketAsyncWork::ExecBind(napi_env env, void *data) +void SocketAsyncWork::ExecUdpBind(napi_env env, void *data) { - BaseAsyncWork::ExecAsyncWork(env, data); + BaseAsyncWork::ExecAsyncWork(env, data); } void SocketAsyncWork::ExecUdpSend(napi_env env, void *data) @@ -29,6 +29,11 @@ void SocketAsyncWork::ExecUdpSend(napi_env env, void *data) BaseAsyncWork::ExecAsyncWork(env, data); } +void SocketAsyncWork::ExecTcpBind(napi_env env, void *data) +{ + BaseAsyncWork::ExecAsyncWork(env, data); +} + void SocketAsyncWork::ExecConnect(napi_env env, void *data) { BaseAsyncWork::ExecAsyncWork(env, data); diff --git a/frameworks/js/napi/socket/socket_exec/include/socket_exec.h b/frameworks/js/napi/socket/socket_exec/include/socket_exec.h index 377dfa7f28420b38c47b01bd68d106dab02280a7..0100761e1c6e7fec86156350351590b78ff73b66 100644 --- a/frameworks/js/napi/socket/socket_exec/include/socket_exec.h +++ b/frameworks/js/napi/socket/socket_exec/include/socket_exec.h @@ -30,10 +30,12 @@ int MakeTcpSocket(sa_family_t family); int MakeUdpSocket(sa_family_t family); /* async work execute */ -bool ExecBind(BindContext *context); +bool ExecUdpBind(BindContext *context); bool ExecUdpSend(UdpSendContext *context); +bool ExecTcpBind(BindContext *context); + bool ExecConnect(ConnectContext *context); bool ExecTcpSend(TcpSendContext *context); diff --git a/frameworks/js/napi/socket/socket_exec/src/socket_exec.cpp b/frameworks/js/napi/socket/socket_exec/src/socket_exec.cpp index 694575cbe2e83eb8967e6a8a5d4a3178ae8e4ef5..1fd54945cf178652fe3d4973e0e6d7913affd9f2 100644 --- a/frameworks/js/napi/socket/socket_exec/src/socket_exec.cpp +++ b/frameworks/js/napi/socket/socket_exec/src/socket_exec.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include "context_key.h" @@ -36,7 +37,24 @@ static constexpr const int DEFAULT_POLL_TIMEOUT = 500; // 0.5 Seconds static constexpr const int ADDRESS_INVALID = -1; +static constexpr const int NO_MEMORY = -2; + namespace OHOS::NetStack::SocketExec { +struct MessageData { + MessageData() = delete; + MessageData(void *d, size_t l, const SocketRemoteInfo &info) : data(d), len(l), remoteInfo(info) {} + ~MessageData() + { + if (data) { + free(data); + } + } + + void *data; + size_t len; + SocketRemoteInfo remoteInfo; +}; + static void SetIsBound(sa_family_t family, GetStateContext *context, const sockaddr_in *addr4, const sockaddr_in6 *addr6) { @@ -57,11 +75,36 @@ static void } } -static void EmitError(BaseContext *context, int32_t errorNum) +template static void CallbackTemplate(uv_work_t *work, int status) { - napi_value error = NapiUtils::CreateObject(context->GetEnv()); - NapiUtils::SetInt32Property(context->GetEnv(), error, KEY_ERROR_CODE, errorNum); - context->Emit(EVENT_ERROR, std::make_pair(NapiUtils::GetUndefined(context->GetEnv()), error)); + (void)status; + + auto workWrapper = static_cast(work->data); + napi_env env = workWrapper->env; + auto closeScope = [env](napi_handle_scope scope) { NapiUtils::CloseScope(env, scope); }; + std::unique_ptr scope(NapiUtils::OpenScope(env), closeScope); + + napi_value obj = MakeJsValue(env, workWrapper->data); + + std::pair arg = {NapiUtils::GetUndefined(workWrapper->env), obj}; + workWrapper->manager->Emit(workWrapper->type, arg); + + delete workWrapper; + delete work; +} + +static napi_value MakeError(napi_env env, void *errCode) +{ + auto code = reinterpret_cast(errCode); + auto deleter = [](const int32_t *p) { delete p; }; + std::unique_ptr handler(code, deleter); + + napi_value err = NapiUtils::CreateObject(env); + if (NapiUtils::GetValueType(env, err) != napi_object) { + return NapiUtils::GetUndefined(env); + } + NapiUtils::SetInt32Property(env, err, KEY_ERROR_CODE, *code); + return err; } static std::string MakeAddressString(sockaddr *addr) @@ -90,7 +133,9 @@ static napi_value MakeJsMessageParam(napi_env env, napi_value msgBuffer, SocketR if (NapiUtils::GetValueType(env, obj) != napi_object) { return nullptr; } - NapiUtils::SetNamedProperty(env, obj, KEY_MESSAGE, msgBuffer); + if (NapiUtils::ValueIsArrayBuffer(env, msgBuffer)) { + NapiUtils::SetNamedProperty(env, obj, KEY_MESSAGE, msgBuffer); + } napi_value jsRemoteInfo = NapiUtils::CreateObject(env); if (NapiUtils::GetValueType(env, jsRemoteInfo) != napi_object) { return nullptr; @@ -104,42 +149,51 @@ static napi_value MakeJsMessageParam(napi_env env, napi_value msgBuffer, SocketR return obj; } -static void OnRecvMessage(BaseContext *context, void *data, size_t len, sockaddr *addr) +static napi_value MakeMessage(napi_env env, void *para) { - if (data == nullptr || len <= 0) { - return; + auto messageData = reinterpret_cast(para); + auto deleter = [](const MessageData *p) { delete p; }; + std::unique_ptr handler(messageData, deleter); + + if (messageData->data == nullptr || messageData->len == 0) { + return MakeJsMessageParam(env, NapiUtils::GetUndefined(env), &messageData->remoteInfo); } void *dataHandle = nullptr; - napi_value msgBuffer = NapiUtils::CreateArrayBuffer(context->GetEnv(), len, &dataHandle); - if (dataHandle != nullptr) { - (void)memcpy_s(dataHandle, len, data, len); + napi_value msgBuffer = NapiUtils::CreateArrayBuffer(env, messageData->len, &dataHandle); + if (dataHandle == nullptr || !NapiUtils::ValueIsArrayBuffer(env, msgBuffer)) { + return MakeJsMessageParam(env, NapiUtils::GetUndefined(env), &messageData->remoteInfo); + } - SocketRemoteInfo remoteInfo; - std::string address = MakeAddressString(addr); - if (address.empty()) { - EmitError(context, ADDRESS_INVALID); - return; - } - remoteInfo.SetAddress(address); - remoteInfo.SetFamily(addr->sa_family); - if (addr->sa_family == AF_INET) { - auto *addr4 = reinterpret_cast(addr); - remoteInfo.SetPort(ntohs(addr4->sin_port)); - } else if (addr->sa_family == AF_INET6) { - auto *addr6 = reinterpret_cast(addr); - remoteInfo.SetPort(ntohs(addr6->sin6_port)); - } - remoteInfo.SetSize(len); + NETSTACK_LOGI("copy ret %{public}d", memcpy_s(dataHandle, messageData->len, messageData->data, messageData->len)); - napi_value obj = MakeJsMessageParam(context->GetEnv(), msgBuffer, &remoteInfo); - if (NapiUtils::GetValueType(context->GetEnv(), obj) != napi_object) { - return; - } + return MakeJsMessageParam(env, msgBuffer, &messageData->remoteInfo); +} + +static void OnRecvMessage(EventManager *manager, void *data, size_t len, sockaddr *addr) +{ + if (data == nullptr || len <= 0) { + return; + } - napi_value undefined = NapiUtils::GetUndefined(context->GetEnv()); - context->Emit(EVENT_MESSAGE, std::make_pair(undefined, obj)); + SocketRemoteInfo remoteInfo; + std::string address = MakeAddressString(addr); + if (address.empty()) { + manager->EmitByUv(EVENT_ERROR, new int32_t(ADDRESS_INVALID), CallbackTemplate); + return; + } + remoteInfo.SetAddress(address); + remoteInfo.SetFamily(addr->sa_family); + if (addr->sa_family == AF_INET) { + auto *addr4 = reinterpret_cast(addr); + remoteInfo.SetPort(ntohs(addr4->sin_port)); + } else if (addr->sa_family == AF_INET6) { + auto *addr6 = reinterpret_cast(addr); + remoteInfo.SetPort(ntohs(addr6->sin6_port)); } + remoteInfo.SetSize(len); + + manager->EmitByUv(EVENT_MESSAGE, new MessageData(data, len, remoteInfo), CallbackTemplate); } class MessageCallback { @@ -148,23 +202,30 @@ public: virtual ~MessageCallback() = default; - explicit MessageCallback(BaseContext *context) : context_(context) {} + explicit MessageCallback(EventManager *manager) : manager_(manager) {} + + virtual void OnError(int err) const = 0; - virtual void operator()(int sock, void *data, size_t dataLen, sockaddr *addr) const = 0; + virtual void OnMessage(int sock, void *data, size_t dataLen, sockaddr *addr) const = 0; protected: - BaseContext *context_; + EventManager *manager_; }; class TcpMessageCallback final : public MessageCallback { public: TcpMessageCallback() = delete; - ~TcpMessageCallback() = default; + ~TcpMessageCallback() override = default; - explicit TcpMessageCallback(BaseContext *context) : MessageCallback(context) {} + explicit TcpMessageCallback(EventManager *manager) : MessageCallback(manager) {} - void operator()(int sock, void *data, size_t dataLen, sockaddr *addr) const override + void OnError(int err) const override + { + manager_->EmitByUv(EVENT_ERROR, new int(err), CallbackTemplate); + } + + void OnMessage(int sock, void *data, size_t dataLen, sockaddr *addr) const override { (void)addr; @@ -183,7 +244,7 @@ public: if (ret < 0) { return; } - OnRecvMessage(context_, data, dataLen, reinterpret_cast(&addr4)); + OnRecvMessage(manager_, data, dataLen, reinterpret_cast(&addr4)); return; } else if (family == AF_INET6) { sockaddr_in6 addr6 = {0}; @@ -193,7 +254,7 @@ public: if (ret < 0) { return; } - OnRecvMessage(context_, data, dataLen, reinterpret_cast(&addr6)); + OnRecvMessage(manager_, data, dataLen, reinterpret_cast(&addr6)); return; } } @@ -203,13 +264,18 @@ class UdpMessageCallback final : public MessageCallback { public: UdpMessageCallback() = delete; - ~UdpMessageCallback() = default; + ~UdpMessageCallback() override = default; - explicit UdpMessageCallback(BaseContext *context) : MessageCallback(context) {} + explicit UdpMessageCallback(EventManager *manager) : MessageCallback(manager) {} - void operator()(int sock, void *data, size_t dataLen, sockaddr *addr) const override + void OnError(int err) const override { - OnRecvMessage(context_, data, dataLen, addr); + manager_->EmitByUv(EVENT_ERROR, new int(err), CallbackTemplate); + } + + void OnMessage(int sock, void *data, size_t dataLen, sockaddr *addr) const override + { + OnRecvMessage(manager_, data, dataLen, addr); } }; @@ -291,7 +357,7 @@ static bool PollSendData(int sock, const char *data, size_t size, sockaddr *addr return true; } -static bool PollRecvData(int sock, sockaddr *addr, socklen_t addrLen, const MessageCallback &callback) +static void PollRecvData(int sock, sockaddr *addr, socklen_t addrLen, const MessageCallback &callback) { int bufferSize = DEFAULT_BUFFER_SIZE; int opt = 0; @@ -303,6 +369,9 @@ static bool PollRecvData(int sock, sockaddr *addr, socklen_t addrLen, const Mess auto deleter = [](char *s) { free(reinterpret_cast(s)); }; std::unique_ptr buf(reinterpret_cast(malloc(bufferSize)), deleter); + auto addrDeleter = [](sockaddr *a) { free(reinterpret_cast(a)); }; + std::unique_ptr pAddr(addr, addrDeleter); + nfds_t num = 1; pollfd fds[1] = {{0}}; fds[0].fd = sock; @@ -311,12 +380,13 @@ static bool PollRecvData(int sock, sockaddr *addr, socklen_t addrLen, const Mess while (true) { int ret = poll(fds, num, DEFAULT_POLL_TIMEOUT); - if (ret == -1) { + if (ret < 0) { NETSTACK_LOGE("poll to recv failed %{public}s", strerror(errno)); - return false; + callback.OnError(errno); + return; } if (ret == 0) { - break; + continue; } (void)memset_s(buf.get(), bufferSize, 0, bufferSize); socklen_t tempAddrLen = addrLen; @@ -326,15 +396,20 @@ static bool PollRecvData(int sock, sockaddr *addr, socklen_t addrLen, const Mess continue; } NETSTACK_LOGE("recv failed %{public}s", strerror(errno)); - return false; + return; } if (recvLen == 0) { - break; + continue; } - callback(sock, buf.get(), recvLen, addr); - } - return true; + void *data = malloc(recvLen); + if (data == nullptr) { + callback.OnError(NO_MEMORY); + return; + } + NETSTACK_LOGI("copy ret = %{public}d", memcpy_s(data, recvLen, buf.get(), recvLen)); + callback.OnMessage(sock, data, recvLen, addr); + } } static bool NonBlockConnect(int sock, sockaddr *addr, socklen_t addrLen, uint32_t timeoutSec) @@ -499,9 +574,43 @@ bool ExecBind(BindContext *context) return false; } NETSTACK_LOGI("rebind success"); - return true; } NETSTACK_LOGI("bind success"); + + return true; +} + +bool ExecUdpBind(BindContext *context) +{ + if (!ExecBind(context)) { + return false; + } + + sockaddr_in addr4 = {0}; + sockaddr_in6 addr6 = {0}; + sockaddr *addr = nullptr; + socklen_t len; + GetAddr(&context->address, &addr4, &addr6, &addr, &len); + if (addr == nullptr) { + NETSTACK_LOGE("addr family error"); + context->SetErrorCode(ADDRESS_INVALID); + return false; + } + + if (addr->sa_family == AF_INET) { + auto pAddr4 = reinterpret_cast(malloc(sizeof(addr4))); + NETSTACK_LOGI("copy ret = %{public}d", memcpy_s(pAddr4, sizeof(addr4), &addr4, sizeof(addr4))); + std::thread serviceThread(PollRecvData, context->GetSocketFd(), pAddr4, sizeof(addr4), + UdpMessageCallback(context->GetManager())); + serviceThread.detach(); + } else if (addr->sa_family == AF_INET6) { + auto pAddr6 = reinterpret_cast(malloc(sizeof(addr6))); + NETSTACK_LOGI("copy ret = %{public}d", memcpy_s(pAddr6, sizeof(addr6), &addr6, sizeof(addr6))); + std::thread serviceThread(PollRecvData, context->GetSocketFd(), pAddr6, sizeof(addr6), + UdpMessageCallback(context->GetManager())); + serviceThread.detach(); + } + return true; } @@ -526,6 +635,11 @@ bool ExecUdpSend(UdpSendContext *context) return true; } +bool ExecTcpBind(BindContext *context) +{ + return ExecBind(context); +} + bool ExecConnect(ConnectContext *context) { sockaddr_in addr4 = {0}; @@ -546,6 +660,9 @@ bool ExecConnect(ConnectContext *context) } NETSTACK_LOGI("connect success"); + std::thread serviceThread(PollRecvData, context->GetSocketFd(), nullptr, 0, + TcpMessageCallback(context->GetManager())); + serviceThread.detach(); return true; } @@ -785,28 +902,6 @@ napi_value BindCallback(BindContext *context) napi_value UdpSendCallback(UdpSendContext *context) { - sa_family_t family; - socklen_t len = sizeof(family); - int ret = getsockname(context->GetSocketFd(), reinterpret_cast(&family), &len); - if (ret < 0) { - return NapiUtils::GetUndefined(context->GetEnv()); - } - if (family == AF_INET) { - sockaddr_in addr4 = {0}; - if (!PollRecvData(context->GetSocketFd(), reinterpret_cast(&addr4), sizeof(addr4), - UdpMessageCallback(context))) { - EmitError(context, errno); - } - return NapiUtils::GetUndefined(context->GetEnv()); - } else if (family == AF_INET6) { - sockaddr_in6 addr6 = {0}; - if (!PollRecvData(context->GetSocketFd(), reinterpret_cast(&addr6), sizeof(addr6), - UdpMessageCallback(context))) { - EmitError(context, errno); - } - return NapiUtils::GetUndefined(context->GetEnv()); - } - return NapiUtils::GetUndefined(context->GetEnv()); } @@ -814,17 +909,11 @@ napi_value ConnectCallback(ConnectContext *context) { context->Emit(EVENT_CONNECT, std::make_pair(NapiUtils::GetUndefined(context->GetEnv()), NapiUtils::GetUndefined(context->GetEnv()))); - if (!PollRecvData(context->GetSocketFd(), nullptr, 0, TcpMessageCallback(context))) { - EmitError(context, errno); - } return NapiUtils::GetUndefined(context->GetEnv()); } napi_value TcpSendCallback(TcpSendContext *context) { - if (!PollRecvData(context->GetSocketFd(), nullptr, 0, TcpMessageCallback(context))) { - EmitError(context, errno); - } return NapiUtils::GetUndefined(context->GetEnv()); } diff --git a/frameworks/js/napi/socket/socket_module/src/socket_module.cpp b/frameworks/js/napi/socket/socket_module/src/socket_module.cpp index f2f761325d2cd1aa47c4e0b1c9025bb7c3638aa5..6699e25f6da21a9b835255df499255dbea5db0a0 100644 --- a/frameworks/js/napi/socket/socket_module/src/socket_module.cpp +++ b/frameworks/js/napi/socket/socket_module/src/socket_module.cpp @@ -181,7 +181,7 @@ void SocketModuleExports::InitSocketProperties(napi_env env, napi_value exports) /* udp async works */ napi_value SocketModuleExports::UDPSocket::Bind(napi_env env, napi_callback_info info) { - return SOCKET_INTERFACE(BindContext, ExecBind, BindCallback, MakeUdpSocket, BIND, UDP_BIND_NAME); + return SOCKET_INTERFACE(BindContext, ExecUdpBind, BindCallback, MakeUdpSocket, BIND, UDP_BIND_NAME); } napi_value SocketModuleExports::UDPSocket::Send(napi_env env, napi_callback_info info) @@ -219,7 +219,7 @@ napi_value SocketModuleExports::UDPSocket::Off(napi_env env, napi_callback_info /* tcp async works */ napi_value SocketModuleExports::TCPSocket::Bind(napi_env env, napi_callback_info info) { - return SOCKET_INTERFACE(BindContext, ExecBind, BindCallback, MakeTcpSocket, BIND, TCP_BIND_NAME); + return SOCKET_INTERFACE(BindContext, ExecTcpBind, BindCallback, MakeTcpSocket, BIND, TCP_BIND_NAME); } napi_value SocketModuleExports::TCPSocket::Connect(napi_env env, napi_callback_info info) diff --git a/frameworks/js/napi/socket/task_queue/src/task_queue.cpp b/frameworks/js/napi/socket/task_queue/src/task_queue.cpp index 5ed466dc22ae36e593752f637ebac38dc1daed94..9ecce01761030f4b55ccecc6067d37a05ec1f8e7 100644 --- a/frameworks/js/napi/socket/task_queue/src/task_queue.cpp +++ b/frameworks/js/napi/socket/task_queue/src/task_queue.cpp @@ -24,7 +24,7 @@ namespace OHOS::NetStack::Task { class Task { public: - Task() = delete; + Task() : executor(nullptr), callback(nullptr), data(nullptr), priority_(TaskPriority::CLOSE) {} Task(TaskPriority priority, AsyncWorkExecutor exec, AsyncWorkCallback call, void *d) : executor(exec), callback(call), data(d), priority_(priority) @@ -50,30 +50,36 @@ std::priority_queue g_taskExecutorQueue; /* NOLINT */ std::priority_queue g_taskCallbackQueue; /* NOLINT */ -std::mutex g_mutex; +std::mutex EXEC_MUTEX; + +std::mutex CALLBACK_MUTEX; void Executor(napi_env env, void *data) { - std::lock_guard lock(g_mutex); - - if (g_taskExecutorQueue.empty()) { - NETSTACK_LOGI("queue is empty"); - return; - } + Task task; + do { + std::lock_guard lock(EXEC_MUTEX); + + if (g_taskExecutorQueue.empty()) { + NETSTACK_LOGI("g_taskExecutorQueue is empty"); + return; + } + task = g_taskExecutorQueue.top(); + g_taskExecutorQueue.pop(); + } while (false); auto context = static_cast(data); context->SetExecOK(true); + if (task.executor && task.data) { + task.executor(env, task.data); + } - Task task = g_taskExecutorQueue.top(); - g_taskExecutorQueue.pop(); - task.executor(env, task.data); + std::lock_guard lock(CALLBACK_MUTEX); g_taskCallbackQueue.push(task); } void Callback(napi_env env, napi_status status, void *data) { - std::lock_guard lock(g_mutex); - (void)status; auto deleter = [](BaseContext *context) { delete context; }; @@ -85,18 +91,26 @@ void Callback(napi_env env, napi_status status, void *data) again->CreateAsyncWork(context->GetAsyncWorkName(), Executor, Callback); } - if (g_taskCallbackQueue.empty()) { - return; - } + Task task; + do { + std::lock_guard lock(CALLBACK_MUTEX); - Task task = g_taskCallbackQueue.top(); - g_taskCallbackQueue.pop(); - task.callback(env, napi_ok, task.data); + if (g_taskCallbackQueue.empty()) { + NETSTACK_LOGI("g_taskCallbackQueue is empty"); + return; + } + task = g_taskCallbackQueue.top(); + g_taskCallbackQueue.pop(); + } while (false); + + if (task.callback && task.data) { + task.callback(env, napi_ok, task.data); + } } void PushTask(TaskPriority priority, AsyncWorkExecutor executor, AsyncWorkCallback callback, void *data) { - std::lock_guard lock(g_mutex); + std::lock_guard lock(EXEC_MUTEX); g_taskExecutorQueue.push(Task(priority, executor, callback, data)); } diff --git a/utils/napi_utils/src/netstack_napi_utils.cpp b/utils/napi_utils/src/netstack_napi_utils.cpp index 171c66c533259ad17a4c5f9b9151bae277dee286..77abd6540eedbea087392866d4db28a6e9791506 100644 --- a/utils/napi_utils/src/netstack_napi_utils.cpp +++ b/utils/napi_utils/src/netstack_napi_utils.cpp @@ -198,6 +198,9 @@ void SetStringPropertyUtf8(napi_env env, napi_value object, const std::string &n /* array buffer */ bool ValueIsArrayBuffer(napi_env env, napi_value value) { + if (value == nullptr) { + return false; + } bool isArrayBuffer = false; NAPI_CALL_BASE(env, napi_is_arraybuffer(env, value, &isArrayBuffer), false); return isArrayBuffer;