RLMAsyncTask.mm 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476
  1. ////////////////////////////////////////////////////////////////////////////
  2. //
  3. // Copyright 2023 Realm Inc.
  4. //
  5. // Licensed under the Apache License, Version 2.0 (the "License");
  6. // you may not use this file except in compliance with the License.
  7. // You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing, software
  12. // distributed under the License is distributed on an "AS IS" BASIS,
  13. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. // See the License for the specific language governing permissions and
  15. // limitations under the License.
  16. //
  17. ////////////////////////////////////////////////////////////////////////////
  18. #import "RLMAsyncTask_Private.h"
  19. #import "RLMError_Private.hpp"
  20. #import "RLMRealm_Private.hpp"
  21. #import "RLMRealmConfiguration_Private.hpp"
  22. #import "RLMScheduler.h"
  23. #import "RLMSyncSubscription_Private.h"
  24. #import "RLMUtil.hpp"
  25. #import <realm/exceptions.hpp>
  26. #import <realm/object-store/sync/async_open_task.hpp>
  27. #import <realm/object-store/sync/sync_session.hpp>
  28. #import <realm/object-store/thread_safe_reference.hpp>
  29. static dispatch_queue_t s_async_open_queue = dispatch_queue_create("io.realm.asyncOpenDispatchQueue",
  30. DISPATCH_QUEUE_CONCURRENT);
  31. void RLMSetAsyncOpenQueue(dispatch_queue_t queue) {
  32. s_async_open_queue = queue;
  33. }
  34. static NSError *s_canceledError = [NSError errorWithDomain:NSPOSIXErrorDomain
  35. code:ECANCELED userInfo:@{
  36. NSLocalizedDescriptionKey: @"Operation canceled"
  37. }];
  38. __attribute__((objc_direct_members))
  39. @implementation RLMAsyncOpenTask {
  40. RLMUnfairMutex _mutex;
  41. std::shared_ptr<realm::AsyncOpenTask> _task;
  42. std::vector<RLMProgressNotificationBlock> _progressBlocks;
  43. bool _cancel;
  44. RLMRealmConfiguration *_configuration;
  45. RLMScheduler *_scheduler;
  46. bool _waitForDownloadCompletion;
  47. void (^_completion)(NSError *);
  48. RLMRealm *_backgroundRealm;
  49. }
  50. - (void)addProgressNotificationOnQueue:(dispatch_queue_t)queue block:(RLMProgressNotificationBlock)block {
  51. auto wrappedBlock = ^(NSUInteger transferred_bytes, NSUInteger transferrable_bytes) {
  52. dispatch_async(queue, ^{
  53. @autoreleasepool {
  54. block(transferred_bytes, transferrable_bytes);
  55. }
  56. });
  57. };
  58. std::lock_guard lock(_mutex);
  59. if (_task) {
  60. _task->register_download_progress_notifier(wrappedBlock);
  61. }
  62. else if (!_cancel) {
  63. _progressBlocks.push_back(wrappedBlock);
  64. }
  65. }
  66. - (void)addProgressNotificationBlock:(RLMProgressNotificationBlock)block {
  67. [self addProgressNotificationOnQueue:dispatch_get_main_queue() block:block];
  68. }
  69. - (void)cancel {
  70. std::lock_guard lock(_mutex);
  71. _cancel = true;
  72. _progressBlocks.clear();
  73. if (_task) {
  74. _task->cancel();
  75. // Cancelling realm::AsyncOpenTask results in it never calling our callback,
  76. // so if we're currently in that we have to just send the cancellation
  77. // error immediately. In all other cases, though, we want to wait until
  78. // we've actually cancelled and will send the error the next time we
  79. // check for cancellation
  80. [self reportError:s_canceledError];
  81. }
  82. }
  83. - (instancetype)initWithConfiguration:(RLMRealmConfiguration *)configuration
  84. confinedTo:(RLMScheduler *)scheduler
  85. download:(bool)waitForDownloadCompletion {
  86. if (!(self = [super init])) {
  87. return self;
  88. }
  89. // Copying the configuration here as the user could potentially modify
  90. // the config after calling async open
  91. _configuration = configuration.copy;
  92. _scheduler = scheduler;
  93. _waitForDownloadCompletion = waitForDownloadCompletion;
  94. return self;
  95. }
  96. - (instancetype)initWithConfiguration:(RLMRealmConfiguration *)configuration
  97. confinedTo:(RLMScheduler *)confinement
  98. download:(bool)waitForDownloadCompletion
  99. completion:(RLMAsyncOpenRealmCallback)completion {
  100. self = [self initWithConfiguration:configuration confinedTo:confinement
  101. download:waitForDownloadCompletion];
  102. [self waitForOpen:completion];
  103. return self;
  104. }
  105. - (void)waitForOpen:(RLMAsyncOpenRealmCallback)completion {
  106. __weak auto weakSelf = self;
  107. [self waitWithCompletion:^(NSError *error) {
  108. RLMRealm *realm;
  109. if (auto self = weakSelf) {
  110. realm = self->_localRealm;
  111. self->_localRealm = nil;
  112. }
  113. completion(realm, error);
  114. }];
  115. }
  116. - (void)waitWithCompletion:(void (^)(NSError *))completion {
  117. std::lock_guard lock(_mutex);
  118. _completion = completion;
  119. if (_cancel) {
  120. return [self reportError:s_canceledError];
  121. }
  122. // get_synchronized_realm() synchronously opens the DB and performs file-format
  123. // upgrades, so we want to dispatch to the background before invoking it.
  124. dispatch_async(s_async_open_queue, ^{
  125. @autoreleasepool {
  126. [self startAsyncOpen];
  127. }
  128. });
  129. }
  130. // The full async open flow is:
  131. // 1. Dispatch to a background queue
  132. // 2. Use Realm::get_synchronized_realm() to create the Realm file, run
  133. // migrations and compactions, and download the latest data from the server.
  134. // 3. Dispatch back to queue
  135. // 4. Initialize a RLMRealm in the background queue to perform the SDK
  136. // initialization (e.g. creating managed accessor classes).
  137. // 5. Wait for initial flexible sync subscriptions to complete
  138. // 6. Dispatch to the final scheduler
  139. // 7. Open the final RLMRealm, release the previously opened background one,
  140. // and then invoke the completion callback.
  141. //
  142. // Steps 2 and 5 are skipped for non-sync or non-flexible sync Realms, in which
  143. // case step 4 will handle doing migrations and compactions etc. in the background.
  144. //
  145. // At any point `cancel` can be called from another thread. Cancellation is mostly
  146. // cooperative rather than preemptive: we check at each step if we've been cancelled,
  147. // and if so call the completion with the cancellation error rather than
  148. // proceeding. Downloading the data from the server is the one exception to this.
  149. // Ideally waiting for flexible sync subscriptions would also be preempted.
  150. - (void)startAsyncOpen {
  151. std::unique_lock lock(_mutex);
  152. if ([self checkCancellation]) {
  153. return;
  154. }
  155. if (_waitForDownloadCompletion && _configuration.configRef.sync_config) {
  156. #if REALM_ENABLE_SYNC
  157. _task = realm::Realm::get_synchronized_realm(_configuration.config);
  158. for (auto& block : _progressBlocks) {
  159. _task->register_download_progress_notifier(block);
  160. }
  161. _progressBlocks.clear();
  162. _task->start([=](realm::ThreadSafeReference ref, std::exception_ptr err) {
  163. std::lock_guard lock(_mutex);
  164. if ([self checkCancellation]) {
  165. return;
  166. }
  167. // Note that cancellation intentionally trumps reporting other kinds
  168. // of errors
  169. if (err) {
  170. return [self reportException:err];
  171. }
  172. // Dispatch blocks can only capture copyable types, so we need to
  173. // resolve the TSR to a shared_ptr<Realm>
  174. auto realm = ref.resolve<std::shared_ptr<realm::Realm>>(nullptr);
  175. // We're now running on the sync worker thread, so hop back
  176. // to a more appropriate queue for the next stage of init.
  177. dispatch_async(s_async_open_queue, ^{
  178. @autoreleasepool {
  179. [self downloadCompleted];
  180. // Capture the Realm to keep the RealmCoordinator alive
  181. // so that we don't have to reopen it.
  182. static_cast<void>(realm);
  183. }
  184. });
  185. });
  186. #else
  187. @throw RLMException(@"Realm was not built with sync enabled");
  188. #endif
  189. }
  190. else {
  191. // We're not downloading first, so just proceed directly to the next step.
  192. lock.unlock();
  193. [self downloadCompleted];
  194. }
  195. }
  196. - (void)downloadCompleted {
  197. std::unique_lock lock(_mutex);
  198. _task.reset();
  199. if ([self checkCancellation]) {
  200. return;
  201. }
  202. NSError *error;
  203. // We've now downloaded all data (if applicable) and done the object
  204. // store initialization, and are back on our background queue. Next we
  205. // want to do our own initialization while still in the background
  206. @autoreleasepool {
  207. // Holding onto the Realm so that opening the final Realm on the target
  208. // scheduler can hit the fast path
  209. _backgroundRealm = [RLMRealm realmWithConfiguration:_configuration
  210. confinedTo:RLMScheduler.currentRunLoop error:&error];
  211. if (error) {
  212. return [self reportError:error];
  213. }
  214. }
  215. #if REALM_ENABLE_SYNC
  216. // If we're opening a flexible sync Realm, we now want to wait for the
  217. // initial subscriptions to be ready
  218. if (_waitForDownloadCompletion && _backgroundRealm.isFlexibleSync) {
  219. auto subscriptions = _backgroundRealm.subscriptions;
  220. if (subscriptions.state == RLMSyncSubscriptionStatePending) {
  221. // FIXME: need cancellation for waiting for the subscription
  222. return [subscriptions waitForSynchronizationOnQueue:nil
  223. completionBlock:^(NSError *error) {
  224. if (error) {
  225. std::lock_guard lock(_mutex);
  226. return [self reportError:error];
  227. }
  228. [self asyncOpenCompleted];
  229. }];
  230. }
  231. }
  232. #endif
  233. lock.unlock();
  234. [self asyncOpenCompleted];
  235. }
  236. - (void)asyncOpenCompleted {
  237. std::lock_guard lock(_mutex);
  238. if (![self checkCancellation]) {
  239. [_scheduler invoke:^{
  240. [self openFinalRealmAndCallCompletion];
  241. }];
  242. }
  243. }
  244. - (void)openFinalRealmAndCallCompletion {
  245. std::unique_lock lock(_mutex);
  246. @autoreleasepool {
  247. if ([self checkCancellation]) {
  248. return;
  249. }
  250. if (!_completion) {
  251. return;
  252. }
  253. NSError *error;
  254. auto completion = _completion;
  255. // It should not actually be possible for this to fail
  256. _localRealm = [RLMRealm realmWithConfiguration:_configuration
  257. confinedTo:_scheduler
  258. error:&error];
  259. [self releaseResources];
  260. lock.unlock();
  261. completion(error);
  262. }
  263. }
  264. - (bool)checkCancellation {
  265. if (_cancel && _completion) {
  266. [self reportError:s_canceledError];
  267. }
  268. return _cancel;
  269. }
  270. - (void)reportException:(std::exception_ptr const&)err {
  271. try {
  272. std::rethrow_exception(err);
  273. }
  274. catch (realm::Exception const& e) {
  275. if (e.code() == realm::ErrorCodes::OperationAborted) {
  276. return [self reportError:s_canceledError];
  277. }
  278. [self reportError:makeError(e)];
  279. }
  280. catch (...) {
  281. NSError *error;
  282. RLMRealmTranslateException(&error);
  283. [self reportError:error];
  284. }
  285. }
  286. - (void)reportError:(NSError *)error {
  287. if (!_completion || !_scheduler) {
  288. return;
  289. }
  290. auto completion = _completion;
  291. auto scheduler = _scheduler;
  292. [self releaseResources];
  293. [scheduler invoke:^{
  294. completion(error);
  295. }];
  296. }
  297. - (void)releaseResources {
  298. _backgroundRealm = nil;
  299. _configuration = nil;
  300. _scheduler = nil;
  301. _completion = nil;
  302. }
  303. @end
  304. @implementation RLMAsyncDownloadTask {
  305. RLMUnfairMutex _mutex;
  306. std::shared_ptr<realm::SyncSession> _session;
  307. bool _started;
  308. }
  309. - (instancetype)initWithRealm:(RLMRealm *)realm {
  310. if (self = [super init]) {
  311. _session = realm->_realm->sync_session();
  312. }
  313. return self;
  314. }
  315. - (void)waitWithCompletion:(void (^)(NSError *_Nullable))completion {
  316. std::unique_lock lock(_mutex);
  317. if (!_session) {
  318. lock.unlock();
  319. return completion(nil);
  320. }
  321. _started = true;
  322. _session->revive_if_needed();
  323. _session->wait_for_download_completion([=](realm::Status status) {
  324. completion(makeError(status));
  325. });
  326. }
  327. - (void)cancel {
  328. std::unique_lock lock(_mutex);
  329. if (_started) {
  330. _session->force_close();
  331. }
  332. _session = nullptr;
  333. }
  334. @end
  335. __attribute__((objc_direct_members))
  336. @implementation RLMAsyncRefreshTask {
  337. RLMUnfairMutex _mutex;
  338. void (^_completion)(bool);
  339. bool _complete;
  340. bool _didRefresh;
  341. }
  342. - (void)complete:(bool)didRefresh {
  343. void (^completion)(bool);
  344. {
  345. std::lock_guard lock(_mutex);
  346. std::swap(completion, _completion);
  347. _complete = true;
  348. // If we're both cancelled and did complete a refresh then continue
  349. // to report true
  350. _didRefresh = _didRefresh || didRefresh;
  351. }
  352. if (completion) {
  353. completion(didRefresh);
  354. }
  355. }
  356. - (void)wait:(void (^)(bool))completion {
  357. bool didRefresh;
  358. {
  359. std::lock_guard lock(_mutex);
  360. if (!_complete) {
  361. _completion = completion;
  362. return;
  363. }
  364. didRefresh = _didRefresh;
  365. }
  366. completion(didRefresh);
  367. }
  368. + (RLMAsyncRefreshTask *)completedRefresh {
  369. static RLMAsyncRefreshTask *shared = [] {
  370. auto refresh = [[RLMAsyncRefreshTask alloc] init];
  371. refresh->_complete = true;
  372. refresh->_didRefresh = true;
  373. return refresh;
  374. }();
  375. return shared;
  376. }
  377. @end
  378. @implementation RLMAsyncWriteTask {
  379. // Mutex guards _realm and _completion
  380. RLMUnfairMutex _mutex;
  381. // _realm is non-nil only while waiting for an async write to begin. It is
  382. // set to `nil` when it either completes or is cancelled.
  383. RLMRealm *_realm;
  384. dispatch_block_t _completion;
  385. RLMAsyncTransactionId _id;
  386. }
  387. // No locking needed for these two as they have to be called before the
  388. // cancellation handler is set up
  389. - (instancetype)initWithRealm:(RLMRealm *)realm {
  390. if (self = [super init]) {
  391. _realm = realm;
  392. }
  393. return self;
  394. }
  395. - (void)setTransactionId:(RLMAsyncTransactionId)transactionID {
  396. _id = transactionID;
  397. }
  398. - (void)complete:(bool)cancel {
  399. // The swap-under-lock pattern is used to avoid invoking the callback with
  400. // a lock held
  401. dispatch_block_t completion;
  402. {
  403. std::lock_guard lock(_mutex);
  404. std::swap(completion, _completion);
  405. if (cancel) {
  406. // This is a no-op if cancellation is coming after the wait completed
  407. [_realm cancelAsyncTransaction:_id];
  408. }
  409. _realm = nil;
  410. }
  411. if (completion) {
  412. completion();
  413. }
  414. }
  415. - (void)wait:(void (^)())completion {
  416. {
  417. std::lock_guard lock(_mutex);
  418. // `_realm` being non-nil means it's neither completed nor been cancelled
  419. if (_realm) {
  420. _completion = completion;
  421. return;
  422. }
  423. }
  424. // It has either been completed or cancelled, so call the callback immediately
  425. completion();
  426. }
  427. @end