ethereum.rlp

.. _rlp:

Recursive Length Prefix (RLP) Encoding ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. contents:: Table of Contents
    :backlinks: none
    :local:

Introduction
------------

Defines the serialization and deserialization format used throughout Ethereum.

  1"""
  2.. _rlp:
  3
  4Recursive Length Prefix (RLP) Encoding
  5^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  6
  7.. contents:: Table of Contents
  8    :backlinks: none
  9    :local:
 10
 11Introduction
 12------------
 13
 14Defines the serialization and deserialization format used throughout Ethereum.
 15"""
 16
 17from dataclasses import astuple, fields, is_dataclass
 18from typing import Any, List, Sequence, Tuple, Type, TypeVar, Union, cast
 19
 20from ethereum.crypto.hash import Hash32, keccak256
 21from ethereum.exceptions import RLPDecodingError, RLPEncodingError
 22from ethereum.utils.ensure import ensure
 23
 24from .base_types import Bytes, Bytes0, Bytes20, FixedBytes, FixedUint, Uint
 25
# Alias used in this module's signatures for values that are RLP encoded or
# produced by RLP decoding. It is plain `Any`, so it documents intent only and
# adds no static checking.
RLP = Any
 27
 28
 29#
 30# RLP Encode
 31#
 32
 33
 34def encode(raw_data: RLP) -> Bytes:
 35    """
 36    Encodes `raw_data` into a sequence of bytes using RLP.
 37
 38    Parameters
 39    ----------
 40    raw_data :
 41        A `Bytes`, `Uint`, `Uint256` or sequence of `RLP` encodable
 42        objects.
 43
 44    Returns
 45    -------
 46    encoded : `ethereum.base_types.Bytes`
 47        The RLP encoded bytes representing `raw_data`.
 48    """
 49    if isinstance(raw_data, (bytearray, bytes)):
 50        return encode_bytes(raw_data)
 51    elif isinstance(raw_data, (Uint, FixedUint)):
 52        return encode(raw_data.to_be_bytes())
 53    elif isinstance(raw_data, str):
 54        return encode_bytes(raw_data.encode())
 55    elif isinstance(raw_data, bool):
 56        if raw_data:
 57            return encode_bytes(b"\x01")
 58        else:
 59            return encode_bytes(b"")
 60    elif isinstance(raw_data, Sequence):
 61        return encode_sequence(raw_data)
 62    elif is_dataclass(raw_data):
 63        return encode(astuple(raw_data))
 64    else:
 65        raise RLPEncodingError(
 66            "RLP Encoding of type {} is not supported".format(type(raw_data))
 67        )
 68
 69
 70def encode_bytes(raw_bytes: Bytes) -> Bytes:
 71    """
 72    Encodes `raw_bytes`, a sequence of bytes, using RLP.
 73
 74    Parameters
 75    ----------
 76    raw_bytes :
 77        Bytes to encode with RLP.
 78
 79    Returns
 80    -------
 81    encoded : `ethereum.base_types.Bytes`
 82        The RLP encoded bytes representing `raw_bytes`.
 83    """
 84    len_raw_data = Uint(len(raw_bytes))
 85
 86    if len_raw_data == 1 and raw_bytes[0] < 0x80:
 87        return raw_bytes
 88    elif len_raw_data < 0x38:
 89        return bytes([0x80 + len_raw_data]) + raw_bytes
 90    else:
 91        # length of raw data represented as big endian bytes
 92        len_raw_data_as_be = len_raw_data.to_be_bytes()
 93        return (
 94            bytes([0xB7 + len(len_raw_data_as_be)])
 95            + len_raw_data_as_be
 96            + raw_bytes
 97        )
 98
 99
def encode_sequence(raw_sequence: Sequence[RLP]) -> Bytes:
    """
    RLP encode a sequence of RLP encodable objects.

    Parameters
    ----------
    raw_sequence :
        Sequence of RLP encodable objects.

    Returns
    -------
    encoded : `ethereum.base_types.Bytes`
        The RLP encoded bytes representing `raw_sequence`.
    """
    payload = get_joined_encodings(raw_sequence)
    payload_size = Uint(len(payload))

    if payload_size >= 0x38:
        # Long form: prefix byte gives the width of the big endian length.
        size_as_be = payload_size.to_be_bytes()
        header = Bytes([0xF7 + len(size_as_be)]) + size_as_be
    else:
        # Short form: the payload length is folded into the prefix byte.
        header = Bytes([0xC0 + payload_size])

    return header + payload
