diff options
author | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2022-08-01 23:29:27 +0000 |
---|---|---|
committer | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2022-08-01 23:29:27 +0000 |
commit | 861873b97c66a53a0aa7a7608c93d18bd9eef485 (patch) | |
tree | 0a5cc22a5723c07cc75c6f6a922a6add394d204c | |
parent | 5c32ca386ab2035ccad860e11976e0aa8ff2e133 (diff) | |
parent | a70049022b53abca4e1b94a0fa0da140436f31ee (diff) | |
download | core-861873b97c66a53a0aa7a7608c93d18bd9eef485.tar.gz |
Snap for 8892748 from a70049022b53abca4e1b94a0fa0da140436f31ee to tm-qpr1-release
Change-Id: Id63eea5467a6822268479941e79a705088d8e71c
8 files changed, 142 insertions, 19 deletions
diff --git a/fs_mgr/libsnapshot/snapshot.cpp b/fs_mgr/libsnapshot/snapshot.cpp index 019b64a44..870801ef1 100644 --- a/fs_mgr/libsnapshot/snapshot.cpp +++ b/fs_mgr/libsnapshot/snapshot.cpp @@ -2151,8 +2151,17 @@ bool SnapshotManager::ListSnapshots(LockedFile* lock, std::vector<std::string>* if (!suffix.empty() && !android::base::EndsWith(name, suffix)) { continue; } - snapshots->emplace_back(std::move(name)); + + // Insert system and product partition at the beginning so that + // during snapshot-merge, these partitions are merged first. + if (name == "system_a" || name == "system_b" || name == "product_a" || + name == "product_b") { + snapshots->insert(snapshots->begin(), std::move(name)); + } else { + snapshots->emplace_back(std::move(name)); + } } + return true; } diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp index 692cb740c..718c13ce0 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp @@ -143,17 +143,18 @@ void SnapshotHandler::PrepareReadAhead() { NotifyRAForMergeReady(); } -void SnapshotHandler::CheckMergeCompletionStatus() { +bool SnapshotHandler::CheckMergeCompletionStatus() { if (!merge_initiated_) { SNAP_LOG(INFO) << "Merge was not initiated. Total-data-ops: " << reader_->get_num_total_data_ops(); - return; + return false; } struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_); SNAP_LOG(INFO) << "Merge-status: Total-Merged-ops: " << ch->num_merge_ops << " Total-data-ops: " << reader_->get_num_total_data_ops(); + return true; } bool SnapshotHandler::ReadMetadata() { diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h index 83d40f635..c6805e5d9 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h @@ -18,6 +18,9 @@ #include <stdint.h> #include <stdlib.h> #include <sys/mman.h> +#include <sys/resource.h> +#include <sys/time.h> +#include <unistd.h> #include <condition_variable> #include <cstring> @@ -54,6 +57,8 @@ static_assert(PAYLOAD_BUFFER_SZ >= BLOCK_SZ); static constexpr int kNumWorkerThreads = 4; +static constexpr int kNiceValueForMergeThreads = -5; + #define SNAP_LOG(level) LOG(level) << misc_name_ << ": " #define SNAP_PLOG(level) PLOG(level) << misc_name_ << ": " @@ -274,7 +279,7 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> { const bool& IsAttached() const { return attached_; } void AttachControlDevice() { attached_ = true; } - void CheckMergeCompletionStatus(); + bool CheckMergeCompletionStatus(); bool CommitMerge(int num_merge_ops); void CloseFds() { cow_fd_ = {}; } @@ -305,6 +310,8 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> { // State transitions for merge void InitiateMerge(); + void MonitorMerge(); + void WakeupMonitorMergeThread(); void WaitForMergeComplete(); bool WaitForMergeBegin(); void NotifyRAForMergeReady(); @@ -333,6 +340,7 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> { void SetSocketPresent(bool socket) { is_socket_present_ = socket; } void SetIouringEnabled(bool io_uring_enabled) { is_io_uring_enabled_ = io_uring_enabled; } bool MergeInitiated() { return merge_initiated_; } + bool MergeMonitored() { return merge_monitored_; } double GetMergePercentage() { return merge_completion_percentage_; } // Merge Block State Transitions @@ -407,6 +415,7 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> { double merge_completion_percentage_; bool merge_initiated_ = false; + bool merge_monitored_ = false; bool attached_ = false; bool is_socket_present_; bool is_io_uring_enabled_ = false; diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp index c26a2cd5c..63f47d6f0 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp @@ -71,16 +71,16 @@ int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops, } bool Worker::MergeReplaceZeroOps() { - // Flush every 8192 ops. Since all ops are independent and there is no + // Flush after merging 2MB. Since all ops are independent and there is no // dependency between COW ops, we will flush the data and the number - // of ops merged in COW file for every 8192 ops. If there is a crash, - // we will end up replaying some of the COW ops which were already merged. - // That is ok. + // of ops merged in COW block device. If there is a crash, we will + // end up replaying some of the COW ops which were already merged. That is + // ok. // - // Why 8192 ops ? Increasing this may improve merge time 3-4 seconds but - // we need to make sure that we checkpoint; 8k ops seems optimal. In-case - // if there is a crash merge should always make forward progress. - int total_ops_merged_per_commit = (PAYLOAD_BUFFER_SZ / BLOCK_SZ) * 32; + // Although increasing this greater than 2MB may help in improving merge + // times; however, on devices with low memory, this can be problematic + // when there are multiple merge threads in parallel. + int total_ops_merged_per_commit = (PAYLOAD_BUFFER_SZ / BLOCK_SZ) * 2; int num_ops_merged = 0; SNAP_LOG(INFO) << "MergeReplaceZeroOps started...."; @@ -543,6 +543,10 @@ bool Worker::RunMergeThread() { return true; } + if (setpriority(PRIO_PROCESS, gettid(), kNiceValueForMergeThreads)) { + SNAP_PLOG(ERROR) << "Failed to set priority for TID: " << gettid(); + } + SNAP_LOG(INFO) << "Merge starting.."; if (!Init()) { diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp index fa2866f39..b9e4255b2 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp @@ -727,6 +727,10 @@ bool ReadAhead::RunThread() { InitializeIouring(); + if (setpriority(PRIO_PROCESS, gettid(), kNiceValueForMergeThreads)) { + SNAP_PLOG(ERROR) << "Failed to set priority for TID: " << gettid(); + } + while (!RAIterDone()) { if (!ReadAheadIOStart()) { break; diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp index 82b2b25dd..5d93f01a7 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp @@ -59,6 +59,14 @@ DaemonOps UserSnapshotServer::Resolveop(std::string& input) { return DaemonOps::INVALID; } +UserSnapshotServer::UserSnapshotServer() { + monitor_merge_event_fd_.reset(eventfd(0, EFD_CLOEXEC)); + if (monitor_merge_event_fd_ == -1) { + PLOG(FATAL) << "monitor_merge_event_fd_: failed to create eventfd"; + } + terminating_ = false; +} + UserSnapshotServer::~UserSnapshotServer() { // Close any client sockets that were added via AcceptClient(). for (size_t i = 1; i < watched_fds_.size(); i++) { @@ -249,7 +257,7 @@ bool UserSnapshotServer::Receivemsg(android::base::borrowed_fd fd, const std::st return Sendmsg(fd, "fail"); } - if (!StartMerge(*iter)) { + if (!StartMerge(&lock, *iter)) { return Sendmsg(fd, "fail"); } @@ -298,7 +306,7 @@ void UserSnapshotServer::RunThread(std::shared_ptr<UserSnapshotDmUserHandler> ha } handler->snapuserd()->CloseFds(); - handler->snapuserd()->CheckMergeCompletionStatus(); + bool merge_completed = handler->snapuserd()->CheckMergeCompletionStatus(); handler->snapuserd()->UnmapBufferRegion(); auto misc_name = handler->misc_name(); @@ -306,7 +314,11 @@ void UserSnapshotServer::RunThread(std::shared_ptr<UserSnapshotDmUserHandler> ha { std::lock_guard<std::mutex> lock(lock_); - num_partitions_merge_complete_ += 1; + if (merge_completed) { + num_partitions_merge_complete_ += 1; + active_merge_threads_ -= 1; + WakeupMonitorMergeThread(); + } handler->SetThreadTerminated(); auto iter = FindHandler(&lock, handler->misc_name()); if (iter == dm_users_.end()) { @@ -418,6 +430,9 @@ void UserSnapshotServer::JoinAllThreads() { if (th.joinable()) th.join(); } + + stop_monitor_merge_thread_ = true; + WakeupMonitorMergeThread(); } void UserSnapshotServer::AddWatchedFd(android::base::borrowed_fd fd, int events) { @@ -502,13 +517,24 @@ bool UserSnapshotServer::StartHandler(const std::shared_ptr<UserSnapshotDmUserHa return true; } -bool UserSnapshotServer::StartMerge(const std::shared_ptr<UserSnapshotDmUserHandler>& handler) { +bool UserSnapshotServer::StartMerge(std::lock_guard<std::mutex>* proof_of_lock, + const std::shared_ptr<UserSnapshotDmUserHandler>& handler) { + CHECK(proof_of_lock); + if (!handler->snapuserd()->IsAttached()) { LOG(ERROR) << "Handler not attached to dm-user - Merge thread cannot be started"; return false; } - handler->snapuserd()->InitiateMerge(); + handler->snapuserd()->MonitorMerge(); + + if (!is_merge_monitor_started_.has_value()) { + std::thread(&UserSnapshotServer::MonitorMerge, this).detach(); + is_merge_monitor_started_ = true; + } + + merge_handlers_.push(handler); + WakeupMonitorMergeThread(); return true; } @@ -590,6 +616,42 @@ bool UserSnapshotServer::RemoveAndJoinHandler(const std::string& misc_name) { return true; } +void UserSnapshotServer::WakeupMonitorMergeThread() { + uint64_t notify = 1; + ssize_t rc = TEMP_FAILURE_RETRY(write(monitor_merge_event_fd_.get(), ¬ify, sizeof(notify))); + if (rc < 0) { + PLOG(FATAL) << "failed to notify monitor merge thread"; + } +} + +void UserSnapshotServer::MonitorMerge() { + while (!stop_monitor_merge_thread_) { + uint64_t testVal; + ssize_t ret = + TEMP_FAILURE_RETRY(read(monitor_merge_event_fd_.get(), &testVal, sizeof(testVal))); + if (ret == -1) { + PLOG(FATAL) << "Failed to read from eventfd"; + } else if (ret == 0) { + LOG(FATAL) << "Hit EOF on eventfd"; + } + + LOG(INFO) << "MonitorMerge: active-merge-threads: " << active_merge_threads_; + { + std::lock_guard<std::mutex> lock(lock_); + while (active_merge_threads_ < kMaxMergeThreads && merge_handlers_.size() > 0) { + auto handler = merge_handlers_.front(); + merge_handlers_.pop(); + LOG(INFO) << "Starting merge for partition: " + << handler->snapuserd()->GetMiscName(); + handler->snapuserd()->InitiateMerge(); + active_merge_threads_ += 1; + } + } + } + + LOG(INFO) << "Exiting MonitorMerge: size: " << merge_handlers_.size(); +} + bool UserSnapshotServer::WaitForSocket() { auto scope_guard = android::base::make_scope_guard([this]() -> void { JoinAllThreads(); }); @@ -646,6 +708,7 @@ bool UserSnapshotServer::WaitForSocket() { if (!StartWithSocket(true)) { return false; } + return Run(); } diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h index 34e7941bc..e0844aeb1 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h @@ -15,6 +15,7 @@ #pragma once #include <poll.h> +#include <sys/eventfd.h> #include <cstdio> #include <cstring> @@ -22,6 +23,8 @@ #include <future> #include <iostream> #include <mutex> +#include <optional> +#include <queue> #include <sstream> #include <string> #include <thread> @@ -34,6 +37,7 @@ namespace android { namespace snapshot { static constexpr uint32_t kMaxPacketSize = 512; +static constexpr uint8_t kMaxMergeThreads = 2; enum class DaemonOps { INIT, @@ -84,13 +88,19 @@ class UserSnapshotServer { std::vector<struct pollfd> watched_fds_; bool is_socket_present_ = false; int num_partitions_merge_complete_ = 0; + int active_merge_threads_ = 0; + bool stop_monitor_merge_thread_ = false; bool is_server_running_ = false; bool io_uring_enabled_ = false; + std::optional<bool> is_merge_monitor_started_; + + android::base::unique_fd monitor_merge_event_fd_; std::mutex lock_; using HandlerList = std::vector<std::shared_ptr<UserSnapshotDmUserHandler>>; HandlerList dm_users_; + std::queue<std::shared_ptr<UserSnapshotDmUserHandler>> merge_handlers_; void AddWatchedFd(android::base::borrowed_fd fd, int events); void AcceptClient(); @@ -108,6 +118,8 @@ class UserSnapshotServer { bool IsTerminating() { return terminating_; } void RunThread(std::shared_ptr<UserSnapshotDmUserHandler> handler); + void MonitorMerge(); + void JoinAllThreads(); bool StartWithSocket(bool start_listening); @@ -119,7 +131,7 @@ class UserSnapshotServer { void TerminateMergeThreads(std::lock_guard<std::mutex>* proof_of_lock); public: - UserSnapshotServer() { terminating_ = false; } + UserSnapshotServer(); ~UserSnapshotServer(); bool Start(const std::string& socketname); @@ -133,9 +145,11 @@ class UserSnapshotServer { const std::string& backing_device, const std::string& base_path_merge); bool StartHandler(const std::shared_ptr<UserSnapshotDmUserHandler>& handler); - bool StartMerge(const std::shared_ptr<UserSnapshotDmUserHandler>& handler); + bool StartMerge(std::lock_guard<std::mutex>* proof_of_lock, + const std::shared_ptr<UserSnapshotDmUserHandler>& handler); std::string GetMergeStatus(const std::shared_ptr<UserSnapshotDmUserHandler>& handler); + void WakeupMonitorMergeThread(); void SetTerminating() { terminating_ = true; } void ReceivedSocketSignal() { received_socket_signal_ = true; } void SetServerRunning() { is_server_running_ = true; } diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp index d4e1d7c7e..28c9f688a 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp @@ -165,6 +165,13 @@ using namespace android; using namespace android::dm; using android::base::unique_fd; +void SnapshotHandler::MonitorMerge() { + { + std::lock_guard<std::mutex> lock(lock_); + merge_monitored_ = true; + } +} + // This is invoked once primarily by update-engine to initiate // the merge void SnapshotHandler::InitiateMerge() { @@ -361,10 +368,16 @@ void SnapshotHandler::WaitForMergeComplete() { std::string SnapshotHandler::GetMergeStatus() { bool merge_not_initiated = false; + bool merge_monitored = false; bool merge_failed = false; { std::lock_guard<std::mutex> lock(lock_); + + if (MergeMonitored()) { + merge_monitored = true; + } + if (!MergeInitiated()) { merge_not_initiated = true; } @@ -387,6 +400,12 @@ std::string SnapshotHandler::GetMergeStatus() { return "snapshot-merge-complete"; } + // Merge monitor thread is tracking the merge but the merge thread + // is not started yet. + if (merge_monitored) { + return "snapshot-merge"; + } + // Return the state as "snapshot". If the device was rebooted during // merge, we will return the status as "snapshot". This is ok, as // libsnapshot will explicitly resume the merge. This is slightly |