diff options
author | Steven Moreland <smoreland@google.com> | 2021-10-04 21:41:16 +0000 |
---|---|---|
committer | Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com> | 2021-10-04 21:41:16 +0000 |
commit | adcb6a2733c1baf66e5ad72365965ab504f5f959 (patch) | |
tree | c2f19f92e4503b2de0afeebdd9bf7aeb1bb2e9c1 | |
parent | ebae424d2150b8850a101d3cf67e704318bb8089 (diff) | |
parent | cbfb18e134845deeace954bbba818acda48cb80f (diff) | |
download | native-adcb6a2733c1baf66e5ad72365965ab504f5f959.tar.gz |
Merge "libbinder: RPC allow RpcSession to be reusable" am: cbfb18e134temp_sam_202323961
Original change: https://android-review.googlesource.com/c/platform/frameworks/native/+/1840387
Change-Id: Ifb27b3eb12454fa96f07e6797745c697b4f831c4
-rw-r--r-- | libs/binder/RpcSession.cpp | 105 | ||||
-rw-r--r-- | libs/binder/include/binder/RpcSession.h | 21 |
2 files changed, 80 insertions, 46 deletions
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp index 65f6bc68c9..4465b8ef87 100644 --- a/libs/binder/RpcSession.cpp +++ b/libs/binder/RpcSession.cpp @@ -28,6 +28,7 @@ #include <android-base/hex.h> #include <android-base/macros.h> +#include <android-base/scopeguard.h> #include <android_runtime/vm.h> #include <binder/BpBinder.h> #include <binder/Parcel.h> @@ -54,13 +55,13 @@ using base::unique_fd; RpcSession::RpcSession(std::unique_ptr<RpcTransportCtx> ctx) : mCtx(std::move(ctx)) { LOG_RPC_DETAIL("RpcSession created %p", this); - mState = std::make_unique<RpcState>(); + mRpcBinderState = std::make_unique<RpcState>(); } RpcSession::~RpcSession() { LOG_RPC_DETAIL("RpcSession destroyed %p", this); std::lock_guard<std::mutex> _l(mMutex); - LOG_ALWAYS_FATAL_IF(mIncomingConnections.size() != 0, + LOG_ALWAYS_FATAL_IF(mThreadState.mIncomingConnections.size() != 0, "Should not be able to destroy a session with servers in use."); } @@ -77,10 +78,12 @@ sp<RpcSession> RpcSession::make(std::unique_ptr<RpcTransportCtxFactory> rpcTrans void RpcSession::setMaxThreads(size_t threads) { std::lock_guard<std::mutex> _l(mMutex); - LOG_ALWAYS_FATAL_IF(!mOutgoingConnections.empty() || !mIncomingConnections.empty(), + LOG_ALWAYS_FATAL_IF(!mThreadState.mOutgoingConnections.empty() || + !mThreadState.mIncomingConnections.empty(), "Must set max threads before setting up connections, but has %zu client(s) " "and %zu server(s)", - mOutgoingConnections.size(), mIncomingConnections.size()); + mThreadState.mOutgoingConnections.size(), + mThreadState.mIncomingConnections.size()); mMaxThreads = threads; } @@ -194,11 +197,11 @@ bool RpcSession::shutdownAndWait(bool wait) { LOG_ALWAYS_FATAL_IF(mShutdownListener == nullptr, "Shutdown listener not installed"); mShutdownListener->waitForShutdown(_l, sp<RpcSession>::fromExisting(this)); - LOG_ALWAYS_FATAL_IF(!mThreads.empty(), "Shutdown failed"); + LOG_ALWAYS_FATAL_IF(!mThreadState.mThreads.empty(), "Shutdown failed"); } _l.unlock(); - mState->clear(); + mRpcBinderState->clear(); return true; } @@ -260,11 +263,11 @@ void RpcSession::WaitForShutdownListener::onSessionIncomingThreadEnded() { void RpcSession::WaitForShutdownListener::waitForShutdown(std::unique_lock<std::mutex>& lock, const sp<RpcSession>& session) { - while (session->mIncomingConnections.size() > 0) { + while (session->mThreadState.mIncomingConnections.size() > 0) { if (std::cv_status::timeout == mCv.wait_for(lock, std::chrono::seconds(1))) { ALOGE("Waiting for RpcSession to shut down (1s w/o progress): %zu incoming connections " "still.", - session->mIncomingConnections.size()); + session->mThreadState.mIncomingConnections.size()); } } } @@ -274,7 +277,7 @@ void RpcSession::preJoinThreadOwnership(std::thread thread) { { std::lock_guard<std::mutex> _l(mMutex); - mThreads[thread.get_id()] = std::move(thread); + mThreadState.mThreads[thread.get_id()] = std::move(thread); } } @@ -289,7 +292,8 @@ RpcSession::PreJoinSetupResult RpcSession::preJoinSetup( if (connection == nullptr) { status = DEAD_OBJECT; } else { - status = mState->readConnectionInit(connection, sp<RpcSession>::fromExisting(this)); + status = + mRpcBinderState->readConnectionInit(connection, sp<RpcSession>::fromExisting(this)); } return PreJoinSetupResult{ @@ -376,10 +380,10 @@ void RpcSession::join(sp<RpcSession>&& session, PreJoinSetupResult&& setupResult sp<RpcSession::EventListener> listener; { std::lock_guard<std::mutex> _l(session->mMutex); - auto it = session->mThreads.find(std::this_thread::get_id()); - LOG_ALWAYS_FATAL_IF(it == session->mThreads.end()); + auto it = session->mThreadState.mThreads.find(std::this_thread::get_id()); + LOG_ALWAYS_FATAL_IF(it == session->mThreadState.mThreads.end()); it->second.detach(); - session->mThreads.erase(it); + session->mThreadState.mThreads.erase(it); listener = session->mEventListener.promote(); } @@ -410,12 +414,34 @@ status_t RpcSession::setupClient(const std::function<status_t(const std::vector< bool incoming)>& connectAndInit) { { std::lock_guard<std::mutex> _l(mMutex); - LOG_ALWAYS_FATAL_IF(mOutgoingConnections.size() != 0, + LOG_ALWAYS_FATAL_IF(mThreadState.mOutgoingConnections.size() != 0, "Must only setup session once, but already has %zu clients", - mOutgoingConnections.size()); + mThreadState.mOutgoingConnections.size()); } + if (auto status = initShutdownTrigger(); status != OK) return status; + auto oldProtocolVersion = mProtocolVersion; + auto cleanup = base::ScopeGuard([&] { + // if any threads are started, shut them down + (void)shutdownAndWait(true); + + mShutdownListener = nullptr; + mEventListener.clear(); + + mId.clear(); + + mShutdownTrigger = nullptr; + mRpcBinderState = std::make_unique<RpcState>(); + + // protocol version may have been downgraded - if we reuse this object + // to connect to another server, force that server to request a + // downgrade again + mProtocolVersion = oldProtocolVersion; + + mThreadState = {}; + }); + if (status_t status = connectAndInit({}, false /*incoming*/); status != OK) return status; { @@ -464,6 +490,8 @@ status_t RpcSession::setupClient(const std::function<status_t(const std::vector< if (status_t status = connectAndInit(mId, true /*incoming*/); status != OK) return status; } + cleanup.Disable(); + return OK; } @@ -634,12 +662,12 @@ status_t RpcSession::addOutgoingConnection(std::unique_ptr<RpcTransport> rpcTran std::lock_guard<std::mutex> _l(mMutex); connection->rpcTransport = std::move(rpcTransport); connection->exclusiveTid = gettid(); - mOutgoingConnections.push_back(connection); + mThreadState.mOutgoingConnections.push_back(connection); } status_t status = OK; if (init) { - mState->sendConnectionInit(connection, sp<RpcSession>::fromExisting(this)); + mRpcBinderState->sendConnectionInit(connection, sp<RpcSession>::fromExisting(this)); } { @@ -671,9 +699,9 @@ sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread( std::unique_ptr<RpcTransport> rpcTransport) { std::lock_guard<std::mutex> _l(mMutex); - if (mIncomingConnections.size() >= mMaxThreads) { + if (mThreadState.mIncomingConnections.size() >= mMaxThreads) { ALOGE("Cannot add thread to session with %zu threads (max is set to %zu)", - mIncomingConnections.size(), mMaxThreads); + mThreadState.mIncomingConnections.size(), mMaxThreads); return nullptr; } @@ -681,7 +709,7 @@ sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread( // happens when new connections are still being established as part of a // very short-lived session which shuts down after it already started // accepting new connections. - if (mIncomingConnections.size() < mMaxIncomingConnections) { + if (mThreadState.mIncomingConnections.size() < mThreadState.mMaxIncomingConnections) { return nullptr; } @@ -689,18 +717,19 @@ sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread( session->rpcTransport = std::move(rpcTransport); session->exclusiveTid = gettid(); - mIncomingConnections.push_back(session); - mMaxIncomingConnections = mIncomingConnections.size(); + mThreadState.mIncomingConnections.push_back(session); + mThreadState.mMaxIncomingConnections = mThreadState.mIncomingConnections.size(); return session; } bool RpcSession::removeIncomingConnection(const sp<RpcConnection>& connection) { std::unique_lock<std::mutex> _l(mMutex); - if (auto it = std::find(mIncomingConnections.begin(), mIncomingConnections.end(), connection); - it != mIncomingConnections.end()) { - mIncomingConnections.erase(it); - if (mIncomingConnections.size() == 0) { + if (auto it = std::find(mThreadState.mIncomingConnections.begin(), + mThreadState.mIncomingConnections.end(), connection); + it != mThreadState.mIncomingConnections.end()) { + mThreadState.mIncomingConnections.erase(it); + if (mThreadState.mIncomingConnections.size() == 0) { sp<EventListener> listener = mEventListener.promote(); if (listener) { _l.unlock(); @@ -725,7 +754,7 @@ status_t RpcSession::ExclusiveConnection::find(const sp<RpcSession>& session, Co pid_t tid = gettid(); std::unique_lock<std::mutex> _l(session->mMutex); - session->mWaitingThreads++; + session->mThreadState.mWaitingThreads++; while (true) { sp<RpcConnection> exclusive; sp<RpcConnection> available; @@ -733,8 +762,8 @@ status_t RpcSession::ExclusiveConnection::find(const sp<RpcSession>& session, Co // CHECK FOR DEDICATED CLIENT SOCKET // // A server/looper should always use a dedicated connection if available - findConnection(tid, &exclusive, &available, session->mOutgoingConnections, - session->mOutgoingConnectionsOffset); + findConnection(tid, &exclusive, &available, session->mThreadState.mOutgoingConnections, + session->mThreadState.mOutgoingConnectionsOffset); // WARNING: this assumes a server cannot request its client to send // a transaction, as mIncomingConnections is excluded below. @@ -747,8 +776,9 @@ status_t RpcSession::ExclusiveConnection::find(const sp<RpcSession>& session, Co // command. So, we move to considering the second available thread // for subsequent calls. if (use == ConnectionUse::CLIENT_ASYNC && (exclusive != nullptr || available != nullptr)) { - session->mOutgoingConnectionsOffset = (session->mOutgoingConnectionsOffset + 1) % - session->mOutgoingConnections.size(); + session->mThreadState.mOutgoingConnectionsOffset = + (session->mThreadState.mOutgoingConnectionsOffset + 1) % + session->mThreadState.mOutgoingConnections.size(); } // USE SERVING SOCKET (e.g. nested transaction) @@ -756,7 +786,7 @@ status_t RpcSession::ExclusiveConnection::find(const sp<RpcSession>& session, Co sp<RpcConnection> exclusiveIncoming; // server connections are always assigned to a thread findConnection(tid, &exclusiveIncoming, nullptr /*available*/, - session->mIncomingConnections, 0 /* index hint */); + session->mThreadState.mIncomingConnections, 0 /* index hint */); // asynchronous calls cannot be nested, we currently allow ref count // calls to be nested (so that you can use this without having extra @@ -785,19 +815,20 @@ status_t RpcSession::ExclusiveConnection::find(const sp<RpcSession>& session, Co break; } - if (session->mOutgoingConnections.size() == 0) { + if (session->mThreadState.mOutgoingConnections.size() == 0) { ALOGE("Session has no client connections. This is required for an RPC server to make " "any non-nested (e.g. oneway or on another thread) calls. Use: %d. Server " "connections: %zu", - static_cast<int>(use), session->mIncomingConnections.size()); + static_cast<int>(use), session->mThreadState.mIncomingConnections.size()); return WOULD_BLOCK; } LOG_RPC_DETAIL("No available connections (have %zu clients and %zu servers). Waiting...", - session->mOutgoingConnections.size(), session->mIncomingConnections.size()); + session->mThreadState.mOutgoingConnections.size(), + session->mThreadState.mIncomingConnections.size()); session->mAvailableConnectionCv.wait(_l); } - session->mWaitingThreads--; + session->mThreadState.mWaitingThreads--; return OK; } @@ -836,7 +867,7 @@ RpcSession::ExclusiveConnection::~ExclusiveConnection() { if (!mReentrant && mConnection != nullptr) { std::unique_lock<std::mutex> _l(mSession->mMutex); mConnection->exclusiveTid = std::nullopt; - if (mSession->mWaitingThreads > 0) { + if (mSession->mThreadState.mWaitingThreads > 0) { _l.unlock(); mSession->mAvailableConnectionCv.notify_one(); } diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h index 12d448d1e4..19888b7bf7 100644 --- a/libs/binder/include/binder/RpcSession.h +++ b/libs/binder/include/binder/RpcSession.h @@ -168,7 +168,7 @@ public: sp<RpcServer> server(); // internal only - const std::unique_ptr<RpcState>& state() { return mState; } + const std::unique_ptr<RpcState>& state() { return mRpcBinderState; } private: friend sp<RpcSession>; @@ -303,7 +303,7 @@ private: std::unique_ptr<FdTrigger> mShutdownTrigger; - std::unique_ptr<RpcState> mState; + std::unique_ptr<RpcState> mRpcBinderState; std::mutex mMutex; // for all below @@ -311,13 +311,16 @@ private: std::optional<uint32_t> mProtocolVersion; std::condition_variable mAvailableConnectionCv; // for mWaitingThreads - size_t mWaitingThreads = 0; - // hint index into clients, ++ when sending an async transaction - size_t mOutgoingConnectionsOffset = 0; - std::vector<sp<RpcConnection>> mOutgoingConnections; - size_t mMaxIncomingConnections = 0; - std::vector<sp<RpcConnection>> mIncomingConnections; - std::map<std::thread::id, std::thread> mThreads; + + struct ThreadState { + size_t mWaitingThreads = 0; + // hint index into clients, ++ when sending an async transaction + size_t mOutgoingConnectionsOffset = 0; + std::vector<sp<RpcConnection>> mOutgoingConnections; + size_t mMaxIncomingConnections = 0; + std::vector<sp<RpcConnection>> mIncomingConnections; + std::map<std::thread::id, std::thread> mThreads; + } mThreadState; }; } // namespace android |