123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399340034013402340334043405340634073408340934103411341234133414341534163417341834193420342134223423342434253426342734283429343034313432343334343435343634373438343934403441344234433444344534463447344834493450345134523453345434553456345734583459346034613462346334643465346634673468346934703471347234733474347534763477347834793480348134823483348434853486348734883489349034913492349334943495349634973498349935003501350235033504350535063507350835093510351135123513351435153516351735183519352035213522352335243525352635273528352935303531353235333534353535363537353835393540354135423543354435453546354735483549355035513552355335543555355635573558355935603561356235633564356535663567356835693570357135723573357435753576357735783579358035813582358335843585358635873588358935903591359235933594359535963597359835993600360136023603360436053606360736083609361036113612361336143615361636173618361936203621362236233624362536263627362836293630363136323633363436353636363736383639364036413642364336443645364636473648364936503651365236533654365536563657365836593660366136623663366436653666366736683669367036713672367336743675367636773678367936803681368236833684368536863687368836893690369136923693369436953696369736983699370037013702370337043705370637073708370937103711371237133714371537163717371837193720372137223723372437253726372737283729373037313732373337343735373637373738373937403741374237433744374537463747374837493750375137523753375437553756375737583759376037613762376337643765376637673768376937703771377237733774377537763777377837793780378137823783378437853786378737883789379037913792379337943795379637973798379938003801380238033804380538063807380838093810381138123813381438153816381738183819382038213822382338243825382638273828382938303831383238333834383538363837383838393840384138423843384438453846384738483849385038513852385338543855385638573858385938603861386238633864386538663867386838693870387138723873387438753876387738783879388038813882388338843885388638873888388938903891389238933894389538963897389838993900390139023903390439053906390739083909391039113912391339143915391639173918391939203921392239233924392539263927392839293930393139323933393439353936393739383939394039413942394339443945394639473948394939503951395239533954395539563957395839593960396139623963396439653966396739683969397039713972397339743975397639773978397939803981398239833984398539863987398839893990399139923993399439953996399739983999400040014002400340044005400640074008400940104011401240134014401540164017401840194020402140224023402440254026402740284029403040314032403340344035403640374038403940404041404240434044404540464047404840494050405140524053405440554056405740584059406040614062406340644065406640674068406940704071407240734074407540764077407840794080408140824083408440854086408740884089409040914092409340944095409640974098409941004101410241034104410541064107410841094110411141124113411441154116411741184119412041214122412341244125412641274128412941304131413241334134413541364137413841394140414141424143414441454146414741484149415041514152415341544155415641574158415941604161416241634164416541664167416841694170417141724173417441754176417741784179418041814182418341844185418641874188418941904191419241934194419541964197419841994200420142024203420442054206 |
- #!/usr/bin/env python3
- # vim: set syntax=python ts=4 :
- #
- # Copyright (c) 2018 Intel Corporation
- # SPDX-License-Identifier: Apache-2.0
- import os
- import contextlib
- import string
- import mmap
- import sys
- import re
- import subprocess
- import select
- import shutil
- import shlex
- import signal
- import threading
- import concurrent.futures
- from collections import OrderedDict
- import queue
- import time
- import csv
- import glob
- import concurrent
- import xml.etree.ElementTree as ET
- import logging
- import pty
- from pathlib import Path
- from distutils.spawn import find_executable
- from colorama import Fore
- import pickle
- import platform
- import yaml
- import json
- from multiprocessing import Lock, Process, Value
- try:
- # Use the C LibYAML parser if available, rather than the Python parser.
- # It's much faster.
- from yaml import CSafeLoader as SafeLoader
- from yaml import CDumper as Dumper
- except ImportError:
- from yaml import SafeLoader, Dumper
- try:
- import serial
- except ImportError:
- print("Install pyserial python module with pip to use --device-testing option.")
- try:
- from tabulate import tabulate
- except ImportError:
- print("Install tabulate python module with pip to use --device-testing option.")
- try:
- import psutil
- except ImportError:
- print("Install psutil python module with pip to run in Qemu.")
- ZEPHYR_BASE = os.getenv("ZEPHYR_BASE")
- if not ZEPHYR_BASE:
- sys.exit("$ZEPHYR_BASE environment variable undefined")
- # This is needed to load edt.pickle files.
- sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts", "dts",
- "python-devicetree", "src"))
- from devicetree import edtlib # pylint: disable=unused-import
- # Use this for internal comparisons; that's what canonicalization is
- # for. Don't use it when invoking other components of the build system
- # to avoid confusing and hard to trace inconsistencies in error messages
- # and logs, generated Makefiles, etc. compared to when users invoke these
- # components directly.
- # Note "normalization" is different from canonicalization, see os.path.
- canonical_zephyr_base = os.path.realpath(ZEPHYR_BASE)
- sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/"))
- import scl
- import expr_parser
- logger = logging.getLogger('twister')
- logger.setLevel(logging.DEBUG)
- class ExecutionCounter(object):
- def __init__(self, total=0):
- self._done = Value('i', 0)
- self._passed = Value('i', 0)
- self._skipped_configs = Value('i', 0)
- self._skipped_runtime = Value('i', 0)
- self._skipped_cases = Value('i', 0)
- self._error = Value('i', 0)
- self._failed = Value('i', 0)
- self._total = Value('i', total)
- self._cases = Value('i', 0)
- self.lock = Lock()
- @property
- def cases(self):
- with self._cases.get_lock():
- return self._cases.value
- @cases.setter
- def cases(self, value):
- with self._cases.get_lock():
- self._cases.value = value
- @property
- def skipped_cases(self):
- with self._skipped_cases.get_lock():
- return self._skipped_cases.value
- @skipped_cases.setter
- def skipped_cases(self, value):
- with self._skipped_cases.get_lock():
- self._skipped_cases.value = value
- @property
- def error(self):
- with self._error.get_lock():
- return self._error.value
- @error.setter
- def error(self, value):
- with self._error.get_lock():
- self._error.value = value
- @property
- def done(self):
- with self._done.get_lock():
- return self._done.value
- @done.setter
- def done(self, value):
- with self._done.get_lock():
- self._done.value = value
- @property
- def passed(self):
- with self._passed.get_lock():
- return self._passed.value
- @passed.setter
- def passed(self, value):
- with self._passed.get_lock():
- self._passed.value = value
- @property
- def skipped_configs(self):
- with self._skipped_configs.get_lock():
- return self._skipped_configs.value
- @skipped_configs.setter
- def skipped_configs(self, value):
- with self._skipped_configs.get_lock():
- self._skipped_configs.value = value
- @property
- def skipped_runtime(self):
- with self._skipped_runtime.get_lock():
- return self._skipped_runtime.value
- @skipped_runtime.setter
- def skipped_runtime(self, value):
- with self._skipped_runtime.get_lock():
- self._skipped_runtime.value = value
- @property
- def failed(self):
- with self._failed.get_lock():
- return self._failed.value
- @failed.setter
- def failed(self, value):
- with self._failed.get_lock():
- self._failed.value = value
- @property
- def total(self):
- with self._total.get_lock():
- return self._total.value
- class CMakeCacheEntry:
- '''Represents a CMake cache entry.
- This class understands the type system in a CMakeCache.txt, and
- converts the following cache types to Python types:
- Cache Type Python type
- ---------- -------------------------------------------
- FILEPATH str
- PATH str
- STRING str OR list of str (if ';' is in the value)
- BOOL bool
- INTERNAL str OR list of str (if ';' is in the value)
- ---------- -------------------------------------------
- '''
- # Regular expression for a cache entry.
- #
- # CMake variable names can include escape characters, allowing a
- # wider set of names than is easy to match with a regular
- # expression. To be permissive here, use a non-greedy match up to
- # the first colon (':'). This breaks if the variable name has a
- # colon inside, but it's good enough.
- CACHE_ENTRY = re.compile(
- r'''(?P<name>.*?) # name
- :(?P<type>FILEPATH|PATH|STRING|BOOL|INTERNAL) # type
- =(?P<value>.*) # value
- ''', re.X)
- @classmethod
- def _to_bool(cls, val):
- # Convert a CMake BOOL string into a Python bool.
- #
- # "True if the constant is 1, ON, YES, TRUE, Y, or a
- # non-zero number. False if the constant is 0, OFF, NO,
- # FALSE, N, IGNORE, NOTFOUND, the empty string, or ends in
- # the suffix -NOTFOUND. Named boolean constants are
- # case-insensitive. If the argument is not one of these
- # constants, it is treated as a variable."
- #
- # https://cmake.org/cmake/help/v3.0/command/if.html
- val = val.upper()
- if val in ('ON', 'YES', 'TRUE', 'Y'):
- return 1
- elif val in ('OFF', 'NO', 'FALSE', 'N', 'IGNORE', 'NOTFOUND', ''):
- return 0
- elif val.endswith('-NOTFOUND'):
- return 0
- else:
- try:
- v = int(val)
- return v != 0
- except ValueError as exc:
- raise ValueError('invalid bool {}'.format(val)) from exc
- @classmethod
- def from_line(cls, line, line_no):
- # Comments can only occur at the beginning of a line.
- # (The value of an entry could contain a comment character).
- if line.startswith('//') or line.startswith('#'):
- return None
- # Whitespace-only lines do not contain cache entries.
- if not line.strip():
- return None
- m = cls.CACHE_ENTRY.match(line)
- if not m:
- return None
- name, type_, value = (m.group(g) for g in ('name', 'type', 'value'))
- if type_ == 'BOOL':
- try:
- value = cls._to_bool(value)
- except ValueError as exc:
- args = exc.args + ('on line {}: {}'.format(line_no, line),)
- raise ValueError(args) from exc
- elif type_ in ['STRING', 'INTERNAL']:
- # If the value is a CMake list (i.e. is a string which
- # contains a ';'), convert to a Python list.
- if ';' in value:
- value = value.split(';')
- return CMakeCacheEntry(name, value)
- def __init__(self, name, value):
- self.name = name
- self.value = value
- def __str__(self):
- fmt = 'CMakeCacheEntry(name={}, value={})'
- return fmt.format(self.name, self.value)
- class CMakeCache:
- '''Parses and represents a CMake cache file.'''
- @staticmethod
- def from_file(cache_file):
- return CMakeCache(cache_file)
- def __init__(self, cache_file):
- self.cache_file = cache_file
- self.load(cache_file)
- def load(self, cache_file):
- entries = []
- with open(cache_file, 'r') as cache:
- for line_no, line in enumerate(cache):
- entry = CMakeCacheEntry.from_line(line, line_no)
- if entry:
- entries.append(entry)
- self._entries = OrderedDict((e.name, e) for e in entries)
- def get(self, name, default=None):
- entry = self._entries.get(name)
- if entry is not None:
- return entry.value
- else:
- return default
- def get_list(self, name, default=None):
- if default is None:
- default = []
- entry = self._entries.get(name)
- if entry is not None:
- value = entry.value
- if isinstance(value, list):
- return value
- elif isinstance(value, str):
- return [value] if value else []
- else:
- msg = 'invalid value {} type {}'
- raise RuntimeError(msg.format(value, type(value)))
- else:
- return default
- def __contains__(self, name):
- return name in self._entries
- def __getitem__(self, name):
- return self._entries[name].value
- def __setitem__(self, name, entry):
- if not isinstance(entry, CMakeCacheEntry):
- msg = 'improper type {} for value {}, expecting CMakeCacheEntry'
- raise TypeError(msg.format(type(entry), entry))
- self._entries[name] = entry
- def __delitem__(self, name):
- del self._entries[name]
- def __iter__(self):
- return iter(self._entries.values())
- class TwisterException(Exception):
- pass
- class TwisterRuntimeError(TwisterException):
- pass
- class ConfigurationError(TwisterException):
- def __init__(self, cfile, message):
- TwisterException.__init__(self, cfile + ": " + message)
- class BuildError(TwisterException):
- pass
- class ExecutionError(TwisterException):
- pass
- class HarnessImporter:
- def __init__(self, name):
- sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/pylib/twister"))
- module = __import__("harness")
- if name:
- my_class = getattr(module, name)
- else:
- my_class = getattr(module, "Test")
- self.instance = my_class()
- class Handler:
- def __init__(self, instance, type_str="build"):
- """Constructor
- """
- self.state = "waiting"
- self.run = False
- self.duration = 0
- self.type_str = type_str
- self.binary = None
- self.pid_fn = None
- self.call_make_run = False
- self.name = instance.name
- self.instance = instance
- self.timeout = instance.testcase.timeout
- self.sourcedir = instance.testcase.source_dir
- self.build_dir = instance.build_dir
- self.log = os.path.join(self.build_dir, "handler.log")
- self.returncode = 0
- self.set_state("running", self.duration)
- self.generator = None
- self.generator_cmd = None
- self.args = []
- self.terminated = False
- def set_state(self, state, duration):
- self.state = state
- self.duration = duration
- def get_state(self):
- ret = (self.state, self.duration)
- return ret
- def record(self, harness):
- if harness.recording:
- filename = os.path.join(self.build_dir, "recording.csv")
- with open(filename, "at") as csvfile:
- cw = csv.writer(csvfile, harness.fieldnames, lineterminator=os.linesep)
- cw.writerow(harness.fieldnames)
- for instance in harness.recording:
- cw.writerow(instance)
- def terminate(self, proc):
- # encapsulate terminate functionality so we do it consistently where ever
- # we might want to terminate the proc. We need try_kill_process_by_pid
- # because of both how newer ninja (1.6.0 or greater) and .NET / renode
- # work. Newer ninja's don't seem to pass SIGTERM down to the children
- # so we need to use try_kill_process_by_pid.
- for child in psutil.Process(proc.pid).children(recursive=True):
- try:
- os.kill(child.pid, signal.SIGTERM)
- except ProcessLookupError:
- pass
- proc.terminate()
- # sleep for a while before attempting to kill
- time.sleep(0.5)
- proc.kill()
- self.terminated = True
- def add_missing_testscases(self, harness):
- """
- If testsuite was broken by some error (e.g. timeout) it is necessary to
- add information about next testcases, which were not be
- performed due to this error.
- """
- for c in self.instance.testcase.cases:
- if c not in harness.tests:
- harness.tests[c] = "BLOCK"
- class BinaryHandler(Handler):
- def __init__(self, instance, type_str):
- """Constructor
- @param instance Test Instance
- """
- super().__init__(instance, type_str)
- self.call_west_flash = False
- # Tool options
- self.valgrind = False
- self.lsan = False
- self.asan = False
- self.ubsan = False
- self.coverage = False
- def try_kill_process_by_pid(self):
- if self.pid_fn:
- pid = int(open(self.pid_fn).read())
- os.unlink(self.pid_fn)
- self.pid_fn = None # clear so we don't try to kill the binary twice
- try:
- os.kill(pid, signal.SIGTERM)
- except ProcessLookupError:
- pass
- def _output_reader(self, proc):
- self.line = proc.stdout.readline()
- def _output_handler(self, proc, harness):
- if harness.is_pytest:
- harness.handle(None)
- return
- log_out_fp = open(self.log, "wt")
- timeout_extended = False
- timeout_time = time.time() + self.timeout
- while True:
- this_timeout = timeout_time - time.time()
- if this_timeout < 0:
- break
- reader_t = threading.Thread(target=self._output_reader, args=(proc,), daemon=True)
- reader_t.start()
- reader_t.join(this_timeout)
- if not reader_t.is_alive():
- line = self.line
- logger.debug("OUTPUT: {0}".format(line.decode('utf-8').rstrip()))
- log_out_fp.write(line.decode('utf-8'))
- log_out_fp.flush()
- harness.handle(line.decode('utf-8').rstrip())
- if harness.state:
- if not timeout_extended or harness.capture_coverage:
- timeout_extended = True
- if harness.capture_coverage:
- timeout_time = time.time() + 30
- else:
- timeout_time = time.time() + 2
- else:
- reader_t.join(0)
- break
- try:
- # POSIX arch based ztests end on their own,
- # so let's give it up to 100ms to do so
- proc.wait(0.1)
- except subprocess.TimeoutExpired:
- self.terminate(proc)
- log_out_fp.close()
- def handle(self):
- harness_name = self.instance.testcase.harness.capitalize()
- harness_import = HarnessImporter(harness_name)
- harness = harness_import.instance
- harness.configure(self.instance)
- if self.call_make_run:
- command = [self.generator_cmd, "run"]
- elif self.call_west_flash:
- command = ["west", "flash", "--skip-rebuild", "-d", self.build_dir]
- else:
- command = [self.binary]
- run_valgrind = False
- if self.valgrind and shutil.which("valgrind"):
- command = ["valgrind", "--error-exitcode=2",
- "--leak-check=full",
- "--suppressions=" + ZEPHYR_BASE + "/scripts/valgrind.supp",
- "--log-file=" + self.build_dir + "/valgrind.log"
- ] + command
- run_valgrind = True
- logger.debug("Spawning process: " +
- " ".join(shlex.quote(word) for word in command) + os.linesep +
- "in directory: " + self.build_dir)
- start_time = time.time()
- env = os.environ.copy()
- if self.asan:
- env["ASAN_OPTIONS"] = "log_path=stdout:" + \
- env.get("ASAN_OPTIONS", "")
- if not self.lsan:
- env["ASAN_OPTIONS"] += "detect_leaks=0"
- if self.ubsan:
- env["UBSAN_OPTIONS"] = "log_path=stdout:halt_on_error=1:" + \
- env.get("UBSAN_OPTIONS", "")
- with subprocess.Popen(command, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE, cwd=self.build_dir, env=env) as proc:
- logger.debug("Spawning BinaryHandler Thread for %s" % self.name)
- t = threading.Thread(target=self._output_handler, args=(proc, harness,), daemon=True)
- t.start()
- t.join()
- if t.is_alive():
- self.terminate(proc)
- t.join()
- proc.wait()
- self.returncode = proc.returncode
- self.try_kill_process_by_pid()
- handler_time = time.time() - start_time
- if self.coverage:
- subprocess.call(["GCOV_PREFIX=" + self.build_dir,
- "gcov", self.sourcedir, "-b", "-s", self.build_dir], shell=True)
- # FIXME: This is needed when killing the simulator, the console is
- # garbled and needs to be reset. Did not find a better way to do that.
- if sys.stdout.isatty():
- subprocess.call(["stty", "sane"])
- if harness.is_pytest:
- harness.pytest_run(self.log)
- self.instance.results = harness.tests
- if not self.terminated and self.returncode != 0:
- # When a process is killed, the default handler returns 128 + SIGTERM
- # so in that case the return code itself is not meaningful
- self.set_state("failed", handler_time)
- self.instance.reason = "Failed"
- elif run_valgrind and self.returncode == 2:
- self.set_state("failed", handler_time)
- self.instance.reason = "Valgrind error"
- elif harness.state:
- self.set_state(harness.state, handler_time)
- if harness.state == "failed":
- self.instance.reason = "Failed"
- else:
- self.set_state("timeout", handler_time)
- self.instance.reason = "Timeout"
- self.add_missing_testscases(harness)
- self.record(harness)
- class DeviceHandler(Handler):
- def __init__(self, instance, type_str):
- """Constructor
- @param instance Test Instance
- """
- super().__init__(instance, type_str)
- self.suite = None
- def monitor_serial(self, ser, halt_fileno, harness):
- if harness.is_pytest:
- harness.handle(None)
- return
- log_out_fp = open(self.log, "wt")
- ser_fileno = ser.fileno()
- readlist = [halt_fileno, ser_fileno]
- if self.coverage:
- # Set capture_coverage to True to indicate that right after
- # test results we should get coverage data, otherwise we exit
- # from the test.
- harness.capture_coverage = True
- ser.flush()
- while ser.isOpen():
- readable, _, _ = select.select(readlist, [], [], self.timeout)
- if halt_fileno in readable:
- logger.debug('halted')
- ser.close()
- break
- if ser_fileno not in readable:
- continue # Timeout.
- serial_line = None
- try:
- serial_line = ser.readline()
- except TypeError:
- pass
- except serial.SerialException:
- ser.close()
- break
- # Just because ser_fileno has data doesn't mean an entire line
- # is available yet.
- if serial_line:
- sl = serial_line.decode('utf-8', 'ignore').lstrip()
- logger.debug("DEVICE: {0}".format(sl.rstrip()))
- log_out_fp.write(sl)
- log_out_fp.flush()
- harness.handle(sl.rstrip())
- if harness.state:
- if not harness.capture_coverage:
- ser.close()
- break
- log_out_fp.close()
- def device_is_available(self, instance):
- device = instance.platform.name
- fixture = instance.testcase.harness_config.get("fixture")
- for d in self.suite.duts:
- if fixture and fixture not in d.fixtures:
- continue
- if d.platform != device or not (d.serial or d.serial_pty):
- continue
- d.lock.acquire()
- avail = False
- if d.available:
- d.available = 0
- d.counter += 1
- avail = True
- d.lock.release()
- if avail:
- return d
- return None
- def make_device_available(self, serial):
- for d in self.suite.duts:
- if d.serial == serial or d.serial_pty:
- d.available = 1
- @staticmethod
- def run_custom_script(script, timeout):
- with subprocess.Popen(script, stderr=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
- try:
- stdout, _ = proc.communicate(timeout=timeout)
- logger.debug(stdout.decode())
- except subprocess.TimeoutExpired:
- proc.kill()
- proc.communicate()
- logger.error("{} timed out".format(script))
- def handle(self):
- out_state = "failed"
- runner = None
- hardware = self.device_is_available(self.instance)
- while not hardware:
- logger.debug("Waiting for device {} to become available".format(self.instance.platform.name))
- time.sleep(1)
- hardware = self.device_is_available(self.instance)
- runner = hardware.runner or self.suite.west_runner
- serial_pty = hardware.serial_pty
- ser_pty_process = None
- if serial_pty:
- master, slave = pty.openpty()
- try:
- ser_pty_process = subprocess.Popen(re.split(',| ', serial_pty), stdout=master, stdin=master, stderr=master)
- except subprocess.CalledProcessError as error:
- logger.error("Failed to run subprocess {}, error {}".format(serial_pty, error.output))
- return
- serial_device = os.ttyname(slave)
- else:
- serial_device = hardware.serial
- logger.debug("Using serial device {}".format(serial_device))
- if (self.suite.west_flash is not None) or runner:
- command = ["west", "flash", "--skip-rebuild", "-d", self.build_dir]
- command_extra_args = []
- # There are three ways this option is used.
- # 1) bare: --west-flash
- # This results in options.west_flash == []
- # 2) with a value: --west-flash="--board-id=42"
- # This results in options.west_flash == "--board-id=42"
- # 3) Multiple values: --west-flash="--board-id=42,--erase"
- # This results in options.west_flash == "--board-id=42 --erase"
- if self.suite.west_flash and self.suite.west_flash != []:
- command_extra_args.extend(self.suite.west_flash.split(','))
- if runner:
- command.append("--runner")
- command.append(runner)
- board_id = hardware.probe_id or hardware.id
- product = hardware.product
- if board_id is not None:
- if runner == "pyocd":
- command_extra_args.append("--board-id")
- command_extra_args.append(board_id)
- elif runner == "nrfjprog":
- command_extra_args.append("--snr")
- command_extra_args.append(board_id)
- elif runner == "openocd" and product == "STM32 STLink":
- command_extra_args.append("--cmd-pre-init")
- command_extra_args.append("hla_serial %s" % (board_id))
- elif runner == "openocd" and product == "STLINK-V3":
- command_extra_args.append("--cmd-pre-init")
- command_extra_args.append("hla_serial %s" % (board_id))
- elif runner == "openocd" and product == "EDBG CMSIS-DAP":
- command_extra_args.append("--cmd-pre-init")
- command_extra_args.append("cmsis_dap_serial %s" % (board_id))
- elif runner == "jlink":
- command.append("--tool-opt=-SelectEmuBySN %s" % (board_id))
- if command_extra_args != []:
- command.append('--')
- command.extend(command_extra_args)
- else:
- command = [self.generator_cmd, "-C", self.build_dir, "flash"]
- pre_script = hardware.pre_script
- post_flash_script = hardware.post_flash_script
- post_script = hardware.post_script
- if pre_script:
- self.run_custom_script(pre_script, 30)
- try:
- ser = serial.Serial(
- serial_device,
- baudrate=115200,
- parity=serial.PARITY_NONE,
- stopbits=serial.STOPBITS_ONE,
- bytesize=serial.EIGHTBITS,
- timeout=self.timeout
- )
- except serial.SerialException as e:
- self.set_state("failed", 0)
- self.instance.reason = "Failed"
- logger.error("Serial device error: %s" % (str(e)))
- if serial_pty and ser_pty_process:
- ser_pty_process.terminate()
- outs, errs = ser_pty_process.communicate()
- logger.debug("Process {} terminated outs: {} errs {}".format(serial_pty, outs, errs))
- self.make_device_available(serial_device)
- return
- ser.flush()
- harness_name = self.instance.testcase.harness.capitalize()
- harness_import = HarnessImporter(harness_name)
- harness = harness_import.instance
- harness.configure(self.instance)
- read_pipe, write_pipe = os.pipe()
- start_time = time.time()
- t = threading.Thread(target=self.monitor_serial, daemon=True,
- args=(ser, read_pipe, harness))
- t.start()
- d_log = "{}/device.log".format(self.instance.build_dir)
- logger.debug('Flash command: %s', command)
- try:
- stdout = stderr = None
- with subprocess.Popen(command, stderr=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
- try:
- (stdout, stderr) = proc.communicate(timeout=30)
- logger.debug(stdout.decode())
- if proc.returncode != 0:
- self.instance.reason = "Device issue (Flash?)"
- with open(d_log, "w") as dlog_fp:
- dlog_fp.write(stderr.decode())
- os.write(write_pipe, b'x') # halt the thread
- out_state = "flash_error"
- except subprocess.TimeoutExpired:
- proc.kill()
- (stdout, stderr) = proc.communicate()
- self.instance.reason = "Device issue (Timeout)"
- with open(d_log, "w") as dlog_fp:
- dlog_fp.write(stderr.decode())
- except subprocess.CalledProcessError:
- os.write(write_pipe, b'x') # halt the thread
- if post_flash_script:
- self.run_custom_script(post_flash_script, 30)
- t.join(self.timeout)
- if t.is_alive():
- logger.debug("Timed out while monitoring serial output on {}".format(self.instance.platform.name))
- out_state = "timeout"
- if ser.isOpen():
- ser.close()
- if serial_pty:
- ser_pty_process.terminate()
- outs, errs = ser_pty_process.communicate()
- logger.debug("Process {} terminated outs: {} errs {}".format(serial_pty, outs, errs))
- os.close(write_pipe)
- os.close(read_pipe)
- handler_time = time.time() - start_time
- if out_state in ["timeout", "flash_error"]:
- self.add_missing_testscases(harness)
- if out_state == "timeout":
- self.instance.reason = "Timeout"
- elif out_state == "flash_error":
- self.instance.reason = "Flash error"
- if harness.is_pytest:
- harness.pytest_run(self.log)
- self.instance.results = harness.tests
- # sometimes a test instance hasn't been executed successfully with an
- # empty dictionary results, in order to include it into final report,
- # so fill the results as BLOCK
- if self.instance.results == {}:
- for k in self.instance.testcase.cases:
- self.instance.results[k] = 'BLOCK'
- if harness.state:
- self.set_state(harness.state, handler_time)
- if harness.state == "failed":
- self.instance.reason = "Failed"
- else:
- self.set_state(out_state, handler_time)
- if post_script:
- self.run_custom_script(post_script, 30)
- self.make_device_available(serial_device)
- self.record(harness)
- class QEMUHandler(Handler):
- """Spawns a thread to monitor QEMU output from pipes
- We pass QEMU_PIPE to 'make run' and monitor the pipes for output.
- We need to do this as once qemu starts, it runs forever until killed.
- Test cases emit special messages to the console as they run, we check
- for these to collect whether the test passed or failed.
- """
- def __init__(self, instance, type_str):
- """Constructor
- @param instance Test instance
- """
- super().__init__(instance, type_str)
- self.fifo_fn = os.path.join(instance.build_dir, "qemu-fifo")
- self.pid_fn = os.path.join(instance.build_dir, "qemu.pid")
- if "ignore_qemu_crash" in instance.testcase.tags:
- self.ignore_qemu_crash = True
- self.ignore_unexpected_eof = True
- else:
- self.ignore_qemu_crash = False
- self.ignore_unexpected_eof = False
- @staticmethod
- def _get_cpu_time(pid):
- """get process CPU time.
- The guest virtual time in QEMU icount mode isn't host time and
- it's maintained by counting guest instructions, so we use QEMU
- process exection time to mostly simulate the time of guest OS.
- """
- proc = psutil.Process(pid)
- cpu_time = proc.cpu_times()
- return cpu_time.user + cpu_time.system
- @staticmethod
- def _thread(handler, timeout, outdir, logfile, fifo_fn, pid_fn, results, harness,
- ignore_unexpected_eof=False):
- fifo_in = fifo_fn + ".in"
- fifo_out = fifo_fn + ".out"
- # These in/out nodes are named from QEMU's perspective, not ours
- if os.path.exists(fifo_in):
- os.unlink(fifo_in)
- os.mkfifo(fifo_in)
- if os.path.exists(fifo_out):
- os.unlink(fifo_out)
- os.mkfifo(fifo_out)
- # We don't do anything with out_fp but we need to open it for
- # writing so that QEMU doesn't block, due to the way pipes work
- out_fp = open(fifo_in, "wb")
- # Disable internal buffering, we don't
- # want read() or poll() to ever block if there is data in there
- in_fp = open(fifo_out, "rb", buffering=0)
- log_out_fp = open(logfile, "wt")
- start_time = time.time()
- timeout_time = start_time + timeout
- p = select.poll()
- p.register(in_fp, select.POLLIN)
- out_state = None
- line = ""
- timeout_extended = False
- pid = 0
- if os.path.exists(pid_fn):
- pid = int(open(pid_fn).read())
- while True:
- this_timeout = int((timeout_time - time.time()) * 1000)
- if this_timeout < 0 or not p.poll(this_timeout):
- try:
- if pid and this_timeout > 0:
- #there's possibility we polled nothing because
- #of not enough CPU time scheduled by host for
- #QEMU process during p.poll(this_timeout)
- cpu_time = QEMUHandler._get_cpu_time(pid)
- if cpu_time < timeout and not out_state:
- timeout_time = time.time() + (timeout - cpu_time)
- continue
- except ProcessLookupError:
- out_state = "failed"
- break
- if not out_state:
- out_state = "timeout"
- break
- if pid == 0 and os.path.exists(pid_fn):
- pid = int(open(pid_fn).read())
- if harness.is_pytest:
- harness.handle(None)
- out_state = harness.state
- break
- try:
- c = in_fp.read(1).decode("utf-8")
- except UnicodeDecodeError:
- # Test is writing something weird, fail
- out_state = "unexpected byte"
- break
- if c == "":
- # EOF, this shouldn't happen unless QEMU crashes
- if not ignore_unexpected_eof:
- out_state = "unexpected eof"
- break
- line = line + c
- if c != "\n":
- continue
- # line contains a full line of data output from QEMU
- log_out_fp.write(line)
- log_out_fp.flush()
- line = line.strip()
- logger.debug(f"QEMU ({pid}): {line}")
- harness.handle(line)
- if harness.state:
- # if we have registered a fail make sure the state is not
- # overridden by a false success message coming from the
- # testsuite
- if out_state not in ['failed', 'unexpected eof', 'unexpected byte']:
- out_state = harness.state
- # if we get some state, that means test is doing well, we reset
- # the timeout and wait for 2 more seconds to catch anything
- # printed late. We wait much longer if code
- # coverage is enabled since dumping this information can
- # take some time.
- if not timeout_extended or harness.capture_coverage:
- timeout_extended = True
- if harness.capture_coverage:
- timeout_time = time.time() + 30
- else:
- timeout_time = time.time() + 2
- line = ""
- if harness.is_pytest:
- harness.pytest_run(logfile)
- out_state = harness.state
- handler.record(harness)
- handler_time = time.time() - start_time
- logger.debug(f"QEMU ({pid}) complete ({out_state}) after {handler_time} seconds")
- if out_state == "timeout":
- handler.instance.reason = "Timeout"
- handler.set_state("failed", handler_time)
- elif out_state == "failed":
- handler.instance.reason = "Failed"
- handler.set_state("failed", handler_time)
- elif out_state in ['unexpected eof', 'unexpected byte']:
- handler.instance.reason = out_state
- handler.set_state("failed", handler_time)
- else:
- handler.set_state(out_state, handler_time)
- log_out_fp.close()
- out_fp.close()
- in_fp.close()
- if pid:
- try:
- if pid:
- os.kill(pid, signal.SIGTERM)
- except ProcessLookupError:
- # Oh well, as long as it's dead! User probably sent Ctrl-C
- pass
- os.unlink(fifo_in)
- os.unlink(fifo_out)
- def handle(self):
- self.results = {}
- self.run = True
- # We pass this to QEMU which looks for fifos with .in and .out
- # suffixes.
- self.fifo_fn = os.path.join(self.instance.build_dir, "qemu-fifo")
- self.pid_fn = os.path.join(self.instance.build_dir, "qemu.pid")
- if os.path.exists(self.pid_fn):
- os.unlink(self.pid_fn)
- self.log_fn = self.log
- harness_import = HarnessImporter(self.instance.testcase.harness.capitalize())
- harness = harness_import.instance
- harness.configure(self.instance)
- self.thread = threading.Thread(name=self.name, target=QEMUHandler._thread,
- args=(self, self.timeout, self.build_dir,
- self.log_fn, self.fifo_fn,
- self.pid_fn, self.results, harness,
- self.ignore_unexpected_eof))
- self.instance.results = harness.tests
- self.thread.daemon = True
- logger.debug("Spawning QEMUHandler Thread for %s" % self.name)
- self.thread.start()
- if sys.stdout.isatty():
- subprocess.call(["stty", "sane"])
- logger.debug("Running %s (%s)" % (self.name, self.type_str))
- command = [self.generator_cmd]
- command += ["-C", self.build_dir, "run"]
- is_timeout = False
- qemu_pid = None
- with subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=self.build_dir) as proc:
- logger.debug("Spawning QEMUHandler Thread for %s" % self.name)
- try:
- proc.wait(self.timeout)
- except subprocess.TimeoutExpired:
- # sometimes QEMU can't handle SIGTERM signal correctly
- # in that case kill -9 QEMU process directly and leave
- # twister to judge testing result by console output
- is_timeout = True
- self.terminate(proc)
- if harness.state == "passed":
- self.returncode = 0
- else:
- self.returncode = proc.returncode
- else:
- if os.path.exists(self.pid_fn):
- qemu_pid = int(open(self.pid_fn).read())
- logger.debug(f"No timeout, return code from QEMU ({qemu_pid}): {proc.returncode}")
- self.returncode = proc.returncode
- # Need to wait for harness to finish processing
- # output from QEMU. Otherwise it might miss some
- # error messages.
- self.thread.join(0)
- if self.thread.is_alive():
- logger.debug("Timed out while monitoring QEMU output")
- if os.path.exists(self.pid_fn):
- qemu_pid = int(open(self.pid_fn).read())
- os.unlink(self.pid_fn)
- logger.debug(f"return code from QEMU ({qemu_pid}): {self.returncode}")
- if (self.returncode != 0 and not self.ignore_qemu_crash) or not harness.state:
- self.set_state("failed", 0)
- if is_timeout:
- self.instance.reason = "Timeout"
- else:
- self.instance.reason = "Exited with {}".format(self.returncode)
- self.add_missing_testscases(harness)
- def get_fifo(self):
- return self.fifo_fn
- class SizeCalculator:
- alloc_sections = [
- "bss",
- "noinit",
- "app_bss",
- "app_noinit",
- "ccm_bss",
- "ccm_noinit"
- ]
- rw_sections = [
- "datas",
- "initlevel",
- "exceptions",
- "initshell",
- "_static_thread_data_area",
- "k_timer_area",
- "k_mem_slab_area",
- "k_mem_pool_area",
- "sw_isr_table",
- "k_sem_area",
- "k_mutex_area",
- "app_shmem_regions",
- "_k_fifo_area",
- "_k_lifo_area",
- "k_stack_area",
- "k_msgq_area",
- "k_mbox_area",
- "k_pipe_area",
- "net_if_area",
- "net_if_dev_area",
- "net_l2_area",
- "net_l2_data",
- "k_queue_area",
- "_net_buf_pool_area",
- "app_datas",
- "kobject_data",
- "mmu_tables",
- "app_pad",
- "priv_stacks",
- "ccm_data",
- "usb_descriptor",
- "usb_data", "usb_bos_desc",
- "uart_mux",
- 'log_backends_sections',
- 'log_dynamic_sections',
- 'log_const_sections',
- "app_smem",
- 'shell_root_cmds_sections',
- 'log_const_sections',
- "font_entry_sections",
- "priv_stacks_noinit",
- "_GCOV_BSS_SECTION_NAME",
- "gcov",
- "nocache",
- "devices",
- "k_heap_area",
- ]
- # These get copied into RAM only on non-XIP
- ro_sections = [
- "rom_start",
- "text",
- "ctors",
- "init_array",
- "reset",
- "z_object_assignment_area",
- "rodata",
- "net_l2",
- "vector",
- "sw_isr_table",
- "settings_handler_static_area",
- "bt_l2cap_fixed_chan_area",
- "bt_l2cap_br_fixed_chan_area",
- "bt_gatt_service_static_area",
- "vectors",
- "net_socket_register_area",
- "net_ppp_proto",
- "shell_area",
- "tracing_backend_area",
- "ppp_protocol_handler_area",
- ]
- def __init__(self, filename, extra_sections):
- """Constructor
- @param filename Path to the output binary
- The <filename> is parsed by objdump to determine section sizes
- """
- # Make sure this is an ELF binary
- with open(filename, "rb") as f:
- magic = f.read(4)
- try:
- if magic != b'\x7fELF':
- raise TwisterRuntimeError("%s is not an ELF binary" % filename)
- except Exception as e:
- print(str(e))
- sys.exit(2)
- # Search for CONFIG_XIP in the ELF's list of symbols using NM and AWK.
- # GREP can not be used as it returns an error if the symbol is not
- # found.
- is_xip_command = "nm " + filename + \
- " | awk '/CONFIG_XIP/ { print $3 }'"
- is_xip_output = subprocess.check_output(
- is_xip_command, shell=True, stderr=subprocess.STDOUT).decode(
- "utf-8").strip()
- try:
- if is_xip_output.endswith("no symbols"):
- raise TwisterRuntimeError("%s has no symbol information" % filename)
- except Exception as e:
- print(str(e))
- sys.exit(2)
- self.is_xip = (len(is_xip_output) != 0)
- self.filename = filename
- self.sections = []
- self.rom_size = 0
- self.ram_size = 0
- self.extra_sections = extra_sections
- self._calculate_sizes()
- def get_ram_size(self):
- """Get the amount of RAM the application will use up on the device
- @return amount of RAM, in bytes
- """
- return self.ram_size
- def get_rom_size(self):
- """Get the size of the data that this application uses on device's flash
- @return amount of ROM, in bytes
- """
- return self.rom_size
- def unrecognized_sections(self):
- """Get a list of sections inside the binary that weren't recognized
- @return list of unrecognized section names
- """
- slist = []
- for v in self.sections:
- if not v["recognized"]:
- slist.append(v["name"])
- return slist
- def _calculate_sizes(self):
- """ Calculate RAM and ROM usage by section """
- objdump_command = "objdump -h " + self.filename
- objdump_output = subprocess.check_output(
- objdump_command, shell=True).decode("utf-8").splitlines()
- for line in objdump_output:
- words = line.split()
- if not words: # Skip lines that are too short
- continue
- index = words[0]
- if not index[0].isdigit(): # Skip lines that do not start
- continue # with a digit
- name = words[1] # Skip lines with section names
- if name[0] == '.': # starting with '.'
- continue
- # TODO this doesn't actually reflect the size in flash or RAM as
- # it doesn't include linker-imposed padding between sections.
- # It is close though.
- size = int(words[2], 16)
- if size == 0:
- continue
- load_addr = int(words[4], 16)
- virt_addr = int(words[3], 16)
- # Add section to memory use totals (for both non-XIP and XIP scenarios)
- # Unrecognized section names are not included in the calculations.
- recognized = True
- if name in SizeCalculator.alloc_sections:
- self.ram_size += size
- stype = "alloc"
- elif name in SizeCalculator.rw_sections:
- self.ram_size += size
- self.rom_size += size
- stype = "rw"
- elif name in SizeCalculator.ro_sections:
- self.rom_size += size
- if not self.is_xip:
- self.ram_size += size
- stype = "ro"
- else:
- stype = "unknown"
- if name not in self.extra_sections:
- recognized = False
- self.sections.append({"name": name, "load_addr": load_addr,
- "size": size, "virt_addr": virt_addr,
- "type": stype, "recognized": recognized})
- class TwisterConfigParser:
- """Class to read test case files with semantic checking
- """
- def __init__(self, filename, schema):
- """Instantiate a new TwisterConfigParser object
- @param filename Source .yaml file to read
- """
- self.data = {}
- self.schema = schema
- self.filename = filename
- self.tests = {}
- self.common = {}
- def load(self):
- self.data = scl.yaml_load_verify(self.filename, self.schema)
- if 'tests' in self.data:
- self.tests = self.data['tests']
- if 'common' in self.data:
- self.common = self.data['common']
- def _cast_value(self, value, typestr):
- if isinstance(value, str):
- v = value.strip()
- if typestr == "str":
- return v
- elif typestr == "float":
- return float(value)
- elif typestr == "int":
- return int(value)
- elif typestr == "bool":
- return value
- elif typestr.startswith("list") and isinstance(value, list):
- return value
- elif typestr.startswith("list") and isinstance(value, str):
- vs = v.split()
- if len(typestr) > 4 and typestr[4] == ":":
- return [self._cast_value(vsi, typestr[5:]) for vsi in vs]
- else:
- return vs
- elif typestr.startswith("set"):
- vs = v.split()
- if len(typestr) > 3 and typestr[3] == ":":
- return {self._cast_value(vsi, typestr[4:]) for vsi in vs}
- else:
- return set(vs)
- elif typestr.startswith("map"):
- return value
- else:
- raise ConfigurationError(
- self.filename, "unknown type '%s'" % value)
- def get_test(self, name, valid_keys):
- """Get a dictionary representing the keys/values within a test
- @param name The test in the .yaml file to retrieve data from
- @param valid_keys A dictionary representing the intended semantics
- for this test. Each key in this dictionary is a key that could
- be specified, if a key is given in the .yaml file which isn't in
- here, it will generate an error. Each value in this dictionary
- is another dictionary containing metadata:
- "default" - Default value if not given
- "type" - Data type to convert the text value to. Simple types
- supported are "str", "float", "int", "bool" which will get
- converted to respective Python data types. "set" and "list"
- may also be specified which will split the value by
- whitespace (but keep the elements as strings). finally,
- "list:<type>" and "set:<type>" may be given which will
- perform a type conversion after splitting the value up.
- "required" - If true, raise an error if not defined. If false
- and "default" isn't specified, a type conversion will be
- done on an empty string
- @return A dictionary containing the test key-value pairs with
- type conversion and default values filled in per valid_keys
- """
- d = {}
- for k, v in self.common.items():
- d[k] = v
- for k, v in self.tests[name].items():
- if k in d:
- if isinstance(d[k], str):
- # By default, we just concatenate string values of keys
- # which appear both in "common" and per-test sections,
- # but some keys are handled in adhoc way based on their
- # semantics.
- if k == "filter":
- d[k] = "(%s) and (%s)" % (d[k], v)
- else:
- d[k] += " " + v
- else:
- d[k] = v
- for k, kinfo in valid_keys.items():
- if k not in d:
- if "required" in kinfo:
- required = kinfo["required"]
- else:
- required = False
- if required:
- raise ConfigurationError(
- self.filename,
- "missing required value for '%s' in test '%s'" %
- (k, name))
- else:
- if "default" in kinfo:
- default = kinfo["default"]
- else:
- default = self._cast_value("", kinfo["type"])
- d[k] = default
- else:
- try:
- d[k] = self._cast_value(d[k], kinfo["type"])
- except ValueError:
- raise ConfigurationError(
- self.filename, "bad %s value '%s' for key '%s' in name '%s'" %
- (kinfo["type"], d[k], k, name))
- return d
- class Platform:
- """Class representing metadata for a particular platform
- Maps directly to BOARD when building"""
- platform_schema = scl.yaml_load(os.path.join(ZEPHYR_BASE,
- "scripts", "schemas", "twister", "platform-schema.yaml"))
- def __init__(self):
- """Constructor.
- """
- self.name = ""
- self.twister = True
- # if no RAM size is specified by the board, take a default of 128K
- self.ram = 128
- self.ignore_tags = []
- self.only_tags = []
- self.default = False
- # if no flash size is specified by the board, take a default of 512K
- self.flash = 512
- self.supported = set()
- self.arch = ""
- self.type = "na"
- self.simulation = "na"
- self.supported_toolchains = []
- self.env = []
- self.env_satisfied = True
- self.filter_data = dict()
- def load(self, platform_file):
- scp = TwisterConfigParser(platform_file, self.platform_schema)
- scp.load()
- data = scp.data
- self.name = data['identifier']
- self.twister = data.get("twister", True)
- # if no RAM size is specified by the board, take a default of 128K
- self.ram = data.get("ram", 128)
- testing = data.get("testing", {})
- self.ignore_tags = testing.get("ignore_tags", [])
- self.only_tags = testing.get("only_tags", [])
- self.default = testing.get("default", False)
- # if no flash size is specified by the board, take a default of 512K
- self.flash = data.get("flash", 512)
- self.supported = set()
- for supp_feature in data.get("supported", []):
- for item in supp_feature.split(":"):
- self.supported.add(item)
- self.arch = data['arch']
- self.type = data.get('type', "na")
- self.simulation = data.get('simulation', "na")
- self.supported_toolchains = data.get("toolchain", [])
- self.env = data.get("env", [])
- self.env_satisfied = True
- for env in self.env:
- if not os.environ.get(env, None):
- self.env_satisfied = False
- def __repr__(self):
- return "<%s on %s>" % (self.name, self.arch)
- class DisablePyTestCollectionMixin(object):
- __test__ = False
- class TestCase(DisablePyTestCollectionMixin):
- """Class representing a test application
- """
- def __init__(self, testcase_root, workdir, name):
- """TestCase constructor.
- This gets called by TestSuite as it finds and reads test yaml files.
- Multiple TestCase instances may be generated from a single testcase.yaml,
- each one corresponds to an entry within that file.
- We need to have a unique name for every single test case. Since
- a testcase.yaml can define multiple tests, the canonical name for
- the test case is <workdir>/<name>.
- @param testcase_root os.path.abspath() of one of the --testcase-root
- @param workdir Sub-directory of testcase_root where the
- .yaml test configuration file was found
- @param name Name of this test case, corresponding to the entry name
- in the test case configuration file. For many test cases that just
- define one test, can be anything and is usually "test". This is
- really only used to distinguish between different cases when
- the testcase.yaml defines multiple tests
- """
- self.source_dir = ""
- self.yamlfile = ""
- self.cases = []
- self.name = self.get_unique(testcase_root, workdir, name)
- self.id = name
- self.type = None
- self.tags = set()
- self.extra_args = None
- self.extra_configs = None
- self.arch_allow = None
- self.arch_exclude = None
- self.skip = False
- self.platform_exclude = None
- self.platform_allow = None
- self.toolchain_exclude = None
- self.toolchain_allow = None
- self.tc_filter = None
- self.timeout = 60
- self.harness = ""
- self.harness_config = {}
- self.build_only = True
- self.build_on_all = False
- self.slow = False
- self.min_ram = -1
- self.depends_on = None
- self.min_flash = -1
- self.extra_sections = None
- self.integration_platforms = []
- @staticmethod
- def get_unique(testcase_root, workdir, name):
- canonical_testcase_root = os.path.realpath(testcase_root)
- if Path(canonical_zephyr_base) in Path(canonical_testcase_root).parents:
- # This is in ZEPHYR_BASE, so include path in name for uniqueness
- # FIXME: We should not depend on path of test for unique names.
- relative_tc_root = os.path.relpath(canonical_testcase_root,
- start=canonical_zephyr_base)
- else:
- relative_tc_root = ""
- # workdir can be "."
- unique = os.path.normpath(os.path.join(relative_tc_root, workdir, name))
- check = name.split(".")
- if len(check) < 2:
- raise TwisterException(f"""bad test name '{name}' in {testcase_root}/{workdir}. \
- Tests should reference the category and subsystem with a dot as a separator.
- """
- )
- return unique
- @staticmethod
- def scan_file(inf_name):
- suite_regex = re.compile(
- # do not match until end-of-line, otherwise we won't allow
- # stc_regex below to catch the ones that are declared in the same
- # line--as we only search starting the end of this match
- br"^\s*ztest_test_suite\(\s*(?P<suite_name>[a-zA-Z0-9_]+)\s*,",
- re.MULTILINE)
- stc_regex = re.compile(
- br"^\s*" # empy space at the beginning is ok
- # catch the case where it is declared in the same sentence, e.g:
- #
- # ztest_test_suite(mutex_complex, ztest_user_unit_test(TESTNAME));
- br"(?:ztest_test_suite\([a-zA-Z0-9_]+,\s*)?"
- # Catch ztest[_user]_unit_test-[_setup_teardown](TESTNAME)
- br"ztest_(?:1cpu_)?(?:user_)?unit_test(?:_setup_teardown)?"
- # Consume the argument that becomes the extra testcse
- br"\(\s*"
- br"(?P<stc_name>[a-zA-Z0-9_]+)"
- # _setup_teardown() variant has two extra arguments that we ignore
- br"(?:\s*,\s*[a-zA-Z0-9_]+\s*,\s*[a-zA-Z0-9_]+)?"
- br"\s*\)",
- # We don't check how it finishes; we don't care
- re.MULTILINE)
- suite_run_regex = re.compile(
- br"^\s*ztest_run_test_suite\((?P<suite_name>[a-zA-Z0-9_]+)\)",
- re.MULTILINE)
- achtung_regex = re.compile(
- br"(#ifdef|#endif)",
- re.MULTILINE)
- warnings = None
- with open(inf_name) as inf:
- if os.name == 'nt':
- mmap_args = {'fileno': inf.fileno(), 'length': 0, 'access': mmap.ACCESS_READ}
- else:
- mmap_args = {'fileno': inf.fileno(), 'length': 0, 'flags': mmap.MAP_PRIVATE, 'prot': mmap.PROT_READ,
- 'offset': 0}
- with contextlib.closing(mmap.mmap(**mmap_args)) as main_c:
- suite_regex_match = suite_regex.search(main_c)
- if not suite_regex_match:
- # can't find ztest_test_suite, maybe a client, because
- # it includes ztest.h
- return None, None
- suite_run_match = suite_run_regex.search(main_c)
- if not suite_run_match:
- raise ValueError("can't find ztest_run_test_suite")
- achtung_matches = re.findall(
- achtung_regex,
- main_c[suite_regex_match.end():suite_run_match.start()])
- if achtung_matches:
- warnings = "found invalid %s in ztest_test_suite()" \
- % ", ".join(sorted({match.decode() for match in achtung_matches},reverse = True))
- _matches = re.findall(
- stc_regex,
- main_c[suite_regex_match.end():suite_run_match.start()])
- for match in _matches:
- if not match.decode().startswith("test_"):
- warnings = "Found a test that does not start with test_"
- matches = [match.decode().replace("test_", "", 1) for match in _matches]
- return matches, warnings
- def scan_path(self, path):
- subcases = []
- for filename in glob.glob(os.path.join(path, "src", "*.c*")):
- try:
- _subcases, warnings = self.scan_file(filename)
- if warnings:
- logger.error("%s: %s" % (filename, warnings))
- raise TwisterRuntimeError("%s: %s" % (filename, warnings))
- if _subcases:
- subcases += _subcases
- except ValueError as e:
- logger.error("%s: can't find: %s" % (filename, e))
- for filename in glob.glob(os.path.join(path, "*.c")):
- try:
- _subcases, warnings = self.scan_file(filename)
- if warnings:
- logger.error("%s: %s" % (filename, warnings))
- if _subcases:
- subcases += _subcases
- except ValueError as e:
- logger.error("%s: can't find: %s" % (filename, e))
- return subcases
- def parse_subcases(self, test_path):
- results = self.scan_path(test_path)
- for sub in results:
- name = "{}.{}".format(self.id, sub)
- self.cases.append(name)
- if not results:
- self.cases.append(self.id)
- def __str__(self):
- return self.name
- class TestInstance(DisablePyTestCollectionMixin):
- """Class representing the execution of a particular TestCase on a platform
- @param test The TestCase object we want to build/execute
- @param platform Platform object that we want to build and run against
- @param base_outdir Base directory for all test results. The actual
- out directory used is <outdir>/<platform>/<test case name>
- """
- def __init__(self, testcase, platform, outdir):
- self.testcase = testcase
- self.platform = platform
- self.status = None
- self.reason = "Unknown"
- self.metrics = dict()
- self.handler = None
- self.outdir = outdir
- self.name = os.path.join(platform.name, testcase.name)
- self.build_dir = os.path.join(outdir, platform.name, testcase.name)
- self.run = False
- self.results = {}
- def __getstate__(self):
- d = self.__dict__.copy()
- return d
- def __setstate__(self, d):
- self.__dict__.update(d)
- def __lt__(self, other):
- return self.name < other.name
- @staticmethod
- def testcase_runnable(testcase, fixtures):
- can_run = False
- # console harness allows us to run the test and capture data.
- if testcase.harness in [ 'console', 'ztest', 'pytest']:
- can_run = True
- # if we have a fixture that is also being supplied on the
- # command-line, then we need to run the test, not just build it.
- fixture = testcase.harness_config.get('fixture')
- if fixture:
- can_run = (fixture in fixtures)
- elif testcase.harness:
- can_run = False
- else:
- can_run = True
- return can_run
- # Global testsuite parameters
- def check_runnable(self, enable_slow=False, filter='buildable', fixtures=[]):
- # right now we only support building on windows. running is still work
- # in progress.
- if os.name == 'nt':
- return False
- # we asked for build-only on the command line
- if self.testcase.build_only:
- return False
- # Do not run slow tests:
- skip_slow = self.testcase.slow and not enable_slow
- if skip_slow:
- return False
- target_ready = bool(self.testcase.type == "unit" or \
- self.platform.type == "native" or \
- self.platform.simulation in ["mdb-nsim", "nsim", "renode", "qemu", "tsim", "armfvp"] or \
- filter == 'runnable')
- if self.platform.simulation == "nsim":
- if not find_executable("nsimdrv"):
- target_ready = False
- if self.platform.simulation == "mdb-nsim":
- if not find_executable("mdb"):
- target_ready = False
- if self.platform.simulation == "renode":
- if not find_executable("renode"):
- target_ready = False
- if self.platform.simulation == "tsim":
- if not find_executable("tsim-leon3"):
- target_ready = False
- testcase_runnable = self.testcase_runnable(self.testcase, fixtures)
- return testcase_runnable and target_ready
- def create_overlay(self, platform, enable_asan=False, enable_ubsan=False, enable_coverage=False, coverage_platform=[]):
- # Create this in a "twister/" subdirectory otherwise this
- # will pass this overlay to kconfig.py *twice* and kconfig.cmake
- # will silently give that second time precedence over any
- # --extra-args=CONFIG_*
- subdir = os.path.join(self.build_dir, "twister")
- content = ""
- if self.testcase.extra_configs:
- content = "\n".join(self.testcase.extra_configs)
- if enable_coverage:
- if platform.name in coverage_platform:
- content = content + "\nCONFIG_COVERAGE=y"
- content = content + "\nCONFIG_COVERAGE_DUMP=y"
- if enable_asan:
- if platform.type == "native":
- content = content + "\nCONFIG_ASAN=y"
- if enable_ubsan:
- if platform.type == "native":
- content = content + "\nCONFIG_UBSAN=y"
- if content:
- os.makedirs(subdir, exist_ok=True)
- file = os.path.join(subdir, "testcase_extra.conf")
- with open(file, "w") as f:
- f.write(content)
- return content
- def calculate_sizes(self):
- """Get the RAM/ROM sizes of a test case.
- This can only be run after the instance has been executed by
- MakeGenerator, otherwise there won't be any binaries to measure.
- @return A SizeCalculator object
- """
- fns = glob.glob(os.path.join(self.build_dir, "zephyr", "*.elf"))
- fns.extend(glob.glob(os.path.join(self.build_dir, "zephyr", "*.exe")))
- fns = [x for x in fns if not x.endswith('_prebuilt.elf')]
- if len(fns) != 1:
- raise BuildError("Missing/multiple output ELF binary")
- return SizeCalculator(fns[0], self.testcase.extra_sections)
- def fill_results_by_status(self):
- """Fills results according to self.status
- The method is used to propagate the instance level status
- to the test cases inside. Useful when the whole instance is skipped
- and the info is required also at the test cases level for reporting.
- Should be used with caution, e.g. should not be used
- to fill all results with passes
- """
- status_to_verdict = {
- 'skipped': 'SKIP',
- 'error': 'BLOCK',
- 'failure': 'FAILED'
- }
- for k in self.results:
- self.results[k] = status_to_verdict[self.status]
- def __repr__(self):
- return "<TestCase %s on %s>" % (self.testcase.name, self.platform.name)
- class CMake():
- config_re = re.compile('(CONFIG_[A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')
- dt_re = re.compile('([A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')
- def __init__(self, testcase, platform, source_dir, build_dir):
- self.cwd = None
- self.capture_output = True
- self.defconfig = {}
- self.cmake_cache = {}
- self.instance = None
- self.testcase = testcase
- self.platform = platform
- self.source_dir = source_dir
- self.build_dir = build_dir
- self.log = "build.log"
- self.generator = None
- self.generator_cmd = None
- def parse_generated(self):
- self.defconfig = {}
- return {}
- def run_build(self, args=[]):
- logger.debug("Building %s for %s" % (self.source_dir, self.platform.name))
- cmake_args = []
- cmake_args.extend(args)
- cmake = shutil.which('cmake')
- cmd = [cmake] + cmake_args
- kwargs = dict()
- if self.capture_output:
- kwargs['stdout'] = subprocess.PIPE
- # CMake sends the output of message() to stderr unless it's STATUS
- kwargs['stderr'] = subprocess.STDOUT
- if self.cwd:
- kwargs['cwd'] = self.cwd
- p = subprocess.Popen(cmd, **kwargs)
- out, _ = p.communicate()
- results = {}
- if p.returncode == 0:
- msg = "Finished building %s for %s" % (self.source_dir, self.platform.name)
- self.instance.status = "passed"
- results = {'msg': msg, "returncode": p.returncode, "instance": self.instance}
- if out:
- log_msg = out.decode(sys.getdefaultencoding())
- with open(os.path.join(self.build_dir, self.log), "a") as log:
- log.write(log_msg)
- else:
- return None
- else:
- # A real error occurred, raise an exception
- log_msg = ""
- if out:
- log_msg = out.decode(sys.getdefaultencoding())
- with open(os.path.join(self.build_dir, self.log), "a") as log:
- log.write(log_msg)
- if log_msg:
- res = re.findall("region `(FLASH|ROM|RAM|ICCM|DCCM|SRAM)' overflowed by", log_msg)
- if res and not self.overflow_as_errors:
- logger.debug("Test skipped due to {} Overflow".format(res[0]))
- self.instance.status = "skipped"
- self.instance.reason = "{} overflow".format(res[0])
- else:
- self.instance.status = "error"
- self.instance.reason = "Build failure"
- results = {
- "returncode": p.returncode,
- "instance": self.instance,
- }
- return results
- def run_cmake(self, args=[]):
- if self.warnings_as_errors:
- ldflags = "-Wl,--fatal-warnings"
- cflags = "-Werror"
- aflags = "-Wa,--fatal-warnings"
- gen_defines_args = "--edtlib-Werror"
- else:
- ldflags = cflags = aflags = ""
- gen_defines_args = ""
- logger.debug("Running cmake on %s for %s" % (self.source_dir, self.platform.name))
- cmake_args = [
- f'-B{self.build_dir}',
- f'-S{self.source_dir}',
- f'-DEXTRA_CFLAGS="{cflags}"',
- f'-DEXTRA_AFLAGS="{aflags}',
- f'-DEXTRA_LDFLAGS="{ldflags}"',
- f'-DEXTRA_GEN_DEFINES_ARGS={gen_defines_args}',
- f'-G{self.generator}'
- ]
- if self.cmake_only:
- cmake_args.append("-DCMAKE_EXPORT_COMPILE_COMMANDS=1")
- args = ["-D{}".format(a.replace('"', '')) for a in args]
- cmake_args.extend(args)
- cmake_opts = ['-DBOARD={}'.format(self.platform.name)]
- cmake_args.extend(cmake_opts)
- logger.debug("Calling cmake with arguments: {}".format(cmake_args))
- cmake = shutil.which('cmake')
- cmd = [cmake] + cmake_args
- kwargs = dict()
- if self.capture_output:
- kwargs['stdout'] = subprocess.PIPE
- # CMake sends the output of message() to stderr unless it's STATUS
- kwargs['stderr'] = subprocess.STDOUT
- if self.cwd:
- kwargs['cwd'] = self.cwd
- p = subprocess.Popen(cmd, **kwargs)
- out, _ = p.communicate()
- if p.returncode == 0:
- filter_results = self.parse_generated()
- msg = "Finished building %s for %s" % (self.source_dir, self.platform.name)
- logger.debug(msg)
- results = {'msg': msg, 'filter': filter_results}
- else:
- self.instance.status = "error"
- self.instance.reason = "Cmake build failure"
- self.instance.fill_results_by_status()
- logger.error("Cmake build failure: %s for %s" % (self.source_dir, self.platform.name))
- results = {"returncode": p.returncode}
- if out:
- with open(os.path.join(self.build_dir, self.log), "a") as log:
- log_msg = out.decode(sys.getdefaultencoding())
- log.write(log_msg)
- return results
- @staticmethod
- def run_cmake_script(args=[]):
- logger.debug("Running cmake script %s" % (args[0]))
- cmake_args = ["-D{}".format(a.replace('"', '')) for a in args[1:]]
- cmake_args.extend(['-P', args[0]])
- logger.debug("Calling cmake with arguments: {}".format(cmake_args))
- cmake = shutil.which('cmake')
- if not cmake:
- msg = "Unable to find `cmake` in path"
- logger.error(msg)
- raise Exception(msg)
- cmd = [cmake] + cmake_args
- kwargs = dict()
- kwargs['stdout'] = subprocess.PIPE
- # CMake sends the output of message() to stderr unless it's STATUS
- kwargs['stderr'] = subprocess.STDOUT
- p = subprocess.Popen(cmd, **kwargs)
- out, _ = p.communicate()
- # It might happen that the environment adds ANSI escape codes like \x1b[0m,
- # for instance if twister is executed from inside a makefile. In such a
- # scenario it is then necessary to remove them, as otherwise the JSON decoding
- # will fail.
- ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
- out = ansi_escape.sub('', out.decode())
- if p.returncode == 0:
- msg = "Finished running %s" % (args[0])
- logger.debug(msg)
- results = {"returncode": p.returncode, "msg": msg, "stdout": out}
- else:
- logger.error("Cmake script failure: %s" % (args[0]))
- results = {"returncode": p.returncode, "returnmsg": out}
- return results
- class FilterBuilder(CMake):
- def __init__(self, testcase, platform, source_dir, build_dir):
- super().__init__(testcase, platform, source_dir, build_dir)
- self.log = "config-twister.log"
- def parse_generated(self):
- if self.platform.name == "unit_testing":
- return {}
- cmake_cache_path = os.path.join(self.build_dir, "CMakeCache.txt")
- defconfig_path = os.path.join(self.build_dir, "zephyr", ".config")
- with open(defconfig_path, "r") as fp:
- defconfig = {}
- for line in fp.readlines():
- m = self.config_re.match(line)
- if not m:
- if line.strip() and not line.startswith("#"):
- sys.stderr.write("Unrecognized line %s\n" % line)
- continue
- defconfig[m.group(1)] = m.group(2).strip()
- self.defconfig = defconfig
- cmake_conf = {}
- try:
- cache = CMakeCache.from_file(cmake_cache_path)
- except FileNotFoundError:
- cache = {}
- for k in iter(cache):
- cmake_conf[k.name] = k.value
- self.cmake_cache = cmake_conf
- filter_data = {
- "ARCH": self.platform.arch,
- "PLATFORM": self.platform.name
- }
- filter_data.update(os.environ)
- filter_data.update(self.defconfig)
- filter_data.update(self.cmake_cache)
- edt_pickle = os.path.join(self.build_dir, "zephyr", "edt.pickle")
- if self.testcase and self.testcase.tc_filter:
- try:
- if os.path.exists(edt_pickle):
- with open(edt_pickle, 'rb') as f:
- edt = pickle.load(f)
- else:
- edt = None
- res = expr_parser.parse(self.testcase.tc_filter, filter_data, edt)
- except (ValueError, SyntaxError) as se:
- sys.stderr.write(
- "Failed processing %s\n" % self.testcase.yamlfile)
- raise se
- if not res:
- return {os.path.join(self.platform.name, self.testcase.name): True}
- else:
- return {os.path.join(self.platform.name, self.testcase.name): False}
- else:
- self.platform.filter_data = filter_data
- return filter_data
- class ProjectBuilder(FilterBuilder):
- def __init__(self, suite, instance, **kwargs):
- super().__init__(instance.testcase, instance.platform, instance.testcase.source_dir, instance.build_dir)
- self.log = "build.log"
- self.instance = instance
- self.suite = suite
- self.filtered_tests = 0
- self.lsan = kwargs.get('lsan', False)
- self.asan = kwargs.get('asan', False)
- self.ubsan = kwargs.get('ubsan', False)
- self.valgrind = kwargs.get('valgrind', False)
- self.extra_args = kwargs.get('extra_args', [])
- self.device_testing = kwargs.get('device_testing', False)
- self.cmake_only = kwargs.get('cmake_only', False)
- self.cleanup = kwargs.get('cleanup', False)
- self.coverage = kwargs.get('coverage', False)
- self.inline_logs = kwargs.get('inline_logs', False)
- self.generator = kwargs.get('generator', None)
- self.generator_cmd = kwargs.get('generator_cmd', None)
- self.verbose = kwargs.get('verbose', None)
- self.warnings_as_errors = kwargs.get('warnings_as_errors', True)
- self.overflow_as_errors = kwargs.get('overflow_as_errors', False)
- @staticmethod
- def log_info(filename, inline_logs):
- filename = os.path.abspath(os.path.realpath(filename))
- if inline_logs:
- logger.info("{:-^100}".format(filename))
- try:
- with open(filename) as fp:
- data = fp.read()
- except Exception as e:
- data = "Unable to read log data (%s)\n" % (str(e))
- logger.error(data)
- logger.info("{:-^100}".format(filename))
- else:
- logger.error("see: " + Fore.YELLOW + filename + Fore.RESET)
- def log_info_file(self, inline_logs):
- build_dir = self.instance.build_dir
- h_log = "{}/handler.log".format(build_dir)
- b_log = "{}/build.log".format(build_dir)
- v_log = "{}/valgrind.log".format(build_dir)
- d_log = "{}/device.log".format(build_dir)
- if os.path.exists(v_log) and "Valgrind" in self.instance.reason:
- self.log_info("{}".format(v_log), inline_logs)
- elif os.path.exists(h_log) and os.path.getsize(h_log) > 0:
- self.log_info("{}".format(h_log), inline_logs)
- elif os.path.exists(d_log) and os.path.getsize(d_log) > 0:
- self.log_info("{}".format(d_log), inline_logs)
- else:
- self.log_info("{}".format(b_log), inline_logs)
- def setup_handler(self):
- instance = self.instance
- args = []
- # FIXME: Needs simplification
- if instance.platform.simulation == "qemu":
- instance.handler = QEMUHandler(instance, "qemu")
- args.append("QEMU_PIPE=%s" % instance.handler.get_fifo())
- instance.handler.call_make_run = True
- elif instance.testcase.type == "unit":
- instance.handler = BinaryHandler(instance, "unit")
- instance.handler.binary = os.path.join(instance.build_dir, "testbinary")
- if self.coverage:
- args.append("COVERAGE=1")
- elif instance.platform.type == "native":
- handler = BinaryHandler(instance, "native")
- handler.asan = self.asan
- handler.valgrind = self.valgrind
- handler.lsan = self.lsan
- handler.ubsan = self.ubsan
- handler.coverage = self.coverage
- handler.binary = os.path.join(instance.build_dir, "zephyr", "zephyr.exe")
- instance.handler = handler
- elif instance.platform.simulation == "renode":
- if find_executable("renode"):
- instance.handler = BinaryHandler(instance, "renode")
- instance.handler.pid_fn = os.path.join(instance.build_dir, "renode.pid")
- instance.handler.call_make_run = True
- elif instance.platform.simulation == "tsim":
- instance.handler = BinaryHandler(instance, "tsim")
- instance.handler.call_make_run = True
- elif self.device_testing:
- instance.handler = DeviceHandler(instance, "device")
- instance.handler.coverage = self.coverage
- elif instance.platform.simulation == "nsim":
- if find_executable("nsimdrv"):
- instance.handler = BinaryHandler(instance, "nsim")
- instance.handler.call_make_run = True
- elif instance.platform.simulation == "mdb-nsim":
- if find_executable("mdb"):
- instance.handler = BinaryHandler(instance, "nsim")
- instance.handler.call_make_run = True
- elif instance.platform.simulation == "armfvp":
- instance.handler = BinaryHandler(instance, "armfvp")
- instance.handler.call_make_run = True
- if instance.handler:
- instance.handler.args = args
- instance.handler.generator_cmd = self.generator_cmd
- instance.handler.generator = self.generator
- def process(self, pipeline, done, message, lock, results):
- op = message.get('op')
- if not self.instance.handler:
- self.setup_handler()
- # The build process, call cmake and build with configured generator
- if op == "cmake":
- res = self.cmake()
- if self.instance.status in ["failed", "error"]:
- pipeline.put({"op": "report", "test": self.instance})
- elif self.cmake_only:
- if self.instance.status is None:
- self.instance.status = "passed"
- pipeline.put({"op": "report", "test": self.instance})
- else:
- if self.instance.name in res['filter'] and res['filter'][self.instance.name]:
- logger.debug("filtering %s" % self.instance.name)
- self.instance.status = "skipped"
- self.instance.reason = "filter"
- results.skipped_runtime += 1
- for case in self.instance.testcase.cases:
- self.instance.results.update({case: 'SKIP'})
- pipeline.put({"op": "report", "test": self.instance})
- else:
- pipeline.put({"op": "build", "test": self.instance})
- elif op == "build":
- logger.debug("build test: %s" % self.instance.name)
- res = self.build()
- if not res:
- self.instance.status = "error"
- self.instance.reason = "Build Failure"
- pipeline.put({"op": "report", "test": self.instance})
- else:
- # Count skipped cases during build, for example
- # due to ram/rom overflow.
- inst = res.get("instance", None)
- if inst and inst.status == "skipped":
- results.skipped_runtime += 1
- if res.get('returncode', 1) > 0:
- pipeline.put({"op": "report", "test": self.instance})
- else:
- if self.instance.run and self.instance.handler:
- pipeline.put({"op": "run", "test": self.instance})
- else:
- pipeline.put({"op": "report", "test": self.instance})
- # Run the generated binary using one of the supported handlers
- elif op == "run":
- logger.debug("run test: %s" % self.instance.name)
- self.run()
- self.instance.status, _ = self.instance.handler.get_state()
- logger.debug(f"run status: {self.instance.name} {self.instance.status}")
- # to make it work with pickle
- self.instance.handler.thread = None
- self.instance.handler.suite = None
- pipeline.put({
- "op": "report",
- "test": self.instance,
- "status": self.instance.status,
- "reason": self.instance.reason
- }
- )
- # Report results and output progress to screen
- elif op == "report":
- with lock:
- done.put(self.instance)
- self.report_out(results)
- if self.cleanup and not self.coverage and self.instance.status == "passed":
- pipeline.put({
- "op": "cleanup",
- "test": self.instance
- })
- elif op == "cleanup":
- if self.device_testing:
- self.cleanup_device_testing_artifacts()
- else:
- self.cleanup_artifacts()
- def cleanup_artifacts(self, additional_keep=[]):
- logger.debug("Cleaning up {}".format(self.instance.build_dir))
- allow = [
- 'zephyr/.config',
- 'handler.log',
- 'build.log',
- 'device.log',
- 'recording.csv',
- ]
- allow += additional_keep
- allow = [os.path.join(self.instance.build_dir, file) for file in allow]
- for dirpath, dirnames, filenames in os.walk(self.instance.build_dir, topdown=False):
- for name in filenames:
- path = os.path.join(dirpath, name)
- if path not in allow:
- os.remove(path)
- # Remove empty directories and symbolic links to directories
- for dir in dirnames:
- path = os.path.join(dirpath, dir)
- if os.path.islink(path):
- os.remove(path)
- elif not os.listdir(path):
- os.rmdir(path)
- def cleanup_device_testing_artifacts(self):
- logger.debug("Cleaning up for Device Testing {}".format(self.instance.build_dir))
- sanitizelist = [
- 'CMakeCache.txt',
- 'zephyr/runners.yaml',
- ]
- keep = [
- 'zephyr/zephyr.hex',
- 'zephyr/zephyr.bin',
- 'zephyr/zephyr.elf',
- ]
- keep += sanitizelist
- self.cleanup_artifacts(keep)
- # sanitize paths so files are relocatable
- for file in sanitizelist:
- file = os.path.join(self.instance.build_dir, file)
- with open(file, "rt") as fin:
- data = fin.read()
- data = data.replace(canonical_zephyr_base+"/", "")
- with open(file, "wt") as fin:
- fin.write(data)
- def report_out(self, results):
- total_to_do = results.total - results.skipped_configs
- total_tests_width = len(str(total_to_do))
- results.done += 1
- instance = self.instance
- if instance.status in ["error", "failed", "timeout", "flash_error"]:
- if instance.status == "error":
- results.error += 1
- results.failed += 1
- if self.verbose:
- status = Fore.RED + "FAILED " + Fore.RESET + instance.reason
- else:
- print("")
- logger.error(
- "{:<25} {:<50} {}FAILED{}: {}".format(
- instance.platform.name,
- instance.testcase.name,
- Fore.RED,
- Fore.RESET,
- instance.reason))
- if not self.verbose:
- self.log_info_file(self.inline_logs)
- elif instance.status == "skipped":
- status = Fore.YELLOW + "SKIPPED" + Fore.RESET
- elif instance.status == "passed":
- status = Fore.GREEN + "PASSED" + Fore.RESET
- else:
- logger.debug(f"Unknown status = {instance.status}")
- status = Fore.YELLOW + "UNKNOWN" + Fore.RESET
- if self.verbose:
- if self.cmake_only:
- more_info = "cmake"
- elif instance.status == "skipped":
- more_info = instance.reason
- else:
- if instance.handler and instance.run:
- more_info = instance.handler.type_str
- htime = instance.handler.duration
- if htime:
- more_info += " {:.3f}s".format(htime)
- else:
- more_info = "build"
- logger.info("{:>{}}/{} {:<25} {:<50} {} ({})".format(
- results.done, total_tests_width, total_to_do, instance.platform.name,
- instance.testcase.name, status, more_info))
- if instance.status in ["error", "failed", "timeout"]:
- self.log_info_file(self.inline_logs)
- else:
- completed_perc = 0
- if total_to_do > 0:
- completed_perc = int((float(results.done) / total_to_do) * 100)
- skipped = results.skipped_configs + results.skipped_runtime
- sys.stdout.write("\rINFO - Total complete: %s%4d/%4d%s %2d%% skipped: %s%4d%s, failed: %s%4d%s" % (
- Fore.GREEN,
- results.done,
- total_to_do,
- Fore.RESET,
- completed_perc,
- Fore.YELLOW if skipped > 0 else Fore.RESET,
- skipped,
- Fore.RESET,
- Fore.RED if results.failed > 0 else Fore.RESET,
- results.failed,
- Fore.RESET
- )
- )
- sys.stdout.flush()
- def cmake(self):
- instance = self.instance
- args = self.testcase.extra_args[:]
- args += self.extra_args
- if instance.handler:
- args += instance.handler.args
- # merge overlay files into one variable
- def extract_overlays(args):
- re_overlay = re.compile('OVERLAY_CONFIG=(.*)')
- other_args = []
- overlays = []
- for arg in args:
- match = re_overlay.search(arg)
- if match:
- overlays.append(match.group(1).strip('\'"'))
- else:
- other_args.append(arg)
- args[:] = other_args
- return overlays
- overlays = extract_overlays(args)
- if os.path.exists(os.path.join(instance.build_dir,
- "twister", "testcase_extra.conf")):
- overlays.append(os.path.join(instance.build_dir,
- "twister", "testcase_extra.conf"))
- if overlays:
- args.append("OVERLAY_CONFIG=\"%s\"" % (" ".join(overlays)))
- res = self.run_cmake(args)
- return res
- def build(self):
- res = self.run_build(['--build', self.build_dir])
- return res
- def run(self):
- instance = self.instance
- if instance.handler:
- if instance.handler.type_str == "device":
- instance.handler.suite = self.suite
- instance.handler.handle()
- sys.stdout.flush()
- class TestSuite(DisablePyTestCollectionMixin):
- config_re = re.compile('(CONFIG_[A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')
- dt_re = re.compile('([A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')
- tc_schema = scl.yaml_load(
- os.path.join(ZEPHYR_BASE,
- "scripts", "schemas", "twister", "testcase-schema.yaml"))
- quarantine_schema = scl.yaml_load(
- os.path.join(ZEPHYR_BASE,
- "scripts", "schemas", "twister", "quarantine-schema.yaml"))
- testcase_valid_keys = {"tags": {"type": "set", "required": False},
- "type": {"type": "str", "default": "integration"},
- "extra_args": {"type": "list"},
- "extra_configs": {"type": "list"},
- "build_only": {"type": "bool", "default": False},
- "build_on_all": {"type": "bool", "default": False},
- "skip": {"type": "bool", "default": False},
- "slow": {"type": "bool", "default": False},
- "timeout": {"type": "int", "default": 60},
- "min_ram": {"type": "int", "default": 8},
- "depends_on": {"type": "set"},
- "min_flash": {"type": "int", "default": 32},
- "arch_allow": {"type": "set"},
- "arch_exclude": {"type": "set"},
- "extra_sections": {"type": "list", "default": []},
- "integration_platforms": {"type": "list", "default": []},
- "platform_exclude": {"type": "set"},
- "platform_allow": {"type": "set"},
- "toolchain_exclude": {"type": "set"},
- "toolchain_allow": {"type": "set"},
- "filter": {"type": "str"},
- "harness": {"type": "str"},
- "harness_config": {"type": "map", "default": {}}
- }
- RELEASE_DATA = os.path.join(ZEPHYR_BASE, "scripts", "release",
- "twister_last_release.csv")
- SAMPLE_FILENAME = 'sample.yaml'
- TESTCASE_FILENAME = 'testcase.yaml'
- def __init__(self, board_root_list=[], testcase_roots=[], outdir=None):
- self.roots = testcase_roots
- if not isinstance(board_root_list, list):
- self.board_roots = [board_root_list]
- else:
- self.board_roots = board_root_list
- # Testsuite Options
- self.coverage_platform = []
- self.build_only = False
- self.cmake_only = False
- self.cleanup = False
- self.enable_slow = False
- self.device_testing = False
- self.fixtures = []
- self.enable_coverage = False
- self.enable_ubsan = False
- self.enable_lsan = False
- self.enable_asan = False
- self.enable_valgrind = False
- self.extra_args = []
- self.inline_logs = False
- self.enable_sizes_report = False
- self.west_flash = None
- self.west_runner = None
- self.generator = None
- self.generator_cmd = None
- self.warnings_as_errors = True
- self.overflow_as_errors = False
- self.quarantine_verify = False
- # Keep track of which test cases we've filtered out and why
- self.testcases = {}
- self.quarantine = {}
- self.platforms = []
- self.selected_platforms = []
- self.filtered_platforms = []
- self.default_platforms = []
- self.outdir = os.path.abspath(outdir)
- self.discards = {}
- self.load_errors = 0
- self.instances = dict()
- self.total_platforms = 0
- self.start_time = 0
- self.duration = 0
- self.warnings = 0
- # hardcoded for now
- self.duts = []
- # run integration tests only
- self.integration = False
- self.pipeline = None
- self.version = "NA"
- def check_zephyr_version(self):
- try:
- subproc = subprocess.run(["git", "describe", "--abbrev=12"],
- stdout=subprocess.PIPE,
- universal_newlines=True,
- cwd=ZEPHYR_BASE)
- if subproc.returncode == 0:
- self.version = subproc.stdout.strip()
- logger.info(f"Zephyr version: {self.version}")
- except OSError:
- logger.info("Cannot read zephyr version.")
- def get_platform_instances(self, platform):
- filtered_dict = {k:v for k,v in self.instances.items() if k.startswith(platform + "/")}
- return filtered_dict
- def config(self):
- logger.info("coverage platform: {}".format(self.coverage_platform))
- # Debug Functions
- @staticmethod
- def info(what):
- sys.stdout.write(what + "\n")
- sys.stdout.flush()
- def update_counting(self, results=None, initial=False):
- results.skipped_configs = 0
- results.skipped_cases = 0
- for instance in self.instances.values():
- if initial:
- results.cases += len(instance.testcase.cases)
- if instance.status == 'skipped':
- results.skipped_configs += 1
- results.skipped_cases += len(instance.testcase.cases)
- elif instance.status == "passed":
- results.passed += 1
- for res in instance.results.values():
- if res == 'SKIP':
- results.skipped_cases += 1
- def compare_metrics(self, filename):
- # name, datatype, lower results better
- interesting_metrics = [("ram_size", int, True),
- ("rom_size", int, True)]
- if not os.path.exists(filename):
- logger.error("Cannot compare metrics, %s not found" % filename)
- return []
- results = []
- saved_metrics = {}
- with open(filename) as fp:
- cr = csv.DictReader(fp)
- for row in cr:
- d = {}
- for m, _, _ in interesting_metrics:
- d[m] = row[m]
- saved_metrics[(row["test"], row["platform"])] = d
- for instance in self.instances.values():
- mkey = (instance.testcase.name, instance.platform.name)
- if mkey not in saved_metrics:
- continue
- sm = saved_metrics[mkey]
- for metric, mtype, lower_better in interesting_metrics:
- if metric not in instance.metrics:
- continue
- if sm[metric] == "":
- continue
- delta = instance.metrics.get(metric, 0) - mtype(sm[metric])
- if delta == 0:
- continue
- results.append((instance, metric, instance.metrics.get(metric, 0), delta,
- lower_better))
- return results
- def footprint_reports(self, report, show_footprint, all_deltas,
- footprint_threshold, last_metrics):
- if not report:
- return
- logger.debug("running footprint_reports")
- deltas = self.compare_metrics(report)
- warnings = 0
- if deltas and show_footprint:
- for i, metric, value, delta, lower_better in deltas:
- if not all_deltas and ((delta < 0 and lower_better) or
- (delta > 0 and not lower_better)):
- continue
- percentage = 0
- if value > delta:
- percentage = (float(delta) / float(value - delta))
- if not all_deltas and (percentage < (footprint_threshold / 100.0)):
- continue
- logger.info("{:<25} {:<60} {}{}{}: {} {:<+4}, is now {:6} {:+.2%}".format(
- i.platform.name, i.testcase.name, Fore.YELLOW,
- "INFO" if all_deltas else "WARNING", Fore.RESET,
- metric, delta, value, percentage))
- warnings += 1
- if warnings:
- logger.warning("Deltas based on metrics from last %s" %
- ("release" if not last_metrics else "run"))
- def summary(self, results, unrecognized_sections):
- failed = 0
- run = 0
- for instance in self.instances.values():
- if instance.status == "failed":
- failed += 1
- elif instance.metrics.get("unrecognized") and not unrecognized_sections:
- logger.error("%sFAILED%s: %s has unrecognized binary sections: %s" %
- (Fore.RED, Fore.RESET, instance.name,
- str(instance.metrics.get("unrecognized", []))))
- failed += 1
- if instance.metrics.get('handler_time', None):
- run += 1
- if results.total and results.total != results.skipped_configs:
- pass_rate = (float(results.passed) / float(results.total - results.skipped_configs))
- else:
- pass_rate = 0
- logger.info(
- "{}{} of {}{} test configurations passed ({:.2%}), {}{}{} failed, {} skipped with {}{}{} warnings in {:.2f} seconds".format(
- Fore.RED if failed else Fore.GREEN,
- results.passed,
- results.total - results.skipped_configs,
- Fore.RESET,
- pass_rate,
- Fore.RED if results.failed else Fore.RESET,
- results.failed,
- Fore.RESET,
- results.skipped_configs,
- Fore.YELLOW if self.warnings else Fore.RESET,
- self.warnings,
- Fore.RESET,
- self.duration))
- self.total_platforms = len(self.platforms)
- # if we are only building, do not report about tests being executed.
- if self.platforms and not self.build_only:
- logger.info("In total {} test cases were executed, {} skipped on {} out of total {} platforms ({:02.2f}%)".format(
- results.cases - results.skipped_cases,
- results.skipped_cases,
- len(self.filtered_platforms),
- self.total_platforms,
- (100 * len(self.filtered_platforms) / len(self.platforms))
- ))
- logger.info(f"{Fore.GREEN}{run}{Fore.RESET} test configurations executed on platforms, \
- {Fore.RED}{results.total - run - results.skipped_configs}{Fore.RESET} test configurations were only built.")
- def save_reports(self, name, suffix, report_dir, no_update, release, only_failed, platform_reports, json_report):
- if not self.instances:
- return
- logger.info("Saving reports...")
- if name:
- report_name = name
- else:
- report_name = "twister"
- if report_dir:
- os.makedirs(report_dir, exist_ok=True)
- filename = os.path.join(report_dir, report_name)
- outdir = report_dir
- else:
- filename = os.path.join(self.outdir, report_name)
- outdir = self.outdir
- if suffix:
- filename = "{}_{}".format(filename, suffix)
- if not no_update:
- self.xunit_report(filename + ".xml", full_report=False,
- append=only_failed, version=self.version)
- self.xunit_report(filename + "_report.xml", full_report=True,
- append=only_failed, version=self.version)
- self.csv_report(filename + ".csv")
- if json_report:
- self.json_report(filename + ".json", append=only_failed, version=self.version)
- if platform_reports:
- self.target_report(outdir, suffix, append=only_failed)
- if self.discards:
- self.discard_report(filename + "_discard.csv")
- if release:
- self.csv_report(self.RELEASE_DATA)
- def add_configurations(self):
- for board_root in self.board_roots:
- board_root = os.path.abspath(board_root)
- logger.debug("Reading platform configuration files under %s..." %
- board_root)
- for file in glob.glob(os.path.join(board_root, "*", "*", "*.yaml")):
- try:
- platform = Platform()
- platform.load(file)
- if platform.name in [p.name for p in self.platforms]:
- logger.error(f"Duplicate platform {platform.name} in {file}")
- raise Exception(f"Duplicate platform identifier {platform.name} found")
- if platform.twister:
- self.platforms.append(platform)
- if platform.default:
- self.default_platforms.append(platform.name)
- except RuntimeError as e:
- logger.error("E: %s: can't load: %s" % (file, e))
- self.load_errors += 1
- def get_all_tests(self):
- tests = []
- for _, tc in self.testcases.items():
- for case in tc.cases:
- tests.append(case)
- return tests
- @staticmethod
- def get_toolchain():
- toolchain_script = Path(ZEPHYR_BASE) / Path('cmake/verify-toolchain.cmake')
- result = CMake.run_cmake_script([toolchain_script, "FORMAT=json"])
- try:
- if result['returncode']:
- raise TwisterRuntimeError(f"E: {result['returnmsg']}")
- except Exception as e:
- print(str(e))
- sys.exit(2)
- toolchain = json.loads(result['stdout'])['ZEPHYR_TOOLCHAIN_VARIANT']
- logger.info(f"Using '{toolchain}' toolchain.")
- return toolchain
- def add_testcases(self, testcase_filter=[]):
- for root in self.roots:
- root = os.path.abspath(root)
- logger.debug("Reading test case configuration files under %s..." % root)
- for dirpath, _, filenames in os.walk(root, topdown=True):
- if self.SAMPLE_FILENAME in filenames:
- filename = self.SAMPLE_FILENAME
- elif self.TESTCASE_FILENAME in filenames:
- filename = self.TESTCASE_FILENAME
- else:
- continue
- logger.debug("Found possible test case in " + dirpath)
- tc_path = os.path.join(dirpath, filename)
- try:
- parsed_data = TwisterConfigParser(tc_path, self.tc_schema)
- parsed_data.load()
- tc_path = os.path.dirname(tc_path)
- workdir = os.path.relpath(tc_path, root)
- for name in parsed_data.tests.keys():
- tc = TestCase(root, workdir, name)
- tc_dict = parsed_data.get_test(name, self.testcase_valid_keys)
- tc.source_dir = tc_path
- tc.yamlfile = tc_path
- tc.type = tc_dict["type"]
- tc.tags = tc_dict["tags"]
- tc.extra_args = tc_dict["extra_args"]
- tc.extra_configs = tc_dict["extra_configs"]
- tc.arch_allow = tc_dict["arch_allow"]
- tc.arch_exclude = tc_dict["arch_exclude"]
- tc.skip = tc_dict["skip"]
- tc.platform_exclude = tc_dict["platform_exclude"]
- tc.platform_allow = tc_dict["platform_allow"]
- tc.toolchain_exclude = tc_dict["toolchain_exclude"]
- tc.toolchain_allow = tc_dict["toolchain_allow"]
- tc.tc_filter = tc_dict["filter"]
- tc.timeout = tc_dict["timeout"]
- tc.harness = tc_dict["harness"]
- tc.harness_config = tc_dict["harness_config"]
- if tc.harness == 'console' and not tc.harness_config:
- raise Exception('Harness config error: console harness defined without a configuration.')
- tc.build_only = tc_dict["build_only"]
- tc.build_on_all = tc_dict["build_on_all"]
- tc.slow = tc_dict["slow"]
- tc.min_ram = tc_dict["min_ram"]
- tc.depends_on = tc_dict["depends_on"]
- tc.min_flash = tc_dict["min_flash"]
- tc.extra_sections = tc_dict["extra_sections"]
- tc.integration_platforms = tc_dict["integration_platforms"]
- tc.parse_subcases(tc_path)
- if testcase_filter:
- if tc.name and tc.name in testcase_filter:
- self.testcases[tc.name] = tc
- else:
- self.testcases[tc.name] = tc
- except Exception as e:
- logger.error("%s: can't load (skipping): %s" % (tc_path, e))
- self.load_errors += 1
- return len(self.testcases)
- def get_platform(self, name):
- selected_platform = None
- for platform in self.platforms:
- if platform.name == name:
- selected_platform = platform
- break
- return selected_platform
- def load_quarantine(self, file):
- """
- Loads quarantine list from the given yaml file. Creates a dictionary
- of all tests configurations (platform + scenario: comment) that shall be
- skipped due to quarantine
- """
- # Load yaml into quarantine_yaml
- quarantine_yaml = scl.yaml_load_verify(file, self.quarantine_schema)
- # Create quarantine_list with a product of the listed
- # platforms and scenarios for each entry in quarantine yaml
- quarantine_list = []
- for quar_dict in quarantine_yaml:
- if quar_dict['platforms'][0] == "all":
- plat = [p.name for p in self.platforms]
- else:
- plat = quar_dict['platforms']
- comment = quar_dict.get('comment', "NA")
- quarantine_list.append([{".".join([p, s]): comment}
- for p in plat for s in quar_dict['scenarios']])
- # Flatten the quarantine_list
- quarantine_list = [it for sublist in quarantine_list for it in sublist]
- # Change quarantine_list into a dictionary
- for d in quarantine_list:
- self.quarantine.update(d)
- def load_from_file(self, file, filter_status=[], filter_platform=[]):
- try:
- with open(file, "r") as fp:
- cr = csv.DictReader(fp)
- instance_list = []
- for row in cr:
- if row["status"] in filter_status:
- continue
- test = row["test"]
- platform = self.get_platform(row["platform"])
- if filter_platform and platform.name not in filter_platform:
- continue
- instance = TestInstance(self.testcases[test], platform, self.outdir)
- if self.device_testing:
- tfilter = 'runnable'
- else:
- tfilter = 'buildable'
- instance.run = instance.check_runnable(
- self.enable_slow,
- tfilter,
- self.fixtures
- )
- instance.create_overlay(platform, self.enable_asan, self.enable_ubsan, self.enable_coverage, self.coverage_platform)
- instance_list.append(instance)
- self.add_instances(instance_list)
- except KeyError as e:
- logger.error("Key error while parsing tests file.({})".format(str(e)))
- sys.exit(2)
- except FileNotFoundError as e:
- logger.error("Couldn't find input file with list of tests. ({})".format(e))
- sys.exit(2)
- def apply_filters(self, **kwargs):
- toolchain = self.get_toolchain()
- discards = {}
- platform_filter = kwargs.get('platform')
- exclude_platform = kwargs.get('exclude_platform', [])
- testcase_filter = kwargs.get('run_individual_tests', [])
- arch_filter = kwargs.get('arch')
- tag_filter = kwargs.get('tag')
- exclude_tag = kwargs.get('exclude_tag')
- all_filter = kwargs.get('all')
- runnable = kwargs.get('runnable')
- force_toolchain = kwargs.get('force_toolchain')
- force_platform = kwargs.get('force_platform')
- emu_filter = kwargs.get('emulation_only')
- logger.debug("platform filter: " + str(platform_filter))
- logger.debug(" arch_filter: " + str(arch_filter))
- logger.debug(" tag_filter: " + str(tag_filter))
- logger.debug(" exclude_tag: " + str(exclude_tag))
- default_platforms = False
- emulation_platforms = False
- if all_filter:
- logger.info("Selecting all possible platforms per test case")
- # When --all used, any --platform arguments ignored
- platform_filter = []
- elif not platform_filter and not emu_filter:
- logger.info("Selecting default platforms per test case")
- default_platforms = True
- elif emu_filter:
- logger.info("Selecting emulation platforms per test case")
- emulation_platforms = True
- if platform_filter:
- platforms = list(filter(lambda p: p.name in platform_filter, self.platforms))
- elif emu_filter:
- platforms = list(filter(lambda p: p.simulation != 'na', self.platforms))
- elif arch_filter:
- platforms = list(filter(lambda p: p.arch in arch_filter, self.platforms))
- elif default_platforms:
- platforms = list(filter(lambda p: p.default, self.platforms))
- else:
- platforms = self.platforms
- logger.info("Building initial testcase list...")
- for tc_name, tc in self.testcases.items():
- if tc.build_on_all and not platform_filter:
- platform_scope = self.platforms
- elif tc.integration_platforms and self.integration:
- platform_scope = list(filter(lambda item: item.name in tc.integration_platforms, \
- self.platforms))
- else:
- platform_scope = platforms
- integration = self.integration and tc.integration_platforms
- # If there isn't any overlap between the platform_allow list and the platform_scope
- # we set the scope to the platform_allow list
- if tc.platform_allow and not platform_filter and not integration:
- a = set(platform_scope)
- b = set(filter(lambda item: item.name in tc.platform_allow, self.platforms))
- c = a.intersection(b)
- if not c:
- platform_scope = list(filter(lambda item: item.name in tc.platform_allow, \
- self.platforms))
- # list of instances per testcase, aka configurations.
- instance_list = []
- for plat in platform_scope:
- instance = TestInstance(tc, plat, self.outdir)
- if runnable:
- tfilter = 'runnable'
- else:
- tfilter = 'buildable'
- instance.run = instance.check_runnable(
- self.enable_slow,
- tfilter,
- self.fixtures
- )
- for t in tc.cases:
- instance.results[t] = None
- if runnable and self.duts:
- for h in self.duts:
- if h.platform == plat.name:
- if tc.harness_config.get('fixture') in h.fixtures:
- instance.run = True
- if not force_platform and plat.name in exclude_platform:
- discards[instance] = discards.get(instance, "Platform is excluded on command line.")
- if (plat.arch == "unit") != (tc.type == "unit"):
- # Discard silently
- continue
- if runnable and not instance.run:
- discards[instance] = discards.get(instance, "Not runnable on device")
- if self.integration and tc.integration_platforms and plat.name not in tc.integration_platforms:
- discards[instance] = discards.get(instance, "Not part of integration platforms")
- if tc.skip:
- discards[instance] = discards.get(instance, "Skip filter")
- if tag_filter and not tc.tags.intersection(tag_filter):
- discards[instance] = discards.get(instance, "Command line testcase tag filter")
- if exclude_tag and tc.tags.intersection(exclude_tag):
- discards[instance] = discards.get(instance, "Command line testcase exclude filter")
- if testcase_filter and tc_name not in testcase_filter:
- discards[instance] = discards.get(instance, "Testcase name filter")
- if arch_filter and plat.arch not in arch_filter:
- discards[instance] = discards.get(instance, "Command line testcase arch filter")
- if not force_platform:
- if tc.arch_allow and plat.arch not in tc.arch_allow:
- discards[instance] = discards.get(instance, "Not in test case arch allow list")
- if tc.arch_exclude and plat.arch in tc.arch_exclude:
- discards[instance] = discards.get(instance, "In test case arch exclude")
- if tc.platform_exclude and plat.name in tc.platform_exclude:
- discards[instance] = discards.get(instance, "In test case platform exclude")
- if tc.toolchain_exclude and toolchain in tc.toolchain_exclude:
- discards[instance] = discards.get(instance, "In test case toolchain exclude")
- if platform_filter and plat.name not in platform_filter:
- discards[instance] = discards.get(instance, "Command line platform filter")
- if tc.platform_allow and plat.name not in tc.platform_allow:
- discards[instance] = discards.get(instance, "Not in testcase platform allow list")
- if tc.toolchain_allow and toolchain not in tc.toolchain_allow:
- discards[instance] = discards.get(instance, "Not in testcase toolchain allow list")
- if not plat.env_satisfied:
- discards[instance] = discards.get(instance, "Environment ({}) not satisfied".format(", ".join(plat.env)))
- if not force_toolchain \
- and toolchain and (toolchain not in plat.supported_toolchains) \
- and "host" not in plat.supported_toolchains \
- and tc.type != 'unit':
- discards[instance] = discards.get(instance, "Not supported by the toolchain")
- if plat.ram < tc.min_ram:
- discards[instance] = discards.get(instance, "Not enough RAM")
- if tc.depends_on:
- dep_intersection = tc.depends_on.intersection(set(plat.supported))
- if dep_intersection != set(tc.depends_on):
- discards[instance] = discards.get(instance, "No hardware support")
- if plat.flash < tc.min_flash:
- discards[instance] = discards.get(instance, "Not enough FLASH")
- if set(plat.ignore_tags) & tc.tags:
- discards[instance] = discards.get(instance, "Excluded tags per platform (exclude_tags)")
- if plat.only_tags and not set(plat.only_tags) & tc.tags:
- discards[instance] = discards.get(instance, "Excluded tags per platform (only_tags)")
- test_configuration = ".".join([instance.platform.name,
- instance.testcase.id])
- # skip quarantined tests
- if test_configuration in self.quarantine and not self.quarantine_verify:
- discards[instance] = discards.get(instance,
- f"Quarantine: {self.quarantine[test_configuration]}")
- # run only quarantined test to verify their statuses (skip everything else)
- if self.quarantine_verify and test_configuration not in self.quarantine:
- discards[instance] = discards.get(instance, "Not under quarantine")
- # if nothing stopped us until now, it means this configuration
- # needs to be added.
- instance_list.append(instance)
- # no configurations, so jump to next testcase
- if not instance_list:
- continue
- # if twister was launched with no platform options at all, we
- # take all default platforms
- if default_platforms and not tc.build_on_all and not integration:
- if tc.platform_allow:
- a = set(self.default_platforms)
- b = set(tc.platform_allow)
- c = a.intersection(b)
- if c:
- aa = list(filter(lambda tc: tc.platform.name in c, instance_list))
- self.add_instances(aa)
- else:
- self.add_instances(instance_list)
- else:
- instances = list(filter(lambda tc: tc.platform.default, instance_list))
- self.add_instances(instances)
- elif integration:
- instances = list(filter(lambda item: item.platform.name in tc.integration_platforms, instance_list))
- self.add_instances(instances)
- elif emulation_platforms:
- self.add_instances(instance_list)
- for instance in list(filter(lambda inst: not inst.platform.simulation != 'na', instance_list)):
- discards[instance] = discards.get(instance, "Not an emulated platform")
- else:
- self.add_instances(instance_list)
- for _, case in self.instances.items():
- case.create_overlay(case.platform, self.enable_asan, self.enable_ubsan, self.enable_coverage, self.coverage_platform)
- self.discards = discards
- self.selected_platforms = set(p.platform.name for p in self.instances.values())
- for instance in self.discards:
- instance.reason = self.discards[instance]
- # If integration mode is on all skips on integration_platforms are treated as errors.
- if self.integration and instance.platform.name in instance.testcase.integration_platforms \
- and "Quarantine" not in instance.reason:
- instance.status = "error"
- instance.reason += " but is one of the integration platforms"
- instance.fill_results_by_status()
- self.instances[instance.name] = instance
- else:
- instance.status = "skipped"
- instance.fill_results_by_status()
- self.filtered_platforms = set(p.platform.name for p in self.instances.values()
- if p.status != "skipped" )
- return discards
- def add_instances(self, instance_list):
- for instance in instance_list:
- self.instances[instance.name] = instance
- @staticmethod
- def calc_one_elf_size(instance):
- if instance.status not in ["error", "failed", "skipped"]:
- if instance.platform.type != "native":
- size_calc = instance.calculate_sizes()
- instance.metrics["ram_size"] = size_calc.get_ram_size()
- instance.metrics["rom_size"] = size_calc.get_rom_size()
- instance.metrics["unrecognized"] = size_calc.unrecognized_sections()
- else:
- instance.metrics["ram_size"] = 0
- instance.metrics["rom_size"] = 0
- instance.metrics["unrecognized"] = []
- instance.metrics["handler_time"] = instance.handler.duration if instance.handler else 0
- def add_tasks_to_queue(self, pipeline, build_only=False, test_only=False):
- for instance in self.instances.values():
- if build_only:
- instance.run = False
- if instance.status not in ['passed', 'skipped', 'error']:
- logger.debug(f"adding {instance.name}")
- instance.status = None
- if test_only and instance.run:
- pipeline.put({"op": "run", "test": instance})
- else:
- pipeline.put({"op": "cmake", "test": instance})
- # If the instance got 'error' status before, proceed to the report stage
- if instance.status == "error":
- pipeline.put({"op": "report", "test": instance})
- def pipeline_mgr(self, pipeline, done_queue, lock, results):
- while True:
- try:
- task = pipeline.get_nowait()
- except queue.Empty:
- break
- else:
- test = task['test']
- pb = ProjectBuilder(self,
- test,
- lsan=self.enable_lsan,
- asan=self.enable_asan,
- ubsan=self.enable_ubsan,
- coverage=self.enable_coverage,
- extra_args=self.extra_args,
- device_testing=self.device_testing,
- cmake_only=self.cmake_only,
- cleanup=self.cleanup,
- valgrind=self.enable_valgrind,
- inline_logs=self.inline_logs,
- generator=self.generator,
- generator_cmd=self.generator_cmd,
- verbose=self.verbose,
- warnings_as_errors=self.warnings_as_errors,
- overflow_as_errors=self.overflow_as_errors
- )
- pb.process(pipeline, done_queue, task, lock, results)
- return True
- def execute(self, pipeline, done, results):
- lock = Lock()
- logger.info("Adding tasks to the queue...")
- self.add_tasks_to_queue(pipeline, self.build_only, self.test_only)
- logger.info("Added initial list of jobs to queue")
- processes = []
- for job in range(self.jobs):
- logger.debug(f"Launch process {job}")
- p = Process(target=self.pipeline_mgr, args=(pipeline, done, lock, results, ))
- processes.append(p)
- p.start()
- try:
- for p in processes:
- p.join()
- except KeyboardInterrupt:
- logger.info("Execution interrupted")
- for p in processes:
- p.terminate()
- # FIXME: This needs to move out.
- if self.enable_size_report and not self.cmake_only:
- # Parallelize size calculation
- executor = concurrent.futures.ThreadPoolExecutor(self.jobs)
- futures = [executor.submit(self.calc_one_elf_size, instance)
- for instance in self.instances.values()]
- concurrent.futures.wait(futures)
- else:
- for instance in self.instances.values():
- instance.metrics["ram_size"] = 0
- instance.metrics["rom_size"] = 0
- instance.metrics["handler_time"] = instance.handler.duration if instance.handler else 0
- instance.metrics["unrecognized"] = []
- return results
- def discard_report(self, filename):
- try:
- if not self.discards:
- raise TwisterRuntimeError("apply_filters() hasn't been run!")
- except Exception as e:
- logger.error(str(e))
- sys.exit(2)
- with open(filename, "wt") as csvfile:
- fieldnames = ["test", "arch", "platform", "reason"]
- cw = csv.DictWriter(csvfile, fieldnames, lineterminator=os.linesep)
- cw.writeheader()
- for instance, reason in sorted(self.discards.items()):
- rowdict = {"test": instance.testcase.name,
- "arch": instance.platform.arch,
- "platform": instance.platform.name,
- "reason": reason}
- cw.writerow(rowdict)
- def target_report(self, outdir, suffix, append=False):
- platforms = {inst.platform.name for _, inst in self.instances.items()}
- for platform in platforms:
- if suffix:
- filename = os.path.join(outdir,"{}_{}.xml".format(platform, suffix))
- else:
- filename = os.path.join(outdir,"{}.xml".format(platform))
- self.xunit_report(filename, platform, full_report=True,
- append=append, version=self.version)
- @staticmethod
- def process_log(log_file):
- filtered_string = ""
- if os.path.exists(log_file):
- with open(log_file, "rb") as f:
- log = f.read().decode("utf-8")
- filtered_string = ''.join(filter(lambda x: x in string.printable, log))
- return filtered_string
- def xunit_report(self, filename, platform=None, full_report=False, append=False, version="NA"):
- total = 0
- fails = passes = errors = skips = 0
- if platform:
- selected = [platform]
- logger.info(f"Writing target report for {platform}...")
- else:
- logger.info(f"Writing xunit report {filename}...")
- selected = self.selected_platforms
- if os.path.exists(filename) and append:
- tree = ET.parse(filename)
- eleTestsuites = tree.getroot()
- else:
- eleTestsuites = ET.Element('testsuites')
- for p in selected:
- inst = self.get_platform_instances(p)
- fails = 0
- passes = 0
- errors = 0
- skips = 0
- duration = 0
- for _, instance in inst.items():
- handler_time = instance.metrics.get('handler_time', 0)
- duration += handler_time
- if full_report and instance.run:
- for k in instance.results.keys():
- if instance.results[k] == 'PASS':
- passes += 1
- elif instance.results[k] == 'BLOCK':
- errors += 1
- elif instance.results[k] == 'SKIP' or instance.status in ['skipped']:
- skips += 1
- else:
- fails += 1
- else:
- if instance.status in ["error", "failed", "timeout", "flash_error"]:
- if instance.reason in ['build_error', 'handler_crash']:
- errors += 1
- else:
- fails += 1
- elif instance.status == 'skipped':
- skips += 1
- elif instance.status == 'passed':
- passes += 1
- else:
- if instance.status:
- logger.error(f"{instance.name}: Unknown status {instance.status}")
- else:
- logger.error(f"{instance.name}: No status")
- total = (errors + passes + fails + skips)
- # do not produce a report if no tests were actually run (only built)
- if total == 0:
- continue
- run = p
- eleTestsuite = None
- # When we re-run the tests, we re-use the results and update only with
- # the newly run tests.
- if os.path.exists(filename) and append:
- ts = eleTestsuites.findall(f'testsuite/[@name="{p}"]')
- if ts:
- eleTestsuite = ts[0]
- eleTestsuite.attrib['failures'] = "%d" % fails
- eleTestsuite.attrib['errors'] = "%d" % errors
- eleTestsuite.attrib['skipped'] = "%d" % skips
- else:
- logger.info(f"Did not find any existing results for {p}")
- eleTestsuite = ET.SubElement(eleTestsuites, 'testsuite',
- name=run, time="%f" % duration,
- tests="%d" % (total),
- failures="%d" % fails,
- errors="%d" % (errors), skipped="%s" % (skips))
- eleTSPropetries = ET.SubElement(eleTestsuite, 'properties')
- # Multiple 'property' can be added to 'properties'
- # differing by name and value
- ET.SubElement(eleTSPropetries, 'property', name="version", value=version)
- else:
- eleTestsuite = ET.SubElement(eleTestsuites, 'testsuite',
- name=run, time="%f" % duration,
- tests="%d" % (total),
- failures="%d" % fails,
- errors="%d" % (errors), skipped="%s" % (skips))
- eleTSPropetries = ET.SubElement(eleTestsuite, 'properties')
- # Multiple 'property' can be added to 'properties'
- # differing by name and value
- ET.SubElement(eleTSPropetries, 'property', name="version", value=version)
- for _, instance in inst.items():
- if full_report:
- tname = os.path.basename(instance.testcase.name)
- else:
- tname = instance.testcase.id
- handler_time = instance.metrics.get('handler_time', 0)
- if full_report:
- for k in instance.results.keys():
- # remove testcases that are being re-run from exiting reports
- for tc in eleTestsuite.findall(f'testcase/[@name="{k}"]'):
- eleTestsuite.remove(tc)
- classname = ".".join(tname.split(".")[:2])
- eleTestcase = ET.SubElement(
- eleTestsuite, 'testcase',
- classname=classname,
- name="%s" % (k), time="%f" % handler_time)
- if instance.results[k] in ['FAIL', 'BLOCK'] or \
- (not instance.run and instance.status in ["error", "failed", "timeout"]):
- if instance.results[k] == 'FAIL':
- el = ET.SubElement(
- eleTestcase,
- 'failure',
- type="failure",
- message="failed")
- else:
- el = ET.SubElement(
- eleTestcase,
- 'error',
- type="failure",
- message=instance.reason)
- log_root = os.path.join(self.outdir, instance.platform.name, instance.testcase.name)
- log_file = os.path.join(log_root, "handler.log")
- el.text = self.process_log(log_file)
- elif instance.results[k] == 'PASS' \
- or (not instance.run and instance.status in ["passed"]):
- pass
- elif instance.results[k] == 'SKIP' or (instance.status in ["skipped"]):
- el = ET.SubElement(eleTestcase, 'skipped', type="skipped", message=instance.reason)
- else:
- el = ET.SubElement(
- eleTestcase,
- 'error',
- type="error",
- message=f"{instance.reason}")
- else:
- if platform:
- classname = ".".join(instance.testcase.name.split(".")[:2])
- else:
- classname = p + ":" + ".".join(instance.testcase.name.split(".")[:2])
- # remove testcases that are being re-run from exiting reports
- for tc in eleTestsuite.findall(f'testcase/[@classname="{classname}"][@name="{instance.testcase.name}"]'):
- eleTestsuite.remove(tc)
- eleTestcase = ET.SubElement(eleTestsuite, 'testcase',
- classname=classname,
- name="%s" % (instance.testcase.name),
- time="%f" % handler_time)
- if instance.status in ["error", "failed", "timeout", "flash_error"]:
- failure = ET.SubElement(
- eleTestcase,
- 'failure',
- type="failure",
- message=instance.reason)
- log_root = ("%s/%s/%s" % (self.outdir, instance.platform.name, instance.testcase.name))
- bl = os.path.join(log_root, "build.log")
- hl = os.path.join(log_root, "handler.log")
- log_file = bl
- if instance.reason != 'Build error':
- if os.path.exists(hl):
- log_file = hl
- else:
- log_file = bl
- failure.text = self.process_log(log_file)
- elif instance.status == "skipped":
- ET.SubElement(eleTestcase, 'skipped', type="skipped", message="Skipped")
- result = ET.tostring(eleTestsuites)
- with open(filename, 'wb') as report:
- report.write(result)
- return fails, passes, errors, skips
- def csv_report(self, filename):
- with open(filename, "wt") as csvfile:
- fieldnames = ["test", "arch", "platform", "status",
- "extra_args", "handler", "handler_time", "ram_size",
- "rom_size"]
- cw = csv.DictWriter(csvfile, fieldnames, lineterminator=os.linesep)
- cw.writeheader()
- for instance in self.instances.values():
- rowdict = {"test": instance.testcase.name,
- "arch": instance.platform.arch,
- "platform": instance.platform.name,
- "extra_args": " ".join(instance.testcase.extra_args),
- "handler": instance.platform.simulation}
- rowdict["status"] = instance.status
- if instance.status not in ["error", "failed", "timeout"]:
- if instance.handler:
- rowdict["handler_time"] = instance.metrics.get("handler_time", 0)
- ram_size = instance.metrics.get("ram_size", 0)
- rom_size = instance.metrics.get("rom_size", 0)
- rowdict["ram_size"] = ram_size
- rowdict["rom_size"] = rom_size
- cw.writerow(rowdict)
- def json_report(self, filename, append=False, version="NA"):
- logger.info(f"Writing JSON report {filename}")
- report = {}
- selected = self.selected_platforms
- report["environment"] = {"os": os.name,
- "zephyr_version": version,
- "toolchain": self.get_toolchain()
- }
- json_data = {}
- if os.path.exists(filename) and append:
- with open(filename, 'r') as json_file:
- json_data = json.load(json_file)
- suites = json_data.get("testsuites", [])
- if suites:
- suite = suites[0]
- testcases = suite.get("testcases", [])
- else:
- suite = {}
- testcases = []
- for p in selected:
- inst = self.get_platform_instances(p)
- for _, instance in inst.items():
- testcase = {}
- handler_log = os.path.join(instance.build_dir, "handler.log")
- build_log = os.path.join(instance.build_dir, "build.log")
- device_log = os.path.join(instance.build_dir, "device.log")
- handler_time = instance.metrics.get('handler_time', 0)
- ram_size = instance.metrics.get ("ram_size", 0)
- rom_size = instance.metrics.get("rom_size",0)
- for k in instance.results.keys():
- testcases = list(filter(lambda d: not (d.get('testcase') == k and d.get('platform') == p), testcases ))
- testcase = {"testcase": k,
- "arch": instance.platform.arch,
- "platform": p,
- }
- if ram_size:
- testcase["ram_size"] = ram_size
- if rom_size:
- testcase["rom_size"] = rom_size
- if instance.results[k] in ["PASS"] or instance.status == 'passed':
- testcase["status"] = "passed"
- if instance.handler:
- testcase["execution_time"] = handler_time
- elif instance.results[k] in ['FAIL', 'BLOCK'] or instance.status in ["error", "failed", "timeout", "flash_error"]:
- testcase["status"] = "failed"
- testcase["reason"] = instance.reason
- testcase["execution_time"] = handler_time
- if os.path.exists(handler_log):
- testcase["test_output"] = self.process_log(handler_log)
- elif os.path.exists(device_log):
- testcase["device_log"] = self.process_log(device_log)
- else:
- testcase["build_log"] = self.process_log(build_log)
- elif instance.status == 'skipped':
- testcase["status"] = "skipped"
- testcase["reason"] = instance.reason
- testcases.append(testcase)
- suites = [ {"testcases": testcases} ]
- report["testsuites"] = suites
- with open(filename, "wt") as json_file:
- json.dump(report, json_file, indent=4, separators=(',',':'))
- def get_testcase(self, identifier):
- results = []
- for _, tc in self.testcases.items():
- for case in tc.cases:
- if case == identifier:
- results.append(tc)
- return results
- class CoverageTool:
- """ Base class for every supported coverage tool
- """
- def __init__(self):
- self.gcov_tool = None
- self.base_dir = None
- @staticmethod
- def factory(tool):
- if tool == 'lcov':
- t = Lcov()
- elif tool == 'gcovr':
- t = Gcovr()
- else:
- logger.error("Unsupported coverage tool specified: {}".format(tool))
- return None
- logger.debug(f"Select {tool} as the coverage tool...")
- return t
- @staticmethod
- def retrieve_gcov_data(input_file):
- logger.debug("Working on %s" % input_file)
- extracted_coverage_info = {}
- capture_data = False
- capture_complete = False
- with open(input_file, 'r') as fp:
- for line in fp.readlines():
- if re.search("GCOV_COVERAGE_DUMP_START", line):
- capture_data = True
- continue
- if re.search("GCOV_COVERAGE_DUMP_END", line):
- capture_complete = True
- break
- # Loop until the coverage data is found.
- if not capture_data:
- continue
- if line.startswith("*"):
- sp = line.split("<")
- if len(sp) > 1:
- # Remove the leading delimiter "*"
- file_name = sp[0][1:]
- # Remove the trailing new line char
- hex_dump = sp[1][:-1]
- else:
- continue
- else:
- continue
- extracted_coverage_info.update({file_name: hex_dump})
- if not capture_data:
- capture_complete = True
- return {'complete': capture_complete, 'data': extracted_coverage_info}
- @staticmethod
- def create_gcda_files(extracted_coverage_info):
- logger.debug("Generating gcda files")
- for filename, hexdump_val in extracted_coverage_info.items():
- # if kobject_hash is given for coverage gcovr fails
- # hence skipping it problem only in gcovr v4.1
- if "kobject_hash" in filename:
- filename = (filename[:-4]) + "gcno"
- try:
- os.remove(filename)
- except Exception:
- pass
- continue
- with open(filename, 'wb') as fp:
- fp.write(bytes.fromhex(hexdump_val))
- def generate(self, outdir):
- for filename in glob.glob("%s/**/handler.log" % outdir, recursive=True):
- gcov_data = self.__class__.retrieve_gcov_data(filename)
- capture_complete = gcov_data['complete']
- extracted_coverage_info = gcov_data['data']
- if capture_complete:
- self.__class__.create_gcda_files(extracted_coverage_info)
- logger.debug("Gcov data captured: {}".format(filename))
- else:
- logger.error("Gcov data capture incomplete: {}".format(filename))
- with open(os.path.join(outdir, "coverage.log"), "a") as coveragelog:
- ret = self._generate(outdir, coveragelog)
- if ret == 0:
- logger.info("HTML report generated: {}".format(
- os.path.join(outdir, "coverage", "index.html")))
- class Lcov(CoverageTool):
- def __init__(self):
- super().__init__()
- self.ignores = []
- def add_ignore_file(self, pattern):
- self.ignores.append('*' + pattern + '*')
- def add_ignore_directory(self, pattern):
- self.ignores.append('*/' + pattern + '/*')
- def _generate(self, outdir, coveragelog):
- coveragefile = os.path.join(outdir, "coverage.info")
- ztestfile = os.path.join(outdir, "ztest.info")
- cmd = ["lcov", "--gcov-tool", self.gcov_tool,
- "--capture", "--directory", outdir,
- "--rc", "lcov_branch_coverage=1",
- "--output-file", coveragefile]
- cmd_str = " ".join(cmd)
- logger.debug(f"Running {cmd_str}...")
- subprocess.call(cmd, stdout=coveragelog)
- # We want to remove tests/* and tests/ztest/test/* but save tests/ztest
- subprocess.call(["lcov", "--gcov-tool", self.gcov_tool, "--extract",
- coveragefile,
- os.path.join(self.base_dir, "tests", "ztest", "*"),
- "--output-file", ztestfile,
- "--rc", "lcov_branch_coverage=1"], stdout=coveragelog)
- if os.path.exists(ztestfile) and os.path.getsize(ztestfile) > 0:
- subprocess.call(["lcov", "--gcov-tool", self.gcov_tool, "--remove",
- ztestfile,
- os.path.join(self.base_dir, "tests/ztest/test/*"),
- "--output-file", ztestfile,
- "--rc", "lcov_branch_coverage=1"],
- stdout=coveragelog)
- files = [coveragefile, ztestfile]
- else:
- files = [coveragefile]
- for i in self.ignores:
- subprocess.call(
- ["lcov", "--gcov-tool", self.gcov_tool, "--remove",
- coveragefile, i, "--output-file",
- coveragefile, "--rc", "lcov_branch_coverage=1"],
- stdout=coveragelog)
- # The --ignore-errors source option is added to avoid it exiting due to
- # samples/application_development/external_lib/
- return subprocess.call(["genhtml", "--legend", "--branch-coverage",
- "--ignore-errors", "source",
- "-output-directory",
- os.path.join(outdir, "coverage")] + files,
- stdout=coveragelog)
- class Gcovr(CoverageTool):
- def __init__(self):
- super().__init__()
- self.ignores = []
- def add_ignore_file(self, pattern):
- self.ignores.append('.*' + pattern + '.*')
- def add_ignore_directory(self, pattern):
- self.ignores.append(".*/" + pattern + '/.*')
- @staticmethod
- def _interleave_list(prefix, list):
- tuple_list = [(prefix, item) for item in list]
- return [item for sublist in tuple_list for item in sublist]
- def _generate(self, outdir, coveragelog):
- coveragefile = os.path.join(outdir, "coverage.json")
- ztestfile = os.path.join(outdir, "ztest.json")
- excludes = Gcovr._interleave_list("-e", self.ignores)
- # We want to remove tests/* and tests/ztest/test/* but save tests/ztest
- cmd = ["gcovr", "-r", self.base_dir, "--gcov-executable",
- self.gcov_tool, "-e", "tests/*"] + excludes + ["--json", "-o",
- coveragefile, outdir]
- cmd_str = " ".join(cmd)
- logger.debug(f"Running {cmd_str}...")
- subprocess.call(cmd, stdout=coveragelog)
- subprocess.call(["gcovr", "-r", self.base_dir, "--gcov-executable",
- self.gcov_tool, "-f", "tests/ztest", "-e",
- "tests/ztest/test/*", "--json", "-o", ztestfile,
- outdir], stdout=coveragelog)
- if os.path.exists(ztestfile) and os.path.getsize(ztestfile) > 0:
- files = [coveragefile, ztestfile]
- else:
- files = [coveragefile]
- subdir = os.path.join(outdir, "coverage")
- os.makedirs(subdir, exist_ok=True)
- tracefiles = self._interleave_list("--add-tracefile", files)
- return subprocess.call(["gcovr", "-r", self.base_dir, "--html",
- "--html-details"] + tracefiles +
- ["-o", os.path.join(subdir, "index.html")],
- stdout=coveragelog)
- class DUT(object):
- def __init__(self,
- id=None,
- serial=None,
- platform=None,
- product=None,
- serial_pty=None,
- connected=False,
- pre_script=None,
- post_script=None,
- post_flash_script=None,
- runner=None):
- self.serial = serial
- self.platform = platform
- self.serial_pty = serial_pty
- self._counter = Value("i", 0)
- self._available = Value("i", 1)
- self.connected = connected
- self.pre_script = pre_script
- self.id = id
- self.product = product
- self.runner = runner
- self.fixtures = []
- self.post_flash_script = post_flash_script
- self.post_script = post_script
- self.pre_script = pre_script
- self.probe_id = None
- self.notes = None
- self.lock = Lock()
- self.match = False
- @property
- def available(self):
- with self._available.get_lock():
- return self._available.value
- @available.setter
- def available(self, value):
- with self._available.get_lock():
- self._available.value = value
- @property
- def counter(self):
- with self._counter.get_lock():
- return self._counter.value
- @counter.setter
- def counter(self, value):
- with self._counter.get_lock():
- self._counter.value = value
- def to_dict(self):
- d = {}
- exclude = ['_available', '_counter', 'match']
- v = vars(self)
- for k in v.keys():
- if k not in exclude and v[k]:
- d[k] = v[k]
- return d
- def __repr__(self):
- return f"<{self.platform} ({self.product}) on {self.serial}>"
- class HardwareMap:
- schema_path = os.path.join(ZEPHYR_BASE, "scripts", "schemas", "twister", "hwmap-schema.yaml")
- manufacturer = [
- 'ARM',
- 'SEGGER',
- 'MBED',
- 'STMicroelectronics',
- 'Atmel Corp.',
- 'Texas Instruments',
- 'Silicon Labs',
- 'NXP Semiconductors',
- 'Microchip Technology Inc.',
- 'FTDI',
- 'Digilent'
- ]
- runner_mapping = {
- 'pyocd': [
- 'DAPLink CMSIS-DAP',
- 'MBED CMSIS-DAP'
- ],
- 'jlink': [
- 'J-Link',
- 'J-Link OB'
- ],
- 'openocd': [
- 'STM32 STLink', '^XDS110.*', 'STLINK-V3'
- ],
- 'dediprog': [
- 'TTL232R-3V3',
- 'MCP2200 USB Serial Port Emulator'
- ]
- }
- def __init__(self):
- self.detected = []
- self.duts = []
- def add_device(self, serial, platform, pre_script, is_pty):
- device = DUT(platform=platform, connected=True, pre_script=pre_script)
- if is_pty:
- device.serial_pty = serial
- else:
- device.serial = serial
- self.duts.append(device)
- def load(self, map_file):
- hwm_schema = scl.yaml_load(self.schema_path)
- duts = scl.yaml_load_verify(map_file, hwm_schema)
- for dut in duts:
- pre_script = dut.get('pre_script')
- post_script = dut.get('post_script')
- post_flash_script = dut.get('post_flash_script')
- platform = dut.get('platform')
- id = dut.get('id')
- runner = dut.get('runner')
- serial = dut.get('serial')
- product = dut.get('product')
- fixtures = dut.get('fixtures', [])
- new_dut = DUT(platform=platform,
- product=product,
- runner=runner,
- id=id,
- serial=serial,
- connected=serial is not None,
- pre_script=pre_script,
- post_script=post_script,
- post_flash_script=post_flash_script)
- new_dut.fixtures = fixtures
- new_dut.counter = 0
- self.duts.append(new_dut)
- def scan(self, persistent=False):
- from serial.tools import list_ports
- if persistent and platform.system() == 'Linux':
- # On Linux, /dev/serial/by-id provides symlinks to
- # '/dev/ttyACMx' nodes using names which are unique as
- # long as manufacturers fill out USB metadata nicely.
- #
- # This creates a map from '/dev/ttyACMx' device nodes
- # to '/dev/serial/by-id/usb-...' symlinks. The symlinks
- # go into the hardware map because they stay the same
- # even when the user unplugs / replugs the device.
- #
- # Some inexpensive USB/serial adapters don't result
- # in unique names here, though, so use of this feature
- # requires explicitly setting persistent=True.
- by_id = Path('/dev/serial/by-id')
- def readlink(link):
- return str((by_id / link).resolve())
- persistent_map = {readlink(link): str(link)
- for link in by_id.iterdir()}
- else:
- persistent_map = {}
- serial_devices = list_ports.comports()
- logger.info("Scanning connected hardware...")
- for d in serial_devices:
- if d.manufacturer in self.manufacturer:
- # TI XDS110 can have multiple serial devices for a single board
- # assume endpoint 0 is the serial, skip all others
- if d.manufacturer == 'Texas Instruments' and not d.location.endswith('0'):
- continue
- s_dev = DUT(platform="unknown",
- id=d.serial_number,
- serial=persistent_map.get(d.device, d.device),
- product=d.product,
- runner='unknown',
- connected=True)
- for runner, _ in self.runner_mapping.items():
- products = self.runner_mapping.get(runner)
- if d.product in products:
- s_dev.runner = runner
- continue
- # Try regex matching
- for p in products:
- if re.match(p, d.product):
- s_dev.runner = runner
- s_dev.connected = True
- self.detected.append(s_dev)
- else:
- logger.warning("Unsupported device (%s): %s" % (d.manufacturer, d))
- def save(self, hwm_file):
- # use existing map
- self.detected.sort(key=lambda x: x.serial or '')
- if os.path.exists(hwm_file):
- with open(hwm_file, 'r') as yaml_file:
- hwm = yaml.load(yaml_file, Loader=SafeLoader)
- if hwm:
- hwm.sort(key=lambda x: x['serial'] or '')
- # disconnect everything
- for h in hwm:
- h['connected'] = False
- h['serial'] = None
- for _detected in self.detected:
- for h in hwm:
- if _detected.id == h['id'] and _detected.product == h['product'] and not _detected.match:
- h['connected'] = True
- h['serial'] = _detected.serial
- _detected.match = True
- new_duts = list(filter(lambda d: not d.match, self.detected))
- new = []
- for d in new_duts:
- new.append(d.to_dict())
- if hwm:
- hwm = hwm + new
- else:
- hwm = new
- with open(hwm_file, 'w') as yaml_file:
- yaml.dump(hwm, yaml_file, Dumper=Dumper, default_flow_style=False)
- self.load(hwm_file)
- logger.info("Registered devices:")
- self.dump()
- else:
- # create new file
- dl = []
- for _connected in self.detected:
- platform = _connected.platform
- id = _connected.id
- runner = _connected.runner
- serial = _connected.serial
- product = _connected.product
- d = {
- 'platform': platform,
- 'id': id,
- 'runner': runner,
- 'serial': serial,
- 'product': product,
- 'connected': _connected.connected
- }
- dl.append(d)
- with open(hwm_file, 'w') as yaml_file:
- yaml.dump(dl, yaml_file, Dumper=Dumper, default_flow_style=False)
- logger.info("Detected devices:")
- self.dump(detected=True)
- def dump(self, filtered=[], header=[], connected_only=False, detected=False):
- print("")
- table = []
- if detected:
- to_show = self.detected
- else:
- to_show = self.duts
- if not header:
- header = ["Platform", "ID", "Serial device"]
- for p in to_show:
- platform = p.platform
- connected = p.connected
- if filtered and platform not in filtered:
- continue
- if not connected_only or connected:
- table.append([platform, p.id, p.serial])
- print(tabulate(table, headers=header, tablefmt="github"))
|