126
127
def get_joined_encodings(raw_sequence: Sequence[RLP]) -> Bytes:
    """
    Encode every item of `raw_sequence` with RLP and concatenate the
    results in order.

    Parameters
    ----------
    raw_sequence :
        Sequence to encode with RLP.

    Returns
    -------
    joined_encodings : `ethereum.base_types.Bytes`
        The concatenated RLP encoded bytes for each item in sequence
        raw_sequence.
    """
    encoded_items = map(encode, raw_sequence)
    return b"".join(encoded_items)
145
146
147#
148# RLP Decode
149#
150
151
def decode(encoded_data: Bytes) -> RLP:
    """
    Decode one RLP item from `encoded_data`.

    The first byte selects the decoder: values up to 0xBF denote a byte
    string, anything above denotes a sequence.

    Parameters
    ----------
    encoded_data :
        A sequence of bytes, in RLP form.

    Returns
    -------
    decoded_data : `RLP`
        Object decoded from `encoded_data`.

    Raises
    ------
    RLPDecodingError
        If `encoded_data` is empty.
    """
    # No value encodes to zero bytes: even the empty byte string and the
    # empty list each produce a one-byte encoding, so empty input is invalid.
    ensure(
        len(encoded_data) > 0,
        RLPDecodingError("Cannot decode empty bytestring"),
    )

    leading_byte = encoded_data[0]
    if leading_byte > 0xBF:
        # Sequence payload.
        return decode_to_sequence(encoded_data)
    # Byte string payload.
    return decode_to_bytes(encoded_data)
182
183
# Type variable linking the `cls` argument of `decode_to` / `_decode_to` to
# the type of the object they return.
T = TypeVar("T")
185
186
def decode_to(cls: Type[T], encoded_data: Bytes) -> T:
    """
    RLP decode `encoded_data` and convert the result to type `cls`.

    The bytes are first decoded with `decode`; the resulting structure is
    then converted by `_decode_to`, which defines the supported targets.

    Parameters
    ----------
    cls: `Type[T]`
        The type to decode to.
    encoded_data :
        A sequence of bytes, in RLP form.

    Returns
    -------
    decoded_data : `T`
        Object decoded from `encoded_data`.
    """
    raw_rlp = decode(encoded_data)
    return _decode_to(cls, raw_rlp)
