Files
domo-iot/src/broker/nats-broker.cpp

147 lines
4.2 KiB
C++

/*!
* nats-broker.cpp
*
* Copyright (c) 2015-2019, NADAL Jean-Baptiste. All rights reserved.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
* MA 02110-1301 USA
*
* @Author: NADAL Jean-Baptiste
* @Date: 13/11/2019
*
*/
// This is an independent project of an individual developer. Dear PVS-Studio, please check it.
// PVS-Studio Static Code Analyzer for C, C++, C#, and Java: http://www.viva64.com
/*------------------------------- INCLUDES ----------------------------------*/
#include <adapters/libuv.h>
#include "nats-broker.h"
#include <nats.h>
// Address of the NATS server; handed to natsOptions_SetURL() in NatsBroker::setup().
#define kNatsServerURL "nats.nadal-fr.com"
/*! ----------------------------------------------------------------------------
* @fn onMsg
*
* @brief Subscription callback: print the subject and payload of a received
*        message on stdout, then release the message.
*
* @param nc      connection the message arrived on (not used here).
* @param sub     subscription that delivered the message (not used here).
* @param msg     received message; it is destroyed before returning.
* @param closure user pointer supplied at subscription time (not used here).
*/
static void onMsg(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
{
    // Only the message itself is needed by this handler.
    (void)nc;
    (void)sub;
    (void)closure;

    const char *subject = natsMsg_GetSubject(msg);
    const int payload_length = natsMsg_GetDataLength(msg);
    const char *payload = natsMsg_GetData(msg);

    // The payload is printed with an explicit length (%.*s), so no
    // terminating NUL is relied upon.
    printf("Received msg: %s - %.*s\n", subject, payload_length, payload);

    // The callback is responsible for destroying the message it was given.
    natsMsg_Destroy(msg);
}
/*! ----------------------------------------------------------------------------
* @fn NatsBroker
*
* @brief Build a broker with no connection, no options and no subscription.
*        The three NATS handles start out as NULL.
*/
NatsBroker::NatsBroker(void)
    : m_conn(NULL),
      m_opts(NULL),
      m_sub(NULL)
{
    // Nothing else to do: every member is set in the initializer list.
}
/*! ----------------------------------------------------------------------------
* @fn ~NatsBroker
*
* @brief Destructor of the Nats Broker.
*
* The body is empty: the NATS handles held by the broker are released by
* terminate(), not here, so terminate() has to be called explicitly.
*/
NatsBroker::~NatsBroker()
{
    // Intentionally left without any statement.
}
/*! ----------------------------------------------------------------------------
* @fn setup
*
* @brief Setup the Broker: initialize the NATS library and its libuv adapter,
*        then create and configure the connection options stored in m_opts.
*
* @param an_evt_loop libuv event loop the NATS connection will be attached to.
*
* @return 0 on success, otherwise a negative code naming the failed step:
*         -1 the options object could not be created,
*         -2 global message delivery could not be enabled,
*         -3 the event loop callbacks could not be registered,
*         -4 the server URL could not be set,
*         -5 the NATS library could not be initialized.
*
* On failure after step -1, m_opts stays allocated; terminate() releases it.
*/
int NatsBroker::setup(uv_loop_t *an_evt_loop)
{
    // Initialize the NATS library. The status is checked so that an
    // initialization failure is reported instead of being silently ignored.
    // NOTE(review): -1 is understood to select the library's default lock
    // spin count - see the nats_Open documentation.
    if (nats_Open(-1) != NATS_OK)
        return -5;

    // One time initialization of things that we need.
    natsLibuv_Init();

    // Libuv is not thread-safe. Almost all calls to libuv need to
    // occur from the thread where the loop is running. NATS library
    // may have to call into the event loop from different threads.
    // This call allows natsLibuv APIs to know if they are executing
    // from the event loop thread or not.
    natsLibuv_SetThreadLocalLoop(an_evt_loop);

    if (natsOptions_Create(&m_opts) != NATS_OK)
        return -1;

    if (natsOptions_UseGlobalMessageDelivery(m_opts, true) != NATS_OK)
        return -2;

    // Indicate which loop and callbacks to use once connected.
    if (natsOptions_SetEventLoop(m_opts, static_cast<void *>(an_evt_loop),
                                 natsLibuv_Attach,
                                 natsLibuv_Read,
                                 natsLibuv_Write,
                                 natsLibuv_Detach) != NATS_OK)
        return -3;

    if (natsOptions_SetURL(m_opts, kNatsServerURL) != NATS_OK)
        return -4;

    return 0;
}
/*! ----------------------------------------------------------------------------
* @fn terminate
*
* @brief Terminate the Broker: destroy the subscription, the connection and
*        the options, in that order, and forget the destroyed handles.
*
* Each member is reset to NULL once its object has been destroyed, so calling
* terminate() again does not destroy the same object twice and no dangling
* handle is left behind in the broker.
*
* @return always 0.
*/
int NatsBroker::terminate(void)
{
    // Destroy all our objects to avoid report of memory leak
    if (m_sub != NULL)
    {
        natsSubscription_Destroy(m_sub);
        m_sub = NULL;
    }

    if (m_conn != NULL)
    {
        natsConnection_Destroy(m_conn);
        m_conn = NULL;
    }

    if (m_opts != NULL)
    {
        natsOptions_Destroy(m_opts);
        m_opts = NULL;
    }

    return 0;
}
/*! ----------------------------------------------------------------------------
* @fn connect
*
* @brief Connect to the Broker server using the options held in m_opts, then
*        subscribe to the "foo" subject with onMsg as message handler.
*
* @return 0 on success, otherwise a negative code naming the failed step:
*         -1 the connection could not be established,
*         -2 the subscription could not be created,
*         -3 the pending limits could not be changed.
*/
int NatsBroker::connect(void)
{
    const bool is_connected = (natsConnection_Connect(&m_conn, m_opts) == NATS_OK);
    if (!is_connected)
    {
        return -1;
    }

    const bool is_subscribed = (natsConnection_Subscribe(&m_sub, m_conn, "foo", onMsg, NULL) == NATS_OK);
    if (!is_subscribed)
    {
        return -2;
    }

    // For maximum performance, set no limit on the number of pending messages.
    const bool is_unlimited = (natsSubscription_SetPendingLimits(m_sub, -1, -1) == NATS_OK);
    if (!is_unlimited)
    {
        return -3;
    }

    return 0;
}