From 6007a425b5803c3a8764e87c655ebc122cb93ef9 Mon Sep 17 00:00:00 2001 From: lexiaoyao Date: Tue, 25 Nov 2025 22:11:40 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0nats=E8=BF=9E=E6=8E=A5?= =?UTF-8?q?=E5=AF=B9=E8=B1=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + CMakeLists.txt | 9 +++++++ include/NatsConnect.hpp | 59 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 69 insertions(+) create mode 100644 .gitignore create mode 100644 CMakeLists.txt create mode 100644 include/NatsConnect.hpp diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..796b96d --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/build diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..8a34bc0 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,9 @@ +cmake_minimum_required(VERSION 3.16) +project(NatsConnect) +set(CMAKE_CXX_STANDARD 11) +set(CMAKE_CXX_STANDARD_REQUIRED ON) + +add_executable(${PROJECT_NAME} test.cpp) +target_link_directories(${PROJECT_NAME} PUBLIC lib) +target_link_libraries(${PROJECT_NAME} PRIVATE nats) +target_include_directories(${PROJECT_NAME} PUBLIC include) diff --git a/include/NatsConnect.hpp b/include/NatsConnect.hpp new file mode 100644 index 0000000..084d273 --- /dev/null +++ b/include/NatsConnect.hpp @@ -0,0 +1,59 @@ +#pragma once + +#include + +#include + +typedef void(*cb)(const char*, size_t); +class NatsConnect { + natsConnection* m_con = nullptr; + cb m_cb; +public: + + NatsConnect() = default; + explicit NatsConnect(const char* url) { connect(url); } + ~NatsConnect(); + + void connect(const char* url) noexcept; + void disconnect() noexcept; + void reconnect() noexcept; + void publish(const char* subject, const char* data) noexcept; + operator void*() const { return m_con; } + + void subscribe(const char* subject, cb f) noexcept; + static void handle(const char* msg, size_t len, NatsConnect* obj); +}; + + +inline NatsConnect::~NatsConnect() { + if (m_con) { + natsConnection_Close(m_con); + natsConnection_Destroy(m_con); + } +} + +inline void NatsConnect::connect(const char* url) noexcept { + nats_CheckCompatibility(); + const natsStatus s = natsConnection_ConnectTo(&m_con, url); + if (s != NATS_OK) { + printf("Error: %s\n", natsStatus_GetText(s)); + return; + } +} + +inline void NatsConnect::subscribe(const char *subject, cb f) noexcept { + m_cb = f; + natsSubscription *sub = nullptr; + const natsStatus s = natsConnection_Subscribe(&sub, m_con, subject, [](natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure) { + handle(natsMsg_GetData(msg), natsMsg_GetDataLength(msg), (NatsConnect*)closure); + }, this); + if (s != NATS_OK) { + printf("Error: %s\n", natsStatus_GetText(s)); + return; + } + natsSubscription_SetPendingLimits(sub, 1024, 1024); +} + +inline void NatsConnect::handle(const char *msg, size_t len, NatsConnect *obj) { + obj->m_cb(msg, len); +}