diff --git a/client/block-builder/src/lib.rs b/client/block-builder/src/lib.rs
index d97afadd40156a96842569bebaacfad365a22ba5..21f8e6fdd3999546f13877d7e40e345055bc961d 100644
--- a/client/block-builder/src/lib.rs
+++ b/client/block-builder/src/lib.rs
@@ -312,7 +312,9 @@ mod tests {
 	use sp_blockchain::HeaderBackend;
 	use sp_core::Blake2Hasher;
 	use sp_state_machine::Backend;
-	use substrate_test_runtime_client::{DefaultTestClientBuilderExt, TestClientBuilderExt};
+	use substrate_test_runtime_client::{
+		runtime::Extrinsic, DefaultTestClientBuilderExt, TestClientBuilderExt,
+	};
 
 	#[test]
 	fn block_building_storage_proof_does_not_include_runtime_by_default() {
@@ -345,4 +347,62 @@ mod tests {
 			.unwrap_err()
 			.contains("Database missing expected key"),);
 	}
+
+	#[test]
+	fn failing_extrinsic_rolls_back_changes_in_storage_proof() {
+		let builder = substrate_test_runtime_client::TestClientBuilder::new();
+		let backend = builder.backend();
+		let client = builder.build();
+
+		let mut block_builder = BlockBuilder::new(
+			&client,
+			client.info().best_hash,
+			client.info().best_number,
+			RecordProof::Yes,
+			Default::default(),
+			&*backend,
+		)
+		.unwrap();
+
+		block_builder.push(Extrinsic::ReadAndPanic(8)).unwrap_err();
+
+		let block = block_builder.build().unwrap();
+
+		let proof_with_panic = block.proof.expect("Proof is build on request").encoded_size();
+
+		let mut block_builder = BlockBuilder::new(
+			&client,
+			client.info().best_hash,
+			client.info().best_number,
+			RecordProof::Yes,
+			Default::default(),
+			&*backend,
+		)
+		.unwrap();
+
+		block_builder.push(Extrinsic::Read(8)).unwrap();
+
+		let block = block_builder.build().unwrap();
+
+		let proof_without_panic = block.proof.expect("Proof is build on request").encoded_size();
+
+		let block = BlockBuilder::new(
+			&client,
+			client.info().best_hash,
+			client.info().best_number,
+			RecordProof::Yes,
+			Default::default(),
+			&*backend,
+		)
+		.unwrap()
+		.build()
+		.unwrap();
+
+		let proof_empty_block = block.proof.expect("Proof is build on request").encoded_size();
+
+		// Ensure that we rolled back the changes of the panicked transaction.
+		assert!(proof_without_panic > proof_with_panic);
+		assert!(proof_without_panic > proof_empty_block);
+		assert_eq!(proof_empty_block, proof_with_panic);
+	}
 }
