Skip to content
Snippets Groups Projects
Unverified Commit 988f6add authored by Alexandru Vasile's avatar Alexandru Vasile Committed by GitHub
Browse files

chain_head/tests: Check finalized block event before new block (#13680)


* chain_head/tests: Mock client for custom block notification

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* chain_head/tests: Check finalized block event before new block

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Update client/rpc-spec-v2/src/chain_head/test_utils.rs

Co-authored-by: default avatarBastian Köcher <git@kchr.de>

* Update client/rpc-spec-v2/src/chain_head/test_utils.rs

Co-authored-by: default avatarBastian Köcher <git@kchr.de>

* Update client/rpc-spec-v2/src/chain_head/test_utils.rs

Co-authored-by: default avatarBastian Köcher <git@kchr.de>

* chain_head/tests: Run import events with 10min timeout

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* chain_head/tests: Add comments about test

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

---------

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: default avatarBastian Köcher <git@kchr.de>
parent 11841430
No related branches found
No related tags found
No related merge requests found
...@@ -9275,6 +9275,7 @@ dependencies = [ ...@@ -9275,6 +9275,7 @@ dependencies = [
"sc-chain-spec", "sc-chain-spec",
"sc-client-api", "sc-client-api",
"sc-transaction-pool-api", "sc-transaction-pool-api",
"sc-utils",
"serde", "serde",
"serde_json", "serde_json",
"sp-api", "sp-api",
...@@ -11954,7 +11955,7 @@ checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" ...@@ -11954,7 +11955,7 @@ checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"digest 0.10.6", "digest 0.10.6",
"rand 0.7.3", "rand 0.8.5",
"static_assertions", "static_assertions",
] ]
......
...@@ -43,4 +43,5 @@ substrate-test-runtime = { version = "2.0.0", path = "../../test-utils/runtime" ...@@ -43,4 +43,5 @@ substrate-test-runtime = { version = "2.0.0", path = "../../test-utils/runtime"
sp-consensus = { version = "0.10.0-dev", path = "../../primitives/consensus/common" } sp-consensus = { version = "0.10.0-dev", path = "../../primitives/consensus/common" }
sp-maybe-compressed-blob = { version = "4.1.0-dev", path = "../../primitives/maybe-compressed-blob" } sp-maybe-compressed-blob = { version = "4.1.0-dev", path = "../../primitives/maybe-compressed-blob" }
sc-block-builder = { version = "0.10.0-dev", path = "../block-builder" } sc-block-builder = { version = "0.10.0-dev", path = "../block-builder" }
sc-utils = { version = "4.0.0-dev", path = "../utils" }
assert_matches = "1.3.0" assert_matches = "1.3.0"
...@@ -22,6 +22,8 @@ ...@@ -22,6 +22,8 @@
//! //!
//! Methods are prefixed by `chainHead`. //! Methods are prefixed by `chainHead`.
#[cfg(test)]
mod test_utils;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
......
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use parking_lot::Mutex;
use sc_client_api::{
execution_extensions::ExecutionExtensions, BlockBackend, BlockImportNotification,
BlockchainEvents, CallExecutor, ChildInfo, ExecutorProvider, FinalityNotification,
FinalityNotifications, FinalizeSummary, ImportNotifications, KeysIter, PairsIter, StorageData,
StorageEventStream, StorageKey, StorageProvider,
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
use sp_api::{CallApiAt, CallApiAtParams, NumberFor, RuntimeVersion};
use sp_blockchain::{BlockStatus, CachedHeaderMetadata, HeaderBackend, HeaderMetadata, Info};
use sp_consensus::BlockOrigin;
use sp_runtime::{
generic::SignedBlock,
traits::{Block as BlockT, Header as HeaderT},
Justifications,
};
use std::sync::Arc;
use substrate_test_runtime::{Block, Hash, Header};
pub struct ChainHeadMockClient<Client> {
client: Arc<Client>,
import_sinks: Mutex<Vec<TracingUnboundedSender<BlockImportNotification<Block>>>>,
finality_sinks: Mutex<Vec<TracingUnboundedSender<FinalityNotification<Block>>>>,
}
impl<Client> ChainHeadMockClient<Client> {
pub fn new(client: Arc<Client>) -> Self {
ChainHeadMockClient {
client,
import_sinks: Default::default(),
finality_sinks: Default::default(),
}
}
pub async fn trigger_import_stream(&self, header: Header) {
// Ensure the client called the `import_notification_stream`.
while self.import_sinks.lock().is_empty() {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
// Build the notification.
let (sink, _stream) = tracing_unbounded("test_sink", 100_000);
let notification =
BlockImportNotification::new(header.hash(), BlockOrigin::Own, header, true, None, sink);
for sink in self.import_sinks.lock().iter_mut() {
sink.unbounded_send(notification.clone()).unwrap();
}
}
pub async fn trigger_finality_stream(&self, header: Header) {
// Ensure the client called the `finality_notification_stream`.
while self.finality_sinks.lock().is_empty() {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
// Build the notification.
let (sink, _stream) = tracing_unbounded("test_sink", 100_000);
let summary = FinalizeSummary {
header: header.clone(),
finalized: vec![header.hash()],
stale_heads: vec![],
};
let notification = FinalityNotification::from_summary(summary, sink);
for sink in self.finality_sinks.lock().iter_mut() {
sink.unbounded_send(notification.clone()).unwrap();
}
}
}
// ChainHead calls `import_notification_stream` and `finality_notification_stream` in order to
// subscribe to block events.
impl<Client> BlockchainEvents<Block> for ChainHeadMockClient<Client> {
fn import_notification_stream(&self) -> ImportNotifications<Block> {
let (sink, stream) = tracing_unbounded("import_notification_stream", 1024);
self.import_sinks.lock().push(sink);
stream
}
fn every_import_notification_stream(&self) -> ImportNotifications<Block> {
unimplemented!()
}
fn finality_notification_stream(&self) -> FinalityNotifications<Block> {
let (sink, stream) = tracing_unbounded("finality_notification_stream", 1024);
self.finality_sinks.lock().push(sink);
stream
}
fn storage_changes_notification_stream(
&self,
_filter_keys: Option<&[StorageKey]>,
_child_filter_keys: Option<&[(StorageKey, Option<Vec<StorageKey>>)]>,
) -> sp_blockchain::Result<StorageEventStream<Hash>> {
unimplemented!()
}
}
// The following implementations are imposed by the `chainHead` trait bounds.
impl<Block: BlockT, E: CallExecutor<Block>, Client: ExecutorProvider<Block, Executor = E>>
ExecutorProvider<Block> for ChainHeadMockClient<Client>
{
type Executor = <Client as ExecutorProvider<Block>>::Executor;
fn executor(&self) -> &Self::Executor {
self.client.executor()
}
fn execution_extensions(&self) -> &ExecutionExtensions<Block> {
self.client.execution_extensions()
}
}
impl<
BE: sc_client_api::backend::Backend<Block> + Send + Sync + 'static,
Block: BlockT,
Client: StorageProvider<Block, BE>,
> StorageProvider<Block, BE> for ChainHeadMockClient<Client>
{
fn storage(
&self,
hash: Block::Hash,
key: &StorageKey,
) -> sp_blockchain::Result<Option<StorageData>> {
self.client.storage(hash, key)
}
fn storage_hash(
&self,
hash: Block::Hash,
key: &StorageKey,
) -> sp_blockchain::Result<Option<Block::Hash>> {
self.client.storage_hash(hash, key)
}
fn storage_keys(
&self,
hash: Block::Hash,
prefix: Option<&StorageKey>,
start_key: Option<&StorageKey>,
) -> sp_blockchain::Result<KeysIter<BE::State, Block>> {
self.client.storage_keys(hash, prefix, start_key)
}
fn storage_pairs(
&self,
hash: <Block as BlockT>::Hash,
prefix: Option<&StorageKey>,
start_key: Option<&StorageKey>,
) -> sp_blockchain::Result<PairsIter<BE::State, Block>> {
self.client.storage_pairs(hash, prefix, start_key)
}
fn child_storage(
&self,
hash: Block::Hash,
child_info: &ChildInfo,
key: &StorageKey,
) -> sp_blockchain::Result<Option<StorageData>> {
self.client.child_storage(hash, child_info, key)
}
fn child_storage_keys(
&self,
hash: Block::Hash,
child_info: ChildInfo,
prefix: Option<&StorageKey>,
start_key: Option<&StorageKey>,
) -> sp_blockchain::Result<KeysIter<BE::State, Block>> {
self.client.child_storage_keys(hash, child_info, prefix, start_key)
}
fn child_storage_hash(
&self,
hash: Block::Hash,
child_info: &ChildInfo,
key: &StorageKey,
) -> sp_blockchain::Result<Option<Block::Hash>> {
self.client.child_storage_hash(hash, child_info, key)
}
}
impl<Block: BlockT, Client: CallApiAt<Block>> CallApiAt<Block> for ChainHeadMockClient<Client> {
type StateBackend = <Client as CallApiAt<Block>>::StateBackend;
fn call_api_at(
&self,
params: CallApiAtParams<Block, <Client as CallApiAt<Block>>::StateBackend>,
) -> Result<Vec<u8>, sp_api::ApiError> {
self.client.call_api_at(params)
}
fn runtime_version_at(&self, hash: Block::Hash) -> Result<RuntimeVersion, sp_api::ApiError> {
self.client.runtime_version_at(hash)
}
fn state_at(&self, at: Block::Hash) -> Result<Self::StateBackend, sp_api::ApiError> {
self.client.state_at(at)
}
}
impl<Block: BlockT, Client: BlockBackend<Block>> BlockBackend<Block>
for ChainHeadMockClient<Client>
{
fn block_body(
&self,
hash: Block::Hash,
) -> sp_blockchain::Result<Option<Vec<<Block as BlockT>::Extrinsic>>> {
self.client.block_body(hash)
}
fn block(&self, hash: Block::Hash) -> sp_blockchain::Result<Option<SignedBlock<Block>>> {
self.client.block(hash)
}
fn block_status(&self, hash: Block::Hash) -> sp_blockchain::Result<sp_consensus::BlockStatus> {
self.client.block_status(hash)
}
fn justifications(&self, hash: Block::Hash) -> sp_blockchain::Result<Option<Justifications>> {
self.client.justifications(hash)
}
fn block_hash(&self, number: NumberFor<Block>) -> sp_blockchain::Result<Option<Block::Hash>> {
self.client.block_hash(number)
}
fn indexed_transaction(&self, hash: Block::Hash) -> sp_blockchain::Result<Option<Vec<u8>>> {
self.client.indexed_transaction(hash)
}
fn has_indexed_transaction(&self, hash: Block::Hash) -> sp_blockchain::Result<bool> {
self.client.has_indexed_transaction(hash)
}
fn block_indexed_body(&self, hash: Block::Hash) -> sp_blockchain::Result<Option<Vec<Vec<u8>>>> {
self.client.block_indexed_body(hash)
}
fn requires_full_sync(&self) -> bool {
self.client.requires_full_sync()
}
}
impl<Block: BlockT, Client: HeaderMetadata<Block> + Send + Sync> HeaderMetadata<Block>
for ChainHeadMockClient<Client>
{
type Error = <Client as HeaderMetadata<Block>>::Error;
fn header_metadata(
&self,
hash: Block::Hash,
) -> Result<CachedHeaderMetadata<Block>, Self::Error> {
self.client.header_metadata(hash)
}
fn insert_header_metadata(
&self,
hash: Block::Hash,
header_metadata: CachedHeaderMetadata<Block>,
) {
self.client.insert_header_metadata(hash, header_metadata)
}
fn remove_header_metadata(&self, hash: Block::Hash) {
self.client.remove_header_metadata(hash)
}
}
impl<Block: BlockT, Client: HeaderBackend<Block> + Send + Sync> HeaderBackend<Block>
for ChainHeadMockClient<Client>
{
fn header(
&self,
hash: Block::Hash,
) -> sp_blockchain::Result<Option<<Block as BlockT>::Header>> {
self.client.header(hash)
}
fn info(&self) -> Info<Block> {
self.client.info()
}
fn status(&self, hash: Block::Hash) -> sc_client_api::blockchain::Result<BlockStatus> {
self.client.status(hash)
}
fn number(
&self,
hash: Block::Hash,
) -> sc_client_api::blockchain::Result<Option<<<Block as BlockT>::Header as HeaderT>::Number>> {
self.client.number(hash)
}
fn hash(
&self,
number: <<Block as BlockT>::Header as HeaderT>::Number,
) -> sp_blockchain::Result<Option<Block::Hash>> {
self.client.hash(number)
}
}
use crate::chain_head::test_utils::ChainHeadMockClient;
use super::*; use super::*;
use assert_matches::assert_matches; use assert_matches::assert_matches;
use codec::{Decode, Encode}; use codec::{Decode, Encode};
use futures::Future;
use jsonrpsee::{ use jsonrpsee::{
core::{error::Error, server::rpc_module::Subscription as RpcSubscription}, core::{error::Error, server::rpc_module::Subscription as RpcSubscription},
types::{error::CallError, EmptyServerParams as EmptyParams}, types::{error::CallError, EmptyServerParams as EmptyParams},
...@@ -33,7 +36,7 @@ const CHILD_STORAGE_KEY: &[u8] = b"child"; ...@@ -33,7 +36,7 @@ const CHILD_STORAGE_KEY: &[u8] = b"child";
const CHILD_VALUE: &[u8] = b"child value"; const CHILD_VALUE: &[u8] = b"child value";
async fn get_next_event<T: serde::de::DeserializeOwned>(sub: &mut RpcSubscription) -> T { async fn get_next_event<T: serde::de::DeserializeOwned>(sub: &mut RpcSubscription) -> T {
let (event, _sub_id) = tokio::time::timeout(std::time::Duration::from_secs(1), sub.next()) let (event, _sub_id) = tokio::time::timeout(std::time::Duration::from_secs(60), sub.next())
.await .await
.unwrap() .unwrap()
.unwrap() .unwrap()
...@@ -41,6 +44,12 @@ async fn get_next_event<T: serde::de::DeserializeOwned>(sub: &mut RpcSubscriptio ...@@ -41,6 +44,12 @@ async fn get_next_event<T: serde::de::DeserializeOwned>(sub: &mut RpcSubscriptio
event event
} }
async fn run_with_timeout<F: Future>(future: F) -> <F as Future>::Output {
tokio::time::timeout(std::time::Duration::from_secs(60 * 10), future)
.await
.unwrap()
}
async fn setup_api() -> ( async fn setup_api() -> (
Arc<Client<Backend>>, Arc<Client<Backend>>,
RpcModule<ChainHead<Backend, Block, Client<Backend>>>, RpcModule<ChainHead<Backend, Block, Client<Backend>>>,
...@@ -1317,3 +1326,95 @@ async fn follow_report_multiple_pruned_block() { ...@@ -1317,3 +1326,95 @@ async fn follow_report_multiple_pruned_block() {
}); });
assert_eq!(event, expected); assert_eq!(event, expected);
} }
#[tokio::test]
async fn follow_finalized_before_new_block() {
let builder = TestClientBuilder::new();
let backend = builder.backend();
let mut client = Arc::new(builder.build());
let client_mock = Arc::new(ChainHeadMockClient::new(client.clone()));
let api = ChainHead::new(
client_mock.clone(),
backend,
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
)
.into_rpc();
// Make sure the block is imported for it to be pinned.
let block_1 = client.new_block(Default::default()).unwrap().build().unwrap().block;
let block_1_hash = block_1.header.hash();
client.import(BlockOrigin::Own, block_1.clone()).await.unwrap();
let mut sub = api.subscribe("chainHead_unstable_follow", [false]).await.unwrap();
// Trigger the `FinalizedNotification` for block 1 before the `BlockImportNotification`, and
// expect for the `chainHead` to generate `NewBlock`, `BestBlock` and `Finalized` events.
// Trigger the Finalized notification before the NewBlock one.
run_with_timeout(client_mock.trigger_finality_stream(block_1.header.clone())).await;
// Initialized must always be reported first.
let finalized_hash = client.info().finalized_hash;
let event: FollowEvent<String> = get_next_event(&mut sub).await;
let expected = FollowEvent::Initialized(Initialized {
finalized_block_hash: format!("{:?}", finalized_hash),
finalized_block_runtime: None,
runtime_updates: false,
});
assert_eq!(event, expected);
// Block 1 must be reported because we triggered the finalized notification.
let event: FollowEvent<String> = get_next_event(&mut sub).await;
let expected = FollowEvent::NewBlock(NewBlock {
block_hash: format!("{:?}", block_1_hash),
parent_block_hash: format!("{:?}", finalized_hash),
new_runtime: None,
runtime_updates: false,
});
assert_eq!(event, expected);
let event: FollowEvent<String> = get_next_event(&mut sub).await;
let expected = FollowEvent::BestBlockChanged(BestBlockChanged {
best_block_hash: format!("{:?}", block_1_hash),
});
assert_eq!(event, expected);
let event: FollowEvent<String> = get_next_event(&mut sub).await;
let expected = FollowEvent::Finalized(Finalized {
finalized_block_hashes: vec![format!("{:?}", block_1_hash)],
pruned_block_hashes: vec![],
});
assert_eq!(event, expected);
let block_2 = client.new_block(Default::default()).unwrap().build().unwrap().block;
let block_2_hash = block_2.header.hash();
client.import(BlockOrigin::Own, block_2.clone()).await.unwrap();
// Triggering the `BlockImportNotification` notification for block 1 should have no effect
// on the notification because the events were handled by the `FinalizedNotification`.
// Also trigger the `BlockImportNotification` notification for block 2 to ensure
// `NewBlock and `BestBlock` events are generated.
// Trigger NewBlock notification for block 1 and block 2.
run_with_timeout(client_mock.trigger_import_stream(block_1.header)).await;
run_with_timeout(client_mock.trigger_import_stream(block_2.header)).await;
let event: FollowEvent<String> = get_next_event(&mut sub).await;
let expected = FollowEvent::NewBlock(NewBlock {
block_hash: format!("{:?}", block_2_hash),
parent_block_hash: format!("{:?}", block_1_hash),
new_runtime: None,
runtime_updates: false,
});
assert_eq!(event, expected);
let event: FollowEvent<String> = get_next_event(&mut sub).await;
let expected = FollowEvent::BestBlockChanged(BestBlockChanged {
best_block_hash: format!("{:?}", block_2_hash),
});
assert_eq!(event, expected);
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment