ethereum.rlp
.. _rlp:

Recursive Length Prefix (RLP) Encoding
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. contents:: Table of Contents
    :backlinks: none
    :local:

Introduction
------------

Defines the serialization and deserialization format used throughout Ethereum.
from dataclasses import astuple, fields, is_dataclass
from typing import Any, List, Sequence, Tuple, Type, TypeVar, Union, cast

from ethereum.crypto.hash import Hash32, keccak256
from ethereum.exceptions import RLPDecodingError, RLPEncodingError
from ethereum.utils.ensure import ensure

from .base_types import Bytes, Bytes0, Bytes20, FixedBytes, FixedUint, Uint

RLP = Any

T = TypeVar("T")


def _decode_to(cls: Type[T], raw_rlp: RLP) -> T:
    """
    Decode the RLP structure in `raw_rlp` to an object of type `cls`.
    `cls` can be a `Bytes` subclass, a dataclass, `Uint`, `U256`,
    `Tuple[cls, ...]`, `Tuple[cls1, cls2]` or `Union[Bytes, cls]`.

    Parameters
    ----------
    cls: `Type[T]`
        The type to decode to.
    raw_rlp :
        A decoded rlp structure.

    Returns
    -------
    decoded_data : `T`
        Object decoded from `raw_rlp`.
    """
    if isinstance(cls, type(Tuple[Uint, ...])) and cls._name == "Tuple":  # type: ignore # noqa: E501
        ensure(isinstance(raw_rlp, list), RLPDecodingError)
        if cls.__args__[1] == ...:  # type: ignore
            args = []
            for raw_item in raw_rlp:
                args.append(_decode_to(cls.__args__[0], raw_item))  # type: ignore # noqa: E501
            return tuple(args)  # type: ignore
        else:
            args = []
            ensure(len(raw_rlp) == len(cls.__args__), RLPDecodingError)  # type: ignore # noqa: E501
            for t, raw_item in zip(cls.__args__, raw_rlp):  # type: ignore
                args.append(_decode_to(t, raw_item))
            return tuple(args)  # type: ignore
    elif cls == Union[Bytes0, Bytes20]:
        # We can't support Union types in general, so we support this one
        # (which appears in the Transaction type) as a special case
        ensure(isinstance(raw_rlp, Bytes), RLPDecodingError)
        if len(raw_rlp) == 0:
            return Bytes0()  # type: ignore
        elif len(raw_rlp) == 20:
            return Bytes20(raw_rlp)  # type: ignore
        else:
            raise RLPDecodingError(
                "Bytes has length {}, expected 0 or 20".format(len(raw_rlp))
            )
    elif isinstance(cls, type(List[Bytes])) and cls._name == "List":  # type: ignore # noqa: E501
        ensure(isinstance(raw_rlp, list), RLPDecodingError)
        items = []
        for raw_item in raw_rlp:
            items.append(_decode_to(cls.__args__[0], raw_item))  # type: ignore
        return items  # type: ignore
    elif isinstance(cls, type(Union[Bytes, List[Bytes]])) and cls.__origin__ == Union:  # type: ignore # noqa: E501
        if len(cls.__args__) != 2 or Bytes not in cls.__args__:  # type: ignore
            raise RLPDecodingError(
                "RLP Decoding to type {} is not supported".format(cls)
            )
        if isinstance(raw_rlp, Bytes):
            return raw_rlp  # type: ignore
        elif cls.__args__[0] == Bytes:  # type: ignore
            return _decode_to(cls.__args__[1], raw_rlp)  # type: ignore
        else:
            return _decode_to(cls.__args__[0], raw_rlp)  # type: ignore
    elif issubclass(cls, bool):
        if raw_rlp == b"\x01":
            return cls(True)  # type: ignore
        elif raw_rlp == b"":
            return cls(False)  # type: ignore
        else:
            raise TypeError("Cannot decode {} as {}".format(raw_rlp, cls))
    elif issubclass(cls, FixedBytes):
        ensure(isinstance(raw_rlp, Bytes), RLPDecodingError)
        ensure(len(raw_rlp) == cls.LENGTH, RLPDecodingError)
        return raw_rlp
    elif issubclass(cls, Bytes):
        ensure(isinstance(raw_rlp, Bytes), RLPDecodingError)
        return raw_rlp
    elif issubclass(cls, (Uint, FixedUint)):
        ensure(isinstance(raw_rlp, Bytes), RLPDecodingError)
        try:
            return cls.from_be_bytes(raw_rlp)  # type: ignore
        except ValueError:
            raise RLPDecodingError
    elif is_dataclass(cls):
        ensure(isinstance(raw_rlp, list), RLPDecodingError)
        assert isinstance(raw_rlp, list)
        args = []
        ensure(len(fields(cls)) == len(raw_rlp), RLPDecodingError)
        for field, rlp_item in zip(fields(cls), raw_rlp):
            args.append(_decode_to(field.type, rlp_item))
        return cast(T, cls(*args))
    else:
        raise RLPDecodingError(
            "RLP Decoding to type {} is not supported".format(cls)
        )
def encode(raw_data: RLP) -> Bytes:
    """
    Encodes `raw_data` into a sequence of bytes using RLP.

    Parameters
    ----------
    raw_data :
        A `Bytes`, `Uint`, `U256` or sequence of `RLP` encodable
        objects.

    Returns
    -------
    encoded : `ethereum.base_types.Bytes`
        The RLP encoded bytes representing `raw_data`.
    """
    if isinstance(raw_data, (bytearray, bytes)):
        return encode_bytes(raw_data)
    elif isinstance(raw_data, (Uint, FixedUint)):
        return encode(raw_data.to_be_bytes())
    elif isinstance(raw_data, str):
        return encode_bytes(raw_data.encode())
    elif isinstance(raw_data, bool):
        if raw_data:
            return encode_bytes(b"\x01")
        else:
            return encode_bytes(b"")
    elif isinstance(raw_data, Sequence):
        return encode_sequence(raw_data)
    elif is_dataclass(raw_data):
        return encode(astuple(raw_data))
    else:
        raise RLPEncodingError(
            "RLP Encoding of type {} is not supported".format(type(raw_data))
        )
Encodes raw_data into a sequence of bytes using RLP.
Parameters
raw_data : A Bytes, Uint, U256 or sequence of RLP encodable objects.
Returns
encoded : ethereum.base_types.Bytes
The RLP encoded bytes representing raw_data.
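The dispatch in `encode` reduces every non-sequence value to a byte string before any prefix is applied. The following is a minimal standalone sketch of that reduction, not the module's own code: `to_rlp_leaf_bytes` is a hypothetical helper, plain `int` stands in for `Uint`, and it assumes `Uint.to_be_bytes()` yields the minimal big-endian form (so zero becomes the empty byte string).

```python
from typing import Union


def to_rlp_leaf_bytes(value: Union[bytes, bytearray, bool, int, str]) -> bytes:
    """Hypothetical helper: the byte string a leaf value is reduced to."""
    if isinstance(value, (bytes, bytearray)):
        return bytes(value)
    # bool is tested before int because bool is a subclass of int in Python.
    if isinstance(value, bool):
        return b"\x01" if value else b""
    if isinstance(value, int):
        if value < 0:
            raise ValueError("negative integers have no RLP form")
        # Assumed behaviour of Uint.to_be_bytes(): minimal big-endian bytes,
        # with zero mapping to the empty byte string.
        return value.to_bytes((value.bit_length() + 7) // 8, "big")
    if isinstance(value, str):
        return value.encode()
    raise TypeError("unsupported leaf type: {}".format(type(value)))
```

The resulting byte string is what then receives a length prefix; sequences and dataclasses are handled separately by recursing into their items.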
def encode_bytes(raw_bytes: Bytes) -> Bytes:
    """
    Encodes `raw_bytes`, a sequence of bytes, using RLP.

    Parameters
    ----------
    raw_bytes :
        Bytes to encode with RLP.

    Returns
    -------
    encoded : `ethereum.base_types.Bytes`
        The RLP encoded bytes representing `raw_bytes`.
    """
    len_raw_data = Uint(len(raw_bytes))

    if len_raw_data == 1 and raw_bytes[0] < 0x80:
        return raw_bytes
    elif len_raw_data < 0x38:
        return bytes([0x80 + len_raw_data]) + raw_bytes
    else:
        # length of raw data represented as big endian bytes
        len_raw_data_as_be = len_raw_data.to_be_bytes()
        return (
            bytes([0xB7 + len(len_raw_data_as_be)])
            + len_raw_data_as_be
            + raw_bytes
        )
Encodes raw_bytes, a sequence of bytes, using RLP.
Parameters
raw_bytes : Bytes to encode with RLP.
Returns
encoded : ethereum.base_types.Bytes
The RLP encoded bytes representing raw_bytes.
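The three byte-string cases can be illustrated with a standalone sketch that uses only built-in `bytes` and `int`. The name `rlp_encode_bytes` is hypothetical and the function is an approximation of the rules above, not the module's implementation.

```python
def rlp_encode_bytes(raw: bytes) -> bytes:
    """Sketch of the byte-string rules using plain Python types."""
    length = len(raw)
    # Case 1: a single byte below 0x80 is its own encoding.
    if length == 1 and raw[0] < 0x80:
        return raw
    # Case 2: fewer than 56 bytes, so the prefix 0x80 + length fits in one byte.
    if length < 56:
        return bytes([0x80 + length]) + raw
    # Case 3: the prefix 0xB7 + n is followed by the length itself,
    # written as n big-endian bytes.
    length_be = length.to_bytes((length.bit_length() + 7) // 8, "big")
    return bytes([0xB7 + len(length_be)]) + length_be + raw
```

For example, `b"dog"` becomes `b"\x83dog"`, while a 56-byte string is the shortest input that needs the long form and starts with `b"\xb8\x38"`.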
def encode_sequence(raw_sequence: Sequence[RLP]) -> Bytes:
    """
    Encodes a list of RLP encodable objects (`raw_sequence`) using RLP.

    Parameters
    ----------
    raw_sequence :
        Sequence of RLP encodable objects.

    Returns
    -------
    encoded : `ethereum.base_types.Bytes`
        The RLP encoded bytes representing `raw_sequence`.
    """
    joined_encodings = get_joined_encodings(raw_sequence)
    len_joined_encodings = Uint(len(joined_encodings))

    if len_joined_encodings < 0x38:
        return Bytes([0xC0 + len_joined_encodings]) + joined_encodings
    else:
        len_joined_encodings_as_be = len_joined_encodings.to_be_bytes()
        return (
            Bytes([0xF7 + len(len_joined_encodings_as_be)])
            + len_joined_encodings_as_be
            + joined_encodings
        )
Encodes a list of RLP encodable objects (raw_sequence) using RLP.
Parameters
raw_sequence : Sequence of RLP encodable objects.
Returns
encoded : ethereum.base_types.Bytes
The RLP encoded bytes representing raw_sequence.
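List encoding applies the same short/long split to the concatenated item encodings, with prefixes starting at `0xC0` and `0xF7`. The sketch below assumes the items have already been RLP encoded; `rlp_wrap_list` is a hypothetical name used only for illustration.

```python
from typing import List


def rlp_wrap_list(encoded_items: List[bytes]) -> bytes:
    """Sketch: add the list prefix to items that are already RLP encoded."""
    payload = b"".join(encoded_items)
    length = len(payload)
    # Short list: the payload length fits in the prefix byte itself.
    if length < 56:
        return bytes([0xC0 + length]) + payload
    # Long list: prefix 0xF7 + n, then the payload length as n big-endian bytes.
    length_be = length.to_bytes((length.bit_length() + 7) // 8, "big")
    return bytes([0xF7 + len(length_be)]) + length_be + payload
```

With the items `b"\x83cat"` and `b"\x83dog"` the payload is 8 bytes long, so the result starts with `0xC8`; the empty list encodes as the single byte `0xC0`.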
def get_joined_encodings(raw_sequence: Sequence[RLP]) -> Bytes:
    """
    Obtain the concatenation of the RLP encodings of each item in the
    sequence `raw_sequence`.

    Parameters
    ----------
    raw_sequence :
        Sequence to encode with RLP.

    Returns
    -------
    joined_encodings : `ethereum.base_types.Bytes`
        The concatenated RLP encoded bytes for each item in sequence
        `raw_sequence`.
    """
    return b"".join(encode(item) for item in raw_sequence)
Obtain the concatenation of the RLP encodings of each item in the sequence raw_sequence.
Parameters
raw_sequence : Sequence to encode with RLP.
Returns
joined_encodings : ethereum.base_types.Bytes
The concatenated RLP encoded bytes for each item in sequence raw_sequence.
def decode(encoded_data: Bytes) -> RLP:
    """
    Decodes an integer, byte sequence, or list of RLP encodable objects
    from the byte sequence `encoded_data`, using RLP.

    Parameters
    ----------
    encoded_data :
        A sequence of bytes, in RLP form.

    Returns
    -------
    decoded_data : `RLP`
        Object decoded from `encoded_data`.
    """
    # Raising error as there can never be empty encoded data for any
    # given raw data (including empty raw data)
    # RLP Encoding(b'') -> [0x80]  # noqa: SC100
    # RLP Encoding([])  -> [0xc0]  # noqa: SC100
    ensure(
        len(encoded_data) > 0,
        RLPDecodingError("Cannot decode empty bytestring"),
    )

    if encoded_data[0] <= 0xBF:
        # This means that the raw data is of type bytes
        return decode_to_bytes(encoded_data)
    else:
        # This means that the raw data is of type sequence
        return decode_to_sequence(encoded_data)
Decodes an integer, byte sequence, or list of RLP encodable objects from the byte sequence encoded_data, using RLP.
Parameters
encoded_data : A sequence of bytes, in RLP form.
Returns
decoded_data : RLP
Object decoded from encoded_data.
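Decoding starts by looking at the first byte, which falls into one of five ranges. The sketch below names those ranges; `classify_prefix` and its return strings are hypothetical labels chosen for illustration, not part of the module.

```python
def classify_prefix(first_byte: int) -> str:
    """Sketch: name the RLP form announced by the first byte of an encoding."""
    if not 0 <= first_byte <= 0xFF:
        raise ValueError("a byte must be in the range 0..255")
    if first_byte < 0x80:
        return "single byte"         # the byte is its own payload
    if first_byte <= 0xB7:
        return "short byte string"   # payload length is first_byte - 0x80
    if first_byte <= 0xBF:
        return "long byte string"    # first_byte - 0xB7 length bytes follow
    if first_byte <= 0xF7:
        return "short list"          # payload length is first_byte - 0xC0
    return "long list"               # first_byte - 0xF7 length bytes follow
```

Everything up to and including `0xBF` is a byte string and everything above it is a list, which is the single comparison `decode` relies on.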
def decode_to(cls: Type[T], encoded_data: Bytes) -> T:
    """
    Decode the bytes in `encoded_data` to an object of type `cls`. `cls` can be
    a `Bytes` subclass, a dataclass, `Uint`, `U256` or `Tuple[cls]`.

    Parameters
    ----------
    cls: `Type[T]`
        The type to decode to.
    encoded_data :
        A sequence of bytes, in RLP form.

    Returns
    -------
    decoded_data : `T`
        Object decoded from `encoded_data`.
    """
    return _decode_to(cls, decode(encoded_data))
Decode the bytes in encoded_data to an object of type cls. cls can be a Bytes subclass, a dataclass, Uint, U256 or Tuple[cls].
Parameters
cls : Type[T]
The type to decode to.
encoded_data : A sequence of bytes, in RLP form.
Returns
decoded_data : T
Object decoded from encoded_data.
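Typed decoding works on the already decoded structure: a list of raw items is matched against the declared fields of the target type, and each item is converted according to the field's annotation. The following is a simplified sketch of that idea under stated assumptions: the `Account` dataclass and `build_from_rlp` are hypothetical, plain `int` and `bytes` stand in for `Uint` and `Bytes`, and `ValueError` stands in for `RLPDecodingError`.

```python
from dataclasses import dataclass, fields
from typing import Any, List


@dataclass
class Account:
    """Hypothetical target type, used only for this example."""

    nonce: int
    code: bytes


def build_from_rlp(cls: Any, raw_items: List[Any]) -> Any:
    """Sketch: build a dataclass from a decoded RLP list, field by field."""
    declared = fields(cls)
    if len(declared) != len(raw_items):
        raise ValueError("field count does not match item count")
    args = []
    for field, raw in zip(declared, raw_items):
        if not isinstance(raw, bytes):
            raise ValueError("expected a byte string for " + field.name)
        if field.type in (int, "int"):
            # Integers travel as big-endian bytes; b"" decodes to 0.
            args.append(int.from_bytes(raw, "big"))
        elif field.type in (bytes, "bytes"):
            args.append(raw)
        else:
            raise TypeError("unsupported field type for " + field.name)
    return cls(*args)
```

A call such as `build_from_rlp(Account, [b"\x04\x00", b"\x60\x00"])` yields `Account(nonce=1024, code=b"\x60\x00")`.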
def decode_to_bytes(encoded_bytes: Bytes) -> Bytes:
    """
    Decodes an RLP encoded byte stream assuming that the decoded data
    should be of type `bytes`.

    Parameters
    ----------
    encoded_bytes :
        RLP encoded byte stream.

    Returns
    -------
    decoded : `ethereum.base_types.Bytes`
        RLP decoded Bytes data
    """
    if len(encoded_bytes) == 1 and encoded_bytes[0] < 0x80:
        return encoded_bytes
    elif encoded_bytes[0] <= 0xB7:
        len_raw_data = encoded_bytes[0] - 0x80
        ensure(len_raw_data < len(encoded_bytes), RLPDecodingError)
        raw_data = encoded_bytes[1 : 1 + len_raw_data]
        ensure(
            not (len_raw_data == 1 and raw_data[0] < 0x80), RLPDecodingError
        )
        return raw_data
    else:
        # This is the index in the encoded data at which the decoded data
        # starts.
        decoded_data_start_idx = 1 + encoded_bytes[0] - 0xB7
        ensure(
            decoded_data_start_idx - 1 < len(encoded_bytes), RLPDecodingError
        )
        # The big endian length bytes must not start with 0; a leading
        # zero byte makes the encoding invalid.
        ensure(encoded_bytes[1] != 0, RLPDecodingError)
        len_decoded_data = Uint.from_be_bytes(
            encoded_bytes[1:decoded_data_start_idx]
        )
        ensure(len_decoded_data >= 0x38, RLPDecodingError)
        decoded_data_end_idx = decoded_data_start_idx + len_decoded_data
        ensure(decoded_data_end_idx - 1 < len(encoded_bytes), RLPDecodingError)
        return encoded_bytes[decoded_data_start_idx:decoded_data_end_idx]
Decodes an RLP encoded byte stream assuming that the decoded data should be of type bytes.
Parameters
encoded_bytes : RLP encoded byte stream.
Returns
decoded : ethereum.base_types.Bytes
RLP decoded Bytes data
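The checks in `decode_to_bytes` enforce that each byte string has exactly one valid encoding: a single byte below `0x80` must not be wrapped in a `0x81` prefix, and the long form must carry a length of at least 56 with no leading zero byte. The sketch below restates those rules with plain `bytes` and `ValueError`. The names `rlp_decode_bytes` and `accepts` are hypothetical, and rejecting trailing bytes is an extra assumption of this sketch.

```python
def rlp_decode_bytes(encoded: bytes) -> bytes:
    """Sketch: decode one RLP byte string, rejecting non-canonical forms."""
    if len(encoded) == 0:
        raise ValueError("cannot decode empty input")
    prefix = encoded[0]
    if prefix < 0x80:
        payload, end = encoded[:1], 1
    elif prefix <= 0xB7:
        length = prefix - 0x80
        end = 1 + length
        payload = encoded[1:end]
        if len(payload) != length:
            raise ValueError("truncated payload")
        if length == 1 and payload[0] < 0x80:
            raise ValueError("a byte below 0x80 must be encoded as itself")
    elif prefix <= 0xBF:
        length_size = prefix - 0xB7
        length_be = encoded[1 : 1 + length_size]
        if len(length_be) != length_size:
            raise ValueError("truncated length field")
        if length_be[0] == 0:
            raise ValueError("length field has a leading zero byte")
        length = int.from_bytes(length_be, "big")
        if length < 56:
            raise ValueError("long form used for a short payload")
        end = 1 + length_size + length
        payload = encoded[1 + length_size : end]
        if len(payload) != length:
            raise ValueError("truncated payload")
    else:
        raise ValueError("prefix announces a list, not a byte string")
    # Assumption of this sketch: input must hold exactly one item.
    if end != len(encoded):
        raise ValueError("unexpected trailing bytes")
    return payload


def accepts(encoded: bytes) -> bool:
    """Return True if the sketch decoder accepts `encoded`."""
    try:
        rlp_decode_bytes(encoded)
    except ValueError:
        return False
    return True
```

Under these rules `b"\x05"` is accepted while `b"\x81\x05"` is rejected, even though both would otherwise describe the same single byte.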
def decode_to_sequence(encoded_sequence: Bytes) -> List[RLP]:
    """
    Decodes an RLP encoded byte stream assuming that the decoded data
    should be of type `Sequence` of objects.

    Parameters
    ----------
    encoded_sequence :
        An RLP encoded Sequence.

    Returns
    -------
    decoded : `Sequence[RLP]`
        Sequence of objects decoded from `encoded_sequence`.
    """
    if encoded_sequence[0] <= 0xF7:
        len_joined_encodings = encoded_sequence[0] - 0xC0
        ensure(len_joined_encodings < len(encoded_sequence), RLPDecodingError)
        joined_encodings = encoded_sequence[1 : 1 + len_joined_encodings]
    else:
        joined_encodings_start_idx = 1 + encoded_sequence[0] - 0xF7
        ensure(
            joined_encodings_start_idx - 1 < len(encoded_sequence),
            RLPDecodingError,
        )
        # The big endian length bytes must not start with 0; a leading
        # zero byte makes the encoding invalid.
        ensure(encoded_sequence[1] != 0, RLPDecodingError)
        len_joined_encodings = Uint.from_be_bytes(
            encoded_sequence[1:joined_encodings_start_idx]
        )
        ensure(len_joined_encodings >= 0x38, RLPDecodingError)
        joined_encodings_end_idx = (
            joined_encodings_start_idx + len_joined_encodings
        )
        ensure(
            joined_encodings_end_idx - 1 < len(encoded_sequence),
            RLPDecodingError,
        )
        joined_encodings = encoded_sequence[
            joined_encodings_start_idx:joined_encodings_end_idx
        ]

    return decode_joined_encodings(joined_encodings)
Decodes an RLP encoded byte stream assuming that the decoded data should be of type Sequence of objects.
Parameters
encoded_sequence : An RLP encoded Sequence.
Returns
decoded : Sequence[RLP]
Sequence of objects decoded from encoded_sequence.
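Before the items of a list can be decoded, the list header has to be read to find where the concatenated item encodings begin and end. The sketch below computes those two indices; `list_payload_bounds` is a hypothetical name, and `ValueError` stands in for `RLPDecodingError`.

```python
from typing import Tuple


def list_payload_bounds(encoded: bytes) -> Tuple[int, int]:
    """Sketch: return (start, end) of the payload inside an RLP list."""
    if len(encoded) == 0 or encoded[0] < 0xC0:
        raise ValueError("not an RLP list")
    prefix = encoded[0]
    if prefix <= 0xF7:
        # Short list: the prefix byte carries the payload length.
        start = 1
        length = prefix - 0xC0
    else:
        # Long list: the next (prefix - 0xF7) bytes carry the payload length.
        length_size = prefix - 0xF7
        start = 1 + length_size
        length_be = encoded[1:start]
        if len(length_be) != length_size or length_be[0] == 0:
            raise ValueError("truncated or non-minimal length field")
        length = int.from_bytes(length_be, "big")
        if length < 56:
            raise ValueError("long form used for a short payload")
    end = start + length
    if end > len(encoded):
        raise ValueError("payload runs past the end of the input")
    return start, end
```

For `b"\xc8\x83cat\x83dog"` the bounds are `(1, 9)`, so the payload is the eight bytes that hold the two encoded items.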
def decode_joined_encodings(joined_encodings: Bytes) -> List[RLP]:
    """
    Decodes `joined_encodings`, which is a concatenation of RLP encoded
    objects.

    Parameters
    ----------
    joined_encodings :
        concatenation of RLP encoded objects

    Returns
    -------
    decoded : `List[RLP]`
        A list of objects decoded from `joined_encodings`.
    """
    decoded_sequence = []

    item_start_idx = 0
    while item_start_idx < len(joined_encodings):
        encoded_item_length = decode_item_length(
            joined_encodings[item_start_idx:]
        )
        ensure(
            item_start_idx + encoded_item_length - 1 < len(joined_encodings),
            RLPDecodingError,
        )
        encoded_item = joined_encodings[
            item_start_idx : item_start_idx + encoded_item_length
        ]
        decoded_sequence.append(decode(encoded_item))
        item_start_idx += encoded_item_length

    return decoded_sequence
Decodes joined_encodings, which is a concatenation of RLP encoded objects.
Parameters
joined_encodings : Concatenation of RLP encoded objects.
Returns
decoded : List[RLP]
A list of objects decoded from joined_encodings.
def decode_item_length(encoded_data: Bytes) -> int:
    """
    Find the length of the RLP encoding of the first object in the
    encoded sequence.
    Here `encoded_data` refers to the concatenation of the RLP encodings
    of each item in a sequence.

    NOTE - This is a helper function not described in the spec. It was
    introduced because the spec does not discuss decoding RLP encoded
    data.

    Parameters
    ----------
    encoded_data :
        RLP encoded data for a sequence of objects.

    Returns
    -------
    rlp_length : `int`
    """
    # Can't decode item length for empty encoding
    ensure(len(encoded_data) > 0, RLPDecodingError)

    first_rlp_byte = Uint(encoded_data[0])

    # This is the length of the big endian representation of the length of
    # rlp encoded object byte stream.
    length_length = Uint(0)
    decoded_data_length = 0

    # This occurs only when the raw_data is a single byte whose value < 128
    if first_rlp_byte < 0x80:
        # We return 1 here, as the end formula
        # 1 + length_length + decoded_data_length would be invalid for
        # this case.
        return 1
    # This occurs only when the raw_data is a byte stream with length < 56
    # and doesn't fall into the above cases
    elif first_rlp_byte <= 0xB7:
        decoded_data_length = first_rlp_byte - 0x80
    # This occurs only when the raw_data is a byte stream and doesn't fall
    # into the above cases
    elif first_rlp_byte <= 0xBF:
        length_length = first_rlp_byte - 0xB7
        ensure(length_length < len(encoded_data), RLPDecodingError)
        # The big endian length bytes must not start with 0; a leading
        # zero byte makes the encoding invalid.
        ensure(encoded_data[1] != 0, RLPDecodingError)
        decoded_data_length = Uint.from_be_bytes(
            encoded_data[1 : 1 + length_length]
        )
    # This occurs only when the raw_data is a sequence of objects with
    # length(concatenation of encoding of each object) < 56
    elif first_rlp_byte <= 0xF7:
        decoded_data_length = first_rlp_byte - 0xC0
    # This occurs only when the raw_data is a sequence of objects and
    # doesn't fall into the above cases.
    elif first_rlp_byte <= 0xFF:
        length_length = first_rlp_byte - 0xF7
        ensure(length_length < len(encoded_data), RLPDecodingError)
        # The big endian length bytes must not start with 0; a leading
        # zero byte makes the encoding invalid.
        ensure(encoded_data[1] != 0, RLPDecodingError)
        decoded_data_length = Uint.from_be_bytes(
            encoded_data[1 : 1 + length_length]
        )

    return 1 + length_length + decoded_data_length
Find the length of the RLP encoding of the first object in the encoded sequence. Here encoded_data refers to the concatenation of the RLP encodings of each item in a sequence.
NOTE - This is a helper function not described in the spec. It was introduced because the spec does not discuss decoding RLP encoded data.
Parameters
encoded_data : RLP encoded data for a sequence of objects.
Returns
rlp_length : int
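The item length is what lets a decoder walk a concatenation of encodings one item at a time: the total is one prefix byte, plus any length bytes, plus the payload. The sketch below shows that arithmetic and the resulting split using plain `bytes`. The names `item_length` and `split_items` are hypothetical, and `ValueError` stands in for `RLPDecodingError`.

```python
from typing import List


def item_length(data: bytes) -> int:
    """Sketch: total encoded length of the first RLP item in `data`."""
    if len(data) == 0:
        raise ValueError("cannot measure an empty encoding")
    first = data[0]
    if first < 0x80:
        return 1                      # the byte is the whole item
    if first <= 0xB7:
        return 1 + (first - 0x80)     # short byte string
    if first <= 0xBF:
        length_size = first - 0xB7    # long byte string
    elif first <= 0xF7:
        return 1 + (first - 0xC0)     # short list
    else:
        length_size = first - 0xF7    # long list
    length_be = data[1 : 1 + length_size]
    if len(length_be) != length_size or length_be[0] == 0:
        raise ValueError("truncated or non-minimal length field")
    # prefix byte + length bytes + payload
    return 1 + length_size + int.from_bytes(length_be, "big")


def split_items(payload: bytes) -> List[bytes]:
    """Sketch: cut a concatenation of RLP encodings into separate items."""
    items = []
    pos = 0
    while pos < len(payload):
        size = item_length(payload[pos:])
        if pos + size > len(payload):
            raise ValueError("item runs past the end of the payload")
        items.append(payload[pos : pos + size])
        pos += size
    return items
```

Splitting `b"\x83cat\x83dog"` this way gives two four-byte items, each of which can then be decoded on its own.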
def rlp_hash(data: RLP) -> Hash32:
    """
    Obtain the keccak-256 hash of the RLP encoding of the passed in data.

    Parameters
    ----------
    data :
        The data for which we need the RLP hash.

    Returns
    -------
    hash : `Hash32`
        The RLP hash of the passed in data.
    """
    return keccak256(encode(data))
Obtain the keccak-256 hash of the RLP encoding of the passed in data.
Parameters
data : The data for which we need the RLP hash.
Returns
hash : Hash32
The RLP hash of the passed in data.