diff --git a/primitives/api/proc-macro/src/impl_runtime_apis.rs b/primitives/api/proc-macro/src/impl_runtime_apis.rs
index d0725ffd2ba54259c4645996ec6f10a391afdca4..0d265293ecf424d2c503c8d44cf074f77c29dbe7 100644
--- a/primitives/api/proc-macro/src/impl_runtime_apis.rs
+++ b/primitives/api/proc-macro/src/impl_runtime_apis.rs
@@ -243,7 +243,8 @@ fn generate_runtime_api_base_structures() -> Result<TokenStream> {
 				&self,
 				call: F,
 			) -> R where Self: Sized {
-				#crate_::OverlayedChanges::start_transaction(&mut std::cell::RefCell::borrow_mut(&self.changes));
+				self.start_transaction();
+
 				*std::cell::RefCell::borrow_mut(&self.commit_on_success) = false;
 				let res = call(self);
 				*std::cell::RefCell::borrow_mut(&self.commit_on_success) = true;
@@ -347,18 +348,51 @@ fn generate_runtime_api_base_structures() -> Result<TokenStream> {
 					transactions; qed";
 				if *std::cell::RefCell::borrow(&self.commit_on_success) {
 					let res = if commit {
-						#crate_::OverlayedChanges::commit_transaction(
+						let res = if let Some(recorder) = &self.recorder {
+							#crate_::ProofRecorder::<Block>::commit_transaction(&recorder)
+						} else {
+							Ok(())
+						};
+
+						let res2 = #crate_::OverlayedChanges::commit_transaction(
 							&mut std::cell::RefCell::borrow_mut(&self.changes)
-						)
+						);
+
+						// Will panic on an `Err` below, however we should call commit
+						// on the recorder and the changes together.
+						std::result::Result::and(res, std::result::Result::map_err(res2, drop))
 					} else {
-						#crate_::OverlayedChanges::rollback_transaction(
+						let res = if let Some(recorder) = &self.recorder {
+							#crate_::ProofRecorder::<Block>::rollback_transaction(&recorder)
+						} else {
+							Ok(())
+						};
+
+						let res2 = #crate_::OverlayedChanges::rollback_transaction(
 							&mut std::cell::RefCell::borrow_mut(&self.changes)
-						)
+						);
+
+						// Will panic on an `Err` below, however we should call commit
+						// on the recorder and the changes together.
+						std::result::Result::and(res, std::result::Result::map_err(res2, drop))
 					};
 
 					std::result::Result::expect(res, proof);
 				}
 			}
+
+			fn start_transaction(&self) {
+				if !*std::cell::RefCell::borrow(&self.commit_on_success) {
+					return
+				}
+
+				#crate_::OverlayedChanges::start_transaction(
+					&mut std::cell::RefCell::borrow_mut(&self.changes)
+				);
+				if let Some(recorder) = &self.recorder {
+					#crate_::ProofRecorder::<Block>::start_transaction(&recorder);
+				}
+			}
 		}
 	))
 }
