transaction.hpp 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532
  1. /*************************************************************************
  2. *
  3. * Copyright 2016 Realm Inc.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. **************************************************************************/
  18. #ifndef REALM_TRANSACTION_HPP
  19. #define REALM_TRANSACTION_HPP
  20. #include <realm/db.hpp>
  21. namespace realm {
  22. class Transaction : public Group {
  23. public:
  24. Transaction(DBRef _db, SlabAlloc* alloc, DB::ReadLockInfo& rli, DB::TransactStage stage);
  25. // convenience, so you don't need to carry a reference to the DB around
  26. ~Transaction();
  27. DB::version_type get_version() const noexcept
  28. {
  29. return m_read_lock.m_version;
  30. }
  31. DB::version_type get_version_of_latest_snapshot()
  32. {
  33. return db->get_version_of_latest_snapshot();
  34. }
  35. /// Get a version id which may be used to request a different transaction locked to specific version.
  36. DB::VersionID get_version_of_current_transaction() const noexcept
  37. {
  38. return VersionID(m_read_lock.m_version, m_read_lock.m_reader_idx);
  39. }
  40. void close() REQUIRES(!m_async_mutex);
  41. bool is_attached()
  42. {
  43. return m_transact_stage != DB::transact_Ready && db->is_attached();
  44. }
  45. /// Get the approximate size of the data that would be written to the file if
  46. /// a commit were done at this point. The reported size will always be bigger
  47. /// than what will eventually be needed as we reserve a bit more memory than
  48. /// what will be needed.
  49. size_t get_commit_size() const;
  50. DB::version_type commit() REQUIRES(!m_async_mutex);
  51. void rollback() REQUIRES(!m_async_mutex);
  52. void end_read() REQUIRES(!m_async_mutex);
  53. template <class O>
  54. void parse_history(O& observer, DB::version_type begin, DB::version_type end);
  55. // Live transactions state changes, often taking an observer functor:
  56. VersionID commit_and_continue_as_read(bool commit_to_disk = true) REQUIRES(!m_async_mutex);
  57. VersionID commit_and_continue_writing();
  58. template <class O>
  59. void rollback_and_continue_as_read(O& observer) REQUIRES(!m_async_mutex);
  60. void rollback_and_continue_as_read() REQUIRES(!m_async_mutex);
  61. template <class O>
  62. void advance_read(O* observer, VersionID target_version = VersionID());
  63. void advance_read(VersionID target_version = VersionID())
  64. {
  65. _impl::NullInstructionObserver* o = nullptr;
  66. advance_read(o, target_version);
  67. }
  68. template <class O>
  69. bool promote_to_write(O* observer, bool nonblocking = false) REQUIRES(!m_async_mutex);
  70. bool promote_to_write(bool nonblocking = false) REQUIRES(!m_async_mutex)
  71. {
  72. _impl::NullInstructionObserver* o = nullptr;
  73. return promote_to_write(o, nonblocking);
  74. }
  75. TransactionRef freeze();
  76. // Frozen transactions are created by freeze() or DB::start_frozen()
  77. bool is_frozen() const noexcept override
  78. {
  79. return m_transact_stage == DB::transact_Frozen;
  80. }
  81. bool is_async() noexcept REQUIRES(!m_async_mutex)
  82. {
  83. util::CheckedLockGuard lck(m_async_mutex);
  84. return m_async_stage != AsyncState::Idle;
  85. }
  86. TransactionRef duplicate();
  87. void copy_to(TransactionRef dest) const;
  88. _impl::History* get_history() const;
  89. // direct handover of accessor instances
  90. Obj import_copy_of(const Obj& original);
  91. TableRef import_copy_of(const ConstTableRef original);
  92. LnkLst import_copy_of(const LnkLst& original);
  93. LnkSet import_copy_of(const LnkSet& original);
  94. LstBasePtr import_copy_of(const LstBase& original);
  95. SetBasePtr import_copy_of(const SetBase& original);
  96. CollectionBasePtr import_copy_of(const CollectionBase& original);
  97. LnkLstPtr import_copy_of(const LnkLstPtr& original);
  98. LnkSetPtr import_copy_of(const LnkSetPtr& original);
  99. LinkCollectionPtr import_copy_of(const LinkCollectionPtr& original);
  100. // handover of the heavier Query and TableView
  101. std::unique_ptr<Query> import_copy_of(Query&, PayloadPolicy);
  102. std::unique_ptr<TableView> import_copy_of(TableView&, PayloadPolicy);
  103. /// Get the current transaction type
  104. DB::TransactStage get_transact_stage() const noexcept
  105. {
  106. return m_transact_stage;
  107. }
  108. void upgrade_file_format(int target_file_format_version);
  109. /// Task oriented/async interface for continuous transactions.
  110. // true if this transaction already holds the write mutex
  111. bool holds_write_mutex() const noexcept REQUIRES(!m_async_mutex)
  112. {
  113. util::CheckedLockGuard lck(m_async_mutex);
  114. return m_async_stage == AsyncState::HasLock || m_async_stage == AsyncState::HasCommits;
  115. }
  116. // Convert an existing write transaction to an async write transaction
  117. void promote_to_async() REQUIRES(!m_async_mutex);
  118. // request full synchronization to stable storage for all writes done since
  119. // last sync - or just release write mutex.
  120. // The write mutex is released after full synchronization.
  121. void async_complete_writes(util::UniqueFunction<void()> when_synchronized = nullptr) REQUIRES(!m_async_mutex);
  122. // Complete all pending async work and return once the async stage is Idle.
  123. // If currently in an async write transaction that transaction is cancelled,
  124. // and any async writes which were committed are synchronized.
  125. void prepare_for_close() REQUIRES(!m_async_mutex);
  126. // true if sync to disk has been requested
  127. bool is_synchronizing() noexcept REQUIRES(!m_async_mutex)
  128. {
  129. util::CheckedLockGuard lck(m_async_mutex);
  130. return m_async_stage == AsyncState::Syncing;
  131. }
  132. std::exception_ptr get_commit_exception() noexcept REQUIRES(!m_async_mutex)
  133. {
  134. util::CheckedLockGuard lck(m_async_mutex);
  135. auto err = std::move(m_commit_exception);
  136. m_commit_exception = nullptr;
  137. return err;
  138. }
  139. bool has_unsynced_commits() noexcept REQUIRES(!m_async_mutex)
  140. {
  141. util::CheckedLockGuard lck(m_async_mutex);
  142. return static_cast<bool>(m_oldest_version_not_persisted);
  143. }
  144. private:
  145. enum class AsyncState { Idle, Requesting, HasLock, HasCommits, Syncing };
  146. DBRef get_db() const
  147. {
  148. return db;
  149. }
  150. Replication* const* get_repl() const final
  151. {
  152. return db->get_repl();
  153. }
  154. template <class O>
  155. bool internal_advance_read(O* observer, VersionID target_version, _impl::History&, bool) REQUIRES(!db->m_mutex);
  156. void set_transact_stage(DB::TransactStage stage) noexcept;
  157. void do_end_read() noexcept REQUIRES(!m_async_mutex);
  158. void initialize_replication();
  159. void replicate(Transaction* dest, Replication& repl) const;
  160. void complete_async_commit();
  161. void acquire_write_lock() REQUIRES(!m_async_mutex);
  162. void cow_outliers(std::vector<size_t>& progress, size_t evac_limit, size_t work_limit);
  163. void close_read_with_lock() REQUIRES(!m_async_mutex, db->m_mutex);
  164. DBRef db;
  165. mutable std::unique_ptr<_impl::History> m_history_read;
  166. mutable _impl::History* m_history = nullptr;
  167. DB::ReadLockInfo m_read_lock;
  168. util::Optional<DB::ReadLockInfo> m_oldest_version_not_persisted;
  169. std::exception_ptr m_commit_exception GUARDED_BY(m_async_mutex);
  170. bool m_async_commit_has_failed = false;
  171. // Mutex is protecting access to members just below
  172. util::CheckedMutex m_async_mutex;
  173. std::condition_variable m_async_cv GUARDED_BY(m_async_mutex);
  174. AsyncState m_async_stage GUARDED_BY(m_async_mutex) = AsyncState::Idle;
  175. std::chrono::steady_clock::time_point m_request_time_point;
  176. bool m_waiting_for_write_lock GUARDED_BY(m_async_mutex) = false;
  177. bool m_waiting_for_sync GUARDED_BY(m_async_mutex) = false;
  178. DB::TransactStage m_transact_stage = DB::transact_Ready;
  179. friend class DB;
  180. friend class DisableReplication;
  181. };
  /*
   * Classes providing backward compatibility with the older
   * ReadTransaction and WriteTransaction types.
   */
  186. class ReadTransaction {
  187. public:
  188. ReadTransaction(DBRef sg)
  189. : trans(sg->start_read())
  190. {
  191. }
  192. ~ReadTransaction() noexcept {}
  193. operator Transaction&()
  194. {
  195. return *trans;
  196. }
  197. bool has_table(StringData name) const noexcept
  198. {
  199. return trans->has_table(name);
  200. }
  201. ConstTableRef get_table(TableKey key) const
  202. {
  203. return trans->get_table(key); // Throws
  204. }
  205. ConstTableRef get_table(StringData name) const
  206. {
  207. return trans->get_table(name); // Throws
  208. }
  209. const Group& get_group() const noexcept
  210. {
  211. return *trans.get();
  212. }
  213. /// Get the version of the snapshot to which this read transaction is bound.
  214. DB::version_type get_version() const noexcept
  215. {
  216. return trans->get_version();
  217. }
  218. private:
  219. TransactionRef trans;
  220. };
  221. class WriteTransaction {
  222. public:
  223. WriteTransaction(DBRef sg)
  224. : trans(sg->start_write())
  225. {
  226. }
  227. ~WriteTransaction() noexcept {}
  228. operator Transaction&()
  229. {
  230. return *trans;
  231. }
  232. bool has_table(StringData name) const noexcept
  233. {
  234. return trans->has_table(name);
  235. }
  236. TableRef get_table(TableKey key) const
  237. {
  238. return trans->get_table(key); // Throws
  239. }
  240. TableRef get_table(StringData name) const
  241. {
  242. return trans->get_table(name); // Throws
  243. }
  244. TableRef add_table(StringData name, Table::Type table_type = Table::Type::TopLevel) const
  245. {
  246. return trans->add_table(name, table_type); // Throws
  247. }
  248. TableRef get_or_add_table(StringData name, Table::Type table_type = Table::Type::TopLevel,
  249. bool* was_added = nullptr) const
  250. {
  251. return trans->get_or_add_table(name, table_type, was_added); // Throws
  252. }
  253. Group& get_group() const noexcept
  254. {
  255. return *trans.get();
  256. }
  257. /// Get the version of the snapshot on which this write transaction is
  258. /// based.
  259. DB::version_type get_version() const noexcept
  260. {
  261. return trans->get_version();
  262. }
  263. DB::version_type commit()
  264. {
  265. return trans->commit();
  266. }
  267. void rollback() noexcept
  268. {
  269. trans->rollback();
  270. }
  271. private:
  272. TransactionRef trans;
  273. };
  274. // Implementation:
  275. template <class O>
  276. inline void Transaction::advance_read(O* observer, VersionID version_id)
  277. {
  278. if (m_transact_stage != DB::transact_Reading)
  279. throw WrongTransactionState("Not a read transaction");
  280. // It is an error if the new version precedes the currently bound one.
  281. if (version_id.version < m_read_lock.m_version)
  282. throw IllegalOperation("Requesting an older version when advancing");
  283. auto hist = get_history(); // Throws
  284. if (!hist)
  285. throw IllegalOperation("No transaction log when advancing");
  286. auto old_version = m_read_lock.m_version;
  287. internal_advance_read(observer, version_id, *hist, false); // Throws
  288. if (db->m_logger) {
  289. db->m_logger->log(util::Logger::Level::trace, "Advance read: %1 -> %2", old_version, m_read_lock.m_version);
  290. }
  291. }
  292. template <class O>
  293. inline bool Transaction::promote_to_write(O* observer, bool nonblocking)
  294. {
  295. if (m_transact_stage != DB::transact_Reading)
  296. throw WrongTransactionState("Not a read transaction");
  297. if (!holds_write_mutex()) {
  298. if (nonblocking) {
  299. bool succes = db->do_try_begin_write();
  300. if (!succes) {
  301. return false;
  302. }
  303. }
  304. else {
  305. auto t1 = std::chrono::steady_clock::now();
  306. acquire_write_lock(); // Throws
  307. if (db->m_logger) {
  308. auto t2 = std::chrono::steady_clock::now();
  309. db->m_logger->log(util::Logger::Level::trace, "Acquired write lock in %1 us",
  310. std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1).count());
  311. }
  312. }
  313. }
  314. auto old_version = m_read_lock.m_version;
  315. try {
  316. Replication* repl = db->get_replication();
  317. if (!repl)
  318. throw IllegalOperation("No transaction log when promoting to write");
  319. VersionID version = VersionID(); // Latest
  320. m_history = repl->_get_history_write();
  321. bool history_updated = internal_advance_read(observer, version, *m_history, true); // Throws
  322. REALM_ASSERT(repl); // Presence of `repl` follows from the presence of `hist`
  323. DB::version_type current_version = m_read_lock.m_version;
  324. m_alloc.init_mapping_management(current_version);
  325. repl->initiate_transact(*this, current_version, history_updated); // Throws
  326. // If the group has no top array (top_ref == 0), create a new node
  327. // structure for an empty group now, to be ready for modifications. See
  328. // also Group::attach_shared().
  329. if (!m_top.is_attached())
  330. create_empty_group(); // Throws
  331. }
  332. catch (...) {
  333. if (!holds_write_mutex())
  334. db->end_write_on_correct_thread();
  335. m_history = nullptr;
  336. throw;
  337. }
  338. if (db->m_logger) {
  339. db->m_logger->log(util::Logger::Level::trace, "Promote to write: %1 -> %2", old_version,
  340. m_read_lock.m_version);
  341. }
  342. set_transact_stage(DB::transact_Writing);
  343. return true;
  344. }
  345. template <class O>
  346. inline void Transaction::rollback_and_continue_as_read(O& observer)
  347. {
  348. if (m_transact_stage != DB::transact_Writing)
  349. throw WrongTransactionState("Not a write transaction");
  350. Replication* repl = db->get_replication();
  351. if (!repl)
  352. throw IllegalOperation("No transaction log when rolling back");
  353. BinaryData uncommitted_changes = repl->get_uncommitted_changes();
  354. if (uncommitted_changes.size()) {
  355. util::SimpleInputStream in(uncommitted_changes);
  356. _impl::parse_transact_log(in, observer); // Throws
  357. }
  358. rollback_and_continue_as_read();
  359. }
  360. inline void Transaction::rollback_and_continue_as_read()
  361. {
  362. if (m_transact_stage != DB::transact_Writing)
  363. throw WrongTransactionState("Not a write transaction");
  364. Replication* repl = db->get_replication();
  365. if (!repl)
  366. throw IllegalOperation("No transaction log when rolling back");
  367. // Mark all managed space (beyond the attached file) as free.
  368. db->reset_free_space_tracking(); // Throws
  369. m_read_lock.check();
  370. ref_type top_ref = m_read_lock.m_top_ref;
  371. size_t file_size = m_read_lock.m_file_size;
  372. // since we had the write lock, we already have the latest encrypted pages in memory
  373. m_alloc.update_reader_view(file_size); // Throws
  374. update_allocator_wrappers(false);
  375. advance_transact(top_ref, nullptr, false); // Throws
  376. if (!holds_write_mutex())
  377. db->end_write_on_correct_thread();
  378. if (db->m_logger) {
  379. db->m_logger->log(util::Logger::Level::trace, "Rollback");
  380. }
  381. m_history = nullptr;
  382. set_transact_stage(DB::transact_Reading);
  383. }
  384. template <class O>
  385. inline bool Transaction::internal_advance_read(O* observer, VersionID version_id, _impl::History& hist, bool writable)
  386. {
  387. DB::ReadLockInfo new_read_lock = db->grab_read_lock(DB::ReadLockInfo::Live, version_id); // Throws
  388. REALM_ASSERT(new_read_lock.m_version >= m_read_lock.m_version);
  389. if (new_read_lock.m_version == m_read_lock.m_version) {
  390. db->release_read_lock(new_read_lock);
  391. // _impl::History::update_early_from_top_ref() was not called
  392. // update allocator wrappers merely to update write protection
  393. update_allocator_wrappers(writable);
  394. return false;
  395. }
  396. DB::version_type old_version = m_read_lock.m_version;
  397. DB::ReadLockGuard g(*db, new_read_lock);
  398. DB::version_type new_version = new_read_lock.m_version;
  399. size_t new_file_size = new_read_lock.m_file_size;
  400. ref_type new_top_ref = new_read_lock.m_top_ref;
  401. // Synchronize readers view of the file
  402. SlabAlloc& alloc = m_alloc;
  403. alloc.update_reader_view(new_file_size);
  404. update_allocator_wrappers(writable);
  405. using gf = _impl::GroupFriend;
  406. ref_type hist_ref = gf::get_history_ref(alloc, new_top_ref);
  407. hist.update_from_ref_and_version(hist_ref, new_version);
  408. if (observer) {
  409. // This has to happen in the context of the originally bound snapshot
  410. // and while the read transaction is still in a fully functional state.
  411. _impl::ChangesetInputStream in(hist, old_version, new_version);
  412. _impl::parse_transact_log(in, *observer); // Throws
  413. }
  414. // The old read lock must be retained for as long as the change history is
  415. // accessed (until Group::advance_transact() returns). This ensures that the
  416. // oldest needed changeset remains in the history, even when the history is
  417. // implemented as a separate unversioned entity outside the Realm (i.e., the
  418. // old implementation and ShortCircuitHistory in
  419. // test_lang_Bind_helper.cpp). On the other hand, if it had been the case,
  420. // that the history was always implemented as a versioned entity, that was
  421. // part of the Realm state, then it would not have been necessary to retain
  422. // the old read lock beyond this point.
  423. _impl::ChangesetInputStream in(hist, old_version, new_version);
  424. advance_transact(new_top_ref, &in, writable); // Throws
  425. g.release();
  426. db->release_read_lock(m_read_lock);
  427. m_read_lock = new_read_lock;
  428. return true; // _impl::History::update_early_from_top_ref() was called
  429. }
  430. template <class O>
  431. void Transaction::parse_history(O& observer, DB::version_type begin, DB::version_type end)
  432. {
  433. REALM_ASSERT(m_transact_stage != DB::transact_Ready);
  434. REALM_ASSERT(end <= m_read_lock.m_version);
  435. auto hist = get_history(); // Throws
  436. REALM_ASSERT(hist);
  437. hist->ensure_updated(m_read_lock.m_version);
  438. _impl::ChangesetInputStream in(*hist, begin, end);
  439. _impl::parse_transact_log(in, observer); // Throws
  440. }
  441. } // namespace realm
  442. #endif /* REALM_TRANSACTION_HPP */