PublishSubject.swift 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. //
  2. // PublishSubject.swift
  3. // RxSwift
  4. //
  5. // Created by Krunoslav Zaher on 2/11/15.
  6. // Copyright © 2015 Krunoslav Zaher. All rights reserved.
  7. //
  8. /// Represents an object that is both an observable sequence as well as an observer.
  9. ///
  10. /// Each notification is broadcasted to all subscribed observers.
  11. public final class PublishSubject<Element>
  12. : Observable<Element>
  13. , SubjectType
  14. , Cancelable
  15. , ObserverType
  16. , SynchronizedUnsubscribeType {
  17. public typealias SubjectObserverType = PublishSubject<Element>
  18. typealias Observers = AnyObserver<Element>.s
  19. typealias DisposeKey = Observers.KeyType
  20. /// Indicates whether the subject has any observers
  21. public var hasObservers: Bool {
  22. self._lock.lock()
  23. let count = self._observers.count > 0
  24. self._lock.unlock()
  25. return count
  26. }
  27. private let _lock = RecursiveLock()
  28. // state
  29. private var _isDisposed = false
  30. private var _observers = Observers()
  31. private var _stopped = false
  32. private var _stoppedEvent = nil as Event<Element>?
  33. #if DEBUG
  34. private let _synchronizationTracker = SynchronizationTracker()
  35. #endif
  36. /// Indicates whether the subject has been isDisposed.
  37. public var isDisposed: Bool {
  38. return self._isDisposed
  39. }
  40. /// Creates a subject.
  41. public override init() {
  42. super.init()
  43. #if TRACE_RESOURCES
  44. _ = Resources.incrementTotal()
  45. #endif
  46. }
  47. /// Notifies all subscribed observers about next event.
  48. ///
  49. /// - parameter event: Event to send to the observers.
  50. public func on(_ event: Event<Element>) {
  51. #if DEBUG
  52. self._synchronizationTracker.register(synchronizationErrorMessage: .default)
  53. defer { self._synchronizationTracker.unregister() }
  54. #endif
  55. dispatch(self._synchronized_on(event), event)
  56. }
  57. func _synchronized_on(_ event: Event<Element>) -> Observers {
  58. self._lock.lock(); defer { self._lock.unlock() }
  59. switch event {
  60. case .next:
  61. if self._isDisposed || self._stopped {
  62. return Observers()
  63. }
  64. return self._observers
  65. case .completed, .error:
  66. if self._stoppedEvent == nil {
  67. self._stoppedEvent = event
  68. self._stopped = true
  69. let observers = self._observers
  70. self._observers.removeAll()
  71. return observers
  72. }
  73. return Observers()
  74. }
  75. }
  76. /**
  77. Subscribes an observer to the subject.
  78. - parameter observer: Observer to subscribe to the subject.
  79. - returns: Disposable object that can be used to unsubscribe the observer from the subject.
  80. */
  81. public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
  82. self._lock.lock()
  83. let subscription = self._synchronized_subscribe(observer)
  84. self._lock.unlock()
  85. return subscription
  86. }
  87. func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
  88. if let stoppedEvent = self._stoppedEvent {
  89. observer.on(stoppedEvent)
  90. return Disposables.create()
  91. }
  92. if self._isDisposed {
  93. observer.on(.error(RxError.disposed(object: self)))
  94. return Disposables.create()
  95. }
  96. let key = self._observers.insert(observer.on)
  97. return SubscriptionDisposable(owner: self, key: key)
  98. }
  99. func synchronizedUnsubscribe(_ disposeKey: DisposeKey) {
  100. self._lock.lock()
  101. self._synchronized_unsubscribe(disposeKey)
  102. self._lock.unlock()
  103. }
  104. func _synchronized_unsubscribe(_ disposeKey: DisposeKey) {
  105. _ = self._observers.removeKey(disposeKey)
  106. }
  107. /// Returns observer interface for subject.
  108. public func asObserver() -> PublishSubject<Element> {
  109. return self
  110. }
  111. /// Unsubscribe all observers and release resources.
  112. public func dispose() {
  113. self._lock.lock()
  114. self._synchronized_dispose()
  115. self._lock.unlock()
  116. }
  117. final func _synchronized_dispose() {
  118. self._isDisposed = true
  119. self._observers.removeAll()
  120. self._stoppedEvent = nil
  121. }
  122. #if TRACE_RESOURCES
  123. deinit {
  124. _ = Resources.decrementTotal()
  125. }
  126. #endif
  127. }