205
206
def _decode_to(cls: Type[T], raw_rlp: RLP) -> T:
    """
    Convert an already decoded rlp structure (`raw_rlp`) to an object of
    type `cls`.

    `cls` can be a `Bytes` subclass, a dataclass, `bool`, `Uint`,
    `FixedUint`, `Tuple[cls, ...]`, a fixed-length tuple such as
    `Tuple[cls1, cls2]`, `List[cls]`, `Union[Bytes0, Bytes20]` or
    `Union[Bytes, cls]`.

    Parameters
    ----------
    cls: `Type[T]`
        The type to decode to.
    raw_rlp :
        A decoded rlp structure.

    Returns
    -------
    decoded_data : `T`
        Object built from `raw_rlp`.

    Raises
    ------
    RLPDecodingError
        If `raw_rlp` does not have the shape required by `cls`, or if
        `cls` is not a supported target type.
    TypeError
        If `cls` is `bool` and `raw_rlp` is neither the single byte 0x01
        nor the empty byte string.
    """
    if isinstance(cls, type(Tuple[Uint, ...])) and cls._name == "Tuple":  # type: ignore # noqa: E501
        ensure(isinstance(raw_rlp, list), RLPDecodingError)
        item_types = cls.__args__  # type: ignore
        # Only the two-argument form `Tuple[X, ...]` is homogeneous. The
        # arity is checked first so that a one-element `Tuple[X]` is treated
        # as a fixed-length tuple instead of raising IndexError.
        if len(item_types) == 2 and item_types[1] is Ellipsis:
            return tuple(  # type: ignore
                _decode_to(item_types[0], raw_item) for raw_item in raw_rlp
            )
        # Fixed-length tuple: one rlp item per declared element type.
        ensure(len(raw_rlp) == len(item_types), RLPDecodingError)
        return tuple(  # type: ignore
            _decode_to(item_type, raw_item)
            for item_type, raw_item in zip(item_types, raw_rlp)
        )
    elif cls == Union[Bytes0, Bytes20]:
        # We can't support Union types in general, so we support this one
        # (which appears in the Transaction type) as a special case
        ensure(isinstance(raw_rlp, Bytes), RLPDecodingError)
        if len(raw_rlp) == 0:
            return Bytes0()  # type: ignore
        elif len(raw_rlp) == 20:
            return Bytes20(raw_rlp)  # type: ignore
        else:
            raise RLPDecodingError(
                "Bytes has length {}, expected 0 or 20".format(len(raw_rlp))
            )
    elif isinstance(cls, type(List[Bytes])) and cls._name == "List":  # type: ignore # noqa: E501
        ensure(isinstance(raw_rlp, list), RLPDecodingError)
        return [  # type: ignore
            _decode_to(cls.__args__[0], raw_item)  # type: ignore
            for raw_item in raw_rlp
        ]
    elif isinstance(cls, type(Union[Bytes, List[Bytes]])) and cls.__origin__ == Union:  # type: ignore # noqa: E501
        # Only two-member unions that contain `Bytes` are supported: a byte
        # string is returned unchanged, anything else is decoded as the
        # other member.
        if len(cls.__args__) != 2 or Bytes not in cls.__args__:  # type: ignore
            raise RLPDecodingError(
                "RLP Decoding to type {} is not supported".format(cls)
            )
        if isinstance(raw_rlp, Bytes):
            return raw_rlp  # type: ignore
        elif cls.__args__[0] == Bytes:  # type: ignore
            return _decode_to(cls.__args__[1], raw_rlp)  # type: ignore
        else:
            return _decode_to(cls.__args__[0], raw_rlp)  # type: ignore
    elif issubclass(cls, bool):
        if raw_rlp == b"\x01":
            return cls(True)  # type: ignore
        elif raw_rlp == b"":
            return cls(False)  # type: ignore
        else:
            # NOTE(review): this is the only failure path that raises
            # TypeError rather than RLPDecodingError; kept as is because
            # callers may rely on it.
            raise TypeError("Cannot decode {} as {}".format(raw_rlp, cls))
    elif issubclass(cls, FixedBytes):
        ensure(isinstance(raw_rlp, Bytes), RLPDecodingError)
        ensure(len(raw_rlp) == cls.LENGTH, RLPDecodingError)
        # NOTE(review): the length-checked bytes are returned as they are,
        # not wrapped in `cls` - confirm this is intended.
        return raw_rlp
    elif issubclass(cls, Bytes):
        ensure(isinstance(raw_rlp, Bytes), RLPDecodingError)
        return raw_rlp
    elif issubclass(cls, (Uint, FixedUint)):
        ensure(isinstance(raw_rlp, Bytes), RLPDecodingError)
        try:
            return cls.from_be_bytes(raw_rlp)  # type: ignore
        except ValueError:
            raise RLPDecodingError
    elif is_dataclass(cls):
        ensure(isinstance(raw_rlp, list), RLPDecodingError)
        # Redundant at runtime after the check above; narrows the type for
        # static checkers.
        assert isinstance(raw_rlp, list)
        args = []
        # Each dataclass field is decoded from the rlp item at the same
        # position, using the field's annotated type.
        ensure(len(fields(cls)) == len(raw_rlp), RLPDecodingError)
        for field, rlp_item in zip(fields(cls), raw_rlp):
            args.append(_decode_to(field.type, rlp_item))
        return cast(T, cls(*args))
    else:
        raise RLPDecodingError(
            "RLP Decoding to type {} is not supported".format(cls)
        )
