diff options
Diffstat (limited to 'netclass')
| -rw-r--r-- | netclass/__init__.py | 2 | ||||
| -rw-r--r-- | netclass/binaryformatter.py | 128 | ||||
| -rw-r--r-- | netclass/jsonconverter.py | 26 | ||||
| -rw-r--r-- | netclass/netclass.py | 53 |
4 files changed, 209 insertions, 0 deletions
diff --git a/netclass/__init__.py b/netclass/__init__.py new file mode 100644 index 0000000..8837c48 --- /dev/null +++ b/netclass/__init__.py @@ -0,0 +1,2 @@ + +from .netclass import netclass, netclass_root diff --git a/netclass/binaryformatter.py b/netclass/binaryformatter.py new file mode 100644 index 0000000..00fd327 --- /dev/null +++ b/netclass/binaryformatter.py @@ -0,0 +1,128 @@ +# Copyright 2020 Declan Hoare + +# This should really be split into two layers: +# - binaryformatter.py deals with transforming Python objects into +# NRBF meta-dictionaries and vice versa. +# - nrbf.py reads and writes the binary structure. +# To do this, netfleece should ideally be patched, because as-is, it +# changes the meta-dictionaries somewhat from how they are in the file: +# in particular, after reading *AndTypes records it will consume the +# following records and move them to a 'Values' member on the lead +# record. +# Anyway, I think the interface of this module is fine, so it will do +# as a black box for now. + +import struct + +import netfleece +from netfleece.netfleece import RecordTypeEnum + +from .netclass import netclass_root, classes + +def deserialise(stream): + def extract_value(meta): #Recover the underlying structure from NRBF meta-dictionary + if "Value" in meta: + return meta["Value"] + elif "Values" in meta: + return classes[meta["ClassInfo"]["Name"]].from_dict( + {n.split("<")[1].split(">")[0]: extract_value(v) # They look like this: <Name>k__BackingField for some reason + for n, v in zip(meta["ClassInfo"]["MemberNames"], meta["Values"])}) + elif meta["RecordTypeEnum"] == "ObjectNull": + return None + else: + raise ValueError(f"Unknown value format: {meta}") + dnb = netfleece.DNBinary(stream, expand = True) + dnb.parse() + meta = dnb.backfill() + return extract_value(meta) + +# Serialisation is limited, mostly only supporting the features needed +# for ShiftOS +_nrbf_header = b"\0\x01\0\0\0\xFF\xFF\xFF\xFF\x01\0\0\0\0\0\0\0" +_nrbf_footer = b"\x0B" + +s32 = struct.Struct("<i") + +def serialise(stream, obj): + + assemblies = [None] + last_id = 0 + + def write_byte(val): + stream.write(bytes([val])) + def write_s32(val): + stream.write(s32.pack(val)) + def write_string(s): + data = s.encode("utf-8") + n = len(data) + if n > 0x7FFFFFFF: + raise ValueError(f"String is too long ({n} bytes)") + while n > 0x7F: + write_byte((n & 0x7F) | 0x80) + n >>= 7 + write_byte(n) + stream.write(data) + + def next_id(): + nonlocal last_id + last_id += 1 + return last_id + + def write_record_type(typ): + write_byte(typ.value) + + def write_library(name): + library_id = len(assemblies) + write_record_type(RecordTypeEnum.BinaryLibrary) + write_s32(library_id) + write_string(name) + assemblies.append(name) + return library_id + + def library(name): + return assemblies.index(name) + + def write_object_string(object_id, val): + write_record_type(RecordTypeEnum.BinaryObjectString) + write_s32(object_id) + write_string(val) + + def write_object_null(): + write_record_type(RecordTypeEnum.ObjectNull) + + def write_class_with_members(object_id, val): + library_id = library(val._assembly) + write_record_type(RecordTypeEnum.ClassWithMembers) + write_s32(object_id) + write_string(val._name) + write_s32(len(val._members)) + for typ, name in val._members: + write_string(f"<{name}>k__BackingField") + write_s32(library_id) + + def walk_library(it): + if isinstance(it, netclass_root): + if it._assembly not in assemblies: + write_library(it._assembly) + for typ, name in it._members: + walk_library(it._contents[name]) + + def walk(it): + object_id = next_id() + if isinstance(it, netclass_root): + write_class_with_members(object_id, it) + for typ, name in it._members: + walk(it._contents[name]) + elif isinstance(it, str): + write_object_string(object_id, it) + elif it is None: + write_object_null() + else: + raise TypeError("Type not supported!") + return object_id + + stream.write(_nrbf_header) + walk_library(obj) + walk(obj) + stream.write(_nrbf_footer) + diff --git a/netclass/jsonconverter.py b/netclass/jsonconverter.py new file mode 100644 index 0000000..8029fcd --- /dev/null +++ b/netclass/jsonconverter.py @@ -0,0 +1,26 @@ + +import decimal +import json + +from .netclass import netclass_root + +class _encoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, netclass_root): + return obj._contents + elif isinstance(obj, decimal.Decimal): + return float(obj) + else: + return super().default(obj) + +def from_json(t, j): + print(repr(j)) + val = json.loads(j) + if issubclass(t, netclass_root): + val = t.from_dict(val) + if not isinstance(val, t): + raise ValueError(f"the JSON value was of type {type(val)}, not {t}") + return val + +def to_json(obj): + return json.dumps(obj, cls=_encoder) diff --git a/netclass/netclass.py b/netclass/netclass.py new file mode 100644 index 0000000..988bc77 --- /dev/null +++ b/netclass/netclass.py @@ -0,0 +1,53 @@ + +# The netclasses derive this so they can be identified. +class netclass_root: + pass + +classes = {} + +def netclass(name, assembly, members): + member_names = [n for _, n in members] + class proxy(netclass_root): + def __init__(self, *args, **kwargs): + if args == (): + super().__setattr__("_contents", kwargs) + else: + super().__setattr__("_contents", dict(zip(member_names, args))) + self._validate() + + def _validate(self): + if set(member_names) != set(self._contents.keys()): + raise TypeError("The instance does not have the correct members") + for typ, name in members: + val = self._contents[name] + if not isinstance(val, typ): + raise TypeError(f"{name} is {val}, must be {typ}") + + @staticmethod + def from_dict(d): + cleaned = {} + for typ, name in members: + val = d[name] + if isinstance(val, dict) and issubclass(typ, netclass_root): + val = typ.from_dict(val) + cleaned[name] = val + return proxy(**cleaned) + + def __getattr__(self, member): + try: + return self._contents[member] + except KeyError: + raise AttributeError(f"No such member {member}") + + def __setattr__(self, member, value): + if member in self._members: + self._contents[member] = value + else: + raise AttributeError(f"No such member {member}") + + proxy._name = name + proxy._assembly = assembly + proxy._members = members + proxy.__name__ = name.split(".")[-1] + classes[name] = proxy + return proxy |
