/*
 * Copyright (c) 2016 Wind River Systems, Inc.
 *
 * SPDX-License-Identifier: Apache-2.0
 */

/**
 * @file
 * @brief Message queues.
 */
  10. #include <kernel.h>
  11. #include <kernel_structs.h>
  12. #include <toolchain.h>
  13. #include <linker/sections.h>
  14. #include <string.h>
  15. #include <ksched.h>
  16. #include <wait_q.h>
  17. #include <sys/dlist.h>
  18. #include <sys/math_extras.h>
  19. #include <init.h>
  20. #include <syscall_handler.h>
  21. #include <kernel_internal.h>
  22. #include <sys/check.h>
  23. #ifdef CONFIG_POLL
  24. static inline void handle_poll_events(struct k_msgq *msgq, uint32_t state)
  25. {
  26. z_handle_obj_poll_events(&msgq->poll_events, state);
  27. }
  28. #endif /* CONFIG_POLL */
  29. void k_msgq_init(struct k_msgq *msgq, char *buffer, size_t msg_size,
  30. uint32_t max_msgs)
  31. {
  32. msgq->msg_size = msg_size;
  33. msgq->max_msgs = max_msgs;
  34. msgq->buffer_start = buffer;
  35. msgq->buffer_end = buffer + (max_msgs * msg_size);
  36. msgq->read_ptr = buffer;
  37. msgq->write_ptr = buffer;
  38. msgq->used_msgs = 0;
  39. msgq->flags = 0;
  40. z_waitq_init(&msgq->wait_q);
  41. msgq->lock = (struct k_spinlock) {};
  42. #ifdef CONFIG_POLL
  43. sys_dlist_init(&msgq->poll_events);
  44. #endif /* CONFIG_POLL */
  45. SYS_PORT_TRACING_OBJ_INIT(k_msgq, msgq);
  46. z_object_init(msgq);
  47. }
  48. int z_impl_k_msgq_alloc_init(struct k_msgq *msgq, size_t msg_size,
  49. uint32_t max_msgs)
  50. {
  51. void *buffer;
  52. int ret;
  53. size_t total_size;
  54. SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, alloc_init, msgq);
  55. if (size_mul_overflow(msg_size, max_msgs, &total_size)) {
  56. ret = -EINVAL;
  57. } else {
  58. buffer = z_thread_malloc(total_size);
  59. if (buffer != NULL) {
  60. k_msgq_init(msgq, buffer, msg_size, max_msgs);
  61. msgq->flags = K_MSGQ_FLAG_ALLOC;
  62. ret = 0;
  63. } else {
  64. ret = -ENOMEM;
  65. }
  66. }
  67. SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, alloc_init, msgq, ret);
  68. return ret;
  69. }
  70. #ifdef CONFIG_USERSPACE
  71. int z_vrfy_k_msgq_alloc_init(struct k_msgq *msgq, size_t msg_size,
  72. uint32_t max_msgs)
  73. {
  74. Z_OOPS(Z_SYSCALL_OBJ_NEVER_INIT(msgq, K_OBJ_MSGQ));
  75. return z_impl_k_msgq_alloc_init(msgq, msg_size, max_msgs);
  76. }
  77. #include <syscalls/k_msgq_alloc_init_mrsh.c>
  78. #endif
  79. int k_msgq_cleanup(struct k_msgq *msgq)
  80. {
  81. SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, cleanup, msgq);
  82. CHECKIF(z_waitq_head(&msgq->wait_q) != NULL) {
  83. SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, cleanup, msgq, -EBUSY);
  84. return -EBUSY;
  85. }
  86. if ((msgq->flags & K_MSGQ_FLAG_ALLOC) != 0U) {
  87. k_free(msgq->buffer_start);
  88. msgq->flags &= ~K_MSGQ_FLAG_ALLOC;
  89. }
  90. SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, cleanup, msgq, 0);
  91. return 0;
  92. }
  93. int z_impl_k_msgq_put(struct k_msgq *msgq, const void *data, k_timeout_t timeout)
  94. {
  95. __ASSERT(!arch_is_in_isr() || K_TIMEOUT_EQ(timeout, K_NO_WAIT), "");
  96. struct k_thread *pending_thread;
  97. k_spinlock_key_t key;
  98. int result;
  99. key = k_spin_lock(&msgq->lock);
  100. SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, put, msgq, timeout);
  101. if (msgq->used_msgs < msgq->max_msgs) {
  102. /* message queue isn't full */
  103. pending_thread = z_unpend_first_thread(&msgq->wait_q);
  104. if (pending_thread != NULL) {
  105. SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, 0);
  106. /* give message to waiting thread */
  107. (void)memcpy(pending_thread->base.swap_data, data,
  108. msgq->msg_size);
  109. /* wake up waiting thread */
  110. arch_thread_return_value_set(pending_thread, 0);
  111. z_ready_thread(pending_thread);
  112. z_reschedule(&msgq->lock, key);
  113. return 0;
  114. } else {
  115. /* put message in queue */
  116. (void)memcpy(msgq->write_ptr, data, msgq->msg_size);
  117. msgq->write_ptr += msgq->msg_size;
  118. if (msgq->write_ptr == msgq->buffer_end) {
  119. msgq->write_ptr = msgq->buffer_start;
  120. }
  121. msgq->used_msgs++;
  122. #ifdef CONFIG_POLL
  123. handle_poll_events(msgq, K_POLL_STATE_MSGQ_DATA_AVAILABLE);
  124. #endif /* CONFIG_POLL */
  125. }
  126. result = 0;
  127. } else if (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
  128. /* don't wait for message space to become available */
  129. result = -ENOMSG;
  130. } else {
  131. SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, put, msgq, timeout);
  132. /* wait for put message success, failure, or timeout */
  133. _current->base.swap_data = (void *) data;
  134. result = z_pend_curr(&msgq->lock, key, &msgq->wait_q, timeout);
  135. SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, result);
  136. return result;
  137. }
  138. SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, result);
  139. k_spin_unlock(&msgq->lock, key);
  140. return result;
  141. }
  142. #ifdef CONFIG_USERSPACE
  143. static inline int z_vrfy_k_msgq_put(struct k_msgq *msgq, const void *data,
  144. k_timeout_t timeout)
  145. {
  146. Z_OOPS(Z_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
  147. Z_OOPS(Z_SYSCALL_MEMORY_READ(data, msgq->msg_size));
  148. return z_impl_k_msgq_put(msgq, data, timeout);
  149. }
  150. #include <syscalls/k_msgq_put_mrsh.c>
  151. #endif
  152. void z_impl_k_msgq_get_attrs(struct k_msgq *msgq, struct k_msgq_attrs *attrs)
  153. {
  154. attrs->msg_size = msgq->msg_size;
  155. attrs->max_msgs = msgq->max_msgs;
  156. attrs->used_msgs = msgq->used_msgs;
  157. }
  158. #ifdef CONFIG_USERSPACE
  159. static inline void z_vrfy_k_msgq_get_attrs(struct k_msgq *msgq,
  160. struct k_msgq_attrs *attrs)
  161. {
  162. Z_OOPS(Z_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
  163. Z_OOPS(Z_SYSCALL_MEMORY_WRITE(attrs, sizeof(struct k_msgq_attrs)));
  164. z_impl_k_msgq_get_attrs(msgq, attrs);
  165. }
  166. #include <syscalls/k_msgq_get_attrs_mrsh.c>
  167. #endif
  168. int z_impl_k_msgq_get(struct k_msgq *msgq, void *data, k_timeout_t timeout)
  169. {
  170. __ASSERT(!arch_is_in_isr() || K_TIMEOUT_EQ(timeout, K_NO_WAIT), "");
  171. k_spinlock_key_t key;
  172. struct k_thread *pending_thread;
  173. int result;
  174. key = k_spin_lock(&msgq->lock);
  175. SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, get, msgq, timeout);
  176. if (msgq->used_msgs > 0U) {
  177. /* take first available message from queue */
  178. (void)memcpy(data, msgq->read_ptr, msgq->msg_size);
  179. msgq->read_ptr += msgq->msg_size;
  180. if (msgq->read_ptr == msgq->buffer_end) {
  181. msgq->read_ptr = msgq->buffer_start;
  182. }
  183. msgq->used_msgs--;
  184. /* handle first thread waiting to write (if any) */
  185. pending_thread = z_unpend_first_thread(&msgq->wait_q);
  186. if (pending_thread != NULL) {
  187. SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, get, msgq, timeout);
  188. /* add thread's message to queue */
  189. (void)memcpy(msgq->write_ptr, pending_thread->base.swap_data,
  190. msgq->msg_size);
  191. msgq->write_ptr += msgq->msg_size;
  192. if (msgq->write_ptr == msgq->buffer_end) {
  193. msgq->write_ptr = msgq->buffer_start;
  194. }
  195. msgq->used_msgs++;
  196. /* wake up waiting thread */
  197. arch_thread_return_value_set(pending_thread, 0);
  198. z_ready_thread(pending_thread);
  199. z_reschedule(&msgq->lock, key);
  200. SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, get, msgq, timeout, 0);
  201. return 0;
  202. }
  203. result = 0;
  204. } else if (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
  205. /* don't wait for a message to become available */
  206. result = -ENOMSG;
  207. } else {
  208. SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, get, msgq, timeout);
  209. /* wait for get message success or timeout */
  210. _current->base.swap_data = data;
  211. result = z_pend_curr(&msgq->lock, key, &msgq->wait_q, timeout);
  212. SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, get, msgq, timeout, result);
  213. return result;
  214. }
  215. SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, get, msgq, timeout, result);
  216. k_spin_unlock(&msgq->lock, key);
  217. return result;
  218. }
  219. #ifdef CONFIG_USERSPACE
  220. static inline int z_vrfy_k_msgq_get(struct k_msgq *msgq, void *data,
  221. k_timeout_t timeout)
  222. {
  223. Z_OOPS(Z_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
  224. Z_OOPS(Z_SYSCALL_MEMORY_WRITE(data, msgq->msg_size));
  225. return z_impl_k_msgq_get(msgq, data, timeout);
  226. }
  227. #include <syscalls/k_msgq_get_mrsh.c>
  228. #endif
  229. int z_impl_k_msgq_peek(struct k_msgq *msgq, void *data)
  230. {
  231. k_spinlock_key_t key;
  232. int result;
  233. key = k_spin_lock(&msgq->lock);
  234. if (msgq->used_msgs > 0U) {
  235. /* take first available message from queue */
  236. (void)memcpy(data, msgq->read_ptr, msgq->msg_size);
  237. result = 0;
  238. } else {
  239. /* don't wait for a message to become available */
  240. result = -ENOMSG;
  241. }
  242. SYS_PORT_TRACING_OBJ_FUNC(k_msgq, peek, msgq, result);
  243. k_spin_unlock(&msgq->lock, key);
  244. return result;
  245. }
  246. #ifdef CONFIG_USERSPACE
  247. static inline int z_vrfy_k_msgq_peek(struct k_msgq *msgq, void *data)
  248. {
  249. Z_OOPS(Z_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
  250. Z_OOPS(Z_SYSCALL_MEMORY_WRITE(data, msgq->msg_size));
  251. return z_impl_k_msgq_peek(msgq, data);
  252. }
  253. #include <syscalls/k_msgq_peek_mrsh.c>
  254. #endif
  255. void z_impl_k_msgq_purge(struct k_msgq *msgq)
  256. {
  257. k_spinlock_key_t key;
  258. struct k_thread *pending_thread;
  259. key = k_spin_lock(&msgq->lock);
  260. SYS_PORT_TRACING_OBJ_FUNC(k_msgq, purge, msgq);
  261. /* wake up any threads that are waiting to write */
  262. while ((pending_thread = z_unpend_first_thread(&msgq->wait_q)) != NULL) {
  263. arch_thread_return_value_set(pending_thread, -ENOMSG);
  264. z_ready_thread(pending_thread);
  265. }
  266. msgq->used_msgs = 0;
  267. msgq->read_ptr = msgq->write_ptr;
  268. z_reschedule(&msgq->lock, key);
  269. }
  270. #ifdef CONFIG_USERSPACE
  271. static inline void z_vrfy_k_msgq_purge(struct k_msgq *msgq)
  272. {
  273. Z_OOPS(Z_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
  274. z_impl_k_msgq_purge(msgq);
  275. }
  276. #include <syscalls/k_msgq_purge_mrsh.c>
  277. static inline uint32_t z_vrfy_k_msgq_num_free_get(struct k_msgq *msgq)
  278. {
  279. Z_OOPS(Z_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
  280. return z_impl_k_msgq_num_free_get(msgq);
  281. }
  282. #include <syscalls/k_msgq_num_free_get_mrsh.c>
  283. static inline uint32_t z_vrfy_k_msgq_num_used_get(struct k_msgq *msgq)
  284. {
  285. Z_OOPS(Z_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
  286. return z_impl_k_msgq_num_used_get(msgq);
  287. }
  288. #include <syscalls/k_msgq_num_used_get_mrsh.c>
  289. #endif