diff --git a/Cargo.lock b/Cargo.lock index 174456883688bdbca5f465bc414d9a95dd459737..1b9e47064d58323c0c7089b2e6f8b3346a47aa45 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9000,6 +9000,7 @@ dependencies = [ "sc-network-light", "sc-network-sync", "sc-service", + "sc-utils", "sp-blockchain", "sp-consensus", "sp-consensus-babe", diff --git a/client/authority-discovery/src/error.rs b/client/authority-discovery/src/error.rs index 89c05b71b9ea60aba6e4defd40af904310395d75..c299966015c2656054ccafd97b77bbccfccea692 100644 --- a/client/authority-discovery/src/error.rs +++ b/client/authority-discovery/src/error.rs @@ -25,6 +25,7 @@ pub type Result<T> = std::result::Result<T, Error>; /// Error type for the authority discovery module. #[derive(Debug, thiserror::Error)] +#[allow(missing_docs)] pub enum Error { #[error("Received dht value found event with records with different keys.")] ReceivingDhtValueFoundEventWithDifferentKeys, @@ -76,4 +77,7 @@ pub enum Error { #[error("Received authority record without a valid signature for the remote peer id.")] MissingPeerIdSignature, + + #[error("Unable to fetch best block.")] + BestBlockFetchingError, } diff --git a/client/authority-discovery/src/lib.rs b/client/authority-discovery/src/lib.rs index a3c6699091297aa81f90396342279f022f317f88..6bb12804cada34d1b3f87b4249fc466fcf253a9a 100644 --- a/client/authority-discovery/src/lib.rs +++ b/client/authority-discovery/src/lib.rs @@ -28,6 +28,7 @@ //! See [`Worker`] and [`Service`] for more documentation. pub use crate::{ + error::Error, service::Service, worker::{AuthorityDiscovery, NetworkProvider, Role, Worker}, }; @@ -148,7 +149,7 @@ pub fn new_worker_and_service_with_config<Client, Network, Block, DhtEventStream where Block: BlockT + Unpin + 'static, Network: NetworkProvider, - Client: AuthorityDiscovery<Block> + HeaderBackend<Block> + 'static, + Client: AuthorityDiscovery<Block> + 'static, DhtEventStream: Stream<Item = DhtEvent> + Unpin, { let (to_worker, from_service) = mpsc::channel(0); diff --git a/client/authority-discovery/src/worker.rs b/client/authority-discovery/src/worker.rs index 034d72902e65d54320d8fbed090ebae53c3180e9..0b2055c818a9bab3cb39df3909c81ab5d90b0dce 100644 --- a/client/authority-discovery/src/worker.rs +++ b/client/authority-discovery/src/worker.rs @@ -157,12 +157,15 @@ pub trait AuthorityDiscovery<Block: BlockT> { /// Retrieve authority identifiers of the current and next authority set. async fn authorities(&self, at: Block::Hash) -> std::result::Result<Vec<AuthorityId>, ApiError>; + + /// Retrieve best block hash + async fn best_hash(&self) -> std::result::Result<Block::Hash, Error>; } #[async_trait::async_trait] impl<Block, T> AuthorityDiscovery<Block> for T where - T: ProvideRuntimeApi<Block> + Send + Sync, + T: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync, T::Api: AuthorityDiscoveryApi<Block>, Block: BlockT, { @@ -172,13 +175,17 @@ where ) -> std::result::Result<Vec<AuthorityId>, ApiError> { self.runtime_api().authorities(at) } + + async fn best_hash(&self) -> std::result::Result<Block::Hash, Error> { + Ok(self.info().best_hash) + } } impl<Client, Network, Block, DhtEventStream> Worker<Client, Network, Block, DhtEventStream> where Block: BlockT + Unpin + 'static, Network: NetworkProvider, - Client: AuthorityDiscovery<Block> + HeaderBackend<Block> + 'static, + Client: AuthorityDiscovery<Block> + 'static, DhtEventStream: Stream<Item = DhtEvent> + Unpin, { /// Construct a [`Worker`]. @@ -377,7 +384,7 @@ where } async fn refill_pending_lookups_queue(&mut self) -> Result<()> { - let best_hash = self.client.info().best_hash; + let best_hash = self.client.best_hash().await?; let local_keys = match &self.role { Role::PublishAndDiscover(key_store) => key_store @@ -597,7 +604,7 @@ where .into_iter() .collect::<HashSet<_>>(); - let best_hash = client.info().best_hash; + let best_hash = client.best_hash().await?; let authorities = client .authorities(best_hash) .await diff --git a/client/informant/src/display.rs b/client/informant/src/display.rs index 46e7229273d794181404da52570a073b2814a693..2f101307229da426a3b027a927306da4b3e857c6 100644 --- a/client/informant/src/display.rs +++ b/client/informant/src/display.rs @@ -76,7 +76,7 @@ impl<B: BlockT> InformantDisplay<B> { let best_number = info.chain.best_number; let best_hash = info.chain.best_hash; let finalized_number = info.chain.finalized_number; - let num_connected_peers = net_status.num_connected_peers; + let num_connected_peers = sync_status.num_connected_peers; let speed = speed::<B>(best_number, self.last_number, self.last_update); let total_bytes_inbound = net_status.total_bytes_inbound; let total_bytes_outbound = net_status.total_bytes_outbound; diff --git a/client/network/common/src/sync.rs b/client/network/common/src/sync.rs index 130f354b70050dc3e08dd24bbede12981c0709e1..d02a81379aea03ab340a971a0dc4124a15dd3297 100644 --- a/client/network/common/src/sync.rs +++ b/client/network/common/src/sync.rs @@ -94,6 +94,8 @@ pub struct SyncStatus<Block: BlockT> { pub best_seen_block: Option<NumberFor<Block>>, /// Number of peers participating in syncing. pub num_peers: u32, + /// Number of peers known to `SyncingEngine` (both full and light). + pub num_connected_peers: u32, /// Number of blocks queued for import pub queued_blocks: u32, /// State sync status in progress, if any. diff --git a/client/network/src/config.rs b/client/network/src/config.rs index 925a7795d290f44449ea17210fb59d0425061879..781ae9c786694aaaa1870b3b67394c34c062ed3d 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -22,6 +22,7 @@ //! See the documentation of [`Params`]. pub use crate::{ + protocol::NotificationsSink, request_responses::{ IncomingRequest, OutgoingResponse, ProtocolConfig as RequestResponseConfig, }, @@ -31,9 +32,15 @@ pub use crate::{ use codec::Encode; use libp2p::{identity::Keypair, multiaddr, Multiaddr, PeerId}; use prometheus_endpoint::Registry; -pub use sc_network_common::{role::Role, sync::warp::WarpSyncProvider, ExHashT}; +pub use sc_network_common::{ + role::{Role, Roles}, + sync::warp::WarpSyncProvider, + ExHashT, +}; +use sc_utils::mpsc::TracingUnboundedSender; use zeroize::Zeroize; +use sp_runtime::traits::Block as BlockT; use std::{ error::Error, fmt, fs, @@ -44,7 +51,6 @@ use std::{ path::{Path, PathBuf}, pin::Pin, str::{self, FromStr}, - sync::Arc, }; pub use libp2p::{ @@ -688,7 +694,7 @@ impl NetworkConfiguration { } /// Network initialization parameters. -pub struct Params<Client> { +pub struct Params<Block: BlockT> { /// Assigned role for our node (full, light, ...). pub role: Role, @@ -698,12 +704,12 @@ pub struct Params<Client> { /// Network layer configuration. pub network_config: NetworkConfiguration, - /// Client that contains the blockchain. - pub chain: Arc<Client>, - /// Legacy name of the protocol to use on the wire. Should be different for each chain. pub protocol_id: ProtocolId, + /// Genesis hash of the chain + pub genesis_hash: Block::Hash, + /// Fork ID to distinguish protocols of different hard forks. Part of the standard protocol /// name on the wire. pub fork_id: Option<String>, @@ -714,6 +720,9 @@ pub struct Params<Client> { /// Block announce protocol configuration pub block_announce_config: NonDefaultSetConfig, + /// TX channel for direct communication with `SyncingEngine` and `Protocol`. + pub tx: TracingUnboundedSender<crate::event::SyncEvent<Block>>, + /// Request response protocol configurations pub request_response_protocol_configs: Vec<RequestResponseConfig>, } diff --git a/client/network/src/event.rs b/client/network/src/event.rs index 3ecd8f931142904da95b5123bb3be88139ef9a36..975fde0e40a287ec4589c763d0d02665a3fe0fc6 100644 --- a/client/network/src/event.rs +++ b/client/network/src/event.rs @@ -19,12 +19,14 @@ //! Network event types. These are are not the part of the protocol, but rather //! events that happen on the network like DHT get/put results received. -use crate::types::ProtocolName; +use crate::{types::ProtocolName, NotificationsSink}; use bytes::Bytes; +use futures::channel::oneshot; use libp2p::{core::PeerId, kad::record::Key}; -use sc_network_common::role::ObservedRole; +use sc_network_common::{role::ObservedRole, sync::message::BlockAnnouncesHandshake}; +use sp_runtime::traits::Block as BlockT; /// Events generated by DHT as a response to get_value and put_value requests. #[derive(Debug, Clone)] @@ -90,3 +92,44 @@ pub enum Event { messages: Vec<(ProtocolName, Bytes)>, }, } + +/// Event sent to `SyncingEngine` +// TODO: remove once `NotificationService` is implemented. +pub enum SyncEvent<B: BlockT> { + /// Opened a substream with the given node with the given notifications protocol. + /// + /// The protocol is always one of the notification protocols that have been registered. + NotificationStreamOpened { + /// Node we opened the substream with. + remote: PeerId, + /// Received handshake. + received_handshake: BlockAnnouncesHandshake<B>, + /// Notification sink. + sink: NotificationsSink, + /// Channel for reporting accept/reject of the substream. + tx: oneshot::Sender<bool>, + }, + + /// Closed a substream with the given node. Always matches a corresponding previous + /// `NotificationStreamOpened` message. + NotificationStreamClosed { + /// Node we closed the substream with. + remote: PeerId, + }, + + /// Notification sink was replaced. + NotificationSinkReplaced { + /// Node we closed the substream with. + remote: PeerId, + /// Notification sink. + sink: NotificationsSink, + }, + + /// Received one or more messages from the given node using the given protocol. + NotificationsReceived { + /// Node we received the message from. + remote: PeerId, + /// Concerned protocol and associated message. + messages: Vec<Bytes>, + }, +} diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs index c290f4b94db533b98b3db057074e79784ee09a2d..5374ac13435be713a75d6736504ba49128386f77 100644 --- a/client/network/src/lib.rs +++ b/client/network/src/lib.rs @@ -259,7 +259,7 @@ pub mod request_responses; pub mod types; pub mod utils; -pub use event::{DhtEvent, Event}; +pub use event::{DhtEvent, Event, SyncEvent}; #[doc(inline)] pub use libp2p::{multiaddr, Multiaddr, PeerId}; pub use request_responses::{IfDisconnected, RequestFailure, RequestResponseConfig}; @@ -278,8 +278,8 @@ pub use service::{ NetworkStatusProvider, NetworkSyncForkRequest, NotificationSender as NotificationSenderT, NotificationSenderError, NotificationSenderReady, }, - DecodingError, Keypair, NetworkService, NetworkWorker, NotificationSender, OutboundFailure, - PublicKey, + DecodingError, Keypair, NetworkService, NetworkWorker, NotificationSender, NotificationsSink, + OutboundFailure, PublicKey, }; pub use types::ProtocolName; diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index 06ca02c0ca8d58c58ef2e570bc802a71ae954649..a7e6f36ef6215356392276720456cfc65883b6eb 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -24,6 +24,7 @@ use crate::{ use bytes::Bytes; use codec::{DecodeAll, Encode}; +use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt}; use libp2p::{ core::connection::ConnectionId, swarm::{ @@ -35,11 +36,14 @@ use libp2p::{ use log::{debug, error, warn}; use sc_network_common::{role::Roles, sync::message::BlockAnnouncesHandshake}; +use sc_utils::mpsc::TracingUnboundedSender; use sp_runtime::traits::Block as BlockT; use std::{ collections::{HashMap, HashSet, VecDeque}, + future::Future, iter, + pin::Pin, task::Poll, }; @@ -68,6 +72,9 @@ mod rep { pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message"); } +type PendingSyncSubstreamValidation = + Pin<Box<dyn Future<Output = Result<(PeerId, Roles), PeerId>> + Send>>; + // Lock must always be taken in order declared here. pub struct Protocol<B: BlockT> { /// Pending list of messages to return from `poll` as a priority. @@ -87,6 +94,8 @@ pub struct Protocol<B: BlockT> { bad_handshake_substreams: HashSet<(PeerId, sc_peerset::SetId)>, /// Connected peers. peers: HashMap<PeerId, Roles>, + sync_substream_validations: FuturesUnordered<PendingSyncSubstreamValidation>, + tx: TracingUnboundedSender<crate::event::SyncEvent<B>>, _marker: std::marker::PhantomData<B>, } @@ -96,6 +105,7 @@ impl<B: BlockT> Protocol<B> { roles: Roles, network_config: &config::NetworkConfiguration, block_announces_protocol: config::NonDefaultSetConfig, + tx: TracingUnboundedSender<crate::event::SyncEvent<B>>, ) -> error::Result<(Self, sc_peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> { let mut known_addresses = Vec::new(); @@ -179,6 +189,8 @@ impl<B: BlockT> Protocol<B> { .collect(), bad_handshake_substreams: Default::default(), peers: HashMap::new(), + sync_substream_validations: FuturesUnordered::new(), + tx, // TODO: remove when `BlockAnnouncesHandshake` is moved away from `Protocol` _marker: Default::default(), }; @@ -418,6 +430,23 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> { return Poll::Ready(NetworkBehaviourAction::CloseConnection { peer_id, connection }), }; + while let Poll::Ready(Some(validation_result)) = + self.sync_substream_validations.poll_next_unpin(cx) + { + match validation_result { + Ok((peer, roles)) => { + self.peers.insert(peer, roles); + }, + Err(peer) => { + log::debug!( + target: "sub-libp2p", + "`SyncingEngine` rejected stream" + ); + self.behaviour.disconnect_peer(&peer, HARDCODED_PEERSETS_SYNC); + }, + } + } + let outcome = match event { NotificationsOut::CustomProtocolOpen { peer_id, @@ -440,16 +469,29 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> { best_hash: handshake.best_hash, genesis_hash: handshake.genesis_hash, }; - self.peers.insert(peer_id, roles); - CustomMessageOutcome::NotificationStreamOpened { - remote: peer_id, - protocol: self.notification_protocols[usize::from(set_id)].clone(), - negotiated_fallback, - received_handshake: handshake.encode(), - roles, - notifications_sink, - } + let (tx, rx) = oneshot::channel(); + let _ = self.tx.unbounded_send( + crate::SyncEvent::NotificationStreamOpened { + remote: peer_id, + received_handshake: handshake, + sink: notifications_sink, + tx, + }, + ); + self.sync_substream_validations.push(Box::pin(async move { + match rx.await { + Ok(accepted) => + if accepted { + Ok((peer_id, roles)) + } else { + Err(peer_id) + }, + Err(_) => Err(peer_id), + } + })); + + CustomMessageOutcome::None }, Ok(msg) => { debug!( @@ -469,15 +511,27 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> { let roles = handshake.roles; self.peers.insert(peer_id, roles); - CustomMessageOutcome::NotificationStreamOpened { - remote: peer_id, - protocol: self.notification_protocols[usize::from(set_id)] - .clone(), - negotiated_fallback, - received_handshake, - roles, - notifications_sink, - } + let (tx, rx) = oneshot::channel(); + let _ = self.tx.unbounded_send( + crate::SyncEvent::NotificationStreamOpened { + remote: peer_id, + received_handshake: handshake, + sink: notifications_sink, + tx, + }, + ); + self.sync_substream_validations.push(Box::pin(async move { + match rx.await { + Ok(accepted) => + if accepted { + Ok((peer_id, roles)) + } else { + Err(peer_id) + }, + Err(_) => Err(peer_id), + } + })); + CustomMessageOutcome::None }, Err(err2) => { log::debug!( @@ -535,6 +589,12 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> { NotificationsOut::CustomProtocolReplaced { peer_id, notifications_sink, set_id } => if self.bad_handshake_substreams.contains(&(peer_id, set_id)) { CustomMessageOutcome::None + } else if set_id == HARDCODED_PEERSETS_SYNC { + let _ = self.tx.unbounded_send(crate::SyncEvent::NotificationSinkReplaced { + remote: peer_id, + sink: notifications_sink, + }); + CustomMessageOutcome::None } else { CustomMessageOutcome::NotificationStreamReplaced { remote: peer_id, @@ -548,6 +608,12 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> { // handshake. The outer layers have never received an opening event about this // substream, and consequently shouldn't receive a closing event either. CustomMessageOutcome::None + } else if set_id == HARDCODED_PEERSETS_SYNC { + let _ = self.tx.unbounded_send(crate::SyncEvent::NotificationStreamClosed { + remote: peer_id, + }); + self.peers.remove(&peer_id); + CustomMessageOutcome::None } else { CustomMessageOutcome::NotificationStreamClosed { remote: peer_id, @@ -558,6 +624,12 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> { NotificationsOut::Notification { peer_id, set_id, message } => { if self.bad_handshake_substreams.contains(&(peer_id, set_id)) { CustomMessageOutcome::None + } else if set_id == HARDCODED_PEERSETS_SYNC { + let _ = self.tx.unbounded_send(crate::SyncEvent::NotificationsReceived { + remote: peer_id, + messages: vec![message.freeze()], + }); + CustomMessageOutcome::None } else { let protocol_name = self.notification_protocols[usize::from(set_id)].clone(); CustomMessageOutcome::NotificationsReceived { diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 6dc00b36ceb53f34674f64dc775deb7138fc510a..9708b24d29b521e9f3238f6908e6789e353760d8 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -36,7 +36,7 @@ use crate::{ network_state::{ NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer, }, - protocol::{self, NotificationsSink, NotifsHandlerError, Protocol, Ready}, + protocol::{self, NotifsHandlerError, Protocol, Ready}, request_responses::{IfDisconnected, RequestFailure}, service::{ signature::{Signature, SigningError}, @@ -73,8 +73,7 @@ use parking_lot::Mutex; use sc_network_common::ExHashT; use sc_peerset::PeersetHandle; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; -use sp_blockchain::HeaderBackend; -use sp_runtime::traits::{Block as BlockT, Zero}; +use sp_runtime::traits::Block as BlockT; use std::{ cmp, @@ -92,6 +91,7 @@ use std::{ pub use behaviour::{InboundFailure, OutboundFailure, ResponseFailure}; pub use libp2p::identity::{error::DecodingError, Keypair, PublicKey}; +pub use protocol::NotificationsSink; mod metrics; mod out_events; @@ -147,9 +147,7 @@ where /// Returns a `NetworkWorker` that implements `Future` and must be regularly polled in order /// for the network processing to advance. From it, you can extract a `NetworkService` using /// `worker.service()`. The `NetworkService` can be shared through the codebase. - pub fn new<Client: HeaderBackend<B> + 'static>( - mut params: Params<Client>, - ) -> Result<Self, Error> { + pub fn new(mut params: Params<B>) -> Result<Self, Error> { // Private and public keys configuration. let local_identity = params.network_config.node_key.clone().into_keypair()?; let local_public = local_identity.public(); @@ -230,6 +228,7 @@ where From::from(¶ms.role), ¶ms.network_config, params.block_announce_config, + params.tx, )?; // List of multiaddresses that we know in the network. @@ -277,13 +276,11 @@ where config.discovery_limit( u64::from(params.network_config.default_peers_set.out_peers) + 15, ); - let genesis_hash = params - .chain - .hash(Zero::zero()) - .ok() - .flatten() - .expect("Genesis block exists; qed"); - config.with_kademlia(genesis_hash, params.fork_id.as_deref(), ¶ms.protocol_id); + config.with_kademlia( + params.genesis_hash, + params.fork_id.as_deref(), + ¶ms.protocol_id, + ); config.with_dht_random_walk(params.network_config.enable_dht_random_walk); config.allow_non_globals_in_dht(params.network_config.allow_non_globals_in_dht); config.use_kademlia_disjoint_query_paths( diff --git a/client/network/sync/src/engine.rs b/client/network/sync/src/engine.rs index e6e62101ffcd07cb6b08318c03dc660e44679334..4310db22bd622372b3611439ba687c85abfcbfdd 100644 --- a/client/network/sync/src/engine.rs +++ b/client/network/sync/src/engine.rs @@ -24,8 +24,8 @@ use crate::{ ChainSync, ClientError, SyncingService, }; -use codec::{Decode, DecodeAll, Encode}; -use futures::{FutureExt, Stream, StreamExt}; +use codec::{Decode, Encode}; +use futures::{FutureExt, StreamExt}; use futures_timer::Delay; use libp2p::PeerId; use lru::LruCache; @@ -39,9 +39,8 @@ use sc_network::{ config::{ NetworkConfiguration, NonDefaultSetConfig, ProtocolId, SyncMode as SyncOperationMode, }, - event::Event, utils::LruHashSet, - ProtocolName, + NotificationsSink, ProtocolName, }; use sc_network_common::{ role::Roles, @@ -66,7 +65,6 @@ use sp_runtime::{ use std::{ collections::{HashMap, HashSet}, num::NonZeroUsize, - pin::Pin, sync::{ atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, @@ -174,6 +172,8 @@ pub struct Peer<B: BlockT> { pub info: ExtendedPeerInfo<B>, /// Holds a set of blocks known to this peer. pub known_blocks: LruHashSet<B::Hash>, + /// Notification sink. + sink: NotificationsSink, } pub struct SyncingEngine<B: BlockT, Client> { @@ -196,6 +196,9 @@ pub struct SyncingEngine<B: BlockT, Client> { /// Channel for receiving service commands service_rx: TracingUnboundedReceiver<ToServiceCommand<B>>, + /// Channel for receiving inbound connections from `Protocol`. + rx: sc_utils::mpsc::TracingUnboundedReceiver<sc_network::SyncEvent<B>>, + /// Assigned roles. roles: Roles, @@ -266,6 +269,7 @@ where block_request_protocol_name: ProtocolName, state_request_protocol_name: ProtocolName, warp_sync_protocol_name: Option<ProtocolName>, + rx: sc_utils::mpsc::TracingUnboundedReceiver<sc_network::SyncEvent<B>>, ) -> Result<(Self, SyncingService<B>, NonDefaultSetConfig), ClientError> { let mode = match network_config.sync_mode { SyncOperationMode::Full => SyncMode::Full, @@ -359,6 +363,7 @@ where num_connected: num_connected.clone(), is_major_syncing: is_major_syncing.clone(), service_rx, + rx, genesis_hash, important_peers, default_peers_set_no_slot_connected_peers: HashSet::new(), @@ -566,11 +571,7 @@ where data: Some(data.clone()), }; - self.network_service.write_notification( - *who, - self.block_announce_protocol_name.clone(), - message.encode(), - ); + peer.sink.send_sync_notification(message.encode()); } } } @@ -587,17 +588,13 @@ where ) } - pub async fn run(mut self, mut stream: Pin<Box<dyn Stream<Item = Event> + Send>>) { + pub async fn run(mut self) { loop { - futures::future::poll_fn(|cx| self.poll(cx, &mut stream)).await; + futures::future::poll_fn(|cx| self.poll(cx)).await; } } - pub fn poll( - &mut self, - cx: &mut std::task::Context, - event_stream: &mut Pin<Box<dyn Stream<Item = Event> + Send>>, - ) -> Poll<()> { + pub fn poll(&mut self, cx: &mut std::task::Context) -> Poll<()> { self.num_connected.store(self.peers.len(), Ordering::Relaxed); self.is_major_syncing .store(self.chain_sync.status().state.is_major_syncing(), Ordering::Relaxed); @@ -607,84 +604,6 @@ where self.tick_timeout.reset(TICK_TIMEOUT); } - while let Poll::Ready(Some(event)) = event_stream.poll_next_unpin(cx) { - match event { - Event::NotificationStreamOpened { - remote, protocol, received_handshake, .. - } => { - if protocol != self.block_announce_protocol_name { - continue - } - - match <BlockAnnouncesHandshake<B> as DecodeAll>::decode_all( - &mut &received_handshake[..], - ) { - Ok(handshake) => { - if self.on_sync_peer_connected(remote, handshake).is_err() { - log::debug!( - target: "sync", - "Failed to register peer {remote:?}: {received_handshake:?}", - ); - } - }, - Err(err) => { - log::debug!( - target: "sync", - "Couldn't decode handshake sent by {}: {:?}: {}", - remote, - received_handshake, - err, - ); - self.network_service.report_peer(remote, rep::BAD_MESSAGE); - }, - } - }, - Event::NotificationStreamClosed { remote, protocol } => { - if protocol != self.block_announce_protocol_name { - continue - } - - if self.on_sync_peer_disconnected(remote).is_err() { - log::trace!( - target: "sync", - "Disconnected peer which had earlier been refused by on_sync_peer_connected {}", - remote - ); - } - }, - Event::NotificationsReceived { remote, messages } => { - for (protocol, message) in messages { - if protocol != self.block_announce_protocol_name { - continue - } - - if self.peers.contains_key(&remote) { - if let Ok(announce) = BlockAnnounce::decode(&mut message.as_ref()) { - self.push_block_announce_validation(remote, announce); - - // Make sure that the newly added block announce validation future - // was polled once to be registered in the task. - if let Poll::Ready(res) = - self.chain_sync.poll_block_announce_validation(cx) - { - self.process_block_announce_validation_result(res) - } - } else { - log::warn!(target: "sub-libp2p", "Failed to decode block announce"); - } - } else { - log::trace!( - target: "sync", - "Received sync for peer earlier refused by sync layer: {}", - remote - ); - } - } - }, - _ => {}, - } - } - while let Poll::Ready(Some(event)) = self.service_rx.poll_next_unpin(cx) { match event { ToServiceCommand::SetSyncForkRequest(peers, hash, number) => { @@ -723,7 +642,9 @@ where ToServiceCommand::NewBestBlockImported(hash, number) => self.new_best_block_imported(hash, number), ToServiceCommand::Status(tx) => { - let _ = tx.send(self.chain_sync.status()); + let mut status = self.chain_sync.status(); + status.num_connected_peers = self.peers.len() as u32; + let _ = tx.send(status); }, ToServiceCommand::NumActivePeers(tx) => { let _ = tx.send(self.chain_sync.num_active_peers()); @@ -756,6 +677,70 @@ where } } + while let Poll::Ready(Some(event)) = self.rx.poll_next_unpin(cx) { + match event { + sc_network::SyncEvent::NotificationStreamOpened { + remote, + received_handshake, + sink, + tx, + } => match self.on_sync_peer_connected(remote, &received_handshake, sink) { + Ok(()) => { + let _ = tx.send(true); + }, + Err(()) => { + log::debug!( + target: "sync", + "Failed to register peer {remote:?}: {received_handshake:?}", + ); + let _ = tx.send(false); + }, + }, + sc_network::SyncEvent::NotificationStreamClosed { remote } => { + if self.on_sync_peer_disconnected(remote).is_err() { + log::trace!( + target: "sync", + "Disconnected peer which had earlier been refused by on_sync_peer_connected {}", + remote + ); + } + }, + sc_network::SyncEvent::NotificationsReceived { remote, messages } => { + for message in messages { + if self.peers.contains_key(&remote) { + if let Ok(announce) = BlockAnnounce::decode(&mut message.as_ref()) { + self.push_block_announce_validation(remote, announce); + + // Make sure that the newly added block announce validation future + // was polled once to be registered in the task. + if let Poll::Ready(res) = + self.chain_sync.poll_block_announce_validation(cx) + { + self.process_block_announce_validation_result(res) + } + } else { + log::warn!(target: "sub-libp2p", "Failed to decode block announce"); + } + } else { + log::trace!( + target: "sync", + "Received sync for peer earlier refused by sync layer: {}", + remote + ); + } + } + }, + sc_network::SyncEvent::NotificationSinkReplaced { remote, sink } => { + if let Some(peer) = self.peers.get_mut(&remote) { + peer.sink = sink; + } + }, + } + } + + // poll `ChainSync` last because of a block announcement was received through the + // event stream between `SyncingEngine` and `Protocol` and the validation finished + // right after it as queued, the resulting block request (if any) can be sent right away. while let Poll::Ready(result) = self.chain_sync.poll(cx) { self.process_block_announce_validation_result(result); } @@ -767,13 +752,13 @@ where /// /// Returns a result if the handshake of this peer was indeed accepted. pub fn on_sync_peer_disconnected(&mut self, peer: PeerId) -> Result<(), ()> { - if self.important_peers.contains(&peer) { - log::warn!(target: "sync", "Reserved peer {} disconnected", peer); - } else { - log::debug!(target: "sync", "{} disconnected", peer); - } - if self.peers.remove(&peer).is_some() { + if self.important_peers.contains(&peer) { + log::warn!(target: "sync", "Reserved peer {} disconnected", peer); + } else { + log::debug!(target: "sync", "{} disconnected", peer); + } + self.chain_sync.peer_disconnected(&peer); self.default_peers_set_no_slot_connected_peers.remove(&peer); self.event_streams @@ -792,7 +777,8 @@ where pub fn on_sync_peer_connected( &mut self, who: PeerId, - status: BlockAnnouncesHandshake<B>, + status: &BlockAnnouncesHandshake<B>, + sink: NotificationsSink, ) -> Result<(), ()> { log::trace!(target: "sync", "New peer {} {:?}", who, status); @@ -804,8 +790,6 @@ where if status.genesis_hash != self.genesis_hash { self.network_service.report_peer(who, rep::GENESIS_MISMATCH); - self.network_service - .disconnect_peer(who, self.block_announce_protocol_name.clone()); if self.important_peers.contains(&who) { log::error!( @@ -869,8 +853,6 @@ where this_peer_reserved_slot { log::debug!(target: "sync", "Too many full nodes, rejecting {}", who); - self.network_service - .disconnect_peer(who, self.block_announce_protocol_name.clone()); return Err(()) } @@ -879,8 +861,6 @@ where { // Make sure that not all slots are occupied by light clients. log::debug!(target: "sync", "Too many light nodes, rejecting {}", who); - self.network_service - .disconnect_peer(who, self.block_announce_protocol_name.clone()); return Err(()) } @@ -893,14 +873,13 @@ where known_blocks: LruHashSet::new( NonZeroUsize::new(MAX_KNOWN_BLOCKS).expect("Constant is nonzero"), ), + sink, }; let req = if peer.info.roles.is_full() { match self.chain_sync.new_peer(who, peer.info.best_hash, peer.info.best_number) { Ok(req) => req, Err(BadPeer(id, repu)) => { - self.network_service - .disconnect_peer(id, self.block_announce_protocol_name.clone()); self.network_service.report_peer(id, repu); return Err(()) }, diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index 45d14ffa7bb37cf50998c6dcdf9e42b95086206c..28959e7f9c8861bf51f0fdcb786c4744931f793e 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -523,6 +523,7 @@ where state: sync_state, best_seen_block, num_peers: self.peers.len() as u32, + num_connected_peers: 0u32, queued_blocks: self.queue_blocks.len() as u32, state_sync: self.state_sync.as_ref().map(|s| s.progress()), warp_sync: warp_sync_progress, diff --git a/client/network/test/Cargo.toml b/client/network/test/Cargo.toml index 8368fa278712aaae1163018ff010f3c4896e77d5..9763feed5ea544bfaa5039ddabda97c88fadaad2 100644 --- a/client/network/test/Cargo.toml +++ b/client/network/test/Cargo.toml @@ -26,6 +26,7 @@ sc-client-api = { version = "4.0.0-dev", path = "../../api" } sc-consensus = { version = "0.10.0-dev", path = "../../consensus/common" } sc-network = { version = "0.10.0-dev", path = "../" } sc-network-common = { version = "0.10.0-dev", path = "../common" } +sc-utils = { version = "4.0.0-dev", path = "../../utils" } sc-network-light = { version = "0.10.0-dev", path = "../light" } sc-network-sync = { version = "0.10.0-dev", path = "../sync" } sc-service = { version = "0.10.0-dev", default-features = false, features = ["test-helpers"], path = "../../service" } diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index 75b8287b08dcff9f114230a76ebf653334583d0f..f85d6ed63c2478929ad9d5041e54bed5af84eb8a 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -55,8 +55,8 @@ use sc_network::{ }, request_responses::ProtocolConfig as RequestResponseConfig, types::ProtocolName, - Multiaddr, NetworkBlock, NetworkEventStream, NetworkService, NetworkStateInfo, - NetworkSyncForkRequest, NetworkWorker, + Multiaddr, NetworkBlock, NetworkService, NetworkStateInfo, NetworkSyncForkRequest, + NetworkWorker, }; use sc_network_common::{ role::Roles, @@ -83,7 +83,7 @@ use sp_core::H256; use sp_runtime::{ codec::{Decode, Encode}, generic::BlockId, - traits::{Block as BlockT, Header as HeaderT, NumberFor}, + traits::{Block as BlockT, Header as HeaderT, NumberFor, Zero}, Justification, Justifications, }; use substrate_test_runtime_client::AccountKeyring; @@ -896,6 +896,7 @@ where let (chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); + let (tx, rx) = sc_utils::mpsc::tracing_unbounded("mpsc_syncing_engine_protocol", 100_000); let (engine, sync_service, block_announce_config) = sc_network_sync::engine::SyncingEngine::new( Roles::from(if config.is_authority { &Role::Authority } else { &Role::Full }), @@ -911,22 +912,26 @@ where block_request_protocol_config.name.clone(), state_request_protocol_config.name.clone(), Some(warp_protocol_config.name.clone()), + rx, ) .unwrap(); let sync_service_import_queue = Box::new(sync_service.clone()); let sync_service = Arc::new(sync_service.clone()); + let genesis_hash = + client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed"); let network = NetworkWorker::new(sc_network::config::Params { role: if config.is_authority { Role::Authority } else { Role::Full }, executor: Box::new(|f| { tokio::spawn(f); }), network_config, - chain: client.clone(), + genesis_hash, protocol_id, fork_id, metrics_registry: None, block_announce_config, + tx, request_response_protocol_configs: [ block_request_protocol_config, state_request_protocol_config, @@ -948,9 +953,8 @@ where import_queue.run(sync_service_import_queue).await; }); - let service = network.service().clone(); tokio::spawn(async move { - engine.run(service.event_stream("syncing")).await; + engine.run().await; }); self.mut_peers(move |peers| { diff --git a/client/network/test/src/service.rs b/client/network/test/src/service.rs index b1de2a91ebcc9b185cb00a1e3ba535d790a41f6b..5871860a7c4a6dccd4195b6b2708fc6ab100a425 100644 --- a/client/network/test/src/service.rs +++ b/client/network/test/src/service.rs @@ -34,7 +34,8 @@ use sc_network_sync::{ service::network::{NetworkServiceHandle, NetworkServiceProvider}, state_request_handler::StateRequestHandler, }; -use sp_runtime::traits::Block as BlockT; +use sp_blockchain::HeaderBackend; +use sp_runtime::traits::{Block as BlockT, Zero}; use substrate_test_runtime_client::{ runtime::{Block as TestBlock, Hash as TestHash}, TestClientBuilder, TestClientBuilderExt as _, @@ -176,6 +177,7 @@ impl TestNetworkBuilder { let (chain_sync_network_provider, chain_sync_network_handle) = self.chain_sync_network.unwrap_or(NetworkServiceProvider::new()); + let (tx, rx) = sc_utils::mpsc::tracing_unbounded("mpsc_syncing_engine_protocol", 100_000); let (engine, chain_sync_service, block_announce_config) = SyncingEngine::new( Roles::from(&config::Role::Full), @@ -191,20 +193,23 @@ impl TestNetworkBuilder { block_request_protocol_config.name.clone(), state_request_protocol_config.name.clone(), None, + rx, ) .unwrap(); let mut link = self.link.unwrap_or(Box::new(chain_sync_service.clone())); + let genesis_hash = + client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed"); let worker = NetworkWorker::< substrate_test_runtime_client::runtime::Block, substrate_test_runtime_client::runtime::Hash, - >::new(config::Params { + >::new(config::Params::<substrate_test_runtime_client::runtime::Block> { block_announce_config, role: config::Role::Full, executor: Box::new(|f| { tokio::spawn(f); }), + genesis_hash, network_config, - chain: client.clone(), protocol_id, fork_id, metrics_registry: None, @@ -214,6 +219,7 @@ impl TestNetworkBuilder { light_client_request_protocol_config, ] .to_vec(), + tx, }) .unwrap(); @@ -231,8 +237,7 @@ impl TestNetworkBuilder { tokio::time::sleep(std::time::Duration::from_millis(250)).await; } }); - let stream = worker.service().event_stream("syncing"); - tokio::spawn(engine.run(stream)); + tokio::spawn(engine.run()); TestNetwork::new(worker) } diff --git a/client/network/test/src/sync.rs b/client/network/test/src/sync.rs index d87b03fb3a78cc4b8a70b485c61d5c2e9e36afb3..af46d15a2bacd70f60a3c548ca0e4648134de36e 100644 --- a/client/network/test/src/sync.rs +++ b/client/network/test/src/sync.rs @@ -414,7 +414,7 @@ async fn can_sync_small_non_best_forks() { // poll until the two nodes connect, otherwise announcing the block will not work futures::future::poll_fn::<(), _>(|cx| { net.poll(cx); - if net.peer(0).num_peers() == 0 { + if net.peer(0).num_peers() == 0 || net.peer(1).num_peers() == 0 { Poll::Pending } else { Poll::Ready(()) diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 91ef65cf134e421001e11837fdf3db6bcdf96026..1d8cbae004eee1eefed92d14b0d5b104edb763f8 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -38,9 +38,7 @@ use sc_client_db::{Backend, DatabaseSettings}; use sc_consensus::import_queue::ImportQueue; use sc_executor::RuntimeVersionOf; use sc_keystore::LocalKeystore; -use sc_network::{ - config::SyncMode, NetworkEventStream, NetworkService, NetworkStateInfo, NetworkStatusProvider, -}; +use sc_network::{config::SyncMode, NetworkService, NetworkStateInfo, NetworkStatusProvider}; use sc_network_bitswap::BitswapRequestHandler; use sc_network_common::{role::Roles, sync::warp::WarpSyncParams}; use sc_network_light::light_client_requests::handler::LightClientRequestHandler; @@ -882,6 +880,7 @@ where protocol_config }; + let (tx, rx) = sc_utils::mpsc::tracing_unbounded("mpsc_syncing_engine_protocol", 100_000); let (chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); let (engine, sync_service, block_announce_config) = SyncingEngine::new( Roles::from(&config.role), @@ -897,6 +896,7 @@ where block_request_protocol_config.name.clone(), state_request_protocol_config.name.clone(), warp_sync_protocol_config.as_ref().map(|config| config.name.clone()), + rx, )?; let sync_service_import_queue = sync_service.clone(); let sync_service = Arc::new(sync_service); @@ -907,7 +907,8 @@ where protocol_config })); - let mut network_params = sc_network::config::Params { + let genesis_hash = client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed"); + let mut network_params = sc_network::config::Params::<TBl> { role: config.role.clone(), executor: { let spawn_handle = Clone::clone(&spawn_handle); @@ -916,11 +917,12 @@ where }) }, network_config: config.network.clone(), - chain: client.clone(), + genesis_hash, protocol_id: protocol_id.clone(), fork_id: config.chain_spec.fork_id().map(ToOwned::to_owned), metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()), block_announce_config, + tx, request_response_protocol_configs: request_response_protocol_configs .into_iter() .chain([ @@ -960,15 +962,13 @@ where )?; spawn_handle.spawn("network-transactions-handler", Some("networking"), tx_handler.run()); - spawn_handle.spawn( + spawn_handle.spawn_blocking( "chain-sync-network-service-provider", Some("networking"), chain_sync_network_provider.run(network.clone()), ); spawn_handle.spawn("import-queue", None, import_queue.run(Box::new(sync_service_import_queue))); - - let event_stream = network.event_stream("syncing"); - spawn_handle.spawn("syncing", None, engine.run(event_stream)); + spawn_handle.spawn_blocking("syncing", None, engine.run()); let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc", 10_000); spawn_handle.spawn( diff --git a/frame/nomination-pools/src/lib.rs b/frame/nomination-pools/src/lib.rs index e73d4b4173ecf585717d7d2d62af107b02c5ffca..cfde05ffeeabef6ca4d15b406d5e69ee8ad1f01c 100644 --- a/frame/nomination-pools/src/lib.rs +++ b/frame/nomination-pools/src/lib.rs @@ -1496,7 +1496,7 @@ pub mod pallet { use sp_runtime::Perbill; /// The current storage version. - const STORAGE_VERSION: StorageVersion = StorageVersion::new(4); + const STORAGE_VERSION: StorageVersion = StorageVersion::new(5); #[pallet::pallet] #[pallet::storage_version(STORAGE_VERSION)] diff --git a/frame/nomination-pools/src/migration.rs b/frame/nomination-pools/src/migration.rs index f6f166d7f5de2942d842150046a43ad067ec739d..45d6424119900b4049bc80f2e89c80278f3c9379 100644 --- a/frame/nomination-pools/src/migration.rs +++ b/frame/nomination-pools/src/migration.rs @@ -478,9 +478,22 @@ pub mod v4 { } } + /// Migrates from `v3` directly to `v5` to avoid the broken `v4` migration. + #[allow(deprecated)] + pub type MigrateV3ToV5<T, U> = (v4::MigrateToV4<T, U>, v5::MigrateToV5<T>); + + /// # Warning + /// + /// To avoid mangled storage please use `MigrateV3ToV5` instead. + /// See: github.com/paritytech/substrate/pull/13715 + /// /// This migration adds a `commission` field to every `BondedPoolInner`, if /// any. + #[deprecated( + note = "To avoid mangled storage please use `MigrateV3ToV5` instead. See: github.com/paritytech/substrate/pull/13715" + )] pub struct MigrateToV4<T, U>(sp_std::marker::PhantomData<(T, U)>); + #[allow(deprecated)] impl<T: Config, U: Get<Perbill>> OnRuntimeUpgrade for MigrateToV4<T, U> { fn on_runtime_upgrade() -> Weight { let current = Pallet::<T>::current_storage_version(); @@ -493,7 +506,8 @@ pub mod v4 { onchain ); - if current == 4 && onchain == 3 { + if onchain == 3 { + log!(warn, "Please run MigrateToV5 immediately after this migration. See github.com/paritytech/substrate/pull/13715"); let initial_global_max_commission = U::get(); GlobalMaxCommission::<T>::set(Some(initial_global_max_commission)); log!( @@ -508,7 +522,7 @@ pub mod v4 { Some(old_value.migrate_to_v4()) }); - current.put::<Pallet<T>>(); + StorageVersion::new(4).put::<Pallet<T>>(); log!(info, "Upgraded {} pools, storage to version {:?}", translated, current); // reads: translated + onchain version. @@ -548,3 +562,147 @@ pub mod v4 { } } } + +pub mod v5 { + use super::*; + + #[derive(Decode)] + pub struct OldRewardPool<T: Config> { + last_recorded_reward_counter: T::RewardCounter, + last_recorded_total_payouts: BalanceOf<T>, + total_rewards_claimed: BalanceOf<T>, + } + + impl<T: Config> OldRewardPool<T> { + fn migrate_to_v5(self) -> RewardPool<T> { + RewardPool { + last_recorded_reward_counter: self.last_recorded_reward_counter, + last_recorded_total_payouts: self.last_recorded_total_payouts, + total_rewards_claimed: self.total_rewards_claimed, + total_commission_pending: Zero::zero(), + total_commission_claimed: Zero::zero(), + } + } + } + + /// This migration adds `total_commission_pending` and `total_commission_claimed` field to every + /// `RewardPool`, if any. + pub struct MigrateToV5<T>(sp_std::marker::PhantomData<T>); + impl<T: Config> OnRuntimeUpgrade for MigrateToV5<T> { + fn on_runtime_upgrade() -> Weight { + let current = Pallet::<T>::current_storage_version(); + let onchain = Pallet::<T>::on_chain_storage_version(); + + log!( + info, + "Running migration with current storage version {:?} / onchain {:?}", + current, + onchain + ); + + if current == 5 && onchain == 4 { + let mut translated = 0u64; + RewardPools::<T>::translate::<OldRewardPool<T>, _>(|_id, old_value| { + translated.saturating_inc(); + Some(old_value.migrate_to_v5()) + }); + + current.put::<Pallet<T>>(); + log!(info, "Upgraded {} pools, storage to version {:?}", translated, current); + + // reads: translated + onchain version. + // writes: translated + current.put. + T::DbWeight::get().reads_writes(translated + 1, translated + 1) + } else { + log!(info, "Migration did not execute. This probably should be removed"); + T::DbWeight::get().reads(1) + } + } + + #[cfg(feature = "try-runtime")] + fn pre_upgrade() -> Result<Vec<u8>, &'static str> { + ensure!( + Pallet::<T>::current_storage_version() > Pallet::<T>::on_chain_storage_version(), + "the on_chain version is equal or more than the current one" + ); + + let rpool_keys = RewardPools::<T>::iter_keys().count(); + let rpool_values = RewardPools::<T>::iter_values().count(); + if rpool_keys != rpool_values { + log!(info, "🔥 There are {} undecodable RewardPools in storage. This migration will try to correct them. keys: {}, values: {}", rpool_keys.saturating_sub(rpool_values), rpool_keys, rpool_values); + } + + ensure!( + PoolMembers::<T>::iter_keys().count() == PoolMembers::<T>::iter_values().count(), + "There are undecodable PoolMembers in storage. This migration will not fix that." + ); + ensure!( + BondedPools::<T>::iter_keys().count() == BondedPools::<T>::iter_values().count(), + "There are undecodable BondedPools in storage. This migration will not fix that." + ); + ensure!( + SubPoolsStorage::<T>::iter_keys().count() == + SubPoolsStorage::<T>::iter_values().count(), + "There are undecodable SubPools in storage. This migration will not fix that." + ); + ensure!( + Metadata::<T>::iter_keys().count() == Metadata::<T>::iter_values().count(), + "There are undecodable Metadata in storage. This migration will not fix that." + ); + + Ok((rpool_values as u64).encode()) + } + + #[cfg(feature = "try-runtime")] + fn post_upgrade(data: Vec<u8>) -> Result<(), &'static str> { + let old_rpool_values: u64 = Decode::decode(&mut &data[..]).unwrap(); + let rpool_keys = RewardPools::<T>::iter_keys().count() as u64; + let rpool_values = RewardPools::<T>::iter_values().count() as u64; + ensure!( + rpool_keys == rpool_values, + "There are STILL undecodable RewardPools - migration failed" + ); + + if old_rpool_values != rpool_values { + log!( + info, + "🎉 Fixed {} undecodable RewardPools.", + rpool_values.saturating_sub(old_rpool_values) + ); + } + + // ensure all RewardPools items now contain `total_commission_pending` and + // `total_commission_claimed` field. + ensure!( + RewardPools::<T>::iter().all(|(_, reward_pool)| reward_pool + .total_commission_pending + .is_zero() && reward_pool + .total_commission_claimed + .is_zero()), + "a commission value has been incorrectly set" + ); + ensure!(Pallet::<T>::on_chain_storage_version() == 5, "wrong storage version"); + + // These should not have been touched - just in case. + ensure!( + PoolMembers::<T>::iter_keys().count() == PoolMembers::<T>::iter_values().count(), + "There are undecodable PoolMembers in storage." + ); + ensure!( + BondedPools::<T>::iter_keys().count() == BondedPools::<T>::iter_values().count(), + "There are undecodable BondedPools in storage." + ); + ensure!( + SubPoolsStorage::<T>::iter_keys().count() == + SubPoolsStorage::<T>::iter_values().count(), + "There are undecodable SubPools in storage." + ); + ensure!( + Metadata::<T>::iter_keys().count() == Metadata::<T>::iter_values().count(), + "There are undecodable Metadata in storage." + ); + + Ok(()) + } + } +}