@@ -450,11 +484,7 @@ impl<'a> ApiRuntimeImplToApiRuntimeApiImpl<'a> {
 				params: std::vec::Vec<u8>,
 				fn_name: &dyn Fn(#crate_::RuntimeVersion) -> &'static str,
 			) -> std::result::Result<std::vec::Vec<u8>, #crate_::ApiError> {
-				if *std::cell::RefCell::borrow(&self.commit_on_success) {
-					#crate_::OverlayedChanges::start_transaction(
-						&mut std::cell::RefCell::borrow_mut(&self.changes)
-					);
-				}
+				self.start_transaction();
 
 				let res = (|| {
 					let version = #crate_::CallApiAt::<__SrApiBlock__>::runtime_version_at(
diff --git a/primitives/trie/src/recorder.rs b/primitives/trie/src/recorder.rs
index 3bdfda01532cce739514607d6a49ffc4e0ff781f..728dc836205b57f9d9ce7b8fd8190c0ef8eac681 100644
--- a/primitives/trie/src/recorder.rs
+++ b/primitives/trie/src/recorder.rs
@@ -25,7 +25,7 @@ use codec::Encode;
 use hash_db::Hasher;
 use parking_lot::Mutex;
 use std::{
-	collections::HashMap,
+	collections::{HashMap, HashSet},
 	marker::PhantomData,
 	mem,
 	ops::DerefMut,
@@ -38,17 +38,43 @@ use trie_db::{RecordedForKey, TrieAccess};
 
 const LOG_TARGET: &str = "trie-recorder";
 
+/// Stores all the information per transaction.
+#[derive(Default)]
+struct Transaction<H> {
+	/// Stores transaction information about [`RecorderInner::recorded_keys`].
+	///
+	/// For each transaction we only store the `storage_root` and the old states per key. `None`
+	/// state means that the key wasn't recorded before.
+	recorded_keys: HashMap<H, HashMap<Arc<[u8]>, Option<RecordedForKey>>>,
+	/// Stores transaction information about [`RecorderInner::accessed_nodes`].
+	///
+	/// For each transaction we only store the hashes of added nodes.
+	accessed_nodes: HashSet<H>,
+}
+
 /// The internals of [`Recorder`].
 struct RecorderInner<H> {
 	/// The keys for that we have recorded the trie nodes and if we have recorded up to the value.
-	recorded_keys: HashMap<H, HashMap<Vec<u8>, RecordedForKey>>,
+	///
+	/// Mapping: `StorageRoot -> (Key -> RecordedForKey)`.
+	recorded_keys: HashMap<H, HashMap<Arc<[u8]>, RecordedForKey>>,
+
+	/// Currently active transactions.
+	transactions: Vec<Transaction<H>>,
+
 	/// The encoded nodes we accessed while recording.
+	///
+	/// Mapping: `Hash(Node) -> Node`.
 	accessed_nodes: HashMap<H, Vec<u8>>,
 }
 
 impl<H> Default for RecorderInner<H> {
 	fn default() -> Self {
-		Self { recorded_keys: Default::default(), accessed_nodes: Default::default() }
+		Self {
+			recorded_keys: Default::default(),
+			accessed_nodes: Default::default(),
+			transactions: Vec::new(),
+		}
 	}
 }
 
@@ -83,6 +109,8 @@ impl<H: Hasher> Recorder<H> {
 	///
 	/// - `storage_root`: The storage root of the trie for which accesses are recorded. This is
 	///   important when recording access to different tries at once (like top and child tries).
+	///
+	/// NOTE: This locks a mutex that stays locked until the return value is dropped.
 	#[inline]
 	pub fn as_trie_recorder(
 		&self,
@@ -135,6 +163,72 @@ impl<H: Hasher> Recorder<H> {
 		mem::take(&mut *self.inner.lock());
 		self.encoded_size_estimation.store(0, Ordering::Relaxed);
 	}
+
+	/// Start a new transaction.
+	pub fn start_transaction(&self) {
+		let mut inner = self.inner.lock();
+		inner.transactions.push(Default::default());
+	}
+
+	/// Rollback the latest transaction.
+	///
+	/// Returns an error if there wasn't any active transaction.
+	pub fn rollback_transaction(&self) -> Result<(), ()> {
+		let mut inner = self.inner.lock();
+
+		// We locked `inner` and can just update the encoded size locally and then store it back to
+		// the atomic.
+		let mut new_encoded_size_estimation = self.encoded_size_estimation.load(Ordering::Relaxed);
+		let transaction = inner.transactions.pop().ok_or(())?;
+
+		transaction.accessed_nodes.into_iter().for_each(|n| {
+			if let Some(old) = inner.accessed_nodes.remove(&n) {
+				new_encoded_size_estimation =
+					new_encoded_size_estimation.saturating_sub(old.encoded_size());
+			}
+		});
+
+		transaction.recorded_keys.into_iter().for_each(|(storage_root, keys)| {
+			keys.into_iter().for_each(|(k, old_state)| {
+				if let Some(state) = old_state {
+					inner.recorded_keys.entry(storage_root).or_default().insert(k, state);
+				} else {
+					inner.recorded_keys.entry(storage_root).or_default().remove(&k);
+				}
+			});
+		});
+
+		self.encoded_size_estimation
+			.store(new_encoded_size_estimation, Ordering::Relaxed);
+
+		Ok(())
+	}
+
+	/// Commit the latest transaction.
+	///
+	/// Returns an error if there wasn't any active transaction.
+	pub fn commit_transaction(&self) -> Result<(), ()> {
+		let mut inner = self.inner.lock();
+
+		let transaction = inner.transactions.pop().ok_or(())?;
+
+		if let Some(parent_transaction) = inner.transactions.last_mut() {
+			parent_transaction.accessed_nodes.extend(transaction.accessed_nodes);
+
+			transaction.recorded_keys.into_iter().for_each(|(storage_root, keys)| {
+				keys.into_iter().for_each(|(k, old_state)| {
+					parent_transaction
+						.recorded_keys
+						.entry(storage_root)
+						.or_default()
+						.entry(k)
+						.or_insert(old_state);
+				})
+			});
+		}
+
+		Ok(())
+	}
 }
 
 /// The [`TrieRecorder`](trie_db::TrieRecorder) implementation.
@@ -145,6 +239,50 @@ struct TrieRecorder<H: Hasher, I> {
 	_phantom: PhantomData<H>,
 }
 
+impl<H: Hasher, I: DerefMut<Target = RecorderInner<H::Out>>> TrieRecorder<H, I> {
+	/// Update the recorded keys entry for the given `full_key`.
+	fn update_recorded_keys(&mut self, full_key: &[u8], access: RecordedForKey) {
+		let inner = self.inner.deref_mut();
+
+		let entry =
+			inner.recorded_keys.entry(self.storage_root).or_default().entry(full_key.into());
+
+		let key = entry.key().clone();
+
+		// We don't need to update the record if we only accessed the `Hash` for the given
+		// `full_key`. Only `Value` access can be an upgrade from `Hash`.
+		let entry = if matches!(access, RecordedForKey::Value) {
+			entry.and_modify(|e| {
+				if let Some(tx) = inner.transactions.last_mut() {
+					// Store the previous state only once per transaction.
+					tx.recorded_keys
+						.entry(self.storage_root)
+						.or_default()
+						.entry(key.clone())
+						.or_insert(Some(*e));
+				}
+
+				*e = access;
+			})
+		} else {
+			entry
+		};
+
+		entry.or_insert_with(|| {
+			if let Some(tx) = inner.transactions.last_mut() {
+				// The key wasn't yet recorded, so there isn't any old state.
+				tx.recorded_keys
+					.entry(self.storage_root)
+					.or_default()
+					.entry(key)
+					.or_insert(None);
+			}
+
+			access
+		});
+	}
+}
+
 impl<H: Hasher, I: DerefMut<Target = RecorderInner<H::Out>>> trie_db::TrieRecorder<H::Out>
 	for TrieRecorder<H, I>
 {
@@ -159,11 +297,17 @@ impl<H: Hasher, I: DerefMut<Target = RecorderInner<H::Out>>> trie_db::TrieRecord
 					"Recording node",
 				);
 
-				self.inner.accessed_nodes.entry(hash).or_insert_with(|| {
+				let inner = self.inner.deref_mut();
+
+				inner.accessed_nodes.entry(hash).or_insert_with(|| {
 					let node = node_owned.to_encoded::<NodeCodec<H>>();
 
 					encoded_size_update += node.encoded_size();
 
+					if let Some(tx) = inner.transactions.last_mut() {
+						tx.accessed_nodes.insert(hash);
+					}
+
 					node
 				});
 			},
@@ -174,11 +318,17 @@ impl<H: Hasher, I: DerefMut<Target = RecorderInner<H::Out>>> trie_db::TrieRecord
 					"Recording node",
 				);
 
-				self.inner.accessed_nodes.entry(hash).or_insert_with(|| {
+				let inner = self.inner.deref_mut();
+
+				inner.accessed_nodes.entry(hash).or_insert_with(|| {
 					let node = encoded_node.into_owned();
 
 					encoded_size_update += node.encoded_size();
 
+					if let Some(tx) = inner.transactions.last_mut() {
+						tx.accessed_nodes.insert(hash);
+					}
+
 					node
 				});
 			},
@@ -190,21 +340,21 @@ impl<H: Hasher, I: DerefMut<Target = RecorderInner<H::Out>>> trie_db::TrieRecord
 					"Recording value",
 				);
 
-				self.inner.accessed_nodes.entry(hash).or_insert_with(|| {
+				let inner = self.inner.deref_mut();
+
+				inner.accessed_nodes.entry(hash).or_insert_with(|| {
 					let value = value.into_owned();
 
 					encoded_size_update += value.encoded_size();
 
+					if let Some(tx) = inner.transactions.last_mut() {
+						tx.accessed_nodes.insert(hash);
+					}
+
 					value
 				});
 
-				self.inner
-					.recorded_keys
-					.entry(self.storage_root)
-					.or_default()
-					.entry(full_key.to_vec())
-					.and_modify(|e| *e = RecordedForKey::Value)
-					.or_insert(RecordedForKey::Value);
+				self.update_recorded_keys(full_key, RecordedForKey::Value);
 			},
 			TrieAccess::Hash { full_key } => {
 				tracing::trace!(
@@ -215,12 +365,7 @@ impl<H: Hasher, I: DerefMut<Target = RecorderInner<H::Out>>> trie_db::TrieRecord
 
 				// We don't need to update the `encoded_size_update` as the hash was already
 				// accounted for by the recorded node that holds the hash.
-				self.inner
-					.recorded_keys
-					.entry(self.storage_root)
-					.or_default()
-					.entry(full_key.to_vec())
-					.or_insert(RecordedForKey::Hash);
+				self.update_recorded_keys(full_key, RecordedForKey::Hash);
 			},
 			TrieAccess::NonExisting { full_key } => {
 				tracing::trace!(
@@ -232,13 +377,7 @@ impl<H: Hasher, I: DerefMut<Target = RecorderInner<H::Out>>> trie_db::TrieRecord
 				// Non-existing access means we recorded all trie nodes up to the value.
 				// Not the actual value, as it doesn't exist, but all trie nodes to know
 				// that the value doesn't exist in the trie.
-				self.inner
-					.recorded_keys
-					.entry(self.storage_root)
-					.or_default()
-					.entry(full_key.to_vec())
-					.and_modify(|e| *e = RecordedForKey::Value)
-					.or_insert(RecordedForKey::Value);
+				self.update_recorded_keys(full_key, RecordedForKey::Value);
 			},
 		};
 
@@ -256,14 +395,15 @@ impl<H: Hasher, I: DerefMut<Target = RecorderInner<H::Out>>> trie_db::TrieRecord
 
 #[cfg(test)]
 mod tests {
-	use trie_db::{Trie, TrieDBBuilder, TrieDBMutBuilder, TrieHash, TrieMut};
+	use super::*;
+	use trie_db::{Trie, TrieDBBuilder, TrieDBMutBuilder, TrieHash, TrieMut, TrieRecorder};
 
 	type MemoryDB = crate::MemoryDB<sp_core::Blake2Hasher>;
 	type Layout = crate::LayoutV1<sp_core::Blake2Hasher>;
 	type Recorder = super::Recorder<sp_core::Blake2Hasher>;
 
 	const TEST_DATA: &[(&[u8], &[u8])] =
-		&[(b"key1", b"val1"), (b"key2", b"val2"), (b"key3", b"val3"), (b"key4", b"val4")];
+		&[(b"key1", &[1; 64]), (b"key2", &[2; 64]), (b"key3", &[3; 64]), (b"key4", &[4; 64])];
 
 	fn create_trie() -> (MemoryDB, TrieHash<Layout>) {
 		let mut db = MemoryDB::default();
@@ -300,4 +440,271 @@ mod tests {
 		let trie = TrieDBBuilder::<Layout>::new(&memory_db, &root).build();
 		assert_eq!(TEST_DATA[0].1.to_vec(), trie.get(TEST_DATA[0].0).unwrap().unwrap());
 	}
+
+	#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
+	struct RecorderStats {
+		accessed_nodes: usize,
+		recorded_keys: usize,
+		estimated_size: usize,
+	}
+
+	impl RecorderStats {
+		fn extract(recorder: &Recorder) -> Self {
+			let inner = recorder.inner.lock();
+
+			let recorded_keys =
+				inner.recorded_keys.iter().flat_map(|(_, keys)| keys.keys()).count();
+
+			Self {
+				recorded_keys,
+				accessed_nodes: inner.accessed_nodes.len(),
+				estimated_size: recorder.estimate_encoded_size(),
+			}
+		}
+	}
+
+	#[test]
+	fn recorder_transactions_rollback_work() {
+		let (db, root) = create_trie();
+
+		let recorder = Recorder::default();
+		let mut stats = vec![RecorderStats::default()];
+
+		for i in 0..4 {
+			recorder.start_transaction();
+			{
+				let mut trie_recorder = recorder.as_trie_recorder(root);
+				let trie = TrieDBBuilder::<Layout>::new(&db, &root)
+					.with_recorder(&mut trie_recorder)
+					.build();
+
+				assert_eq!(TEST_DATA[i].1.to_vec(), trie.get(TEST_DATA[i].0).unwrap().unwrap());
+			}
+			stats.push(RecorderStats::extract(&recorder));
+		}
+
+		assert_eq!(4, recorder.inner.lock().transactions.len());
+
+		for i in 0..5 {
+			assert_eq!(stats[4 - i], RecorderStats::extract(&recorder));
+
+			let storage_proof = recorder.to_storage_proof();
+			let memory_db: MemoryDB = storage_proof.into_memory_db();
+
+			// Check that we recorded the required data
+			let trie = TrieDBBuilder::<Layout>::new(&memory_db, &root).build();
+
+			// Check that the required data is still present.
+			for a in 0..4 {
+				if a < 4 - i {
+					assert_eq!(TEST_DATA[a].1.to_vec(), trie.get(TEST_DATA[a].0).unwrap().unwrap());
+				} else {
+					// All the data that we already rolled back, should be gone!
+					assert!(trie.get(TEST_DATA[a].0).is_err());
+				}
+			}
+
+			if i < 4 {
+				recorder.rollback_transaction().unwrap();
+			}
+		}
+
+		assert_eq!(0, recorder.inner.lock().transactions.len());
+	}
+
+	#[test]
+	fn recorder_transactions_commit_work() {
+		let (db, root) = create_trie();
+
+		let recorder = Recorder::default();
+
+		for i in 0..4 {
+			recorder.start_transaction();
+			{
+				let mut trie_recorder = recorder.as_trie_recorder(root);
+				let trie = TrieDBBuilder::<Layout>::new(&db, &root)
+					.with_recorder(&mut trie_recorder)
+					.build();
+
+				assert_eq!(TEST_DATA[i].1.to_vec(), trie.get(TEST_DATA[i].0).unwrap().unwrap());
+			}
+		}
+
+		let stats = RecorderStats::extract(&recorder);
+		assert_eq!(4, recorder.inner.lock().transactions.len());
+
+		for _ in 0..4 {
+			recorder.commit_transaction().unwrap();
+		}
+		assert_eq!(0, recorder.inner.lock().transactions.len());
+		assert_eq!(stats, RecorderStats::extract(&recorder));
+
+		let storage_proof = recorder.to_storage_proof();
+		let memory_db: MemoryDB = storage_proof.into_memory_db();
+
+		// Check that we recorded the required data
+		let trie = TrieDBBuilder::<Layout>::new(&memory_db, &root).build();
+
+		// Check that the required data is still present.
+		for i in 0..4 {
+			assert_eq!(TEST_DATA[i].1.to_vec(), trie.get(TEST_DATA[i].0).unwrap().unwrap());
+		}
+	}
+
+	#[test]
+	fn recorder_transactions_commit_and_rollback_work() {
+		let (db, root) = create_trie();
+
+		let recorder = Recorder::default();
+
+		for i in 0..2 {
+			recorder.start_transaction();
+			{
+				let mut trie_recorder = recorder.as_trie_recorder(root);
+				let trie = TrieDBBuilder::<Layout>::new(&db, &root)
+					.with_recorder(&mut trie_recorder)
+					.build();
+
+				assert_eq!(TEST_DATA[i].1.to_vec(), trie.get(TEST_DATA[i].0).unwrap().unwrap());
+			}
+		}
+
+		recorder.rollback_transaction().unwrap();
+
+		for i in 2..4 {
+			recorder.start_transaction();
+			{
+				let mut trie_recorder = recorder.as_trie_recorder(root);
+				let trie = TrieDBBuilder::<Layout>::new(&db, &root)
+					.with_recorder(&mut trie_recorder)
+					.build();
+
+				assert_eq!(TEST_DATA[i].1.to_vec(), trie.get(TEST_DATA[i].0).unwrap().unwrap());
+			}
+		}
+
+		recorder.rollback_transaction().unwrap();
+
+		assert_eq!(2, recorder.inner.lock().transactions.len());
+
+		for _ in 0..2 {
+			recorder.commit_transaction().unwrap();
+		}
+
+		assert_eq!(0, recorder.inner.lock().transactions.len());
+
+		let storage_proof = recorder.to_storage_proof();
+		let memory_db: MemoryDB = storage_proof.into_memory_db();
+
+		// Check that we recorded the required data
+		let trie = TrieDBBuilder::<Layout>::new(&memory_db, &root).build();
+
+		// Check that the required data is still present.
+		for i in 0..4 {
+			if i % 2 == 0 {
+				assert_eq!(TEST_DATA[i].1.to_vec(), trie.get(TEST_DATA[i].0).unwrap().unwrap());
+			} else {
+				assert!(trie.get(TEST_DATA[i].0).is_err());
+			}
+		}
+	}
+
+	#[test]
+	fn recorder_transaction_accessed_keys_works() {
+		let key = TEST_DATA[0].0;
+		let (db, root) = create_trie();
+
+		let recorder = Recorder::default();
+
+		{
+			let trie_recorder = recorder.as_trie_recorder(root);
+			assert!(matches!(trie_recorder.trie_nodes_recorded_for_key(key), RecordedForKey::None));
+		}
+
+		recorder.start_transaction();
+		{
+			let mut trie_recorder = recorder.as_trie_recorder(root);
+			let trie = TrieDBBuilder::<Layout>::new(&db, &root)
+				.with_recorder(&mut trie_recorder)
+				.build();
+
+			assert_eq!(
+				sp_core::Blake2Hasher::hash(TEST_DATA[0].1),
+				trie.get_hash(TEST_DATA[0].0).unwrap().unwrap()
+			);
+			assert!(matches!(trie_recorder.trie_nodes_recorded_for_key(key), RecordedForKey::Hash));
+		}
+
+		recorder.start_transaction();
+		{
+			let mut trie_recorder = recorder.as_trie_recorder(root);
+			let trie = TrieDBBuilder::<Layout>::new(&db, &root)
+				.with_recorder(&mut trie_recorder)
+				.build();
+
+			assert_eq!(TEST_DATA[0].1.to_vec(), trie.get(TEST_DATA[0].0).unwrap().unwrap());
+			assert!(matches!(
+				trie_recorder.trie_nodes_recorded_for_key(key),
+				RecordedForKey::Value,
+			));
+		}
+
+		recorder.rollback_transaction().unwrap();
+		{
+			let trie_recorder = recorder.as_trie_recorder(root);
+			assert!(matches!(trie_recorder.trie_nodes_recorded_for_key(key), RecordedForKey::Hash));
+		}
+
+		recorder.rollback_transaction().unwrap();
+		{
+			let trie_recorder = recorder.as_trie_recorder(root);
+			assert!(matches!(trie_recorder.trie_nodes_recorded_for_key(key), RecordedForKey::None));
+		}
+
+		recorder.start_transaction();
+		{
+			let mut trie_recorder = recorder.as_trie_recorder(root);
+			let trie = TrieDBBuilder::<Layout>::new(&db, &root)
+				.with_recorder(&mut trie_recorder)
+				.build();
+
+			assert_eq!(TEST_DATA[0].1.to_vec(), trie.get(TEST_DATA[0].0).unwrap().unwrap());
+			assert!(matches!(
+				trie_recorder.trie_nodes_recorded_for_key(key),
+				RecordedForKey::Value,
+			));
+		}
+
+		recorder.start_transaction();
+		{
+			let mut trie_recorder = recorder.as_trie_recorder(root);
+			let trie = TrieDBBuilder::<Layout>::new(&db, &root)
+				.with_recorder(&mut trie_recorder)
+				.build();
+
+			assert_eq!(
+				sp_core::Blake2Hasher::hash(TEST_DATA[0].1),
+				trie.get_hash(TEST_DATA[0].0).unwrap().unwrap()
+			);
+			assert!(matches!(
+				trie_recorder.trie_nodes_recorded_for_key(key),
+				RecordedForKey::Value
+			));
+		}
+
+		recorder.rollback_transaction().unwrap();
+		{
+			let trie_recorder = recorder.as_trie_recorder(root);
+			assert!(matches!(
+				trie_recorder.trie_nodes_recorded_for_key(key),
+				RecordedForKey::Value
+			));
+		}
+
+		recorder.rollback_transaction().unwrap();
+		{
+			let trie_recorder = recorder.as_trie_recorder(root);
+			assert!(matches!(trie_recorder.trie_nodes_recorded_for_key(key), RecordedForKey::None));
+		}
+	}
 }
diff --git a/test-utils/runtime/src/lib.rs b/test-utils/runtime/src/lib.rs
index c9a0ac04d63ba5a9d4419d5a011f49c20309fa25..b5600843c274997781dbec25aff42d85dc2157be 100644
--- a/test-utils/runtime/src/lib.rs
+++ b/test-utils/runtime/src/lib.rs
@@ -164,13 +164,21 @@ pub enum Extrinsic {
 	OffchainIndexSet(Vec<u8>, Vec<u8>),
 	OffchainIndexClear(Vec<u8>),
 	Store(Vec<u8>),
+	/// Read X times from the state some data and then panic!
+	///
+	/// Returns `Ok` if it didn't read anything.
+	ReadAndPanic(u32),
+	/// Read X times from the state some data.
+	///
+	/// Panics if it can not read `X` times.
+	Read(u32),
 }
 
 #[cfg(feature = "std")]
 impl serde::Serialize for Extrinsic {
 	fn serialize<S>(&self, seq: S) -> Result<S::Ok, S::Error>
 	where
-		S: ::serde::Serializer,
+		S: serde::Serializer,
 	{
 		self.using_encoded(|bytes| seq.serialize_bytes(bytes))
 	}
@@ -210,6 +218,8 @@ impl BlindCheckable for Extrinsic {
 			Extrinsic::OffchainIndexSet(key, value) => Ok(Extrinsic::OffchainIndexSet(key, value)),
 			Extrinsic::OffchainIndexClear(key) => Ok(Extrinsic::OffchainIndexClear(key)),
 			Extrinsic::Store(data) => Ok(Extrinsic::Store(data)),
+			Extrinsic::ReadAndPanic(i) => Ok(Extrinsic::ReadAndPanic(i)),
+			Extrinsic::Read(i) => Ok(Extrinsic::Read(i)),
 		}
 	}
 }
diff --git a/test-utils/runtime/src/system.rs b/test-utils/runtime/src/system.rs
index 12ebf486bb1b99e7a50fb8c7228632de7941a928..fc750531529b685fb18d95504209010e2075feae 100644
--- a/test-utils/runtime/src/system.rs
+++ b/test-utils/runtime/src/system.rs
@@ -275,6 +275,32 @@ fn execute_transaction_backend(utx: &Extrinsic, extrinsic_index: u32) -> ApplyEx
 			Ok(Ok(()))
 		},
 		Extrinsic::Store(data) => execute_store(data.clone()),
+		Extrinsic::ReadAndPanic(i) => execute_read(*i, true),
+		Extrinsic::Read(i) => execute_read(*i, false),
+	}
+}
+
+fn execute_read(read: u32, panic_at_end: bool) -> ApplyExtrinsicResult {
+	let mut next_key = vec![];
+	for _ in 0..(read as usize) {
+		if let Some(next) = sp_io::storage::next_key(&next_key) {
+			// Read the value
+			sp_io::storage::get(&next);
+
+			next_key = next;
+		} else {
+			if panic_at_end {
+				return Ok(Ok(()))
+			} else {
+				panic!("Could not read {read} times from the state");
+			}
+		}
+	}
+
+	if panic_at_end {
+		panic!("BYE")
+	} else {
+		Ok(Ok(()))
 	}
 }