299
300
def decode_to_bytes(encoded_bytes: Bytes) -> Bytes:
    """
    Decodes a rlp encoded byte stream assuming that the decoded data
    should be of type `bytes`.

    Bytes that follow the decoded item in `encoded_bytes` are not
    inspected.

    Parameters
    ----------
    encoded_bytes :
        RLP encoded byte stream.

    Returns
    -------
    decoded : `ethereum.base_types.Bytes`
        RLP decoded Bytes data

    Raises
    ------
    RLPDecodingError
        If the announced length is negative or exceeds the available
        data, if a single byte below 0x80 is wrapped in a length prefix,
        if a long-form length starts with a zero byte, or if the long form
        is used for a payload shorter than 0x38 bytes.
    """
    if len(encoded_bytes) == 1 and encoded_bytes[0] < 0x80:
        return encoded_bytes
    elif encoded_bytes[0] <= 0xB7:
        len_raw_data = encoded_bytes[0] - 0x80
        # A prefix below 0x80 is only valid as a complete one-byte item,
        # which is handled above. With extra bytes after it the computed
        # length is negative; without this check it would pass the bounds
        # check below and the slice would silently yield empty bytes.
        ensure(len_raw_data >= 0, RLPDecodingError)
        ensure(len_raw_data < len(encoded_bytes), RLPDecodingError)
        raw_data = encoded_bytes[1 : 1 + len_raw_data]
        # A single byte below 0x80 must be encoded as itself, never behind
        # a length prefix.
        ensure(
            not (len_raw_data == 1 and raw_data[0] < 0x80), RLPDecodingError
        )
        return raw_data
    else:
        # This is the index in the encoded data at which decoded data
        # starts from.
        decoded_data_start_idx = 1 + encoded_bytes[0] - 0xB7
        ensure(
            decoded_data_start_idx - 1 < len(encoded_bytes), RLPDecodingError
        )
        # Expectation is that the big endian bytes shouldn't start with 0
        # while trying to decode using RLP, in which case is an error.
        ensure(encoded_bytes[1] != 0, RLPDecodingError)
        len_decoded_data = Uint.from_be_bytes(
            encoded_bytes[1:decoded_data_start_idx]
        )
        # Payloads shorter than 0x38 bytes must use the short form.
        ensure(len_decoded_data >= 0x38, RLPDecodingError)
        decoded_data_end_idx = decoded_data_start_idx + len_decoded_data
        ensure(decoded_data_end_idx - 1 < len(encoded_bytes), RLPDecodingError)
        return encoded_bytes[decoded_data_start_idx:decoded_data_end_idx]
343
344
def decode_to_sequence(encoded_sequence: Bytes) -> List[RLP]:
    """
    Decode an RLP encoded byte stream whose payload is a sequence of
    objects.

    Parameters
    ----------
    encoded_sequence :
        An RLP encoded Sequence.

    Returns
    -------
    decoded : `Sequence[RLP]`
        Sequence of objects decoded from `encoded_sequence`.
    """
    prefix = encoded_sequence[0]
    available = len(encoded_sequence)

    if prefix > 0xF7:
        # Long form: the prefix announces how many bytes hold the payload
        # length.
        length_size = prefix - 0xF7
        payload_start = 1 + length_size
        ensure(payload_start - 1 < available, RLPDecodingError)
        # The big endian length must not carry a leading zero byte.
        ensure(encoded_sequence[1] != 0, RLPDecodingError)
        payload_size = Uint.from_be_bytes(encoded_sequence[1:payload_start])
        # Payloads shorter than 0x38 bytes must use the short form.
        ensure(payload_size >= 0x38, RLPDecodingError)
        payload_end = payload_start + payload_size
        ensure(payload_end - 1 < available, RLPDecodingError)
        payload = encoded_sequence[payload_start:payload_end]
    else:
        # Short form: the payload length is folded into the prefix byte.
        payload_size = prefix - 0xC0
        ensure(payload_size < available, RLPDecodingError)
        payload = encoded_sequence[1 : 1 + payload_size]

    return decode_joined_encodings(payload)
389
390
def decode_joined_encodings(joined_encodings: Bytes) -> List[RLP]:
    """
    Split `joined_encodings`, a concatenation of RLP encoded objects, into
    its items and decode each of them.

    Parameters
    ----------
    joined_encodings :
        concatenation of RLP encoded objects

    Returns
    -------
    decoded : `List[RLP]`
        A list of objects decoded from `joined_encodings`.
    """
    items = []
    available = len(joined_encodings)

    cursor = 0
    while cursor < available:
        # Size of the item that starts at the cursor, prefix included.
        item_size = decode_item_length(joined_encodings[cursor:])
        item_end = cursor + item_size
        # The item must lie entirely inside the remaining data.
        ensure(item_end - 1 < available, RLPDecodingError)
        items.append(decode(joined_encodings[cursor:item_end]))
        cursor = item_end

    return items
