RACCommand.m 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. //
  2. // RACCommand.m
  3. // ReactiveObjC
  4. //
  5. // Created by Josh Abernathy on 3/3/12.
  6. // Copyright (c) 2012 GitHub, Inc. All rights reserved.
  7. //
  8. #import "RACCommand.h"
  9. #import <ReactiveObjC/RACEXTScope.h>
  10. #import "NSArray+RACSequenceAdditions.h"
  11. #import "NSObject+RACDeallocating.h"
  12. #import "NSObject+RACDescription.h"
  13. #import "NSObject+RACPropertySubscribing.h"
  14. #import "RACMulticastConnection.h"
  15. #import "RACReplaySubject.h"
  16. #import "RACScheduler.h"
  17. #import "RACSequence.h"
  18. #import "RACSignal+Operations.h"
  19. #import <libkern/OSAtomic.h>
  /// Error domain of the `NSError` that -execute: sends when the command is
  /// not enabled.
  NSErrorDomain const RACCommandErrorDomain = @"RACCommandErrorDomain";

  /// `userInfo` key under which that error stores the RACCommand that
  /// refused to execute.
  NSString * const RACUnderlyingCommandErrorKey = @"RACUnderlyingCommandErrorKey";
  @interface RACCommand () {
      // Backing storage for `allowsConcurrentExecution`. Zero means NO, any
      // other value means YES. The setter updates it with
      // OSAtomicOr32Barrier / OSAtomicAnd32Barrier.
      volatile uint32_t _allowsConcurrentExecution;
  }

  // Receives one signal per execution: -execute: sends the signal of each
  // execution it starts to this subject.
  @property (nonatomic, readonly, strong) RACSubject *addedExecutionSignalsSubject;

  // Receives the current value of `allowsConcurrentExecution` each time the
  // setter is invoked.
  @property (nonatomic, readonly, strong) RACSubject *allowsConcurrentExecutionSubject;

  // The same information as `enabled`, but without the hop to the main
  // thread.
  //
  // Values from this signal may arrive on any thread.
  @property (nonatomic, readonly, strong) RACSignal *immediateEnabled;

  // The block the receiver was initialized with. -execute: invokes it with
  // its input to obtain the signal for that execution.
  @property (nonatomic, readonly, copy) RACSignal * (^signalBlock)(id input);

  @end
  37. @implementation RACCommand
  38. #pragma mark Properties
  /// Whether the command may be executed while an earlier execution is still
  /// in flight.
  ///
  /// Reads the `_allowsConcurrentExecution` backing ivar; any nonzero value
  /// is reported as YES.
  - (BOOL)allowsConcurrentExecution {
      return (_allowsConcurrentExecution == 0 ? NO : YES);
  }
  /// Updates the backing ivar atomically, then publishes the stored value on
  /// `allowsConcurrentExecutionSubject`.
  ///
  /// @param allowed YES to permit overlapping executions, NO to forbid them.
  - (void)setAllowsConcurrentExecution:(BOOL)allowed {
      if (!allowed) {
          // AND with 0 clears every bit, leaving the flag at zero (NO).
          OSAtomicAnd32Barrier(0, &_allowsConcurrentExecution);
      } else {
          // OR with 1 sets the low bit, leaving the flag nonzero (YES).
          OSAtomicOr32Barrier(1, &_allowsConcurrentExecution);
      }

      // The value sent is re-read from the ivar rather than taken from
      // `allowed`, so subscribers receive the raw stored integer (0 or 1).
      NSNumber *storedValue = @(_allowsConcurrentExecution);
      [self.allowsConcurrentExecutionSubject sendNext:storedValue];
  }
  50. #pragma mark Lifecycle
  /// Unsupported initializer.
  ///
  /// Trips an assertion directing callers to -initWithSignalBlock:. When
  /// assertions are compiled out it simply returns nil.
  - (instancetype)init {
      NSCAssert(NO, @"Use -initWithSignalBlock: instead");

      return nil;
  }
  /// Convenience initializer: forwards to -initWithEnabled:signalBlock: with
  /// no `enabled` signal.
  ///
  /// @param signalBlock Block that creates the signal for each execution.
  /// @return The initialized command.
  - (instancetype)initWithSignalBlock:(RACSignal<id> * (^)(id input))signalBlock {
      RACSignal *noEnabledSignal = nil;
      return [self initWithEnabled:noEnabledSignal signalBlock:signalBlock];
  }
  /// Completes both internal subjects so that the signals derived from them
  /// terminate when the command goes away.
  - (void)dealloc {
      // Ivars are used directly (no property accessors) during teardown.
      // The order of the two completions is kept as-is.
      [self->_addedExecutionSignalsSubject sendCompleted];
      [self->_allowsConcurrentExecutionSubject sendCompleted];
  }
  /// Initializes a command and builds every signal derived from its
  /// executions (`executionSignals`, `errors`, `executing`, `enabled`).
  ///
  /// @param enabledSignal A signal of BOOL NSNumbers that gates execution, or
  ///        nil to leave the command enabled except while it is executing
  ///        with concurrent execution disallowed. A non-nil signal is
  ///        prefixed with @YES.
  /// @param signalBlock Block that creates the signal for each execution.
  ///        Must not be nil (asserted). It is copied.
  /// @return The initialized command, or nil if the superclass initializer
  ///         returned nil.
  - (instancetype)initWithEnabled:(RACSignal *)enabledSignal signalBlock:(RACSignal<id> * (^)(id input))signalBlock {
      NSCParameterAssert(signalBlock != nil);

      self = [super init];
      // Writing ivars through a nil `self` would dereference a null pointer,
      // so bail out if the superclass failed to initialize.
      if (self == nil) return nil;

      _addedExecutionSignalsSubject = [RACSubject new];
      _allowsConcurrentExecutionSubject = [RACSubject new];
      _signalBlock = [signalBlock copy];

      // Each execution signal is forwarded on the main thread with its error
      // (if any) replaced by plain completion.
      _executionSignals = [[[self.addedExecutionSignalsSubject
          map:^(RACSignal *signal) {
              return [signal catchTo:[RACSignal empty]];
          }]
          deliverOn:RACScheduler.mainThreadScheduler]
          setNameWithFormat:@"%@ -executionSignals", self];

      // `errors` needs to be multicasted so that it picks up every execution
      // signal that is added.
      //
      // In other words, if someone subscribes to `errors` _after_ an execution
      // has started, it should still receive any error from that execution.
      RACMulticastConnection *errorsConnection = [[[self.addedExecutionSignalsSubject
          flattenMap:^(RACSignal *signal) {
              // Drop the execution's values and turn its error into a value.
              return [[signal
                  ignoreValues]
                  catch:^(NSError *error) {
                      return [RACSignal return:error];
                  }];
          }]
          deliverOn:RACScheduler.mainThreadScheduler]
          publish];

      _errors = [errorsConnection.signal setNameWithFormat:@"%@ -errors", self];
      [errorsConnection connect];

      // Every execution contributes +1 when it is added and -1 once it
      // finishes (errors are treated as completion). The running sum is the
      // number of in-flight executions; the command is executing while that
      // sum is positive.
      RACSignal *immediateExecuting = [[[[self.addedExecutionSignalsSubject
          flattenMap:^(RACSignal *signal) {
              return [[[signal
                  catchTo:[RACSignal empty]]
                  then:^{
                      return [RACSignal return:@-1];
                  }]
                  startWith:@1];
          }]
          scanWithStart:@0 reduce:^(NSNumber *running, NSNumber *next) {
              return @(running.integerValue + next.integerValue);
          }]
          map:^(NSNumber *count) {
              return @(count.integerValue > 0);
          }]
          startWith:@NO];

      _executing = [[[[[immediateExecuting
          deliverOn:RACScheduler.mainThreadScheduler]
          // This is useful before the first value arrives on the main thread.
          startWith:@NO]
          distinctUntilChanged]
          replayLast]
          setNameWithFormat:@"%@ -executing", self];

      // With concurrent execution allowed, another execution is always
      // permitted; otherwise only while nothing is executing. The flag is
      // assumed to be NO until the subject reports otherwise.
      RACSignal *moreExecutionsAllowed = [RACSignal
          if:[self.allowsConcurrentExecutionSubject startWith:@NO]
          then:[RACSignal return:@YES]
          else:[immediateExecuting not]];

      if (enabledSignal == nil) {
          enabledSignal = [RACSignal return:@YES];
      } else {
          // Gives the combination below an initial value before the
          // caller's signal has sent anything.
          enabledSignal = [enabledSignal startWith:@YES];
      }

      // Enabled only when both the caller's signal and the concurrency rule
      // agree; ends when the receiver deallocates.
      _immediateEnabled = [[[[RACSignal
          combineLatest:@[ enabledSignal, moreExecutionsAllowed ]]
          and]
          takeUntil:self.rac_willDeallocSignal]
          replayLast];

      // The first value is passed through without a scheduler hop; every
      // later value is delivered on the main thread.
      _enabled = [[[[[self.immediateEnabled
          take:1]
          concat:[[self.immediateEnabled skip:1] deliverOn:RACScheduler.mainThreadScheduler]]
          distinctUntilChanged]
          replayLast]
          setNameWithFormat:@"%@ -enabled", self];

      return self;
  }
  136. #pragma mark Execution
  /// Starts one execution of the command, if it is currently enabled.
  ///
  /// @param input The value handed to the signal block.
  /// @return A replaying signal for this execution, or a signal that errors
  ///         with `RACCommandErrorNotEnabled` (domain `RACCommandErrorDomain`)
  ///         when the command is disabled.
  - (RACSignal *)execute:(id)input {
      // `immediateEnabled` is guaranteed to send a value upon subscription,
      // so blocking on -first is acceptable here.
      NSNumber *enabledNow = [self.immediateEnabled first];
      if (!enabledNow.boolValue) {
          NSDictionary *userInfo = @{
              NSLocalizedDescriptionKey: NSLocalizedString(@"The command is disabled and cannot be executed", nil),
              RACUnderlyingCommandErrorKey: self
          };
          NSError *disabledError = [NSError errorWithDomain:RACCommandErrorDomain
                                                       code:RACCommandErrorNotEnabled
                                                   userInfo:userInfo];
          return [RACSignal error:disabledError];
      }

      RACSignal *signal = self.signalBlock(input);
      NSCAssert(signal != nil, @"nil signal returned from signal block for value: %@", input);

      // The work signal is subscribed to on the main thread scheduler so that
      // the subscription happens _after_ the execution has been sent to
      // `addedExecutionSignalsSubject` below.
      //
      // This means that `executing` and `enabled` will send updated values
      // before the signal actually starts performing work.
      RACSignal *mainThreadWork = [signal subscribeOn:RACScheduler.mainThreadScheduler];
      RACMulticastConnection *connection = [mainThreadWork multicast:[RACReplaySubject subject]];
      RACSignal *execution = connection.signal;

      // Order matters: announce the execution first, then connect.
      [self.addedExecutionSignalsSubject sendNext:execution];
      [connection connect];

      return [execution setNameWithFormat:@"%@ -execute: %@", self, RACDescription(input)];
  }
  162. @end