123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367 |
- #!/usr/bin/env python3
- import struct
- import binascii
- import sys
- import itertools as it
- TAG_TYPES = {
- 'splice': (0x700, 0x400),
- 'create': (0x7ff, 0x401),
- 'delete': (0x7ff, 0x4ff),
- 'name': (0x700, 0x000),
- 'reg': (0x7ff, 0x001),
- 'dir': (0x7ff, 0x002),
- 'superblock': (0x7ff, 0x0ff),
- 'struct': (0x700, 0x200),
- 'dirstruct': (0x7ff, 0x200),
- 'ctzstruct': (0x7ff, 0x202),
- 'inlinestruct': (0x7ff, 0x201),
- 'userattr': (0x700, 0x300),
- 'tail': (0x700, 0x600),
- 'softtail': (0x7ff, 0x600),
- 'hardtail': (0x7ff, 0x601),
- 'gstate': (0x700, 0x700),
- 'movestate': (0x7ff, 0x7ff),
- 'crc': (0x700, 0x500),
- }
- class Tag:
- def __init__(self, *args):
- if len(args) == 1:
- self.tag = args[0]
- elif len(args) == 3:
- if isinstance(args[0], str):
- type = TAG_TYPES[args[0]][1]
- else:
- type = args[0]
- if isinstance(args[1], str):
- id = int(args[1], 0) if args[1] not in 'x.' else 0x3ff
- else:
- id = args[1]
- if isinstance(args[2], str):
- size = int(args[2], str) if args[2] not in 'x.' else 0x3ff
- else:
- size = args[2]
- self.tag = (type << 20) | (id << 10) | size
- else:
- assert False
- @property
- def isvalid(self):
- return not bool(self.tag & 0x80000000)
- @property
- def isattr(self):
- return not bool(self.tag & 0x40000000)
- @property
- def iscompactable(self):
- return bool(self.tag & 0x20000000)
- @property
- def isunique(self):
- return not bool(self.tag & 0x10000000)
- @property
- def type(self):
- return (self.tag & 0x7ff00000) >> 20
- @property
- def type1(self):
- return (self.tag & 0x70000000) >> 20
- @property
- def type3(self):
- return (self.tag & 0x7ff00000) >> 20
- @property
- def id(self):
- return (self.tag & 0x000ffc00) >> 10
- @property
- def size(self):
- return (self.tag & 0x000003ff) >> 0
- @property
- def dsize(self):
- return 4 + (self.size if self.size != 0x3ff else 0)
- @property
- def chunk(self):
- return self.type & 0xff
- @property
- def schunk(self):
- return struct.unpack('b', struct.pack('B', self.chunk))[0]
- def is_(self, type):
- return (self.type & TAG_TYPES[type][0]) == TAG_TYPES[type][1]
- def mkmask(self):
- return Tag(
- 0x700 if self.isunique else 0x7ff,
- 0x3ff if self.isattr else 0,
- 0)
- def chid(self, nid):
- ntag = Tag(self.type, nid, self.size)
- if hasattr(self, 'off'): ntag.off = self.off
- if hasattr(self, 'data'): ntag.data = self.data
- if hasattr(self, 'crc'): ntag.crc = self.crc
- return ntag
- def typerepr(self):
- if self.is_('crc') and getattr(self, 'crc', 0xffffffff) != 0xffffffff:
- return 'crc (bad)'
- reverse_types = {v: k for k, v in TAG_TYPES.items()}
- for prefix in range(12):
- mask = 0x7ff & ~((1 << prefix)-1)
- if (mask, self.type & mask) in reverse_types:
- type = reverse_types[mask, self.type & mask]
- if prefix > 0:
- return '%s %#0*x' % (
- type, prefix//4, self.type & ((1 << prefix)-1))
- else:
- return type
- else:
- return '%02x' % self.type
- def idrepr(self):
- return repr(self.id) if self.id != 0x3ff else '.'
- def sizerepr(self):
- return repr(self.size) if self.size != 0x3ff else 'x'
- def __repr__(self):
- return 'Tag(%r, %d, %d)' % (self.typerepr(), self.id, self.size)
- def __lt__(self, other):
- return (self.id, self.type) < (other.id, other.type)
- def __bool__(self):
- return self.isvalid
- def __int__(self):
- return self.tag
- def __index__(self):
- return self.tag
- class MetadataPair:
- def __init__(self, blocks):
- if len(blocks) > 1:
- self.pair = [MetadataPair([block]) for block in blocks]
- self.pair = sorted(self.pair, reverse=True)
- self.data = self.pair[0].data
- self.rev = self.pair[0].rev
- self.tags = self.pair[0].tags
- self.ids = self.pair[0].ids
- self.log = self.pair[0].log
- self.all_ = self.pair[0].all_
- return
- self.pair = [self]
- self.data = blocks[0]
- block = self.data
- self.rev, = struct.unpack('<I', block[0:4])
- crc = binascii.crc32(block[0:4])
- # parse tags
- corrupt = False
- tag = Tag(0xffffffff)
- off = 4
- self.log = []
- self.all_ = []
- while len(block) - off >= 4:
- ntag, = struct.unpack('>I', block[off:off+4])
- tag = Tag(int(tag) ^ ntag)
- tag.off = off + 4
- tag.data = block[off+4:off+tag.dsize]
- if tag.is_('crc'):
- crc = binascii.crc32(block[off:off+4+4], crc)
- else:
- crc = binascii.crc32(block[off:off+tag.dsize], crc)
- tag.crc = crc
- off += tag.dsize
- self.all_.append(tag)
- if tag.is_('crc'):
- # is valid commit?
- if crc != 0xffffffff:
- corrupt = True
- if not corrupt:
- self.log = self.all_.copy()
- # reset tag parsing
- crc = 0
- tag = Tag(int(tag) ^ ((tag.type & 1) << 31))
- # find active ids
- self.ids = list(it.takewhile(
- lambda id: Tag('name', id, 0) in self,
- it.count()))
- # find most recent tags
- self.tags = []
- for tag in self.log:
- if tag.is_('crc') or tag.is_('splice'):
- continue
- elif tag.id == 0x3ff:
- if tag in self and self[tag] is tag:
- self.tags.append(tag)
- else:
- # id could have change, I know this is messy and slow
- # but it works
- for id in self.ids:
- ntag = tag.chid(id)
- if ntag in self and self[ntag] is tag:
- self.tags.append(ntag)
- self.tags = sorted(self.tags)
- def __bool__(self):
- return bool(self.log)
- def __lt__(self, other):
- # corrupt blocks don't count
- if not self or not other:
- return bool(other)
- # use sequence arithmetic to avoid overflow
- return not ((other.rev - self.rev) & 0x80000000)
- def __contains__(self, args):
- try:
- self[args]
- return True
- except KeyError:
- return False
- def __getitem__(self, args):
- if isinstance(args, tuple):
- gmask, gtag = args
- else:
- gmask, gtag = args.mkmask(), args
- gdiff = 0
- for tag in reversed(self.log):
- if (gmask.id != 0 and tag.is_('splice') and
- tag.id <= gtag.id - gdiff):
- if tag.is_('create') and tag.id == gtag.id - gdiff:
- # creation point
- break
- gdiff += tag.schunk
- if ((int(gmask) & int(tag)) ==
- (int(gmask) & int(gtag.chid(gtag.id - gdiff)))):
- if tag.size == 0x3ff:
- # deleted
- break
- return tag
- raise KeyError(gmask, gtag)
- def _dump_tags(self, tags, f=sys.stdout, truncate=True):
- f.write("%-8s %-8s %-13s %4s %4s" % (
- 'off', 'tag', 'type', 'id', 'len'))
- if truncate:
- f.write(' data (truncated)')
- f.write('\n')
- for tag in tags:
- f.write("%08x: %08x %-13s %4s %4s" % (
- tag.off, tag,
- tag.typerepr(), tag.idrepr(), tag.sizerepr()))
- if truncate:
- f.write(" %-23s %-8s\n" % (
- ' '.join('%02x' % c for c in tag.data[:8]),
- ''.join(c if c >= ' ' and c <= '~' else '.'
- for c in map(chr, tag.data[:8]))))
- else:
- f.write("\n")
- for i in range(0, len(tag.data), 16):
- f.write(" %08x: %-47s %-16s\n" % (
- tag.off+i,
- ' '.join('%02x' % c for c in tag.data[i:i+16]),
- ''.join(c if c >= ' ' and c <= '~' else '.'
- for c in map(chr, tag.data[i:i+16]))))
- def dump_tags(self, f=sys.stdout, truncate=True):
- self._dump_tags(self.tags, f=f, truncate=truncate)
- def dump_log(self, f=sys.stdout, truncate=True):
- self._dump_tags(self.log, f=f, truncate=truncate)
- def dump_all(self, f=sys.stdout, truncate=True):
- self._dump_tags(self.all_, f=f, truncate=truncate)
- def main(args):
- blocks = []
- with open(args.disk, 'rb') as f:
- for block in [args.block1, args.block2]:
- if block is None:
- continue
- f.seek(block * args.block_size)
- blocks.append(f.read(args.block_size)
- .ljust(args.block_size, b'\xff'))
- # find most recent pair
- mdir = MetadataPair(blocks)
- try:
- mdir.tail = mdir[Tag('tail', 0, 0)]
- if mdir.tail.size != 8 or mdir.tail.data == 8*b'\xff':
- mdir.tail = None
- except KeyError:
- mdir.tail = None
- print("mdir {%s} rev %d%s%s%s" % (
- ', '.join('%#x' % b
- for b in [args.block1, args.block2]
- if b is not None),
- mdir.rev,
- ' (was %s)' % ', '.join('%d' % m.rev for m in mdir.pair[1:])
- if len(mdir.pair) > 1 else '',
- ' (corrupted!)' if not mdir else '',
- ' -> {%#x, %#x}' % struct.unpack('<II', mdir.tail.data)
- if mdir.tail else ''))
- if args.all:
- mdir.dump_all(truncate=not args.no_truncate)
- elif args.log:
- mdir.dump_log(truncate=not args.no_truncate)
- else:
- mdir.dump_tags(truncate=not args.no_truncate)
- return 0 if mdir else 1
- if __name__ == "__main__":
- import argparse
- import sys
- parser = argparse.ArgumentParser(
- description="Dump useful info about metadata pairs in littlefs.")
- parser.add_argument('disk',
- help="File representing the block device.")
- parser.add_argument('block_size', type=lambda x: int(x, 0),
- help="Size of a block in bytes.")
- parser.add_argument('block1', type=lambda x: int(x, 0),
- help="First block address for finding the metadata pair.")
- parser.add_argument('block2', nargs='?', type=lambda x: int(x, 0),
- help="Second block address for finding the metadata pair.")
- parser.add_argument('-l', '--log', action='store_true',
- help="Show tags in log.")
- parser.add_argument('-a', '--all', action='store_true',
- help="Show all tags in log, included tags in corrupted commits.")
- parser.add_argument('-T', '--no-truncate', action='store_true',
- help="Don't truncate large amounts of data.")
- sys.exit(main(parser.parse_args()))
|