424
425
def decode_item_length(encoded_data: Bytes) -> int:
    """
    Compute how many bytes the first RLP item in `encoded_data` occupies,
    prefix included.

    Here `encoded_data` refers to concatenation of rlp encoding for each
    item in a sequence.

    NOTE - This is a helper function not described in the spec. It was
    introduced because the spec doesn't describe how to decode RLP encoded
    data.

    Parameters
    ----------
    encoded_data :
        RLP encoded data for a sequence of objects.

    Returns
    -------
    rlp_length : `int`
    """
    # Can't decode item length for empty encoding
    ensure(len(encoded_data) > 0, RLPDecodingError)

    prefix = Uint(encoded_data[0])

    # A single byte below 0x80 is a complete item on its own.
    if prefix < 0x80:
        return 1

    # Byte strings and sequences share the same layout and differ only in
    # the offsets of their short-form and long-form prefix ranges.
    if prefix <= 0xBF:
        short_base, long_base = 0x80, 0xB7
    else:
        short_base, long_base = 0xC0, 0xF7

    # Number of bytes holding the big endian payload length (zero in the
    # short form, where the length is part of the prefix byte).
    size_of_length = Uint(0)
    if prefix <= long_base:
        payload_size = prefix - short_base
    else:
        size_of_length = prefix - long_base
        ensure(size_of_length < len(encoded_data), RLPDecodingError)
        # The big endian length must not carry a leading zero byte.
        ensure(encoded_data[1] != 0, RLPDecodingError)
        payload_size = Uint.from_be_bytes(
            encoded_data[1 : 1 + size_of_length]
        )

    return 1 + size_of_length + payload_size
494
495
def rlp_hash(data: RLP) -> Hash32:
    """
    RLP encode `data` and return the keccak-256 hash of the encoding.

    Parameters
    ----------
    data :
        The data for which we need the rlp hash.

    Returns
    -------
    hash : `Hash32`
        The rlp hash of the passed in data.
    """
    encoded = encode(data)
    return keccak256(encoded)
def encode(raw_data: Any) -> bytes:
def encode(raw_data: RLP) -> Bytes:
    """
    Serialize `raw_data` to its RLP byte representation.

    The value is dispatched on its runtime type, in this order:
    `bytes`/`bytearray`, `Uint`/`FixedUint` (encoded as their big endian
    bytes), `str` (encoded with `str.encode()`), `bool` (`True` becomes the
    single byte 0x01, `False` the empty byte string), any other `Sequence`
    (encoded item by item) and finally dataclass instances (encoded as the
    tuple produced by `dataclasses.astuple`).

    Parameters
    ----------
    raw_data :
        The object to encode.

    Returns
    -------
    encoded : `ethereum.base_types.Bytes`
        The RLP encoded bytes representing `raw_data`.

    Raises
    ------
    RLPEncodingError
        If the type of `raw_data` is none of the supported ones.
    """
    if isinstance(raw_data, (bytearray, bytes)):
        return encode_bytes(raw_data)
    if isinstance(raw_data, (Uint, FixedUint)):
        return encode(raw_data.to_be_bytes())
    # `str` has to be tested before `Sequence`, because a string is itself
    # a sequence.
    if isinstance(raw_data, str):
        return encode_bytes(raw_data.encode())
    if isinstance(raw_data, bool):
        return encode_bytes(b"\x01" if raw_data else b"")
    if isinstance(raw_data, Sequence):
        return encode_sequence(raw_data)
    if is_dataclass(raw_data):
        return encode(astuple(raw_data))

    unsupported = type(raw_data)
    raise RLPEncodingError(
        "RLP Encoding of type {} is not supported".format(unsupported)
    )

Encodes raw_data into a sequence of bytes using RLP.

Parameters

raw_data : A Bytes, Uint, Uint256 or sequence of RLP encodable objects.

Returns

encoded : ethereum.base_types.Bytes The RLP encoded bytes representing raw_data.

def encode_bytes(raw_bytes: bytes) -> bytes:
def encode_bytes(raw_bytes: Bytes) -> Bytes:
    """
    RLP encode a byte string.

    Parameters
    ----------
    raw_bytes :
        Bytes to encode with RLP.

    Returns
    -------
    encoded : `ethereum.base_types.Bytes`
        The RLP encoded bytes representing `raw_bytes`.
    """
    size = Uint(len(raw_bytes))

    # A lone byte below 0x80 is its own encoding.
    if size == 1 and raw_bytes[0] < 0x80:
        return raw_bytes

    if size >= 0x38:
        # Long form: the prefix byte announces how many bytes the big
        # endian length occupies, then the length itself follows.
        size_as_be = size.to_be_bytes()
        header = bytes([0xB7 + len(size_as_be)]) + size_as_be
    else:
        # Short form: the length is folded into the prefix byte.
        header = bytes([0x80 + size])

    return header + raw_bytes

