123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440 |
- /*
- * Copyright (c) 2018 Intel Corporation
- *
- * SPDX-License-Identifier: Apache-2.0
- */
- #include <kernel.h>
- #include <errno.h>
- #include <string.h>
- #include <sys/atomic.h>
- #include <posix/time.h>
- #include <posix/mqueue.h>
- typedef struct mqueue_object {
- sys_snode_t snode;
- char *mem_buffer;
- char *mem_obj;
- struct k_msgq queue;
- atomic_t ref_count;
- char *name;
- } mqueue_object;
- typedef struct mqueue_desc {
- char *mem_desc;
- mqueue_object *mqueue;
- uint32_t flags;
- } mqueue_desc;
- K_SEM_DEFINE(mq_sem, 1, 1);
- /* Initialize the list */
- sys_slist_t mq_list = SYS_SLIST_STATIC_INIT(&mq_list);
- int64_t timespec_to_timeoutms(const struct timespec *abstime);
- static mqueue_object *find_in_list(const char *name);
- static int32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_len,
- k_timeout_t timeout);
- static int receive_message(mqueue_desc *mqd, char *msg_ptr, size_t msg_len,
- k_timeout_t timeout);
- static void remove_mq(mqueue_object *msg_queue);
/*
 * Type to request from va_arg() when reading the mode argument of mq_open().
 *
 * SPARC newlib defines mode_t as "unsigned short". That type is promoted to
 * "int" when passed through '...', so va_arg() must be given the promoted
 * type there. Everywhere else mode_t is used directly.
 */
