canopen_program.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. # Copyright (c) 2020 Vestas Wind Systems A/S
  2. #
  3. # SPDX-License-Identifier: Apache-2.0
  4. '''Runner for performing program download over CANopen (DSP 302-3).'''
  5. import argparse
  6. import os
  7. from runners.core import ZephyrBinaryRunner, RunnerCaps
  8. try:
  9. import canopen
  10. from progress.bar import Bar
  11. MISSING_REQUIREMENTS = False
  12. except ImportError:
  13. MISSING_REQUIREMENTS = True
  # Python-CAN configuration context used when the caller does not name one
  # (the python-can documentation describes how contexts are looked up).
  DEFAULT_CAN_CONTEXT = 'default'

  # Indexes of the CANopen object dictionary entries used for program download.
  H1F50_PROGRAM_DATA = 0x1f50
  H1F51_PROGRAM_CTRL = 0x1f51
  H1F56_PROGRAM_SWID = 0x1f56
  H1F57_FLASH_STATUS = 0x1f57

  # Command values written to the program control entry (0x1f51).
  PROGRAM_CTRL_STOP = 0
  PROGRAM_CTRL_START = 1
  PROGRAM_CTRL_RESET = 2
  PROGRAM_CTRL_CLEAR = 3
  PROGRAM_CTRL_ZEPHYR_CONFIRM = 128
  class ToggleAction(argparse.Action):
      '''Argparse action that stores a boolean chosen by the flag's spelling.

      Meant to be registered under both a positive and a ``--no-`` prefixed
      option string sharing one ``dest``: the destination is set to False
      when the option used on the command line starts with ``--no-`` and to
      True otherwise.
      '''

      def __call__(self, parser, namespace, values, option_string=None):
          # Only the spelling of the option matters; ``values`` is ignored.
          negated = option_string.startswith('--no-')
          setattr(namespace, self.dest, not negated)
  class CANopenBinaryRunner(ZephyrBinaryRunner):
      '''Runner front-end for CANopen.

      Supports the 'flash' command only: the binary named by ``cfg.bin_file``
      is transferred to a CANopen node through a CANopenProgramDownloader,
      the program is started and, optionally, confirmed.
      '''

      def __init__(self, cfg, node_id, can_context=DEFAULT_CAN_CONTEXT,
                   program_number=1, confirm=True,
                   confirm_only=True, timeout=10):
          '''Create the runner and its program downloader.

          :param cfg: runner configuration; ``cfg.bin_file`` is the image to
              download
          :param node_id: CANopen node ID of the target
          :param can_context: Python-CAN context used to open the bus
          :param program_number: program number (sub-index) to operate on
          :param confirm: confirm the program after it has been started
          :param confirm_only: only confirm the program, skip the download
          :param timeout: boot-up timeout handed to the downloader
          :raises RuntimeError: if the optional Python dependencies could not
              be imported
          '''
          if MISSING_REQUIREMENTS:
              raise RuntimeError('one or more Python dependencies were missing; '
                                 "see the getting started guide for details on "
                                 "how to fix")

          super().__init__(cfg)
          self.bin_file = cfg.bin_file
          self.confirm = confirm
          # NOTE(review): this default (True) differs from the --confirm-only
          # command line default (False); do_create() always passes the value
          # explicitly, so only direct callers see this default.
          self.confirm_only = confirm_only
          self.timeout = timeout
          self.downloader = CANopenProgramDownloader(logger=self.logger,
                                                     node_id=node_id,
                                                     can_context=can_context,
                                                     program_number=program_number)

      @classmethod
      def name(cls):
          '''Return the name this runner is registered under.'''
          return 'canopen'

      @classmethod
      def capabilities(cls):
          '''Return the runner capabilities: 'flash' only, no flash address.'''
          return RunnerCaps(commands={'flash'}, flash_addr=False)

      @classmethod
      def do_add_parser(cls, parser):
          '''Register the CANopen specific command line options on parser.'''
          # Required:
          parser.add_argument('--node-id', required=True, help='Node ID')

          # Optional:
          parser.add_argument('--can-context', default=DEFAULT_CAN_CONTEXT,
                              help='Custom Python-CAN context to use')
          parser.add_argument('--program-number', default=1,
                              help='program number, default is 1')
          # Both spellings share one destination; ToggleAction picks the value.
          parser.add_argument('--confirm', '--no-confirm',
                              dest='confirm', nargs=0,
                              action=ToggleAction,
                              help='confirm after starting? (default: yes)')
          parser.add_argument('--confirm-only', default=False, action='store_true',
                              help='confirm only, no program download (default: no)')
          parser.add_argument('--timeout', default=10,
                              help='boot-up timeout, default is 10 seconds')

          parser.set_defaults(confirm=True)

      @classmethod
      def do_create(cls, cfg, args):
          '''Build a runner from parsed arguments.

          The numeric options are converted with int() here because the
          parser stores them as given on the command line.
          '''
          return CANopenBinaryRunner(cfg, int(args.node_id),
                                     can_context=args.can_context,
                                     program_number=int(args.program_number),
                                     confirm=args.confirm,
                                     confirm_only=args.confirm_only,
                                     timeout=int(args.timeout))

      def do_run(self, command, **kwargs):
          '''Dispatch a runner command; only 'flash' is acted upon.'''
          if command == 'flash':
              self.flash(**kwargs)

      def flash(self, **kwargs):
          '''Download program to flash over CANopen.

          Connects to the CAN network, reports the current software
          identification when the flash status is 0, and switches the node to
          pre-operational. In confirm-only mode the program is then confirmed
          and nothing else is done. Otherwise the program is stopped, cleared,
          downloaded, started and, once the node has booted, optionally
          confirmed.

          The network connection is released on every exit path, including
          when one of the steps raises.

          :raises ValueError: if ``bin_file`` is missing, if the flash status
              is non-zero after the download, or if a downloader step fails
          '''
          self.ensure_output('bin')
          self.logger.info('Using Node ID %d, program number %d',
                           self.downloader.node_id,
                           self.downloader.program_number)

          self.downloader.connect()
          try:
              status = self.downloader.flash_status()
              if status == 0:
                  self.downloader.swid()
              else:
                  self.logger.warning('Flash status 0x%02x, '
                                      'skipping software identification',
                                      status)

              self.downloader.enter_pre_operational()

              if self.confirm_only:
                  self.downloader.zephyr_confirm_program()
                  return

              if self.bin_file is None:
                  raise ValueError('Cannot download program; bin_file is missing')

              self.downloader.stop_program()
              self.downloader.clear_program()
              self.downloader.download(self.bin_file)

              # A non-zero flash status after the transfer means the download
              # did not succeed; do not start the program in that case.
              status = self.downloader.flash_status()
              if status != 0:
                  raise ValueError('Program download failed: '
                                   'flash status 0x{:02x}'.format(status))

              self.downloader.start_program()
              self.downloader.wait_for_bootup(self.timeout)
              self.downloader.swid()

              if self.confirm:
                  self.downloader.enter_pre_operational()
                  self.downloader.zephyr_confirm_program()
          finally:
              # Always release the CAN interface, also when a step failed.
              self.downloader.disconnect()
  class CANopenProgramDownloader:
      '''CANopen program downloader.

      Wraps a ``canopen.Network`` with a single remote node and exposes the
      object dictionary entries used for program download (0x1f50, 0x1f51,
      0x1f56 and 0x1f57) as simple methods. Failures of the underlying
      CANopen operations are reported as ValueError, chained to the original
      exception.
      '''

      def __init__(self, logger, node_id, can_context=DEFAULT_CAN_CONTEXT,
                   program_number=1):
          '''Set up the network, the node and the SDO accessors.

          No bus traffic happens here; call connect() before using the other
          methods.

          :param logger: logger used for progress messages
          :param node_id: CANopen node ID of the target
          :param can_context: Python-CAN context passed to connect()
          :param program_number: sub-index used for all program entries
          '''
          super().__init__()
          self.logger = logger
          self.node_id = node_id
          self.can_context = can_context
          self.program_number = program_number
          self.network = canopen.Network()
          self.node = self.network.add_node(self.node_id,
                                            self.create_object_dictionary())
          # One SDO accessor per object dictionary entry, all addressed with
          # the program number as sub-index.
          self.data_sdo = self.node.sdo[H1F50_PROGRAM_DATA][self.program_number]
          self.ctrl_sdo = self.node.sdo[H1F51_PROGRAM_CTRL][self.program_number]
          self.swid_sdo = self.node.sdo[H1F56_PROGRAM_SWID][self.program_number]
          self.flash_sdo = self.node.sdo[H1F57_FLASH_STATUS][self.program_number]

      def connect(self):
          '''Connect to CAN network.

          :raises ValueError: if the connection cannot be established
          '''
          try:
              self.network.connect(context=self.can_context)
          except Exception as err:
              raise ValueError('Unable to connect to CAN network') from err

      def disconnect(self):
          '''Disconnect from CAN network'''
          self.network.disconnect()

      def enter_pre_operational(self):
          '''Enter pre-operational NMT state.

          :raises ValueError: if the NMT state change fails
          '''
          self.logger.info("Entering pre-operational mode")
          try:
              self.node.nmt.state = 'PRE-OPERATIONAL'
          except Exception as err:
              raise ValueError('Failed to enter pre-operational mode') from err

      def _ctrl_program(self, cmd):
          '''Write program control command to CANopen object dictionary (0x1f51).

          :param cmd: one of the PROGRAM_CTRL_* command values
          :raises ValueError: if the write fails
          '''
          try:
              self.ctrl_sdo.raw = cmd
          except Exception as err:
              raise ValueError(
                  'Unable to write control command 0x{:02x}'.format(cmd)) from err

      def stop_program(self):
          '''Write stop control command to CANopen object dictionary (0x1f51)'''
          self.logger.info('Stopping program')
          self._ctrl_program(PROGRAM_CTRL_STOP)

      def start_program(self):
          '''Write start control command to CANopen object dictionary (0x1f51)'''
          self.logger.info('Starting program')
          self._ctrl_program(PROGRAM_CTRL_START)

      def clear_program(self):
          '''Write clear control command to CANopen object dictionary (0x1f51)'''
          self.logger.info('Clearing program')
          self._ctrl_program(PROGRAM_CTRL_CLEAR)

      def zephyr_confirm_program(self):
          '''Write confirm control command to CANopen object dictionary (0x1f51)'''
          self.logger.info('Confirming program')
          self._ctrl_program(PROGRAM_CTRL_ZEPHYR_CONFIRM)

      def swid(self):
          '''Read software identification from CANopen object dictionary (0x1f56).

          :return: the software identification value, which is also logged
          :raises ValueError: if the read fails
          '''
          try:
              swid = self.swid_sdo.raw
          except Exception as err:
              raise ValueError('Failed to read software identification') from err
          self.logger.info('Program software identification: 0x{:08x}'.format(swid))
          return swid

      def flash_status(self):
          '''Read flash status identification (0x1f57).

          :return: the raw flash status value
          :raises ValueError: if the read fails
          '''
          try:
              status = self.flash_sdo.raw
          except Exception as err:
              raise ValueError('Failed to read flash status identification') from err
          return status

      def download(self, bin_file):
          '''Download program to CANopen object dictionary (0x1f50).

          The file is streamed in 1024 byte chunks while a progress bar is
          updated. The input file, the SDO stream and the progress bar are
          released even when the transfer fails part way through.

          :param bin_file: path of the binary image to transfer
          :raises ValueError: if reading the file or writing to the node fails
          '''
          self.logger.info('Downloading program: %s', bin_file)
          try:
              size = os.path.getsize(bin_file)
              with open(bin_file, 'rb') as infile:
                  outfile = self.data_sdo.open('wb', size=size)
                  try:
                      progress = Bar('%(percent)d%%', max=size,
                                     suffix='%(index)d/%(max)dB')
                      try:
                          while True:
                              chunk = infile.read(1024)
                              if not chunk:
                                  break
                              outfile.write(chunk)
                              progress.next(n=len(chunk))
                      finally:
                          progress.finish()
                  finally:
                      # Kept inside the outer try so that an error raised
                      # while closing the stream is reported as ValueError.
                      outfile.close()
          except Exception as err:
              raise ValueError('Failed to download program') from err

      def wait_for_bootup(self, timeout=10):
          '''Wait for boot-up message reception.

          :param timeout: timeout passed on to the NMT boot-up wait
          :raises ValueError: if waiting for the boot-up message fails
          '''
          self.logger.info('Waiting for boot-up message...')
          try:
              self.node.nmt.wait_for_bootup(timeout=timeout)
          except Exception as err:
              raise ValueError('Timeout waiting for boot-up message') from err

      @staticmethod
      def create_object_dictionary():
          '''Create a synthetic CANopen object dictionary for program download.

          Each entry is an array holding a single member at sub-index 1 with
          the data type listed below.

          :return: the populated ``ObjectDictionary``
          '''
          od = canopen.objectdictionary
          # (array name, index, data type of the member)
          entries = (
              ('Program data', H1F50_PROGRAM_DATA, od.DOMAIN),
              ('Program control', H1F51_PROGRAM_CTRL, od.UNSIGNED8),
              ('Program sofware ID', H1F56_PROGRAM_SWID, od.UNSIGNED32),
              ('Flash error ID', H1F57_FLASH_STATUS, od.UNSIGNED32),
          )

          objdict = od.ObjectDictionary()
          for name, index, data_type in entries:
              array = od.Array(name, index)
              member = od.Variable('', index, subindex=1)
              member.data_type = data_type
              array.add_member(member)
              objdict.add_object(array)

          return objdict