Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get blobs from the EL's blob pool #5829

Closed
wants to merge 12 commits into from
Prev Previous commit
Next Next commit
Update for new spec
michaelsproul committed Jul 22, 2024
commit dbbcc1f06fd09261d5d711cf534227410d3f5d1b
85 changes: 36 additions & 49 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
@@ -68,7 +68,6 @@ use crate::{
};
use derivative::Derivative;
use eth2::types::{BlockGossip, EventKind, PublishBlockRequest};
use execution_layer::versioned_hashes::extract_blob_transaction_ids;
use execution_layer::PayloadStatus;
pub use fork_choice::{AttestationFromBlock, PayloadVerificationStatus};
use parking_lot::RwLockReadGuard;
@@ -79,7 +78,9 @@ use slot_clock::SlotClock;
use ssz::Encode;
use ssz_derive::{Decode, Encode};
use ssz_types::VariableList;
use state_processing::per_block_processing::{errors::IntoWithIndex, is_merge_transition_block};
use state_processing::per_block_processing::{
deneb::kzg_commitment_to_versioned_hash, errors::IntoWithIndex, is_merge_transition_block,
};
use state_processing::{
block_signature_verifier::{BlockSignatureVerifier, Error as BlockSignatureVerifierError},
per_block_processing, per_slot_processing,
@@ -1323,7 +1324,18 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
let block = blob_block;
let chain = blob_chain;

if !block.message().body().has_blobs() {
let versioned_hashes =
if let Ok(kzg_commitments) = block.message().body().blob_kzg_commitments() {
kzg_commitments
.iter()
.map(kzg_commitment_to_versioned_hash)
.collect()
} else {
vec![]
};
let num_blobs = versioned_hashes.len();

if versioned_hashes.is_empty() {
debug!(chain.log, "Blobs from EL - none required");
return Ok(());
}
@@ -1332,78 +1344,52 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
.execution_layer
.as_ref()
.ok_or(BeaconChainError::ExecutionLayerMissing)?;
let execution_payload = block.message().execution_payload()?;
let Some(transactions) = execution_payload.transactions() else {
debug!(chain.log, "Blobs from EL - blinded payload");
return Ok(());
};
debug!(chain.log, "Blobs from EL - decoding");
let blob_ids =
extract_blob_transaction_ids::<T::EthSpec>(transactions).map_err(|e| {
BlockError::ExecutionPayloadError(ExecutionPayloadError::RequestFailed(
execution_layer::Error::VerifyingVersionedHashes(e),
))
})?;
let num_blob_tx = blob_ids.len();

debug!(
chain.log,
"Blobs from EL - start request";
"num_blob_tx" => num_blob_tx,
"num_blobs" => num_blobs,
);
let mut blob_index = 0;
let blob_start_indices = blob_ids
.iter()
.map(|blob_id| {
let i = blob_index;
blob_index += blob_id.versioned_hashes.len();
i
})
.collect::<Vec<_>>();
let response = execution_layer.get_blobs(blob_ids).await.map_err(|e| {
BlockError::ExecutionPayloadError(ExecutionPayloadError::RequestFailed(e))
})?;
let num_fetched_tx = response.blobs.iter().filter(|b| b.is_some()).count();
if num_fetched_tx == 0 {
let response = execution_layer
.get_blobs(versioned_hashes)
.await
.map_err(|e| {
BlockError::ExecutionPayloadError(ExecutionPayloadError::RequestFailed(e))
})?;
let num_fetched_blobs = response.iter().filter(|b| b.is_some()).count();
if num_fetched_blobs == 0 {
debug!(chain.log, "Blobs from EL - response with none");
return Ok(());
} else if num_fetched_tx < num_blob_tx {
} else if num_fetched_blobs < num_blobs {
debug!(
chain.log,
"Blobs from EL - response with some";
"fetched" => num_fetched_tx,
"total" => num_blob_tx,
"fetched" => num_fetched_blobs,
"total" => num_blobs,
);
} else {
debug!(
chain.log,
"Blobs from EL - response with all";
"num_blob_tx" => num_fetched_tx
"num_blobs" => num_blobs
);
}
let (signed_block_header, kzg_commitments_proof) =
block.signed_block_header_and_kzg_commitments_proof()?;

let mut fixed_blob_sidecar_list = FixedBlobSidecarList::default();
for (i, (blob, kzg_proof)) in blob_start_indices
for (i, blob_and_proof) in response
.into_iter()
.zip(response.blobs)
.filter_map(|(start_index, blobs)| blobs.map(|blobs| (start_index, blobs)))
.flat_map(|(start_index, blob)| {
blob.blobs
.into_iter()
.zip(blob.proofs)
.enumerate()
.map(move |(i, v)| (start_index + i, v))
})
.enumerate()
.filter_map(|(i, opt_blob)| Some((i, opt_blob?)))
{
match BlobSidecar::new_efficiently(
i,
blob,
blob_and_proof.blob,
&block,
signed_block_header.clone(),
&kzg_commitments_proof,
kzg_proof,
blob_and_proof.proof,
) {
Ok(blob) => {
if let Some(blob_mut) = fixed_blob_sidecar_list.get_mut(i) {
@@ -1428,10 +1414,10 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
debug!(
chain.log,
"Blobs from EL - start processing";
"num_blobs" => fixed_blob_sidecar_list.iter().filter(|b| b.is_some()).count(),
"num_blobs" => num_blobs,
);
chain
.process_rpc_blobs(block.slot(), block_root, fixed_blob_sidecar_list)
.process_engine_blobs(block.slot(), block_root, fixed_blob_sidecar_list)
.await
.map(|_| {
debug!(chain.log, "Blobs from EL - processed");
@@ -1446,6 +1432,7 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
.spawn_handle(blob_fetcher_future, "execution_blob_fetcher")
.ok_or(BeaconChainError::RuntimeShutdown)?;
// FIXME(sproul): should we wait for this handle?
// FIXME(sproul): should do blob broadcast on P2P in here somewhere
drop(blob_fetcher_handle);

// Define a future that will verify the execution payload with an execution engine.
7 changes: 3 additions & 4 deletions beacon_node/execution_layer/src/engine_api/http.rs
Original file line number Diff line number Diff line change
@@ -3,7 +3,6 @@
use super::*;
use crate::auth::Auth;
use crate::json_structures::*;
use crate::versioned_hashes::BlobTransactionId;
use lazy_static::lazy_static;
use lighthouse_version::{COMMIT_PREFIX, VERSION};
use reqwest::header::CONTENT_TYPE;
@@ -710,9 +709,9 @@ impl HttpJsonRpc {

pub async fn get_blobs<E: EthSpec>(
&self,
blob_ids: Vec<BlobTransactionId>,
) -> Result<GetBlobsResponse<E>, Error> {
let params = json!([blob_ids]);
versioned_hashes: Vec<Hash256>,
) -> Result<Vec<Option<BlobAndProofV1<E>>>, Error> {
let params = json!([versioned_hashes]);

self.rpc_request(
ENGINE_GET_BLOBS_V1,
10 changes: 6 additions & 4 deletions beacon_node/execution_layer/src/engine_api/json_structures.rs
Original file line number Diff line number Diff line change
@@ -5,8 +5,8 @@ use superstruct::superstruct;
use types::beacon_block_body::KzgCommitments;
use types::blob_sidecar::BlobsList;
use types::{
DepositRequest, ExecutionLayerWithdrawalRequest, FixedVector, PublicKeyBytes, Signature,
Unsigned,
Blob, DepositRequest, ExecutionLayerWithdrawalRequest, FixedVector, KzgProof, PublicKeyBytes,
Signature, Unsigned,
};

#[derive(Debug, PartialEq, Serialize, Deserialize)]
@@ -566,8 +566,10 @@ impl<E: EthSpec> From<JsonBlobsBundleV1<E>> for BlobsBundle<E> {

#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(bound = "E: EthSpec", rename_all = "camelCase")]
pub struct GetBlobsResponse<E: EthSpec> {
pub blobs: Vec<Option<JsonBlobsBundleV1<E>>>,
pub struct BlobAndProofV1<E: EthSpec> {
#[serde(with = "ssz_types::serde_utils::hex_fixed_vec")]
pub blob: Blob<E>,
pub proof: KzgProof,
}

#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
9 changes: 3 additions & 6 deletions beacon_node/execution_layer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -4,9 +4,8 @@
//! This crate only provides useful functionality for "The Merge", it does not provide any of the
//! deposit-contract functionality that the `beacon_node/eth1` crate already provides.
use crate::json_structures::GetBlobsResponse;
use crate::json_structures::BlobAndProofV1;
use crate::payload_cache::PayloadCache;
use crate::versioned_hashes::BlobTransactionId;
use arc_swap::ArcSwapOption;
use auth::{strip_prefix, Auth, JwtKey};
pub use block_hash::calculate_execution_block_hash;
@@ -1850,10 +1849,8 @@ impl<E: EthSpec> ExecutionLayer<E> {

pub async fn get_blobs(
&self,
query: Vec<BlobTransactionId>,
) -> Result<GetBlobsResponse<E>, Error> {
// FIXME(sproul): try IPC again?
// .ipc_request(|ipc| ipc.get_blobs(query))
query: Vec<Hash256>,
) -> Result<Vec<Option<BlobAndProofV1<E>>>, Error> {
self.engine()
.request(|engine| async move { engine.api.get_blobs(query).await })
.await
32 changes: 0 additions & 32 deletions beacon_node/execution_layer/src/versioned_hashes.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use alloy_consensus::TxEnvelope;
use alloy_rlp::Decodable;
use serde::{Deserialize, Serialize};
use types::{EthSpec, ExecutionPayloadRef, Hash256, Unsigned, VersionedHash};

#[derive(Debug)]
@@ -58,37 +57,6 @@ pub fn extract_versioned_hashes_from_transactions<E: EthSpec>(
Ok(versioned_hashes)
}

#[derive(Deserialize, Serialize)]
pub struct BlobTransactionId {
pub tx_hash: Hash256,
pub versioned_hashes: Vec<VersionedHash>,
}

pub fn extract_blob_transaction_ids<E: EthSpec>(
transactions: &types::Transactions<E>,
) -> Result<Vec<BlobTransactionId>, Error> {
let mut transaction_ids = vec![];

for tx in transactions {
if let TxEnvelope::Eip4844(signed_tx_eip4844) = beacon_tx_to_tx_envelope(tx)? {
let tx_hash = signed_tx_eip4844.hash();
let versioned_hashes = signed_tx_eip4844
.tx()
.tx()
.blob_versioned_hashes
.iter()
.map(|fb| Hash256::from(fb.0))
.collect();
transaction_ids.push(BlobTransactionId {
tx_hash: Hash256::from_slice(tx_hash.as_slice()),
versioned_hashes,
});
}
}

Ok(transaction_ids)
}

pub fn beacon_tx_to_tx_envelope<N: Unsigned>(
tx: &types::Transaction<N>,
) -> Result<TxEnvelope, Error> {