123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284 |
- # Copyright (c) 2020 Vestas Wind Systems A/S
- #
- # SPDX-License-Identifier: Apache-2.0
- '''Runner for performing program download over CANopen (DSP 302-3).'''
- import argparse
- import os
- from runners.core import ZephyrBinaryRunner, RunnerCaps
- try:
- import canopen
- from progress.bar import Bar
- MISSING_REQUIREMENTS = False
- except ImportError:
- MISSING_REQUIREMENTS = True
- # Default Python-CAN context to use, see python-can documentation for details
- DEFAULT_CAN_CONTEXT = 'default'
- # Object dictionary indexes
- H1F50_PROGRAM_DATA = 0x1F50
- H1F51_PROGRAM_CTRL = 0x1F51
- H1F56_PROGRAM_SWID = 0x1F56
- H1F57_FLASH_STATUS = 0x1F57
- # Program control commands
- PROGRAM_CTRL_STOP = 0x00
- PROGRAM_CTRL_START = 0x01
- PROGRAM_CTRL_RESET = 0x02
- PROGRAM_CTRL_CLEAR = 0x03
- PROGRAM_CTRL_ZEPHYR_CONFIRM = 0x80
- class ToggleAction(argparse.Action):
- '''Toggle argument parser'''
- def __call__(self, parser, namespace, values, option_string=None):
- setattr(namespace, self.dest, not option_string.startswith('--no-'))
- class CANopenBinaryRunner(ZephyrBinaryRunner):
- '''Runner front-end for CANopen.'''
- def __init__(self, cfg, node_id, can_context=DEFAULT_CAN_CONTEXT,
- program_number=1, confirm=True,
- confirm_only=True, timeout=10):
- if MISSING_REQUIREMENTS:
- raise RuntimeError('one or more Python dependencies were missing; '
- "see the getting started guide for details on "
- "how to fix")
- super().__init__(cfg)
- self.bin_file = cfg.bin_file
- self.confirm = confirm
- self.confirm_only = confirm_only
- self.timeout = timeout
- self.downloader = CANopenProgramDownloader(logger=self.logger,
- node_id=node_id,
- can_context=can_context,
- program_number=program_number)
- @classmethod
- def name(cls):
- return 'canopen'
- @classmethod
- def capabilities(cls):
- return RunnerCaps(commands={'flash'}, flash_addr=False)
- @classmethod
- def do_add_parser(cls, parser):
- # Required:
- parser.add_argument('--node-id', required=True, help='Node ID')
- # Optional:
- parser.add_argument('--can-context', default=DEFAULT_CAN_CONTEXT,
- help='Custom Python-CAN context to use')
- parser.add_argument('--program-number', default=1,
- help='program number, default is 1')
- parser.add_argument('--confirm', '--no-confirm',
- dest='confirm', nargs=0,
- action=ToggleAction,
- help='confirm after starting? (default: yes)')
- parser.add_argument('--confirm-only', default=False, action='store_true',
- help='confirm only, no program download (default: no)')
- parser.add_argument('--timeout', default=10,
- help='boot-up timeout, default is 10 seconds')
- parser.set_defaults(confirm=True)
- @classmethod
- def do_create(cls, cfg, args):
- return CANopenBinaryRunner(cfg, int(args.node_id),
- can_context=args.can_context,
- program_number=int(args.program_number),
- confirm=args.confirm,
- confirm_only=args.confirm_only,
- timeout=int(args.timeout))
- def do_run(self, command, **kwargs):
- if command == 'flash':
- self.flash(**kwargs)
- def flash(self, **kwargs):
- '''Download program to flash over CANopen'''
- self.ensure_output('bin')
- self.logger.info('Using Node ID %d, program number %d',
- self.downloader.node_id,
- self.downloader.program_number)
- self.downloader.connect()
- status = self.downloader.flash_status()
- if status == 0:
- self.downloader.swid()
- else:
- self.logger.warning('Flash status 0x{:02x}, '
- 'skipping software identification'.format(status))
- self.downloader.enter_pre_operational()
- if self.confirm_only:
- self.downloader.zephyr_confirm_program()
- self.downloader.disconnect()
- return
- if self.bin_file is None:
- raise ValueError('Cannot download program; bin_file is missing')
- self.downloader.stop_program()
- self.downloader.clear_program()
- self.downloader.download(self.bin_file)
- status = self.downloader.flash_status()
- if status != 0:
- raise ValueError('Program download failed: '
- 'flash status 0x{:02x}'.format(status))
- self.downloader.start_program()
- self.downloader.wait_for_bootup(self.timeout)
- self.downloader.swid()
- if self.confirm:
- self.downloader.enter_pre_operational()
- self.downloader.zephyr_confirm_program()
- self.downloader.disconnect()
- class CANopenProgramDownloader(object):
- '''CANopen program downloader'''
- def __init__(self, logger, node_id, can_context=DEFAULT_CAN_CONTEXT,
- program_number=1):
- super(CANopenProgramDownloader, self).__init__()
- self.logger = logger
- self.node_id = node_id
- self.can_context = can_context
- self.program_number = program_number
- self.network = canopen.Network()
- self.node = self.network.add_node(self.node_id,
- self.create_object_dictionary())
- self.data_sdo = self.node.sdo[H1F50_PROGRAM_DATA][self.program_number]
- self.ctrl_sdo = self.node.sdo[H1F51_PROGRAM_CTRL][self.program_number]
- self.swid_sdo = self.node.sdo[H1F56_PROGRAM_SWID][self.program_number]
- self.flash_sdo = self.node.sdo[H1F57_FLASH_STATUS][self.program_number]
- def connect(self):
- '''Connect to CAN network'''
- try:
- self.network.connect(context=self.can_context)
- except:
- raise ValueError('Unable to connect to CAN network')
- def disconnect(self):
- '''Disconnect from CAN network'''
- self.network.disconnect()
- def enter_pre_operational(self):
- '''Enter pre-operational NMT state'''
- self.logger.info("Entering pre-operational mode")
- try:
- self.node.nmt.state = 'PRE-OPERATIONAL'
- except:
- raise ValueError('Failed to enter pre-operational mode')
- def _ctrl_program(self, cmd):
- '''Write program control command to CANopen object dictionary (0x1f51)'''
- try:
- self.ctrl_sdo.raw = cmd
- except:
- raise ValueError('Unable to write control command 0x{:02x}'.format(cmd))
- def stop_program(self):
- '''Write stop control command to CANopen object dictionary (0x1f51)'''
- self.logger.info('Stopping program')
- self._ctrl_program(PROGRAM_CTRL_STOP)
- def start_program(self):
- '''Write start control command to CANopen object dictionary (0x1f51)'''
- self.logger.info('Starting program')
- self._ctrl_program(PROGRAM_CTRL_START)
- def clear_program(self):
- '''Write clear control command to CANopen object dictionary (0x1f51)'''
- self.logger.info('Clearing program')
- self._ctrl_program(PROGRAM_CTRL_CLEAR)
- def zephyr_confirm_program(self):
- '''Write confirm control command to CANopen object dictionary (0x1f51)'''
- self.logger.info('Confirming program')
- self._ctrl_program(PROGRAM_CTRL_ZEPHYR_CONFIRM)
- def swid(self):
- '''Read software identification from CANopen object dictionary (0x1f56)'''
- try:
- swid = self.swid_sdo.raw
- except:
- raise ValueError('Failed to read software identification')
- self.logger.info('Program software identification: 0x{:08x}'.format(swid))
- return swid
- def flash_status(self):
- '''Read flash status identification'''
- try:
- status = self.flash_sdo.raw
- except:
- raise ValueError('Failed to read flash status identification')
- return status
- def download(self, bin_file):
- '''Download program to CANopen object dictionary (0x1f50)'''
- self.logger.info('Downloading program: %s', bin_file)
- try:
- size = os.path.getsize(bin_file)
- infile = open(bin_file, 'rb')
- outfile = self.data_sdo.open('wb', size=size)
- progress = Bar('%(percent)d%%', max=size, suffix='%(index)d/%(max)dB')
- while True:
- chunk = infile.read(1024)
- if not chunk:
- break
- outfile.write(chunk)
- progress.next(n=len(chunk))
- progress.finish()
- infile.close()
- outfile.close()
- except:
- raise ValueError('Failed to download program')
- def wait_for_bootup(self, timeout=10):
- '''Wait for boot-up message reception'''
- self.logger.info('Waiting for boot-up message...')
- try:
- self.node.nmt.wait_for_bootup(timeout=timeout)
- except:
- raise ValueError('Timeout waiting for boot-up message')
- @staticmethod
- def create_object_dictionary():
- '''Create a synthetic CANopen object dictionary for program download'''
- objdict = canopen.objectdictionary.ObjectDictionary()
- array = canopen.objectdictionary.Array('Program data', 0x1f50)
- member = canopen.objectdictionary.Variable('', 0x1f50, subindex=1)
- member.data_type = canopen.objectdictionary.DOMAIN
- array.add_member(member)
- objdict.add_object(array)
- array = canopen.objectdictionary.Array('Program control', 0x1f51)
- member = canopen.objectdictionary.Variable('', 0x1f51, subindex=1)
- member.data_type = canopen.objectdictionary.UNSIGNED8
- array.add_member(member)
- objdict.add_object(array)
- array = canopen.objectdictionary.Array('Program sofware ID', 0x1f56)
- member = canopen.objectdictionary.Variable('', 0x1f56, subindex=1)
- member.data_type = canopen.objectdictionary.UNSIGNED32
- array.add_member(member)
- objdict.add_object(array)
- array = canopen.objectdictionary.Array('Flash error ID', 0x1f57)
- member = canopen.objectdictionary.Variable('', 0x1f57, subindex=1)
- member.data_type = canopen.objectdictionary.UNSIGNED32
- array.add_member(member)
- objdict.add_object(array)
- return objdict
|