Encodes raw_bytes, a sequence of bytes, using RLP.

Parameters

raw_bytes : Bytes to encode with RLP.

Returns

encoded : ethereum.base_types.Bytes The RLP encoded bytes representing raw_bytes.

def encode_sequence(raw_sequence: Sequence[Any]) -> bytes:
def encode_sequence(raw_sequence: Sequence[RLP]) -> Bytes:
    """
    RLP encode a sequence of RLP encodable objects.

    Parameters
    ----------
    raw_sequence :
        Sequence of RLP encodable objects.

    Returns
    -------
    encoded : `ethereum.base_types.Bytes`
        The RLP encoded bytes representing `raw_sequence`.
    """
    payload = get_joined_encodings(raw_sequence)
    payload_size = Uint(len(payload))

    if payload_size >= 0x38:
        # Long form: prefix byte gives the width of the big endian length.
        size_as_be = payload_size.to_be_bytes()
        header = Bytes([0xF7 + len(size_as_be)]) + size_as_be
    else:
        # Short form: the payload length is folded into the prefix byte.
        header = Bytes([0xC0 + payload_size])

    return header + payload

Encodes a list of RLP encodable objects (raw_sequence) using RLP.

Parameters

raw_sequence : Sequence of RLP encodable objects.

Returns

encoded : ethereum.base_types.Bytes The RLP encoded bytes representing raw_sequence.

def get_joined_encodings(raw_sequence: Sequence[Any]) -> bytes:
def get_joined_encodings(raw_sequence: Sequence[RLP]) -> Bytes:
    """
    Encode every item of `raw_sequence` with RLP and concatenate the
    results in order.

    Parameters
    ----------
    raw_sequence :
        Sequence to encode with RLP.

    Returns
    -------
    joined_encodings : `ethereum.base_types.Bytes`
        The concatenated RLP encoded bytes for each item in sequence
        raw_sequence.
    """
    encoded_items = map(encode, raw_sequence)
    return b"".join(encoded_items)

Obtain concatenation of rlp encoding for each item in the sequence raw_sequence.

Parameters

raw_sequence : Sequence to encode with RLP.

Returns

joined_encodings : ethereum.base_types.Bytes The concatenated RLP encoded bytes for each item in sequence raw_sequence.

def decode(encoded_data: bytes) -> Any:
def decode(encoded_data: Bytes) -> RLP:
    """
    Decode one RLP item from `encoded_data`.

    The first byte selects the decoder: values up to 0xBF denote a byte
    string, anything above denotes a sequence.

    Parameters
    ----------
    encoded_data :
        A sequence of bytes, in RLP form.

    Returns
    -------
    decoded_data : `RLP`
        Object decoded from `encoded_data`.

    Raises
    ------
    RLPDecodingError
        If `encoded_data` is empty.
    """
    # No value encodes to zero bytes: even the empty byte string and the
    # empty list each produce a one-byte encoding, so empty input is invalid.
    ensure(
        len(encoded_data) > 0,
        RLPDecodingError("Cannot decode empty bytestring"),
    )

    leading_byte = encoded_data[0]
    if leading_byte > 0xBF:
        # Sequence payload.
        return decode_to_sequence(encoded_data)
    # Byte string payload.
    return decode_to_bytes(encoded_data)

Decodes an integer, byte sequence, or list of RLP encodable objects from the byte sequence encoded_data, using RLP.

Parameters

encoded_data : A sequence of bytes, in RLP form.

Returns

decoded_data : RLP Object decoded from encoded_data.

def decode_to(cls: Type[~T], encoded_data: bytes) -> ~T:
def decode_to(cls: Type[T], encoded_data: Bytes) -> T:
    """
    RLP decode `encoded_data` and convert the result to type `cls`.

    The bytes are first decoded with `decode`; the resulting structure is
    then converted by `_decode_to`, which defines the supported targets.

    Parameters
    ----------
    cls: `Type[T]`
        The type to decode to.
    encoded_data :
        A sequence of bytes, in RLP form.

    Returns
    -------
    decoded_data : `T`
        Object decoded from `encoded_data`.
    """
    raw_rlp = decode(encoded_data)
    return _decode_to(cls, raw_rlp)

