mqueue.c 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440
  1. /*
  2. * Copyright (c) 2018 Intel Corporation
  3. *
  4. * SPDX-License-Identifier: Apache-2.0
  5. */
  6. #include <kernel.h>
  7. #include <errno.h>
  8. #include <string.h>
  9. #include <sys/atomic.h>
  10. #include <posix/time.h>
  11. #include <posix/mqueue.h>
  12. typedef struct mqueue_object {
  13. sys_snode_t snode;
  14. char *mem_buffer;
  15. char *mem_obj;
  16. struct k_msgq queue;
  17. atomic_t ref_count;
  18. char *name;
  19. } mqueue_object;
  20. typedef struct mqueue_desc {
  21. char *mem_desc;
  22. mqueue_object *mqueue;
  23. uint32_t flags;
  24. } mqueue_desc;
  25. K_SEM_DEFINE(mq_sem, 1, 1);
  26. /* Initialize the list */
  27. sys_slist_t mq_list = SYS_SLIST_STATIC_INIT(&mq_list);
  28. int64_t timespec_to_timeoutms(const struct timespec *abstime);
  29. static mqueue_object *find_in_list(const char *name);
  30. static int32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_len,
  31. k_timeout_t timeout);
  32. static int receive_message(mqueue_desc *mqd, char *msg_ptr, size_t msg_len,
  33. k_timeout_t timeout);
  34. static void remove_mq(mqueue_object *msg_queue);
  /*
   * Type to request from va_arg() when reading the mode argument of
   * mq_open(). On SPARC newlib mode_t is "unsigned short", which is promoted
   * to "int" when passed through '...', so the promoted type has to be used
   * there. Everywhere else mode_t itself is used.
   */
  #if !defined(__sparc__)
  #define PROMOTED_MODE_T mode_t
  #else
  #define PROMOTED_MODE_T int
  #endif
  45. /**
  46. * @brief Open a message queue.
  47. *
  48. * Number of message queue and descriptor to message queue are limited by
  49. * heap size. increase the size through CONFIG_HEAP_MEM_POOL_SIZE.
  50. *
  51. * See IEEE 1003.1
  52. */
  53. mqd_t mq_open(const char *name, int oflags, ...)
  54. {
  55. va_list va;
  56. mode_t mode;
  57. mq_attr *attrs = NULL;
  58. long msg_size = 0U, max_msgs = 0U;
  59. mqueue_object *msg_queue;
  60. mqueue_desc *msg_queue_desc = NULL, *mqd = (mqueue_desc *)(-1);
  61. char *mq_desc_ptr, *mq_obj_ptr, *mq_buf_ptr, *mq_name_ptr;
  62. va_start(va, oflags);
  63. if ((oflags & O_CREAT) != 0) {
  64. mode = va_arg(va, PROMOTED_MODE_T);
  65. attrs = va_arg(va, mq_attr*);
  66. }
  67. va_end(va);
  68. if (attrs != NULL) {
  69. msg_size = attrs->mq_msgsize;
  70. max_msgs = attrs->mq_maxmsg;
  71. }
  72. if ((name == NULL) || ((oflags & O_CREAT) != 0 && (msg_size <= 0 ||
  73. max_msgs <= 0))) {
  74. errno = EINVAL;
  75. return (mqd_t)mqd;
  76. }
  77. if ((strlen(name) + 1) > CONFIG_MQUEUE_NAMELEN_MAX) {
  78. errno = ENAMETOOLONG;
  79. return (mqd_t)mqd;
  80. }
  81. /* Check if queue already exists */
  82. k_sem_take(&mq_sem, K_FOREVER);
  83. msg_queue = find_in_list(name);
  84. k_sem_give(&mq_sem);
  85. if ((msg_queue != NULL) && (oflags & O_CREAT) != 0 &&
  86. (oflags & O_EXCL) != 0) {
  87. /* Message queue has alreadey been opened and O_EXCL is set */
  88. errno = EEXIST;
  89. return (mqd_t)mqd;
  90. }
  91. if ((msg_queue == NULL) && (oflags & O_CREAT) == 0) {
  92. errno = ENOENT;
  93. return (mqd_t)mqd;
  94. }
  95. mq_desc_ptr = k_malloc(sizeof(struct mqueue_desc));
  96. if (mq_desc_ptr != NULL) {
  97. (void)memset(mq_desc_ptr, 0, sizeof(struct mqueue_desc));
  98. msg_queue_desc = (struct mqueue_desc *)mq_desc_ptr;
  99. msg_queue_desc->mem_desc = mq_desc_ptr;
  100. } else {
  101. goto free_mq_desc;
  102. }
  103. /* Allocate mqueue object for new message queue */
  104. if (msg_queue == NULL) {
  105. /* Check for message quantity and size in message queue */
  106. if (attrs->mq_msgsize > CONFIG_MSG_SIZE_MAX &&
  107. attrs->mq_maxmsg > CONFIG_MSG_COUNT_MAX) {
  108. goto free_mq_desc;
  109. }
  110. mq_obj_ptr = k_malloc(sizeof(mqueue_object));
  111. if (mq_obj_ptr != NULL) {
  112. (void)memset(mq_obj_ptr, 0, sizeof(mqueue_object));
  113. msg_queue = (mqueue_object *)mq_obj_ptr;
  114. msg_queue->mem_obj = mq_obj_ptr;
  115. } else {
  116. goto free_mq_object;
  117. }
  118. mq_name_ptr = k_malloc(strlen(name) + 1);
  119. if (mq_name_ptr != NULL) {
  120. (void)memset(mq_name_ptr, 0, strlen(name) + 1);
  121. msg_queue->name = mq_name_ptr;
  122. } else {
  123. goto free_mq_name;
  124. }
  125. strcpy(msg_queue->name, name);
  126. mq_buf_ptr = k_malloc(msg_size * max_msgs * sizeof(uint8_t));
  127. if (mq_buf_ptr != NULL) {
  128. (void)memset(mq_buf_ptr, 0,
  129. msg_size * max_msgs * sizeof(uint8_t));
  130. msg_queue->mem_buffer = mq_buf_ptr;
  131. } else {
  132. goto free_mq_buffer;
  133. }
  134. (void)atomic_set(&msg_queue->ref_count, 1);
  135. /* initialize zephyr message queue */
  136. k_msgq_init(&msg_queue->queue, msg_queue->mem_buffer, msg_size,
  137. max_msgs);
  138. k_sem_take(&mq_sem, K_FOREVER);
  139. sys_slist_append(&mq_list, (sys_snode_t *)&(msg_queue->snode));
  140. k_sem_give(&mq_sem);
  141. } else {
  142. atomic_inc(&msg_queue->ref_count);
  143. }
  144. msg_queue_desc->mqueue = msg_queue;
  145. msg_queue_desc->flags = (oflags & O_NONBLOCK) != 0 ? O_NONBLOCK : 0;
  146. return (mqd_t)msg_queue_desc;
  147. free_mq_buffer:
  148. k_free(mq_name_ptr);
  149. free_mq_name:
  150. k_free(mq_obj_ptr);
  151. free_mq_object:
  152. k_free(mq_desc_ptr);
  153. free_mq_desc:
  154. errno = ENOSPC;
  155. return (mqd_t)mqd;
  156. }
  157. /**
  158. * @brief Close a message queue descriptor.
  159. *
  160. * See IEEE 1003.1
  161. */
  162. int mq_close(mqd_t mqdes)
  163. {
  164. mqueue_desc *mqd = (mqueue_desc *)mqdes;
  165. if (mqd == NULL) {
  166. errno = EBADF;
  167. return -1;
  168. }
  169. atomic_dec(&mqd->mqueue->ref_count);
  170. /* remove mq if marked for unlink */
  171. if (mqd->mqueue->name == NULL) {
  172. remove_mq(mqd->mqueue);
  173. }
  174. k_free(mqd->mem_desc);
  175. return 0;
  176. }
  177. /**
  178. * @brief Remove a message queue.
  179. *
  180. * See IEEE 1003.1
  181. */
  182. int mq_unlink(const char *name)
  183. {
  184. mqueue_object *msg_queue;
  185. k_sem_take(&mq_sem, K_FOREVER);
  186. msg_queue = find_in_list(name);
  187. if (msg_queue == NULL) {
  188. k_sem_give(&mq_sem);
  189. errno = EBADF;
  190. return -1;
  191. }
  192. k_free(msg_queue->name);
  193. msg_queue->name = NULL;
  194. k_sem_give(&mq_sem);
  195. remove_mq(msg_queue);
  196. return 0;
  197. }
  198. /**
  199. * @brief Send a message to a message queue.
  200. *
  201. * All messages in message queue are of equal priority.
  202. *
  203. * See IEEE 1003.1
  204. */
  205. int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
  206. unsigned int msg_prio)
  207. {
  208. mqueue_desc *mqd = (mqueue_desc *)mqdes;
  209. return send_message(mqd, msg_ptr, msg_len, K_FOREVER);
  210. }
  211. /**
  212. * @brief Send message to a message queue within abstime time.
  213. *
  214. * All messages in message queue are of equal priority.
  215. *
  216. * See IEEE 1003.1
  217. */
  218. int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
  219. unsigned int msg_prio, const struct timespec *abstime)
  220. {
  221. mqueue_desc *mqd = (mqueue_desc *)mqdes;
  222. int32_t timeout = (int32_t) timespec_to_timeoutms(abstime);
  223. return send_message(mqd, msg_ptr, msg_len, K_MSEC(timeout));
  224. }
  225. /**
  226. * @brief Receive a message from a message queue.
  227. *
  228. * All messages in message queue are of equal priority.
  229. *
  230. * See IEEE 1003.1
  231. */
  232. int mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
  233. unsigned int *msg_prio)
  234. {
  235. mqueue_desc *mqd = (mqueue_desc *)mqdes;
  236. return receive_message(mqd, msg_ptr, msg_len, K_FOREVER);
  237. }
  238. /**
  239. * @brief Receive message from a message queue within abstime time.
  240. *
  241. * All messages in message queue are of equal priority.
  242. *
  243. * See IEEE 1003.1
  244. */
  245. int mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
  246. unsigned int *msg_prio, const struct timespec *abstime)
  247. {
  248. mqueue_desc *mqd = (mqueue_desc *)mqdes;
  249. int32_t timeout = (int32_t) timespec_to_timeoutms(abstime);
  250. return receive_message(mqd, msg_ptr, msg_len, K_MSEC(timeout));
  251. }
  252. /**
  253. * @brief Get message queue attributes.
  254. *
  255. * See IEEE 1003.1
  256. */
  257. int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
  258. {
  259. mqueue_desc *mqd = (mqueue_desc *)mqdes;
  260. struct k_msgq_attrs attrs;
  261. if (mqd == NULL) {
  262. errno = EBADF;
  263. return -1;
  264. }
  265. k_sem_take(&mq_sem, K_FOREVER);
  266. k_msgq_get_attrs(&mqd->mqueue->queue, &attrs);
  267. mqstat->mq_flags = mqd->flags;
  268. mqstat->mq_maxmsg = attrs.max_msgs;
  269. mqstat->mq_msgsize = attrs.msg_size;
  270. mqstat->mq_curmsgs = attrs.used_msgs;
  271. k_sem_give(&mq_sem);
  272. return 0;
  273. }
  274. /**
  275. * @brief Set message queue attributes.
  276. *
  277. * See IEEE 1003.1
  278. */
  279. int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat,
  280. struct mq_attr *omqstat)
  281. {
  282. mqueue_desc *mqd = (mqueue_desc *)mqdes;
  283. if (mqd == NULL) {
  284. errno = EBADF;
  285. return -1;
  286. }
  287. if (mqstat->mq_flags != 0 && mqstat->mq_flags != O_NONBLOCK) {
  288. errno = EINVAL;
  289. return -1;
  290. }
  291. if (omqstat != NULL) {
  292. mq_getattr(mqdes, omqstat);
  293. }
  294. k_sem_take(&mq_sem, K_FOREVER);
  295. mqd->flags = mqstat->mq_flags;
  296. k_sem_give(&mq_sem);
  297. return 0;
  298. }
  299. /* Internal functions */
  300. static mqueue_object *find_in_list(const char *name)
  301. {
  302. sys_snode_t *mq;
  303. mqueue_object *msg_queue;
  304. mq = mq_list.head;
  305. while (mq != NULL) {
  306. msg_queue = (mqueue_object *)mq;
  307. if (strcmp(msg_queue->name, name) == 0) {
  308. return msg_queue;
  309. }
  310. mq = mq->next;
  311. }
  312. return NULL;
  313. }
  314. static int32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_len,
  315. k_timeout_t timeout)
  316. {
  317. int32_t ret = -1;
  318. if (mqd == NULL) {
  319. errno = EBADF;
  320. return ret;
  321. }
  322. if ((mqd->flags & O_NONBLOCK) != 0U) {
  323. timeout = K_NO_WAIT;
  324. }
  325. if (msg_len > mqd->mqueue->queue.msg_size) {
  326. errno = EMSGSIZE;
  327. return ret;
  328. }
  329. if (k_msgq_put(&mqd->mqueue->queue, (void *)msg_ptr, timeout) != 0) {
  330. errno = K_TIMEOUT_EQ(timeout, K_NO_WAIT) ? EAGAIN : ETIMEDOUT;
  331. return ret;
  332. }
  333. return 0;
  334. }
  335. static int32_t receive_message(mqueue_desc *mqd, char *msg_ptr, size_t msg_len,
  336. k_timeout_t timeout)
  337. {
  338. int ret = -1;
  339. if (mqd == NULL) {
  340. errno = EBADF;
  341. return ret;
  342. }
  343. if (msg_len < mqd->mqueue->queue.msg_size) {
  344. errno = EMSGSIZE;
  345. return ret;
  346. }
  347. if ((mqd->flags & O_NONBLOCK) != 0U) {
  348. timeout = K_NO_WAIT;
  349. }
  350. if (k_msgq_get(&mqd->mqueue->queue, (void *)msg_ptr, timeout) != 0) {
  351. errno = K_TIMEOUT_EQ(timeout, K_NO_WAIT) ? EAGAIN : ETIMEDOUT;
  352. } else {
  353. ret = mqd->mqueue->queue.msg_size;
  354. }
  355. return ret;
  356. }
  357. static void remove_mq(mqueue_object *msg_queue)
  358. {
  359. if (atomic_cas(&msg_queue->ref_count, 0, 0)) {
  360. k_sem_take(&mq_sem, K_FOREVER);
  361. sys_slist_find_and_remove(&mq_list, (sys_snode_t *) msg_queue);
  362. k_sem_give(&mq_sem);
  363. /* Free mq buffer and pbject */
  364. k_free(msg_queue->mem_buffer);
  365. k_free(msg_queue->mem_obj);
  366. }
  367. }