RACSignal+Operations.m 41 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330
  1. //
  2. // RACSignal+Operations.m
  3. // ReactiveObjC
  4. //
  5. // Created by Justin Spahr-Summers on 2012-09-06.
  6. // Copyright (c) 2012 GitHub, Inc. All rights reserved.
  7. //
  8. #import "RACSignal+Operations.h"
  9. #import "NSObject+RACDeallocating.h"
  10. #import "NSObject+RACDescription.h"
  11. #import "RACBlockTrampoline.h"
  12. #import "RACCommand.h"
  13. #import "RACCompoundDisposable.h"
  14. #import "RACDisposable.h"
  15. #import "RACEvent.h"
  16. #import "RACGroupedSignal.h"
  17. #import "RACMulticastConnection+Private.h"
  18. #import "RACReplaySubject.h"
  19. #import "RACScheduler.h"
  20. #import "RACSerialDisposable.h"
  21. #import "RACSignalSequence.h"
  22. #import "RACStream+Private.h"
  23. #import "RACSubject.h"
  24. #import "RACSubscriber+Private.h"
  25. #import "RACSubscriber.h"
  26. #import "RACTuple.h"
  27. #import "RACUnit.h"
  28. #import <libkern/OSAtomic.h>
  29. #import <objc/runtime.h>
  30. NSErrorDomain const RACSignalErrorDomain = @"RACSignalErrorDomain";
  31. // Subscribes to the given signal with the given blocks.
  32. //
  33. // If the signal errors or completes, the corresponding block is invoked. If the
  34. // disposable passed to the block is _not_ disposed, then the signal is
  35. // subscribed to again.
  36. static RACDisposable *subscribeForever (RACSignal *signal, void (^next)(id), void (^error)(NSError *, RACDisposable *), void (^completed)(RACDisposable *)) {
  37. next = [next copy];
  38. error = [error copy];
  39. completed = [completed copy];
  40. RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
  41. RACSchedulerRecursiveBlock recursiveBlock = ^(void (^recurse)(void)) {
  42. RACCompoundDisposable *selfDisposable = [RACCompoundDisposable compoundDisposable];
  43. [compoundDisposable addDisposable:selfDisposable];
  44. __weak RACDisposable *weakSelfDisposable = selfDisposable;
  45. RACDisposable *subscriptionDisposable = [signal subscribeNext:next error:^(NSError *e) {
  46. @autoreleasepool {
  47. error(e, compoundDisposable);
  48. [compoundDisposable removeDisposable:weakSelfDisposable];
  49. }
  50. recurse();
  51. } completed:^{
  52. @autoreleasepool {
  53. completed(compoundDisposable);
  54. [compoundDisposable removeDisposable:weakSelfDisposable];
  55. }
  56. recurse();
  57. }];
  58. [selfDisposable addDisposable:subscriptionDisposable];
  59. };
  60. // Subscribe once immediately, and then use recursive scheduling for any
  61. // further resubscriptions.
  62. recursiveBlock(^{
  63. RACScheduler *recursiveScheduler = RACScheduler.currentScheduler ?: [RACScheduler scheduler];
  64. RACDisposable *schedulingDisposable = [recursiveScheduler scheduleRecursiveBlock:recursiveBlock];
  65. [compoundDisposable addDisposable:schedulingDisposable];
  66. });
  67. return compoundDisposable;
  68. }
  69. @implementation RACSignal (Operations)
  70. - (RACSignal *)doNext:(void (^)(id x))block {
  71. NSCParameterAssert(block != NULL);
  72. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  73. return [self subscribeNext:^(id x) {
  74. block(x);
  75. [subscriber sendNext:x];
  76. } error:^(NSError *error) {
  77. [subscriber sendError:error];
  78. } completed:^{
  79. [subscriber sendCompleted];
  80. }];
  81. }] setNameWithFormat:@"[%@] -doNext:", self.name];
  82. }
  83. - (RACSignal *)doError:(void (^)(NSError *error))block {
  84. NSCParameterAssert(block != NULL);
  85. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  86. return [self subscribeNext:^(id x) {
  87. [subscriber sendNext:x];
  88. } error:^(NSError *error) {
  89. block(error);
  90. [subscriber sendError:error];
  91. } completed:^{
  92. [subscriber sendCompleted];
  93. }];
  94. }] setNameWithFormat:@"[%@] -doError:", self.name];
  95. }
  96. - (RACSignal *)doCompleted:(void (^)(void))block {
  97. NSCParameterAssert(block != NULL);
  98. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  99. return [self subscribeNext:^(id x) {
  100. [subscriber sendNext:x];
  101. } error:^(NSError *error) {
  102. [subscriber sendError:error];
  103. } completed:^{
  104. block();
  105. [subscriber sendCompleted];
  106. }];
  107. }] setNameWithFormat:@"[%@] -doCompleted:", self.name];
  108. }
  109. - (RACSignal *)throttle:(NSTimeInterval)interval {
  110. return [[self throttle:interval valuesPassingTest:^(id _) {
  111. return YES;
  112. }] setNameWithFormat:@"[%@] -throttle: %f", self.name, (double)interval];
  113. }
  114. - (RACSignal *)throttle:(NSTimeInterval)interval valuesPassingTest:(BOOL (^)(id next))predicate {
  115. NSCParameterAssert(interval >= 0);
  116. NSCParameterAssert(predicate != nil);
  117. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  118. RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
  119. // We may never use this scheduler, but we need to set it up ahead of
  120. // time so that our scheduled blocks are run serially if we do.
  121. RACScheduler *scheduler = [RACScheduler scheduler];
  122. // Information about any currently-buffered `next` event.
  123. __block id nextValue = nil;
  124. __block BOOL hasNextValue = NO;
  125. RACSerialDisposable *nextDisposable = [[RACSerialDisposable alloc] init];
  126. void (^flushNext)(BOOL send) = ^(BOOL send) {
  127. @synchronized (compoundDisposable) {
  128. [nextDisposable.disposable dispose];
  129. if (!hasNextValue) return;
  130. if (send) [subscriber sendNext:nextValue];
  131. nextValue = nil;
  132. hasNextValue = NO;
  133. }
  134. };
  135. RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
  136. RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: scheduler;
  137. BOOL shouldThrottle = predicate(x);
  138. @synchronized (compoundDisposable) {
  139. flushNext(NO);
  140. if (!shouldThrottle) {
  141. [subscriber sendNext:x];
  142. return;
  143. }
  144. nextValue = x;
  145. hasNextValue = YES;
  146. nextDisposable.disposable = [delayScheduler afterDelay:interval schedule:^{
  147. flushNext(YES);
  148. }];
  149. }
  150. } error:^(NSError *error) {
  151. [compoundDisposable dispose];
  152. [subscriber sendError:error];
  153. } completed:^{
  154. flushNext(YES);
  155. [subscriber sendCompleted];
  156. }];
  157. [compoundDisposable addDisposable:subscriptionDisposable];
  158. return compoundDisposable;
  159. }] setNameWithFormat:@"[%@] -throttle: %f valuesPassingTest:", self.name, (double)interval];
  160. }
  161. - (RACSignal *)delay:(NSTimeInterval)interval {
  162. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  163. RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
  164. // We may never use this scheduler, but we need to set it up ahead of
  165. // time so that our scheduled blocks are run serially if we do.
  166. RACScheduler *scheduler = [RACScheduler scheduler];
  167. void (^schedule)(dispatch_block_t) = ^(dispatch_block_t block) {
  168. RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: scheduler;
  169. RACDisposable *schedulerDisposable = [delayScheduler afterDelay:interval schedule:block];
  170. [disposable addDisposable:schedulerDisposable];
  171. };
  172. RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
  173. schedule(^{
  174. [subscriber sendNext:x];
  175. });
  176. } error:^(NSError *error) {
  177. [subscriber sendError:error];
  178. } completed:^{
  179. schedule(^{
  180. [subscriber sendCompleted];
  181. });
  182. }];
  183. [disposable addDisposable:subscriptionDisposable];
  184. return disposable;
  185. }] setNameWithFormat:@"[%@] -delay: %f", self.name, (double)interval];
  186. }
  187. - (RACSignal *)repeat {
  188. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  189. return subscribeForever(self,
  190. ^(id x) {
  191. [subscriber sendNext:x];
  192. },
  193. ^(NSError *error, RACDisposable *disposable) {
  194. [disposable dispose];
  195. [subscriber sendError:error];
  196. },
  197. ^(RACDisposable *disposable) {
  198. // Resubscribe.
  199. });
  200. }] setNameWithFormat:@"[%@] -repeat", self.name];
  201. }
  202. - (RACSignal *)catch:(RACSignal * (^)(NSError *error))catchBlock {
  203. NSCParameterAssert(catchBlock != NULL);
  204. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  205. RACSerialDisposable *catchDisposable = [[RACSerialDisposable alloc] init];
  206. RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
  207. [subscriber sendNext:x];
  208. } error:^(NSError *error) {
  209. RACSignal *signal = catchBlock(error);
  210. NSCAssert(signal != nil, @"Expected non-nil signal from catch block on %@", self);
  211. catchDisposable.disposable = [signal subscribe:subscriber];
  212. } completed:^{
  213. [subscriber sendCompleted];
  214. }];
  215. return [RACDisposable disposableWithBlock:^{
  216. [catchDisposable dispose];
  217. [subscriptionDisposable dispose];
  218. }];
  219. }] setNameWithFormat:@"[%@] -catch:", self.name];
  220. }
  221. - (RACSignal *)catchTo:(RACSignal *)signal {
  222. return [[self catch:^(NSError *error) {
  223. return signal;
  224. }] setNameWithFormat:@"[%@] -catchTo: %@", self.name, signal];
  225. }
  226. + (RACSignal *)try:(id (^)(NSError **errorPtr))tryBlock {
  227. NSCParameterAssert(tryBlock != NULL);
  228. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  229. NSError *error;
  230. id value = tryBlock(&error);
  231. RACSignal *signal = (value == nil ? [RACSignal error:error] : [RACSignal return:value]);
  232. return [signal subscribe:subscriber];
  233. }] setNameWithFormat:@"+try:"];
  234. }
  235. - (RACSignal *)try:(BOOL (^)(id value, NSError **errorPtr))tryBlock {
  236. NSCParameterAssert(tryBlock != NULL);
  237. return [[self flattenMap:^(id value) {
  238. NSError *error = nil;
  239. BOOL passed = tryBlock(value, &error);
  240. return (passed ? [RACSignal return:value] : [RACSignal error:error]);
  241. }] setNameWithFormat:@"[%@] -try:", self.name];
  242. }
  243. - (RACSignal *)tryMap:(id (^)(id value, NSError **errorPtr))mapBlock {
  244. NSCParameterAssert(mapBlock != NULL);
  245. return [[self flattenMap:^(id value) {
  246. NSError *error = nil;
  247. id mappedValue = mapBlock(value, &error);
  248. return (mappedValue == nil ? [RACSignal error:error] : [RACSignal return:mappedValue]);
  249. }] setNameWithFormat:@"[%@] -tryMap:", self.name];
  250. }
  251. - (RACSignal *)initially:(void (^)(void))block {
  252. NSCParameterAssert(block != NULL);
  253. return [[RACSignal defer:^{
  254. block();
  255. return self;
  256. }] setNameWithFormat:@"[%@] -initially:", self.name];
  257. }
  258. - (RACSignal *)finally:(void (^)(void))block {
  259. NSCParameterAssert(block != NULL);
  260. return [[[self
  261. doError:^(NSError *error) {
  262. block();
  263. }]
  264. doCompleted:^{
  265. block();
  266. }]
  267. setNameWithFormat:@"[%@] -finally:", self.name];
  268. }
  269. - (RACSignal *)bufferWithTime:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler {
  270. NSCParameterAssert(scheduler != nil);
  271. NSCParameterAssert(scheduler != RACScheduler.immediateScheduler);
  272. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  273. RACSerialDisposable *timerDisposable = [[RACSerialDisposable alloc] init];
  274. NSMutableArray *values = [NSMutableArray array];
  275. void (^flushValues)(void) = ^{
  276. @synchronized (values) {
  277. [timerDisposable.disposable dispose];
  278. if (values.count == 0) return;
  279. RACTuple *tuple = [RACTuple tupleWithObjectsFromArray:values];
  280. [values removeAllObjects];
  281. [subscriber sendNext:tuple];
  282. }
  283. };
  284. RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
  285. @synchronized (values) {
  286. if (values.count == 0) {
  287. timerDisposable.disposable = [scheduler afterDelay:interval schedule:flushValues];
  288. }
  289. [values addObject:x ?: RACTupleNil.tupleNil];
  290. }
  291. } error:^(NSError *error) {
  292. [subscriber sendError:error];
  293. } completed:^{
  294. flushValues();
  295. [subscriber sendCompleted];
  296. }];
  297. return [RACDisposable disposableWithBlock:^{
  298. [selfDisposable dispose];
  299. [timerDisposable dispose];
  300. }];
  301. }] setNameWithFormat:@"[%@] -bufferWithTime: %f onScheduler: %@", self.name, (double)interval, scheduler];
  302. }
  303. - (RACSignal *)collect {
  304. return [[self aggregateWithStartFactory:^{
  305. return [[NSMutableArray alloc] init];
  306. } reduce:^(NSMutableArray *collectedValues, id x) {
  307. [collectedValues addObject:(x ?: NSNull.null)];
  308. return collectedValues;
  309. }] setNameWithFormat:@"[%@] -collect", self.name];
  310. }
  311. - (RACSignal *)takeLast:(NSUInteger)count {
  312. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  313. NSMutableArray *valuesTaken = [NSMutableArray arrayWithCapacity:count];
  314. return [self subscribeNext:^(id x) {
  315. [valuesTaken addObject:x ? : RACTupleNil.tupleNil];
  316. while (valuesTaken.count > count) {
  317. [valuesTaken removeObjectAtIndex:0];
  318. }
  319. } error:^(NSError *error) {
  320. [subscriber sendError:error];
  321. } completed:^{
  322. for (id value in valuesTaken) {
  323. [subscriber sendNext:value == RACTupleNil.tupleNil ? nil : value];
  324. }
  325. [subscriber sendCompleted];
  326. }];
  327. }] setNameWithFormat:@"[%@] -takeLast: %lu", self.name, (unsigned long)count];
  328. }
  329. - (RACSignal *)combineLatestWith:(RACSignal *)signal {
  330. NSCParameterAssert(signal != nil);
  331. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  332. RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
  333. __block id lastSelfValue = nil;
  334. __block BOOL selfCompleted = NO;
  335. __block id lastOtherValue = nil;
  336. __block BOOL otherCompleted = NO;
  337. void (^sendNext)(void) = ^{
  338. @synchronized (disposable) {
  339. if (lastSelfValue == nil || lastOtherValue == nil) return;
  340. [subscriber sendNext:RACTuplePack(lastSelfValue, lastOtherValue)];
  341. }
  342. };
  343. RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
  344. @synchronized (disposable) {
  345. lastSelfValue = x ?: RACTupleNil.tupleNil;
  346. sendNext();
  347. }
  348. } error:^(NSError *error) {
  349. [subscriber sendError:error];
  350. } completed:^{
  351. @synchronized (disposable) {
  352. selfCompleted = YES;
  353. if (otherCompleted) [subscriber sendCompleted];
  354. }
  355. }];
  356. [disposable addDisposable:selfDisposable];
  357. RACDisposable *otherDisposable = [signal subscribeNext:^(id x) {
  358. @synchronized (disposable) {
  359. lastOtherValue = x ?: RACTupleNil.tupleNil;
  360. sendNext();
  361. }
  362. } error:^(NSError *error) {
  363. [subscriber sendError:error];
  364. } completed:^{
  365. @synchronized (disposable) {
  366. otherCompleted = YES;
  367. if (selfCompleted) [subscriber sendCompleted];
  368. }
  369. }];
  370. [disposable addDisposable:otherDisposable];
  371. return disposable;
  372. }] setNameWithFormat:@"[%@] -combineLatestWith: %@", self.name, signal];
  373. }
  374. + (RACSignal *)combineLatest:(id<NSFastEnumeration>)signals {
  375. return [[self join:signals block:^(RACSignal *left, RACSignal *right) {
  376. return [left combineLatestWith:right];
  377. }] setNameWithFormat:@"+combineLatest: %@", signals];
  378. }
  379. + (RACSignal *)combineLatest:(id<NSFastEnumeration>)signals reduce:(RACGenericReduceBlock)reduceBlock {
  380. NSCParameterAssert(reduceBlock != nil);
  381. RACSignal *result = [self combineLatest:signals];
  382. // Although we assert this condition above, older versions of this method
  383. // supported this argument being nil. Avoid crashing Release builds of
  384. // apps that depended on that.
  385. if (reduceBlock != nil) result = [result reduceEach:reduceBlock];
  386. return [result setNameWithFormat:@"+combineLatest: %@ reduce:", signals];
  387. }
  388. - (RACSignal *)merge:(RACSignal *)signal {
  389. return [[RACSignal
  390. merge:@[ self, signal ]]
  391. setNameWithFormat:@"[%@] -merge: %@", self.name, signal];
  392. }
  393. + (RACSignal *)merge:(id<NSFastEnumeration>)signals {
  394. NSMutableArray *copiedSignals = [[NSMutableArray alloc] init];
  395. for (RACSignal *signal in signals) {
  396. [copiedSignals addObject:signal];
  397. }
  398. return [[[RACSignal
  399. createSignal:^ RACDisposable * (id<RACSubscriber> subscriber) {
  400. for (RACSignal *signal in copiedSignals) {
  401. [subscriber sendNext:signal];
  402. }
  403. [subscriber sendCompleted];
  404. return nil;
  405. }]
  406. flatten]
  407. setNameWithFormat:@"+merge: %@", copiedSignals];
  408. }
  409. - (RACSignal *)flatten:(NSUInteger)maxConcurrent {
  410. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  411. RACCompoundDisposable *compoundDisposable = [[RACCompoundDisposable alloc] init];
  412. // Contains disposables for the currently active subscriptions.
  413. //
  414. // This should only be used while synchronized on `subscriber`.
  415. NSMutableArray *activeDisposables = [[NSMutableArray alloc] initWithCapacity:maxConcurrent];
  416. // Whether the signal-of-signals has completed yet.
  417. //
  418. // This should only be used while synchronized on `subscriber`.
  419. __block BOOL selfCompleted = NO;
  420. // Subscribes to the given signal.
  421. __block void (^subscribeToSignal)(RACSignal *);
  422. // Weak reference to the above, to avoid a leak.
  423. __weak __block void (^recur)(RACSignal *);
  424. // Sends completed to the subscriber if all signals are finished.
  425. //
  426. // This should only be used while synchronized on `subscriber`.
  427. void (^completeIfAllowed)(void) = ^{
  428. if (selfCompleted && activeDisposables.count == 0) {
  429. [subscriber sendCompleted];
  430. }
  431. };
  432. // The signals waiting to be started.
  433. //
  434. // This array should only be used while synchronized on `subscriber`.
  435. NSMutableArray *queuedSignals = [NSMutableArray array];
  436. recur = subscribeToSignal = ^(RACSignal *signal) {
  437. RACSerialDisposable *serialDisposable = [[RACSerialDisposable alloc] init];
  438. @synchronized (subscriber) {
  439. [compoundDisposable addDisposable:serialDisposable];
  440. [activeDisposables addObject:serialDisposable];
  441. }
  442. serialDisposable.disposable = [signal subscribeNext:^(id x) {
  443. [subscriber sendNext:x];
  444. } error:^(NSError *error) {
  445. [subscriber sendError:error];
  446. } completed:^{
  447. __strong void (^subscribeToSignal)(RACSignal *) = recur;
  448. RACSignal *nextSignal;
  449. @synchronized (subscriber) {
  450. [compoundDisposable removeDisposable:serialDisposable];
  451. [activeDisposables removeObjectIdenticalTo:serialDisposable];
  452. if (queuedSignals.count == 0) {
  453. completeIfAllowed();
  454. return;
  455. }
  456. nextSignal = queuedSignals[0];
  457. [queuedSignals removeObjectAtIndex:0];
  458. }
  459. subscribeToSignal(nextSignal);
  460. }];
  461. };
  462. [compoundDisposable addDisposable:[self subscribeNext:^(RACSignal *signal) {
  463. if (signal == nil) return;
  464. NSCAssert([signal isKindOfClass:RACSignal.class], @"Expected a RACSignal, got %@", signal);
  465. @synchronized (subscriber) {
  466. if (maxConcurrent > 0 && activeDisposables.count >= maxConcurrent) {
  467. [queuedSignals addObject:signal];
  468. // If we need to wait, skip subscribing to this
  469. // signal.
  470. return;
  471. }
  472. }
  473. subscribeToSignal(signal);
  474. } error:^(NSError *error) {
  475. [subscriber sendError:error];
  476. } completed:^{
  477. @synchronized (subscriber) {
  478. selfCompleted = YES;
  479. completeIfAllowed();
  480. }
  481. }]];
  482. [compoundDisposable addDisposable:[RACDisposable disposableWithBlock:^{
  483. // A strong reference is held to `subscribeToSignal` until we're
  484. // done, preventing it from deallocating early.
  485. subscribeToSignal = nil;
  486. }]];
  487. return compoundDisposable;
  488. }] setNameWithFormat:@"[%@] -flatten: %lu", self.name, (unsigned long)maxConcurrent];
  489. }
  490. - (RACSignal *)then:(RACSignal * (^)(void))block {
  491. NSCParameterAssert(block != nil);
  492. return [[[self
  493. ignoreValues]
  494. concat:[RACSignal defer:block]]
  495. setNameWithFormat:@"[%@] -then:", self.name];
  496. }
  497. - (RACSignal *)concat {
  498. return [[self flatten:1] setNameWithFormat:@"[%@] -concat", self.name];
  499. }
  500. - (RACSignal *)aggregateWithStartFactory:(id (^)(void))startFactory reduce:(id (^)(id running, id next))reduceBlock {
  501. NSCParameterAssert(startFactory != NULL);
  502. NSCParameterAssert(reduceBlock != NULL);
  503. return [[RACSignal defer:^{
  504. return [self aggregateWithStart:startFactory() reduce:reduceBlock];
  505. }] setNameWithFormat:@"[%@] -aggregateWithStartFactory:reduce:", self.name];
  506. }
  507. - (RACSignal *)aggregateWithStart:(id)start reduce:(id (^)(id running, id next))reduceBlock {
  508. return [[self
  509. aggregateWithStart:start
  510. reduceWithIndex:^(id running, id next, NSUInteger index) {
  511. return reduceBlock(running, next);
  512. }]
  513. setNameWithFormat:@"[%@] -aggregateWithStart: %@ reduce:", self.name, RACDescription(start)];
  514. }
  515. - (RACSignal *)aggregateWithStart:(id)start reduceWithIndex:(id (^)(id, id, NSUInteger))reduceBlock {
  516. return [[[[self
  517. scanWithStart:start reduceWithIndex:reduceBlock]
  518. startWith:start]
  519. takeLast:1]
  520. setNameWithFormat:@"[%@] -aggregateWithStart: %@ reduceWithIndex:", self.name, RACDescription(start)];
  521. }
  522. - (RACDisposable *)setKeyPath:(NSString *)keyPath onObject:(NSObject *)object {
  523. return [self setKeyPath:keyPath onObject:object nilValue:nil];
  524. }
  525. - (RACDisposable *)setKeyPath:(NSString *)keyPath onObject:(NSObject *)object nilValue:(id)nilValue {
  526. NSCParameterAssert(keyPath != nil);
  527. NSCParameterAssert(object != nil);
  528. keyPath = [keyPath copy];
  529. RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
  530. // Purposely not retaining 'object', since we want to tear down the binding
  531. // when it deallocates normally.
  532. __block void * volatile objectPtr = (__bridge void *)object;
  533. RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
  534. // Possibly spec, possibly compiler bug, but this __bridge cast does not
  535. // result in a retain here, effectively an invisible __unsafe_unretained
  536. // qualifier. Using objc_precise_lifetime gives the __strong reference
  537. // desired. The explicit use of __strong is strictly defensive.
  538. __strong NSObject *object __attribute__((objc_precise_lifetime)) = (__bridge __strong id)objectPtr;
  539. [object setValue:x ?: nilValue forKeyPath:keyPath];
  540. } error:^(NSError *error) {
  541. __strong NSObject *object __attribute__((objc_precise_lifetime)) = (__bridge __strong id)objectPtr;
  542. NSCAssert(NO, @"Received error from %@ in binding for key path \"%@\" on %@: %@", self, keyPath, object, error);
  543. // Log the error if we're running with assertions disabled.
  544. NSLog(@"Received error from %@ in binding for key path \"%@\" on %@: %@", self, keyPath, object, error);
  545. [disposable dispose];
  546. } completed:^{
  547. [disposable dispose];
  548. }];
  549. [disposable addDisposable:subscriptionDisposable];
  550. #if DEBUG
  551. static void *bindingsKey = &bindingsKey;
  552. NSMutableDictionary *bindings;
  553. @synchronized (object) {
  554. bindings = objc_getAssociatedObject(object, bindingsKey);
  555. if (bindings == nil) {
  556. bindings = [NSMutableDictionary dictionary];
  557. objc_setAssociatedObject(object, bindingsKey, bindings, OBJC_ASSOCIATION_RETAIN_NONATOMIC);
  558. }
  559. }
  560. @synchronized (bindings) {
  561. NSCAssert(bindings[keyPath] == nil, @"Signal %@ is already bound to key path \"%@\" on object %@, adding signal %@ is undefined behavior", [bindings[keyPath] nonretainedObjectValue], keyPath, object, self);
  562. bindings[keyPath] = [NSValue valueWithNonretainedObject:self];
  563. }
  564. #endif
  565. RACDisposable *clearPointerDisposable = [RACDisposable disposableWithBlock:^{
  566. #if DEBUG
  567. @synchronized (bindings) {
  568. [bindings removeObjectForKey:keyPath];
  569. }
  570. #endif
  571. while (YES) {
  572. void *ptr = objectPtr;
  573. if (OSAtomicCompareAndSwapPtrBarrier(ptr, NULL, &objectPtr)) {
  574. break;
  575. }
  576. }
  577. }];
  578. [disposable addDisposable:clearPointerDisposable];
  579. [object.rac_deallocDisposable addDisposable:disposable];
  580. RACCompoundDisposable *objectDisposable = object.rac_deallocDisposable;
  581. return [RACDisposable disposableWithBlock:^{
  582. [objectDisposable removeDisposable:disposable];
  583. [disposable dispose];
  584. }];
  585. }
  586. + (RACSignal *)interval:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler {
  587. return [[RACSignal interval:interval onScheduler:scheduler withLeeway:0.0] setNameWithFormat:@"+interval: %f onScheduler: %@", (double)interval, scheduler];
  588. }
  589. + (RACSignal *)interval:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler withLeeway:(NSTimeInterval)leeway {
  590. NSCParameterAssert(scheduler != nil);
  591. NSCParameterAssert(scheduler != RACScheduler.immediateScheduler);
  592. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  593. return [scheduler after:[NSDate dateWithTimeIntervalSinceNow:interval] repeatingEvery:interval withLeeway:leeway schedule:^{
  594. [subscriber sendNext:[NSDate date]];
  595. }];
  596. }] setNameWithFormat:@"+interval: %f onScheduler: %@ withLeeway: %f", (double)interval, scheduler, (double)leeway];
  597. }
  598. - (RACSignal *)takeUntil:(RACSignal *)signalTrigger {
  599. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  600. RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
  601. void (^triggerCompletion)(void) = ^{
  602. [disposable dispose];
  603. [subscriber sendCompleted];
  604. };
  605. RACDisposable *triggerDisposable = [signalTrigger subscribeNext:^(id _) {
  606. triggerCompletion();
  607. } completed:^{
  608. triggerCompletion();
  609. }];
  610. [disposable addDisposable:triggerDisposable];
  611. if (!disposable.disposed) {
  612. RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
  613. [subscriber sendNext:x];
  614. } error:^(NSError *error) {
  615. [subscriber sendError:error];
  616. } completed:^{
  617. [disposable dispose];
  618. [subscriber sendCompleted];
  619. }];
  620. [disposable addDisposable:selfDisposable];
  621. }
  622. return disposable;
  623. }] setNameWithFormat:@"[%@] -takeUntil: %@", self.name, signalTrigger];
  624. }
  625. - (RACSignal *)takeUntilReplacement:(RACSignal *)replacement {
  626. return [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  627. RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
  628. RACDisposable *replacementDisposable = [replacement subscribeNext:^(id x) {
  629. [selfDisposable dispose];
  630. [subscriber sendNext:x];
  631. } error:^(NSError *error) {
  632. [selfDisposable dispose];
  633. [subscriber sendError:error];
  634. } completed:^{
  635. [selfDisposable dispose];
  636. [subscriber sendCompleted];
  637. }];
  638. if (!selfDisposable.disposed) {
  639. selfDisposable.disposable = [[self
  640. concat:[RACSignal never]]
  641. subscribe:subscriber];
  642. }
  643. return [RACDisposable disposableWithBlock:^{
  644. [selfDisposable dispose];
  645. [replacementDisposable dispose];
  646. }];
  647. }];
  648. }
  649. - (RACSignal *)switchToLatest {
  650. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  651. RACMulticastConnection *connection = [self publish];
  652. RACDisposable *subscriptionDisposable = [[connection.signal
  653. flattenMap:^(RACSignal *x) {
  654. NSCAssert(x == nil || [x isKindOfClass:RACSignal.class], @"-switchToLatest requires that the source signal (%@) send signals. Instead we got: %@", self, x);
  655. // -concat:[RACSignal never] prevents completion of the receiver from
  656. // prematurely terminating the inner signal.
  657. return [x takeUntil:[connection.signal concat:[RACSignal never]]];
  658. }]
  659. subscribe:subscriber];
  660. RACDisposable *connectionDisposable = [connection connect];
  661. return [RACDisposable disposableWithBlock:^{
  662. [subscriptionDisposable dispose];
  663. [connectionDisposable dispose];
  664. }];
  665. }] setNameWithFormat:@"[%@] -switchToLatest", self.name];
  666. }
  667. + (RACSignal *)switch:(RACSignal *)signal cases:(NSDictionary *)cases default:(RACSignal *)defaultSignal {
  668. NSCParameterAssert(signal != nil);
  669. NSCParameterAssert(cases != nil);
  670. for (id key in cases) {
  671. id value __attribute__((unused)) = cases[key];
  672. NSCAssert([value isKindOfClass:RACSignal.class], @"Expected all cases to be RACSignals, %@ isn't", value);
  673. }
  674. NSDictionary *copy = [cases copy];
  675. return [[[signal
  676. map:^(id key) {
  677. if (key == nil) key = RACTupleNil.tupleNil;
  678. RACSignal *signal = copy[key] ?: defaultSignal;
  679. if (signal == nil) {
  680. NSString *description = [NSString stringWithFormat:NSLocalizedString(@"No matching signal found for value %@", @""), key];
  681. return [RACSignal error:[NSError errorWithDomain:RACSignalErrorDomain code:RACSignalErrorNoMatchingCase userInfo:@{ NSLocalizedDescriptionKey: description }]];
  682. }
  683. return signal;
  684. }]
  685. switchToLatest]
  686. setNameWithFormat:@"+switch: %@ cases: %@ default: %@", signal, cases, defaultSignal];
  687. }
  688. + (RACSignal *)if:(RACSignal *)boolSignal then:(RACSignal *)trueSignal else:(RACSignal *)falseSignal {
  689. NSCParameterAssert(boolSignal != nil);
  690. NSCParameterAssert(trueSignal != nil);
  691. NSCParameterAssert(falseSignal != nil);
  692. return [[[boolSignal
  693. map:^(NSNumber *value) {
  694. NSCAssert([value isKindOfClass:NSNumber.class], @"Expected %@ to send BOOLs, not %@", boolSignal, value);
  695. return (value.boolValue ? trueSignal : falseSignal);
  696. }]
  697. switchToLatest]
  698. setNameWithFormat:@"+if: %@ then: %@ else: %@", boolSignal, trueSignal, falseSignal];
  699. }
  700. - (id)first {
  701. return [self firstOrDefault:nil];
  702. }
  703. - (id)firstOrDefault:(id)defaultValue {
  704. return [self firstOrDefault:defaultValue success:NULL error:NULL];
  705. }
  706. - (id)firstOrDefault:(id)defaultValue success:(BOOL *)success error:(NSError **)error {
  707. NSCondition *condition = [[NSCondition alloc] init];
  708. condition.name = [NSString stringWithFormat:@"[%@] -firstOrDefault: %@ success:error:", self.name, defaultValue];
  709. __block id value = defaultValue;
  710. __block BOOL done = NO;
  711. // Ensures that we don't pass values across thread boundaries by reference.
  712. __block NSError *localError;
  713. __block BOOL localSuccess;
  714. [[self take:1] subscribeNext:^(id x) {
  715. [condition lock];
  716. value = x;
  717. localSuccess = YES;
  718. done = YES;
  719. [condition broadcast];
  720. [condition unlock];
  721. } error:^(NSError *e) {
  722. [condition lock];
  723. if (!done) {
  724. localSuccess = NO;
  725. localError = e;
  726. done = YES;
  727. [condition broadcast];
  728. }
  729. [condition unlock];
  730. } completed:^{
  731. [condition lock];
  732. localSuccess = YES;
  733. done = YES;
  734. [condition broadcast];
  735. [condition unlock];
  736. }];
  737. [condition lock];
  738. while (!done) {
  739. [condition wait];
  740. }
  741. if (success != NULL) *success = localSuccess;
  742. if (error != NULL) *error = localError;
  743. [condition unlock];
  744. return value;
  745. }
  746. - (BOOL)waitUntilCompleted:(NSError **)error {
  747. BOOL success = NO;
  748. [[[self
  749. ignoreValues]
  750. setNameWithFormat:@"[%@] -waitUntilCompleted:", self.name]
  751. firstOrDefault:nil success:&success error:error];
  752. return success;
  753. }
  754. + (RACSignal *)defer:(RACSignal<id> * (^)(void))block {
  755. NSCParameterAssert(block != NULL);
  756. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  757. return [block() subscribe:subscriber];
  758. }] setNameWithFormat:@"+defer:"];
  759. }
  760. - (NSArray *)toArray {
  761. return [[[self collect] first] copy];
  762. }
  763. - (RACSequence *)sequence {
  764. return [[RACSignalSequence sequenceWithSignal:self] setNameWithFormat:@"[%@] -sequence", self.name];
  765. }
  766. - (RACMulticastConnection *)publish {
  767. RACSubject *subject = [[RACSubject subject] setNameWithFormat:@"[%@] -publish", self.name];
  768. RACMulticastConnection *connection = [self multicast:subject];
  769. return connection;
  770. }
  771. - (RACMulticastConnection *)multicast:(RACSubject *)subject {
  772. [subject setNameWithFormat:@"[%@] -multicast: %@", self.name, subject.name];
  773. RACMulticastConnection *connection = [[RACMulticastConnection alloc] initWithSourceSignal:self subject:subject];
  774. return connection;
  775. }
  776. - (RACSignal *)replay {
  777. RACReplaySubject *subject = [[RACReplaySubject subject] setNameWithFormat:@"[%@] -replay", self.name];
  778. RACMulticastConnection *connection = [self multicast:subject];
  779. [connection connect];
  780. return connection.signal;
  781. }
  782. - (RACSignal *)replayLast {
  783. RACReplaySubject *subject = [[RACReplaySubject replaySubjectWithCapacity:1] setNameWithFormat:@"[%@] -replayLast", self.name];
  784. RACMulticastConnection *connection = [self multicast:subject];
  785. [connection connect];
  786. return connection.signal;
  787. }
  788. - (RACSignal *)replayLazily {
  789. RACMulticastConnection *connection = [self multicast:[RACReplaySubject subject]];
  790. return [[RACSignal
  791. defer:^{
  792. [connection connect];
  793. return connection.signal;
  794. }]
  795. setNameWithFormat:@"[%@] -replayLazily", self.name];
  796. }
  797. - (RACSignal *)timeout:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler {
  798. NSCParameterAssert(scheduler != nil);
  799. NSCParameterAssert(scheduler != RACScheduler.immediateScheduler);
  800. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  801. RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
  802. RACDisposable *timeoutDisposable = [scheduler afterDelay:interval schedule:^{
  803. [disposable dispose];
  804. [subscriber sendError:[NSError errorWithDomain:RACSignalErrorDomain code:RACSignalErrorTimedOut userInfo:nil]];
  805. }];
  806. [disposable addDisposable:timeoutDisposable];
  807. RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
  808. [subscriber sendNext:x];
  809. } error:^(NSError *error) {
  810. [disposable dispose];
  811. [subscriber sendError:error];
  812. } completed:^{
  813. [disposable dispose];
  814. [subscriber sendCompleted];
  815. }];
  816. [disposable addDisposable:subscriptionDisposable];
  817. return disposable;
  818. }] setNameWithFormat:@"[%@] -timeout: %f onScheduler: %@", self.name, (double)interval, scheduler];
  819. }
  820. - (RACSignal *)deliverOn:(RACScheduler *)scheduler {
  821. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  822. return [self subscribeNext:^(id x) {
  823. [scheduler schedule:^{
  824. [subscriber sendNext:x];
  825. }];
  826. } error:^(NSError *error) {
  827. [scheduler schedule:^{
  828. [subscriber sendError:error];
  829. }];
  830. } completed:^{
  831. [scheduler schedule:^{
  832. [subscriber sendCompleted];
  833. }];
  834. }];
  835. }] setNameWithFormat:@"[%@] -deliverOn: %@", self.name, scheduler];
  836. }
  837. - (RACSignal *)subscribeOn:(RACScheduler *)scheduler {
  838. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  839. RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
  840. RACDisposable *schedulingDisposable = [scheduler schedule:^{
  841. RACDisposable *subscriptionDisposable = [self subscribe:subscriber];
  842. [disposable addDisposable:subscriptionDisposable];
  843. }];
  844. [disposable addDisposable:schedulingDisposable];
  845. return disposable;
  846. }] setNameWithFormat:@"[%@] -subscribeOn: %@", self.name, scheduler];
  847. }
  848. - (RACSignal *)deliverOnMainThread {
  849. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  850. __block volatile int32_t queueLength = 0;
  851. void (^performOnMainThread)(dispatch_block_t) = ^(dispatch_block_t block) {
  852. int32_t queued = OSAtomicIncrement32(&queueLength);
  853. if (NSThread.isMainThread && queued == 1) {
  854. block();
  855. OSAtomicDecrement32(&queueLength);
  856. } else {
  857. dispatch_async(dispatch_get_main_queue(), ^{
  858. block();
  859. OSAtomicDecrement32(&queueLength);
  860. });
  861. }
  862. };
  863. return [self subscribeNext:^(id x) {
  864. performOnMainThread(^{
  865. [subscriber sendNext:x];
  866. });
  867. } error:^(NSError *error) {
  868. performOnMainThread(^{
  869. [subscriber sendError:error];
  870. });
  871. } completed:^{
  872. performOnMainThread(^{
  873. [subscriber sendCompleted];
  874. });
  875. }];
  876. }] setNameWithFormat:@"[%@] -deliverOnMainThread", self.name];
  877. }
  878. - (RACSignal *)groupBy:(id<NSCopying> (^)(id object))keyBlock transform:(id (^)(id object))transformBlock {
  879. NSCParameterAssert(keyBlock != NULL);
  880. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  881. NSMutableDictionary *groups = [NSMutableDictionary dictionary];
  882. NSMutableArray *orderedGroups = [NSMutableArray array];
  883. return [self subscribeNext:^(id x) {
  884. id<NSCopying> key = keyBlock(x);
  885. RACGroupedSignal *groupSubject = nil;
  886. @synchronized(groups) {
  887. groupSubject = groups[key];
  888. if (groupSubject == nil) {
  889. groupSubject = [RACGroupedSignal signalWithKey:key];
  890. groups[key] = groupSubject;
  891. [orderedGroups addObject:groupSubject];
  892. [subscriber sendNext:groupSubject];
  893. }
  894. }
  895. [groupSubject sendNext:transformBlock != NULL ? transformBlock(x) : x];
  896. } error:^(NSError *error) {
  897. [subscriber sendError:error];
  898. [orderedGroups makeObjectsPerformSelector:@selector(sendError:) withObject:error];
  899. } completed:^{
  900. [subscriber sendCompleted];
  901. [orderedGroups makeObjectsPerformSelector:@selector(sendCompleted)];
  902. }];
  903. }] setNameWithFormat:@"[%@] -groupBy:transform:", self.name];
  904. }
  905. - (RACSignal *)groupBy:(id<NSCopying> (^)(id object))keyBlock {
  906. return [[self groupBy:keyBlock transform:nil] setNameWithFormat:@"[%@] -groupBy:", self.name];
  907. }
  908. - (RACSignal *)any {
  909. return [[self any:^(id x) {
  910. return YES;
  911. }] setNameWithFormat:@"[%@] -any", self.name];
  912. }
  913. - (RACSignal *)any:(BOOL (^)(id object))predicateBlock {
  914. NSCParameterAssert(predicateBlock != NULL);
  915. return [[[self materialize] bind:^{
  916. return ^(RACEvent *event, BOOL *stop) {
  917. if (event.finished) {
  918. *stop = YES;
  919. return [RACSignal return:@NO];
  920. }
  921. if (predicateBlock(event.value)) {
  922. *stop = YES;
  923. return [RACSignal return:@YES];
  924. }
  925. return [RACSignal empty];
  926. };
  927. }] setNameWithFormat:@"[%@] -any:", self.name];
  928. }
  929. - (RACSignal *)all:(BOOL (^)(id object))predicateBlock {
  930. NSCParameterAssert(predicateBlock != NULL);
  931. return [[[self materialize] bind:^{
  932. return ^(RACEvent *event, BOOL *stop) {
  933. if (event.eventType == RACEventTypeCompleted) {
  934. *stop = YES;
  935. return [RACSignal return:@YES];
  936. }
  937. if (event.eventType == RACEventTypeError || !predicateBlock(event.value)) {
  938. *stop = YES;
  939. return [RACSignal return:@NO];
  940. }
  941. return [RACSignal empty];
  942. };
  943. }] setNameWithFormat:@"[%@] -all:", self.name];
  944. }
  945. - (RACSignal *)retry:(NSInteger)retryCount {
  946. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  947. __block NSInteger currentRetryCount = 0;
  948. return subscribeForever(self,
  949. ^(id x) {
  950. [subscriber sendNext:x];
  951. },
  952. ^(NSError *error, RACDisposable *disposable) {
  953. if (retryCount == 0 || currentRetryCount < retryCount) {
  954. // Resubscribe.
  955. currentRetryCount++;
  956. return;
  957. }
  958. [disposable dispose];
  959. [subscriber sendError:error];
  960. },
  961. ^(RACDisposable *disposable) {
  962. [disposable dispose];
  963. [subscriber sendCompleted];
  964. });
  965. }] setNameWithFormat:@"[%@] -retry: %lu", self.name, (unsigned long)retryCount];
  966. }
  967. - (RACSignal *)retry {
  968. return [[self retry:0] setNameWithFormat:@"[%@] -retry", self.name];
  969. }
  970. - (RACSignal *)sample:(RACSignal *)sampler {
  971. NSCParameterAssert(sampler != nil);
  972. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  973. NSLock *lock = [[NSLock alloc] init];
  974. __block id lastValue;
  975. __block BOOL hasValue = NO;
  976. RACSerialDisposable *samplerDisposable = [[RACSerialDisposable alloc] init];
  977. RACDisposable *sourceDisposable = [self subscribeNext:^(id x) {
  978. [lock lock];
  979. hasValue = YES;
  980. lastValue = x;
  981. [lock unlock];
  982. } error:^(NSError *error) {
  983. [samplerDisposable dispose];
  984. [subscriber sendError:error];
  985. } completed:^{
  986. [samplerDisposable dispose];
  987. [subscriber sendCompleted];
  988. }];
  989. samplerDisposable.disposable = [sampler subscribeNext:^(id _) {
  990. BOOL shouldSend = NO;
  991. id value;
  992. [lock lock];
  993. shouldSend = hasValue;
  994. value = lastValue;
  995. [lock unlock];
  996. if (shouldSend) {
  997. [subscriber sendNext:value];
  998. }
  999. } error:^(NSError *error) {
  1000. [sourceDisposable dispose];
  1001. [subscriber sendError:error];
  1002. } completed:^{
  1003. [sourceDisposable dispose];
  1004. [subscriber sendCompleted];
  1005. }];
  1006. return [RACDisposable disposableWithBlock:^{
  1007. [samplerDisposable dispose];
  1008. [sourceDisposable dispose];
  1009. }];
  1010. }] setNameWithFormat:@"[%@] -sample: %@", self.name, sampler];
  1011. }
  1012. - (RACSignal *)ignoreValues {
  1013. return [[self filter:^(id _) {
  1014. return NO;
  1015. }] setNameWithFormat:@"[%@] -ignoreValues", self.name];
  1016. }
  1017. - (RACSignal *)materialize {
  1018. return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  1019. return [self subscribeNext:^(id x) {
  1020. [subscriber sendNext:[RACEvent eventWithValue:x]];
  1021. } error:^(NSError *error) {
  1022. [subscriber sendNext:[RACEvent eventWithError:error]];
  1023. [subscriber sendCompleted];
  1024. } completed:^{
  1025. [subscriber sendNext:RACEvent.completedEvent];
  1026. [subscriber sendCompleted];
  1027. }];
  1028. }] setNameWithFormat:@"[%@] -materialize", self.name];
  1029. }
  1030. - (RACSignal *)dematerialize {
  1031. return [[self bind:^{
  1032. return ^(RACEvent *event, BOOL *stop) {
  1033. switch (event.eventType) {
  1034. case RACEventTypeCompleted:
  1035. *stop = YES;
  1036. return [RACSignal empty];
  1037. case RACEventTypeError:
  1038. *stop = YES;
  1039. return [RACSignal error:event.error];
  1040. case RACEventTypeNext:
  1041. return [RACSignal return:event.value];
  1042. }
  1043. };
  1044. }] setNameWithFormat:@"[%@] -dematerialize", self.name];
  1045. }
  1046. - (RACSignal *)not {
  1047. return [[self map:^(NSNumber *value) {
  1048. NSCAssert([value isKindOfClass:NSNumber.class], @"-not must only be used on a signal of NSNumbers. Instead, got: %@", value);
  1049. return @(!value.boolValue);
  1050. }] setNameWithFormat:@"[%@] -not", self.name];
  1051. }
  1052. - (RACSignal *)and {
  1053. return [[self map:^(RACTuple *tuple) {
  1054. NSCAssert([tuple isKindOfClass:RACTuple.class], @"-and must only be used on a signal of RACTuples of NSNumbers. Instead, received: %@", tuple);
  1055. NSCAssert(tuple.count > 0, @"-and must only be used on a signal of RACTuples of NSNumbers, with at least 1 value in the tuple");
  1056. return @([tuple.rac_sequence all:^(NSNumber *number) {
  1057. NSCAssert([number isKindOfClass:NSNumber.class], @"-and must only be used on a signal of RACTuples of NSNumbers. Instead, tuple contains a non-NSNumber value: %@", tuple);
  1058. return number.boolValue;
  1059. }]);
  1060. }] setNameWithFormat:@"[%@] -and", self.name];
  1061. }
  1062. - (RACSignal *)or {
  1063. return [[self map:^(RACTuple *tuple) {
  1064. NSCAssert([tuple isKindOfClass:RACTuple.class], @"-or must only be used on a signal of RACTuples of NSNumbers. Instead, received: %@", tuple);
  1065. NSCAssert(tuple.count > 0, @"-or must only be used on a signal of RACTuples of NSNumbers, with at least 1 value in the tuple");
  1066. return @([tuple.rac_sequence any:^(NSNumber *number) {
  1067. NSCAssert([number isKindOfClass:NSNumber.class], @"-or must only be used on a signal of RACTuples of NSNumbers. Instead, tuple contains a non-NSNumber value: %@", tuple);
  1068. return number.boolValue;
  1069. }]);
  1070. }] setNameWithFormat:@"[%@] -or", self.name];
  1071. }
  1072. - (RACSignal *)reduceApply {
  1073. return [[self map:^(RACTuple *tuple) {
  1074. NSCAssert([tuple isKindOfClass:RACTuple.class], @"-reduceApply must only be used on a signal of RACTuples. Instead, received: %@", tuple);
  1075. NSCAssert(tuple.count > 1, @"-reduceApply must only be used on a signal of RACTuples, with at least a block in tuple[0] and its first argument in tuple[1]");
  1076. // We can't use -array, because we need to preserve RACTupleNil
  1077. NSMutableArray *tupleArray = [NSMutableArray arrayWithCapacity:tuple.count];
  1078. for (id val in tuple) {
  1079. [tupleArray addObject:val];
  1080. }
  1081. RACTuple *arguments = [RACTuple tupleWithObjectsFromArray:[tupleArray subarrayWithRange:NSMakeRange(1, tupleArray.count - 1)]];
  1082. return [RACBlockTrampoline invokeBlock:tuple[0] withArguments:arguments];
  1083. }] setNameWithFormat:@"[%@] -reduceApply", self.name];
  1084. }
  1085. @end