Expand source code Browse git
from __future__ import annotations
import itertools
import re
import string
from refinery.lib.argformats import ParserError, PythonExpression, numseq
from refinery.lib.meta import SizeInt, check_variable_name, metavars, STRING_FORMAT_HELP
from refinery.lib.structures import StreamDetour, StructReader
from refinery.lib.types import Param
from refinery.units import Arg, Chunk, Unit
def identity(x):
    """Return the argument unchanged."""
    return x
# Name of the pseudo-field that output templates use to refer to the last extracted byte string
# field (or to the whole parsed structure when there is none). It is also the default output
# template, and it is rejected as the name of an extracted field.
_REST_MARKER = '#'
class struct(Unit):
    """
    Parse structured binary data into meta variables using a parsing language based on the Python
    struct format.

    This unit uses two separate semantics based on format strings: One for parsing the input, and
    another one for parsing the output.

    (1) The input parsing format works as follows: A struct definition can include bare Python
    struct parser letters like L for long integer or B for bytes, but also the following additional
    format characters:

    - `a` for null-terminated ASCII strings,
    - `u` to read encoded, null-terminated UTF16 strings,
    - `w` to read decoded, null-terminated UTF16 strings,
    - `g` to read Microsoft GUID values,
    - `E` to read 7-bit encoded integers.

    For example, the string `LLxxHaa` will read two unsigned 32bit integers, then skip two bytes,
    then read one unsigned 16bit integer, then two null-terminated ASCII strings. The unit defaults
    to using native byte order with no alignment.

    To extract fields from the struct definition under a name, format specifications are inserted
    into the struct definitions that look like this:

        {name[!alignment]:format}

    The `alignment` parameter is optional. It must be an expression that evaluates to an integer
    value. If it is specified, the current data pointer is aligned to a multiple of this value
    before reading the field. The `format` can either be an integer expression specifying a number
    of bytes to read, or any of the aforementioned format strings. The extracted data is then
    stored in the meta variable with the given name. For example, `LLxxH{foo:a}{bar:a}` would be
    parsed in the same way as the previous example, but the two ASCII strings would also be stored
    in meta variables under the names `foo` and `bar`, respectively. The `format` string of a named
    field is itself parsed as a format string expression, where all the previously parsed fields
    are already available. For example, `I{:{}}` reads a single 32-bit integer length prefix and
    then reads as many bytes as that prefix specifies.

    (2) Conversely, the standard refinery string formatting is used to specify the output. %s

    For example, the struct definition `LLxxH{foo:a}{bar:a}` with the output format `{foo}/{bar}`
    would parse data as before, but the output body would be the concatenation of the field `foo`,
    a forward slash, and the field `bar`. Variables used in the output expression are not included
    as meta variables. As format fields in the output expression, one can also use `{1}`, `{2}` or
    `{-1}` to access extracted fields by index. The value `{0}` represents the entire chunk of
    structured data. By default, the output format `{%s}` is used, which represents either the last
    byte string field that was extracted, or the entire chunk of structured data if none of the
    fields were extracted.
    """
    # NOTE: The docstring above is a template. Module-level code fills its two placeholders after
    # the class is defined, so it must contain exactly two of them and no other percent signs.

    def __init__(
        self,
        spec: Param[str, Arg.String(help='Structure format as explained above.')],
        *outputs: Param[str, Arg.String(metavar='output', help='Output format as explained above.')],
        multi: Param[bool, Arg.Switch('-m', help=(
            'Read as many pieces of structured data as possible instead of just one.'))] = False,
        count: Param[int, Arg.Number('-c', help=(
            'A limit on the number of chunks to read in multi mode; there is no limit by default.'))] = 0,
        until: Param[str, Arg.String('-u', metavar='E', help=(
            'An expression evaluated on each chunk in multi mode. New chunks will be parsed '
            'only if the result is nonzero.'))] = '',
        format: Param[str, Arg.String('-f', metavar='F', help=(
            'Optionally specify a format string expression to auto-name extracted fields without a '
            'given name. The format string accepts the field {{c}} for the type code and {{n}} for '
            'the variable index.'))] = '',
        name: Param[str, Arg.String('-n', metavar='VAR', group='FIELDS', help=(
            'Equivalent to --format=VAR{{n}}.'))] = '',
        more: Param[bool, Arg.Switch('-M', help=(
            'After parsing the struct, emit one chunk that contains the data that was left '
            'over in the buffer. If no data was left over, this chunk will be empty.'))] = False
    ):
        # The --name shortcut only applies when no explicit --format was given.
        if name:
            format = format or F'{name}{{n}}'
        # Without any output template, emit the "rest" pseudo-field.
        outputs = outputs or (F'{{{_REST_MARKER}}}',)
        super().__init__(spec=spec, outputs=outputs, until=until, format=format, count=count, multi=multi, more=more)

    def process(self, data: Chunk):
        """
        Parse `data` according to the struct specification and yield one chunk per output template
        for every structure that was read. In multi mode, reading repeats until the data is
        exhausted, the count limit is reached, the `until` expression triggers, or the reader
        raises an `EOFError`. Data that was not consumed is either emitted as one additional chunk
        (when `more` is set) or reported in an info-level log message.
        """
        formatter = string.Formatter()
        field_format: str = self.args.format
        until = self.args.until
        until = until and PythonExpression(until, all_variables_allowed=True)
        reader = StructReader(memoryview(data))
        checkpoint = 0
        mainspec = self.args.spec
        byteorder = mainspec[:1]
        count = self.args.count

        # A leading byte order character applies to the whole specification; it is re-applied to
        # every fragment below because the specification is parsed piece by piece.
        if byteorder in '<@=!>':
            mainspec = mainspec[1:]
        else:
            byteorder = '='

        def fixorder(spec):
            # Prepend the global byte order unless the fragment specifies its own.
            if spec[0] not in '<@=!>':
                spec = byteorder + spec
            return spec

        previously_existing_variables = set(metavars(data).variable_names())
        it = itertools.count() if self.args.multi else (0,)

        for index in it:
            field_counter = 0
            # Everything from the checkpoint onwards counts as not (yet) successfully parsed.
            checkpoint = reader.tell()
            if reader.eof:
                break
            if 0 < count <= index:
                break
            meta = metavars(data)
            meta.index = index
            args = []
            last = None
            self.log_debug(F'starting new read at: 0x{checkpoint:08X}')
            try:
                for prefix, name, spec, conversion in formatter.parse(mainspec):
                    if prefix:
                        # Literal text between format fields is a plain struct format string.
                        fields = reader.read_struct(fixorder(prefix))
                        if field_format:
                            codes = re.findall('[?cbBhHiIlLqQnNefdspPauwgk]', prefix)
                            if len(codes) != len(fields):
                                # The type codes cannot be matched to the values one by one
                                # (presumably because of repeat counts); use a generic code.
                                codes = 'v' * len(fields)
                            for code, field in zip(codes, fields):
                                code = 'b' if code == '?' else code.lower()
                                v = field_format.format_map({'c': code, 'n': field_counter})
                                meta[v] = field
                                field_counter += 1
                        args.extend(fields)
                    if name is None:
                        # Trailing literal text without a format field.
                        continue
                    if spec is None:
                        spec = ''
                    assert isinstance(spec, str)
                    assert isinstance(name, str)
                    field_counter += 1
                    if name and not name.isdecimal():
                        check_variable_name(name)

                    # Reset for every field: previously, a nonzero alignment left this variable
                    # unbound or carrying the value from an earlier field.
                    peek = False
                    if conversion:
                        alignment = PythonExpression.Evaluate(conversion, meta)
                        if alignment == 0:
                            # An alignment of zero requests a peeking read of this field.
                            peek = True
                        else:
                            before = reader.tell()
                            reader.byte_align(alignment)
                            after = reader.tell()
                            if before != after:
                                self.log_info(F'aligned from 0x{before:X} to 0x{after:X}')

                    spec, _, pipeline = spec.partition(':')
                    if spec:
                        spec = meta.format_str(spec, self.codec, args)
                    if spec:
                        # A specification that evaluates as an expression is used as its value
                        # (an integer is a byte count); otherwise it remains a struct format.
                        try:
                            _exp = PythonExpression.Evaluate(spec, meta)
                        except ParserError:
                            pass
                        else:
                            spec = _exp

                    if spec == '':
                        last = value = reader.read(peek=peek)
                    elif isinstance(spec, int):
                        if spec < 0:
                            # Negative sizes are relative to the end of the remaining data.
                            spec += reader.remaining_bytes
                            if spec < 0:
                                raise ValueError(F'The specified negative read offset is {-spec} beyond the cursor.')
                        last = value = reader.read_bytes(spec, peek=peek)
                    else:
                        value = reader.read_struct(fixorder(spec), peek=peek)
                        if not value:
                            self.log_debug(F'field {name} was empty, ignoring.')
                            continue
                        if len(value) > 1:
                            self.log_info(F'parsing field {name} produced {len(value)} items reading a tuple')
                        else:
                            value = value[0]

                    if pipeline:
                        value = numseq(pipeline, reverse=True, seed=value)
                    args.append(value)

                    if name == _REST_MARKER:
                        raise ValueError(F'Extracting a field with name {_REST_MARKER} is forbidden.')
                    elif name.isdecimal():
                        # Use a dedicated name here: reusing `index` would clobber the loop
                        # variable that is passed to set_next_batch below.
                        position = int(name)
                        limit = len(args) - 1
                        if position > limit:
                            self.log_warn(F'cannot assign index field {name}, the highest index is {limit}')
                        else:
                            args[position] = value
                        continue
                    elif name:
                        meta[name] = value

                # NOTE(review): the help text of --until says that parsing continues only while
                # the expression is nonzero, but this check (and its log message) stops as soon
                # as the expression is true. Confirm which of the two is intended.
                if until and until(meta):
                    self.log_info(F'the expression ({until}) evaluated to true; aborting.')
                    break

                # Re-read everything that was consumed for this structure as one buffer.
                with StreamDetour(reader, checkpoint) as detour:
                    full = reader.read(detour.cursor - checkpoint)
                if last is None:
                    last = full

                outputs = []
                symbols = dict(meta)
                symbols[_REST_MARKER] = last
                for template in self.args.outputs:
                    used = set()
                    outputs.append(meta.format(template, self.codec, [full, *args], symbols, used=used))
                    # Variables consumed by an output template are not forwarded as meta
                    # variables unless they already existed on the input chunk.
                    for key in used:
                        if key in previously_existing_variables:
                            continue
                        meta.discard(key)

                for output in outputs:
                    chunk = Chunk(output)
                    chunk.meta.update(meta)
                    chunk.set_next_batch(index)
                    yield chunk

            except EOFError:
                break
            else:
                # The structure was parsed and emitted completely. Advance the checkpoint so that
                # the leftover computation below does not count it as unparsed data when the loop
                # ends without another iteration, which is always the case in single mode.
                checkpoint = reader.tell()

        leftover = len(reader) - checkpoint
        if not leftover:
            return
        elif self.args.more:
            reader.seekset(checkpoint)
            yield reader.read()
        else:
            leftover = repr(SizeInt(leftover)).strip()
            self.log_info(F'discarding {leftover} left in buffer')
