|
- //
- // RACSignal+Operations.m
- // ReactiveObjC
- //
- // Created by Justin Spahr-Summers on 2012-09-06.
- // Copyright (c) 2012 GitHub, Inc. All rights reserved.
- //
- #import "RACSignal+Operations.h"
- #import "NSObject+RACDeallocating.h"
- #import "NSObject+RACDescription.h"
- #import "RACBlockTrampoline.h"
- #import "RACCommand.h"
- #import "RACCompoundDisposable.h"
- #import "RACDisposable.h"
- #import "RACEvent.h"
- #import "RACGroupedSignal.h"
- #import "RACMulticastConnection+Private.h"
- #import "RACReplaySubject.h"
- #import "RACScheduler.h"
- #import "RACSerialDisposable.h"
- #import "RACSignalSequence.h"
- #import "RACStream+Private.h"
- #import "RACSubject.h"
- #import "RACSubscriber+Private.h"
- #import "RACSubscriber.h"
- #import "RACTuple.h"
- #import "RACUnit.h"
- #import <libkern/OSAtomic.h>
- #import <objc/runtime.h>
- NSErrorDomain const RACSignalErrorDomain = @"RACSignalErrorDomain";
- // Subscribes to the given signal with the given blocks.
- //
- // If the signal errors or completes, the corresponding block is invoked. If the
- // disposable passed to the block is _not_ disposed, then the signal is
- // subscribed to again.
- static RACDisposable *subscribeForever (RACSignal *signal, void (^next)(id), void (^error)(NSError *, RACDisposable *), void (^completed)(RACDisposable *)) {
- next = [next copy];
- error = [error copy];
- completed = [completed copy];
- RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
- RACSchedulerRecursiveBlock recursiveBlock = ^(void (^recurse)(void)) {
- RACCompoundDisposable *selfDisposable = [RACCompoundDisposable compoundDisposable];
- [compoundDisposable addDisposable:selfDisposable];
- __weak RACDisposable *weakSelfDisposable = selfDisposable;
- RACDisposable *subscriptionDisposable = [signal subscribeNext:next error:^(NSError *e) {
- @autoreleasepool {
- error(e, compoundDisposable);
- [compoundDisposable removeDisposable:weakSelfDisposable];
- }
- recurse();
- } completed:^{
- @autoreleasepool {
- completed(compoundDisposable);
- [compoundDisposable removeDisposable:weakSelfDisposable];
- }
- recurse();
- }];
- [selfDisposable addDisposable:subscriptionDisposable];
- };
- // Subscribe once immediately, and then use recursive scheduling for any
- // further resubscriptions.
- recursiveBlock(^{
- RACScheduler *recursiveScheduler = RACScheduler.currentScheduler ?: [RACScheduler scheduler];
- RACDisposable *schedulingDisposable = [recursiveScheduler scheduleRecursiveBlock:recursiveBlock];
- [compoundDisposable addDisposable:schedulingDisposable];
- });
- return compoundDisposable;
- }
- @implementation RACSignal (Operations)
- - (RACSignal *)doNext:(void (^)(id x))block {
- NSCParameterAssert(block != NULL);
- return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
- return [self subscribeNext:^(id x) {
- block(x);
- [subscriber sendNext:x];
- } error:^(NSError *error) {
- [subscriber sendError:error];
- } completed:^{
- [subscriber sendCompleted];
- }];
- }] setNameWithFormat:@"[%@] -doNext:", self.name];
- }
- - (RACSignal *)doError:(void (^)(NSError *error))block {
- NSCParameterAssert(block != NULL);
- return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
- return [self subscribeNext:^(id x) {
- [subscriber sendNext:x];
- } error:^(NSError *error) {
- block(error);
- [subscriber sendError:error];
- } completed:^{
- [subscriber sendCompleted];
- }];
- }] setNameWithFormat:@"[%@] -doError:", self.name];
- }
- - (RACSignal *)doCompleted:(void (^)(void))block {
- NSCParameterAssert(block != NULL);
- return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
- return [self subscribeNext:^(id x) {
- [subscriber sendNext:x];
- } error:^(NSError *error) {
- [subscriber sendError:error];
- } completed:^{
- block();
- [subscriber sendCompleted];
- }];
- }] setNameWithFormat:@"[%@] -doCompleted:", self.name];
- }
- - (RACSignal *)throttle:(NSTimeInterval)interval {
- return [[self throttle:interval valuesPassingTest:^(id _) {
- return YES;
- }] setNameWithFormat:@"[%@] -throttle: %f", self.name, (double)interval];
- }
- - (RACSignal *)throttle:(NSTimeInterval)interval valuesPassingTest:(BOOL (^)(id next))predicate {
- NSCParameterAssert(interval >= 0);
- NSCParameterAssert(predicate != nil);
- return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
- RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
- // We may never use this scheduler, but we need to set it up ahead of
- // time so that our scheduled blocks are run serially if we do.
- RACScheduler *scheduler = [RACScheduler scheduler];
- // Information about any currently-buffered `next` event.
- __block id nextValue = nil;
- __block BOOL hasNextValue = NO;
- RACSerialDisposable *nextDisposable = [[RACSerialDisposable alloc] init];
- void (^flushNext)(BOOL send) = ^(BOOL send) {
- @synchronized (compoundDisposable) {
- [nextDisposable.disposable dispose];
- if (!hasNextValue) return;
- if (send) [subscriber sendNext:nextValue];
- nextValue = nil;
- hasNextValue = NO;
- }
- };
- RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
- RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: scheduler;
- BOOL shouldThrottle = predicate(x);
- @synchronized (compoundDisposable) {
- flushNext(NO);
- if (!shouldThrottle) {
- [subscriber sendNext:x];
- return;
- }
- nextValue = x;
- hasNextValue = YES;
- nextDisposable.disposable = [delayScheduler afterDelay:interval schedule:^{
- flushNext(YES);
- }];
- }
- } error:^(NSError *error) {
- [compoundDisposable dispose];
- [subscriber sendError:error];
- } completed:^{
- flushNext(YES);
- [subscriber sendCompleted];
- }];
- [compoundDisposable addDisposable:subscriptionDisposable];
- return compoundDisposable;
- }] setNameWithFormat:@"[%@] -throttle: %f valuesPassingTest:", self.name, (double)interval];
- }
- - (RACSignal *)delay:(NSTimeInterval)interval {
- return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
- RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
- // We may never use this scheduler, but we need to set it up ahead of
- // time so that our scheduled blocks are run serially if we do.
- RACScheduler *scheduler = [RACScheduler scheduler];
- void (^schedule)(dispatch_block_t) = ^(dispatch_block_t block) {
- RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: scheduler;
- RACDisposable *schedulerDisposable = [delayScheduler afterDelay:interval schedule:block];
- [disposable addDisposable:schedulerDisposable];
- };
- RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
- schedule(^{
- [subscriber sendNext:x];
- });
- } error:^(NSError *error) {
- [subscriber sendError:error];
- } completed:^{
- schedule(^{
- [subscriber sendCompleted];
- });
- }];
- [disposable addDisposable:subscriptionDisposable];
- return disposable;
- }] setNameWithFormat:@"[%@] -delay: %f", self.name, (double)interval];
- }
- - (RACSignal *)repeat {
- return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
- return subscribeForever(self,
- ^(id x) {
- [subscriber sendNext:x];
- },
- ^(NSError *error, RACDisposable *disposable) {
- [disposable dispose];
- [subscriber sendError:error];
- },
- ^(RACDisposable *disposable) {
- // Resubscribe.
- });
- }] setNameWithFormat:@"[%@] -repeat", self.name];
- }
- - (RACSignal *)catch:(RACSignal * (^)(NSError *error))catchBlock {
- NSCParameterAssert(catchBlock != NULL);
- return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
- RACSerialDisposable *catchDisposable = [[RACSerialDisposable alloc] init];
- RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
- [subscriber sendNext:x];
- } error:^(NSError *error) {
- RACSignal *signal = catchBlock(error);
- NSCAssert(signal != nil, @"Expected non-nil signal from catch block on %@", self);
- catchDisposable.disposable = [signal subscribe:subscriber];
- } completed:^{
- [subscriber sendCompleted];
- }];
- return [RACDisposable disposableWithBlock:^{
- [catchDisposable dispose];
- [subscriptionDisposable dispose];
- }];
- }] setNameWithFormat:@"[%@] -catch:", self.name];
- }
- - (RACSignal *)catchTo:(RACSignal *)signal {
- return [[self catch:^(NSError *error) {
- return signal;
- }] setNameWithFormat:@"[%@] -catchTo: %@", self.name, signal];
- }
- + (RACSignal *)try:(id (^)(NSError **errorPtr))tryBlock {
- NSCParameterAssert(tryBlock != NULL);
- return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
- NSError *error;
- id value = tryBlock(&error);
- RACSignal *signal = (value == nil ? [RACSignal error:error] : [RACSignal return:value]);
- return [signal subscribe:subscriber];
- }] setNameWithFormat:@"+try:"];
- }
- - (RACSignal *)try:(BOOL (^)(id value, NSError **errorPtr))tryBlock {
- NSCParameterAssert(tryBlock != NULL);
- return [[self flattenMap:^(id value) {
- NSError *error = nil;
- BOOL passed = tryBlock(value, &error);
- return (passed ? [RACSignal return:value] : [RACSignal error:error]);
- }] setNameWithFormat:@"[%@] -try:", self.name];
- }
- - (RACSignal *)tryMap:(id (^)(id value, NSError **errorPtr))mapBlock {
- NSCParameterAssert(mapBlock != NULL);
- return [[self flattenMap:^(id value) {
- NSError *error = nil;
- id mappedValue = mapBlock(value, &error);
- return (mappedValue == nil ? [RACSignal error:error] : [RACSignal return:mappedValue]);
- }] setNameWithFormat:@"[%@] -tryMap:", self.name];
- }
- - (RACSignal *)initially:(void (^)(void))block {
- NSCParameterAssert(block != NULL);
- return [[RACSignal defer:^{
- block();
- return self;
- }] setNameWithFormat:@"[%@] -initially:", self.name];
- }
- - (RACSignal *)finally:(void (^)(void))block {
- NSCParameterAssert(block != NULL);
- return [[[self
- doError:^(NSError *error) {
- block();
- }]
- doCompleted:^{
- block();
- }]
- setNameWithFormat:@"[%@] -finally:", self.name];
- }
- - (RACSignal *)bufferWithTime:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler {
- NSCParameterAssert(scheduler != nil);
- NSCParameterAssert(scheduler != RACScheduler.immediateScheduler);
- return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
- RACSerialDisposable *timerDisposable = [[RACSerialDisposable alloc] init];
- NSMutableArray *values = [NSMutableArray array];
- void (^flushValues)(void) = ^{
- @synchronized (values) {
- [timerDisposable.disposable dispose];
- if (values.count == 0) return;
- RACTuple *tuple = [RACTuple tupleWithObjectsFromArray:values];
- [values removeAllObjects];
- [subscriber sendNext:tuple];
- }
- };
- RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
- @synchronized (values) {
- if (values.count == 0) {
- timerDisposable.disposable = [scheduler afterDelay:interval schedule:flushValues];
- }
- [values addObject:x ?: RACTupleNil.tupleNil];
- }
- } error:^(NSError *error) {
- [subscriber sendError:error];
- } completed:^{
- flushValues();
- [subscriber sendCompleted];
- }];
- return [RACDisposable disposableWithBlock:^{
- [selfDisposable dispose];
- [timerDisposable dispose];
- }];
- }] setNameWithFormat:@"[%@] -bufferWithTime: %f onScheduler: %@", self.name, (double)interval, scheduler];
- }
- - (RACSignal *)collect {
- return [[self aggregateWithStartFactory:^{
- return [[NSMutableArray alloc] init];
- } reduce:^(NSMutableArray *collectedValues, id x) {
- [collectedValues addObject:(x ?: NSNull.null)];
- return collectedValues;
- }] setNameWithFormat:@"[%@] -collect", self.name];
- }
- - (RACSignal *)takeLast:(NSUInteger)count {
- return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
- NSMutableArray *valuesTaken = [NSMutableArray arrayWithCapacity:count];
- return [self subscribeNext:^(id x) {
- [valuesTaken addObject:x ? : RACTupleNil.tupleNil];
- while (valuesTaken.count > count) {
- [valuesTaken removeObjectAtIndex:0];
- }
- } error:^(NSError *error) {
- [subscriber sendError:error];
- } completed:^{
- for (id value in valuesTaken) {
- [subscriber sendNext:value == RACTupleNil.tupleNil ? nil : value];
- }
- [subscriber sendCompleted];
- }];
- }] setNameWithFormat:@"[%@] -takeLast: %lu", self.name, (unsigned long)count];
- }
- - (RACSignal *)combineLatestWith:(RACSignal *)signal {
- NSCParameterAssert(signal != nil);
- return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
- RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
- __block id lastSelfValue = nil;
- __block BOOL selfCompleted = NO;
- __block id lastOtherValue = nil;
- __block BOOL otherCompleted = NO;
- void (^sendNext)(void) = ^{
- @synchronized (disposable) {
- if (lastSelfValue == nil || lastOtherValue == nil) return;
- [subscriber sendNext:RACTuplePack(lastSelfValue, lastOtherValue)];
- }
- };
- RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
- @synchronized (disposable) {
- lastSelfValue = x ?: RACTupleNil.tupleNil;
- sendNext();
- }
- } error:^(NSError *error) {
- [subscriber sendError:error];
- } completed:^{
- @synchronized (disposable) {
- selfCompleted = YES;
- if (otherCompleted) [subscriber sendCompleted];
- }
- }];
- [disposable addDisposable:selfDisposable];
- RACDisposable *otherDisposable = [signal subscribeNext:^(id x) {
- @synchronized (disposable) {
- lastOtherValue = x ?: RACTupleNil.tupleNil;
- sendNext();
- }
- } error:^(NSError *error) {
- [subscriber sendError:error];
- } completed:^{
- @synchronized (disposable) {
- otherCompleted = YES;
- if (selfCompleted) [subscriber sendCompleted];
- }
- }];
- [disposable addDisposable:otherDisposable];
- return disposable;
- }] setNameWithFormat:@"[%@] -combineLatestWith: %@", self.name, signal];
- }
- + (RACSignal *)combineLatest:(id<NSFastEnumeration>)signals {
- return [[self join:signals block:^(RACSignal *left, RACSignal *right) {
- return [left combineLatestWith:right];
- }] setNameWithFormat:@"+combineLatest: %@", signals];
- }
- + (RACSignal *)combineLatest:(id<NSFastEnumeration>)signals reduce:(RACGenericReduceBlock)reduceBlock {
- NSCParameterAssert(reduceBlock != nil);
- RACSignal *result = [self combineLatest:signals];
- // Although we assert this condition above, older versions of this method
- // supported this argument being nil. Avoid crashing Release builds of
- // apps that depended on that.
- if (reduceBlock != nil) result = [result reduceEach:reduceBlock];
- return [result setNameWithFormat:@"+combineLatest: %@ reduce:", signals];
- }
- - (RACSignal *)merge:(RACSignal *)signal {
- return [[RACSignal
- merge:@[ self, signal ]]
- setNameWithFormat:@"[%@] -merge: %@", self.name, signal];
- }
- + (RACSignal *)merge:(id<NSFastEnumeration>)signals {
- NSMutableArray *copiedSignals = [[NSMutableArray alloc] init];
- for (RACSignal *signal in signals) {
- [copiedSignals addObject:signal];
- }
- return [[[RACSignal
- createSignal:^ RACDisposable * (id<RACSubscriber> subscriber) {
- for (RACSignal *signal in copiedSignals) {
- [subscriber sendNext:signal];
- }
- [subscriber sendCompleted];
- return nil;
- }]
- flatten]
- setNameWithFormat:@"+merge: %@", copiedSignals];
- }
- - (RACSignal *)flatten:(NSUInteger)maxConcurrent {
- return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
- RACCompoundDisposable *compoundDisposable = [[RACCompoundDisposable alloc] init];
- // Contains disposables for the currently active subscriptions.
- //
- // This should only be used while synchronized on `subscriber`.
- NSMutableArray *activeDisposables = [[NSMutableArray alloc] initWithCapacity:maxConcurrent];
- // Whether the signal-of-signals has completed yet.
- //
- // This should only be used while synchronized on `subscriber`.
- __block BOOL selfCompleted = NO;
- // Subscribes to the given signal.
- __block void (^subscribeToSignal)(RACSignal *);
- // Weak reference to the above, to avoid a leak.
- __weak __block void (^recur)(RACSignal *);
- // Sends completed to the subscriber if all signals are finished.
- //
- // This should only be used while synchronized on `subscriber`.
- void (^completeIfAllowed)(void) = ^{
- if (selfCompleted && activeDisposables.count == 0) {
- [subscriber sendCompleted];
- }
- };
- // The signals waiting to be started.
- //
- // This array should only be used while synchronized on `subscriber`.
- NSMutableArray *queuedSignals = [NSMutableArray array];
- recur = subscribeToSignal = ^(RACSignal *signal) {
- RACSerialDisposable *serialDisposable = [[RACSerialDisposable alloc] init];
- @synchronized (subscriber) {
- [compoundDisposable addDisposable:serialDisposable];
- [activeDisposables addObject:serialDisposable];
- }
- serialDisposable.disposable = [signal subscribeNext:^(id x) {
- [subscriber sendNext:x];
- } error:^(NSError *error) {
- [subscriber sendError:error];
- } completed:^{
- __strong void (^subscribeToSignal)(RACSignal *) = recur;
- RACSignal *nextSignal;
- @synchronized (subscriber) {
- [compoundDisposable removeDisposable:serialDisposable];
- [activeDisposables removeObjectIdenticalTo:serialDisposable];
- if (queuedSignals.count == 0) {
- completeIfAllowed();
- return;
- }
- nextSignal = queuedSignals[0];
- [queuedSignals removeObjectAtIndex:0];
- }
- subscribeToSignal(nextSignal);
- }];
- };
- [compoundDisposable addDisposable:[self subscribeNext:^(RACSignal *signal) {
- if (signal == nil) return;
- NSCAssert([signal isKindOfClass:RACSignal.class], @"Expected a RACSignal, got %@", signal);
- @synchronized (subscriber) {
- if (maxConcurrent > 0 && activeDisposables.count >= maxConcurrent) {
- [queuedSignals addObject:signal];
- // If we need to wait, skip subscribing to this
- // signal.
- return;
- }
- }
- subscribeToSignal(signal);
- } error:^(NSError *error) {
- [subscriber sendError:error];
- } completed:^{
- @synchronized (subscriber) {
- selfCompleted = YES;
- completeIfAllowed();
- }
- }]];
- [compoundDisposable addDisposable:[RACDisposable disposableWithBlock:^{
- // A strong reference is held to `subscribeToSignal` until we're
- // done, preventing it from deallocating early.
- subscribeToSignal = nil;
- }]];
- return compoundDisposable;
- }] setNameWithFormat:@"[%@] -flatten: %lu", self.name, (unsigned long)maxConcurrent];
- }
- - (RACSignal *)then:(RACSignal * (^)(void))block {
- NSCParameterAssert(block != nil);
- return [[[self
- ignoreValues]
- concat:[RACSignal defer:block]]
- setNameWithFormat:@"[%@] -then:", self.name];
- }
- - (RACSignal *)concat {
- return [[self flatten:1] setNameWithFormat:@"[%@] -concat", self.name];
- }
- - (RACSignal *)aggregateWithStartFactory:(id (^)(void))startFactory reduce:(id (^)(id running, id next))reduceBlock {
- NSCParameterAssert(startFactory != NULL);
- NSCParameterAssert(reduceBlock != NULL);
- return [[RACSignal defer:^{
- return [self aggregateWithStart:startFactory() reduce:reduceBlock];
- }] setNameWithFormat:@"[%@] -aggregateWithStartFactory:reduce:", self.name];
- }
- - (RACSignal *)aggregateWithStart:(id)start reduce:(id (^)(id running, id next))reduceBlock {
- return [[self
- aggregateWithStart:start
- reduceWithIndex:^(id running, id next, NSUInteger index) {
- return reduceBlock(running, next);
- }]
- setNameWithFormat:@"[%@] -aggregateWithStart: %@ reduce:", self.name, RACDescription(start)];
- }
- - (RACSignal *)aggregateWithStart:(id)start reduceWithIndex:(id (^)(id, id, NSUInteger))reduceBlock {
- return [[[[self
- scanWithStart:start reduceWithIndex:reduceBlock]
- startWith:start]
- takeLast:1]
- setNameWithFormat:@"[%@] -aggregateWithStart: %@ reduceWithIndex:", self.name, RACDescription(start)];
- }
- - (RACDisposable *)setKeyPath:(NSString *)keyPath onObject:(NSObject *)object {
- return [self setKeyPath:keyPath onObject:object nilValue:nil];
- }
- - (RACDisposable *)setKeyPath:(NSString *)keyPath onObject:(NSObject *)object nilValue:(id)nilValue {
- NSCParameterAssert(keyPath != nil);
- NSCParameterAssert(object != nil);
- keyPath = [keyPath copy];
- RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
- // Purposely not retaining 'object', since we want to tear down the binding
- // when it deallocates normally.
- __block void * volatile objectPtr = (__bridge void *)object;
- RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
- // Possibly spec, possibly compiler bug, but this __bridge cast does not
- // result in a retain here, effectively an invisible __unsafe_unretained
- // qualifier. Using objc_precise_lifetime gives the __strong reference
- // desired. The explicit use of __strong is strictly defensive.
- __strong NSObject *object __attribute__((objc_precise_lifetime)) = (__bridge __strong id)objectPtr;
- [object setValue:x ?: nilValue forKeyPath:keyPath];
- } error:^(NSError *error) {
- __strong NSObject *object __attribute__((objc_precise_lifetime)) = (__bridge __strong id)objectPtr;
- NSCAssert(NO, @"Received error from %@ in binding for key path \"%@\" on %@: %@", self, keyPath, object, error);
- // Log the error if we're running with assertions disabled.
- NSLog(@"Received error from %@ in binding for key path \"%@\" on %@: %@", self, keyPath, object, error);
- [disposable dispose];
- } completed:^{
- [disposable dispose];
- }];
- [disposable addDisposable:subscriptionDisposable];
- #if DEBUG
- static void *bindingsKey = &bindingsKey;
- NSMutableDictionary *bindings;
- @synchronized (object) {
- bindings = objc_getAssociatedObject(object, bindingsKey);
- if (bindings == nil) {
- bindings = [NSMutableDictionary dictionary];
- objc_setAssociatedObject(object, bindingsKey, bindings, OBJC_ASSOCIATION_RETAIN_NONATOMIC);
- }
- }
- @synchronized (bindings) {
- NSCAssert(bindings[keyPath] == nil, @"Signal %@ is already bound to key path \"%@\" on object %@, adding signal %@ is undefined behavior", [bindings[keyPath] nonretainedObjectValue], keyPath, object, self);
- bindings[keyPath] = [NSValue valueWithNonretainedObject:self];
- }
- #endif
- RACDisposable *clearPointerDisposable = [RACDisposable disposableWithBlock:^{
- #if DEBUG
- @synchronized (bindings) {
- [bindings removeObjectForKey:keyPath];
- }
- #endif
- while (YES) {
- void *ptr = objectPtr;
- if (OSAtomicCompareAndSwapPtrBarrier(ptr, NULL, &objectPtr)) {
- break;
- }
- }
- }];
- [disposable addDisposable:clearPointerDisposable];
- [object.rac_deallocDisposable addDisposable:disposable];
- RACCompoundDisposable *objectDisposable = object.rac_deallocDisposable;
- return [RACDisposable disposableWithBlock:^{
- [objectDisposable removeDisposable:disposable];
- [disposable dispose];
- }];
- }
- + (RACSignal *)interval:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler {
- return [[RACSignal interval:interval onScheduler:scheduler withLeeway:0.0] setNameWithFormat:@"+interval: %f onScheduler: %@", (double)interval, scheduler];
- }
- + (RACSignal *)interval:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler withLeeway:(NSTimeInterval)leeway {
- NSCParameterAssert(scheduler != nil);
- NSCParameterAssert(scheduler != RACScheduler.immediateScheduler);
- return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
- return [scheduler after:[NSDate dateWithTimeIntervalSinceNow:interval] repeatingEvery:interval withLeeway:leeway schedule:^{
- [subscriber sendNext:[NSDate date]];
- }];
- }] setNameWithFormat:@"+interval: %f onScheduler: %@ withLeeway: %f", (double)interval, scheduler, (double)leeway];
- }
- - (RACSignal *)takeUntil:(RACSignal *)signalTrigger {
- return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
- RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
- void (^triggerCompletion)(void) = ^{
- [disposable dispose];
- [subscriber sendCompleted];
- };
- RACDisposable *triggerDisposable = [signalTrigger subscribeNext:^(id _) {
- triggerCompletion();
- } completed:^{
- triggerCompletion();
- }];
- [disposable addDisposable:triggerDisposable];
- if (!disposable.disposed) {
- RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
- [subscriber sendNext:x];
- } error:^(NSError *error) {
- [subscriber sendError:error];
- } completed:^{
- [disposable dispose];
- [subscriber sendCompleted];
- }];
- [disposable addDisposable:selfDisposable];
- }
- return disposable;
- }] setNameWithFormat:@"[%@] -takeUntil: %@", self.name, signalTrigger];
- }
- - (RACSignal *)takeUntilReplacement:(RACSignal *)replacement {
- return [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
- RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
- RACDisposable *replacementDisposable = [replacement subscribeNext:^(id x) {
- [selfDisposable dispose];
- [subscriber sendNext:x];
- } error:^(NSError *error) {
- [selfDisposable dispose];
- [subscriber sendError:error];
- } completed:^{
- [selfDisposable dispose];
- [subscriber sendCompleted];
- }];
- if (!selfDisposable.disposed) {
- selfDisposable.disposable = [[self
- concat:[RACSignal never]]
- subscribe:subscriber];
- }
- return [RACDisposable disposableWithBlock:^{
- [selfDisposable dispose];
- [replacementDisposable dispose];
- }];
- }];
- }
- - (RACSignal *)switchToLatest {
- return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
- RACMulticastConnection *connection = [self publish];
- RACDisposable *subscriptionDisposable = [[connection.signal
- flattenMap:^(RACSignal *x) {
- NSCAssert(x == nil || [x isKindOfClass:RACSignal.class], @"-switchToLatest requires that the source signal (%@) send signals. Instead we got: %@", self, x);
- // -concat:[RACSignal never] prevents completion of the receiver from
- // prematurely terminating the inner signal.
- return [x takeUntil:[connection.signal concat:[RACSignal never]]];
- }]
- subscribe:subscriber];
- RACDisposable *connectionDisposable = [connection connect];
- return [RACDisposable disposableWithBlock:^{
- [subscriptionDisposable dispose];
- [connectionDisposable dispose];
- }];
- }] setNameWithFormat:@"[%@] -switchToLatest", self.name];
- }
- + (RACSignal *)switch:(RACSignal *)signal cases:(NSDictionary *)cases default:(RACSignal *)defaultSignal {
- NSCParameterAssert(signal != nil);
- NSCParameterAssert(cases != nil);
- for (id key in cases) {
- id value __attribute__((unused)) = cases[key];
- NSCAssert([value isKindOfClass:RACSignal.class], @"Expected all cases to be RACSignals, %@ isn't", value);
- }
- NSDictionary *copy = [cases copy];
- return [[[signal
- map:^(id key) {
- if (key == nil) key = RACTupleNil.tupleNil;
- RACSignal *signal = copy[key] ?: defaultSignal;
- if (signal == nil) {
- NSString *description = [NSString stringWithFormat:NSLocalizedString(@"No matching signal found for value %@", @""), key];
- return [RACSignal error:[NSError errorWithDomain:RACSignalErrorDomain code:RACSignalErrorNoMatchingCase userInfo:@{ NSLocalizedDescriptionKey: description }]];
- }
- return signal;
- }]
- switchToLatest]
- setNameWithFormat:@"+switch: %@ cases: %@ default: %@", signal, cases, defaultSignal];
- }
- + (RACSignal *)if:(RACSignal *)boolSignal then:(RACSignal *)trueSignal else:(RACSignal *)falseSignal {
- NSCParameterAssert(boolSignal != nil);
- NSCParameterAssert(trueSignal != nil);
- NSCParameterAssert(falseSignal != nil);
- return [[[boolSignal
- map:^(NSNumber *value) {
- NSCAssert([value isKindOfClass:NSNumber.class], @"Expected %@ to send BOOLs, not %@", boolSignal, value);
- return (value.boolValue ? trueSignal : falseSignal);
- }]
- switchToLatest]
- setNameWithFormat:@"+if: %@ then: %@ else: %@", boolSignal, trueSignal, falseSignal];
- }
- - (id)first {
- return [self firstOrDefault:nil];
- }
- - (id)firstOrDefault:(id)defaultValue {
- return [self firstOrDefault:defaultValue success:NULL error:NULL];
- }
- - (id)firstOrDefault:(id)defaultValue success:(BOOL *)success error:(NSError **)error {
- NSCondition *condition = [[NSCondition alloc] init];
- condition.name = [NSString stringWithFormat:@"[%@] -firstOrDefault: %@ success:error:", self.name, defaultValue];
- __block id value = defaultValue;
- __block BOOL done = NO;
- // Ensures that we don't pass values across thread boundaries by reference.
- __block NSError *localError;
- __block BOOL localSuccess;
- [[self take:1] subscribeNext:^(id x) {
- [condition lock];
- value = x;
- localSuccess = YES;
- done = YES;
- [condition broadcast];
- [condition unlock];
- } error:^(NSError *e) {
- [condition lock];
- if (!done) {
- localSuccess = NO;
- localError = e;
- done = YES;
- [condition broadcast];
- }
- [condition unlock];
- } completed:^{
- [condition lock];
- localSuccess = YES;
- done = YES;
- [condition broadcast];
- [condition unlock];
- }];
- [condition lock];
- while (!done) {
- [condition wait];
- }
- if (success != NULL) *success = localSuccess;
- if (error != NULL) *error = localError;
- [condition unlock];
- return value;
- }
- - (BOOL)waitUntilCompleted:(NSError **)error {
- BOOL success = NO;
- [[[self
- ignoreValues]
- setNameWithFormat:@"[%@] -waitUntilCompleted:", self.name]
- firstOrDefault:nil success:&success error:error];
- return success;
- }
- + (RACSignal *)defer:(RACSignal<id> * (^)(void))block {
- NSCParameterAssert(block != NULL);
- return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
- return [block() subscribe:subscriber];
- }] setNameWithFormat:@"+defer:"];
- }
- - (NSArray *)toArray {
- return [[[self collect] first] copy];
- }
- - (RACSequence *)sequence {
- return [[RACSignalSequence sequenceWithSignal:self] setNameWithFormat:@"[%@] -sequence", self.name];
- }
- - (RACMulticastConnection *)publish {
- RACSubject *subject = [[RACSubject subject] setNameWithFormat:@"[%@] -publish", self.name];
- RACMulticastConnection *connection = [self multicast:subject];
- return connection;
- }
- - (RACMulticastConnection *)multicast:(RACSubject *)subject {
- [subject setNameWithFormat:@"[%@] -multicast: %@", self.name, subject.name];
- RACMulticastConnection *connection = [[RACMulticastConnection alloc] initWithSourceSignal:self subject:subject];
- return connection;
- }
- - (RACSignal *)replay {
- RACReplaySubject *subject = [[RACReplaySubject subject] setNameWithFormat:@"[%@] -replay", self.name];
- RACMulticastConnection *connection = [self multicast:subject];
- [connection connect];
- return connection.signal;
- }
- - (RACSignal *)replayLast {
- RACReplaySubject *subject = [[RACReplaySubject replaySubjectWithCapacity:1] setNameWithFormat:@"[%@] -replayLast", self.name];
- RACMulticastConnection *connection = [self multicast:subject];
- [connection connect];
- return connection.signal;
- }
- - (RACSignal *)replayLazily {
- RACMulticastConnection *connection = [self multicast:[RACReplaySubject subject]];
- return [[RACSignal
- defer:^{
- [connection connect];
- return connection.signal;
- }]
- setNameWithFormat:@"[%@] -replayLazily", self.name];
- }
- - (RACSignal *)timeout:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler {
- NSCParameterAssert(scheduler != nil);
- NSCParameterAssert(scheduler != RACScheduler.immediateScheduler);
- return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
- RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
- RACDisposable *timeoutDisposable = [scheduler afterDelay:interval schedule:^{
- [disposable dispose];
- [subscriber sendError:[NSError errorWithDomain:RACSignalErrorDomain code:RACSignalErrorTimedOut userInfo:nil]];
- }];
- [disposable addDisposable:timeoutDisposable];
- RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
- [subscriber sendNext:x];
- } error:^(NSError *error) {
- [disposable dispose];
- [subscriber sendError:error];
- } completed:^{
- [disposable dispose];
- [subscriber sendCompleted];
- }];
- [disposable addDisposable:subscriptionDisposable];
- return disposable;
- }] setNameWithFormat:@"[%@] -timeout: %f onScheduler: %@", self.name, (double)interval, scheduler];
- }
- - (RACSignal *)deliverOn:(RACScheduler *)scheduler {
- return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
- return [self subscribeNext:^(id x) {
- [scheduler schedule:^{
- [subscriber sendNext:x];
- }];
- } error:^(NSError *error) {
- [scheduler schedule:^{
- [subscriber sendError:error];
- }];
- } completed:^{
- [scheduler schedule:^{
- [subscriber sendCompleted];
- }];
- }];
- }] setNameWithFormat:@"[%@] -deliverOn: %@", self.name, scheduler];
- }
- - (RACSignal *)subscribeOn:(RACScheduler *)scheduler {
- return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
- RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
- RACDisposable *schedulingDisposable = [scheduler schedule:^{
- RACDisposable *subscriptionDisposable = [self subscribe:subscriber];
- [disposable addDisposable:subscriptionDisposable];
- }];
- [disposable addDisposable:schedulingDisposable];
- return disposable;
- }] setNameWithFormat:@"[%@] -subscribeOn: %@", self.name, scheduler];
- }
- - (RACSignal *)deliverOnMainThread {
- return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
- __block volatile int32_t queueLength = 0;
-
- void (^performOnMainThread)(dispatch_block_t) = ^(dispatch_block_t block) {
- int32_t queued = OSAtomicIncrement32(&queueLength);
- if (NSThread.isMainThread && queued == 1) {
- block();
- OSAtomicDecrement32(&queueLength);
- } else {
- dispatch_async(dispatch_get_main_queue(), ^{
- block();
- OSAtomicDecrement32(&queueLength);
- });
- }
- };
- return [self subscribeNext:^(id x) {
- performOnMainThread(^{
- [subscriber sendNext:x];
- });
- } error:^(NSError *error) {
- performOnMainThread(^{
- [subscriber sendError:error];
- });
- } completed:^{
- performOnMainThread(^{
- [subscriber sendCompleted];
- });
- }];
- }] setNameWithFormat:@"[%@] -deliverOnMainThread", self.name];
- }
- - (RACSignal *)groupBy:(id<NSCopying> (^)(id object))keyBlock transform:(id (^)(id object))transformBlock {
- NSCParameterAssert(keyBlock != NULL);
- return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
- NSMutableDictionary *groups = [NSMutableDictionary dictionary];
- NSMutableArray *orderedGroups = [NSMutableArray array];
- return [self subscribeNext:^(id x) {
- id<NSCopying> key = keyBlock(x);
- RACGroupedSignal *groupSubject = nil;
- @synchronized(groups) {
- groupSubject = groups[key];
- if (groupSubject == nil) {
- groupSubject = [RACGroupedSignal signalWithKey:key];
- groups[key] = groupSubject;
- [orderedGroups addObject:groupSubject];
- [subscriber sendNext:groupSubject];
- }
- }
- [groupSubject sendNext:transformBlock != NULL ? transformBlock(x) : x];
- } error:^(NSError *error) {
- [subscriber sendError:error];
- [orderedGroups makeObjectsPerformSelector:@selector(sendError:) withObject:error];
- } completed:^{
- [subscriber sendCompleted];
- [orderedGroups makeObjectsPerformSelector:@selector(sendCompleted)];
- }];
- }] setNameWithFormat:@"[%@] -groupBy:transform:", self.name];
- }
- - (RACSignal *)groupBy:(id<NSCopying> (^)(id object))keyBlock {
- return [[self groupBy:keyBlock transform:nil] setNameWithFormat:@"[%@] -groupBy:", self.name];
- }
- - (RACSignal *)any {
- return [[self any:^(id x) {
- return YES;
- }] setNameWithFormat:@"[%@] -any", self.name];
- }
- - (RACSignal *)any:(BOOL (^)(id object))predicateBlock {
- NSCParameterAssert(predicateBlock != NULL);
- return [[[self materialize] bind:^{
- return ^(RACEvent *event, BOOL *stop) {
- if (event.finished) {
- *stop = YES;
- return [RACSignal return:@NO];
- }
- if (predicateBlock(event.value)) {
- *stop = YES;
- return [RACSignal return:@YES];
- }
- return [RACSignal empty];
- };
- }] setNameWithFormat:@"[%@] -any:", self.name];
- }
- - (RACSignal *)all:(BOOL (^)(id object))predicateBlock {
- NSCParameterAssert(predicateBlock != NULL);
- return [[[self materialize] bind:^{
- return ^(RACEvent *event, BOOL *stop) {
- if (event.eventType == RACEventTypeCompleted) {
- *stop = YES;
- return [RACSignal return:@YES];
- }
- if (event.eventType == RACEventTypeError || !predicateBlock(event.value)) {
- *stop = YES;
- return [RACSignal return:@NO];
- }
- return [RACSignal empty];
- };
- }] setNameWithFormat:@"[%@] -all:", self.name];
- }
- - (RACSignal *)retry:(NSInteger)retryCount {
- return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
- __block NSInteger currentRetryCount = 0;
- return subscribeForever(self,
- ^(id x) {
- [subscriber sendNext:x];
- },
- ^(NSError *error, RACDisposable *disposable) {
- if (retryCount == 0 || currentRetryCount < retryCount) {
- // Resubscribe.
- currentRetryCount++;
- return;
- }
- [disposable dispose];
- [subscriber sendError:error];
- },
- ^(RACDisposable *disposable) {
- [disposable dispose];
- [subscriber sendCompleted];
- });
- }] setNameWithFormat:@"[%@] -retry: %lu", self.name, (unsigned long)retryCount];
- }
- - (RACSignal *)retry {
- return [[self retry:0] setNameWithFormat:@"[%@] -retry", self.name];
- }
- - (RACSignal *)sample:(RACSignal *)sampler {
- NSCParameterAssert(sampler != nil);
- return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
- NSLock *lock = [[NSLock alloc] init];
- __block id lastValue;
- __block BOOL hasValue = NO;
- RACSerialDisposable *samplerDisposable = [[RACSerialDisposable alloc] init];
- RACDisposable *sourceDisposable = [self subscribeNext:^(id x) {
- [lock lock];
- hasValue = YES;
- lastValue = x;
- [lock unlock];
- } error:^(NSError *error) {
- [samplerDisposable dispose];
- [subscriber sendError:error];
- } completed:^{
- [samplerDisposable dispose];
- [subscriber sendCompleted];
- }];
- samplerDisposable.disposable = [sampler subscribeNext:^(id _) {
- BOOL shouldSend = NO;
- id value;
- [lock lock];
- shouldSend = hasValue;
- value = lastValue;
- [lock unlock];
- if (shouldSend) {
- [subscriber sendNext:value];
- }
- } error:^(NSError *error) {
- [sourceDisposable dispose];
- [subscriber sendError:error];
- } completed:^{
- [sourceDisposable dispose];
- [subscriber sendCompleted];
- }];
- return [RACDisposable disposableWithBlock:^{
- [samplerDisposable dispose];
- [sourceDisposable dispose];
- }];
- }] setNameWithFormat:@"[%@] -sample: %@", self.name, sampler];
- }
- - (RACSignal *)ignoreValues {
- return [[self filter:^(id _) {
- return NO;
- }] setNameWithFormat:@"[%@] -ignoreValues", self.name];
- }
- - (RACSignal *)materialize {
- return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
- return [self subscribeNext:^(id x) {
- [subscriber sendNext:[RACEvent eventWithValue:x]];
- } error:^(NSError *error) {
- [subscriber sendNext:[RACEvent eventWithError:error]];
- [subscriber sendCompleted];
- } completed:^{
- [subscriber sendNext:RACEvent.completedEvent];
- [subscriber sendCompleted];
- }];
- }] setNameWithFormat:@"[%@] -materialize", self.name];
- }
- - (RACSignal *)dematerialize {
- return [[self bind:^{
- return ^(RACEvent *event, BOOL *stop) {
- switch (event.eventType) {
- case RACEventTypeCompleted:
- *stop = YES;
- return [RACSignal empty];
- case RACEventTypeError:
- *stop = YES;
- return [RACSignal error:event.error];
- case RACEventTypeNext:
- return [RACSignal return:event.value];
- }
- };
- }] setNameWithFormat:@"[%@] -dematerialize", self.name];
- }
- - (RACSignal *)not {
- return [[self map:^(NSNumber *value) {
- NSCAssert([value isKindOfClass:NSNumber.class], @"-not must only be used on a signal of NSNumbers. Instead, got: %@", value);
- return @(!value.boolValue);
- }] setNameWithFormat:@"[%@] -not", self.name];
- }
- - (RACSignal *)and {
- return [[self map:^(RACTuple *tuple) {
- NSCAssert([tuple isKindOfClass:RACTuple.class], @"-and must only be used on a signal of RACTuples of NSNumbers. Instead, received: %@", tuple);
- NSCAssert(tuple.count > 0, @"-and must only be used on a signal of RACTuples of NSNumbers, with at least 1 value in the tuple");
- return @([tuple.rac_sequence all:^(NSNumber *number) {
- NSCAssert([number isKindOfClass:NSNumber.class], @"-and must only be used on a signal of RACTuples of NSNumbers. Instead, tuple contains a non-NSNumber value: %@", tuple);
- return number.boolValue;
- }]);
- }] setNameWithFormat:@"[%@] -and", self.name];
- }
- - (RACSignal *)or {
- return [[self map:^(RACTuple *tuple) {
- NSCAssert([tuple isKindOfClass:RACTuple.class], @"-or must only be used on a signal of RACTuples of NSNumbers. Instead, received: %@", tuple);
- NSCAssert(tuple.count > 0, @"-or must only be used on a signal of RACTuples of NSNumbers, with at least 1 value in the tuple");
- return @([tuple.rac_sequence any:^(NSNumber *number) {
- NSCAssert([number isKindOfClass:NSNumber.class], @"-or must only be used on a signal of RACTuples of NSNumbers. Instead, tuple contains a non-NSNumber value: %@", tuple);
- return number.boolValue;
- }]);
- }] setNameWithFormat:@"[%@] -or", self.name];
- }
- - (RACSignal *)reduceApply {
- return [[self map:^(RACTuple *tuple) {
- NSCAssert([tuple isKindOfClass:RACTuple.class], @"-reduceApply must only be used on a signal of RACTuples. Instead, received: %@", tuple);
- NSCAssert(tuple.count > 1, @"-reduceApply must only be used on a signal of RACTuples, with at least a block in tuple[0] and its first argument in tuple[1]");
- // We can't use -array, because we need to preserve RACTupleNil
- NSMutableArray *tupleArray = [NSMutableArray arrayWithCapacity:tuple.count];
- for (id val in tuple) {
- [tupleArray addObject:val];
- }
- RACTuple *arguments = [RACTuple tupleWithObjectsFromArray:[tupleArray subarrayWithRange:NSMakeRange(1, tupleArray.count - 1)]];
- return [RACBlockTrampoline invokeBlock:tuple[0] withArguments:arguments];
- }] setNameWithFormat:@"[%@] -reduceApply", self.name];
- }
- @end
|