stream.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608
  1. /*
  2. * Copyright (c) 2018 Actions Semiconductor Co., Ltd
  3. *
  4. * SPDX-License-Identifier: Apache-2.0
  5. */
  6. /**
  7. * @file stream interface
  8. */
  9. #define SYS_LOG_DOMAIN "stream"
  10. #include <os_common_api.h>
  11. #include <mem_manager.h>
  12. #include <msg_manager.h>
  13. #include <string.h>
  14. #include <stdlib.h>
  15. #include <stdio.h>
  16. #include "stream_internal.h"
  17. static bool _stream_check_handle_state(io_stream_t handle, uint8_t need_state)
  18. {
  19. if (handle == NULL) {
  20. SYS_LOG_INF("handle is null\n");
  21. return false;
  22. }
  23. if (handle->state != need_state) {
  24. /*SYS_LOG_INF("stream state error current state %d need_state %d handle %p \n",
  25. handle->state,need_state,handle);*/
  26. return false;
  27. }
  28. return true;
  29. }
  30. io_stream_t stream_create(const stream_ops_t *ops, void *init_param)
  31. {
  32. int ret = 0;
  33. io_stream_t stream = NULL;
  34. stream = mem_malloc(sizeof(struct __stream));
  35. if (!stream) {
  36. goto exit;
  37. }
  38. /*init state */
  39. stream->state = STATE_INIT;
  40. stream->rofs = 0;
  41. stream->wofs = 0;
  42. stream->ops = ops;
  43. os_mutex_init(&stream->attach_lock);
  44. if (stream->ops->init) {
  45. ret = stream->ops->init(stream, init_param);
  46. }
  47. if (ret) {
  48. SYS_LOG_ERR("create failed 0x%p \n", stream);
  49. mem_free(stream);
  50. stream = NULL;
  51. }
  52. exit:
  53. SYS_LOG_DBG(" 0x%p \n",stream);
  54. return stream;
  55. }
  56. int stream_open(io_stream_t handle, stream_mode mode)
  57. {
  58. int res;
  59. if (!_stream_check_handle_state(handle,STATE_INIT)) {
  60. if (!_stream_check_handle_state(handle,STATE_CLOSE)) {
  61. return -ENOSYS;
  62. }
  63. }
  64. if (((mode & MODE_IN) && !handle->ops->read) ||
  65. ((mode & MODE_OUT) && !handle->ops->write)) {
  66. SYS_LOG_ERR("mode %d not permitted\n", mode);
  67. return -EPERM;
  68. }
  69. res = handle->ops->open(handle,mode);
  70. if (res) {
  71. SYS_LOG_ERR("open error %d\n", res);
  72. return res;
  73. }
  74. if((mode & (MODE_READ_BLOCK | MODE_WRITE_BLOCK))){
  75. handle->sync_sem = mem_malloc(sizeof(os_sem));
  76. if (!handle->sync_sem) {
  77. return -ENOMEM;
  78. }
  79. os_sem_init(handle->sync_sem, 0, 1);
  80. handle->write_finished = 0;
  81. }
  82. handle->mode = mode;
  83. handle->state = STATE_OPEN;
  84. return res;
  85. }
  86. int stream_read(io_stream_t handle, void *buf, int num)
  87. {
  88. int i;
  89. int brw;
  90. int try_cnt = 0;
  91. if (!_stream_check_handle_state(handle,STATE_OPEN)) {
  92. return -ENOSYS;
  93. }
  94. if (!(handle->mode & MODE_IN)) {
  95. return -EPERM;
  96. }
  97. if ((handle->mode & MODE_READ_BLOCK)) {
  98. while (stream_get_length(handle) < num) {
  99. if((handle->mode & MODE_BLOCK_TIMEOUT)){
  100. if (try_cnt ++ > 20) {
  101. SYS_LOG_INF("time out 1s");
  102. handle->write_finished = 1;
  103. return 0;
  104. }
  105. }
  106. os_sem_take(handle->sync_sem, 50);
  107. if(!_stream_check_handle_state(handle,STATE_OPEN)) {
  108. return -ENOSYS;
  109. }
  110. if (handle->write_finished) {
  111. break;
  112. }
  113. }
  114. }
  115. brw = handle->ops->read(handle, buf, num);
  116. if (brw < 0) {
  117. SYS_LOG_DBG("read failed [%d]\n", brw);
  118. brw = 0;
  119. return brw;
  120. }
  121. if (handle->sync_sem) {
  122. os_sem_give(handle->sync_sem);
  123. }
  124. if (!os_is_in_isr()) {
  125. os_mutex_lock(&handle->attach_lock, OS_FOREVER);
  126. }
  127. /**data read to attached stream */
  128. for (i = 0; i < ARRAY_SIZE(handle->attach_stream); i++) {
  129. if (handle->attach_mode[i] != MODE_IN)
  130. continue;
  131. if (!handle->attach_stream[i])
  132. continue;
  133. brw = handle->attach_stream[i]->ops->write(handle->attach_stream[i], buf, num);
  134. if (brw != num) {
  135. if (!os_is_in_isr()) {
  136. os_mutex_unlock(&handle->attach_lock);
  137. }
  138. return brw;
  139. }
  140. }
  141. if (!os_is_in_isr()) {
  142. os_mutex_unlock(&handle->attach_lock);
  143. }
  144. for (i = 0; i < ARRAY_SIZE(handle->observer_notify); i++) {
  145. if (handle->observer_notify[i] && (handle->observer_type[i] & STREAM_NOTIFY_READ)) {
  146. handle->observer_notify[i](handle->observer[i], handle->rofs,
  147. handle->wofs, handle->total_size, buf, brw, STREAM_NOTIFY_READ);
  148. }
  149. }
  150. return brw;
  151. }
  152. int stream_seek(io_stream_t handle, int offset, seek_dir origin)
  153. {
  154. int i;
  155. int brw = 0;
  156. int target_off = offset;
  157. if (!_stream_check_handle_state(handle,STATE_OPEN)) {
  158. return -ENOSYS;
  159. }
  160. if (!handle->ops->seek) {
  161. return -ENOSYS;
  162. }
  163. switch(origin) {
  164. case SEEK_DIR_BEG:
  165. target_off = offset;
  166. break;
  167. case SEEK_DIR_CUR:
  168. if ((handle->mode & MODE_IN_OUT) == MODE_OUT) {
  169. target_off = handle->wofs + offset;
  170. } else {
  171. target_off = handle->rofs + offset;
  172. }
  173. break;
  174. case SEEK_DIR_END:
  175. target_off = handle->total_size + offset;
  176. break;
  177. default:
  178. SYS_LOG_ERR("mode not support 0x%x \n", origin);
  179. return -1;
  180. }
  181. if ((handle->mode & MODE_IN_OUT) == MODE_IN_OUT) {
  182. while (target_off > handle->wofs) {
  183. os_sem_take(handle->sync_sem, 50);
  184. if(!_stream_check_handle_state(handle,STATE_OPEN)) {
  185. return -ENOSYS;
  186. }
  187. }
  188. }
  189. brw = handle->ops->seek(handle, target_off, SEEK_DIR_BEG);
  190. if (brw < 0) {
  191. SYS_LOG_ERR("seek failed [%d]\n", brw);
  192. return brw;
  193. }
  194. for (i = 0; i < ARRAY_SIZE(handle->observer_notify); i++) {
  195. if (handle->observer_notify[i] && (handle->observer_type[i] & STREAM_NOTIFY_SEEK)) {
  196. handle->observer_notify[i](handle->observer[i], handle->rofs,
  197. handle->wofs, handle->total_size, NULL, 0, STREAM_NOTIFY_SEEK);
  198. }
  199. }
  200. return brw;
  201. }
  202. int stream_tell(io_stream_t handle)
  203. {
  204. int brw;
  205. if (!_stream_check_handle_state(handle,STATE_OPEN)) {
  206. return -ENOSYS;
  207. }
  208. if (!handle->ops->tell) {
  209. return -ENOSYS;
  210. }
  211. brw = handle->ops->tell(handle);
  212. if (brw < 0) {
  213. SYS_LOG_ERR("tell failed [%d]\n", brw);
  214. return brw;
  215. }
  216. return brw;
  217. }
  218. int stream_write(io_stream_t handle, const void *buf, int num)
  219. {
  220. int brw;
  221. int i;
  222. int try_cnt = 0;
  223. if (!_stream_check_handle_state(handle,STATE_OPEN)) {
  224. return -ENOSYS;
  225. }
  226. if (!(handle->mode & MODE_OUT)) {
  227. return -EPERM;
  228. }
  229. if ((handle->mode & MODE_WRITE_BLOCK)) {
  230. while (stream_get_space(handle) < num) {
  231. if ((handle->mode & MODE_BLOCK_TIMEOUT)) {
  232. if (try_cnt ++ > 20) {
  233. SYS_LOG_INF("time out 1s");
  234. handle->write_finished = 1;
  235. return 0;
  236. }
  237. }
  238. os_sem_take(handle->sync_sem, 50);
  239. if(!_stream_check_handle_state(handle,STATE_OPEN)) {
  240. return -ENOSYS;
  241. }
  242. }
  243. }
  244. for (i = 0; i < ARRAY_SIZE(handle->observer_notify); i++) {
  245. if (handle->observer_notify[i] && (handle->observer_type[i] & STREAM_NOTIFY_PRE_WRITE)) {
  246. handle->observer_notify[i](handle->observer[i], handle->rofs,
  247. handle->wofs, handle->total_size, (void *)buf, num, STREAM_NOTIFY_PRE_WRITE);
  248. }
  249. }
  250. brw = handle->ops->write(handle, (void *)buf, num);
  251. if (brw != num) {
  252. //SYS_LOG_ERR("Failed writing to stream [%d]\n", brw);
  253. return brw;
  254. }
  255. if (!num) {
  256. handle->write_finished = 1;
  257. }
  258. if (handle->sync_sem)
  259. os_sem_give(handle->sync_sem);
  260. if (!os_is_in_isr()) {
  261. os_mutex_lock(&handle->attach_lock, OS_FOREVER);
  262. }
  263. /**data write to attached stream */
  264. for (i = 0; i < ARRAY_SIZE(handle->attach_stream); i++) {
  265. if (handle->attach_mode[i] != MODE_OUT)
  266. continue;
  267. if (!handle->attach_stream[i])
  268. continue;
  269. brw = handle->attach_stream[i]->ops->write(handle->attach_stream[i], (void *)buf, num);
  270. if (brw != num) {
  271. //SYS_LOG_ERR("Failed writing to stream [%d]\n", brw);
  272. if (!os_is_in_isr()) {
  273. os_mutex_unlock(&handle->attach_lock);
  274. }
  275. return brw;
  276. }
  277. }
  278. if (!os_is_in_isr()) {
  279. os_mutex_unlock(&handle->attach_lock);
  280. }
  281. for (i = 0; i < ARRAY_SIZE(handle->observer_notify); i++) {
  282. if (handle->observer_notify[i] && (handle->observer_type[i] & STREAM_NOTIFY_WRITE)) {
  283. handle->observer_notify[i](handle->observer[i], handle->rofs,
  284. handle->wofs, handle->total_size, (void *)buf, num, STREAM_NOTIFY_WRITE);
  285. }
  286. }
  287. return brw;
  288. }
  289. int stream_flush(io_stream_t handle)
  290. {
  291. int brw;
  292. if (!_stream_check_handle_state(handle,STATE_OPEN)) {
  293. return -ENOSYS;
  294. }
  295. /** same stream not support flush */
  296. if (!handle->ops->flush) {
  297. return 0;
  298. }
  299. brw = handle->ops->flush(handle);
  300. if (brw < 0) {
  301. SYS_LOG_ERR("failed [%d]\n", brw);
  302. return brw;
  303. }
  304. return 0;
  305. }
  306. int stream_close(io_stream_t handle)
  307. {
  308. int res;
  309. if (!_stream_check_handle_state(handle, STATE_OPEN)) {
  310. SYS_LOG_ERR("state error\n");
  311. return -ENOSYS;
  312. }
  313. if (handle->attached_stream) {
  314. stream_detach(handle->attached_stream, handle);
  315. }
  316. res = handle->ops->close(handle);
  317. if (res) {
  318. SYS_LOG_ERR("close failed [%d]\n", res);
  319. }
  320. if (handle->sync_sem) {
  321. handle->write_finished = 1;
  322. os_sem_give(handle->sync_sem);
  323. }
  324. handle->state = STATE_CLOSE;
  325. return res;
  326. }
  327. int stream_destroy(io_stream_t handle)
  328. {
  329. int res = 0;
  330. if (handle->attached_stream) {
  331. stream_detach(handle->attached_stream, handle);
  332. }
  333. /* ops->destroy should also be allowed to be NULL, since ops->init is allowed to be NULL */
  334. if (handle->ops->destroy) {
  335. res = handle->ops->destroy(handle);
  336. if (res) {
  337. SYS_LOG_ERR("destroy failed [%d]\n", res);
  338. }
  339. }
  340. if (handle->sync_sem)
  341. mem_free(handle->sync_sem);
  342. mem_free(handle);
  343. return res;
  344. }
  345. bool stream_check_finished(io_stream_t handle)
  346. {
  347. bool brw = false;
  348. if (!_stream_check_handle_state(handle,STATE_OPEN)) {
  349. goto exit;
  350. }
  351. if ((handle->write_finished) && (stream_get_length(handle) <= 0)) {
  352. brw = true;
  353. }
  354. exit:
  355. return brw;
  356. }
  357. int stream_get_length(io_stream_t handle)
  358. {
  359. int brw = -ENOSYS;
  360. if (!_stream_check_handle_state(handle,STATE_OPEN)) {
  361. goto exit;
  362. }
  363. if (handle->ops->get_length) {
  364. brw = handle->ops->get_length(handle);
  365. } else {
  366. brw = handle->wofs - handle->rofs;
  367. }
  368. exit:
  369. return brw;
  370. }
  371. int stream_get_space(io_stream_t handle)
  372. {
  373. int brw = -ENOSYS;
  374. int attached_space = 0;
  375. if (!_stream_check_handle_state(handle,STATE_OPEN)) {
  376. goto exit;
  377. }
  378. if (handle->ops->get_space) {
  379. brw = handle->ops->get_space(handle);
  380. } else {
  381. brw = stream_get_length(handle);
  382. if (brw >= 0) {
  383. brw = handle->total_size - stream_get_length(handle);
  384. }
  385. }
  386. if (!os_is_in_isr()) {
  387. os_mutex_lock(&handle->attach_lock, OS_FOREVER);
  388. }
  389. /**data write to attached stream */
  390. for (int i = 0; i < ARRAY_SIZE(handle->attach_stream); i++) {
  391. if (handle->attach_mode[i] != MODE_OUT)
  392. continue;
  393. if (!handle->attach_stream[i])
  394. continue;
  395. attached_space = stream_get_space(handle->attach_stream[i]);
  396. if (brw > attached_space) {
  397. brw = attached_space;
  398. }
  399. }
  400. if (!os_is_in_isr()) {
  401. os_mutex_unlock(&handle->attach_lock);
  402. }
  403. exit:
  404. return brw;
  405. }
  406. int stream_set_observer(io_stream_t handle, void * observer, stream_observer_notify notify, uint8_t type)
  407. {
  408. int i;
  409. if (!_stream_check_handle_state(handle,STATE_OPEN)) {
  410. return -ENOSYS;
  411. }
  412. for (i = 0; i < ARRAY_SIZE(handle->observer); i++) {
  413. if (!handle->observer_notify[i]) {
  414. handle->observer[i] = observer;
  415. handle->observer_type[i] = type;
  416. handle->observer_notify[i] = notify;
  417. return 0;
  418. }
  419. }
  420. return -EBUSY;
  421. }
  422. int stream_attach(io_stream_t origin, io_stream_t attach_stream, int attach_type)
  423. {
  424. int brw = -ENOSYS;
  425. int i;
  426. if (!(attach_stream->mode & MODE_OUT))
  427. return -EINVAL;
  428. if (!_stream_check_handle_state(origin,STATE_OPEN)) {
  429. goto exit;
  430. }
  431. if (!(origin->mode & attach_type)) {
  432. SYS_LOG_ERR("mode %d not match %d \n", origin->mode, attach_type);
  433. return -EINVAL;
  434. }
  435. if (!os_is_in_isr()) {
  436. os_mutex_lock(&origin->attach_lock, OS_FOREVER);
  437. }
  438. for (i = 0; i < ARRAY_SIZE(origin->attach_stream); i++) {
  439. if (!origin->attach_stream[i]) {
  440. origin->attach_stream[i] = attach_stream;
  441. origin->attach_mode[i] = attach_type;
  442. attach_stream->attached_stream = origin;
  443. SYS_LOG_INF("origin %p , attach %p mode %d \n",origin,attach_stream,attach_type);
  444. brw = 0;
  445. break;
  446. }
  447. }
  448. if (!os_is_in_isr()) {
  449. os_mutex_unlock(&origin->attach_lock);
  450. }
  451. exit:
  452. return brw;
  453. }
  454. int stream_detach(io_stream_t origin, io_stream_t detach_stream)
  455. {
  456. int brw = -ENOSYS;
  457. int i;
  458. if (!_stream_check_handle_state(origin, STATE_OPEN)) {
  459. detach_stream->attached_stream = NULL;
  460. return -ENOSYS;
  461. }
  462. if (!os_is_in_isr()) {
  463. os_mutex_lock(&origin->attach_lock, OS_FOREVER);
  464. }
  465. for (i = 0; i < ARRAY_SIZE(origin->attach_stream); i++) {
  466. if (origin->attach_stream[i] == detach_stream) {
  467. origin->attach_stream[i] = NULL;
  468. origin->attach_mode[i] = 0;
  469. detach_stream->attached_stream = NULL;
  470. SYS_LOG_INF("origin %p , detach %p\n",origin,detach_stream);
  471. brw = 0;
  472. break;
  473. }
  474. }
  475. if (!os_is_in_isr()) {
  476. os_mutex_unlock(&origin->attach_lock);
  477. }
  478. return brw;
  479. }
  480. void *stream_get_ringbuffer(io_stream_t handle)
  481. {
  482. void *buf = NULL;
  483. if (handle == NULL) {
  484. return buf;
  485. }
  486. if (handle->ops && handle->ops->get_ringbuffer) {
  487. buf = handle->ops->get_ringbuffer(handle);
  488. }
  489. return buf;
  490. }
  491. void stream_dump(io_stream_t stream, const char *name, const char *line_prefix)
  492. {
  493. os_printk("%s%s (t:%d,s:%d): rofs=0x%x, wofs=0x%x, size=0x%x, length=0x%x, space=0x%x\n",
  494. line_prefix, name, stream->type, stream->state, stream->rofs, stream->wofs,
  495. stream->total_size, stream_get_length(stream), stream_get_space(stream));
  496. }