# The class docstring is a template with two placeholders: the shared description of the refinery
# output format syntax and the name of the "rest" pseudo-field. Nothing is done when the
# docstring is missing or empty.
__d = struct.__doc__
if __d:
    struct.__doc__ = __d % (STRING_FORMAT_HELP, _REST_MARKER)
def identity(x)-
Expand source code Browse git
def identity(x):
    """Return the argument unchanged."""
    return x
class struct (spec, *outputs, multi=False, count=0, until='', format='', name='', more=False)-
Parse structured binary data into meta variables using a parsing language based on the Python struct format.
This unit uses two separate semantics based on format strings: One for parsing the input, and another one for parsing the output.
(1) The input parsing format works as follows: A struct definition can include bare Python struct parser letters like L for long integer or B for bytes, but also the following additional format characters:
- `a` for null-terminated ASCII strings,
- `u` to read encoded, null-terminated UTF16 strings,
- `w` to read decoded, null-terminated UTF16 strings,
- `g` to read Microsoft GUID values,
- `E` to read 7-bit encoded integers.
For example, the string `LLxxHaa` will read two unsigned 32bit integers, then skip two bytes, then read one unsigned 16bit integer, then two null-terminated ASCII strings. The unit defaults to using native byte order with no alignment.
To extract fields from the struct definition under a name, format specifications are inserted into the struct definitions that look like this:
{name[!alignment]:format}
The `alignment` parameter is optional. It must be an expression that evaluates to an integer value. If it is specified, the current data pointer is aligned to a multiple of this value before reading the field. The `format` can either be an integer expression specifying a number of bytes to read, or any of the aforementioned format strings. The extracted data is then stored in the meta variable with the given name. For example, `LLxxH{foo:a}{bar:a}` would be parsed in the same way as the previous example, but the two ASCII strings would also be stored in meta variables under the names `foo` and `bar`, respectively. The `format` string of a named field is itself parsed as a format string expression, where all the previously parsed fields are already available. For example, `I{:{}}` reads a single 32-bit integer length prefix and then reads as many bytes as that prefix specifies.
(2) Conversely, the standard refinery string formatting is used to specify the output. The format definitions use the following syntax:
{field[!modifier]:handlers}
The `field` can specify an extracted meta variable, or the positional index of an extracted value. The optional multibin suffix `handlers` is used to post-process the value of this field. For example, `{2:hex:zl:b64}` means: Take the second match group, hex-decode it, decompress it using zl, and finally decode it using base64. The optional modifier can be one of these:
- `!r`: Computes the Python `repr()` of the field before processing it.
- `!s`: Field is a UTF-8 string literal, not a variable.
- `!a`: Field is a latin1 string literal.
- `!u`: Field is a UTF-16LE string literal.
- `!h`: Field is a hex-encoded literal (shortcut for `!s:h`).
- `!q`: Field is a URL-encoded literal (shortcut for `!s:q`).
- `!n`: Field is an escape-sequence literal (shortcut for `!s:n`).
- `!z`: Field evaluates to integer N; returns N zero bytes.
For example, the struct definition `LLxxH{foo:a}{bar:a}` with the output format `{foo}/{bar}` would parse data as before, but the output body would be the concatenation of the field `foo`, a forward slash, and the field `bar`. Variables used in the output expression are not included as meta variables. As format fields in the output expression, one can also use `{1}`, `{2}` or `{-1}` to access extracted fields by index. The value `{0}` represents the entire chunk of structured data. By default, the output format `{#}` is used, which represents either the last byte string field that was extracted, or the entire chunk of structured data if none of the fields were extracted.
Expand source code Browse git
class struct(Unit):
    """
    Parse structured binary data into meta variables using a parsing language based on the Python
    struct format.

    This unit uses two separate semantics based on format strings: One for parsing the input, and
    another one for parsing the output.

    (1) The input parsing format works as follows: A struct definition can include bare Python
    struct parser letters like L for long integer or B for bytes, but also the following additional
    format characters:

    - `a` for null-terminated ASCII strings,
    - `u` to read encoded, null-terminated UTF16 strings,
    - `w` to read decoded, null-terminated UTF16 strings,
    - `g` to read Microsoft GUID values,
    - `E` to read 7-bit encoded integers.

    For example, the string `LLxxHaa` will read two unsigned 32bit integers, then skip two bytes,
    then read one unsigned 16bit integer, then two null-terminated ASCII strings. The unit defaults
    to using native byte order with no alignment.

    To extract fields from the struct definition under a name, format specifications are inserted
    into the struct definitions that look like this:

        {name[!alignment]:format}

    The `alignment` parameter is optional. It must be an expression that evaluates to an integer
    value. If it is specified, the current data pointer is aligned to a multiple of this value
    before reading the field. The `format` can either be an integer expression specifying a number
    of bytes to read, or any of the aforementioned format strings. The extracted data is then
    stored in the meta variable with the given name. For example, `LLxxH{foo:a}{bar:a}` would be
    parsed in the same way as the previous example, but the two ASCII strings would also be stored
    in meta variables under the names `foo` and `bar`, respectively. The `format` string of a named
    field is itself parsed as a format string expression, where all the previously parsed fields
    are already available. For example, `I{:{}}` reads a single 32-bit integer length prefix and
    then reads as many bytes as that prefix specifies.

    (2) Conversely, the standard refinery string formatting is used to specify the output. %s

    For example, the struct definition `LLxxH{foo:a}{bar:a}` with the output format `{foo}/{bar}`
    would parse data as before, but the output body would be the concatenation of the field `foo`,
    a forward slash, and the field `bar`. Variables used in the output expression are not included
    as meta variables. As format fields in the output expression, one can also use `{1}`, `{2}` or
    `{-1}` to access extracted fields by index. The value `{0}` represents the entire chunk of
    structured data. By default, the output format `{%s}` is used, which represents either the last
    byte string field that was extracted, or the entire chunk of structured data if none of the
    fields were extracted.
    """
    # NOTE: The docstring above is a template. Module-level code fills its two placeholders after
    # the class is defined, so it must contain exactly two of them and no other percent signs.

    def __init__(
        self,
        spec: Param[str, Arg.String(help='Structure format as explained above.')],
        *outputs: Param[str, Arg.String(metavar='output', help='Output format as explained above.')],
        multi: Param[bool, Arg.Switch('-m', help=(
            'Read as many pieces of structured data as possible instead of just one.'))] = False,
        count: Param[int, Arg.Number('-c', help=(
            'A limit on the number of chunks to read in multi mode; there is no limit by default.'))] = 0,
        until: Param[str, Arg.String('-u', metavar='E', help=(
            'An expression evaluated on each chunk in multi mode. New chunks will be parsed '
            'only if the result is nonzero.'))] = '',
        format: Param[str, Arg.String('-f', metavar='F', help=(
            'Optionally specify a format string expression to auto-name extracted fields without a '
            'given name. The format string accepts the field {{c}} for the type code and {{n}} for '
            'the variable index.'))] = '',
        name: Param[str, Arg.String('-n', metavar='VAR', group='FIELDS', help=(
            'Equivalent to --format=VAR{{n}}.'))] = '',
        more: Param[bool, Arg.Switch('-M', help=(
            'After parsing the struct, emit one chunk that contains the data that was left '
            'over in the buffer. If no data was left over, this chunk will be empty.'))] = False
    ):
        # The --name shortcut only applies when no explicit --format was given.
        if name:
            format = format or F'{name}{{n}}'
        # Without any output template, emit the "rest" pseudo-field.
        outputs = outputs or (F'{{{_REST_MARKER}}}',)
        super().__init__(spec=spec, outputs=outputs, until=until, format=format, count=count, multi=multi, more=more)

    def process(self, data: Chunk):
        """
        Parse `data` according to the struct specification and yield one chunk per output template
        for every structure that was read. In multi mode, reading repeats until the data is
        exhausted, the count limit is reached, the `until` expression triggers, or the reader
        raises an `EOFError`. Data that was not consumed is either emitted as one additional chunk
        (when `more` is set) or reported in an info-level log message.
        """
        formatter = string.Formatter()
        field_format: str = self.args.format
        until = self.args.until
        until = until and PythonExpression(until, all_variables_allowed=True)
        reader = StructReader(memoryview(data))
        checkpoint = 0
        mainspec = self.args.spec
        byteorder = mainspec[:1]
        count = self.args.count

        # A leading byte order character applies to the whole specification; it is re-applied to
        # every fragment below because the specification is parsed piece by piece.
        if byteorder in '<@=!>':
            mainspec = mainspec[1:]
        else:
            byteorder = '='

        def fixorder(spec):
            # Prepend the global byte order unless the fragment specifies its own.
            if spec[0] not in '<@=!>':
                spec = byteorder + spec
            return spec

        previously_existing_variables = set(metavars(data).variable_names())
        it = itertools.count() if self.args.multi else (0,)

        for index in it:
            field_counter = 0
            # Everything from the checkpoint onwards counts as not (yet) successfully parsed.
            checkpoint = reader.tell()
            if reader.eof:
                break
            if 0 < count <= index:
                break
            meta = metavars(data)
            meta.index = index
            args = []
            last = None
            self.log_debug(F'starting new read at: 0x{checkpoint:08X}')
            try:
                for prefix, name, spec, conversion in formatter.parse(mainspec):
                    if prefix:
                        # Literal text between format fields is a plain struct format string.
                        fields = reader.read_struct(fixorder(prefix))
                        if field_format:
                            codes = re.findall('[?cbBhHiIlLqQnNefdspPauwgk]', prefix)
                            if len(codes) != len(fields):
                                # The type codes cannot be matched to the values one by one
                                # (presumably because of repeat counts); use a generic code.
                                codes = 'v' * len(fields)
                            for code, field in zip(codes, fields):
                                code = 'b' if code == '?' else code.lower()
                                v = field_format.format_map({'c': code, 'n': field_counter})
                                meta[v] = field
                                field_counter += 1
                        args.extend(fields)
                    if name is None:
                        # Trailing literal text without a format field.
                        continue
                    if spec is None:
                        spec = ''
                    assert isinstance(spec, str)
                    assert isinstance(name, str)
                    field_counter += 1
                    if name and not name.isdecimal():
                        check_variable_name(name)

                    # Reset for every field: previously, a nonzero alignment left this variable
                    # unbound or carrying the value from an earlier field.
                    peek = False
                    if conversion:
                        alignment = PythonExpression.Evaluate(conversion, meta)
                        if alignment == 0:
                            # An alignment of zero requests a peeking read of this field.
                            peek = True
                        else:
                            before = reader.tell()
                            reader.byte_align(alignment)
                            after = reader.tell()
                            if before != after:
                                self.log_info(F'aligned from 0x{before:X} to 0x{after:X}')

                    spec, _, pipeline = spec.partition(':')
                    if spec:
                        spec = meta.format_str(spec, self.codec, args)
                    if spec:
                        # A specification that evaluates as an expression is used as its value
                        # (an integer is a byte count); otherwise it remains a struct format.
                        try:
                            _exp = PythonExpression.Evaluate(spec, meta)
                        except ParserError:
                            pass
                        else:
                            spec = _exp

                    if spec == '':
                        last = value = reader.read(peek=peek)
                    elif isinstance(spec, int):
                        if spec < 0:
                            # Negative sizes are relative to the end of the remaining data.
                            spec += reader.remaining_bytes
                            if spec < 0:
                                raise ValueError(F'The specified negative read offset is {-spec} beyond the cursor.')
                        last = value = reader.read_bytes(spec, peek=peek)
                    else:
                        value = reader.read_struct(fixorder(spec), peek=peek)
                        if not value:
                            self.log_debug(F'field {name} was empty, ignoring.')
                            continue
                        if len(value) > 1:
                            self.log_info(F'parsing field {name} produced {len(value)} items reading a tuple')
                        else:
                            value = value[0]

                    if pipeline:
                        value = numseq(pipeline, reverse=True, seed=value)
                    args.append(value)

                    if name == _REST_MARKER:
                        raise ValueError(F'Extracting a field with name {_REST_MARKER} is forbidden.')
                    elif name.isdecimal():
                        # Use a dedicated name here: reusing `index` would clobber the loop
                        # variable that is passed to set_next_batch below.
                        position = int(name)
                        limit = len(args) - 1
                        if position > limit:
                            self.log_warn(F'cannot assign index field {name}, the highest index is {limit}')
                        else:
                            args[position] = value
                        continue
                    elif name:
                        meta[name] = value

                # NOTE(review): the help text of --until says that parsing continues only while
                # the expression is nonzero, but this check (and its log message) stops as soon
                # as the expression is true. Confirm which of the two is intended.
                if until and until(meta):
                    self.log_info(F'the expression ({until}) evaluated to true; aborting.')
                    break

                # Re-read everything that was consumed for this structure as one buffer.
                with StreamDetour(reader, checkpoint) as detour:
                    full = reader.read(detour.cursor - checkpoint)
                if last is None:
                    last = full

                outputs = []
                symbols = dict(meta)
                symbols[_REST_MARKER] = last
                for template in self.args.outputs:
                    used = set()
                    outputs.append(meta.format(template, self.codec, [full, *args], symbols, used=used))
                    # Variables consumed by an output template are not forwarded as meta
                    # variables unless they already existed on the input chunk.
                    for key in used:
                        if key in previously_existing_variables:
                            continue
                        meta.discard(key)

                for output in outputs:
                    chunk = Chunk(output)
                    chunk.meta.update(meta)
                    chunk.set_next_batch(index)
                    yield chunk

            except EOFError:
                break
            else:
                # The structure was parsed and emitted completely. Advance the checkpoint so that
                # the leftover computation below does not count it as unparsed data when the loop
                # ends without another iteration, which is always the case in single mode.
                checkpoint = reader.tell()

        leftover = len(reader) - checkpoint
        if not leftover:
            return
        elif self.args.more:
            reader.seekset(checkpoint)
            yield reader.read()
        else:
            leftover = repr(SizeInt(leftover)).strip()
            self.log_info(F'discarding {leftover} left in buffer')
Subclasses
Class variables
var reverse-
The type of the None singleton.
Inherited members