Decode the bytes in encoded_data to an object of type cls. cls can be a Bytes subclass, a dataclass, Uint, U256 or Tuple[cls].

Parameters

cls: Type[T] The type to decode to.

encoded_data : A sequence of bytes, in RLP form.

Returns

decoded_data : T Object decoded from encoded_data.

def decode_to_bytes(encoded_bytes: bytes) -> bytes:
def decode_to_bytes(encoded_bytes: Bytes) -> Bytes:
    """
    Decodes a rlp encoded byte stream assuming that the decoded data
    should be of type `bytes`.

    Bytes that follow the decoded item in `encoded_bytes` are not
    inspected.

    Parameters
    ----------
    encoded_bytes :
        RLP encoded byte stream.

    Returns
    -------
    decoded : `ethereum.base_types.Bytes`
        RLP decoded Bytes data

    Raises
    ------
    RLPDecodingError
        If the announced length is negative or exceeds the available
        data, if a single byte below 0x80 is wrapped in a length prefix,
        if a long-form length starts with a zero byte, or if the long form
        is used for a payload shorter than 0x38 bytes.
    """
    if len(encoded_bytes) == 1 and encoded_bytes[0] < 0x80:
        return encoded_bytes
    elif encoded_bytes[0] <= 0xB7:
        len_raw_data = encoded_bytes[0] - 0x80
        # A prefix below 0x80 is only valid as a complete one-byte item,
        # which is handled above. With extra bytes after it the computed
        # length is negative; without this check it would pass the bounds
        # check below and the slice would silently yield empty bytes.
        ensure(len_raw_data >= 0, RLPDecodingError)
        ensure(len_raw_data < len(encoded_bytes), RLPDecodingError)
        raw_data = encoded_bytes[1 : 1 + len_raw_data]
        # A single byte below 0x80 must be encoded as itself, never behind
        # a length prefix.
        ensure(
            not (len_raw_data == 1 and raw_data[0] < 0x80), RLPDecodingError
        )
        return raw_data
    else:
        # This is the index in the encoded data at which decoded data
        # starts from.
        decoded_data_start_idx = 1 + encoded_bytes[0] - 0xB7
        ensure(
            decoded_data_start_idx - 1 < len(encoded_bytes), RLPDecodingError
        )
        # Expectation is that the big endian bytes shouldn't start with 0
        # while trying to decode using RLP, in which case is an error.
        ensure(encoded_bytes[1] != 0, RLPDecodingError)
        len_decoded_data = Uint.from_be_bytes(
            encoded_bytes[1:decoded_data_start_idx]
        )
        # Payloads shorter than 0x38 bytes must use the short form.
        ensure(len_decoded_data >= 0x38, RLPDecodingError)
        decoded_data_end_idx = decoded_data_start_idx + len_decoded_data
        ensure(decoded_data_end_idx - 1 < len(encoded_bytes), RLPDecodingError)
        return encoded_bytes[decoded_data_start_idx:decoded_data_end_idx]

Decodes a rlp encoded byte stream assuming that the decoded data should be of type bytes.

Parameters

encoded_bytes : RLP encoded byte stream.

Returns

decoded : ethereum.base_types.Bytes RLP decoded Bytes data

def decode_to_sequence(encoded_sequence: bytes) -> List[Any]:
def decode_to_sequence(encoded_sequence: Bytes) -> List[RLP]:
    """
    Decode an RLP encoded byte stream whose payload is a sequence of
    objects.

    Parameters
    ----------
    encoded_sequence :
        An RLP encoded Sequence.

    Returns
    -------
    decoded : `Sequence[RLP]`
        Sequence of objects decoded from `encoded_sequence`.
    """
    prefix = encoded_sequence[0]
    available = len(encoded_sequence)

    if prefix > 0xF7:
        # Long form: the prefix announces how many bytes hold the payload
        # length.
        length_size = prefix - 0xF7
        payload_start = 1 + length_size
        ensure(payload_start - 1 < available, RLPDecodingError)
        # The big endian length must not carry a leading zero byte.
        ensure(encoded_sequence[1] != 0, RLPDecodingError)
        payload_size = Uint.from_be_bytes(encoded_sequence[1:payload_start])
        # Payloads shorter than 0x38 bytes must use the short form.
        ensure(payload_size >= 0x38, RLPDecodingError)
        payload_end = payload_start + payload_size
        ensure(payload_end - 1 < available, RLPDecodingError)
        payload = encoded_sequence[payload_start:payload_end]
    else:
        # Short form: the payload length is folded into the prefix byte.
        payload_size = prefix - 0xC0
        ensure(payload_size < available, RLPDecodingError)
        payload = encoded_sequence[1 : 1 + payload_size]

    return decode_joined_encodings(payload)