#if !defined(__sparc__)
#define PROMOTED_MODE_T mode_t
#else
#define PROMOTED_MODE_T int
#endif
- /**
- * @brief Open a message queue.
- *
- * Number of message queue and descriptor to message queue are limited by
- * heap size. increase the size through CONFIG_HEAP_MEM_POOL_SIZE.
- *
- * See IEEE 1003.1
- */
- mqd_t mq_open(const char *name, int oflags, ...)
- {
- va_list va;
- mode_t mode;
- mq_attr *attrs = NULL;
- long msg_size = 0U, max_msgs = 0U;
- mqueue_object *msg_queue;
- mqueue_desc *msg_queue_desc = NULL, *mqd = (mqueue_desc *)(-1);
- char *mq_desc_ptr, *mq_obj_ptr, *mq_buf_ptr, *mq_name_ptr;
- va_start(va, oflags);
- if ((oflags & O_CREAT) != 0) {
- mode = va_arg(va, PROMOTED_MODE_T);
- attrs = va_arg(va, mq_attr*);
- }
- va_end(va);
- if (attrs != NULL) {
- msg_size = attrs->mq_msgsize;
- max_msgs = attrs->mq_maxmsg;
- }
- if ((name == NULL) || ((oflags & O_CREAT) != 0 && (msg_size <= 0 ||
- max_msgs <= 0))) {
- errno = EINVAL;
- return (mqd_t)mqd;
- }
- if ((strlen(name) + 1) > CONFIG_MQUEUE_NAMELEN_MAX) {
- errno = ENAMETOOLONG;
- return (mqd_t)mqd;
- }
- /* Check if queue already exists */
- k_sem_take(&mq_sem, K_FOREVER);
- msg_queue = find_in_list(name);
- k_sem_give(&mq_sem);
- if ((msg_queue != NULL) && (oflags & O_CREAT) != 0 &&
- (oflags & O_EXCL) != 0) {
- /* Message queue has alreadey been opened and O_EXCL is set */
- errno = EEXIST;
- return (mqd_t)mqd;
- }
- if ((msg_queue == NULL) && (oflags & O_CREAT) == 0) {
- errno = ENOENT;
- return (mqd_t)mqd;
- }
- mq_desc_ptr = k_malloc(sizeof(struct mqueue_desc));
- if (mq_desc_ptr != NULL) {
- (void)memset(mq_desc_ptr, 0, sizeof(struct mqueue_desc));
- msg_queue_desc = (struct mqueue_desc *)mq_desc_ptr;
- msg_queue_desc->mem_desc = mq_desc_ptr;
- } else {
- goto free_mq_desc;
- }
- /* Allocate mqueue object for new message queue */
- if (msg_queue == NULL) {
- /* Check for message quantity and size in message queue */
- if (attrs->mq_msgsize > CONFIG_MSG_SIZE_MAX &&
- attrs->mq_maxmsg > CONFIG_MSG_COUNT_MAX) {
- goto free_mq_desc;
- }
- mq_obj_ptr = k_malloc(sizeof(mqueue_object));
- if (mq_obj_ptr != NULL) {
- (void)memset(mq_obj_ptr, 0, sizeof(mqueue_object));
- msg_queue = (mqueue_object *)mq_obj_ptr;
- msg_queue->mem_obj = mq_obj_ptr;
- } else {
- goto free_mq_object;
- }
- mq_name_ptr = k_malloc(strlen(name) + 1);
- if (mq_name_ptr != NULL) {
- (void)memset(mq_name_ptr, 0, strlen(name) + 1);
- msg_queue->name = mq_name_ptr;
- } else {
- goto free_mq_name;
- }
- strcpy(msg_queue->name, name);
- mq_buf_ptr = k_malloc(msg_size * max_msgs * sizeof(uint8_t));
- if (mq_buf_ptr != NULL) {
- (void)memset(mq_buf_ptr, 0,
- msg_size * max_msgs * sizeof(uint8_t));
- msg_queue->mem_buffer = mq_buf_ptr;
- } else {
- goto free_mq_buffer;
- }
- (void)atomic_set(&msg_queue->ref_count, 1);
- /* initialize zephyr message queue */
- k_msgq_init(&msg_queue->queue, msg_queue->mem_buffer, msg_size,
- max_msgs);
- k_sem_take(&mq_sem, K_FOREVER);
- sys_slist_append(&mq_list, (sys_snode_t *)&(msg_queue->snode));
- k_sem_give(&mq_sem);
- } else {
- atomic_inc(&msg_queue->ref_count);
- }
- msg_queue_desc->mqueue = msg_queue;
- msg_queue_desc->flags = (oflags & O_NONBLOCK) != 0 ? O_NONBLOCK : 0;
- return (mqd_t)msg_queue_desc;
- free_mq_buffer:
- k_free(mq_name_ptr);
- free_mq_name:
- k_free(mq_obj_ptr);
- free_mq_object:
- k_free(mq_desc_ptr);
- free_mq_desc:
- errno = ENOSPC;
- return (mqd_t)mqd;
- }
- /**
- * @brief Close a message queue descriptor.
- *
- * See IEEE 1003.1
- */
- int mq_close(mqd_t mqdes)
- {
- mqueue_desc *mqd = (mqueue_desc *)mqdes;
- if (mqd == NULL) {
- errno = EBADF;
- return -1;
- }
- atomic_dec(&mqd->mqueue->ref_count);
- /* remove mq if marked for unlink */
- if (mqd->mqueue->name == NULL) {
- remove_mq(mqd->mqueue);
- }
- k_free(mqd->mem_desc);
- return 0;
- }
- /**
- * @brief Remove a message queue.
- *
- * See IEEE 1003.1
- */
- int mq_unlink(const char *name)
- {
- mqueue_object *msg_queue;
- k_sem_take(&mq_sem, K_FOREVER);
- msg_queue = find_in_list(name);
- if (msg_queue == NULL) {
- k_sem_give(&mq_sem);
- errno = EBADF;
- return -1;
- }
- k_free(msg_queue->name);
- msg_queue->name = NULL;
- k_sem_give(&mq_sem);
- remove_mq(msg_queue);
- return 0;
- }
- /**
- * @brief Send a message to a message queue.
- *
- * All messages in message queue are of equal priority.
- *
- * See IEEE 1003.1
- */
- int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
- unsigned int msg_prio)
- {
- mqueue_desc *mqd = (mqueue_desc *)mqdes;
- return send_message(mqd, msg_ptr, msg_len, K_FOREVER);
- }
- /**
- * @brief Send message to a message queue within abstime time.
- *
- * All messages in message queue are of equal priority.
- *
- * See IEEE 1003.1
- */
- int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
- unsigned int msg_prio, const struct timespec *abstime)
- {
- mqueue_desc *mqd = (mqueue_desc *)mqdes;
- int32_t timeout = (int32_t) timespec_to_timeoutms(abstime);
- return send_message(mqd, msg_ptr, msg_len, K_MSEC(timeout));
- }
- /**
- * @brief Receive a message from a message queue.
- *
- * All messages in message queue are of equal priority.
- *
- * See IEEE 1003.1
- */
- int mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
- unsigned int *msg_prio)
- {
- mqueue_desc *mqd = (mqueue_desc *)mqdes;
- return receive_message(mqd, msg_ptr, msg_len, K_FOREVER);
- }
- /**
- * @brief Receive message from a message queue within abstime time.
- *
- * All messages in message queue are of equal priority.
- *
- * See IEEE 1003.1
- */
- int mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
- unsigned int *msg_prio, const struct timespec *abstime)
- {
- mqueue_desc *mqd = (mqueue_desc *)mqdes;
- int32_t timeout = (int32_t) timespec_to_timeoutms(abstime);
- return receive_message(mqd, msg_ptr, msg_len, K_MSEC(timeout));
- }
- /**
- * @brief Get message queue attributes.
- *
- * See IEEE 1003.1
- */
- int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
- {
- mqueue_desc *mqd = (mqueue_desc *)mqdes;
- struct k_msgq_attrs attrs;
- if (mqd == NULL) {
- errno = EBADF;
- return -1;
- }
- k_sem_take(&mq_sem, K_FOREVER);
- k_msgq_get_attrs(&mqd->mqueue->queue, &attrs);
- mqstat->mq_flags = mqd->flags;
- mqstat->mq_maxmsg = attrs.max_msgs;
- mqstat->mq_msgsize = attrs.msg_size;
- mqstat->mq_curmsgs = attrs.used_msgs;
- k_sem_give(&mq_sem);
- return 0;
- }
- /**
- * @brief Set message queue attributes.
- *
- * See IEEE 1003.1
- */
- int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat,
- struct mq_attr *omqstat)
- {
- mqueue_desc *mqd = (mqueue_desc *)mqdes;
- if (mqd == NULL) {
- errno = EBADF;
- return -1;
- }
- if (mqstat->mq_flags != 0 && mqstat->mq_flags != O_NONBLOCK) {
- errno = EINVAL;
- return -1;
- }
- if (omqstat != NULL) {
- mq_getattr(mqdes, omqstat);
- }
- k_sem_take(&mq_sem, K_FOREVER);
- mqd->flags = mqstat->mq_flags;
- k_sem_give(&mq_sem);
- return 0;
- }
- /* Internal functions */
- static mqueue_object *find_in_list(const char *name)
- {
- sys_snode_t *mq;
- mqueue_object *msg_queue;
- mq = mq_list.head;
- while (mq != NULL) {
- msg_queue = (mqueue_object *)mq;
- if (strcmp(msg_queue->name, name) == 0) {
- return msg_queue;
- }
- mq = mq->next;
- }
- return NULL;
- }
- static int32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_len,
- k_timeout_t timeout)
- {
- int32_t ret = -1;
- if (mqd == NULL) {
- errno = EBADF;
- return ret;
- }
- if ((mqd->flags & O_NONBLOCK) != 0U) {
- timeout = K_NO_WAIT;
- }
- if (msg_len > mqd->mqueue->queue.msg_size) {
- errno = EMSGSIZE;
- return ret;
- }
- if (k_msgq_put(&mqd->mqueue->queue, (void *)msg_ptr, timeout) != 0) {
- errno = K_TIMEOUT_EQ(timeout, K_NO_WAIT) ? EAGAIN : ETIMEDOUT;
- return ret;
- }
- return 0;
- }
- static int32_t receive_message(mqueue_desc *mqd, char *msg_ptr, size_t msg_len,
- k_timeout_t timeout)
- {
- int ret = -1;
- if (mqd == NULL) {
- errno = EBADF;
- return ret;
- }
- if (msg_len < mqd->mqueue->queue.msg_size) {
- errno = EMSGSIZE;
- return ret;
- }
- if ((mqd->flags & O_NONBLOCK) != 0U) {
- timeout = K_NO_WAIT;
- }
- if (k_msgq_get(&mqd->mqueue->queue, (void *)msg_ptr, timeout) != 0) {
- errno = K_TIMEOUT_EQ(timeout, K_NO_WAIT) ? EAGAIN : ETIMEDOUT;
- } else {
- ret = mqd->mqueue->queue.msg_size;
- }
- return ret;
- }
- static void remove_mq(mqueue_object *msg_queue)
- {
- if (atomic_cas(&msg_queue->ref_count, 0, 0)) {
- k_sem_take(&mq_sem, K_FOREVER);
- sys_slist_find_and_remove(&mq_list, (sys_snode_t *) msg_queue);
- k_sem_give(&mq_sem);
- /* Free mq buffer and pbject */
- k_free(msg_queue->mem_buffer);
- k_free(msg_queue->mem_obj);
- }
- }
|