diff --git a/Examples/xx-messenger/Sources/AppFeature/AppEnvironment+Live.swift b/Examples/xx-messenger/Sources/AppFeature/AppEnvironment+Live.swift
index e307596b1edf9b23aeaa81ff3794d704d62b3f6a..93a844439a8eb591ee0b8fa6c2361a56402b3d57 100644
--- a/Examples/xx-messenger/Sources/AppFeature/AppEnvironment+Live.swift
+++ b/Examples/xx-messenger/Sources/AppFeature/AppEnvironment+Live.swift
@@ -96,6 +96,10 @@ extension AppEnvironment {
           messenger: messenger,
           dbManager: dbManager,
           authHandler: authHandler,
+          messageListener: .live(
+            messenger: messenger,
+            db: dbManager.getDB
+          ),
           mainQueue: mainQueue,
           bgQueue: bgQueue,
           register: {
diff --git a/Examples/xx-messenger/Sources/HomeFeature/HomeFeature.swift b/Examples/xx-messenger/Sources/HomeFeature/HomeFeature.swift
index ca1766676746386dbf634cedfe6ef8df774fdd3a..894c5aca32ca1170d0acd47ae89a8242f2e73cb8 100644
--- a/Examples/xx-messenger/Sources/HomeFeature/HomeFeature.swift
+++ b/Examples/xx-messenger/Sources/HomeFeature/HomeFeature.swift
@@ -15,6 +15,7 @@ public struct HomeState: Equatable {
   public init(
     failure: String? = nil,
     authFailure: String? = nil,
+    messageListenerFailure: String? = nil,
     isNetworkHealthy: Bool? = nil,
     networkNodesReport: NodeRegistrationReport? = nil,
     isDeletingAccount: Bool = false,
@@ -25,6 +26,7 @@ public struct HomeState: Equatable {
   ) {
     self.failure = failure
     self.authFailure = authFailure
+    self.messageListenerFailure = messageListenerFailure
     self.isNetworkHealthy = isNetworkHealthy
     self.isDeletingAccount = isDeletingAccount
     self.alert = alert
@@ -35,6 +37,7 @@ public struct HomeState: Equatable {
 
   public var failure: String?
   public var authFailure: String?
+  public var messageListenerFailure: String?
   public var isNetworkHealthy: Bool?
   public var networkNodesReport: NodeRegistrationReport?
   public var isDeletingAccount: Bool
@@ -59,6 +62,13 @@ public enum HomeAction: Equatable {
     case failureDismissed
   }
 
+  public enum MessageListener: Equatable {
+    case start
+    case stop
+    case failure(NSError)
+    case failureDismissed
+  }
+
   public enum NetworkMonitor: Equatable {
     case start
     case stop
@@ -75,6 +85,7 @@ public enum HomeAction: Equatable {
 
   case messenger(Messenger)
   case authHandler(AuthHandler)
+  case messageListener(MessageListener)
   case networkMonitor(NetworkMonitor)
   case deleteAccount(DeleteAccount)
   case didDismissAlert
@@ -93,6 +104,7 @@ public struct HomeEnvironment {
     messenger: Messenger,
     dbManager: DBManager,
     authHandler: AuthCallbackHandler,
+    messageListener: MessageListenerHandler,
     mainQueue: AnySchedulerOf<DispatchQueue>,
     bgQueue: AnySchedulerOf<DispatchQueue>,
     register: @escaping () -> RegisterEnvironment,
@@ -102,6 +114,7 @@ public struct HomeEnvironment {
     self.messenger = messenger
     self.dbManager = dbManager
     self.authHandler = authHandler
+    self.messageListener = messageListener
     self.mainQueue = mainQueue
     self.bgQueue = bgQueue
     self.register = register
@@ -112,6 +125,7 @@ public struct HomeEnvironment {
   public var messenger: Messenger
   public var dbManager: DBManager
   public var authHandler: AuthCallbackHandler
+  public var messageListener: MessageListenerHandler
   public var mainQueue: AnySchedulerOf<DispatchQueue>
   public var bgQueue: AnySchedulerOf<DispatchQueue>
   public var register: () -> RegisterEnvironment
@@ -124,6 +138,7 @@ extension HomeEnvironment {
     messenger: .unimplemented,
     dbManager: .unimplemented,
     authHandler: .unimplemented,
+    messageListener: .unimplemented,
     mainQueue: .unimplemented,
     bgQueue: .unimplemented,
     register: { .unimplemented },
@@ -137,11 +152,13 @@ public let homeReducer = Reducer<HomeState, HomeAction, HomeEnvironment>
   enum NetworkHealthEffectId {}
   enum NetworkNodesEffectId {}
   enum AuthCallbacksEffectId {}
+  enum MessageListenerEffectId {}
 
   switch action {
   case .messenger(.start):
     return .merge(
       Effect(value: .authHandler(.start)),
+      Effect(value: .messageListener(.start)),
       Effect(value: .networkMonitor(.stop)),
       Effect.result {
         do {
@@ -203,6 +220,29 @@ public let homeReducer = Reducer<HomeState, HomeAction, HomeEnvironment>
     state.authFailure = nil
     return .none
 
+  case .messageListener(.start):
+    return Effect.run { subscriber in
+      let cancellable = env.messageListener(onError: { error in
+        subscriber.send(.messageListener(.failure(error as NSError)))
+      })
+      return AnyCancellable { cancellable.cancel() }
+    }
+    .subscribe(on: env.bgQueue)
+    .receive(on: env.mainQueue)
+    .eraseToEffect()
+    .cancellable(id: MessageListenerEffectId.self, cancelInFlight: true)
+
+  case .messageListener(.stop):
+    return .cancel(id: MessageListenerEffectId.self)
+
+  case .messageListener(.failure(let error)):
+    state.messageListenerFailure = error.localizedDescription
+    return .none
+
+  case .messageListener(.failureDismissed):
+    state.messageListenerFailure = nil
+    return .none
+
   case .networkMonitor(.start):
     return .merge(
       Effect.run { subscriber in
diff --git a/Examples/xx-messenger/Sources/HomeFeature/HomeView.swift b/Examples/xx-messenger/Sources/HomeFeature/HomeView.swift
index f6a964896e6e67544e693d83cb26378756414b86..8cd7259b6c57a7054b1bae658df8772b0a7efa3d 100644
--- a/Examples/xx-messenger/Sources/HomeFeature/HomeView.swift
+++ b/Examples/xx-messenger/Sources/HomeFeature/HomeView.swift
@@ -16,6 +16,7 @@ public struct HomeView: View {
   struct ViewState: Equatable {
     var failure: String?
     var authFailure: String?
+    var messageListenerFailure: String?
     var isNetworkHealthy: Bool?
     var networkNodesReport: NodeRegistrationReport?
     var isDeletingAccount: Bool
@@ -23,6 +24,7 @@ public struct HomeView: View {
     init(state: HomeState) {
       failure = state.failure
       authFailure = state.authFailure
+      messageListenerFailure = state.messageListenerFailure
       isNetworkHealthy = state.isNetworkHealthy
       isDeletingAccount = state.isDeletingAccount
       networkNodesReport = state.networkNodesReport
@@ -59,6 +61,19 @@ public struct HomeView: View {
             }
           }
 
+          if let messageListenerFailure = viewStore.messageListenerFailure {
+            Section {
+              Text(messageListenerFailure)
+              Button {
+                viewStore.send(.messageListener(.failureDismissed))
+              } label: {
+                Text("Dismiss")
+              }
+            } header: {
+              Text("Message Listener Error")
+            }
+          }
+
           Section {
             HStack {
               Text("Health")
diff --git a/Examples/xx-messenger/Tests/HomeFeatureTests/HomeFeatureTests.swift b/Examples/xx-messenger/Tests/HomeFeatureTests/HomeFeatureTests.swift
index b125f11db13fa3d69c766956ba026dfd084e82d6..fc671b1ec7406823bd903d07307c8146e948ed39 100644
--- a/Examples/xx-messenger/Tests/HomeFeatureTests/HomeFeatureTests.swift
+++ b/Examples/xx-messenger/Tests/HomeFeatureTests/HomeFeatureTests.swift
@@ -24,6 +24,7 @@ final class HomeFeatureTests: XCTestCase {
     store.environment.bgQueue = .immediate
     store.environment.mainQueue = .immediate
     store.environment.authHandler.run = { _ in Cancellable {} }
+    store.environment.messageListener.run = { _ in Cancellable {} }
     store.environment.messenger.start.run = { messengerDidStartWithTimeout.append($0) }
     store.environment.messenger.isConnected.run = { false }
     store.environment.messenger.connect.run = { messengerDidConnect += 1 }
@@ -38,12 +39,14 @@ final class HomeFeatureTests: XCTestCase {
     XCTAssertNoDifference(messengerDidListenForMessages, 1)
 
     store.receive(.authHandler(.start))
+    store.receive(.messageListener(.start))
     store.receive(.networkMonitor(.stop))
     store.receive(.messenger(.didStartUnregistered)) {
       $0.register = RegisterState()
     }
 
     store.send(.authHandler(.stop))
+    store.send(.messageListener(.stop))
   }
 
   func testMessengerStartRegistered() {
@@ -61,6 +64,7 @@ final class HomeFeatureTests: XCTestCase {
     store.environment.bgQueue = .immediate
     store.environment.mainQueue = .immediate
     store.environment.authHandler.run = { _ in Cancellable {} }
+    store.environment.messageListener.run = { _ in Cancellable {} }
     store.environment.messenger.start.run = { messengerDidStartWithTimeout.append($0) }
     store.environment.messenger.isConnected.run = { false }
     store.environment.messenger.connect.run = { messengerDidConnect += 1 }
@@ -86,12 +90,14 @@ final class HomeFeatureTests: XCTestCase {
     XCTAssertNoDifference(messengerDidLogIn, 1)
 
     store.receive(.authHandler(.start))
+    store.receive(.messageListener(.start))
     store.receive(.networkMonitor(.stop))
     store.receive(.messenger(.didStartRegistered))
     store.receive(.networkMonitor(.start))
 
     store.send(.networkMonitor(.stop))
     store.send(.authHandler(.stop))
+    store.send(.messageListener(.stop))
   }
 
   func testRegisterFinished() {
@@ -109,6 +115,7 @@ final class HomeFeatureTests: XCTestCase {
     store.environment.bgQueue = .immediate
     store.environment.mainQueue = .immediate
     store.environment.authHandler.run = { _ in Cancellable {} }
+    store.environment.messageListener.run = { _ in Cancellable {} }
     store.environment.messenger.start.run = { messengerDidStartWithTimeout.append($0) }
     store.environment.messenger.isConnected.run = { true }
     store.environment.messenger.isLoggedIn.run = { false }
@@ -134,12 +141,14 @@ final class HomeFeatureTests: XCTestCase {
     XCTAssertNoDifference(messengerDidLogIn, 1)
 
     store.receive(.authHandler(.start))
+    store.receive(.messageListener(.start))
     store.receive(.networkMonitor(.stop))
     store.receive(.messenger(.didStartRegistered))
     store.receive(.networkMonitor(.start))
 
     store.send(.networkMonitor(.stop))
     store.send(.authHandler(.stop))
+    store.send(.messageListener(.stop))
   }
 
   func testMessengerStartFailure() {
@@ -155,17 +164,20 @@ final class HomeFeatureTests: XCTestCase {
     store.environment.bgQueue = .immediate
     store.environment.mainQueue = .immediate
     store.environment.authHandler.run = { _ in Cancellable {} }
+    store.environment.messageListener.run = { _ in Cancellable {} }
     store.environment.messenger.start.run = { _ in throw error }
 
     store.send(.messenger(.start))
 
     store.receive(.authHandler(.start))
+    store.receive(.messageListener(.start))
     store.receive(.networkMonitor(.stop))
     store.receive(.messenger(.failure(error as NSError))) {
       $0.failure = error.localizedDescription
     }
 
     store.send(.authHandler(.stop))
+    store.send(.messageListener(.stop))
   }
 
   func testMessengerStartConnectFailure() {
@@ -181,6 +193,7 @@ final class HomeFeatureTests: XCTestCase {
     store.environment.bgQueue = .immediate
     store.environment.mainQueue = .immediate
     store.environment.authHandler.run = { _ in Cancellable {} }
+    store.environment.messageListener.run = { _ in Cancellable {} }
     store.environment.messenger.start.run = { _ in }
     store.environment.messenger.isConnected.run = { false }
     store.environment.messenger.connect.run = { throw error }
@@ -188,12 +201,14 @@ final class HomeFeatureTests: XCTestCase {
     store.send(.messenger(.start))
 
     store.receive(.authHandler(.start))
+    store.receive(.messageListener(.start))
     store.receive(.networkMonitor(.stop))
     store.receive(.messenger(.failure(error as NSError))) {
       $0.failure = error.localizedDescription
     }
 
     store.send(.authHandler(.stop))
+    store.send(.messageListener(.stop))
   }
 
   func testMessengerStartIsRegisteredFailure() {
@@ -209,6 +224,7 @@ final class HomeFeatureTests: XCTestCase {
     store.environment.bgQueue = .immediate
     store.environment.mainQueue = .immediate
     store.environment.authHandler.run = { _ in Cancellable {} }
+    store.environment.messageListener.run = { _ in Cancellable {} }
     store.environment.messenger.start.run = { _ in }
     store.environment.messenger.isConnected.run = { true }
     store.environment.messenger.isLoggedIn.run = { false }
@@ -217,12 +233,14 @@ final class HomeFeatureTests: XCTestCase {
     store.send(.messenger(.start))
 
     store.receive(.authHandler(.start))
+    store.receive(.messageListener(.start))
     store.receive(.networkMonitor(.stop))
     store.receive(.messenger(.failure(error as NSError))) {
       $0.failure = error.localizedDescription
     }
 
     store.send(.authHandler(.stop))
+    store.send(.messageListener(.stop))
   }
 
   func testMessengerStartLogInFailure() {
@@ -238,6 +256,7 @@ final class HomeFeatureTests: XCTestCase {
     store.environment.bgQueue = .immediate
     store.environment.mainQueue = .immediate
     store.environment.authHandler.run = { _ in Cancellable {} }
+    store.environment.messageListener.run = { _ in Cancellable {} }
     store.environment.messenger.start.run = { _ in }
     store.environment.messenger.isConnected.run = { true }
     store.environment.messenger.isLoggedIn.run = { false }
@@ -247,12 +266,14 @@ final class HomeFeatureTests: XCTestCase {
     store.send(.messenger(.start))
 
     store.receive(.authHandler(.start))
+    store.receive(.messageListener(.start))
     store.receive(.networkMonitor(.stop))
     store.receive(.messenger(.failure(error as NSError))) {
       $0.failure = error.localizedDescription
     }
 
     store.send(.authHandler(.stop))
+    store.send(.messageListener(.stop))
   }
 
   func testNetworkMonitorStart() {
@@ -565,4 +586,45 @@ final class HomeFeatureTests: XCTestCase {
 
     authHandlerOnError.first?(AuthHandlerError(id: 2))
   }
+
+  func testMessageListener() {
+    let store = TestStore(
+      initialState: HomeState(),
+      reducer: homeReducer,
+      environment: .unimplemented
+    )
+
+    var didRunMessageListener = 0
+    var didCancelMessageListener = 0
+    var messageListenerOnError: [MessageListenerHandler.OnError] = []
+
+    store.environment.mainQueue = .immediate
+    store.environment.bgQueue = .immediate
+    store.environment.messageListener.run = { onError in
+      didRunMessageListener += 1
+      messageListenerOnError.append(onError)
+      return Cancellable { didCancelMessageListener += 1 }
+    }
+
+    store.send(.messageListener(.start))
+
+    XCTAssertNoDifference(didRunMessageListener, 1)
+
+    struct MessageListenerError: Error { var id: Int }
+    messageListenerOnError.first?(MessageListenerError(id: 1))
+
+    store.receive(.messageListener(.failure(MessageListenerError(id: 1) as NSError))) {
+      $0.messageListenerFailure = MessageListenerError(id: 1).localizedDescription
+    }
+
+    store.send(.messageListener(.failureDismissed)) {
+      $0.messageListenerFailure = nil
+    }
+
+    store.send(.messageListener(.stop))
+
+    XCTAssertNoDifference(didCancelMessageListener, 1)
+
+    messageListenerOnError.first?(MessageListenerError(id: 2))
+  }
 }