diff --git a/src/lib/libubus-cpp/builders/cmake/CMakeLists.txt b/src/lib/libubus-cpp/builders/cmake/CMakeLists.txt index 3100dc59..65b5b3bb 100644 --- a/src/lib/libubus-cpp/builders/cmake/CMakeLists.txt +++ b/src/lib/libubus-cpp/builders/cmake/CMakeLists.txt @@ -5,6 +5,10 @@ project (libubus-cpp) set (CMAKE_MODULE_PATH "${MODULE_PATH}") set (CMAKE_CXX_STANDARD 11) +set(UBUS_MAX_MSGLEN 1048576) +add_definitions(-DUBUS_MAX_MSGLEN=${UBUS_MAX_MSGLEN}) +add_definitions(-DUBUS_UNIX_SOCKET="/tmp/ubus.sock") + include (br) include_directories (${workspaceRoot}/src/lib/libubus-cpp/include/) @@ -15,6 +19,7 @@ file( ${workspaceRoot}/src/lib/libubus-cpp/src/ubus-call.cpp ${workspaceRoot}/src/lib/libubus-cpp/src/ubus-object.cpp ${workspaceRoot}/src/lib/libubus-cpp/src/uloop-timer.cpp + ${workspaceRoot}/src/lib/libubus-cpp/src/ubus-sync.c ) add_library( diff --git a/src/lib/libubus-cpp/src/ubus-call.cpp b/src/lib/libubus-cpp/src/ubus-call.cpp index 392b0724..8b964e94 100644 --- a/src/lib/libubus-cpp/src/ubus-call.cpp +++ b/src/lib/libubus-cpp/src/ubus-call.cpp @@ -1,5 +1,5 @@ /*! - * UbusCall.cpp + * ubus-call.cpp * * Copyright (c) 2015-2018, NADAL Jean-Baptiste. All rights reserved. * @@ -30,13 +30,18 @@ extern "C" { #include } +extern "C" { +#include "ubus-sync.h" +} + #include "common.h" #include "ubus-cpp/ubus-call.h" #define kDefaultTimeoutInSecond 5 +// #define UBUS_PROFILE 1 -static void receive_call_result_data (struct ubus_request *aReq, int aType, struct blob_attr *aMsg); +static void receive_call_result_data (struct ubus_request *a_req, int a_type, struct blob_attr *a_msg); /*! ---------------------------------------------------------------------------- * @fn UBusCall @@ -57,48 +62,21 @@ m_timeout(kDefaultTimeoutInSecond) int UBusCall::exec (const std::string &a_path, const std::string &a_method, const std::string &a_parameter, std::string &a_result) { - int the_ret = 0; - uint32_t the_id; - - struct blob_buf the_buf = { 0 }; - struct ubus_context *the_ctx = ubus_connect(NULL); - - if (the_ctx == NULL) { - - fprintf(stderr, "%s - Failed to create an ubus context.\n", __PRETTY_FUNCTION__); - return -1; - } - - blob_buf_init(&the_buf, 0); - - if (!a_parameter.empty()) { - - if (!blobmsg_add_json_from_string(&the_buf, a_parameter.c_str())) { - - fprintf(stderr, "%s - Failed to parse message data\n", __PRETTY_FUNCTION__); - return -1; - } - } - - the_ret = ubus_lookup_id(the_ctx, a_path.c_str(), &the_id); - if (the_ret) { - - fprintf(stderr, "%s - ubus_lookup_id error = '%d'\n", __PRETTY_FUNCTION__, the_ret); - ubus_free(the_ctx); - blob_buf_free(&the_buf); - return the_ret; - } - - the_ret = ubus_invoke(the_ctx, the_id, a_method.c_str(), the_buf.head, receive_call_result_data, this, m_timeout * 1000); - - ubus_free(the_ctx); - blob_buf_free(&the_buf); + int the_ret = 0; +#ifdef UBUS_PROFILE + struct timeval the_start_request, the_current_timeval; + gettimeofday(&the_start_request, NULL); +#endif + the_ret = ubus_sync_exec(a_path.c_str(), a_method.c_str(), a_parameter.c_str(), receive_call_result_data, this); if (!m_data.empty()) { - - a_result = m_data; + a_result = m_data.c_str(); } +#ifdef UBUS_PROFILE + gettimeofday(&the_current_timeval, NULL); + printf("ubus_exec [%s]-[%s] duree: %.0lf ms\n", a_path.c_str(), a_method.c_str(), time_diff(the_start_request, the_current_timeval) / 1000); +#endif return the_ret; } @@ -120,14 +98,14 @@ int UBusCall::set_result (const std::string &a_result) * * @brief UBus received call back. */ -static void receive_call_result_data (struct ubus_request *aReq, int aType, struct blob_attr *aMsg) +static void receive_call_result_data (struct ubus_request *a_req, int a_type, struct blob_attr *a_msg) { - UNUSED_PARAMETER (aType); + UNUSED_PARAMETER (a_type); char *the_str; - UBusCall *anUBusObject = static_cast(aReq->priv); - the_str = blobmsg_format_json(aMsg, true); + UBusCall *an_ubus_object = static_cast(a_req->priv); + the_str = blobmsg_format_json(a_msg, true); - anUBusObject->set_result(std::string(the_str)); + an_ubus_object->set_result(std::string(the_str)); free(the_str); } diff --git a/src/lib/libubus-cpp/src/ubus-sync.c b/src/lib/libubus-cpp/src/ubus-sync.c new file mode 100644 index 00000000..b88c2465 --- /dev/null +++ b/src/lib/libubus-cpp/src/ubus-sync.c @@ -0,0 +1,1258 @@ +/*! + * (C) Copyright 2003-2018 Awox SA. All rights reserved. + * This work contains confidential trade secrets of Awox. + * Use, examination, copying, transfer and disclosure to others + * are prohibited, except with the express written agreement of Awox. + * + * @Author: Awox + * @Date: 09/02/2018 + */ + +/*------------------------------- INCLUDES ----------------------------------*/ + +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + +#include + +#include // read +#include +#include + +#include + +#include "ubus-sync.h" + + +struct ubus_sync_context { + uint16_t request_seq; + int fd; + bool eof; + uint32_t local_id; + struct ubus_msghdr_buf msgbuf; + uint32_t msgbuf_data_len; + struct list_head requests; + struct avl_tree objects; +}; + +#define STATIC_IOV(_var) { .iov_base = (char *) &(_var), .iov_len = sizeof(_var) } + +struct blob_buf b = {}; + +static const struct blob_attr_info ubus_policy[UBUS_ATTR_MAX] = { + [UBUS_ATTR_STATUS] = { .type = BLOB_ATTR_INT32 }, + [UBUS_ATTR_OBJID] = { .type = BLOB_ATTR_INT32 }, + [UBUS_ATTR_OBJPATH] = { .type = BLOB_ATTR_STRING }, + [UBUS_ATTR_METHOD] = { .type = BLOB_ATTR_STRING }, + [UBUS_ATTR_ACTIVE] = { .type = BLOB_ATTR_INT8 }, + [UBUS_ATTR_NO_REPLY] = { .type = BLOB_ATTR_INT8 }, + [UBUS_ATTR_SUBSCRIBERS] = { .type = BLOB_ATTR_NESTED }, +}; + +static struct blob_attr *attrbuf[UBUS_ATTR_MAX]; + +struct ubus_pending_data { + struct list_head list; + int type; + struct blob_attr data[]; +}; + +/* --------------------------------------------------------------------------- */ + +void aw_ubus_handle_data (struct ubus_sync_context *ctx, unsigned int events); +static bool aw_get_next_msg (struct ubus_sync_context *ctx, int *recv_fd); +static int recv_retry (int fd, struct iovec *iov, bool wait, int *recv_fd); +static void aw_ubus_set_req_status (struct ubus_request *req, int ret); +static void aw_ubus_process_req_data (struct ubus_request *req, struct ubus_msghdr_buf *buf); +static void aw_ubus_req_complete_cb (struct ubus_request *req); +static void req_data_cb (struct ubus_request *req, int type, struct blob_attr *data); + +/* --------------------------------------------------------------------------- */ + + +double time_diff(struct timeval x , struct timeval y) +{ + double x_ms , y_ms , diff; + + x_ms = (double)x.tv_sec*1000000 + (double)x.tv_usec; + y_ms = (double)y.tv_sec*1000000 + (double)y.tv_usec; + + diff = (double)y_ms - (double)x_ms; + + return diff; +} + +/*! ---------------------------------------------------------------------------- + * @fn rows_eq + * + * @brief check if a row is eq or not. + */ +int rows_eq (int *a, int *b) +{ + int i; + + for (i=0; i<16; i++) { + if (a[i] != b[i]) { + return 0; + } + } + + return 1; +} + + +/*! ---------------------------------------------------------------------------- + * @fn dump_row + * + * @brief dump row + */ +void dump_row (long a_count, int a_numinrow, int *a_chs) +{ + int i; + + printf("%08lX:", a_count - a_numinrow); + + if (a_numinrow > 0) { + + for (i = 0; i < a_numinrow; i++) { + if (i == 8) { + printf(" :"); + } + printf(" %02X", a_chs[i]); + } + + for (i = a_numinrow; i < 16; i++) { + if (i == 8) { + printf(" :"); + } + printf(" "); + } + printf(" "); + for (i = 0; i < a_numinrow; i++) { + if (isprint(a_chs[i])) { + printf("%c", a_chs[i]); + } else { + printf("."); + } + } + } + printf("\n"); +} + + +/*! ---------------------------------------------------------------------------- + * @fn dump + * + * @brief dump on the console a memory block. + */ +void dump (void const *a_buffer, size_t a_len) +{ + unsigned char *the_buf = (unsigned char *) a_buffer; + long the_count = 0; + int the_numinrow = 0; + int the_chs[16]; + int the_oldchs[16] = { 0 }; + int the_showed_dots = 0; + size_t i; + + for (i = 0; i < a_len; i++) { + int the_ch = the_buf[i]; + + if (the_numinrow == 16) { + + int j; + + if (rows_eq(the_oldchs, the_chs)) { + if (!the_showed_dots) { + the_showed_dots = 1; + printf(" .. .. .. .. .. .. .. .. : .. .. .. .. .. .. .. ..\n"); + } + } else { + the_showed_dots = 0; + dump_row(the_count, the_numinrow, the_chs); + } + + for (j=0; j<16; j++) { + the_oldchs[j] = the_chs[j]; + } + + the_numinrow = 0; + } + + the_count++; + the_chs[the_numinrow++] = the_ch; + } + + dump_row(the_count, the_numinrow, the_chs); + + if (the_numinrow != 0) { + printf("%08lX:\n", the_count); + } +} + +/* --------------------------------------------------------------------------- */ + +struct blob_attr **aw_ubus_parse_msg (struct blob_attr *msg) +{ + int the_ret; + // printf ("==> aw_ubus_parse_msg\n"); + the_ret = blob_parse(msg, attrbuf, ubus_policy, UBUS_ATTR_MAX); + // printf ("<== aw_ubus_parse_msg the_ret: %d\n", the_ret); + return attrbuf; +} + + +/* --------------------------------------------------------------------------- */ + +static void ubus_lookup_id_cb (struct ubus_request *req, int type, struct blob_attr *msg) +{ + struct blob_attr **attr; + uint32_t *id = req->priv; + + // printf ("==> ubus_lookup_id_cb\n"); + attr = aw_ubus_parse_msg(msg); + + if (!attr[UBUS_ATTR_OBJID]) + return; + + *id = blob_get_u32(attr[UBUS_ATTR_OBJID]); + // printf ("<== ubus_lookup_id_cb (%d)\n", *id); +} + +/* --------------------------------------------------------------------------- */ + +static void aw_ubus_sync_req_cb (struct ubus_request *req, int ret) +{ + // printf ("==> aw_ubus_sync_req_cb\n"); + req->status_msg = true; + req->status_code = ret; + // uloop_end(); +} + + +/* --------------------------------------------------------------------------- */ + +static bool aw_ubus_validate_hdr (struct ubus_msghdr *hdr) +{ + struct blob_attr *data = (struct blob_attr *) (hdr + 1); + // printf ("==> aw_ubus_validate_hdr\n"); + + if (hdr->version != 0) + return false; + + if (blob_raw_len(data) < sizeof(*data)) + return false; + + if (blob_pad_len(data) > UBUS_MAX_MSGLEN) + return false; + + return true; +} + +/* --------------------------------------------------------------------------- */ + +static int ubus_find_notify_id (struct ubus_notify_request *n, uint32_t objid) +{ + uint32_t pending = n->pending; + int i; + // printf ("==> ubus_find_notify_id\n"); + + for (i = 0; pending; i++, pending >>= 1) { + if (!(pending & 1)) + continue; + + if (n->id[i] == objid) + return i; + } + + return -1; +} + +/* --------------------------------------------------------------------------- */ + +static struct ubus_request *ubus_find_request (struct ubus_sync_context *ctx, uint32_t seq, uint32_t peer, int *id) +{ + struct ubus_request *req; + // printf ("==> ubus_find_request ctx: %p, seq: %d peer:%d\n", ctx, seq, peer); + + list_for_each_entry(req, &ctx->requests, list) { + // printf (". req = %p seq: %d next: %p peer: %d\n", req, req->seq, req->list.next, req->peer); + struct ubus_notify_request *nreq; + nreq = container_of(req, struct ubus_notify_request, req); + + if (seq != req->seq) + continue; + + if (req->notify) { + + if (!nreq->pending) + continue; + + *id = ubus_find_notify_id(nreq, peer); + if (*id < 0) + continue; + } else if (peer != req->peer) + continue; + + // printf ("<== ubus_find_request found: %p\n", req); + return req; + } + // printf ("<== ubus_find_request NULL\n"); + return NULL; +} + +/* --------------------------------------------------------------------------- */ + +static bool aw_ubus_get_status (struct ubus_msghdr_buf *buf, int *ret) +{ + struct blob_attr **attrbuf = aw_ubus_parse_msg(buf->data); + // printf ("==> aw_ubus_get_status\n"); + + if (!attrbuf[UBUS_ATTR_STATUS]) + return false; + + *ret = blob_get_u32(attrbuf[UBUS_ATTR_STATUS]); + return true; +} + +/* --------------------------------------------------------------------------- */ + +static void ubus_process_notify_status (struct ubus_request *req, int id, struct ubus_msghdr_buf *buf) +{ + struct ubus_notify_request *nreq; + struct blob_attr **tb; + struct blob_attr *cur; + int rem, idx = 1; + int ret = 0; + // printf ("==> ubus_process_notify_status\n"); + + nreq = container_of(req, struct ubus_notify_request, req); + nreq->pending &= ~(1 << id); + + if (!id) { + /* first id: ubusd's status message with a list of ids */ + tb = aw_ubus_parse_msg(buf->data); + if (tb[UBUS_ATTR_SUBSCRIBERS]) { + blob_for_each_attr(cur, tb[UBUS_ATTR_SUBSCRIBERS], rem) { + if (!blob_check_type(blob_data(cur), blob_len(cur), BLOB_ATTR_INT32)) + continue; + + nreq->pending |= (1 << idx); + nreq->id[idx] = blob_get_int32(cur); + idx++; + + if (idx == UBUS_MAX_NOTIFY_PEERS + 1) + break; + } + } + } else { + aw_ubus_get_status(buf, &ret); + if (nreq->status_cb) + nreq->status_cb(nreq, id, ret); + } + + if (!nreq->pending) + aw_ubus_set_req_status(req, 0); +} + +/* --------------------------------------------------------------------------- */ + +static int ubus_process_req_status (struct ubus_request *req, struct ubus_msghdr_buf *buf) +{ + int ret = UBUS_STATUS_INVALID_ARGUMENT; + // printf ("==> ubus_process_req_status\n"); + + aw_ubus_get_status(buf, &ret); + req->peer = buf->hdr.peer; + aw_ubus_set_req_status(req, ret); + + return ret; +} + +/* --------------------------------------------------------------------------- */ + +void ubus_process_req_msg (struct ubus_sync_context *ctx, struct ubus_msghdr_buf *buf, int fd) +{ + struct ubus_msghdr *hdr = &buf->hdr; + struct ubus_request *req; + int id = -1; + // printf ("==> ubus_process_req_msg\n"); + switch(hdr->type) { + case UBUS_MSG_STATUS: + // printf (" - type: UBUS_MSG_STATUS\n"); + req = ubus_find_request(ctx, hdr->seq, hdr->peer, &id); + if (!req) + break; + + if (fd >= 0) { + if (req->fd_cb) + req->fd_cb(req, fd); + else + close(fd); + } + + if (id >= 0) + ubus_process_notify_status(req, id, buf); + else + ubus_process_req_status(req, buf); + + break; + + case UBUS_MSG_DATA: + //printf (" - type: UBUS_MSG_DATA req (%p)\n", req); + req = ubus_find_request(ctx, hdr->seq, hdr->peer, &id); + if (req && (req->data_cb || req->raw_data_cb)) + aw_ubus_process_req_data(req, buf); + break; + } +} + +/* --------------------------------------------------------------------------- */ + +static void __ubus_process_req_data (struct ubus_request *req) +{ + struct ubus_pending_data *data; + // printf ("==> __ubus_process_req_data\n"); + + while (!list_empty(&req->pending)) { + data = list_first_entry(&req->pending, + struct ubus_pending_data, list); + list_del(&data->list); + if (!req->cancelled) + req_data_cb(req, data->type, data->data); + free(data); + } +} + +/* --------------------------------------------------------------------------- */ + +static void req_data_cb (struct ubus_request *req, int type, struct blob_attr *data) +{ + struct blob_attr **attr; + // printf ("==> req_data_cb\n"); + if (req->raw_data_cb) + req->raw_data_cb(req, type, data); + + if (!req->data_cb) + return; + + attr = aw_ubus_parse_msg(data); + req->data_cb(req, type, attr[UBUS_ATTR_DATA]); +} + +/* --------------------------------------------------------------------------- */ + +static void aw_ubus_process_req_data (struct ubus_request *req, struct ubus_msghdr_buf *buf) +{ + struct ubus_pending_data *data; + int len; + // printf ("==> aw_ubus_process_req_data\n"); + + if (!req->blocked) { + req->blocked = true; + req_data_cb(req, buf->hdr.type, buf->data); + __ubus_process_req_data(req); + req->blocked = false; + + if (req->status_msg) + aw_ubus_req_complete_cb(req); + + return; + } + + len = blob_raw_len(buf->data); + data = calloc(1, sizeof(*data) + len); + if (!data) + return; + + data->type = buf->hdr.type; + memcpy(data->data, buf->data, len); + list_add(&data->list, &req->pending); +} + +/* --------------------------------------------------------------------------- */ + +void ubus_process_obj_msg (struct ubus_sync_context *ctx, struct ubus_msghdr_buf *buf) +{ + void (*cb)(struct ubus_sync_context *, struct ubus_msghdr *, + struct ubus_object *, struct blob_attr **); + struct ubus_msghdr *hdr = &buf->hdr; + struct blob_attr **attrbuf; + struct ubus_object *obj; + uint32_t objid; + void *prev_data = NULL; + // printf ("==> ubus_process_obj_msg... (TODO)\n"); + attrbuf = aw_ubus_parse_msg(buf->data); + if (!attrbuf[UBUS_ATTR_OBJID]) + return; + + objid = blob_get_u32(attrbuf[UBUS_ATTR_OBJID]); + obj = avl_find_element(&ctx->objects, &objid, obj, avl); + + switch (hdr->type) { + case UBUS_MSG_INVOKE: + printf ("==> UBUS_MSG_INVOKE (TODO)\n"); + //cb = ubus_process_invoke; + break; + case UBUS_MSG_UNSUBSCRIBE: + printf ("==> UBUS_MSG_UNSUBSCRIBE (TODO)\n"); + //cb = ubus_process_unsubscribe; + break; + case UBUS_MSG_NOTIFY: + printf ("==> UBUS_MSG_NOTIFY (TODO)\n"); + //cb = ubus_process_notify; + break; + default: + return; + } + + if (buf == &ctx->msgbuf) { + prev_data = buf->data; + buf->data = NULL; + } + + //cb(ctx, hdr, obj, attrbuf); + + if (prev_data) { + if (buf->data) + free(prev_data); + else + buf->data = prev_data; + } +} + +/* --------------------------------------------------------------------------- */ + +void ubus_process_msg (struct ubus_sync_context *ctx, struct ubus_msghdr_buf *buf, int fd) +{ + // printf ("==> ubus_process_msg : %d\n", buf->hdr.type); + switch(buf->hdr.type) { + case UBUS_MSG_STATUS: + case UBUS_MSG_DATA: + ubus_process_req_msg(ctx, buf, fd); + break; + + case UBUS_MSG_INVOKE: + case UBUS_MSG_UNSUBSCRIBE: + case UBUS_MSG_NOTIFY: + //if (ctx->stack_depth) { + // ubus_queue_msg(ctx, buf); + // break; + //} + + ubus_process_obj_msg(ctx, buf); + break; + } +} + +/* --------------------------------------------------------------------------- */ + +static int64_t aw_get_time_msec (void) +{ + struct timespec ts; + int64_t val; + + clock_gettime(CLOCK_MONOTONIC, &ts); + val = (int64_t) ts.tv_sec * 1000LL; + val += ts.tv_nsec / 1000000LL; + return val; +} + +/* --------------------------------------------------------------------------- */ + +int aw_ubus_reconnect (struct ubus_sync_context *ctx, const char *path) +{ + // printf ("==> aw_ubus_reconnect\n"); + struct { + struct ubus_msghdr hdr; + struct blob_attr data; + } hdr; + struct blob_attr *buf; + int ret = UBUS_STATUS_UNKNOWN_ERROR; + + if (!path) + path = UBUS_UNIX_SOCKET; + + /*if (ctx->sock.fd >= 0) { + if (ctx->sock.registered) + uloop_fd_delete(&ctx->sock); + + close(ctx->sock.fd); + }*/ + + ctx->fd = usock(USOCK_UNIX, path, NULL); + if (ctx->fd < 0) + return UBUS_STATUS_CONNECTION_FAILED; + + if (read(ctx->fd, &hdr, sizeof(hdr)) != sizeof(hdr)) + goto out_close; + + if (!aw_ubus_validate_hdr(&hdr.hdr)) + goto out_close; + + if (hdr.hdr.type != UBUS_MSG_HELLO) + goto out_close; + + buf = calloc(1, blob_raw_len(&hdr.data)); + if (!buf) + goto out_close; + + memcpy(buf, &hdr.data, sizeof(hdr.data)); + if (read(ctx->fd, blob_data(buf), blob_len(buf)) != blob_len(buf)) + goto out_free; + + ctx->local_id = hdr.hdr.peer; + if (!ctx->local_id) + goto out_free; + + ret = UBUS_STATUS_OK; + //fcntl(ctx->sock.fd, F_SETFL, fcntl(ctx->sock.fd, F_GETFL) | O_NONBLOCK | O_CLOEXEC); + // AWOX M2: Should add F_SETFD: + fcntl(ctx->fd, F_SETFL, fcntl(ctx->fd, F_GETFL) | O_NONBLOCK); + + // ubus_refresh_state(ctx); + +out_free: + free(buf); +out_close: + if (ret) + close(ctx->fd); + + return ret; +} + +/* --------------------------------------------------------------------------- */ + +static int aw_ubus_connect (struct ubus_sync_context *ctx, const char *path) +{ + // printf ("==> aw_ubus_connect ctx: %p\n", ctx); + ctx->fd = -1; + //ctx->sock.cb = ubus_handle_data; + //ctx->connection_lost = ubus_default_connection_lost; + //ctx->pending_timer.cb = ubus_process_pending_msg; + + ctx->msgbuf.data = calloc(UBUS_MSG_CHUNK_SIZE, sizeof(char)); + if (!ctx->msgbuf.data) + return -1; + ctx->msgbuf_data_len = UBUS_MSG_CHUNK_SIZE; + + INIT_LIST_HEAD(&ctx->requests); + //INIT_LIST_HEAD(&ctx->pending); + //avl_init(&ctx->objects, ubus_cmp_id, false, NULL); + if (aw_ubus_reconnect(ctx, path)) { + free(ctx->msgbuf.data); + return -1; + } + + return 0; +} + +/* --------------------------------------------------------------------------- */ + +struct ubus_sync_context *aw_new_ubus_connect (void) +{ + struct ubus_sync_context *ctx; + // printf ("==> aw_new_ubus_connect\n"); + + ctx = calloc(1, sizeof(*ctx)); + if (!ctx) + return NULL; + + if (aw_ubus_connect(ctx, NULL)) { + free(ctx); + ctx = NULL; + } + + return ctx; +} + +/* --------------------------------------------------------------------------- */ + +void aw_ubus_free (struct ubus_sync_context *ctx) +{ + // printf ("aw_ubus_free ctx: %p\n", ctx); + blob_buf_free(&b); + close(ctx->fd); + free(ctx->msgbuf.data); + free(ctx); +} + + +/* --------------------------------------------------------------------------- */ + +static void aw_wait_data (int fd, bool write) +{ + struct pollfd pfd = { .fd = fd }; + // printf ("==>aw_wait_data\n"); + pfd.events = write ? POLLOUT : POLLIN; + poll(&pfd, 1, -1); +} + +/* --------------------------------------------------------------------------- */ + +void aw_ubus_poll_data (struct ubus_sync_context *ctx, int timeout) +{ + // printf ("==> aw_ubus_poll_data\n"); + struct pollfd pfd = { + .fd = ctx->fd, + .events = POLLIN | POLLERR, + }; + + poll(&pfd, 1, timeout); + aw_ubus_handle_data (ctx, ULOOP_READ); +} + +/* --------------------------------------------------------------------------- */ + +void aw_ubus_handle_data (struct ubus_sync_context *ctx, unsigned int events) +{ + // printf ("==> aw_ubus_handle_data\n"); + // struct ubus_context *ctx = container_of(u, struct ubus_context, sock); + int recv_fd = -1; + + while (aw_get_next_msg(ctx, &recv_fd)) { + ubus_process_msg(ctx, &ctx->msgbuf, recv_fd); + //if (uloop_cancelled) + // break; + } + + //if (u->eof) + // ctx->connection_lost(ctx); +} + +/* --------------------------------------------------------------------------- */ + +static bool aw_alloc_msg_buf (struct ubus_sync_context *ctx, int len) +{ + void *ptr; + int buf_len = ctx->msgbuf_data_len; + int rem; + // printf ("alloc_msg_buf len: %d\n", len); + // printf (" - buf_len: %d\n", buf_len); + + if (!ctx->msgbuf.data) + buf_len = 0; + + rem = (len % UBUS_MSG_CHUNK_SIZE); + // printf (" - rem: %d\n", rem); + if (rem > 0) + len += UBUS_MSG_CHUNK_SIZE - rem; + // printf (" - len: %d\n", len); + /* + if (len < buf_len && + ++ctx->msgbuf_reduction_counter > UBUS_MSGBUF_REDUCTION_INTERVAL) { + ctx->msgbuf_reduction_counter = 0; + buf_len = 0; + } + */ + if (len <= buf_len) + return true; + // printf (" - realloc len: %d\n", len); + ptr = realloc(ctx->msgbuf.data, len); + if (!ptr) + return false; + + ctx->msgbuf.data = ptr; + return true; +} + +/* --------------------------------------------------------------------------- */ + +static bool aw_get_next_msg (struct ubus_sync_context *ctx, int *recv_fd) +{ + struct { + struct ubus_msghdr hdr; + struct blob_attr data; + } hdrbuf; + struct iovec iov = STATIC_IOV(hdrbuf); + int len; + int r; + // printf ("==> get_next_msg\n"); + + /* receive header + start attribute */ + r = recv_retry(ctx->fd, &iov, false, recv_fd); + if (r <= 0) { + if (r < 0) + ctx->eof = true; + + return false; + } + + hdrbuf.hdr.seq = be16_to_cpu(hdrbuf.hdr.seq); + hdrbuf.hdr.peer = be32_to_cpu(hdrbuf.hdr.peer); + + if (!aw_ubus_validate_hdr(&hdrbuf.hdr)) + return false; + + len = blob_raw_len(&hdrbuf.data); + if (!aw_alloc_msg_buf(ctx, len)) + return false; + + memcpy(&ctx->msgbuf.hdr, &hdrbuf.hdr, sizeof(hdrbuf.hdr)); + memcpy(ctx->msgbuf.data, &hdrbuf.data, sizeof(hdrbuf.data)); + + iov.iov_base = (char *)ctx->msgbuf.data + sizeof(hdrbuf.data); + iov.iov_len = blob_len(ctx->msgbuf.data); + if (iov.iov_len > 0 && + recv_retry(ctx->fd, &iov, true, NULL) <= 0) + return false; + + return true; +} + + +/* --------------------------------------------------------------------------- */ + +static int aw_writev_retry (int fd, struct iovec *iov, int iov_len, int sock_fd) +{ + // printf ("==>aw_writev_retry\n"); + static struct { + struct cmsghdr h; + int fd; + } fd_buf = { + .h = { + .cmsg_len = sizeof(fd_buf), + .cmsg_level = SOL_SOCKET, + .cmsg_type = SCM_RIGHTS, + } + }; + struct msghdr msghdr = { + .msg_iov = iov, + .msg_iovlen = iov_len, + .msg_control = &fd_buf, + .msg_controllen = sizeof(fd_buf), + }; + int len = 0; + + do { + int cur_len; + + if (sock_fd < 0) { + msghdr.msg_control = NULL; + msghdr.msg_controllen = 0; + } else { + fd_buf.fd = sock_fd; + } + + cur_len = sendmsg(fd, &msghdr, 0); + if (cur_len < 0) { + switch(errno) { + case EAGAIN: + aw_wait_data(fd, true); + break; + case EINTR: + break; + default: + return -1; + } + continue; + } + + if (len > 0) + sock_fd = -1; + + len += cur_len; + while (cur_len >= iov->iov_len) { + cur_len -= iov->iov_len; + iov_len--; + iov++; + if (!iov_len) + return len; + } + iov->iov_base += cur_len; + iov->iov_len -= cur_len; + msghdr.msg_iov = iov; + msghdr.msg_iovlen = iov_len; + } while (1); + + /* Should never reach here */ + return -1; +} + +/* --------------------------------------------------------------------------- */ + +static int recv_retry(int fd, struct iovec *iov, bool wait, int *recv_fd) +{ + int bytes, total = 0; + static struct { + struct cmsghdr h; + int fd; + } fd_buf = { + .h = { + .cmsg_type = SCM_RIGHTS, + .cmsg_level = SOL_SOCKET, + .cmsg_len = sizeof(fd_buf), + }, + }; + struct msghdr msghdr = { + .msg_iov = iov, + .msg_iovlen = 1, + }; + + while (iov->iov_len > 0) { + if (wait) + aw_wait_data(fd, false); + + if (recv_fd) { + msghdr.msg_control = &fd_buf; + msghdr.msg_controllen = sizeof(fd_buf); + } else { + msghdr.msg_control = NULL; + msghdr.msg_controllen = 0; + } + + fd_buf.fd = -1; + bytes = recvmsg(fd, &msghdr, 0); + if (!bytes) + return -1; + + if (bytes < 0) { + bytes = 0; + //if (uloop_cancelled) + // return 0; + if (errno == EINTR) + continue; + + if (errno != EAGAIN) + return -1; + } + if (!wait && !bytes) + return 0; + + if (recv_fd) + *recv_fd = fd_buf.fd; + + recv_fd = NULL; + + wait = true; + iov->iov_len -= bytes; + iov->iov_base += bytes; + total += bytes; + } + + return total; +} + +/* --------------------------------------------------------------------------- */ + +int aw_ubus_send_msg (struct ubus_sync_context *ctx, uint32_t seq, + struct blob_attr *msg, int cmd, uint32_t peer, int fd) +{ + struct ubus_msghdr hdr; + struct iovec iov[2] = { + STATIC_IOV(hdr) + }; + int ret; + // printf ("==>aw_ubus_send_msg\n"); + hdr.version = 0; + hdr.type = cmd; + hdr.seq = cpu_to_be16(seq); + hdr.peer = cpu_to_be32(peer); + + if (!msg) { + blob_buf_init(&b, 0); + msg = b.head; + } + + iov[1].iov_base = (char *) msg; + iov[1].iov_len = blob_raw_len(msg); + + ret = aw_writev_retry(ctx->fd, iov, ARRAY_SIZE(iov), fd); + if (ret < 0) + ctx->eof = true; + + if (fd >= 0) + close(fd); + + return ret; +} + +/* --------------------------------------------------------------------------- */ + +void aw_ubus_complete_request_async (struct ubus_sync_context *ctx, struct ubus_request *req) +{ + // printf ("==> aw_ubus_complete_request_async...\n"); + if (!list_empty(&req->list)) + return; + + list_add(&req->list, &ctx->requests); +} + +/* --------------------------------------------------------------------------- */ + +static void aw_ubus_req_complete_cb (struct ubus_request *req) +{ + // printf ("==> aw_ubus_req_complete_cb\n"); + ubus_complete_handler_t cb = req->complete_cb; + + if (!cb) + return; + + req->complete_cb = NULL; + cb(req, req->status_code); +} + +/* --------------------------------------------------------------------------- */ + +static void aw_ubus_set_req_status (struct ubus_request *req, int ret) +{ + // printf ("==> aw_ubus_set_req_status\n"); + //if (!list_empty(&req->list)) + // list_del_init(&req->list); + + req->status_msg = true; + req->status_code = ret; + //if (!req->blocked) + aw_ubus_req_complete_cb(req); +} + +/* --------------------------------------------------------------------------- */ + +int aw_ubus_start_request (struct ubus_sync_context *ctx, struct ubus_request *req, + struct blob_attr *msg, int cmd, uint32_t peer) +{ + memset(req, 0, sizeof(*req)); + // printf ("==> aw_ubus_start_request blob_pad_len(msg) %d\n", blob_pad_len(msg)); + + if (msg && blob_pad_len(msg) > UBUS_MAX_MSGLEN) + return -1; + + INIT_LIST_HEAD(&req->list); + INIT_LIST_HEAD(&req->pending); + //req->ctx = ctx; + req->peer = peer; + req->seq = ++ctx->request_seq; + // printf ("ubus_start_request -> req->seq:%d cmd: %d peer: %d\n", req->seq, cmd, peer); + return aw_ubus_send_msg(ctx, req->seq, msg, cmd, peer, -1); +} + +/* --------------------------------------------------------------------------- */ + +int aw_ubus_complete_request (struct ubus_sync_context *ctx, struct ubus_request *req, + int req_timeout) +{ + ubus_complete_handler_t complete_cb = req->complete_cb; + //bool registered = ctx->sock.registered; + int status = UBUS_STATUS_NO_DATA; + int64_t timeout = -1, time_end = 0; + // printf ("==> aw_ubus_complete_request %p\n", req); +/* + if (!registered) { + uloop_init(); + ubus_add_uloop(ctx); + } +*/ + if (req_timeout) + time_end = aw_get_time_msec() + req_timeout; + + aw_ubus_complete_request_async(ctx, req); + req->complete_cb = aw_ubus_sync_req_cb; + + // ctx->stack_depth++; + // printf ("wait response ....\n"); + while (!req->status_msg) { + bool cancelled = uloop_cancelled; + + uloop_cancelled = false; + if (req_timeout) { + timeout = time_end - aw_get_time_msec(); + if (timeout <= 0) { + aw_ubus_set_req_status(req, UBUS_STATUS_TIMEOUT); + uloop_cancelled = cancelled; + break; + } + } + aw_ubus_poll_data(ctx, (unsigned int) timeout); + + uloop_cancelled = cancelled; + } +/* + ctx->stack_depth--; + if (ctx->stack_depth) + uloop_cancelled = true; +*/ + + if (req->status_msg) + status = req->status_code; + + req->complete_cb = complete_cb; + // printf ("call complete (%p)...\n", req->complete_cb); + if (req->complete_cb) + req->complete_cb(req, status); + +/* + if (!registered) { + uloop_fd_delete(&ctx->sock); + + if (ctx->stack_depth) + ctx->pending_timer.cb(&ctx->pending_timer); + } +*/ + return status; +} + +/* --------------------------------------------------------------------------- */ + +int aw_ubus_invoke_async (struct ubus_sync_context *ctx, uint32_t obj, const char *method, + struct blob_attr *msg, struct ubus_request *req) +{ + blob_buf_init(&b, 0); + blob_put_int32(&b, UBUS_ATTR_OBJID, obj); + blob_put_string(&b, UBUS_ATTR_METHOD, method); + // printf ("==> aw_ubus_invoke_async\n"); + if (msg) + blob_put(&b, UBUS_ATTR_DATA, blob_data(msg), blob_len(msg)); + + if (aw_ubus_start_request(ctx, req, b.head, UBUS_MSG_INVOKE, obj) < 0) + return UBUS_STATUS_INVALID_ARGUMENT; + + return 0; +} + +/* --------------------------------------------------------------------------- */ + +int aw_ubus_lookup_id (struct ubus_sync_context *a_ctx, const char *a_path, uint32_t *an_id) +{ + struct ubus_request req; + int the_ret; + // printf ("==> aw_ubus_lookup_id\n"); + blob_buf_init(&b, 0); + if (a_path) + blob_put_string(&b, UBUS_ATTR_OBJPATH, a_path); + + // printf ("================================================================== aw_ubus_lookup_id 1 \n"); + if (aw_ubus_start_request(a_ctx, &req, b.head, UBUS_MSG_LOOKUP, 0) < 0) + return UBUS_STATUS_INVALID_ARGUMENT; + + req.raw_data_cb = ubus_lookup_id_cb; + req.priv = an_id; + + // printf ("================================================================== aw_ubus_lookup_id 2 req: %p\n", &req); + + the_ret = aw_ubus_complete_request(a_ctx, &req, 5000); + // Awox Remomve infinite timeout: return ubus_complete_request(ctx, &req, 0); + // printf ("================================================================== aw_ubus_lookup_id 3 ret: %d\n", the_ret); + return the_ret; +} + +/* --------------------------------------------------------------------------- */ + +// ubus_data_handler_t cb, void *priv, +int aw_ubus_invoke (struct ubus_sync_context *ctx, uint32_t obj, const char *method, + struct blob_attr *msg, ubus_data_handler_t cb, void *priv, int timeout) +{ + struct ubus_request req; + int rc; + // printf ("==> aw_ubus_invoke\n"); + rc = aw_ubus_invoke_async(ctx, obj, method, msg, &req); + if (rc) + return rc; + + req.data_cb = cb; + req.priv = priv; + + rc = aw_ubus_complete_request(ctx, &req, timeout); + // printf ("<== aw_ubus_invoke (%d)\n", rc); + return rc; +} + +/* --------------------------------------------------------------------------- */ + +int aw_ubus_send_event (struct ubus_sync_context *ctx, const char *id, struct blob_attr *data) +{ + struct ubus_request req; + void *s; + + blob_buf_init(&b, 0); + blob_put_int32(&b, UBUS_ATTR_OBJID, UBUS_SYSTEM_OBJECT_EVENT); + blob_put_string(&b, UBUS_ATTR_METHOD, "send"); + s = blob_nest_start(&b, UBUS_ATTR_DATA); + blobmsg_add_string(&b, "id", id); + blobmsg_add_field(&b, BLOBMSG_TYPE_TABLE, "data", blob_data(data), blob_len(data)); + blob_nest_end(&b, s); + + if (aw_ubus_start_request(ctx, &req, b.head, UBUS_MSG_INVOKE, UBUS_SYSTEM_OBJECT_EVENT) < 0) + return UBUS_STATUS_INVALID_ARGUMENT; + + return aw_ubus_complete_request(ctx, &req, 0); +} + +/* --------------------------------------------------------------------------- */ + +int ubus_sync_exec (const char *a_path, const char *a_method, const char *a_parameter, ubus_data_handler_t cb, void *priv) +{ + uint32_t the_id; + int the_ret; + + struct blob_buf the_buf = { 0 }; + struct ubus_sync_context *the_ctx; + + blob_buf_init(&the_buf, 0); + // printf ("==> ubus_exec: path:%s method:%s, parameter:%s\n", a_path, a_method, a_parameter); + if (strlen(a_parameter) != 0) { + if (!blobmsg_add_json_from_string (&the_buf, a_parameter)) { + + fprintf (stderr, "%s - Failed to parse message data\n", __PRETTY_FUNCTION__); + return -1; + } + } + + the_ctx = aw_new_ubus_connect(); + + the_ret = aw_ubus_lookup_id (the_ctx, a_path, &the_id); + if (the_ret) { + + fprintf (stderr, "%s - ubus_lookup_id error: '%d' [path: %s method: %s]\n", __PRETTY_FUNCTION__, the_ret, a_path, a_method); + aw_ubus_free(the_ctx); + blob_buf_free (&the_buf); + return the_ret; + } + // printf ("====> ID ici: %d\n", the_id); + + the_ret = aw_ubus_invoke (the_ctx, the_id, a_method, the_buf.head, cb, priv, 5 * 1000); + + aw_ubus_free(the_ctx); + blob_buf_free (&the_buf); + // printf ("<== ubus_exec:\n"); + return the_ret; +} + +/* --------------------------------------------------------------------------- */ + +int ubus_sync_send_event (const char *an_event, const char *a_data) +{ + int the_ret = 0; + struct blob_buf the_buf = { 0 }; + struct ubus_sync_context *the_ctx; + + // printf("%s => (%s>\n", __PRETTY_FUNCTION__, a_data); + + the_ctx = aw_new_ubus_connect(); + + if (the_ctx == NULL) { + + fprintf (stderr ,"%s - Failed to create an ubus context.\n", __PRETTY_FUNCTION__); + return -1; + } + + blob_buf_init(&the_buf, 0); + + blobmsg_add_json_from_string(&the_buf, a_data); + + aw_ubus_send_event(the_ctx, an_event, the_buf.head); + + blob_buf_free(&the_buf); + + aw_ubus_free(the_ctx); + + return the_ret; +} diff --git a/src/lib/libubus-cpp/src/ubus-sync.h b/src/lib/libubus-cpp/src/ubus-sync.h new file mode 100644 index 00000000..84a48ee9 --- /dev/null +++ b/src/lib/libubus-cpp/src/ubus-sync.h @@ -0,0 +1,25 @@ +/*! + * (C) Copyright 2003-2018 Awox SA. All rights reserved. + * This work contains confidential trade secrets of Awox. + * Use, examination, copying, transfer and disclosure to others + * are prohibited, except with the express written agreement of Awox. + * + * @Author: Awox + * @Date: 09/02/2018 + */ + +/*------------------------------- INCLUDES ----------------------------------*/ + +#ifndef _AW_UBUS_SYNC_H +#define _AW_UBUS_SYNC_H + +#include + +#include // temporary for ubus_data_handler_t + +double time_diff (struct timeval x , struct timeval y); + +int ubus_sync_exec (const char *a_path, const char* a_method, const char *a_parameter, ubus_data_handler_t cb, void *priv); +int ubus_sync_send_event (const char *an_event, const char *a_data); + +#endif /* _AW_UBUS_SYNC_H */