RACSubject.m 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. //
  2. // RACSubject.m
  3. // ReactiveObjC
  4. //
  5. // Created by Josh Abernathy on 3/9/12.
  6. // Copyright (c) 2012 GitHub, Inc. All rights reserved.
  7. //
  8. #import "RACSubject.h"
  9. #import <ReactiveObjC/RACEXTScope.h>
  10. #import "RACCompoundDisposable.h"
  11. #import "RACPassthroughSubscriber.h"
  12. @interface RACSubject ()
  13. // Contains all current subscribers to the receiver.
  14. //
  15. // This should only be used while synchronized on `self`.
  16. @property (nonatomic, strong, readonly) NSMutableArray *subscribers;
  17. // Contains all of the receiver's subscriptions to other signals.
  18. @property (nonatomic, strong, readonly) RACCompoundDisposable *disposable;
  19. // Enumerates over each of the receiver's `subscribers` and invokes `block` for
  20. // each.
  21. - (void)enumerateSubscribersUsingBlock:(void (^)(id<RACSubscriber> subscriber))block;
  22. @end
  23. @implementation RACSubject
  24. #pragma mark Lifecycle
  25. + (instancetype)subject {
  26. return [[self alloc] init];
  27. }
  28. - (instancetype)init {
  29. self = [super init];
  30. if (self == nil) return nil;
  31. _disposable = [RACCompoundDisposable compoundDisposable];
  32. _subscribers = [[NSMutableArray alloc] initWithCapacity:1];
  33. return self;
  34. }
  35. - (void)dealloc {
  36. [self.disposable dispose];
  37. }
  38. #pragma mark Subscription
  39. - (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
  40. NSCParameterAssert(subscriber != nil);
  41. RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
  42. subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
  43. NSMutableArray *subscribers = self.subscribers;
  44. @synchronized (subscribers) {
  45. [subscribers addObject:subscriber];
  46. }
  47. [disposable addDisposable:[RACDisposable disposableWithBlock:^{
  48. @synchronized (subscribers) {
  49. // Since newer subscribers are generally shorter-lived, search
  50. // starting from the end of the list.
  51. NSUInteger index = [subscribers indexOfObjectWithOptions:NSEnumerationReverse passingTest:^ BOOL (id<RACSubscriber> obj, NSUInteger index, BOOL *stop) {
  52. return obj == subscriber;
  53. }];
  54. if (index != NSNotFound) [subscribers removeObjectAtIndex:index];
  55. }
  56. }]];
  57. return disposable;
  58. }
  59. - (void)enumerateSubscribersUsingBlock:(void (^)(id<RACSubscriber> subscriber))block {
  60. NSArray *subscribers;
  61. @synchronized (self.subscribers) {
  62. subscribers = [self.subscribers copy];
  63. }
  64. for (id<RACSubscriber> subscriber in subscribers) {
  65. block(subscriber);
  66. }
  67. }
  68. #pragma mark RACSubscriber
  69. - (void)sendNext:(id)value {
  70. [self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
  71. [subscriber sendNext:value];
  72. }];
  73. }
  74. - (void)sendError:(NSError *)error {
  75. [self.disposable dispose];
  76. [self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
  77. [subscriber sendError:error];
  78. }];
  79. }
  80. - (void)sendCompleted {
  81. [self.disposable dispose];
  82. [self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
  83. [subscriber sendCompleted];
  84. }];
  85. }
  86. - (void)didSubscribeWithDisposable:(RACCompoundDisposable *)d {
  87. if (d.disposed) return;
  88. [self.disposable addDisposable:d];
  89. @weakify(self, d);
  90. [d addDisposable:[RACDisposable disposableWithBlock:^{
  91. @strongify(self, d);
  92. [self.disposable removeDisposable:d];
  93. }]];
  94. }
  95. @end