Completable+AndThen.swift 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. //
  2. // Completable+AndThen.swift
  3. // RxSwift
  4. //
  5. // Created by Krunoslav Zaher on 7/2/17.
  6. // Copyright © 2017 Krunoslav Zaher. All rights reserved.
  7. //
  extension PrimitiveSequenceType where Trait == CompletableTrait, Element == Never {
      /**
       Subscribes to `second` once `self` has completed, and returns the result as a `Single`.

       If `self` terminates with an error, that error is forwarded and `second` is never subscribed to.

       - seealso: [concat operator on reactivex.io](http://reactivex.io/documentation/operators/concat.html)

       - parameter second: The `Single` to run after `self` completes.
       - returns: A `Single` that emits the events of `second` after `self` has completed.
       */
      public func andThen<Element>(_ second: Single<Element>) -> Single<Element> {
          let source = primitiveSequence.asObservable()
          let concatenated = ConcatCompletable(completable: source, second: second.asObservable())
          return Single(raw: concatenated)
      }

      /**
       Subscribes to `second` once `self` has completed, and returns the result as a `Maybe`.

       If `self` terminates with an error, that error is forwarded and `second` is never subscribed to.

       - seealso: [concat operator on reactivex.io](http://reactivex.io/documentation/operators/concat.html)

       - parameter second: The `Maybe` to run after `self` completes.
       - returns: A `Maybe` that emits the events of `second` after `self` has completed.
       */
      public func andThen<Element>(_ second: Maybe<Element>) -> Maybe<Element> {
          let source = primitiveSequence.asObservable()
          let concatenated = ConcatCompletable(completable: source, second: second.asObservable())
          return Maybe(raw: concatenated)
      }

      /**
       Subscribes to `second` once `self` has completed, and returns the result as a `Completable`.

       If `self` terminates with an error, that error is forwarded and `second` is never subscribed to.

       - seealso: [concat operator on reactivex.io](http://reactivex.io/documentation/operators/concat.html)

       - parameter second: The `Completable` to run after `self` completes.
       - returns: A `Completable` that terminates with the terminal event of `second` after `self` has completed.
       */
      public func andThen(_ second: Completable) -> Completable {
          let source = primitiveSequence.asObservable()
          let concatenated = ConcatCompletable(completable: source, second: second.asObservable())
          return Completable(raw: concatenated)
      }

      /**
       Subscribes to `second` once `self` has completed, and returns the result as an `Observable`.

       If `self` terminates with an error, that error is forwarded and `second` is never subscribed to.

       - seealso: [concat operator on reactivex.io](http://reactivex.io/documentation/operators/concat.html)

       - parameter second: The observable sequence to run after `self` completes.
       - returns: An observable sequence that emits the events of `second` after `self` has completed.
       */
      public func andThen<Element>(_ second: Observable<Element>) -> Observable<Element> {
          let source = primitiveSequence.asObservable()
          let concatenated = ConcatCompletable(completable: source, second: second.asObservable())
          return concatenated
      }
  }
  /// Producer that pairs a completable source with a second sequence; each
  /// subscription is driven by a `ConcatCompletableSink`.
  private final class ConcatCompletable<Element>: Producer<Element> {
      /// The sequence that is subscribed to first.
      fileprivate let _completable: Observable<Never>
      /// The sequence the sink subscribes to after `_completable` completes.
      fileprivate let _second: Observable<Element>

      /// - parameter completable: Sequence subscribed to first.
      /// - parameter second: Sequence subscribed to once `completable` completes.
      init(completable: Observable<Never>, second: Observable<Element>) {
          _completable = completable
          _second = second
      }

      /// Creates a sink for `observer`, starts it, and returns the sink together
      /// with the subscription produced by starting it.
      override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
          let concatSink = ConcatCompletableSink(parent: self, observer: observer, cancel: cancel)
          return (sink: concatSink, subscription: concatSink.run())
      }
  }
  /// Sink that observes the completable half of a `ConcatCompletable` and, on
  /// its completion, hands over to the second sequence.
  private final class ConcatCompletableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
      typealias Element = Never
      typealias Parent = ConcatCompletable<Observer.Element>

      private let _parent: Parent
      /// Holds the currently active subscription: first the one to the
      /// completable, later the one to the second sequence.
      private let _subscription = SerialDisposable()

      init(parent: Parent, observer: Observer, cancel: Cancelable) {
          _parent = parent
          super.init(observer: observer, cancel: cancel)
      }

      /// Handles events coming from the completable.
      func on(_ event: Event<Element>) {
          switch event {
          case .next:
              // Nothing to forward for a `.next` event from the completable.
              break
          case .completed:
              // The first stage finished successfully: subscribe to the second sequence
              // through a forwarding observer and keep that subscription.
              let secondObserver = ConcatCompletableSinkOther(parent: self)
              _subscription.disposable = _parent._second.subscribe(secondObserver)
          case .error(let failure):
              // A failure of the first stage ends the whole chain.
              forwardOn(.error(failure))
              dispose()
          }
      }

      /// Subscribes to the completable and returns the disposable that tracks the
      /// active subscription.
      func run() -> Disposable {
          // Register the placeholder before subscribing, then fill it with the
          // subscription to the completable.
          let firstSubscription = SingleAssignmentDisposable()
          _subscription.disposable = firstSubscription
          firstSubscription.setDisposable(_parent._completable.subscribe(self))
          return _subscription
      }
  }
  /// Observer of the second sequence: relays every event to the parent sink and
  /// disposes that sink once a stop event has been relayed.
  private final class ConcatCompletableSinkOther<Observer: ObserverType>: ObserverType {
      typealias Element = Observer.Element
      typealias Parent = ConcatCompletableSink<Observer>

      private let _parent: Parent

      init(parent: Parent) {
          _parent = parent
      }

      /// Forwards `event` through the parent sink; a stop event also disposes the parent.
      func on(_ event: Event<Observer.Element>) {
          _parent.forwardOn(event)
          guard event.isStopEvent else { return }
          _parent.dispose()
      }
  }
  107. }