pipes.c

/*
 * Copyright (c) 2016 Wind River Systems, Inc.
 *
 * SPDX-License-Identifier: Apache-2.0
 */

/**
 * @file
 *
 * @brief Pipes
 */

#include <kernel.h>
#include <kernel_structs.h>
#include <toolchain.h>
#include <ksched.h>
#include <wait_q.h>
#include <init.h>
#include <syscall_handler.h>
#include <kernel_internal.h>
#include <sys/check.h>

struct k_pipe_desc {
	unsigned char *buffer;          /* Position in src/dest buffer */
	size_t bytes_to_xfer;           /* # bytes left to transfer */
#if (CONFIG_NUM_PIPE_ASYNC_MSGS > 0)
	struct k_mem_block *block;      /* Pointer to memory block */
	struct k_mem_block copy_block;  /* For backwards compatibility */
	struct k_sem *sem;              /* Semaphore to give if async */
#endif
};

struct k_pipe_async {
	struct _thread_base thread;     /* Dummy thread object */
	struct k_pipe_desc desc;        /* Pipe message descriptor */
};

#if (CONFIG_NUM_PIPE_ASYNC_MSGS > 0)
/* stack of unused asynchronous message descriptors */
K_STACK_DEFINE(pipe_async_msgs, CONFIG_NUM_PIPE_ASYNC_MSGS);
#endif /* CONFIG_NUM_PIPE_ASYNC_MSGS > 0 */

#if (CONFIG_NUM_PIPE_ASYNC_MSGS > 0)
/*
 * Do run-time initialization of pipe object subsystem.
 */
static int init_pipes_module(const struct device *dev)
{
	ARG_UNUSED(dev);

	/* Array of asynchronous message descriptors */
	static struct k_pipe_async __noinit async_msg[CONFIG_NUM_PIPE_ASYNC_MSGS];

#if (CONFIG_NUM_PIPE_ASYNC_MSGS > 0)
	/*
	 * Create a pool of asynchronous pipe message descriptors.
	 *
	 * A dummy thread requires minimal initialization, since it never gets
	 * to execute. The _THREAD_DUMMY flag is sufficient to distinguish a
	 * dummy thread from a real one. The threads are *not* added to the
	 * kernel's list of known threads.
	 *
	 * Once initialized, the address of each descriptor is added to a stack
	 * that governs access to them.
	 */
	for (int i = 0; i < CONFIG_NUM_PIPE_ASYNC_MSGS; i++) {
		async_msg[i].thread.thread_state = _THREAD_DUMMY;
		async_msg[i].thread.swap_data = &async_msg[i].desc;

		z_init_thread_timeout(&async_msg[i].thread);

		k_stack_push(&pipe_async_msgs, (stack_data_t)&async_msg[i]);
	}
#endif /* CONFIG_NUM_PIPE_ASYNC_MSGS > 0 */

	/* Complete initialization of statically defined pipes. */

	return 0;
}

SYS_INIT(init_pipes_module, PRE_KERNEL_1, CONFIG_KERNEL_INIT_PRIORITY_OBJECTS);
#endif /* CONFIG_NUM_PIPE_ASYNC_MSGS */

void k_pipe_init(struct k_pipe *pipe, unsigned char *buffer, size_t size)
{
	pipe->buffer = buffer;
	pipe->size = size;
	pipe->bytes_used = 0;
	pipe->read_index = 0;
	pipe->write_index = 0;
	pipe->lock = (struct k_spinlock){};
	z_waitq_init(&pipe->wait_q.writers);
	z_waitq_init(&pipe->wait_q.readers);
	SYS_PORT_TRACING_OBJ_INIT(k_pipe, pipe);

	pipe->flags = 0;
	z_object_init(pipe);
}
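
/*
 * Illustrative usage sketch (not part of the kernel source): a pipe backed
 * by a caller-supplied ring buffer. `my_pipe` and `my_ring` are placeholder
 * names.
 *
 *	static unsigned char my_ring[64];
 *	static struct k_pipe my_pipe;
 *
 *	void setup(void)
 *	{
 *		k_pipe_init(&my_pipe, my_ring, sizeof(my_ring));
 *	}
 *
 * A NULL buffer with size 0 is also valid; such a pipe performs only
 * direct thread-to-thread copies with no intermediate buffering.
 */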

int z_impl_k_pipe_alloc_init(struct k_pipe *pipe, size_t size)
{
	void *buffer;
	int ret;

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, alloc_init, pipe);

	if (size != 0U) {
		buffer = z_thread_malloc(size);
		if (buffer != NULL) {
			k_pipe_init(pipe, buffer, size);
			pipe->flags = K_PIPE_FLAG_ALLOC;
			ret = 0;
		} else {
			ret = -ENOMEM;
		}
	} else {
		k_pipe_init(pipe, NULL, 0);
		ret = 0;
	}

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, alloc_init, pipe, ret);

	return ret;
}
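
/*
 * Illustrative sketch (assumed usage, not part of the kernel source):
 * allocate the pipe's ring buffer from the calling thread's resource pool
 * and release it again with k_pipe_cleanup(). `my_pipe` is a placeholder.
 *
 *	struct k_pipe my_pipe;
 *
 *	int demo(void)
 *	{
 *		int rc = k_pipe_alloc_init(&my_pipe, 128);
 *
 *		if (rc != 0) {
 *			return rc;
 *		}
 *
 *		...
 *
 *		return k_pipe_cleanup(&my_pipe);
 *	}
 *
 * k_pipe_alloc_init() returns -ENOMEM when the pool cannot supply the
 * buffer; k_pipe_cleanup() frees it again, returning -EAGAIN if threads
 * are still waiting on the pipe.
 */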

#ifdef CONFIG_USERSPACE
static inline int z_vrfy_k_pipe_alloc_init(struct k_pipe *pipe, size_t size)
{
	Z_OOPS(Z_SYSCALL_OBJ_NEVER_INIT(pipe, K_OBJ_PIPE));

	return z_impl_k_pipe_alloc_init(pipe, size);
}
#include <syscalls/k_pipe_alloc_init_mrsh.c>
#endif

int k_pipe_cleanup(struct k_pipe *pipe)
{
	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, cleanup, pipe);

	CHECKIF(z_waitq_head(&pipe->wait_q.readers) != NULL ||
		z_waitq_head(&pipe->wait_q.writers) != NULL) {
		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, cleanup, pipe, -EAGAIN);

		return -EAGAIN;
	}

	if ((pipe->flags & K_PIPE_FLAG_ALLOC) != 0U) {
		k_free(pipe->buffer);
		pipe->buffer = NULL;
		pipe->flags &= ~K_PIPE_FLAG_ALLOC;
	}

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, cleanup, pipe, 0);

	return 0;
}

/**
 * @brief Copy bytes from @a src to @a dest
 *
 * @return Number of bytes copied
 */
static size_t pipe_xfer(unsigned char *dest, size_t dest_size,
			const unsigned char *src, size_t src_size)
{
	size_t num_bytes = MIN(dest_size, src_size);
	const unsigned char *end = src + num_bytes;

	while (src != end) {
		*dest = *src;
		dest++;
		src++;
	}

	return num_bytes;
}

/**
 * @brief Put data from @a src into the pipe's circular buffer
 *
 * Modifies the following fields in @a pipe:
 *        buffer, bytes_used, write_index
 *
 * @return Number of bytes written to the pipe's circular buffer
 */
static size_t pipe_buffer_put(struct k_pipe *pipe,
			      const unsigned char *src, size_t src_size)
{
	size_t bytes_copied;
	size_t run_length;
	size_t num_bytes_written = 0;
	int i;

	for (i = 0; i < 2; i++) {
		run_length = MIN(pipe->size - pipe->bytes_used,
				 pipe->size - pipe->write_index);

		bytes_copied = pipe_xfer(pipe->buffer + pipe->write_index,
					 run_length,
					 src + num_bytes_written,
					 src_size - num_bytes_written);

		num_bytes_written += bytes_copied;
		pipe->bytes_used += bytes_copied;
		pipe->write_index += bytes_copied;
		if (pipe->write_index == pipe->size) {
			pipe->write_index = 0;
		}
	}

	return num_bytes_written;
}
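
/*
 * Worked example (illustrative values, not from the source): with
 * pipe->size = 8, bytes_used = 2, write_index = 6 and src_size = 5, the
 * first pass copies MIN(8 - 2, 8 - 6) = 2 bytes into indices 6..7 and
 * wraps write_index to 0; the second pass copies the remaining 3 bytes
 * into indices 0..2. Two passes therefore suffice to handle any
 * wraparound of the circular buffer.
 */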

/**
 * @brief Get data from the pipe's circular buffer
 *
 * Modifies the following fields in @a pipe:
 *        bytes_used, read_index
 *
 * @return Number of bytes read from the pipe's circular buffer
 */
static size_t pipe_buffer_get(struct k_pipe *pipe,
			      unsigned char *dest, size_t dest_size)
{
	size_t bytes_copied;
	size_t run_length;
	size_t num_bytes_read = 0;
	int i;

	for (i = 0; i < 2; i++) {
		run_length = MIN(pipe->bytes_used,
				 pipe->size - pipe->read_index);

		bytes_copied = pipe_xfer(dest + num_bytes_read,
					 dest_size - num_bytes_read,
					 pipe->buffer + pipe->read_index,
					 run_length);

		num_bytes_read += bytes_copied;
		pipe->bytes_used -= bytes_copied;
		pipe->read_index += bytes_copied;
		if (pipe->read_index == pipe->size) {
			pipe->read_index = 0;
		}
	}

	return num_bytes_read;
}

/**
 * @brief Prepare a working set of readers/writers
 *
 * Prepare a list of "working threads" into/from which the data
 * will be directly copied. This list is useful because it is used to ...
 *
 *  1. avoid double copying
 *  2. minimize interrupt latency as interrupts are unlocked
 *     while copying data
 *  3. ensure a timeout can not make the request impossible to satisfy
 *
 * The list is populated with previously pended threads that will be ready to
 * run after the pipe call is complete.
 *
 * Important things to remember when reading from the pipe ...
 *  1. If there are writers in @a wait_q, then the pipe's buffer is full.
 *  2. Conversely, if the pipe's buffer is not full, there are no writers.
 *  3. The amount of available data in the pipe is the sum of the bytes used
 *     in the pipe (@a pipe_space) and all the requests from the waiting
 *     writers.
 *  4. Since data is read from the pipe's buffer first, the working set must
 *     include writers that will (try to) re-fill the pipe's buffer
 *     afterwards.
 *
 * Important things to remember when writing to the pipe ...
 *  1. If there are readers in @a wait_q, then the pipe's buffer is empty.
 *  2. Conversely, if the pipe's buffer is not empty, then there are no
 *     readers.
 *  3. The amount of space available in the pipe is the sum of the bytes
 *     unused in the pipe (@a pipe_space) and all the requests from the
 *     waiting readers.
 *
 * @return false if request is unsatisfiable, otherwise true
 */
static bool pipe_xfer_prepare(sys_dlist_t *xfer_list,
			      struct k_thread **waiter,
			      _wait_q_t *wait_q,
			      size_t pipe_space,
			      size_t bytes_to_xfer,
			      size_t min_xfer,
			      k_timeout_t timeout)
{
	struct k_thread *thread;
	struct k_pipe_desc *desc;
	size_t num_bytes = 0;

	if (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
		_WAIT_Q_FOR_EACH(wait_q, thread) {
			desc = (struct k_pipe_desc *)thread->base.swap_data;

			num_bytes += desc->bytes_to_xfer;

			if (num_bytes >= bytes_to_xfer) {
				break;
			}
		}

		if (num_bytes + pipe_space < min_xfer) {
			return false;
		}
	}

	/*
	 * Either @a timeout is not K_NO_WAIT (so the thread may pend) or
	 * the entire request can be satisfied. Generate the working list.
	 */

	sys_dlist_init(xfer_list);

	num_bytes = 0;
	while ((thread = z_waitq_head(wait_q)) != NULL) {
		desc = (struct k_pipe_desc *)thread->base.swap_data;
		num_bytes += desc->bytes_to_xfer;

		if (num_bytes > bytes_to_xfer) {
			/*
			 * This request can not be fully satisfied.
			 * Do not remove it from the wait_q.
			 * Do not abort its timeout (if applicable).
			 * Do not add it to the transfer list.
			 */
			break;
		}

		/*
		 * This request can be fully satisfied.
		 * Remove it from the wait_q.
		 * Abort its timeout.
		 * Add it to the transfer list.
		 */
		z_unpend_thread(thread);
		sys_dlist_append(xfer_list, &thread->base.qnode_dlist);
	}

	*waiter = (num_bytes > bytes_to_xfer) ? thread : NULL;

	return true;
}
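
/*
 * Worked example (illustrative values): a reader requests
 * bytes_to_xfer = 10 with pipe_space = 0 while writers pend with 4, 5
 * and 3 bytes respectively. The first two writers (4 + 5 = 9 <= 10) are
 * unpended and appended to @a xfer_list; adding the third would raise
 * the running total to 12 > 10, so it stays pended and is returned via
 * @a waiter so that it can be partially drained.
 */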

/**
 * @brief Determine the correct return code
 *
 *	Bytes Xferred	No Wait	Wait
 *	 >= Minimum	   0	   0
 *	  < Minimum	 -EIO*	-EAGAIN
 *
 * * The "-EIO No Wait" case was already checked when the "working set"
 *   was created in pipe_xfer_prepare().
 *
 * @return See table above
 */
static int pipe_return_code(size_t min_xfer, size_t bytes_remaining,
			    size_t bytes_requested)
{
	if (bytes_requested - bytes_remaining >= min_xfer) {
		/*
		 * At least the minimum number of requested
		 * bytes have been transferred.
		 */
		return 0;
	}

	return -EAGAIN;
}
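
/*
 * Worked example (illustrative values): a request for 10 bytes with
 * min_xfer = 4 that pends and wakes with bytes_remaining = 7 has
 * transferred 10 - 7 = 3 bytes. Since 3 < 4, the call returns -EAGAIN;
 * had 4 or more bytes been transferred, it would return 0.
 */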

/**
 * @brief Ready a pipe thread
 *
 * If the pipe thread is a real thread, then add it to the ready queue.
 * Dummy threads (placeholders for asynchronous transfers) are skipped,
 * as they never execute.
 *
 * @return N/A
 */
static void pipe_thread_ready(struct k_thread *thread)
{
#if (CONFIG_NUM_PIPE_ASYNC_MSGS > 0)
	if ((thread->base.thread_state & _THREAD_DUMMY) != 0U) {
		return;
	}
#endif

	z_ready_thread(thread);
}

/**
 * @brief Internal API used to send data to a pipe
 */
int z_pipe_put_internal(struct k_pipe *pipe, struct k_pipe_async *async_desc,
			unsigned char *data, size_t bytes_to_write,
			size_t *bytes_written, size_t min_xfer,
			k_timeout_t timeout)
{
	struct k_thread *reader;
	struct k_pipe_desc *desc;
	sys_dlist_t xfer_list;
	size_t num_bytes_written = 0;
	size_t bytes_copied;

#if (CONFIG_NUM_PIPE_ASYNC_MSGS == 0)
	ARG_UNUSED(async_desc);
#endif

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, put, pipe, timeout);

	CHECKIF((min_xfer > bytes_to_write) || bytes_written == NULL) {
		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout,
					       -EINVAL);

		return -EINVAL;
	}

	k_spinlock_key_t key = k_spin_lock(&pipe->lock);

	/*
	 * Create a list of "working readers" into which the data will be
	 * directly copied.
	 */
	if (!pipe_xfer_prepare(&xfer_list, &reader, &pipe->wait_q.readers,
			       pipe->size - pipe->bytes_used, bytes_to_write,
			       min_xfer, timeout)) {
		k_spin_unlock(&pipe->lock, key);
		*bytes_written = 0;

		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout,
					       -EIO);

		return -EIO;
	}

	SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_pipe, put, pipe, timeout);

	z_sched_lock();
	k_spin_unlock(&pipe->lock, key);

	/*
	 * 1. 'xfer_list' currently contains a list of reader threads that can
	 *    have their read requests fulfilled by the current call.
	 * 2. 'reader' if not NULL points to a thread on the reader wait_q
	 *    that can get some of its requested data.
	 * 3. Interrupts are unlocked but the scheduler is locked to allow
	 *    ticks to be delivered but no scheduling to occur.
	 * 4. If 'reader' times out while we are copying data, not only do we
	 *    still have a pointer to it, but it can not execute until this
	 *    call is complete so it is still safe to copy data to it.
	 */

	struct k_thread *thread = (struct k_thread *)
				  sys_dlist_get(&xfer_list);
	while (thread != NULL) {
		desc = (struct k_pipe_desc *)thread->base.swap_data;
		bytes_copied = pipe_xfer(desc->buffer, desc->bytes_to_xfer,
					 data + num_bytes_written,
					 bytes_to_write - num_bytes_written);

		num_bytes_written += bytes_copied;
		desc->buffer += bytes_copied;
		desc->bytes_to_xfer -= bytes_copied;

		/* The thread's read request has been satisfied. Ready it. */
		z_ready_thread(thread);

		thread = (struct k_thread *)sys_dlist_get(&xfer_list);
	}

	/*
	 * Copy any data to the reader that we left on the wait_q.
	 * It is possible no data will be copied.
	 */
	if (reader != NULL) {
		desc = (struct k_pipe_desc *)reader->base.swap_data;
		bytes_copied = pipe_xfer(desc->buffer, desc->bytes_to_xfer,
					 data + num_bytes_written,
					 bytes_to_write - num_bytes_written);

		num_bytes_written += bytes_copied;
		desc->buffer += bytes_copied;
		desc->bytes_to_xfer -= bytes_copied;
	}

	/*
	 * As much data as possible has been directly copied to any waiting
	 * readers. Add as much as possible to the pipe's circular buffer.
	 */
	num_bytes_written +=
		pipe_buffer_put(pipe, data + num_bytes_written,
				bytes_to_write - num_bytes_written);

	if (num_bytes_written == bytes_to_write) {
		*bytes_written = num_bytes_written;
		k_sched_unlock();

		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout, 0);

		return 0;
	}

	if (!K_TIMEOUT_EQ(timeout, K_NO_WAIT)
	    && num_bytes_written >= min_xfer
	    && min_xfer > 0U) {
		*bytes_written = num_bytes_written;
		k_sched_unlock();

		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout, 0);

		return 0;
	}

	/* Not all data was copied */

	struct k_pipe_desc pipe_desc;

	pipe_desc.buffer = data + num_bytes_written;
	pipe_desc.bytes_to_xfer = bytes_to_write - num_bytes_written;

	if (!K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
		_current->base.swap_data = &pipe_desc;

		/*
		 * Lock interrupts and unlock the scheduler before
		 * manipulating the writers wait_q.
		 */
		k_spinlock_key_t key2 = k_spin_lock(&pipe->lock);

		z_sched_unlock_no_reschedule();
		(void)z_pend_curr(&pipe->lock, key2,
				  &pipe->wait_q.writers, timeout);
	} else {
		k_sched_unlock();
	}

	*bytes_written = bytes_to_write - pipe_desc.bytes_to_xfer;

	int ret = pipe_return_code(min_xfer, pipe_desc.bytes_to_xfer,
				   bytes_to_write);

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout, ret);

	return ret;
}

int z_impl_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read,
		      size_t *bytes_read, size_t min_xfer, k_timeout_t timeout)
{
	struct k_thread *writer;
	struct k_pipe_desc *desc;
	sys_dlist_t xfer_list;
	size_t num_bytes_read = 0;
	size_t bytes_copied;

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, get, pipe, timeout);

	CHECKIF((min_xfer > bytes_to_read) || bytes_read == NULL) {
		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe, timeout,
					       -EINVAL);

		return -EINVAL;
	}

	k_spinlock_key_t key = k_spin_lock(&pipe->lock);

	/*
	 * Create a list of "working writers" from which the data will be
	 * directly copied.
	 */
	if (!pipe_xfer_prepare(&xfer_list, &writer, &pipe->wait_q.writers,
			       pipe->bytes_used, bytes_to_read,
			       min_xfer, timeout)) {
		k_spin_unlock(&pipe->lock, key);
		*bytes_read = 0;

		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe, timeout,
					       -EIO);

		return -EIO;
	}

	SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_pipe, get, pipe, timeout);

	z_sched_lock();
	k_spin_unlock(&pipe->lock, key);

	num_bytes_read = pipe_buffer_get(pipe, data, bytes_to_read);

	/*
	 * 1. 'xfer_list' currently contains a list of writer threads that can
	 *    have their write requests fulfilled by the current call.
	 * 2. 'writer' if not NULL points to a thread on the writer wait_q
	 *    that can post some of its requested data.
	 * 3. Data will be copied from each writer's buffer to either the
	 *    reader's buffer and/or to the pipe's circular buffer.
	 * 4. Interrupts are unlocked but the scheduler is locked to allow
	 *    ticks to be delivered but no scheduling to occur.
	 * 5. If 'writer' times out while we are copying data, not only do we
	 *    still have a pointer to it, but it can not execute until this
	 *    call is complete so it is still safe to copy data from it.
	 */

	struct k_thread *thread = (struct k_thread *)
				  sys_dlist_get(&xfer_list);
	while ((thread != NULL) && (num_bytes_read < bytes_to_read)) {
		desc = (struct k_pipe_desc *)thread->base.swap_data;
		bytes_copied = pipe_xfer((uint8_t *)data + num_bytes_read,
					 bytes_to_read - num_bytes_read,
					 desc->buffer, desc->bytes_to_xfer);

		num_bytes_read += bytes_copied;
		desc->buffer += bytes_copied;
		desc->bytes_to_xfer -= bytes_copied;

		/*
		 * It is expected that the write request will be satisfied.
		 * However, if the read request was satisfied before the
		 * write request was satisfied, then the write request must
		 * finish later when writing to the pipe's circular buffer.
		 */
		if (num_bytes_read == bytes_to_read) {
			break;
		}
		pipe_thread_ready(thread);

		thread = (struct k_thread *)sys_dlist_get(&xfer_list);
	}

	if ((writer != NULL) && (num_bytes_read < bytes_to_read)) {
		desc = (struct k_pipe_desc *)writer->base.swap_data;
		bytes_copied = pipe_xfer((uint8_t *)data + num_bytes_read,
					 bytes_to_read - num_bytes_read,
					 desc->buffer, desc->bytes_to_xfer);

		num_bytes_read += bytes_copied;
		desc->buffer += bytes_copied;
		desc->bytes_to_xfer -= bytes_copied;
	}

	/*
	 * Copy as much data as possible from the writers (if any)
	 * into the pipe's circular buffer.
	 */
	while (thread != NULL) {
		desc = (struct k_pipe_desc *)thread->base.swap_data;
		bytes_copied = pipe_buffer_put(pipe, desc->buffer,
					       desc->bytes_to_xfer);

		desc->buffer += bytes_copied;
		desc->bytes_to_xfer -= bytes_copied;

		/* Write request has been satisfied */
		pipe_thread_ready(thread);

		thread = (struct k_thread *)sys_dlist_get(&xfer_list);
	}

	if (writer != NULL) {
		desc = (struct k_pipe_desc *)writer->base.swap_data;
		bytes_copied = pipe_buffer_put(pipe, desc->buffer,
					       desc->bytes_to_xfer);

		desc->buffer += bytes_copied;
		desc->bytes_to_xfer -= bytes_copied;
	}

	if (num_bytes_read == bytes_to_read) {
		k_sched_unlock();

		*bytes_read = num_bytes_read;

		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe, timeout, 0);

		return 0;
	}

	if (!K_TIMEOUT_EQ(timeout, K_NO_WAIT)
	    && num_bytes_read >= min_xfer
	    && min_xfer > 0U) {
		k_sched_unlock();

		*bytes_read = num_bytes_read;

		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe, timeout, 0);

		return 0;
	}

	/* Not all data was read */

	struct k_pipe_desc pipe_desc;

	pipe_desc.buffer = (uint8_t *)data + num_bytes_read;
	pipe_desc.bytes_to_xfer = bytes_to_read - num_bytes_read;

	if (!K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
		_current->base.swap_data = &pipe_desc;
		k_spinlock_key_t key2 = k_spin_lock(&pipe->lock);

		z_sched_unlock_no_reschedule();
		(void)z_pend_curr(&pipe->lock, key2,
				  &pipe->wait_q.readers, timeout);
	} else {
		k_sched_unlock();
	}

	*bytes_read = bytes_to_read - pipe_desc.bytes_to_xfer;

	int ret = pipe_return_code(min_xfer, pipe_desc.bytes_to_xfer,
				   bytes_to_read);

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe, timeout, ret);

	return ret;
}
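
/*
 * Illustrative reader sketch (assumed usage, not part of the kernel
 * source): block for up to 100 ms until at least one byte is available,
 * accepting up to sizeof(buf) bytes. `my_pipe` is a placeholder.
 *
 *	unsigned char buf[32];
 *	size_t bytes_read;
 *	int rc;
 *
 *	rc = k_pipe_get(&my_pipe, buf, sizeof(buf), &bytes_read,
 *			1, K_MSEC(100));
 *
 * rc == 0 means at least min_xfer (here 1) bytes were copied;
 * rc == -EAGAIN means the call timed out short of min_xfer, with
 * bytes_read still reporting whatever was copied.
 */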

#ifdef CONFIG_USERSPACE
int z_vrfy_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read,
		      size_t *bytes_read, size_t min_xfer, k_timeout_t timeout)
{
	Z_OOPS(Z_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
	Z_OOPS(Z_SYSCALL_MEMORY_WRITE(bytes_read, sizeof(*bytes_read)));
	Z_OOPS(Z_SYSCALL_MEMORY_WRITE((void *)data, bytes_to_read));

	return z_impl_k_pipe_get((struct k_pipe *)pipe, (void *)data,
				 bytes_to_read, bytes_read, min_xfer,
				 timeout);
}
#include <syscalls/k_pipe_get_mrsh.c>
#endif

int z_impl_k_pipe_put(struct k_pipe *pipe, void *data, size_t bytes_to_write,
		      size_t *bytes_written, size_t min_xfer,
		      k_timeout_t timeout)
{
	return z_pipe_put_internal(pipe, NULL, data,
				   bytes_to_write, bytes_written,
				   min_xfer, timeout);
}

#ifdef CONFIG_USERSPACE
int z_vrfy_k_pipe_put(struct k_pipe *pipe, void *data, size_t bytes_to_write,
		      size_t *bytes_written, size_t min_xfer,
		      k_timeout_t timeout)
{
	Z_OOPS(Z_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
	Z_OOPS(Z_SYSCALL_MEMORY_WRITE(bytes_written, sizeof(*bytes_written)));
	Z_OOPS(Z_SYSCALL_MEMORY_READ((void *)data, bytes_to_write));

	return z_impl_k_pipe_put((struct k_pipe *)pipe, (void *)data,
				 bytes_to_write, bytes_written, min_xfer,
				 timeout);
}
#include <syscalls/k_pipe_put_mrsh.c>
#endif
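
/*
 * Illustrative writer sketch (assumed usage, not part of the kernel
 * source): an all-or-nothing write in which min_xfer equals the full
 * message length, with a 50 ms timeout. `my_pipe` is a placeholder.
 *
 *	static unsigned char msg[] = "hello";
 *	size_t bytes_written;
 *	int rc;
 *
 *	rc = k_pipe_put(&my_pipe, msg, sizeof(msg), &bytes_written,
 *			sizeof(msg), K_MSEC(50));
 *
 * rc == 0 means the whole message was accepted; rc == -EAGAIN means the
 * timeout expired with only bytes_written bytes transferred. With
 * K_NO_WAIT instead, an unsatisfiable request fails fast with -EIO.
 */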

size_t z_impl_k_pipe_read_avail(struct k_pipe *pipe)
{
	size_t res;
	k_spinlock_key_t key;

	/* Buffer and size are fixed. No need to spin. */
	if (pipe->buffer == NULL || pipe->size == 0U) {
		res = 0;
		goto out;
	}

	key = k_spin_lock(&pipe->lock);

	if (pipe->read_index == pipe->write_index) {
		res = pipe->bytes_used;
	} else if (pipe->read_index < pipe->write_index) {
		res = pipe->write_index - pipe->read_index;
	} else {
		res = pipe->size - (pipe->read_index - pipe->write_index);
	}

	k_spin_unlock(&pipe->lock, key);

out:
	return res;
}

#ifdef CONFIG_USERSPACE
size_t z_vrfy_k_pipe_read_avail(struct k_pipe *pipe)
{
	Z_OOPS(Z_SYSCALL_OBJ(pipe, K_OBJ_PIPE));

	return z_impl_k_pipe_read_avail(pipe);
}
#include <syscalls/k_pipe_read_avail_mrsh.c>
#endif

size_t z_impl_k_pipe_write_avail(struct k_pipe *pipe)
{
	size_t res;
	k_spinlock_key_t key;

	/* Buffer and size are fixed. No need to spin. */
	if (pipe->buffer == NULL || pipe->size == 0U) {
		res = 0;
		goto out;
	}

	key = k_spin_lock(&pipe->lock);

	if (pipe->write_index == pipe->read_index) {
		res = pipe->size - pipe->bytes_used;
	} else if (pipe->write_index < pipe->read_index) {
		res = pipe->read_index - pipe->write_index;
	} else {
		res = pipe->size - (pipe->write_index - pipe->read_index);
	}

	k_spin_unlock(&pipe->lock, key);

out:
	return res;
}

#ifdef CONFIG_USERSPACE
size_t z_vrfy_k_pipe_write_avail(struct k_pipe *pipe)
{
	Z_OOPS(Z_SYSCALL_OBJ(pipe, K_OBJ_PIPE));

	return z_impl_k_pipe_write_avail(pipe);
}
#include <syscalls/k_pipe_write_avail_mrsh.c>
#endif
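
/*
 * Worked example (illustrative values): with size = 8, read_index = 6 and
 * write_index = 2, the data has wrapped around, so read_avail is
 * 8 - (6 - 2) = 4 bytes and write_avail is 6 - 2 = 4 bytes. When the two
 * indices are equal, bytes_used distinguishes a completely full pipe from
 * an empty one.
 */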