Decodes a rlp encoded byte stream assuming that the decoded data should be of type Sequence of objects.

Parameters

encoded_sequence : An RLP encoded Sequence.

Returns

decoded : Sequence[RLP] Sequence of objects decoded from encoded_sequence.

def decode_joined_encodings(joined_encodings: bytes) -> List[Any]:
def decode_joined_encodings(joined_encodings: Bytes) -> List[RLP]:
    """
    Split `joined_encodings`, a concatenation of RLP encoded objects, into
    its items and decode each of them.

    Parameters
    ----------
    joined_encodings :
        concatenation of RLP encoded objects

    Returns
    -------
    decoded : `List[RLP]`
        A list of objects decoded from `joined_encodings`.
    """
    items = []
    available = len(joined_encodings)

    cursor = 0
    while cursor < available:
        # Size of the item that starts at the cursor, prefix included.
        item_size = decode_item_length(joined_encodings[cursor:])
        item_end = cursor + item_size
        # The item must lie entirely inside the remaining data.
        ensure(item_end - 1 < available, RLPDecodingError)
        items.append(decode(joined_encodings[cursor:item_end]))
        cursor = item_end

    return items

Decodes joined_encodings, which is a concatenation of RLP encoded objects.

Parameters

joined_encodings : concatenation of RLP encoded objects

Returns

decoded : List[RLP] A list of objects decoded from joined_encodings.

def decode_item_length(encoded_data: bytes) -> int:
def decode_item_length(encoded_data: Bytes) -> int:
    """
    Compute how many bytes the first RLP item in `encoded_data` occupies,
    prefix included.

    Here `encoded_data` refers to concatenation of rlp encoding for each
    item in a sequence.

    NOTE - This is a helper function not described in the spec. It was
    introduced because the spec doesn't describe how to decode RLP encoded
    data.

    Parameters
    ----------
    encoded_data :
        RLP encoded data for a sequence of objects.

    Returns
    -------
    rlp_length : `int`
    """
    # Can't decode item length for empty encoding
    ensure(len(encoded_data) > 0, RLPDecodingError)

    prefix = Uint(encoded_data[0])

    # A single byte below 0x80 is a complete item on its own.
    if prefix < 0x80:
        return 1

    # Byte strings and sequences share the same layout and differ only in
    # the offsets of their short-form and long-form prefix ranges.
    if prefix <= 0xBF:
        short_base, long_base = 0x80, 0xB7
    else:
        short_base, long_base = 0xC0, 0xF7

    # Number of bytes holding the big endian payload length (zero in the
    # short form, where the length is part of the prefix byte).
    size_of_length = Uint(0)
    if prefix <= long_base:
        payload_size = prefix - short_base
    else:
        size_of_length = prefix - long_base
        ensure(size_of_length < len(encoded_data), RLPDecodingError)
        # The big endian length must not carry a leading zero byte.
        ensure(encoded_data[1] != 0, RLPDecodingError)
        payload_size = Uint.from_be_bytes(
            encoded_data[1 : 1 + size_of_length]
        )

    return 1 + size_of_length + payload_size

Find the length of the rlp encoding for the first object in the encoded sequence. Here encoded_data refers to concatenation of rlp encoding for each item in a sequence.

NOTE - This is a helper function not described in the spec. It was introduced because the spec doesn't describe how to decode RLP encoded data.

Parameters

encoded_data : RLP encoded data for a sequence of objects.

Returns

rlp_length : int

def rlp_hash(data: Any) -> ethereum.base_types.Bytes32:
def rlp_hash(data: RLP) -> Hash32:
    """
    RLP encode `data` and return the keccak-256 hash of the encoding.

    Parameters
    ----------
    data :
        The data for which we need the rlp hash.

    Returns
    -------
    hash : `Hash32`
        The rlp hash of the passed in data.
    """
    encoded = encode(data)
    return keccak256(encoded)

Obtain the keccak-256 hash of the rlp encoding of the passed in data.

Parameters

data : The data for which we need the rlp hash.

Returns

hash : Hash32 The rlp hash of the passed in data.