AsyncSubject.swift 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. //
  2. // AsyncSubject.swift
  3. // RxSwift
  4. //
  5. // Created by Victor Galán on 07/01/2017.
  6. // Copyright © 2017 Krunoslav Zaher. All rights reserved.
  7. //
  8. /// An AsyncSubject emits the last value (and only the last value) emitted by the source Observable,
  9. /// and only after that source Observable completes.
  10. ///
  11. /// (If the source Observable does not emit any values, the AsyncSubject also completes without emitting any values.)
  12. public final class AsyncSubject<Element>
  13. : Observable<Element>
  14. , SubjectType
  15. , ObserverType
  16. , SynchronizedUnsubscribeType {
  17. public typealias SubjectObserverType = AsyncSubject<Element>
  18. typealias Observers = AnyObserver<Element>.s
  19. typealias DisposeKey = Observers.KeyType
  20. /// Indicates whether the subject has any observers
  21. public var hasObservers: Bool {
  22. self._lock.lock(); defer { self._lock.unlock() }
  23. return self._observers.count > 0
  24. }
  25. let _lock = RecursiveLock()
  26. // state
  27. private var _observers = Observers()
  28. private var _isStopped = false
  29. private var _stoppedEvent = nil as Event<Element>? {
  30. didSet {
  31. self._isStopped = self._stoppedEvent != nil
  32. }
  33. }
  34. private var _lastElement: Element?
  35. #if DEBUG
  36. private let _synchronizationTracker = SynchronizationTracker()
  37. #endif
  38. /// Creates a subject.
  39. public override init() {
  40. #if TRACE_RESOURCES
  41. _ = Resources.incrementTotal()
  42. #endif
  43. super.init()
  44. }
  45. /// Notifies all subscribed observers about next event.
  46. ///
  47. /// - parameter event: Event to send to the observers.
  48. public func on(_ event: Event<Element>) {
  49. #if DEBUG
  50. self._synchronizationTracker.register(synchronizationErrorMessage: .default)
  51. defer { self._synchronizationTracker.unregister() }
  52. #endif
  53. let (observers, event) = self._synchronized_on(event)
  54. switch event {
  55. case .next:
  56. dispatch(observers, event)
  57. dispatch(observers, .completed)
  58. case .completed:
  59. dispatch(observers, event)
  60. case .error:
  61. dispatch(observers, event)
  62. }
  63. }
  64. func _synchronized_on(_ event: Event<Element>) -> (Observers, Event<Element>) {
  65. self._lock.lock(); defer { self._lock.unlock() }
  66. if self._isStopped {
  67. return (Observers(), .completed)
  68. }
  69. switch event {
  70. case .next(let element):
  71. self._lastElement = element
  72. return (Observers(), .completed)
  73. case .error:
  74. self._stoppedEvent = event
  75. let observers = self._observers
  76. self._observers.removeAll()
  77. return (observers, event)
  78. case .completed:
  79. let observers = self._observers
  80. self._observers.removeAll()
  81. if let lastElement = self._lastElement {
  82. self._stoppedEvent = .next(lastElement)
  83. return (observers, .next(lastElement))
  84. }
  85. else {
  86. self._stoppedEvent = event
  87. return (observers, .completed)
  88. }
  89. }
  90. }
  91. /// Subscribes an observer to the subject.
  92. ///
  93. /// - parameter observer: Observer to subscribe to the subject.
  94. /// - returns: Disposable object that can be used to unsubscribe the observer from the subject.
  95. public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
  96. self._lock.lock(); defer { self._lock.unlock() }
  97. return self._synchronized_subscribe(observer)
  98. }
  99. func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
  100. if let stoppedEvent = self._stoppedEvent {
  101. switch stoppedEvent {
  102. case .next:
  103. observer.on(stoppedEvent)
  104. observer.on(.completed)
  105. case .completed:
  106. observer.on(stoppedEvent)
  107. case .error:
  108. observer.on(stoppedEvent)
  109. }
  110. return Disposables.create()
  111. }
  112. let key = self._observers.insert(observer.on)
  113. return SubscriptionDisposable(owner: self, key: key)
  114. }
  115. func synchronizedUnsubscribe(_ disposeKey: DisposeKey) {
  116. self._lock.lock(); defer { self._lock.unlock() }
  117. self._synchronized_unsubscribe(disposeKey)
  118. }
  119. func _synchronized_unsubscribe(_ disposeKey: DisposeKey) {
  120. _ = self._observers.removeKey(disposeKey)
  121. }
  122. /// Returns observer interface for subject.
  123. public func asObserver() -> AsyncSubject<Element> {
  124. return self
  125. }
  126. #if TRACE_RESOURCES
  127. deinit {
  128. _ = Resources.decrementTotal()
  129. }
  130. #endif
  131. }