123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409 |
- //
- // RACSignal.m
- // ReactiveObjC
- //
- // Created by Josh Abernathy on 3/15/12.
- // Copyright (c) 2012 GitHub, Inc. All rights reserved.
- //
- #import "RACSignal.h"
- #import "RACCompoundDisposable.h"
- #import "RACDisposable.h"
- #import "RACDynamicSignal.h"
- #import "RACEmptySignal.h"
- #import "RACErrorSignal.h"
- #import "RACMulticastConnection.h"
- #import "RACReplaySubject.h"
- #import "RACReturnSignal.h"
- #import "RACScheduler.h"
- #import "RACSerialDisposable.h"
- #import "RACSignal+Operations.h"
- #import "RACSubject.h"
- #import "RACSubscriber+Private.h"
- #import "RACTuple.h"
- #import <libkern/OSAtomic.h>
- @implementation RACSignal
- #pragma mark Lifecycle
- + (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
- return [RACDynamicSignal createSignal:didSubscribe];
- }
- + (RACSignal *)error:(NSError *)error {
- return [RACErrorSignal error:error];
- }
- + (RACSignal *)never {
- return [[self createSignal:^ RACDisposable * (id<RACSubscriber> subscriber) {
- return nil;
- }] setNameWithFormat:@"+never"];
- }
- + (RACSignal *)startEagerlyWithScheduler:(RACScheduler *)scheduler block:(void (^)(id<RACSubscriber> subscriber))block {
- NSCParameterAssert(scheduler != nil);
- NSCParameterAssert(block != NULL);
- RACSignal *signal = [self startLazilyWithScheduler:scheduler block:block];
- // Subscribe to force the lazy signal to call its block.
- [[signal publish] connect];
- return [signal setNameWithFormat:@"+startEagerlyWithScheduler: %@ block:", scheduler];
- }
- + (RACSignal *)startLazilyWithScheduler:(RACScheduler *)scheduler block:(void (^)(id<RACSubscriber> subscriber))block {
- NSCParameterAssert(scheduler != nil);
- NSCParameterAssert(block != NULL);
- RACMulticastConnection *connection = [[RACSignal
- createSignal:^ id (id<RACSubscriber> subscriber) {
- block(subscriber);
- return nil;
- }]
- multicast:[RACReplaySubject subject]];
-
- return [[[RACSignal
- createSignal:^ id (id<RACSubscriber> subscriber) {
- [connection.signal subscribe:subscriber];
- [connection connect];
- return nil;
- }]
- subscribeOn:scheduler]
- setNameWithFormat:@"+startLazilyWithScheduler: %@ block:", scheduler];
- }
- #pragma mark NSObject
- - (NSString *)description {
- return [NSString stringWithFormat:@"<%@: %p> name: %@", self.class, self, self.name];
- }
- @end
- @implementation RACSignal (RACStream)
- + (RACSignal *)empty {
- return [RACEmptySignal empty];
- }
- + (RACSignal *)return:(id)value {
- return [RACReturnSignal return:value];
- }
- - (RACSignal *)bind:(RACSignalBindBlock (^)(void))block {
- NSCParameterAssert(block != NULL);
- /*
- * -bind: should:
- *
- * 1. Subscribe to the original signal of values.
- * 2. Any time the original signal sends a value, transform it using the binding block.
- * 3. If the binding block returns a signal, subscribe to it, and pass all of its values through to the subscriber as they're received.
- * 4. If the binding block asks the bind to terminate, complete the _original_ signal.
- * 5. When _all_ signals complete, send completed to the subscriber.
- *
- * If any signal sends an error at any point, send that to the subscriber.
- */
- return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
- RACSignalBindBlock bindingBlock = block();
- __block volatile int32_t signalCount = 1; // indicates self
- RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
- void (^completeSignal)(RACDisposable *) = ^(RACDisposable *finishedDisposable) {
- if (OSAtomicDecrement32Barrier(&signalCount) == 0) {
- [subscriber sendCompleted];
- [compoundDisposable dispose];
- } else {
- [compoundDisposable removeDisposable:finishedDisposable];
- }
- };
- void (^addSignal)(RACSignal *) = ^(RACSignal *signal) {
- OSAtomicIncrement32Barrier(&signalCount);
- RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
- [compoundDisposable addDisposable:selfDisposable];
- RACDisposable *disposable = [signal subscribeNext:^(id x) {
- [subscriber sendNext:x];
- } error:^(NSError *error) {
- [compoundDisposable dispose];
- [subscriber sendError:error];
- } completed:^{
- @autoreleasepool {
- completeSignal(selfDisposable);
- }
- }];
- selfDisposable.disposable = disposable;
- };
- @autoreleasepool {
- RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
- [compoundDisposable addDisposable:selfDisposable];
- RACDisposable *bindingDisposable = [self subscribeNext:^(id x) {
- // Manually check disposal to handle synchronous errors.
- if (compoundDisposable.disposed) return;
- BOOL stop = NO;
- id signal = bindingBlock(x, &stop);
- @autoreleasepool {
- if (signal != nil) addSignal(signal);
- if (signal == nil || stop) {
- [selfDisposable dispose];
- completeSignal(selfDisposable);
- }
- }
- } error:^(NSError *error) {
- [compoundDisposable dispose];
- [subscriber sendError:error];
- } completed:^{
- @autoreleasepool {
- completeSignal(selfDisposable);
- }
- }];
- selfDisposable.disposable = bindingDisposable;
- }
- return compoundDisposable;
- }] setNameWithFormat:@"[%@] -bind:", self.name];
- }
- - (RACSignal *)concat:(RACSignal *)signal {
- return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
- RACCompoundDisposable *compoundDisposable = [[RACCompoundDisposable alloc] init];
- RACDisposable *sourceDisposable = [self subscribeNext:^(id x) {
- [subscriber sendNext:x];
- } error:^(NSError *error) {
- [subscriber sendError:error];
- } completed:^{
- RACDisposable *concattedDisposable = [signal subscribe:subscriber];
- [compoundDisposable addDisposable:concattedDisposable];
- }];
- [compoundDisposable addDisposable:sourceDisposable];
- return compoundDisposable;
- }] setNameWithFormat:@"[%@] -concat: %@", self.name, signal];
- }
- - (RACSignal *)zipWith:(RACSignal *)signal {
- NSCParameterAssert(signal != nil);
- return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
- __block BOOL selfCompleted = NO;
- NSMutableArray *selfValues = [NSMutableArray array];
- __block BOOL otherCompleted = NO;
- NSMutableArray *otherValues = [NSMutableArray array];
- void (^sendCompletedIfNecessary)(void) = ^{
- @synchronized (selfValues) {
- BOOL selfEmpty = (selfCompleted && selfValues.count == 0);
- BOOL otherEmpty = (otherCompleted && otherValues.count == 0);
- if (selfEmpty || otherEmpty) [subscriber sendCompleted];
- }
- };
- void (^sendNext)(void) = ^{
- @synchronized (selfValues) {
- if (selfValues.count == 0) return;
- if (otherValues.count == 0) return;
- RACTuple *tuple = RACTuplePack(selfValues[0], otherValues[0]);
- [selfValues removeObjectAtIndex:0];
- [otherValues removeObjectAtIndex:0];
- [subscriber sendNext:tuple];
- sendCompletedIfNecessary();
- }
- };
- RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
- @synchronized (selfValues) {
- [selfValues addObject:x ?: RACTupleNil.tupleNil];
- sendNext();
- }
- } error:^(NSError *error) {
- [subscriber sendError:error];
- } completed:^{
- @synchronized (selfValues) {
- selfCompleted = YES;
- sendCompletedIfNecessary();
- }
- }];
- RACDisposable *otherDisposable = [signal subscribeNext:^(id x) {
- @synchronized (selfValues) {
- [otherValues addObject:x ?: RACTupleNil.tupleNil];
- sendNext();
- }
- } error:^(NSError *error) {
- [subscriber sendError:error];
- } completed:^{
- @synchronized (selfValues) {
- otherCompleted = YES;
- sendCompletedIfNecessary();
- }
- }];
- return [RACDisposable disposableWithBlock:^{
- [selfDisposable dispose];
- [otherDisposable dispose];
- }];
- }] setNameWithFormat:@"[%@] -zipWith: %@", self.name, signal];
- }
- @end
- @implementation RACSignal (Subscription)
- - (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
- NSCAssert(NO, @"This method must be overridden by subclasses");
- return nil;
- }
- - (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock {
- NSCParameterAssert(nextBlock != NULL);
-
- RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
- return [self subscribe:o];
- }
- - (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock completed:(void (^)(void))completedBlock {
- NSCParameterAssert(nextBlock != NULL);
- NSCParameterAssert(completedBlock != NULL);
-
- RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:completedBlock];
- return [self subscribe:o];
- }
- - (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock error:(void (^)(NSError *error))errorBlock completed:(void (^)(void))completedBlock {
- NSCParameterAssert(nextBlock != NULL);
- NSCParameterAssert(errorBlock != NULL);
- NSCParameterAssert(completedBlock != NULL);
-
- RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:errorBlock completed:completedBlock];
- return [self subscribe:o];
- }
- - (RACDisposable *)subscribeError:(void (^)(NSError *error))errorBlock {
- NSCParameterAssert(errorBlock != NULL);
-
- RACSubscriber *o = [RACSubscriber subscriberWithNext:NULL error:errorBlock completed:NULL];
- return [self subscribe:o];
- }
- - (RACDisposable *)subscribeCompleted:(void (^)(void))completedBlock {
- NSCParameterAssert(completedBlock != NULL);
-
- RACSubscriber *o = [RACSubscriber subscriberWithNext:NULL error:NULL completed:completedBlock];
- return [self subscribe:o];
- }
- - (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock error:(void (^)(NSError *error))errorBlock {
- NSCParameterAssert(nextBlock != NULL);
- NSCParameterAssert(errorBlock != NULL);
-
- RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:errorBlock completed:NULL];
- return [self subscribe:o];
- }
- - (RACDisposable *)subscribeError:(void (^)(NSError *))errorBlock completed:(void (^)(void))completedBlock {
- NSCParameterAssert(completedBlock != NULL);
- NSCParameterAssert(errorBlock != NULL);
-
- RACSubscriber *o = [RACSubscriber subscriberWithNext:NULL error:errorBlock completed:completedBlock];
- return [self subscribe:o];
- }
- @end
- @implementation RACSignal (Debugging)
- - (RACSignal *)logAll {
- return [[[self logNext] logError] logCompleted];
- }
- - (RACSignal *)logNext {
- return [[self doNext:^(id x) {
- NSLog(@"%@ next: %@", self, x);
- }] setNameWithFormat:@"%@", self.name];
- }
- - (RACSignal *)logError {
- return [[self doError:^(NSError *error) {
- NSLog(@"%@ error: %@", self, error);
- }] setNameWithFormat:@"%@", self.name];
- }
- - (RACSignal *)logCompleted {
- return [[self doCompleted:^{
- NSLog(@"%@ completed", self);
- }] setNameWithFormat:@"%@", self.name];
- }
- @end
- @implementation RACSignal (Testing)
- static const NSTimeInterval RACSignalAsynchronousWaitTimeout = 10;
- - (id)asynchronousFirstOrDefault:(id)defaultValue success:(BOOL *)success error:(NSError **)error {
- return [self asynchronousFirstOrDefault:defaultValue success:success error:error timeout:RACSignalAsynchronousWaitTimeout];
- }
- - (id)asynchronousFirstOrDefault:(id)defaultValue success:(BOOL *)success error:(NSError **)error timeout:(NSTimeInterval)timeout {
- NSCAssert([NSThread isMainThread], @"%s should only be used from the main thread", __func__);
- __block id result = defaultValue;
- __block BOOL done = NO;
- // Ensures that we don't pass values across thread boundaries by reference.
- __block NSError *localError;
- __block BOOL localSuccess = YES;
- [[[[self
- take:1]
- timeout:timeout onScheduler:[RACScheduler scheduler]]
- deliverOn:RACScheduler.mainThreadScheduler]
- subscribeNext:^(id x) {
- result = x;
- done = YES;
- } error:^(NSError *e) {
- if (!done) {
- localSuccess = NO;
- localError = e;
- done = YES;
- }
- } completed:^{
- done = YES;
- }];
-
- do {
- [NSRunLoop.mainRunLoop runMode:NSDefaultRunLoopMode beforeDate:[NSDate dateWithTimeIntervalSinceNow:0.1]];
- } while (!done);
- if (success != NULL) *success = localSuccess;
- if (error != NULL) *error = localError;
- return result;
- }
- - (BOOL)asynchronouslyWaitUntilCompleted:(NSError **)error timeout:(NSTimeInterval)timeout {
- BOOL success = NO;
- [[self ignoreValues] asynchronousFirstOrDefault:nil success:&success error:error timeout:timeout];
- return success;
- }
- - (BOOL)asynchronouslyWaitUntilCompleted:(NSError **)error {
- return [self asynchronouslyWaitUntilCompleted:error timeout:RACSignalAsynchronousWaitTimeout];
- }
- @end
|