mpsc_pbuf.c 11 KB


  1. /*
  2. * Copyright (c) 2021 Nordic Semiconductor
  3. *
  4. * SPDX-License-Identifier: Apache-2.0
  5. */
  6. #include <sys/mpsc_pbuf.h>
  7. #define MPSC_PBUF_DEBUG 0
  8. #define MPSC_PBUF_DBG(buffer, ...) do { \
  9. if (MPSC_PBUF_DEBUG) { \
  10. printk(__VA_ARGS__); \
  11. if (buffer) { \
  12. mpsc_state_print(buffer); \
  13. } \
  14. } \
  15. } while (0)
  16. static inline void mpsc_state_print(struct mpsc_pbuf_buffer *buffer)
  17. {
  18. if (MPSC_PBUF_DEBUG) {
  19. printk("wr:%d/%d, rd:%d/%d\n",
  20. buffer->wr_idx, buffer->tmp_wr_idx,
  21. buffer->rd_idx, buffer->tmp_rd_idx);
  22. }
  23. }
  24. void mpsc_pbuf_init(struct mpsc_pbuf_buffer *buffer,
  25. const struct mpsc_pbuf_buffer_config *cfg)
  26. {
  27. int err;
  28. memset(buffer, 0, offsetof(struct mpsc_pbuf_buffer, buf));
  29. buffer->get_wlen = cfg->get_wlen;
  30. buffer->notify_drop = cfg->notify_drop;
  31. buffer->buf = cfg->buf;
  32. buffer->size = cfg->size;
  33. buffer->flags = cfg->flags;
  34. if (is_power_of_two(buffer->size)) {
  35. buffer->flags |= MPSC_PBUF_SIZE_POW2;
  36. }
  37. err = k_sem_init(&buffer->sem, 0, 1);
  38. __ASSERT_NO_MSG(err == 0);
  39. }
  40. static inline bool free_space(struct mpsc_pbuf_buffer *buffer, uint32_t *res)
  41. {
  42. if (buffer->rd_idx > buffer->tmp_wr_idx) {
  43. *res = buffer->rd_idx - buffer->tmp_wr_idx - 1;
  44. return false;
  45. } else if (!buffer->rd_idx) {
  46. *res = buffer->size - buffer->tmp_wr_idx - 1;
  47. return false;
  48. }
  49. *res = buffer->size - buffer->tmp_wr_idx;
  50. return true;
  51. }
  52. static inline bool available(struct mpsc_pbuf_buffer *buffer, uint32_t *res)
  53. {
  54. if (buffer->tmp_rd_idx <= buffer->wr_idx) {
  55. *res = (buffer->wr_idx - buffer->tmp_rd_idx);
  56. return false;
  57. }
  58. *res = buffer->size - buffer->tmp_rd_idx;
  59. return true;
  60. }
  61. static inline bool is_valid(union mpsc_pbuf_generic *item)
  62. {
  63. return item->hdr.valid;
  64. }
  65. static inline bool is_invalid(union mpsc_pbuf_generic *item)
  66. {
  67. return !item->hdr.valid && !item->hdr.busy;
  68. }
  69. static inline uint32_t idx_inc(struct mpsc_pbuf_buffer *buffer,
  70. uint32_t idx, uint32_t val)
  71. {
  72. uint32_t i = idx + val;
  73. if (buffer->flags & MPSC_PBUF_SIZE_POW2) {
  74. return i & (buffer->size - 1);
  75. }
  76. return (i >= buffer->size) ? i - buffer->size : i;
  77. }
  78. static inline uint32_t idx_dec(struct mpsc_pbuf_buffer *buffer,
  79. uint32_t idx, uint32_t val)
  80. {
  81. uint32_t i = idx - val;
  82. if (buffer->flags & MPSC_PBUF_SIZE_POW2) {
  83. return idx & (buffer->size - 1);
  84. }
  85. return (i >= buffer->size) ? i + buffer->size : i;
  86. }
  87. static inline uint32_t get_skip(union mpsc_pbuf_generic *item)
  88. {
  89. if (item->hdr.busy && !item->hdr.valid) {
  90. return item->skip.len;
  91. }
  92. return 0;
  93. }
  94. static void add_skip_item(struct mpsc_pbuf_buffer *buffer, uint32_t wlen)
  95. {
  96. union mpsc_pbuf_generic skip = {
  97. .skip = { .valid = 0, .busy = 1, .len = wlen }
  98. };
  99. buffer->buf[buffer->tmp_wr_idx] = skip.raw;
  100. buffer->tmp_wr_idx = idx_inc(buffer, buffer->tmp_wr_idx, wlen);
  101. buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, wlen);
  102. }
  103. /* Attempts to drop a packet. If user packets dropping is allowed then any
  104. * type of packet is dropped. Otherwise only skip packets (internal padding).
  105. *
  106. * If user packet was dropped @p user_packet is set to true. Function returns
  107. * a pointer to a dropped packet or null if nothing was dropped. It may point
  108. * to user packet (@p user_packet set to true) or internal, skip packet.
  109. */
  110. static union mpsc_pbuf_generic *drop_item_locked(struct mpsc_pbuf_buffer *buffer,
  111. uint32_t free_wlen,
  112. bool allow_drop,
  113. bool *user_packet)
  114. {
  115. union mpsc_pbuf_generic *item;
  116. uint32_t rd_wlen;
  117. uint32_t skip_wlen;
  118. *user_packet = false;
  119. item = (union mpsc_pbuf_generic *)&buffer->buf[buffer->rd_idx];
  120. skip_wlen = get_skip(item);
  121. rd_wlen = skip_wlen ? skip_wlen : buffer->get_wlen(item);
  122. if (skip_wlen) {
  123. allow_drop = true;
  124. } else if (allow_drop) {
  125. if (item->hdr.busy) {
  126. /* item is currently processed and cannot be overwritten. */
  127. add_skip_item(buffer, free_wlen + 1);
  128. buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, rd_wlen);
  129. buffer->tmp_wr_idx = idx_inc(buffer, buffer->tmp_wr_idx, rd_wlen);
  130. /* Get next itme followed the busy one. */
  131. uint32_t next_rd_idx = idx_inc(buffer, buffer->rd_idx, rd_wlen);
  132. item = (union mpsc_pbuf_generic *)&buffer->buf[next_rd_idx];
  133. skip_wlen = get_skip(item);
  134. if (skip_wlen) {
  135. rd_wlen += skip_wlen;
  136. } else {
  137. rd_wlen += buffer->get_wlen(item);
  138. *user_packet = true;
  139. }
  140. } else {
  141. *user_packet = true;
  142. }
  143. } else {
  144. item = NULL;
  145. }
  146. if (allow_drop) {
  147. buffer->rd_idx = idx_inc(buffer, buffer->rd_idx, rd_wlen);
  148. buffer->tmp_rd_idx = buffer->rd_idx;
  149. }
  150. return item;
  151. }
  152. void mpsc_pbuf_put_word(struct mpsc_pbuf_buffer *buffer,
  153. const union mpsc_pbuf_generic item)
  154. {
  155. bool cont;
  156. uint32_t free_wlen;
  157. k_spinlock_key_t key;
  158. union mpsc_pbuf_generic *dropped_item = NULL;
  159. bool valid_drop;
  160. do {
  161. cont = false;
  162. key = k_spin_lock(&buffer->lock);
  163. (void)free_space(buffer, &free_wlen);
  164. if (free_wlen) {
  165. buffer->buf[buffer->tmp_wr_idx] = item.raw;
  166. buffer->tmp_wr_idx = idx_inc(buffer,
  167. buffer->tmp_wr_idx, 1);
  168. buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, 1);
  169. } else {
  170. bool user_drop = buffer->flags & MPSC_PBUF_MODE_OVERWRITE;
  171. dropped_item = drop_item_locked(buffer, free_wlen,
  172. user_drop, &valid_drop);
  173. cont = dropped_item != NULL;
  174. }
  175. k_spin_unlock(&buffer->lock, key);
  176. if (cont && valid_drop) {
  177. /* Notify about item being dropped. */
  178. buffer->notify_drop(buffer, dropped_item);
  179. }
  180. } while (cont);
  181. }
  182. union mpsc_pbuf_generic *mpsc_pbuf_alloc(struct mpsc_pbuf_buffer *buffer,
  183. size_t wlen, k_timeout_t timeout)
  184. {
  185. union mpsc_pbuf_generic *item = NULL;
  186. union mpsc_pbuf_generic *dropped_item = NULL;
  187. bool cont;
  188. uint32_t free_wlen;
  189. bool valid_drop;
  190. MPSC_PBUF_DBG(buffer, "alloc %d words, ", (int)wlen);
  191. if (wlen > (buffer->size - 1)) {
  192. MPSC_PBUF_DBG(buffer, "Failed to alloc, ");
  193. return NULL;
  194. }
  195. do {
  196. k_spinlock_key_t key;
  197. bool wrap;
  198. cont = false;
  199. key = k_spin_lock(&buffer->lock);
  200. wrap = free_space(buffer, &free_wlen);
  201. if (free_wlen >= wlen) {
  202. item =
  203. (union mpsc_pbuf_generic *)&buffer->buf[buffer->tmp_wr_idx];
  204. item->hdr.valid = 0;
  205. item->hdr.busy = 0;
  206. buffer->tmp_wr_idx = idx_inc(buffer,
  207. buffer->tmp_wr_idx, wlen);
  208. } else if (wrap) {
  209. add_skip_item(buffer, free_wlen);
  210. cont = true;
  211. } else if (!K_TIMEOUT_EQ(timeout, K_NO_WAIT) &&
  212. !k_is_in_isr()) {
  213. int err;
  214. k_spin_unlock(&buffer->lock, key);
  215. err = k_sem_take(&buffer->sem, timeout);
  216. key = k_spin_lock(&buffer->lock);
  217. if (err == 0) {
  218. cont = true;
  219. }
  220. } else {
  221. bool user_drop = buffer->flags & MPSC_PBUF_MODE_OVERWRITE;
  222. dropped_item = drop_item_locked(buffer, free_wlen,
  223. user_drop, &valid_drop);
  224. cont = dropped_item != NULL;
  225. }
  226. k_spin_unlock(&buffer->lock, key);
  227. if (cont && dropped_item && valid_drop) {
  228. /* Notify about item being dropped. */
  229. buffer->notify_drop(buffer, dropped_item);
  230. dropped_item = NULL;
  231. }
  232. } while (cont);
  233. MPSC_PBUF_DBG(buffer, "allocated %p ", item);
  234. if (IS_ENABLED(CONFIG_MPSC_CLEAR_ALLOCATED) && item) {
  235. /* During test fill with 0's to simplify message comparison */
  236. memset(item, 0, sizeof(int) * wlen);
  237. }
  238. return item;
  239. }
  240. void mpsc_pbuf_commit(struct mpsc_pbuf_buffer *buffer,
  241. union mpsc_pbuf_generic *item)
  242. {
  243. uint32_t wlen = buffer->get_wlen(item);
  244. k_spinlock_key_t key = k_spin_lock(&buffer->lock);
  245. item->hdr.valid = 1;
  246. buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, wlen);
  247. k_spin_unlock(&buffer->lock, key);
  248. MPSC_PBUF_DBG(buffer, "committed %p ", item);
  249. }
  250. void mpsc_pbuf_put_word_ext(struct mpsc_pbuf_buffer *buffer,
  251. const union mpsc_pbuf_generic item,
  252. const void *data)
  253. {
  254. static const size_t l =
  255. (sizeof(item) + sizeof(data)) / sizeof(uint32_t);
  256. union mpsc_pbuf_generic *dropped_item = NULL;
  257. bool cont;
  258. bool valid_drop;
  259. do {
  260. k_spinlock_key_t key;
  261. uint32_t free_wlen;
  262. bool wrap;
  263. cont = false;
  264. key = k_spin_lock(&buffer->lock);
  265. wrap = free_space(buffer, &free_wlen);
  266. if (free_wlen >= l) {
  267. buffer->buf[buffer->tmp_wr_idx] = item.raw;
  268. void **p =
  269. (void **)&buffer->buf[buffer->tmp_wr_idx + 1];
  270. *p = (void *)data;
  271. buffer->tmp_wr_idx =
  272. idx_inc(buffer, buffer->tmp_wr_idx, l);
  273. buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, l);
  274. } else if (wrap) {
  275. add_skip_item(buffer, free_wlen);
  276. cont = true;
  277. } else {
  278. bool user_drop = buffer->flags & MPSC_PBUF_MODE_OVERWRITE;
  279. dropped_item = drop_item_locked(buffer, free_wlen,
  280. user_drop, &valid_drop);
  281. cont = dropped_item != NULL;
  282. }
  283. k_spin_unlock(&buffer->lock, key);
  284. if (cont && dropped_item && valid_drop) {
  285. /* Notify about item being dropped. */
  286. buffer->notify_drop(buffer, dropped_item);
  287. dropped_item = NULL;
  288. }
  289. } while (cont);
  290. }
  291. void mpsc_pbuf_put_data(struct mpsc_pbuf_buffer *buffer, const uint32_t *data,
  292. size_t wlen)
  293. {
  294. bool cont;
  295. union mpsc_pbuf_generic *dropped_item = NULL;
  296. bool valid_drop;
  297. do {
  298. uint32_t free_wlen;
  299. k_spinlock_key_t key;
  300. bool wrap;
  301. cont = false;
  302. key = k_spin_lock(&buffer->lock);
  303. wrap = free_space(buffer, &free_wlen);
  304. if (free_wlen >= wlen) {
  305. memcpy(&buffer->buf[buffer->tmp_wr_idx], data,
  306. wlen * sizeof(uint32_t));
  307. buffer->tmp_wr_idx =
  308. idx_inc(buffer, buffer->tmp_wr_idx, wlen);
  309. buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, wlen);
  310. } else if (wrap) {
  311. add_skip_item(buffer, free_wlen);
  312. cont = true;
  313. } else {
  314. bool user_drop = buffer->flags & MPSC_PBUF_MODE_OVERWRITE;
  315. dropped_item = drop_item_locked(buffer, free_wlen,
  316. user_drop, &valid_drop);
  317. cont = dropped_item != NULL;
  318. }
  319. k_spin_unlock(&buffer->lock, key);
  320. if (cont && dropped_item && valid_drop) {
  321. /* Notify about item being dropped. */
  322. buffer->notify_drop(buffer, dropped_item);
  323. dropped_item = NULL;
  324. }
  325. } while (cont);
  326. }
  327. const union mpsc_pbuf_generic *mpsc_pbuf_claim(struct mpsc_pbuf_buffer *buffer)
  328. {
  329. union mpsc_pbuf_generic *item;
  330. bool cont;
  331. do {
  332. uint32_t a;
  333. k_spinlock_key_t key;
  334. bool wrap;
  335. cont = false;
  336. key = k_spin_lock(&buffer->lock);
  337. wrap = available(buffer, &a);
  338. item = (union mpsc_pbuf_generic *)
  339. &buffer->buf[buffer->tmp_rd_idx];
  340. if (!a || is_invalid(item)) {
  341. item = NULL;
  342. } else {
  343. uint32_t skip = get_skip(item);
  344. if (skip || !is_valid(item)) {
  345. uint32_t inc =
  346. skip ? skip : buffer->get_wlen(item);
  347. buffer->tmp_rd_idx =
  348. idx_inc(buffer, buffer->tmp_rd_idx, inc);
  349. buffer->rd_idx =
  350. idx_inc(buffer, buffer->rd_idx, inc);
  351. cont = true;
  352. } else {
  353. item->hdr.busy = 1;
  354. buffer->tmp_rd_idx =
  355. idx_inc(buffer, buffer->tmp_rd_idx,
  356. buffer->get_wlen(item));
  357. }
  358. }
  359. if (!cont) {
  360. MPSC_PBUF_DBG(buffer, "claimed: %p ", item);
  361. }
  362. k_spin_unlock(&buffer->lock, key);
  363. } while (cont);
  364. return item;
  365. }
  366. void mpsc_pbuf_free(struct mpsc_pbuf_buffer *buffer,
  367. union mpsc_pbuf_generic *item)
  368. {
  369. uint32_t wlen = buffer->get_wlen(item);
  370. k_spinlock_key_t key = k_spin_lock(&buffer->lock);
  371. item->hdr.valid = 0;
  372. if (!(buffer->flags & MPSC_PBUF_MODE_OVERWRITE) ||
  373. ((uint32_t *)item == &buffer->buf[buffer->rd_idx])) {
  374. item->hdr.busy = 0;
  375. buffer->rd_idx = idx_inc(buffer, buffer->rd_idx, wlen);
  376. } else {
  377. item->skip.len = wlen;
  378. }
  379. MPSC_PBUF_DBG(buffer, "freed: %p ", item);
  380. k_spin_unlock(&buffer->lock, key);
  381. k_sem_give(&buffer->sem);
  382. }
  383. bool mpsc_pbuf_is_pending(struct mpsc_pbuf_buffer *buffer)
  384. {
  385. uint32_t a;
  386. (void)available(buffer, &a);
  387. return a ? true : false;
  388. }