diff options
author | android-build-team Robot <android-build-team-robot@google.com> | 2017-10-11 07:21:48 +0000 |
---|---|---|
committer | android-build-team Robot <android-build-team-robot@google.com> | 2017-10-11 07:21:48 +0000 |
commit | dcdc77c87e2c777fc8f5cb9672b4706d5dad3b36 (patch) | |
tree | 6595f49407fbe45702f943d913a1d34bd1910fb8 | |
parent | 666e90ffc939a27db63bd7c031fe70ebc8676b73 (diff) | |
parent | 52ea25cf06cef250ec73052611b48556b3fce4d5 (diff) | |
download | native-dcdc77c87e2c777fc8f5cb9672b4706d5dad3b36.tar.gz |
Snap for 4388906 from 52ea25cf06cef250ec73052611b48556b3fce4d5 to oc-mr1-release
Change-Id: I8be8e1ad05e7445a5c03a3f6009b5e1e30171a59
44 files changed, 2072 insertions, 794 deletions
diff --git a/libs/vr/libbufferhub/Android.bp b/libs/vr/libbufferhub/Android.bp index da0ea24dab..f32720038a 100644 --- a/libs/vr/libbufferhub/Android.bp +++ b/libs/vr/libbufferhub/Android.bp @@ -37,7 +37,8 @@ sharedLibraries = [ "libnativewindow" ] -HeaderLibraries = [ +headerLibraries = [ + "libdvr_headers", "libnativebase_headers", ] @@ -45,12 +46,13 @@ cc_library { srcs: sourceFiles, cflags: [ "-DLOG_TAG=\"libbufferhub\"", - "-DTRACE=0" + "-DTRACE=0", + "-DATRACE_TAG=ATRACE_TAG_GRAPHICS", ], export_include_dirs: localIncludeFiles, static_libs: staticLibraries, shared_libs: sharedLibraries, - header_libs: HeaderLibraries, + header_libs: headerLibraries, name: "libbufferhub", export_header_lib_headers: [ "libnativebase_headers", @@ -62,6 +64,7 @@ cc_test { srcs: ["bufferhub_tests.cpp"], static_libs: ["libbufferhub"] + staticLibraries, shared_libs: sharedLibraries, + header_libs: headerLibraries, name: "bufferhub_tests", } diff --git a/libs/vr/libbufferhub/buffer_hub_client.cpp b/libs/vr/libbufferhub/buffer_hub_client.cpp index b9a53b0ced..97341b1477 100644 --- a/libs/vr/libbufferhub/buffer_hub_client.cpp +++ b/libs/vr/libbufferhub/buffer_hub_client.cpp @@ -2,7 +2,7 @@ #include <log/log.h> #include <poll.h> -#define ATRACE_TAG ATRACE_TAG_GRAPHICS +#include <sys/epoll.h> #include <utils/Trace.h> #include <mutex> @@ -12,9 +12,8 @@ #include "include/private/dvr/bufferhub_rpc.h" -using android::pdx::LocalHandle; using android::pdx::LocalChannelHandle; -using android::pdx::rpc::WrapBuffer; +using android::pdx::LocalHandle; using android::pdx::Status; namespace android { @@ -29,7 +28,11 @@ BufferHubBuffer::BufferHubBuffer(const std::string& endpoint_path) endpoint_path)}, id_(-1) {} -BufferHubBuffer::~BufferHubBuffer() {} +BufferHubBuffer::~BufferHubBuffer() { + if (metadata_header_ != nullptr) { + metadata_buffer_.Unlock(); + } +} Status<LocalChannelHandle> BufferHubBuffer::CreateConsumer() { Status<LocalChannelHandle> status = @@ -43,7 +46,7 @@ Status<LocalChannelHandle> BufferHubBuffer::CreateConsumer() { int BufferHubBuffer::ImportBuffer() { ATRACE_NAME("BufferHubBuffer::ImportBuffer"); - Status<NativeBufferHandle<LocalHandle>> status = + Status<BufferDescription<LocalHandle>> status = InvokeRemoteMethod<BufferHubRPC::GetBuffer>(); if (!status) { ALOGE("BufferHubBuffer::ImportBuffer: Failed to get buffer: %s", @@ -54,24 +57,135 @@ int BufferHubBuffer::ImportBuffer() { return -EIO; } - auto buffer_handle = status.take(); + auto buffer_desc = status.take(); // Stash the buffer id to replace the value in id_. - const int new_id = buffer_handle.id(); + const int new_id = buffer_desc.id(); // Import the buffer. IonBuffer ion_buffer; - ALOGD_IF( - TRACE, "BufferHubBuffer::ImportBuffer: id=%d FdCount=%zu IntCount=%zu", - buffer_handle.id(), buffer_handle.FdCount(), buffer_handle.IntCount()); + ALOGD_IF(TRACE, "BufferHubBuffer::ImportBuffer: id=%d.", buffer_desc.id()); - const int ret = buffer_handle.Import(&ion_buffer); - if (ret < 0) + if (const int ret = buffer_desc.ImportBuffer(&ion_buffer)) return ret; - // If the import succeeds, replace the previous buffer and id. + // Import the metadata. + IonBuffer metadata_buffer; + if (const int ret = buffer_desc.ImportMetadata(&metadata_buffer)) { + ALOGE("Failed to import metadata buffer, error=%d", ret); + return ret; + } + size_t metadata_buf_size = metadata_buffer.width(); + if (metadata_buf_size < BufferHubDefs::kMetadataHeaderSize) { + ALOGE("BufferHubBuffer::ImportBuffer: metadata buffer too small: %zu", + metadata_buf_size); + return -ENOMEM; + } + + // If all imports succee, replace the previous buffer and id. buffer_ = std::move(ion_buffer); + metadata_buffer_ = std::move(metadata_buffer); + metadata_buf_size_ = metadata_buf_size; + user_metadata_size_ = metadata_buf_size_ - BufferHubDefs::kMetadataHeaderSize; + + void* metadata_ptr = nullptr; + if (const int ret = + metadata_buffer_.Lock(BufferHubDefs::kMetadataUsage, /*x=*/0, + /*y=*/0, metadata_buf_size_, + /*height=*/1, &metadata_ptr)) { + ALOGE("BufferHubBuffer::ImportBuffer: Failed to lock metadata."); + return ret; + } + + // Set up shared fences. + shared_acquire_fence_ = buffer_desc.take_acquire_fence(); + shared_release_fence_ = buffer_desc.take_release_fence(); + if (!shared_acquire_fence_ || !shared_release_fence_) { + ALOGE("BufferHubBuffer::ImportBuffer: Failed to import shared fences."); + return -EIO; + } + + metadata_header_ = + reinterpret_cast<BufferHubDefs::MetadataHeader*>(metadata_ptr); + if (user_metadata_size_) { + user_metadata_ptr_ = + reinterpret_cast<void*>(reinterpret_cast<uintptr_t>(metadata_ptr) + + BufferHubDefs::kMetadataHeaderSize); + } else { + user_metadata_ptr_ = nullptr; + } + id_ = new_id; + buffer_state_bit_ = buffer_desc.buffer_state_bit(); + + // Note that here the buffer state is mapped from shared memory as an atomic + // object. The std::atomic's constructor will not be called so that the + // original value stored in the memory region will be preserved. + buffer_state_ = &metadata_header_->buffer_state; + ALOGD_IF(TRACE, + "BufferHubBuffer::ImportBuffer: id=%d, buffer_state=%" PRIx64 ".", + id(), buffer_state_->load()); + fence_state_ = &metadata_header_->fence_state; + ALOGD_IF(TRACE, + "BufferHubBuffer::ImportBuffer: id=%d, fence_state=%" PRIx64 ".", + id(), fence_state_->load()); + + return 0; +} + +inline int BufferHubBuffer::CheckMetadata(size_t user_metadata_size) const { + if (user_metadata_size && !user_metadata_ptr_) { + ALOGE("BufferHubBuffer::CheckMetadata: doesn't support custom metadata."); + return -EINVAL; + } + if (user_metadata_size > user_metadata_size_) { + ALOGE("BufferHubBuffer::CheckMetadata: too big: %zu, maximum: %zu.", + user_metadata_size, user_metadata_size_); + return -E2BIG; + } + return 0; +} + +int BufferHubBuffer::UpdateSharedFence(const LocalHandle& new_fence, + const LocalHandle& shared_fence) { + if (pending_fence_fd_.Get() != new_fence.Get()) { + // First, replace the old fd if there was already one. Skipping if the new + // one is the same as the old. + if (pending_fence_fd_.IsValid()) { + const int ret = epoll_ctl(shared_fence.Get(), EPOLL_CTL_DEL, + pending_fence_fd_.Get(), nullptr); + ALOGW_IF(ret, + "BufferHubBuffer::UpdateSharedFence: failed to remove old fence " + "fd from epoll set, error: %s.", + strerror(errno)); + } + + if (new_fence.IsValid()) { + // If ready fence is valid, we put that into the epoll set. + epoll_event event; + event.events = EPOLLIN; + event.data.u64 = buffer_state_bit(); + pending_fence_fd_ = new_fence.Duplicate(); + if (epoll_ctl(shared_fence.Get(), EPOLL_CTL_ADD, pending_fence_fd_.Get(), + &event) < 0) { + const int error = errno; + ALOGE( + "BufferHubBuffer::UpdateSharedFence: failed to add new fence fd " + "into epoll set, error: %s.", + strerror(error)); + return -error; + } + // Set bit in fence state to indicate that there is a fence from this + // producer or consumer. + fence_state_->fetch_or(buffer_state_bit()); + } else { + // Unset bit in fence state to indicate that there is no fence, so that + // when consumer to acquire or producer to acquire, it knows no need to + // check fence for this buffer. + fence_state_->fetch_and(~buffer_state_bit()); + } + } + return 0; } @@ -131,31 +245,144 @@ std::unique_ptr<BufferConsumer> BufferConsumer::Import( : LocalChannelHandle{nullptr, -status.error()}); } +int BufferConsumer::LocalAcquire(DvrNativeBufferMetadata* out_meta, + LocalHandle* out_fence) { + if (!out_meta) + return -EINVAL; + + // Only check producer bit and this consumer buffer's particular consumer bit. + // The buffer is can be acquired iff: 1) producer bit is set; 2) consumer bit + // is not set. + uint64_t buffer_state = buffer_state_->load(); + if (!BufferHubDefs::IsBufferPosted(buffer_state, buffer_state_bit())) { + ALOGE("BufferConsumer::LocalAcquire: not posted, id=%d state=%" PRIx64 + " buffer_state_bit=%" PRIx64 ".", + id(), buffer_state, buffer_state_bit()); + return -EBUSY; + } + + // Copy the canonical metadata. + void* metadata_ptr = reinterpret_cast<void*>(&metadata_header_->metadata); + memcpy(out_meta, metadata_ptr, sizeof(DvrNativeBufferMetadata)); + // Fill in the user_metadata_ptr in address space of the local process. + if (out_meta->user_metadata_size) { + out_meta->user_metadata_ptr = + reinterpret_cast<uint64_t>(user_metadata_ptr_); + } else { + out_meta->user_metadata_ptr = 0; + } + + uint64_t fence_state = fence_state_->load(); + // If there is an acquire fence from producer, we need to return it. + if (fence_state & BufferHubDefs::kProducerStateBit) { + *out_fence = shared_acquire_fence_.Duplicate(); + } + + // Set the consumer bit unique to this consumer. + BufferHubDefs::ModifyBufferState(buffer_state_, 0ULL, buffer_state_bit()); + return 0; +} + int BufferConsumer::Acquire(LocalHandle* ready_fence) { return Acquire(ready_fence, nullptr, 0); } int BufferConsumer::Acquire(LocalHandle* ready_fence, void* meta, - size_t meta_size_bytes) { + size_t user_metadata_size) { ATRACE_NAME("BufferConsumer::Acquire"); - LocalFence fence; - auto return_value = - std::make_pair(std::ref(fence), WrapBuffer(meta, meta_size_bytes)); - auto status = InvokeRemoteMethodInPlace<BufferHubRPC::ConsumerAcquire>( - &return_value, meta_size_bytes); - if (status && ready_fence) - *ready_fence = fence.take(); - return status ? 0 : -status.error(); + + if (const int error = CheckMetadata(user_metadata_size)) + return error; + + DvrNativeBufferMetadata canonical_meta; + if (const int error = LocalAcquire(&canonical_meta, ready_fence)) + return error; + + if (meta && user_metadata_size) { + void* metadata_src = + reinterpret_cast<void*>(canonical_meta.user_metadata_ptr); + if (metadata_src) { + memcpy(meta, metadata_src, user_metadata_size); + } else { + ALOGW("BufferConsumer::Acquire: no user-defined metadata."); + } + } + + auto status = InvokeRemoteMethod<BufferHubRPC::ConsumerAcquire>(); + if (!status) + return -status.error(); + return 0; +} + +int BufferConsumer::AcquireAsync(DvrNativeBufferMetadata* out_meta, + LocalHandle* out_fence) { + ATRACE_NAME("BufferConsumer::AcquireAsync"); + + if (const int error = LocalAcquire(out_meta, out_fence)) + return error; + + auto status = SendImpulse(BufferHubRPC::ConsumerAcquire::Opcode); + if (!status) + return -status.error(); + return 0; +} + +int BufferConsumer::LocalRelease(const DvrNativeBufferMetadata* meta, + const LocalHandle& release_fence) { + if (const int error = CheckMetadata(meta->user_metadata_size)) + return error; + + // Check invalid state transition. + uint64_t buffer_state = buffer_state_->load(); + if (!BufferHubDefs::IsBufferAcquired(buffer_state)) { + ALOGE("BufferConsumer::LocalRelease: not acquired id=%d state=%" PRIx64 ".", + id(), buffer_state); + return -EBUSY; + } + + // On release, only the user requested metadata is copied back into the shared + // memory for metadata. Since there are multiple consumers, it doesn't make + // sense to send the canonical metadata back to the producer. However, one of + // the consumer can still choose to write up to user_metadata_size bytes of + // data into user_metadata_ptr. + if (meta->user_metadata_ptr && meta->user_metadata_size) { + void* metadata_src = reinterpret_cast<void*>(meta->user_metadata_ptr); + memcpy(user_metadata_ptr_, metadata_src, meta->user_metadata_size); + } + + // Send out the release fence through the shared epoll fd. Note that during + // releasing the producer is not expected to be polling on the fence. + if (const int error = UpdateSharedFence(release_fence, shared_release_fence_)) + return error; + + // For release operation, the client don't need to change the state as it's + // bufferhubd's job to flip the produer bit once all consumers are released. + return 0; } int BufferConsumer::Release(const LocalHandle& release_fence) { ATRACE_NAME("BufferConsumer::Release"); + + DvrNativeBufferMetadata meta; + if (const int error = LocalRelease(&meta, release_fence)) + return error; + return ReturnStatusOrError(InvokeRemoteMethod<BufferHubRPC::ConsumerRelease>( BorrowedFence(release_fence.Borrow()))); } int BufferConsumer::ReleaseAsync() { + DvrNativeBufferMetadata meta; + return ReleaseAsync(&meta, LocalHandle()); +} + +int BufferConsumer::ReleaseAsync(const DvrNativeBufferMetadata* meta, + const LocalHandle& release_fence) { ATRACE_NAME("BufferConsumer::ReleaseAsync"); + + if (const int error = LocalRelease(meta, release_fence)) + return error; + return ReturnStatusOrError( SendImpulse(BufferHubRPC::ConsumerRelease::Opcode)); } @@ -168,24 +395,25 @@ int BufferConsumer::SetIgnore(bool ignore) { } BufferProducer::BufferProducer(uint32_t width, uint32_t height, uint32_t format, - uint32_t usage, size_t metadata_size) - : BufferProducer(width, height, format, usage, usage, metadata_size) {} + uint32_t usage, size_t user_metadata_size) + : BufferProducer(width, height, format, usage, usage, user_metadata_size) {} BufferProducer::BufferProducer(uint32_t width, uint32_t height, uint32_t format, uint64_t producer_usage, uint64_t consumer_usage, - size_t metadata_size) + size_t user_metadata_size) : BASE(BufferHubRPC::kClientPath) { ATRACE_NAME("BufferProducer::BufferProducer"); ALOGD_IF(TRACE, "BufferProducer::BufferProducer: fd=%d width=%u height=%u format=%u " "producer_usage=%" PRIx64 " consumer_usage=%" PRIx64 - " metadata_size=%zu", + " user_metadata_size=%zu", event_fd(), width, height, format, producer_usage, consumer_usage, - metadata_size); + user_metadata_size); // (b/37881101) Deprecate producer/consumer usage auto status = InvokeRemoteMethod<BufferHubRPC::CreateBuffer>( - width, height, format, (producer_usage | consumer_usage), metadata_size); + width, height, format, (producer_usage | consumer_usage), + user_metadata_size); if (!status) { ALOGE( "BufferProducer::BufferProducer: Failed to create producer buffer: %s", @@ -206,27 +434,28 @@ BufferProducer::BufferProducer(uint32_t width, uint32_t height, uint32_t format, BufferProducer::BufferProducer(const std::string& name, int user_id, int group_id, uint32_t width, uint32_t height, uint32_t format, uint32_t usage, - size_t meta_size_bytes) + size_t user_metadata_size) : BufferProducer(name, user_id, group_id, width, height, format, usage, - usage, meta_size_bytes) {} + usage, user_metadata_size) {} BufferProducer::BufferProducer(const std::string& name, int user_id, int group_id, uint32_t width, uint32_t height, uint32_t format, uint64_t producer_usage, - uint64_t consumer_usage, size_t meta_size_bytes) + uint64_t consumer_usage, + size_t user_metadata_size) : BASE(BufferHubRPC::kClientPath) { ATRACE_NAME("BufferProducer::BufferProducer"); ALOGD_IF(TRACE, "BufferProducer::BufferProducer: fd=%d name=%s user_id=%d " "group_id=%d width=%u height=%u format=%u producer_usage=%" PRIx64 - " consumer_usage=%" PRIx64 " meta_size_bytes=%zu", + " consumer_usage=%" PRIx64 " user_metadata_size=%zu", event_fd(), name.c_str(), user_id, group_id, width, height, format, - producer_usage, consumer_usage, meta_size_bytes); + producer_usage, consumer_usage, user_metadata_size); // (b/37881101) Deprecate producer/consumer usage auto status = InvokeRemoteMethod<BufferHubRPC::CreatePersistentBuffer>( name, user_id, group_id, width, height, format, - (producer_usage | consumer_usage), meta_size_bytes); + (producer_usage | consumer_usage), user_metadata_size); if (!status) { ALOGE( "BufferProducer::BufferProducer: Failed to create/get persistent " @@ -260,12 +489,12 @@ BufferProducer::BufferProducer(uint64_t producer_usage, uint64_t consumer_usage, const int width = static_cast<int>(size); const int height = 1; const int format = HAL_PIXEL_FORMAT_BLOB; - const size_t meta_size_bytes = 0; + const size_t user_metadata_size = 0; // (b/37881101) Deprecate producer/consumer usage auto status = InvokeRemoteMethod<BufferHubRPC::CreateBuffer>( width, height, format, (producer_usage | consumer_usage), - meta_size_bytes); + user_metadata_size); if (!status) { ALOGE("BufferProducer::BufferProducer: Failed to create blob: %s", status.GetErrorMessage().c_str()); @@ -299,12 +528,12 @@ BufferProducer::BufferProducer(const std::string& name, int user_id, const int width = static_cast<int>(size); const int height = 1; const int format = HAL_PIXEL_FORMAT_BLOB; - const size_t meta_size_bytes = 0; + const size_t user_metadata_size = 0; // (b/37881101) Deprecate producer/consumer usage auto status = InvokeRemoteMethod<BufferHubRPC::CreatePersistentBuffer>( name, user_id, group_id, width, height, format, - (producer_usage | consumer_usage), meta_size_bytes); + (producer_usage | consumer_usage), user_metadata_size); if (!status) { ALOGE( "BufferProducer::BufferProducer: Failed to create persistent " @@ -360,28 +589,141 @@ BufferProducer::BufferProducer(LocalChannelHandle channel) } } +int BufferProducer::LocalPost(const DvrNativeBufferMetadata* meta, + const LocalHandle& ready_fence) { + if (const int error = CheckMetadata(meta->user_metadata_size)) + return error; + + // Check invalid state transition. + uint64_t buffer_state = buffer_state_->load(); + if (!BufferHubDefs::IsBufferGained(buffer_state)) { + ALOGE("BufferProducer::LocalPost: not gained, id=%d state=%" PRIx64 ".", + id(), buffer_state); + return -EBUSY; + } + + // Copy the canonical metadata. + void* metadata_ptr = reinterpret_cast<void*>(&metadata_header_->metadata); + memcpy(metadata_ptr, meta, sizeof(DvrNativeBufferMetadata)); + // Copy extra user requested metadata. + if (meta->user_metadata_ptr && meta->user_metadata_size) { + void* metadata_src = reinterpret_cast<void*>(meta->user_metadata_ptr); + memcpy(user_metadata_ptr_, metadata_src, meta->user_metadata_size); + } + + // Send out the acquire fence through the shared epoll fd. Note that during + // posting no consumer is not expected to be polling on the fence. + if (const int error = UpdateSharedFence(ready_fence, shared_acquire_fence_)) + return error; + + // Set the producer bit atomically to transit into posted state. + BufferHubDefs::ModifyBufferState(buffer_state_, 0ULL, + BufferHubDefs::kProducerStateBit); + return 0; +} + int BufferProducer::Post(const LocalHandle& ready_fence, const void* meta, - size_t meta_size_bytes) { + size_t user_metadata_size) { ATRACE_NAME("BufferProducer::Post"); + + // Populate cononical metadata for posting. + DvrNativeBufferMetadata canonical_meta; + canonical_meta.user_metadata_ptr = reinterpret_cast<uint64_t>(meta); + canonical_meta.user_metadata_size = user_metadata_size; + + if (const int error = LocalPost(&canonical_meta, ready_fence)) + return error; + return ReturnStatusOrError(InvokeRemoteMethod<BufferHubRPC::ProducerPost>( - BorrowedFence(ready_fence.Borrow()), WrapBuffer(meta, meta_size_bytes))); + BorrowedFence(ready_fence.Borrow()))); +} + +int BufferProducer::PostAsync(const DvrNativeBufferMetadata* meta, + const LocalHandle& ready_fence) { + ATRACE_NAME("BufferProducer::PostAsync"); + + if (const int error = LocalPost(meta, ready_fence)) + return error; + + return ReturnStatusOrError(SendImpulse(BufferHubRPC::ProducerPost::Opcode)); +} + +int BufferProducer::LocalGain(DvrNativeBufferMetadata* out_meta, + LocalHandle* out_fence) { + uint64_t buffer_state = buffer_state_->load(); + ALOGD_IF(TRACE, "BufferProducer::LocalGain: buffer=%d, state=%" PRIx64 ".", + id(), buffer_state); + + if (!out_meta) + return -EINVAL; + + if (!BufferHubDefs::IsBufferReleased(buffer_state)) { + if (BufferHubDefs::IsBufferGained(buffer_state)) { + // We don't want to log error when gaining a newly allocated + // buffer. + ALOGI("BufferProducer::LocalGain: already gained id=%d.", id()); + return -EALREADY; + } + ALOGE("BufferProducer::LocalGain: not released id=%d state=%" PRIx64 ".", + id(), buffer_state); + return -EBUSY; + } + + // Canonical metadata is undefined on Gain. Except for user_metadata and + // release_fence_mask. Fill in the user_metadata_ptr in address space of the + // local process. + if (metadata_header_->metadata.user_metadata_size && user_metadata_ptr_) { + out_meta->user_metadata_size = + metadata_header_->metadata.user_metadata_size; + out_meta->user_metadata_ptr = + reinterpret_cast<uint64_t>(user_metadata_ptr_); + } else { + out_meta->user_metadata_size = 0; + out_meta->user_metadata_ptr = 0; + } + + uint64_t fence_state = fence_state_->load(); + // If there is an release fence from consumer, we need to return it. + if (fence_state & BufferHubDefs::kConsumerStateMask) { + *out_fence = shared_release_fence_.Duplicate(); + out_meta->release_fence_mask = + fence_state & BufferHubDefs::kConsumerStateMask; + } + + // Clear out all bits and the buffer is now back to gained state. + buffer_state_->store(0ULL); + return 0; } int BufferProducer::Gain(LocalHandle* release_fence) { ATRACE_NAME("BufferProducer::Gain"); + + DvrNativeBufferMetadata meta; + if (const int error = LocalGain(&meta, release_fence)) + return error; + auto status = InvokeRemoteMethod<BufferHubRPC::ProducerGain>(); if (!status) return -status.error(); - if (release_fence) - *release_fence = status.take().take(); return 0; } -int BufferProducer::GainAsync() { +int BufferProducer::GainAsync(DvrNativeBufferMetadata* out_meta, + LocalHandle* release_fence) { ATRACE_NAME("BufferProducer::GainAsync"); + + if (const int error = LocalGain(out_meta, release_fence)) + return error; + return ReturnStatusOrError(SendImpulse(BufferHubRPC::ProducerGain::Opcode)); } +int BufferProducer::GainAsync() { + DvrNativeBufferMetadata meta; + LocalHandle fence; + return GainAsync(&meta, &fence); +} + std::unique_ptr<BufferProducer> BufferProducer::Import( LocalChannelHandle channel) { ALOGD_IF(TRACE, "BufferProducer::Import: channel=%d", channel.value()); diff --git a/libs/vr/libbufferhub/bufferhub_tests.cpp b/libs/vr/libbufferhub/bufferhub_tests.cpp index 1daa5d62d7..c4b9a8c88d 100644 --- a/libs/vr/libbufferhub/bufferhub_tests.cpp +++ b/libs/vr/libbufferhub/bufferhub_tests.cpp @@ -1,5 +1,9 @@ #include <gtest/gtest.h> +#include <poll.h> #include <private/dvr/buffer_hub_client.h> +#include <private/dvr/bufferhub_rpc.h> +#include <sys/epoll.h> +#include <sys/eventfd.h> #include <mutex> #include <thread> @@ -13,8 +17,10 @@ return result; \ })() -using android::dvr::BufferProducer; using android::dvr::BufferConsumer; +using android::dvr::BufferHubDefs::kConsumerStateMask; +using android::dvr::BufferHubDefs::kProducerStateBit; +using android::dvr::BufferProducer; using android::pdx::LocalHandle; const int kWidth = 640; @@ -37,29 +43,149 @@ TEST_F(LibBufferHubTest, TestBasicUsage) { BufferConsumer::Import(c->CreateConsumer()); ASSERT_TRUE(c2.get() != nullptr); + // Producer state mask is unique, i.e. 1. + EXPECT_EQ(p->buffer_state_bit(), kProducerStateBit); + // Consumer state mask cannot have producer bit on. + EXPECT_EQ(c->buffer_state_bit() & kProducerStateBit, 0); + // Consumer state mask must be a single, i.e. power of 2. + EXPECT_NE(c->buffer_state_bit(), 0); + EXPECT_EQ(c->buffer_state_bit() & (c->buffer_state_bit() - 1), 0); + // Consumer state mask cannot have producer bit on. + EXPECT_EQ(c2->buffer_state_bit() & kProducerStateBit, 0); + // Consumer state mask must be a single, i.e. power of 2. + EXPECT_NE(c2->buffer_state_bit(), 0); + EXPECT_EQ(c2->buffer_state_bit() & (c2->buffer_state_bit() - 1), 0); + // Each consumer should have unique bit. + EXPECT_EQ(c->buffer_state_bit() & c2->buffer_state_bit(), 0); + + // Initial state: producer not available, consumers not available. + EXPECT_EQ(0, RETRY_EINTR(p->Poll(100))); + EXPECT_EQ(0, RETRY_EINTR(c->Poll(100))); + EXPECT_EQ(0, RETRY_EINTR(c2->Poll(100))); + EXPECT_EQ(0, p->Post(LocalHandle(), kContext)); - // Both consumers should be triggered. - EXPECT_GE(0, RETRY_EINTR(p->Poll(0))); - EXPECT_LT(0, RETRY_EINTR(c->Poll(10))); - EXPECT_LT(0, RETRY_EINTR(c2->Poll(10))); + + // New state: producer not available, consumers available. + EXPECT_EQ(0, RETRY_EINTR(p->Poll(100))); + EXPECT_EQ(1, RETRY_EINTR(c->Poll(100))); + EXPECT_EQ(1, RETRY_EINTR(c2->Poll(100))); uint64_t context; LocalHandle fence; - EXPECT_LE(0, c->Acquire(&fence, &context)); + EXPECT_EQ(0, c->Acquire(&fence, &context)); EXPECT_EQ(kContext, context); - EXPECT_GE(0, RETRY_EINTR(c->Poll(0))); + EXPECT_EQ(0, RETRY_EINTR(c->Poll(100))); + EXPECT_EQ(1, RETRY_EINTR(c2->Poll(100))); - EXPECT_LE(0, c2->Acquire(&fence, &context)); + EXPECT_EQ(0, c2->Acquire(&fence, &context)); EXPECT_EQ(kContext, context); - EXPECT_GE(0, RETRY_EINTR(c2->Poll(0))); + EXPECT_EQ(0, RETRY_EINTR(c2->Poll(100))); + EXPECT_EQ(0, RETRY_EINTR(c->Poll(100))); EXPECT_EQ(0, c->Release(LocalHandle())); - EXPECT_GE(0, RETRY_EINTR(p->Poll(0))); + EXPECT_EQ(0, RETRY_EINTR(p->Poll(100))); EXPECT_EQ(0, c2->Discard()); - EXPECT_LE(0, RETRY_EINTR(p->Poll(0))); + EXPECT_EQ(1, RETRY_EINTR(p->Poll(100))); EXPECT_EQ(0, p->Gain(&fence)); - EXPECT_GE(0, RETRY_EINTR(p->Poll(0))); + EXPECT_EQ(0, RETRY_EINTR(p->Poll(100))); + EXPECT_EQ(0, RETRY_EINTR(c->Poll(100))); + EXPECT_EQ(0, RETRY_EINTR(c2->Poll(100))); +} + +TEST_F(LibBufferHubTest, TestEpoll) { + std::unique_ptr<BufferProducer> p = BufferProducer::Create( + kWidth, kHeight, kFormat, kUsage, sizeof(uint64_t)); + ASSERT_TRUE(p.get() != nullptr); + std::unique_ptr<BufferConsumer> c = + BufferConsumer::Import(p->CreateConsumer()); + ASSERT_TRUE(c.get() != nullptr); + + LocalHandle epoll_fd{epoll_create1(EPOLL_CLOEXEC)}; + ASSERT_TRUE(epoll_fd.IsValid()); + + epoll_event event; + std::array<epoll_event, 64> events; + + auto event_sources = p->GetEventSources(); + ASSERT_LT(event_sources.size(), events.size()); + + for (const auto& event_source : event_sources) { + event = {.events = event_source.event_mask | EPOLLET, + .data = {.fd = p->event_fd()}}; + ASSERT_EQ(0, epoll_ctl(epoll_fd.Get(), EPOLL_CTL_ADD, event_source.event_fd, + &event)); + } + + event_sources = c->GetEventSources(); + ASSERT_LT(event_sources.size(), events.size()); + + for (const auto& event_source : event_sources) { + event = {.events = event_source.event_mask | EPOLLET, + .data = {.fd = c->event_fd()}}; + ASSERT_EQ(0, epoll_ctl(epoll_fd.Get(), EPOLL_CTL_ADD, event_source.event_fd, + &event)); + } + + // No events should be signaled initially. + ASSERT_EQ(0, epoll_wait(epoll_fd.Get(), events.data(), events.size(), 0)); + + // Post the producer and check for consumer signal. + EXPECT_EQ(0, p->Post({}, kContext)); + ASSERT_EQ(1, epoll_wait(epoll_fd.Get(), events.data(), events.size(), 100)); + ASSERT_TRUE(events[0].events & EPOLLIN); + ASSERT_EQ(c->event_fd(), events[0].data.fd); + + // Save the event bits to translate later. + event = events[0]; + + // Check for events again. Edge-triggered mode should prevent any. + EXPECT_EQ(0, epoll_wait(epoll_fd.Get(), events.data(), events.size(), 100)); + EXPECT_EQ(0, epoll_wait(epoll_fd.Get(), events.data(), events.size(), 100)); + EXPECT_EQ(0, epoll_wait(epoll_fd.Get(), events.data(), events.size(), 100)); + EXPECT_EQ(0, epoll_wait(epoll_fd.Get(), events.data(), events.size(), 100)); + + // Translate the events. + auto event_status = c->GetEventMask(event.events); + ASSERT_TRUE(event_status); + ASSERT_TRUE(event_status.get() & EPOLLIN); + + // Check for events again. Edge-triggered mode should prevent any. + EXPECT_EQ(0, epoll_wait(epoll_fd.Get(), events.data(), events.size(), 100)); +} + +TEST_F(LibBufferHubTest, TestStateMask) { + std::unique_ptr<BufferProducer> p = BufferProducer::Create( + kWidth, kHeight, kFormat, kUsage, sizeof(uint64_t)); + ASSERT_TRUE(p.get() != nullptr); + + // It's ok to create up to 63 consumer buffers. + uint64_t buffer_state_bits = p->buffer_state_bit(); + std::array<std::unique_ptr<BufferConsumer>, 63> cs; + for (size_t i = 0; i < 63; i++) { + cs[i] = BufferConsumer::Import(p->CreateConsumer()); + ASSERT_TRUE(cs[i].get() != nullptr); + // Expect all buffers have unique state mask. + EXPECT_EQ(buffer_state_bits & cs[i]->buffer_state_bit(), 0); + buffer_state_bits |= cs[i]->buffer_state_bit(); + } + EXPECT_EQ(buffer_state_bits, kProducerStateBit | kConsumerStateMask); + + // The 64th creation will fail with out-of-memory error. + auto state = p->CreateConsumer(); + EXPECT_EQ(state.error(), E2BIG); + + // Release any consumer should allow us to re-create. + for (size_t i = 0; i < 63; i++) { + buffer_state_bits &= ~cs[i]->buffer_state_bit(); + cs[i] = nullptr; + cs[i] = BufferConsumer::Import(p->CreateConsumer()); + ASSERT_TRUE(cs[i].get() != nullptr); + // The released state mask will be reused. + EXPECT_EQ(buffer_state_bits & cs[i]->buffer_state_bit(), 0); + buffer_state_bits |= cs[i]->buffer_state_bit(); + EXPECT_EQ(buffer_state_bits, kProducerStateBit | kConsumerStateMask); + } } TEST_F(LibBufferHubTest, TestStateTransitions) { @@ -98,6 +224,7 @@ TEST_F(LibBufferHubTest, TestStateTransitions) { // Release in acquired state should succeed. EXPECT_EQ(0, c->Release(LocalHandle())); + EXPECT_LT(0, RETRY_EINTR(p->Poll(10))); // Release, acquire, and post in released state should fail. EXPECT_EQ(-EBUSY, c->Release(LocalHandle())); @@ -144,6 +271,11 @@ TEST_F(LibBufferHubTest, TestPostWithWrongMetaSize) { int64_t field1; int64_t field2; }; + struct OverSizedMetadata { + int64_t field1; + int64_t field2; + int64_t field3; + }; std::unique_ptr<BufferProducer> p = BufferProducer::Create( kWidth, kHeight, kFormat, kUsage, sizeof(Metadata)); ASSERT_TRUE(p.get() != nullptr); @@ -151,9 +283,16 @@ TEST_F(LibBufferHubTest, TestPostWithWrongMetaSize) { BufferConsumer::Import(p->CreateConsumer()); ASSERT_TRUE(c.get() != nullptr); - int64_t sequence = 3; - EXPECT_NE(0, p->Post(LocalHandle(), sequence)); + // It is illegal to post metadata larger than originally requested during + // buffer allocation. + OverSizedMetadata evil_meta = {}; + EXPECT_NE(0, p->Post(LocalHandle(), evil_meta)); EXPECT_GE(0, RETRY_EINTR(c->Poll(10))); + + // It is ok to post metadata smaller than originally requested during + // buffer allocation. + int64_t sequence = 42; + EXPECT_EQ(0, p->Post(LocalHandle(), sequence)); } TEST_F(LibBufferHubTest, TestAcquireWithWrongMetaSize) { @@ -161,6 +300,11 @@ TEST_F(LibBufferHubTest, TestAcquireWithWrongMetaSize) { int64_t field1; int64_t field2; }; + struct OverSizedMetadata { + int64_t field1; + int64_t field2; + int64_t field3; + }; std::unique_ptr<BufferProducer> p = BufferProducer::Create( kWidth, kHeight, kFormat, kUsage, sizeof(Metadata)); ASSERT_TRUE(p.get() != nullptr); @@ -173,7 +317,16 @@ TEST_F(LibBufferHubTest, TestAcquireWithWrongMetaSize) { LocalHandle fence; int64_t sequence; - EXPECT_NE(0, c->Acquire(&fence, &sequence)); + OverSizedMetadata e; + + // It is illegal to acquire metadata larger than originally requested during + // buffer allocation. + EXPECT_NE(0, c->Acquire(&fence, &e)); + + // It is ok to acquire metadata smaller than originally requested during + // buffer allocation. + EXPECT_EQ(0, c->Acquire(&fence, &sequence)); + EXPECT_EQ(m.field1, sequence); } TEST_F(LibBufferHubTest, TestAcquireWithNoMeta) { @@ -266,12 +419,140 @@ TEST_F(LibBufferHubTest, TestRemovePersistentBuffer) { LocalHandle fence; auto c = BufferConsumer::Import(p->CreateConsumer()); ASSERT_NE(nullptr, c); - EXPECT_NE(-EPIPE, c->Acquire(&fence)); + EXPECT_EQ(0, p->Post<void>(LocalHandle())); + EXPECT_EQ(0, c->Acquire(&fence)); + EXPECT_EQ(0, c->Release(LocalHandle())); + EXPECT_LT(0, RETRY_EINTR(p->Poll(10))); // Test that removing persistence and closing the producer orphans the // consumer. + EXPECT_EQ(0, p->Gain(&fence)); + EXPECT_EQ(0, p->Post<void>(LocalHandle())); EXPECT_EQ(0, p->RemovePersistence()); p = nullptr; + // Orphaned consumer can acquire the posted buffer one more time in + // asynchronous manner. But synchronous call will fail. + DvrNativeBufferMetadata meta; + EXPECT_EQ(0, c->AcquireAsync(&meta, &fence)); EXPECT_EQ(-EPIPE, c->Release(LocalHandle())); } + +namespace { + +int PollFd(int fd, int timeout_ms) { + pollfd p = {fd, POLLIN, 0}; + return poll(&p, 1, timeout_ms); +} + +} // namespace + +TEST_F(LibBufferHubTest, TestAcquireFence) { + std::unique_ptr<BufferProducer> p = BufferProducer::Create( + kWidth, kHeight, kFormat, kUsage, /*metadata_size=*/0); + ASSERT_TRUE(p.get() != nullptr); + std::unique_ptr<BufferConsumer> c = + BufferConsumer::Import(p->CreateConsumer()); + ASSERT_TRUE(c.get() != nullptr); + + DvrNativeBufferMetadata meta; + LocalHandle f1(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)); + + // Post with unsignaled fence. + EXPECT_EQ(0, p->PostAsync(&meta, f1)); + + // Should acquire a valid fence. + LocalHandle f2; + EXPECT_LT(0, RETRY_EINTR(c->Poll(10))); + EXPECT_EQ(0, c->AcquireAsync(&meta, &f2)); + EXPECT_TRUE(f2.IsValid()); + // The original fence and acquired fence should have different fd number. + EXPECT_NE(f1.Get(), f2.Get()); + EXPECT_GE(0, PollFd(f2.Get(), 0)); + + // Signal the original fence will trigger the new fence. + eventfd_write(f1.Get(), 1); + // Now the original FD has been signaled. + EXPECT_LT(0, PollFd(f2.Get(), 10)); + + // Release the consumer with an invalid fence. + EXPECT_EQ(0, c->ReleaseAsync(&meta, LocalHandle())); + + // Should gain an invalid fence. + LocalHandle f3; + EXPECT_LT(0, RETRY_EINTR(p->Poll(10))); + EXPECT_EQ(0, p->GainAsync(&meta, &f3)); + EXPECT_FALSE(f3.IsValid()); + + // Post with a signaled fence. + EXPECT_EQ(0, p->PostAsync(&meta, f1)); + + // Should acquire a valid fence and it's already signalled. + LocalHandle f4; + EXPECT_LT(0, RETRY_EINTR(c->Poll(10))); + EXPECT_EQ(0, c->AcquireAsync(&meta, &f4)); + EXPECT_TRUE(f4.IsValid()); + EXPECT_LT(0, PollFd(f4.Get(), 10)); + + // Release with an unsignalled fence and signal it immediately after release + // without producer gainning. + LocalHandle f5(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)); + EXPECT_EQ(0, c->ReleaseAsync(&meta, f5)); + eventfd_write(f5.Get(), 1); + + // Should gain a valid fence, which is already signaled. + LocalHandle f6; + EXPECT_LT(0, RETRY_EINTR(p->Poll(10))); + EXPECT_EQ(0, p->GainAsync(&meta, &f6)); + EXPECT_TRUE(f6.IsValid()); + EXPECT_LT(0, PollFd(f6.Get(), 10)); +} + +TEST_F(LibBufferHubTest, TestOrphanedAcquire) { + std::unique_ptr<BufferProducer> p = BufferProducer::Create( + kWidth, kHeight, kFormat, kUsage, sizeof(uint64_t)); + ASSERT_TRUE(p.get() != nullptr); + std::unique_ptr<BufferConsumer> c1 = + BufferConsumer::Import(p->CreateConsumer()); + ASSERT_TRUE(c1.get() != nullptr); + const uint64_t consumer_state_bit1 = c1->buffer_state_bit(); + + DvrNativeBufferMetadata meta; + EXPECT_EQ(0, p->PostAsync(&meta, LocalHandle())); + + LocalHandle fence; + EXPECT_LT(0, RETRY_EINTR(c1->Poll(10))); + EXPECT_LE(0, c1->AcquireAsync(&meta, &fence)); + // Destroy the consumer now will make it orphaned and the buffer is still + // acquired. + c1 = nullptr; + EXPECT_GE(0, RETRY_EINTR(p->Poll(10))); + + std::unique_ptr<BufferConsumer> c2 = + BufferConsumer::Import(p->CreateConsumer()); + ASSERT_TRUE(c2.get() != nullptr); + const uint64_t consumer_state_bit2 = c2->buffer_state_bit(); + EXPECT_NE(consumer_state_bit1, consumer_state_bit2); + + // The new consumer is available for acquire. + EXPECT_LT(0, RETRY_EINTR(c2->Poll(10))); + EXPECT_LE(0, c2->AcquireAsync(&meta, &fence)); + // Releasing the consumer makes the buffer gainable. + EXPECT_EQ(0, c2->ReleaseAsync(&meta, LocalHandle())); + + // The buffer is now available for the producer to gain. + EXPECT_LT(0, RETRY_EINTR(p->Poll(10))); + + // But if another consumer is created in released state. + std::unique_ptr<BufferConsumer> c3 = + BufferConsumer::Import(p->CreateConsumer()); + ASSERT_TRUE(c3.get() != nullptr); + const uint64_t consumer_state_bit3 = c3->buffer_state_bit(); + EXPECT_NE(consumer_state_bit2, consumer_state_bit3); + // The consumer buffer is not acquirable. + EXPECT_GE(0, RETRY_EINTR(c3->Poll(10))); + EXPECT_EQ(-EBUSY, c3->AcquireAsync(&meta, &fence)); + + // Producer should be able to gain no matter what. + EXPECT_EQ(0, p->GainAsync(&meta, &fence)); +} diff --git a/libs/vr/libbufferhub/include/private/dvr/buffer_hub_client.h b/libs/vr/libbufferhub/include/private/dvr/buffer_hub_client.h index be20e72a84..1186f9348d 100644 --- a/libs/vr/libbufferhub/include/private/dvr/buffer_hub_client.h +++ b/libs/vr/libbufferhub/include/private/dvr/buffer_hub_client.h @@ -11,6 +11,8 @@ #include <private/dvr/ion_buffer.h> +#include "bufferhub_rpc.h" + namespace android { namespace dvr { @@ -75,6 +77,14 @@ class BufferHubBuffer : public pdx::Client { } } + std::vector<pdx::ClientChannel::EventSource> GetEventSources() const { + if (auto* client_channel = GetChannel()) { + return client_channel->GetEventSources(); + } else { + return {}; + } + } + native_handle_t* native_handle() const { return const_cast<native_handle_t*>(buffer_.handle()); } @@ -84,6 +94,10 @@ class BufferHubBuffer : public pdx::Client { int id() const { return id_; } + // A state mask which is unique to a buffer hub client among all its siblings + // sharing the same concrete graphic buffer. + uint64_t buffer_state_bit() const { return buffer_state_bit_; } + // The following methods return settings of the first buffer. Currently, // it is only possible to create multi-buffer BufferHubBuffers with the same // settings. @@ -98,6 +112,9 @@ class BufferHubBuffer : public pdx::Client { uint64_t producer_usage() const { return buffer_.usage(); } uint64_t consumer_usage() const { return buffer_.usage(); } + uint64_t GetQueueIndex() const { return metadata_header_->queue_index; } + void SetQueueIndex(uint64_t index) { metadata_header_->queue_index = index; } + protected: explicit BufferHubBuffer(LocalChannelHandle channel); explicit BufferHubBuffer(const std::string& endpoint_path); @@ -106,6 +123,31 @@ class BufferHubBuffer : public pdx::Client { // Initialization helper. int ImportBuffer(); + // Check invalid metadata operation. Returns 0 if requested metadata is valid. + int CheckMetadata(size_t user_metadata_size) const; + + // Send out the new fence by updating the shared fence (shared_release_fence + // for producer and shared_acquire_fence for consumer). Note that during this + // should only be used in LocalPost() or LocalRelease, and the shared fence + // shouldn't be poll'ed by the other end. + int UpdateSharedFence(const LocalHandle& new_fence, + const LocalHandle& shared_fence); + + // IonBuffer that is shared between bufferhubd, producer, and consumers. + size_t metadata_buf_size_{0}; + size_t user_metadata_size_{0}; + BufferHubDefs::MetadataHeader* metadata_header_{nullptr}; + void* user_metadata_ptr_{nullptr}; + std::atomic<uint64_t>* buffer_state_{nullptr}; + std::atomic<uint64_t>* fence_state_{nullptr}; + + LocalHandle shared_acquire_fence_; + LocalHandle shared_release_fence_; + + // A local fence fd that holds the ownership of the fence fd on Post (for + // producer) and Release (for consumer). + LocalHandle pending_fence_fd_; + private: BufferHubBuffer(const BufferHubBuffer&) = delete; void operator=(const BufferHubBuffer&) = delete; @@ -114,8 +156,9 @@ class BufferHubBuffer : public pdx::Client { // for logging and debugging purposes only and should not be used for lookup // or any other functional purpose as a security precaution. int id_; - + uint64_t buffer_state_bit_{0ULL}; IonBuffer buffer_; + IonBuffer metadata_buffer_; }; // This represents a writable buffer. Calling Post notifies all clients and @@ -136,12 +179,17 @@ class BufferProducer : public pdx::ClientBase<BufferProducer, BufferHubBuffer> { static std::unique_ptr<BufferProducer> Import( Status<LocalChannelHandle> status); + // Asynchronously posts a buffer. The fence and metadata are passed to + // consumer via shared fd and shared memory. + int PostAsync(const DvrNativeBufferMetadata* meta, + const LocalHandle& ready_fence); + // Post this buffer, passing |ready_fence| to the consumers. The bytes in // |meta| are passed unaltered to the consumers. The producer must not modify // the buffer until it is re-gained. // This returns zero or a negative unix error code. int Post(const LocalHandle& ready_fence, const void* meta, - size_t meta_size_bytes); + size_t user_metadata_size); template <typename Meta, typename = typename std::enable_if<std::is_void<Meta>::value>::type> @@ -160,16 +208,15 @@ class BufferProducer : public pdx::ClientBase<BufferProducer, BufferHubBuffer> { // is in the released state. // This returns zero or a negative unix error code. int Gain(LocalHandle* release_fence); + int GainAsync(); // Asynchronously marks a released buffer as gained. This method is similar to // the synchronous version above, except that it does not wait for BufferHub - // to acknowledge success or failure, nor does it transfer a release fence to - // the client. This version may be used in situations where a release fence is - // not needed. Because of the asynchronous nature of the underlying message, - // no error is returned if this method is called when the buffer is in an - // incorrect state. Returns zero if sending the message succeeded, or a - // negative errno code otherwise. - int GainAsync(); + // to acknowledge success or failure. Because of the asynchronous nature of + // the underlying message, no error is returned if this method is called when + // the buffer is in an incorrect state. Returns zero if sending the message + // succeeded, or a negative errno code if local error check fails. + int GainAsync(DvrNativeBufferMetadata* out_meta, LocalHandle* out_fence); // Attaches the producer to |name| so that it becomes a persistent buffer that // may be retrieved by name at a later time. This may be used in cases where a @@ -216,7 +263,7 @@ class BufferProducer : public pdx::ClientBase<BufferProducer, BufferHubBuffer> { BufferProducer(const std::string& name, int user_id, int group_id, uint32_t width, uint32_t height, uint32_t format, uint64_t producer_usage, uint64_t consumer_usage, - size_t meta_size_bytes); + size_t user_metadata_size); // Constructs a blob (flat) buffer with the given usage flags. BufferProducer(uint32_t usage, size_t size); @@ -234,6 +281,11 @@ class BufferProducer : public pdx::ClientBase<BufferProducer, BufferHubBuffer> { // Imports the given file handle to a producer channel, taking ownership. explicit BufferProducer(LocalChannelHandle channel); + + // Local state transition helpers. + int LocalGain(DvrNativeBufferMetadata* out_meta, LocalHandle* out_fence); + int LocalPost(const DvrNativeBufferMetadata* meta, + const LocalHandle& ready_fence); }; // This is a connection to a producer buffer, which can be located in another @@ -263,7 +315,7 @@ class BufferConsumer : public pdx::ClientBase<BufferConsumer, BufferHubBuffer> { // are available. This call will only succeed if the buffer is in the posted // state. // Returns zero on success, or a negative errno code otherwise. - int Acquire(LocalHandle* ready_fence, void* meta, size_t meta_size_bytes); + int Acquire(LocalHandle* ready_fence, void* meta, size_t user_metadata_size); // Attempt to retrieve a post event from buffer hub. If successful, // |ready_fence| is set to a fence to wait on until the buffer is ready. This @@ -274,20 +326,22 @@ class BufferConsumer : public pdx::ClientBase<BufferConsumer, BufferHubBuffer> { return Acquire(ready_fence, meta, sizeof(*meta)); } + // Asynchronously acquires a bufer. + int AcquireAsync(DvrNativeBufferMetadata* out_meta, LocalHandle* out_fence); + // This should be called after a successful Acquire call. If the fence is // valid the fence determines the buffer usage, otherwise the buffer is // released immediately. // This returns zero or a negative unix error code. int Release(const LocalHandle& release_fence); + int ReleaseAsync(); // Asynchronously releases a buffer. Similar to the synchronous version above, - // except that it does not wait for BufferHub to reply with success or error, - // nor does it transfer a release fence. This version may be used in - // situations where a release fence is not needed. Because of the asynchronous - // nature of the underlying message, no error is returned if this method is - // called when the buffer is in an incorrect state. Returns zero if sending - // the message succeeded, or a negative errno code otherwise. - int ReleaseAsync(); + // except that it does not wait for BufferHub to reply with success or error. + // The fence and metadata are passed to consumer via shared fd and shared + // memory. + int ReleaseAsync(const DvrNativeBufferMetadata* meta, + const LocalHandle& release_fence); // May be called after or instead of Acquire to indicate that the consumer // does not need to access the buffer this cycle. This returns zero or a @@ -305,6 +359,11 @@ class BufferConsumer : public pdx::ClientBase<BufferConsumer, BufferHubBuffer> { friend BASE; explicit BufferConsumer(LocalChannelHandle channel); + + // Local state transition helpers. + int LocalAcquire(DvrNativeBufferMetadata* out_meta, LocalHandle* out_fence); + int LocalRelease(const DvrNativeBufferMetadata* meta, + const LocalHandle& release_fence); }; } // namespace dvr diff --git a/libs/vr/libbufferhub/include/private/dvr/bufferhub_rpc.h b/libs/vr/libbufferhub/include/private/dvr/bufferhub_rpc.h index ca0e0e0820..f9fd42d7bb 100644 --- a/libs/vr/libbufferhub/include/private/dvr/bufferhub_rpc.h +++ b/libs/vr/libbufferhub/include/private/dvr/bufferhub_rpc.h @@ -5,6 +5,7 @@ #include <gui/BufferQueueDefs.h> #include <sys/types.h> +#include <dvr/dvr_api.h> #include <pdx/channel_handle.h> #include <pdx/file_handle.h> #include <pdx/rpc/remote_method.h> @@ -14,6 +15,71 @@ namespace android { namespace dvr { +namespace BufferHubDefs { + +static constexpr uint32_t kMetadataFormat = HAL_PIXEL_FORMAT_BLOB; +static constexpr uint32_t kMetadataUsage = + GRALLOC_USAGE_SW_READ_OFTEN | GRALLOC_USAGE_SW_WRITE_OFTEN; + +// Single producuer multiple (up to 63) consumers ownership signal. +// 64-bit atomic unsigned int. +// +// MSB LSB +// | | +// v v +// [P|C62|...|C1|C0] +// Gain'ed state: [0|..|0|0] -> Exclusively Writable. +// Post'ed state: [1|..|0|0] +// Acquired'ed state: [1|..|X|X] -> At least one bit is set in lower 63 bits +// Released'ed state: [0|..|X|X] -> At least one bit is set in lower 63 bits +static constexpr uint64_t kProducerStateBit = 1ULL << 63; +static constexpr uint64_t kConsumerStateMask = (1ULL << 63) - 1; + +static inline void ModifyBufferState(std::atomic<uint64_t>* buffer_state, + uint64_t clear_mask, uint64_t set_mask) { + uint64_t old_state; + uint64_t new_state; + do { + old_state = buffer_state->load(); + new_state = (old_state & ~clear_mask) | set_mask; + } while (!buffer_state->compare_exchange_weak(old_state, new_state)); +} + +static inline bool IsBufferGained(uint64_t state) { return state == 0; } + +static inline bool IsBufferPosted(uint64_t state, + uint64_t consumer_bit = kConsumerStateMask) { + return (state & kProducerStateBit) && !(state & consumer_bit); +} + +static inline bool IsBufferAcquired(uint64_t state) { + return (state & kProducerStateBit) && (state & kConsumerStateMask); +} + +static inline bool IsBufferReleased(uint64_t state) { + return !(state & kProducerStateBit) && (state & kConsumerStateMask); +} + +struct __attribute__((packed, aligned(8))) MetadataHeader { + // Internal data format, which can be updated as long as the size, padding and + // field alignment of the struct is consistent within the same ABI. As this + // part is subject for future updates, it's not stable cross Android version, + // so don't have it visible from outside of the Android platform (include Apps + // and vendor HAL). + std::atomic<uint64_t> buffer_state; + std::atomic<uint64_t> fence_state; + uint64_t queue_index; + + // Public data format, which should be updated with caution. See more details + // in dvr_api.h + DvrNativeBufferMetadata metadata; +}; + +static_assert(sizeof(MetadataHeader) == 128, "Unexpected MetadataHeader size"); +static constexpr size_t kMetadataHeaderSize = sizeof(MetadataHeader); + +} // namespace BufferHubDefs + template <typename FileHandleType> class NativeBufferHandle { public: @@ -93,6 +159,57 @@ class NativeBufferHandle { void operator=(const NativeBufferHandle&) = delete; }; +template <typename FileHandleType> +class BufferDescription { + public: + BufferDescription() = default; + BufferDescription(const IonBuffer& buffer, const IonBuffer& metadata, int id, + uint64_t buffer_state_bit, + const FileHandleType& acquire_fence_fd, + const FileHandleType& release_fence_fd) + : id_(id), + buffer_state_bit_(buffer_state_bit), + buffer_(buffer, id), + metadata_(metadata, id), + acquire_fence_fd_(acquire_fence_fd.Borrow()), + release_fence_fd_(release_fence_fd.Borrow()) {} + + BufferDescription(BufferDescription&& other) = default; + BufferDescription& operator=(BufferDescription&& other) = default; + + // ID of the buffer client. All BufferHubBuffer clients derived from the same + // buffer in bufferhubd share the same buffer id. + int id() const { return id_; } + // State mask of the buffer client. Each BufferHubBuffer client backed by the + // same buffer channel has uniqued state bit among its siblings. For a + // producer buffer the bit must be kProducerStateBit; for a consumer the bit + // must be one of the kConsumerStateMask. + uint64_t buffer_state_bit() const { return buffer_state_bit_; } + FileHandleType take_acquire_fence() { return std::move(acquire_fence_fd_); } + FileHandleType take_release_fence() { return std::move(release_fence_fd_); } + + int ImportBuffer(IonBuffer* buffer) { return buffer_.Import(buffer); } + int ImportMetadata(IonBuffer* metadata) { return metadata_.Import(metadata); } + + private: + int id_{-1}; + uint64_t buffer_state_bit_{0}; + // Two IonBuffers: one for the graphic buffer and one for metadata. + NativeBufferHandle<FileHandleType> buffer_; + NativeBufferHandle<FileHandleType> metadata_; + + // Pamameters for shared fences. + FileHandleType acquire_fence_fd_; + FileHandleType release_fence_fd_; + + PDX_SERIALIZABLE_MEMBERS(BufferDescription<FileHandleType>, id_, + buffer_state_bit_, buffer_, metadata_, + acquire_fence_fd_, release_fence_fd_); + + BufferDescription(const BufferDescription&) = delete; + void operator=(const BufferDescription&) = delete; +}; + using BorrowedNativeBufferHandle = NativeBufferHandle<pdx::BorrowedHandle>; using LocalNativeBufferHandle = NativeBufferHandle<pdx::LocalHandle>; @@ -149,11 +266,11 @@ struct ProducerQueueConfig { // Size of the meta data associated with all the buffers allocated from the // queue. - size_t meta_size_bytes; + size_t user_metadata_size; private: PDX_SERIALIZABLE_MEMBERS(ProducerQueueConfig, is_async, default_width, - default_height, default_format, meta_size_bytes); + default_height, default_format, user_metadata_size); }; class ProducerQueueConfigBuilder { @@ -161,7 +278,7 @@ class ProducerQueueConfigBuilder { // Build a ProducerQueueConfig object. ProducerQueueConfig Build() { return {is_async_, default_width_, default_height_, default_format_, - meta_size_bytes_}; + user_metadata_size_}; } ProducerQueueConfigBuilder& SetIsAsync(bool is_async) { @@ -186,12 +303,12 @@ class ProducerQueueConfigBuilder { template <typename Meta> ProducerQueueConfigBuilder& SetMetadata() { - meta_size_bytes_ = sizeof(Meta); + user_metadata_size_ = sizeof(Meta); return *this; } - ProducerQueueConfigBuilder& SetMetadataSize(size_t meta_size_bytes) { - meta_size_bytes_ = meta_size_bytes; + ProducerQueueConfigBuilder& SetMetadataSize(size_t user_metadata_size) { + user_metadata_size_ = user_metadata_size; return *this; } @@ -200,7 +317,7 @@ class ProducerQueueConfigBuilder { uint32_t default_width_{1}; uint32_t default_height_{1}; uint32_t default_format_{1}; // PIXEL_FORMAT_RGBA_8888 - size_t meta_size_bytes_{0}; + size_t user_metadata_size_{0}; }; // Explicit specializations of ProducerQueueConfigBuilder::Build for void @@ -208,7 +325,7 @@ class ProducerQueueConfigBuilder { template <> inline ProducerQueueConfigBuilder& ProducerQueueConfigBuilder::SetMetadata<void>() { - meta_size_bytes_ = 0; + user_metadata_size_ = 0; return *this; } @@ -269,7 +386,6 @@ struct BufferHubRPC { }; // Aliases. - using MetaData = pdx::rpc::BufferWrapper<std::uint8_t*>; using LocalChannelHandle = pdx::LocalChannelHandle; using LocalHandle = pdx::LocalHandle; using Void = pdx::rpc::Void; @@ -277,25 +393,24 @@ struct BufferHubRPC { // Methods. PDX_REMOTE_METHOD(CreateBuffer, kOpCreateBuffer, void(uint32_t width, uint32_t height, uint32_t format, - uint64_t usage, size_t meta_size_bytes)); + uint64_t usage, size_t user_metadata_size)); PDX_REMOTE_METHOD(CreatePersistentBuffer, kOpCreatePersistentBuffer, void(const std::string& name, int user_id, int group_id, uint32_t width, uint32_t height, uint32_t format, - uint64_t usage, size_t meta_size_bytes)); + uint64_t usage, size_t user_metadata_size)); PDX_REMOTE_METHOD(GetPersistentBuffer, kOpGetPersistentBuffer, void(const std::string& name)); PDX_REMOTE_METHOD(GetBuffer, kOpGetBuffer, - NativeBufferHandle<LocalHandle>(Void)); + BufferDescription<LocalHandle>(Void)); PDX_REMOTE_METHOD(NewConsumer, kOpNewConsumer, LocalChannelHandle(Void)); PDX_REMOTE_METHOD(ProducerMakePersistent, kOpProducerMakePersistent, void(const std::string& name, int user_id, int group_id)); PDX_REMOTE_METHOD(ProducerRemovePersistence, kOpProducerRemovePersistence, void(Void)); PDX_REMOTE_METHOD(ProducerPost, kOpProducerPost, - void(LocalFence acquire_fence, MetaData)); + void(LocalFence acquire_fence)); PDX_REMOTE_METHOD(ProducerGain, kOpProducerGain, LocalFence(Void)); - PDX_REMOTE_METHOD(ConsumerAcquire, kOpConsumerAcquire, - std::pair<LocalFence, MetaData>(std::size_t metadata_size)); + PDX_REMOTE_METHOD(ConsumerAcquire, kOpConsumerAcquire, LocalFence(Void)); PDX_REMOTE_METHOD(ConsumerRelease, kOpConsumerRelease, void(LocalFence release_fence)); PDX_REMOTE_METHOD(ConsumerSetIgnore, kOpConsumerSetIgnore, void(bool ignore)); @@ -305,7 +420,7 @@ struct BufferHubRPC { QueueInfo(const ProducerQueueConfig& producer_config, const UsagePolicy& usage_policy)); PDX_REMOTE_METHOD(CreateConsumerQueue, kOpCreateConsumerQueue, - LocalChannelHandle(Void)); + LocalChannelHandle(bool silent_queue)); PDX_REMOTE_METHOD(GetQueueInfo, kOpGetQueueInfo, QueueInfo(Void)); PDX_REMOTE_METHOD(ProducerQueueAllocateBuffers, kOpProducerQueueAllocateBuffers, diff --git a/libs/vr/libbufferhubqueue/Android.bp b/libs/vr/libbufferhubqueue/Android.bp index 0b3b2f0fb9..93ccd0fda5 100644 --- a/libs/vr/libbufferhubqueue/Android.bp +++ b/libs/vr/libbufferhubqueue/Android.bp @@ -48,6 +48,7 @@ cc_library { cflags: [ "-DLOG_TAG=\"libbufferhubqueue\"", "-DTRACE=0", + "-DATRACE_TAG=ATRACE_TAG_GRAPHICS", ], srcs: sourceFiles, export_include_dirs: includeFiles, diff --git a/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp b/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp index f9f87ff1b8..8bea0cde7a 100644 --- a/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp +++ b/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp @@ -10,6 +10,7 @@ #include <pdx/default_transport/client_channel.h> #include <pdx/default_transport/client_channel_factory.h> #include <pdx/file_handle.h> +#include <pdx/trace.h> #define RETRY_EINTR(fnc_call) \ ([&]() -> decltype(fnc_call) { \ @@ -44,17 +45,6 @@ Status<int> PollEvents(int fd, short events) { } } -// Polls a buffer for the given events, taking care to do the proper -// translation. -Status<int> PollEvents(const std::shared_ptr<BufferHubBuffer>& buffer, - short events) { - auto poll_status = PollEvents(buffer->event_fd(), events); - if (!poll_status) - return poll_status; - - return buffer->GetEventMask(poll_status.get()); -} - std::pair<int32_t, int32_t> Unstuff(uint64_t value) { return {static_cast<int32_t>(value >> 32), static_cast<int32_t>(value & ((1ull << 32) - 1))}; @@ -115,27 +105,27 @@ void BufferHubQueue::SetupQueue(const QueueInfo& queue_info) { default_width_ = queue_info.producer_config.default_width; default_height_ = queue_info.producer_config.default_height; default_format_ = queue_info.producer_config.default_format; - meta_size_ = queue_info.producer_config.meta_size_bytes; + user_metadata_size_ = queue_info.producer_config.user_metadata_size; id_ = queue_info.id; } std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateConsumerQueue() { - if (auto status = CreateConsumerQueueHandle()) + if (auto status = CreateConsumerQueueHandle(/*silent*/ false)) return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take())); else return nullptr; } std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateSilentConsumerQueue() { - if (auto status = CreateConsumerQueueHandle()) - return std::unique_ptr<ConsumerQueue>( - new ConsumerQueue(status.take(), true)); + if (auto status = CreateConsumerQueueHandle(/*silent*/ true)) + return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take())); else return nullptr; } -Status<LocalChannelHandle> BufferHubQueue::CreateConsumerQueueHandle() { - auto status = InvokeRemoteMethod<BufferHubRPC::CreateConsumerQueue>(); +Status<LocalChannelHandle> BufferHubQueue::CreateConsumerQueueHandle( + bool silent) { + auto status = InvokeRemoteMethod<BufferHubRPC::CreateConsumerQueue>(silent); if (!status) { ALOGE( "BufferHubQueue::CreateConsumerQueue: Failed to create consumer queue: " @@ -148,6 +138,7 @@ Status<LocalChannelHandle> BufferHubQueue::CreateConsumerQueueHandle() { } bool BufferHubQueue::WaitForBuffers(int timeout) { + ATRACE_NAME("BufferHubQueue::WaitForBuffers"); std::array<epoll_event, kMaxEvents> events; // Loop at least once to check for hangups. @@ -178,13 +169,18 @@ bool BufferHubQueue::WaitForBuffers(int timeout) { const int num_events = ret; // A BufferQueue's epoll fd tracks N+1 events, where there are N events, - // one for each buffer, in the queue and one extra event for the queue + // one for each buffer in the queue, and one extra event for the queue // client itself. for (int i = 0; i < num_events; i++) { int32_t event_fd; int32_t index; std::tie(event_fd, index) = Unstuff(events[i].data.u64); + PDX_TRACE_FORMAT( + "epoll_event|queue_id=%d;num_events=%d;event_index=%d;event_fd=%d;" + "slot=%d|", + id(), num_events, i, event_fd, index); + ALOGD_IF(TRACE, "BufferHubQueue::WaitForBuffers: event %d: event_fd=%d index=%d", i, event_fd, index); @@ -208,6 +204,7 @@ bool BufferHubQueue::WaitForBuffers(int timeout) { Status<void> BufferHubQueue::HandleBufferEvent(size_t slot, int event_fd, int poll_events) { + ATRACE_NAME("BufferHubQueue::HandleBufferEvent"); if (!buffers_[slot]) { ALOGW("BufferHubQueue::HandleBufferEvent: Invalid buffer slot: %zu", slot); return ErrorStatus(ENOENT); @@ -221,58 +218,19 @@ Status<void> BufferHubQueue::HandleBufferEvent(size_t slot, int event_fd, } const int events = status.get(); + PDX_TRACE_FORMAT( + "buffer|queue_id=%d;buffer_id=%d;slot=%zu;event_fd=%d;poll_events=%x;" + "events=%d|", + id(), buffers_[slot]->id(), slot, event_fd, poll_events, events); + if (events & EPOLLIN) { - auto entry_status = OnBufferReady(buffers_[slot], slot); - if (entry_status.ok() || entry_status.error() == EALREADY) { - // Only enqueue the buffer if it moves to or is already in the state - // requested in OnBufferReady(). - return Enqueue(entry_status.take()); - } else if (entry_status.error() == EBUSY) { - // If the buffer is busy this means that the buffer moved from released to - // posted when a new consumer was created before the ProducerQueue had a - // chance to regain it. This is a valid transition that we have to handle - // because edge triggered poll events latch the ready state even if it is - // later de-asserted -- don't enqueue or print an error log in this case. - } else { - ALOGE( - "BufferHubQueue::HandleBufferEvent: Failed to set buffer ready, " - "queue_id=%d buffer_id=%d: %s", - id(), buffers_[slot]->id(), entry_status.GetErrorMessage().c_str()); - } + return Enqueue({buffers_[slot], slot, buffers_[slot]->GetQueueIndex()}); } else if (events & EPOLLHUP) { - // Check to see if the current buffer in the slot hung up. This is a bit of - // paranoia to deal with the epoll set getting out of sync with the buffer - // slots. - auto poll_status = PollEvents(buffers_[slot], POLLIN); - if (!poll_status && poll_status.error() != ETIMEDOUT) { - ALOGE("BufferHubQueue::HandleBufferEvent: Failed to poll buffer: %s", - poll_status.GetErrorMessage().c_str()); - return poll_status.error_status(); - } - - const bool hangup_pending = status.ok() && (poll_status.get() & EPOLLHUP); - ALOGW( "BufferHubQueue::HandleBufferEvent: Received EPOLLHUP event: slot=%zu " - "event_fd=%d buffer_id=%d hangup_pending=%d poll_status=%x", - slot, buffers_[slot]->event_fd(), buffers_[slot]->id(), hangup_pending, - poll_status.get()); - - if (hangup_pending) { - return RemoveBuffer(slot); - } else { - // Clean up the bookkeeping for the event fd. This is a bit of paranoia to - // deal with the epoll set getting out of sync with the buffer slots. - // Hitting this path should be very unusual. - const int ret = epoll_fd_.Control(EPOLL_CTL_DEL, event_fd, nullptr); - if (ret < 0) { - ALOGE( - "BufferHubQueue::HandleBufferEvent: Failed to remove fd=%d from " - "epoll set: %s", - event_fd, strerror(-ret)); - return ErrorStatus(-ret); - } - } + "event_fd=%d buffer_id=%d", + slot, buffers_[slot]->event_fd(), buffers_[slot]->id()); + return RemoveBuffer(slot); } else { ALOGW( "BufferHubQueue::HandleBufferEvent: Unknown event, slot=%zu, epoll " @@ -284,6 +242,7 @@ Status<void> BufferHubQueue::HandleBufferEvent(size_t slot, int event_fd, } Status<void> BufferHubQueue::HandleQueueEvent(int poll_event) { + ATRACE_NAME("BufferHubQueue::HandleQueueEvent"); auto status = GetEventMask(poll_event); if (!status) { ALOGW("BufferHubQueue::HandleQueueEvent: Failed to get event mask: %s", @@ -330,13 +289,16 @@ Status<void> BufferHubQueue::AddBuffer( return remove_status.error_status(); } - epoll_event event = {.events = EPOLLIN | EPOLLET, - .data = {.u64 = Stuff(buffer->event_fd(), slot)}}; - const int ret = epoll_fd_.Control(EPOLL_CTL_ADD, buffer->event_fd(), &event); - if (ret < 0) { - ALOGE("BufferHubQueue::AddBuffer: Failed to add buffer to epoll set: %s", - strerror(-ret)); - return ErrorStatus(-ret); + for (const auto& event_source : buffer->GetEventSources()) { + epoll_event event = {.events = event_source.event_mask | EPOLLET, + .data = {.u64 = Stuff(buffer->event_fd(), slot)}}; + const int ret = + epoll_fd_.Control(EPOLL_CTL_ADD, event_source.event_fd, &event); + if (ret < 0) { + ALOGE("BufferHubQueue::AddBuffer: Failed to add buffer to epoll set: %s", + strerror(-ret)); + return ErrorStatus(-ret); + } } buffers_[slot] = buffer; @@ -348,15 +310,16 @@ Status<void> BufferHubQueue::RemoveBuffer(size_t slot) { ALOGD_IF(TRACE, "BufferHubQueue::RemoveBuffer: slot=%zu", slot); if (buffers_[slot]) { - const int ret = - epoll_fd_.Control(EPOLL_CTL_DEL, buffers_[slot]->event_fd(), nullptr); - if (ret < 0) { - ALOGE( - "BufferHubQueue::RemoveBuffer: Failed to remove buffer from epoll " - "set: " - "%s", - strerror(-ret)); - return ErrorStatus(-ret); + for (const auto& event_source : buffers_[slot]->GetEventSources()) { + const int ret = + epoll_fd_.Control(EPOLL_CTL_DEL, event_source.event_fd, nullptr); + if (ret < 0) { + ALOGE( + "BufferHubQueue::RemoveBuffer: Failed to remove buffer from epoll " + "set: %s", + strerror(-ret)); + return ErrorStatus(-ret); + } } // Trigger OnBufferRemoved callback if registered. @@ -372,7 +335,7 @@ Status<void> BufferHubQueue::RemoveBuffer(size_t slot) { Status<void> BufferHubQueue::Enqueue(Entry entry) { if (!is_full()) { - available_buffers_.Append(std::move(entry)); + available_buffers_.push(std::move(entry)); // Trigger OnBufferAvailable callback if registered. if (on_buffer_available_) @@ -385,25 +348,26 @@ Status<void> BufferHubQueue::Enqueue(Entry entry) { } } -Status<std::shared_ptr<BufferHubBuffer>> BufferHubQueue::Dequeue( - int timeout, size_t* slot, void* meta, LocalHandle* fence) { +Status<std::shared_ptr<BufferHubBuffer>> BufferHubQueue::Dequeue(int timeout, + size_t* slot) { ALOGD_IF(TRACE, "BufferHubQueue::Dequeue: count=%zu, timeout=%d", count(), timeout); - if (!WaitForBuffers(timeout)) - return ErrorStatus(ETIMEDOUT); + PDX_TRACE_FORMAT("BufferHubQueue::Dequeue|count=%zu|", count()); + + if (count() == 0) { + if (!WaitForBuffers(timeout)) + return ErrorStatus(ETIMEDOUT); + } - auto& entry = available_buffers_.Front(); + auto& entry = available_buffers_.top(); + PDX_TRACE_FORMAT("buffer|buffer_id=%d;slot=%zu|", entry.buffer->id(), + entry.slot); std::shared_ptr<BufferHubBuffer> buffer = std::move(entry.buffer); *slot = entry.slot; - *fence = std::move(entry.fence); - if (meta && entry.metadata) { - std::copy(entry.metadata.get(), entry.metadata.get() + meta_size_, - reinterpret_cast<uint8_t*>(meta)); - } - available_buffers_.PopFront(); + available_buffers_.pop(); return {std::move(buffer)}; } @@ -419,7 +383,8 @@ void BufferHubQueue::SetBufferRemovedCallback(BufferRemovedCallback callback) { pdx::Status<void> BufferHubQueue::FreeAllBuffers() { // Clear all available buffers. - available_buffers_.Clear(); + while (!available_buffers_.empty()) + available_buffers_.pop(); pdx::Status<void> last_error; // No error. // Clear all buffers this producer queue is tracking. @@ -429,7 +394,7 @@ pdx::Status<void> BufferHubQueue::FreeAllBuffers() { if (!status) { ALOGE( "ProducerQueue::FreeAllBuffers: Failed to remove buffer at " - "slot=%d.", + "slot=%zu.", slot); last_error = status.error_status(); } @@ -548,7 +513,7 @@ Status<void> ProducerQueue::AddBuffer( if (!status) return status; - return Enqueue(buffer, slot); + return BufferHubQueue::Enqueue({buffer, slot, 0ULL}); } Status<void> ProducerQueue::RemoveBuffer(size_t slot) { @@ -565,40 +530,33 @@ Status<void> ProducerQueue::RemoveBuffer(size_t slot) { Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue( int timeout, size_t* slot, LocalHandle* release_fence) { - if (slot == nullptr || release_fence == nullptr) { - ALOGE("ProducerQueue::Dequeue: Invalid parameter: slot=%p release_fence=%p", - slot, release_fence); - return ErrorStatus(EINVAL); - } - - auto buffer_status = - BufferHubQueue::Dequeue(timeout, slot, nullptr, release_fence); - if (!buffer_status) - return buffer_status.error_status(); - - return {std::static_pointer_cast<BufferProducer>(buffer_status.take())}; + DvrNativeBufferMetadata canonical_meta; + return Dequeue(timeout, slot, &canonical_meta, release_fence); } -Status<BufferHubQueue::Entry> ProducerQueue::OnBufferReady( - const std::shared_ptr<BufferHubBuffer>& buffer, size_t slot) { - ALOGD_IF(TRACE, - "ProducerQueue::OnBufferReady: queue_id=%d buffer_id=%d slot=%zu", - id(), buffer->id(), slot); +pdx::Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue( + int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta, + pdx::LocalHandle* release_fence) { + ATRACE_NAME("ProducerQueue::Dequeue"); + if (slot == nullptr || out_meta == nullptr || release_fence == nullptr) { + ALOGE("ProducerQueue::Dequeue: Invalid parameter."); + return ErrorStatus(EINVAL); + } - // Avoid taking a transient reference, buffer is valid for the duration of - // this method call. - auto* producer_buffer = static_cast<BufferProducer*>(buffer.get()); - LocalHandle release_fence; + auto status = BufferHubQueue::Dequeue(timeout, slot); + if (!status) + return status.error_status(); - const int ret = producer_buffer->Gain(&release_fence); - if (ret < 0) + auto buffer = std::static_pointer_cast<BufferProducer>(status.take()); + const int ret = buffer->GainAsync(out_meta, release_fence); + if (ret < 0 && ret != -EALREADY) return ErrorStatus(-ret); - else - return {{buffer, nullptr, std::move(release_fence), slot}}; + + return {std::move(buffer)}; } -ConsumerQueue::ConsumerQueue(LocalChannelHandle handle, bool ignore_on_import) - : BufferHubQueue(std::move(handle)), ignore_on_import_(ignore_on_import) { +ConsumerQueue::ConsumerQueue(LocalChannelHandle handle) + : BufferHubQueue(std::move(handle)) { auto status = ImportQueue(); if (!status) { ALOGE("ConsumerQueue::ConsumerQueue: Failed to import queue: %s", @@ -619,9 +577,17 @@ ConsumerQueue::ConsumerQueue(LocalChannelHandle handle, bool ignore_on_import) Status<size_t> ConsumerQueue::ImportBuffers() { auto status = InvokeRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>(); if (!status) { - ALOGE("ConsumerQueue::ImportBuffers: Failed to import consumer buffer: %s", + if (status.error() == EBADR) { + ALOGI( + "ConsumerQueue::ImportBuffers: Queue is silent, no buffers " + "imported."); + return {0}; + } else { + ALOGE( + "ConsumerQueue::ImportBuffers: Failed to import consumer buffer: %s", status.GetErrorMessage().c_str()); - return status.error_status(); + return status.error_status(); + } } int ret; @@ -642,22 +608,6 @@ Status<size_t> ConsumerQueue::ImportBuffers() { continue; } - // Setup ignore state before adding buffer to the queue. - if (ignore_on_import_) { - ALOGD_IF(TRACE, - "ConsumerQueue::ImportBuffers: Setting buffer to ignored state: " - "buffer_id=%d", - buffer_consumer->id()); - ret = buffer_consumer->SetIgnore(true); - if (ret < 0) { - ALOGE( - "ConsumerQueue::ImportBuffers: Failed to set ignored state on " - "imported buffer buffer_id=%d: %s", - buffer_consumer->id(), strerror(-ret)); - last_error = ErrorStatus(-ret); - } - } - auto add_status = AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second); if (!add_status) { @@ -685,7 +635,7 @@ Status<void> ConsumerQueue::AddBuffer( // Check to see if the buffer is already signaled. This is necessary to catch // cases where buffers are already available; epoll edge triggered mode does - // not fire until and edge transition when adding new buffers to the epoll + // not fire until an edge transition when adding new buffers to the epoll // set. Note that we only poll the fd events because HandleBufferEvent() takes // care of checking the translated buffer events. auto poll_status = PollEvents(buffer->event_fd(), POLLIN); @@ -703,51 +653,53 @@ Status<void> ConsumerQueue::AddBuffer( } Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue( - int timeout, size_t* slot, void* meta, size_t meta_size, + int timeout, size_t* slot, void* meta, size_t user_metadata_size, LocalHandle* acquire_fence) { - if (meta_size != meta_size_) { + if (user_metadata_size != user_metadata_size_) { ALOGE( "ConsumerQueue::Dequeue: Metadata size (%zu) for the dequeuing buffer " "does not match metadata size (%zu) for the queue.", - meta_size, meta_size_); + user_metadata_size, user_metadata_size_); return ErrorStatus(EINVAL); } - if (slot == nullptr || acquire_fence == nullptr) { - ALOGE( - "ConsumerQueue::Dequeue: Invalid parameter: slot=%p meta=%p " - "acquire_fence=%p", - slot, meta, acquire_fence); - return ErrorStatus(EINVAL); - } + DvrNativeBufferMetadata canonical_meta; + auto status = Dequeue(timeout, slot, &canonical_meta, acquire_fence); + if (!status) + return status.error_status(); - auto buffer_status = - BufferHubQueue::Dequeue(timeout, slot, meta, acquire_fence); - if (!buffer_status) - return buffer_status.error_status(); + if (meta && user_metadata_size) { + void* metadata_src = + reinterpret_cast<void*>(canonical_meta.user_metadata_ptr); + if (metadata_src) { + memcpy(meta, metadata_src, user_metadata_size); + } else { + ALOGW("ConsumerQueue::Dequeue: no user-defined metadata."); + } + } - return {std::static_pointer_cast<BufferConsumer>(buffer_status.take())}; + return status; } -Status<BufferHubQueue::Entry> ConsumerQueue::OnBufferReady( - const std::shared_ptr<BufferHubBuffer>& buffer, size_t slot) { - ALOGD_IF(TRACE, - "ConsumerQueue::OnBufferReady: queue_id=%d buffer_id=%d slot=%zu", - id(), buffer->id(), slot); +Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue( + int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta, + pdx::LocalHandle* acquire_fence) { + ATRACE_NAME("ConsumerQueue::Dequeue"); + if (slot == nullptr || out_meta == nullptr || acquire_fence == nullptr) { + ALOGE("ConsumerQueue::Dequeue: Invalid parameter."); + return ErrorStatus(EINVAL); + } - // Avoid taking a transient reference, buffer is valid for the duration of - // this method call. - auto* consumer_buffer = static_cast<BufferConsumer*>(buffer.get()); - std::unique_ptr<uint8_t[]> metadata(meta_size_ ? new uint8_t[meta_size_] - : nullptr); - LocalHandle acquire_fence; + auto status = BufferHubQueue::Dequeue(timeout, slot); + if (!status) + return status.error_status(); - const int ret = - consumer_buffer->Acquire(&acquire_fence, metadata.get(), meta_size_); + auto buffer = std::static_pointer_cast<BufferConsumer>(status.take()); + const int ret = buffer->AcquireAsync(out_meta, acquire_fence); if (ret < 0) return ErrorStatus(-ret); - else - return {{buffer, std::move(metadata), std::move(acquire_fence), slot}}; + + return {std::move(buffer)}; } Status<void> ConsumerQueue::OnBufferAllocated() { diff --git a/libs/vr/libbufferhubqueue/buffer_hub_queue_producer.cpp b/libs/vr/libbufferhubqueue/buffer_hub_queue_producer.cpp index 53eed8924a..221bc4f9d2 100644 --- a/libs/vr/libbufferhubqueue/buffer_hub_queue_producer.cpp +++ b/libs/vr/libbufferhubqueue/buffer_hub_queue_producer.cpp @@ -328,7 +328,7 @@ status_t BufferHubQueueProducer::queueBuffer(int slot, LocalHandle fence_fd(fence->isValid() ? fence->dup() : -1); - DvrNativeBufferMetadata meta_data = {}; + DvrNativeBufferMetadata meta_data; meta_data.timestamp = timestamp; meta_data.is_auto_timestamp = static_cast<int32_t>(is_auto_timestamp); meta_data.dataspace = static_cast<int32_t>(dataspace); @@ -339,7 +339,7 @@ status_t BufferHubQueueProducer::queueBuffer(int slot, meta_data.scaling_mode = static_cast<int32_t>(scaling_mode); meta_data.transform = static_cast<int32_t>(transform); - buffer_producer->Post(fence_fd, &meta_data, sizeof(meta_data)); + buffer_producer->PostAsync(&meta_data, fence_fd); buffers_[slot].mBufferState.queue(); output->width = buffer_producer->width(); @@ -384,7 +384,7 @@ status_t BufferHubQueueProducer::cancelBuffer(int slot, } auto buffer_producer = buffers_[slot].mBufferProducer; - queue_->Enqueue(buffer_producer, slot); + queue_->Enqueue(buffer_producer, slot, 0ULL); buffers_[slot].mBufferState.cancel(); buffers_[slot].mFence = fence; ALOGD_IF(TRACE, "cancelBuffer: slot %d", slot); diff --git a/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h b/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h index 0699fefd38..6962d6c9f8 100644 --- a/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h +++ b/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h @@ -5,12 +5,13 @@ #include <pdx/client.h> #include <pdx/status.h> -#include <private/dvr/bufferhub_rpc.h> #include <private/dvr/buffer_hub_client.h> +#include <private/dvr/bufferhub_rpc.h> #include <private/dvr/epoll_file_descriptor.h> #include <private/dvr/ring_buffer.h> #include <memory> +#include <queue> #include <vector> namespace android { @@ -50,19 +51,22 @@ class BufferHubQueue : public pdx::Client { uint32_t default_format() const { return default_format_; } // Creates a new consumer in handle form for immediate transport over RPC. - pdx::Status<pdx::LocalChannelHandle> CreateConsumerQueueHandle(); + pdx::Status<pdx::LocalChannelHandle> CreateConsumerQueueHandle( + bool silent = false); // Returns the number of buffers avaiable for dequeue. - size_t count() const { return available_buffers_.GetSize(); } + size_t count() const { return available_buffers_.size(); } // Returns the total number of buffers that the queue is tracking. size_t capacity() const { return capacity_; } // Returns the size of metadata structure associated with this queue. - size_t metadata_size() const { return meta_size_; } + size_t metadata_size() const { return user_metadata_size_; } // Returns whether the buffer queue is full. - bool is_full() const { return available_buffers_.IsFull(); } + bool is_full() const { + return available_buffers_.size() >= kMaxQueueCapacity; + } explicit operator bool() const { return epoll_fd_.IsValid(); } @@ -136,8 +140,8 @@ class BufferHubQueue : public pdx::Client { // block. Specifying a timeout of -1 causes Dequeue() to block indefinitely, // while specifying a timeout equal to zero cause Dequeue() to return // immediately, even if no buffers are available. - pdx::Status<std::shared_ptr<BufferHubBuffer>> Dequeue( - int timeout, size_t* slot, void* meta, pdx::LocalHandle* fence); + pdx::Status<std::shared_ptr<BufferHubBuffer>> Dequeue(int timeout, + size_t* slot); // Waits for buffers to become available and adds them to the available queue. bool WaitForBuffers(int timeout); @@ -150,8 +154,9 @@ class BufferHubQueue : public pdx::Client { // per-buffer data. struct Entry { Entry() : slot(0) {} - Entry(const std::shared_ptr<BufferHubBuffer>& buffer, size_t slot) - : buffer(buffer), slot(slot) {} + Entry(const std::shared_ptr<BufferHubBuffer>& buffer, size_t slot, + uint64_t index) + : buffer(buffer), slot(slot), index(index) {} Entry(const std::shared_ptr<BufferHubBuffer>& buffer, std::unique_ptr<uint8_t[]> metadata, pdx::LocalHandle fence, size_t slot) @@ -166,20 +171,24 @@ class BufferHubQueue : public pdx::Client { std::unique_ptr<uint8_t[]> metadata; pdx::LocalHandle fence; size_t slot; + uint64_t index; + }; + + struct EntryComparator { + bool operator()(const Entry& lhs, const Entry& rhs) { + return lhs.index > rhs.index; + } }; // Enqueues a buffer to the available list (Gained for producer or Acquireed // for consumer). pdx::Status<void> Enqueue(Entry entry); - virtual pdx::Status<Entry> OnBufferReady( - const std::shared_ptr<BufferHubBuffer>& buf, size_t slot) = 0; - // Called when a buffer is allocated remotely. virtual pdx::Status<void> OnBufferAllocated() { return {}; } // Size of the metadata that buffers in this queue cary. - size_t meta_size_{0}; + size_t user_metadata_size_{0}; private: void Initialize(); @@ -226,7 +235,9 @@ class BufferHubQueue : public pdx::Client { std::array<std::shared_ptr<BufferHubBuffer>, kMaxQueueCapacity> buffers_; // Buffers and related data that are available for dequeue. - RingBuffer<Entry> available_buffers_{kMaxQueueCapacity}; + // RingBuffer<Entry> available_buffers_{kMaxQueueCapacity}; + std::priority_queue<Entry, std::vector<Entry>, EntryComparator> + available_buffers_; // Keeps track with how many buffers have been added into the queue. size_t capacity_{0}; @@ -316,11 +327,14 @@ class ProducerQueue : public pdx::ClientBase<ProducerQueue, BufferHubQueue> { // to the consumer side. pdx::Status<std::shared_ptr<BufferProducer>> Dequeue( int timeout, size_t* slot, pdx::LocalHandle* release_fence); + pdx::Status<std::shared_ptr<BufferProducer>> Dequeue( + int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta, + pdx::LocalHandle* release_fence); // Enqueues a producer buffer in the queue. pdx::Status<void> Enqueue(const std::shared_ptr<BufferProducer>& buffer, - size_t slot) { - return BufferHubQueue::Enqueue({buffer, slot}); + size_t slot, uint64_t index) { + return BufferHubQueue::Enqueue({buffer, slot, index}); } private: @@ -331,9 +345,6 @@ class ProducerQueue : public pdx::ClientBase<ProducerQueue, BufferHubQueue> { // arguments as the constructors. explicit ProducerQueue(pdx::LocalChannelHandle handle); ProducerQueue(const ProducerQueueConfig& config, const UsagePolicy& usage); - - pdx::Status<Entry> OnBufferReady( - const std::shared_ptr<BufferHubBuffer>& buffer, size_t slot) override; }; class ConsumerQueue : public BufferHubQueue { @@ -352,10 +363,9 @@ class ConsumerQueue : public BufferHubQueue { // used to avoid participation in the buffer lifecycle by a consumer queue // that is only used to spawn other consumer queues, such as in an // intermediate service. - static std::unique_ptr<ConsumerQueue> Import(pdx::LocalChannelHandle handle, - bool ignore_on_import = false) { + static std::unique_ptr<ConsumerQueue> Import(pdx::LocalChannelHandle handle) { return std::unique_ptr<ConsumerQueue>( - new ConsumerQueue(std::move(handle), ignore_on_import)); + new ConsumerQueue(std::move(handle))); } // Import newly created buffers from the service side. @@ -379,13 +389,16 @@ class ConsumerQueue : public BufferHubQueue { } pdx::Status<std::shared_ptr<BufferConsumer>> Dequeue( - int timeout, size_t* slot, void* meta, size_t meta_size, + int timeout, size_t* slot, void* meta, size_t user_metadata_size, + pdx::LocalHandle* acquire_fence); + pdx::Status<std::shared_ptr<BufferConsumer>> Dequeue( + int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta, pdx::LocalHandle* acquire_fence); private: friend BufferHubQueue; - ConsumerQueue(pdx::LocalChannelHandle handle, bool ignore_on_import = false); + ConsumerQueue(pdx::LocalChannelHandle handle); // Add a consumer buffer to populate the queue. Once added, a consumer buffer // is NOT available to use until the producer side |Post| it. |WaitForBuffers| @@ -394,14 +407,7 @@ class ConsumerQueue : public BufferHubQueue { pdx::Status<void> AddBuffer(const std::shared_ptr<BufferConsumer>& buffer, size_t slot); - pdx::Status<Entry> OnBufferReady( - const std::shared_ptr<BufferHubBuffer>& buffer, size_t slot) override; - pdx::Status<void> OnBufferAllocated() override; - - // Flag indicating that imported (consumer) buffers should be ignored when - // imported to avoid participating in the buffer ownership flow. - bool ignore_on_import_; }; } // namespace dvr diff --git a/libs/vr/libbufferhubqueue/tests/Android.bp b/libs/vr/libbufferhubqueue/tests/Android.bp index 865573cafd..8bd1ef1414 100644 --- a/libs/vr/libbufferhubqueue/tests/Android.bp +++ b/libs/vr/libbufferhubqueue/tests/Android.bp @@ -1,4 +1,7 @@ +header_libraries = [ + "libdvr_headers", +] shared_libraries = [ "libbase", @@ -21,6 +24,7 @@ static_libraries = [ cc_test { srcs: ["buffer_hub_queue-test.cpp"], + header_libs: header_libraries, static_libs: static_libraries, shared_libs: shared_libraries, cflags: [ @@ -35,6 +39,7 @@ cc_test { cc_test { srcs: ["buffer_hub_queue_producer-test.cpp"], + header_libs: header_libraries, static_libs: static_libraries, shared_libs: shared_libraries, cflags: [ diff --git a/libs/vr/libbufferhubqueue/tests/buffer_hub_queue-test.cpp b/libs/vr/libbufferhubqueue/tests/buffer_hub_queue-test.cpp index 7581a065b3..8a72531ed5 100644 --- a/libs/vr/libbufferhubqueue/tests/buffer_hub_queue-test.cpp +++ b/libs/vr/libbufferhubqueue/tests/buffer_hub_queue-test.cpp @@ -3,6 +3,8 @@ #include <private/dvr/buffer_hub_queue_client.h> #include <gtest/gtest.h> +#include <poll.h> +#include <sys/eventfd.h> #include <vector> @@ -46,9 +48,9 @@ class BufferHubQueueTest : public ::testing::Test { void AllocateBuffer(size_t* slot_out = nullptr) { // Create producer buffer. - auto status = producer_queue_->AllocateBuffer( - kBufferWidth, kBufferHeight, kBufferLayerCount, kBufferFormat, - kBufferUsage); + auto status = producer_queue_->AllocateBuffer(kBufferWidth, kBufferHeight, + kBufferLayerCount, + kBufferFormat, kBufferUsage); ASSERT_TRUE(status.ok()); size_t slot = status.take(); @@ -56,6 +58,23 @@ class BufferHubQueueTest : public ::testing::Test { *slot_out = slot; } + bool WaitAndHandleOnce(BufferHubQueue* queue, int timeout_ms) { + pollfd pfd{queue->queue_fd(), POLLIN, 0}; + int ret; + do { + ret = poll(&pfd, 1, timeout_ms); + } while (ret == -1 && errno == EINTR); + + if (ret < 0) { + ALOGW("Failed to poll queue %d's event fd, error: %s.", queue->id(), + strerror(errno)); + return false; + } else if (ret == 0) { + return false; + } + return queue->HandleQueueEvents(); + } + protected: ProducerQueueConfigBuilder config_builder_; std::unique_ptr<ProducerQueue> producer_queue_; @@ -75,7 +94,7 @@ TEST_F(BufferHubQueueTest, TestDequeue) { for (size_t i = 0; i < nb_dequeue_times; i++) { size_t slot; LocalHandle fence; - auto p1_status = producer_queue_->Dequeue(0, &slot, &fence); + auto p1_status = producer_queue_->Dequeue(100, &slot, &fence); ASSERT_TRUE(p1_status.ok()); auto p1 = p1_status.take(); ASSERT_NE(nullptr, p1); @@ -113,31 +132,26 @@ TEST_F(BufferHubQueueTest, TestProducerConsumer) { // Dequeue returns timeout since no buffer is ready to consumer, but // this implicitly triggers buffer import and bump up |capacity|. LocalHandle fence; - auto status = consumer_queue_->Dequeue(0, &slot, &seq, &fence); + auto status = consumer_queue_->Dequeue(100, &slot, &seq, &fence); ASSERT_FALSE(status.ok()); ASSERT_EQ(ETIMEDOUT, status.error()); ASSERT_EQ(consumer_queue_->capacity(), i + 1); } - // Use /dev/zero as a stand-in for a fence. As long as BufferHub does not need - // to merge fences, which only happens when multiple consumers release the - // same buffer with release fences, the file object should simply pass - // through. - LocalHandle post_fence("/dev/zero", O_RDONLY); - struct stat post_fence_stat; - ASSERT_EQ(0, fstat(post_fence.Get(), &post_fence_stat)); + // Use eventfd as a stand-in for a fence. + LocalHandle post_fence(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)); for (size_t i = 0; i < kBufferCount; i++) { LocalHandle fence; // First time there is no buffer available to dequeue. - auto consumer_status = consumer_queue_->Dequeue(0, &slot, &seq, &fence); + auto consumer_status = consumer_queue_->Dequeue(100, &slot, &seq, &fence); ASSERT_FALSE(consumer_status.ok()); ASSERT_EQ(ETIMEDOUT, consumer_status.error()); // Make sure Producer buffer is POSTED so that it's ready to Accquire // in the consumer's Dequeue() function. - auto producer_status = producer_queue_->Dequeue(0, &slot, &fence); + auto producer_status = producer_queue_->Dequeue(100, &slot, &fence); ASSERT_TRUE(producer_status.ok()); auto producer = producer_status.take(); ASSERT_NE(nullptr, producer); @@ -147,20 +161,10 @@ TEST_F(BufferHubQueueTest, TestProducerConsumer) { // Second time the just the POSTED buffer should be dequeued. uint64_t seq_out = 0; - consumer_status = consumer_queue_->Dequeue(0, &slot, &seq_out, &fence); + consumer_status = consumer_queue_->Dequeue(100, &slot, &seq_out, &fence); ASSERT_TRUE(consumer_status.ok()); EXPECT_TRUE(fence.IsValid()); - struct stat acquire_fence_stat; - ASSERT_EQ(0, fstat(fence.Get(), &acquire_fence_stat)); - - // The file descriptors should refer to the same file object. Testing the - // device id and inode is a proxy for testing that the fds refer to the same - // file object. - EXPECT_NE(post_fence.Get(), fence.Get()); - EXPECT_EQ(post_fence_stat.st_dev, acquire_fence_stat.st_dev); - EXPECT_EQ(post_fence_stat.st_ino, acquire_fence_stat.st_ino); - auto consumer = consumer_status.take(); ASSERT_NE(nullptr, consumer); ASSERT_EQ(seq_in, seq_out); @@ -196,12 +200,11 @@ TEST_F(BufferHubQueueTest, TestRemoveBuffer) { for (size_t i = 0; i < kBufferCount; i++) { Entry* entry = &buffers[i]; - auto producer_status = - producer_queue_->Dequeue(0, &entry->slot, &entry->fence); + auto producer_status = producer_queue_->Dequeue( + /*timeout_ms=*/100, &entry->slot, &entry->fence); ASSERT_TRUE(producer_status.ok()); entry->buffer = producer_status.take(); ASSERT_NE(nullptr, entry->buffer); - EXPECT_EQ(i, entry->slot); } // Remove a buffer and make sure both queues reflect the change. @@ -218,8 +221,8 @@ TEST_F(BufferHubQueueTest, TestRemoveBuffer) { buffers[0].buffer = nullptr; // Now the consumer queue should know it's gone. - EXPECT_FALSE(consumer_queue_->HandleQueueEvents()); - EXPECT_EQ(kBufferCount - 1, consumer_queue_->capacity()); + EXPECT_FALSE(WaitAndHandleOnce(consumer_queue_.get(), /*timeout_ms=*/100)); + ASSERT_EQ(kBufferCount - 1, consumer_queue_->capacity()); // Allocate a new buffer. This should take the first empty slot. size_t slot; @@ -286,17 +289,20 @@ TEST_F(BufferHubQueueTest, TestMultipleConsumers) { auto silent_queue = producer_queue_->CreateSilentConsumerQueue(); ASSERT_NE(nullptr, silent_queue); - // Check that buffers are correctly imported on construction. - EXPECT_EQ(kBufferCount, silent_queue->capacity()); + // Check that silent queue doesn't import buffers on creation. + EXPECT_EQ(0, silent_queue->capacity()); // Dequeue and post a buffer. size_t slot; LocalHandle fence; - auto producer_status = producer_queue_->Dequeue(0, &slot, &fence); + auto producer_status = + producer_queue_->Dequeue(/*timeout_ms=*/100, &slot, &fence); ASSERT_TRUE(producer_status.ok()); auto producer_buffer = producer_status.take(); ASSERT_NE(nullptr, producer_buffer); ASSERT_EQ(0, producer_buffer->Post<void>({})); + // After post, check the number of remaining available buffers. + EXPECT_EQ(kBufferCount - 1, producer_queue_->count()); // Currently we expect no buffer to be available prior to calling // WaitForBuffers/HandleQueueEvents. @@ -314,23 +320,30 @@ TEST_F(BufferHubQueueTest, TestMultipleConsumers) { EXPECT_EQ(1u, consumer_queue_->count()); // Reclaim released/ignored buffers. - producer_queue_->HandleQueueEvents(); + ASSERT_EQ(kBufferCount - 1, producer_queue_->count()); + + usleep(10000); + WaitAndHandleOnce(producer_queue_.get(), /*timeout_ms=*/100); ASSERT_EQ(kBufferCount - 1, producer_queue_->count()); // Post another buffer. - producer_status = producer_queue_->Dequeue(0, &slot, &fence); + producer_status = producer_queue_->Dequeue(/*timeout_ms=*/100, &slot, &fence); ASSERT_TRUE(producer_status.ok()); producer_buffer = producer_status.take(); ASSERT_NE(nullptr, producer_buffer); ASSERT_EQ(0, producer_buffer->Post<void>({})); // Verify that the consumer queue receives it. - EXPECT_EQ(1u, consumer_queue_->count()); - EXPECT_TRUE(consumer_queue_->HandleQueueEvents()); - EXPECT_EQ(2u, consumer_queue_->count()); + size_t consumer_queue_count = consumer_queue_->count(); + WaitAndHandleOnce(consumer_queue_.get(), /*timeout_ms=*/100); + EXPECT_LT(consumer_queue_count, consumer_queue_->count()); + + // Save the current consumer queue buffer count to compare after the dequeue. + consumer_queue_count = consumer_queue_->count(); // Dequeue and acquire/release (discard) buffers on the consumer end. - auto consumer_status = consumer_queue_->Dequeue(0, &slot, &fence); + auto consumer_status = + consumer_queue_->Dequeue(/*timeout_ms=*/100, &slot, &fence); ASSERT_TRUE(consumer_status.ok()); auto consumer_buffer = consumer_status.take(); ASSERT_NE(nullptr, consumer_buffer); @@ -338,7 +351,7 @@ TEST_F(BufferHubQueueTest, TestMultipleConsumers) { // Buffer should be returned to the producer queue without being handled by // the silent consumer queue. - EXPECT_EQ(1u, consumer_queue_->count()); + EXPECT_GT(consumer_queue_count, consumer_queue_->count()); EXPECT_EQ(kBufferCount - 2, producer_queue_->count()); EXPECT_TRUE(producer_queue_->HandleQueueEvents()); EXPECT_EQ(kBufferCount - 1, producer_queue_->count()); @@ -362,13 +375,13 @@ TEST_F(BufferHubQueueTest, TestMetadata) { for (auto mi : ms) { size_t slot; LocalHandle fence; - auto p1_status = producer_queue_->Dequeue(0, &slot, &fence); + auto p1_status = producer_queue_->Dequeue(100, &slot, &fence); ASSERT_TRUE(p1_status.ok()); auto p1 = p1_status.take(); ASSERT_NE(nullptr, p1); ASSERT_EQ(p1->Post(LocalHandle(-1), &mi, sizeof(mi)), 0); TestMetadata mo; - auto c1_status = consumer_queue_->Dequeue(0, &slot, &mo, &fence); + auto c1_status = consumer_queue_->Dequeue(100, &slot, &mo, &fence); ASSERT_TRUE(c1_status.ok()); auto c1 = c1_status.take(); ASSERT_EQ(mi.a, mo.a); @@ -387,7 +400,7 @@ TEST_F(BufferHubQueueTest, TestMetadataMismatch) { int64_t mi = 3; size_t slot; LocalHandle fence; - auto p1_status = producer_queue_->Dequeue(0, &slot, &fence); + auto p1_status = producer_queue_->Dequeue(100, &slot, &fence); ASSERT_TRUE(p1_status.ok()); auto p1 = p1_status.take(); ASSERT_NE(nullptr, p1); @@ -395,7 +408,7 @@ TEST_F(BufferHubQueueTest, TestMetadataMismatch) { int32_t mo; // Acquire a buffer with mismatched metadata is not OK. - auto c1_status = consumer_queue_->Dequeue(0, &slot, &mo, &fence); + auto c1_status = consumer_queue_->Dequeue(100, &slot, &mo, &fence); ASSERT_FALSE(c1_status.ok()); } @@ -406,14 +419,14 @@ TEST_F(BufferHubQueueTest, TestEnqueue) { size_t slot; LocalHandle fence; - auto p1_status = producer_queue_->Dequeue(0, &slot, &fence); + auto p1_status = producer_queue_->Dequeue(100, &slot, &fence); ASSERT_TRUE(p1_status.ok()); auto p1 = p1_status.take(); ASSERT_NE(nullptr, p1); int64_t mo; - producer_queue_->Enqueue(p1, slot); - auto c1_status = consumer_queue_->Dequeue(0, &slot, &mo, &fence); + producer_queue_->Enqueue(p1, slot, 0ULL); + auto c1_status = consumer_queue_->Dequeue(100, &slot, &mo, &fence); ASSERT_FALSE(c1_status.ok()); } @@ -424,14 +437,14 @@ TEST_F(BufferHubQueueTest, TestAllocateBuffer) { size_t s1; AllocateBuffer(); LocalHandle fence; - auto p1_status = producer_queue_->Dequeue(0, &s1, &fence); + auto p1_status = producer_queue_->Dequeue(100, &s1, &fence); ASSERT_TRUE(p1_status.ok()); auto p1 = p1_status.take(); ASSERT_NE(nullptr, p1); // producer queue is exhausted size_t s2; - auto p2_status = producer_queue_->Dequeue(0, &s2, &fence); + auto p2_status = producer_queue_->Dequeue(100, &s2, &fence); ASSERT_FALSE(p2_status.ok()); ASSERT_EQ(ETIMEDOUT, p2_status.error()); @@ -441,7 +454,7 @@ TEST_F(BufferHubQueueTest, TestAllocateBuffer) { ASSERT_EQ(producer_queue_->capacity(), 2U); // now we can dequeue again - p2_status = producer_queue_->Dequeue(0, &s2, &fence); + p2_status = producer_queue_->Dequeue(100, &s2, &fence); ASSERT_TRUE(p2_status.ok()); auto p2 = p2_status.take(); ASSERT_NE(nullptr, p2); @@ -456,7 +469,7 @@ TEST_F(BufferHubQueueTest, TestAllocateBuffer) { int64_t seq = 1; ASSERT_EQ(p1->Post(LocalHandle(), seq), 0); size_t cs1, cs2; - auto c1_status = consumer_queue_->Dequeue(0, &cs1, &seq, &fence); + auto c1_status = consumer_queue_->Dequeue(100, &cs1, &seq, &fence); ASSERT_TRUE(c1_status.ok()); auto c1 = c1_status.take(); ASSERT_NE(nullptr, c1); @@ -465,7 +478,7 @@ TEST_F(BufferHubQueueTest, TestAllocateBuffer) { ASSERT_EQ(cs1, s1); ASSERT_EQ(p2->Post(LocalHandle(), seq), 0); - auto c2_status = consumer_queue_->Dequeue(0, &cs2, &seq, &fence); + auto c2_status = consumer_queue_->Dequeue(100, &cs2, &seq, &fence); ASSERT_TRUE(c2_status.ok()); auto c2 = c2_status.take(); ASSERT_NE(nullptr, c2); @@ -485,7 +498,7 @@ TEST_F(BufferHubQueueTest, TestUsageSetMask) { LocalHandle fence; size_t slot; - auto p1_status = producer_queue_->Dequeue(0, &slot, &fence); + auto p1_status = producer_queue_->Dequeue(100, &slot, &fence); ASSERT_TRUE(p1_status.ok()); auto p1 = p1_status.take(); ASSERT_EQ(p1->usage() & set_mask, set_mask); @@ -504,7 +517,7 @@ TEST_F(BufferHubQueueTest, TestUsageClearMask) { LocalHandle fence; size_t slot; - auto p1_status = producer_queue_->Dequeue(0, &slot, &fence); + auto p1_status = producer_queue_->Dequeue(100, &slot, &fence); ASSERT_TRUE(p1_status.ok()); auto p1 = p1_status.take(); ASSERT_EQ(0u, p1->usage() & clear_mask); @@ -543,9 +556,9 @@ TEST_F(BufferHubQueueTest, TestUsageDenyClearMask) { ASSERT_TRUE(status.ok()); // While allocation without those bits should fail. - status = producer_queue_->AllocateBuffer( - kBufferWidth, kBufferHeight, kBufferLayerCount, kBufferFormat, - kBufferUsage & ~deny_clear_mask); + status = producer_queue_->AllocateBuffer(kBufferWidth, kBufferHeight, + kBufferLayerCount, kBufferFormat, + kBufferUsage & ~deny_clear_mask); ASSERT_FALSE(status.ok()); ASSERT_EQ(EINVAL, status.error()); } @@ -603,7 +616,7 @@ TEST_F(BufferHubQueueTest, TestFreeAllBuffers) { // Free all buffers when one buffer is dequeued. CHECK_NO_BUFFER_THEN_ALLOCATE(kBufferCount); - producer_status = producer_queue_->Dequeue(0, &slot, &fence); + producer_status = producer_queue_->Dequeue(100, &slot, &fence); ASSERT_TRUE(producer_status.ok()); status = producer_queue_->FreeAllBuffers(); EXPECT_TRUE(status.ok()); @@ -611,7 +624,7 @@ TEST_F(BufferHubQueueTest, TestFreeAllBuffers) { // Free all buffers when all buffers are dequeued. CHECK_NO_BUFFER_THEN_ALLOCATE(kBufferCount); for (size_t i = 0; i < kBufferCount; i++) { - producer_status = producer_queue_->Dequeue(0, &slot, &fence); + producer_status = producer_queue_->Dequeue(100, &slot, &fence); ASSERT_TRUE(producer_status.ok()); } status = producer_queue_->FreeAllBuffers(); @@ -619,7 +632,7 @@ TEST_F(BufferHubQueueTest, TestFreeAllBuffers) { // Free all buffers when one buffer is posted. CHECK_NO_BUFFER_THEN_ALLOCATE(kBufferCount); - producer_status = producer_queue_->Dequeue(0, &slot, &fence); + producer_status = producer_queue_->Dequeue(100, &slot, &fence); ASSERT_TRUE(producer_status.ok()); producer_buffer = producer_status.take(); ASSERT_NE(nullptr, producer_buffer); @@ -630,7 +643,7 @@ TEST_F(BufferHubQueueTest, TestFreeAllBuffers) { // Free all buffers when all buffers are posted. CHECK_NO_BUFFER_THEN_ALLOCATE(kBufferCount); for (size_t i = 0; i < kBufferCount; i++) { - producer_status = producer_queue_->Dequeue(0, &slot, &fence); + producer_status = producer_queue_->Dequeue(100, &slot, &fence); ASSERT_TRUE(producer_status.ok()); producer_buffer = producer_status.take(); ASSERT_NE(nullptr, producer_buffer); @@ -642,12 +655,12 @@ TEST_F(BufferHubQueueTest, TestFreeAllBuffers) { // Free all buffers when all buffers are acquired. CHECK_NO_BUFFER_THEN_ALLOCATE(kBufferCount); for (size_t i = 0; i < kBufferCount; i++) { - producer_status = producer_queue_->Dequeue(0, &slot, &fence); + producer_status = producer_queue_->Dequeue(100, &slot, &fence); ASSERT_TRUE(producer_status.ok()); producer_buffer = producer_status.take(); ASSERT_NE(nullptr, producer_buffer); ASSERT_EQ(0, producer_buffer->Post(fence, &seq, sizeof(seq))); - consumer_status = consumer_queue_->Dequeue(0, &slot, &seq, &fence); + consumer_status = consumer_queue_->Dequeue(100, &slot, &seq, &fence); ASSERT_TRUE(consumer_status.ok()); } diff --git a/libs/vr/libdvr/dvr_buffer_queue.cpp b/libs/vr/libdvr/dvr_buffer_queue.cpp index 035252d0b9..09a49dd713 100644 --- a/libs/vr/libdvr/dvr_buffer_queue.cpp +++ b/libs/vr/libdvr/dvr_buffer_queue.cpp @@ -27,15 +27,6 @@ DvrWriteBufferQueue::DvrWriteBufferQueue( format_(producer_queue->default_format()) {} int DvrWriteBufferQueue::GetNativeWindow(ANativeWindow** out_window) { - if (producer_queue_->metadata_size() != sizeof(DvrNativeBufferMetadata)) { - ALOGE( - "DvrWriteBufferQueue::GetNativeWindow: The size of buffer metadata " - "(%zu) of the write queue does not match of size of " - "DvrNativeBufferMetadata (%zu).", - producer_queue_->metadata_size(), sizeof(DvrNativeBufferMetadata)); - return -EINVAL; - } - if (native_window_ == nullptr) { // Lazy creation of |native_window|, as not everyone is using // DvrWriteBufferQueue as an external surface. @@ -63,10 +54,27 @@ int DvrWriteBufferQueue::CreateReadQueue(DvrReadBufferQueue** out_read_queue) { } int DvrWriteBufferQueue::Dequeue(int timeout, DvrWriteBuffer* write_buffer, - int* out_fence_fd, size_t* out_slot) { + int* out_fence_fd) { + DvrNativeBufferMetadata meta; + DvrWriteBuffer* buffer = nullptr; + int fence_fd = -1; + if (const int ret = GainBuffer(timeout, &buffer, &meta, &fence_fd)) + return ret; + if (!buffer) + return -ENOMEM; + + write_buffers_[buffer->slot].reset(buffer); + write_buffer->write_buffer = std::move(buffer->write_buffer); + *out_fence_fd = fence_fd; + return 0; +} + +int DvrWriteBufferQueue::GainBuffer(int timeout, + DvrWriteBuffer** out_write_buffer, + DvrNativeBufferMetadata* out_meta, + int* out_fence_fd) { size_t slot; - pdx::LocalHandle fence; - std::shared_ptr<BufferProducer> buffer_producer; + pdx::LocalHandle release_fence; // Need to retry N+1 times, where N is total number of buffers in the queue. // As in the worst case, we will dequeue all N buffers and reallocate them, on @@ -75,15 +83,29 @@ int DvrWriteBufferQueue::Dequeue(int timeout, DvrWriteBuffer* write_buffer, size_t retry = 0; for (; retry < max_retries; retry++) { - auto buffer_status = producer_queue_->Dequeue(timeout, &slot, &fence); + auto buffer_status = + producer_queue_->Dequeue(timeout, &slot, out_meta, &release_fence); if (!buffer_status) { ALOGE_IF(buffer_status.error() != ETIMEDOUT, - "DvrWriteBufferQueue::Dequeue: Failed to dequeue buffer: %s", + "DvrWriteBufferQueue::GainBuffer: Failed to dequeue buffer: %s", buffer_status.GetErrorMessage().c_str()); return -buffer_status.error(); } - buffer_producer = buffer_status.take(); + if (write_buffers_[slot] == nullptr) { + // Lazy initialization of a write_buffers_ slot. Note that a slot will + // only be dynamically allocated once during the entire cycle life of a + // queue. + write_buffers_[slot] = std::make_unique<DvrWriteBuffer>(); + write_buffers_[slot]->slot = slot; + } + + LOG_ALWAYS_FATAL_IF( + write_buffers_[slot]->write_buffer, + "DvrWriteBufferQueue::GainBuffer: Buffer slot is not empty: %zu", slot); + write_buffers_[slot]->write_buffer = std::move(buffer_status.take()); + + const auto& buffer_producer = write_buffers_[slot]->write_buffer; if (!buffer_producer) return -ENOMEM; @@ -122,6 +144,9 @@ int DvrWriteBufferQueue::Dequeue(int timeout, DvrWriteBuffer* write_buffer, remove_status.GetErrorMessage().c_str()); return -remove_status.error(); } + // Make sure that the previously allocated buffer is dereferenced from + // write_buffers_ array. + write_buffers_[slot]->write_buffer = nullptr; auto allocate_status = producer_queue_->AllocateBuffer( width_, height_, old_layer_count, format_, old_usage); @@ -139,46 +164,8 @@ int DvrWriteBufferQueue::Dequeue(int timeout, DvrWriteBuffer* write_buffer, return -ENOMEM; } - write_buffer->write_buffer = std::move(buffer_producer); - *out_fence_fd = fence.Release(); - if (out_slot) { - // TODO(b/65469368): Remove this null check once dvrWriteBufferQueueDequeue - // is deprecated. - *out_slot = slot; - } - return 0; -} - -int DvrWriteBufferQueue::GainBuffer(int timeout, - DvrWriteBuffer** out_write_buffer, - DvrNativeBufferMetadata* out_meta, - int* out_fence_fd) { - DvrWriteBuffer write_buffer; - int fence_fd; - size_t slot; - const int ret = Dequeue(timeout, &write_buffer, &fence_fd, &slot); - if (ret < 0) { - ALOGE_IF( - ret != -ETIMEDOUT, - "DvrWriteBufferQueue::GainBuffer: Failed to dequeue buffer, ret=%d", - ret); - return ret; - } - - if (write_buffers_[slot] == nullptr) { - // Lazy initialization of a write_buffers_ slot. Note that a slot will only - // be dynamically allocated once during the entire cycle life of a queue. - write_buffers_[slot] = std::make_unique<DvrWriteBuffer>(); - write_buffers_[slot]->slot = slot; - } - - LOG_ALWAYS_FATAL_IF( - write_buffers_[slot]->write_buffer, - "DvrWriteBufferQueue::GainBuffer: Buffer slot is not empty: %zu", slot); - write_buffers_[slot]->write_buffer = std::move(write_buffer.write_buffer); - *out_write_buffer = write_buffers_[slot].release(); - *out_fence_fd = fence_fd; + *out_fence_fd = release_fence.Release(); return 0; } @@ -202,14 +189,16 @@ int DvrWriteBufferQueue::PostBuffer(DvrWriteBuffer* write_buffer, } if (write_buffer->write_buffer->id() != producer_queue_->GetBufferId(slot)) { ALOGE( - "DvrWriteBufferQueue::PostBuffer: Buffer to be released does not " - "belong to this buffer queue."); + "DvrWriteBufferQueue::PostBuffer: Buffer to be posted does not " + "belong to this buffer queue. Posting buffer: id=%d, buffer in " + "queue: id=%d", + write_buffer->write_buffer->id(), producer_queue_->GetBufferId(slot)); return -EINVAL; } + write_buffer->write_buffer->SetQueueIndex(next_post_index_++); pdx::LocalHandle fence(ready_fence_fd); - // TODO(b/65455724): All BufferHub operations should be async. - const int ret = write_buffer->write_buffer->Post(fence, meta, sizeof(*meta)); + const int ret = write_buffer->write_buffer->PostAsync(meta, fence); if (ret < 0) { ALOGE("DvrWriteBufferQueue::PostBuffer: Failed to post buffer, ret=%d", ret); @@ -316,8 +305,7 @@ int dvrWriteBufferQueueDequeue(DvrWriteBufferQueue* write_queue, int timeout, if (!write_queue || !write_buffer || !out_fence_fd) return -EINVAL; - // TODO(b/65469368): Deprecate this API once new GainBuffer API is in use. - return write_queue->Dequeue(timeout, write_buffer, out_fence_fd, nullptr); + return write_queue->Dequeue(timeout, write_buffer, out_fence_fd); } int dvrWriteBufferQueueGainBuffer(DvrWriteBufferQueue* write_queue, int timeout, @@ -370,8 +358,8 @@ int DvrReadBufferQueue::CreateReadQueue(DvrReadBufferQueue** out_read_queue) { } int DvrReadBufferQueue::Dequeue(int timeout, DvrReadBuffer* read_buffer, - int* out_fence_fd, size_t* out_slot, - void* out_meta, size_t meta_size_bytes) { + int* out_fence_fd, void* out_meta, + size_t meta_size_bytes) { if (meta_size_bytes != consumer_queue_->metadata_size()) { ALOGE( "DvrReadBufferQueue::Dequeue: Invalid metadata size, expected (%zu), " @@ -394,11 +382,6 @@ int DvrReadBufferQueue::Dequeue(int timeout, DvrReadBuffer* read_buffer, read_buffer->read_buffer = buffer_status.take(); *out_fence_fd = acquire_fence.Release(); - if (out_slot) { - // TODO(b/65469368): Remove this null check once dvrReadBufferQueueDequeue - // is deprecated. - *out_slot = slot; - } return 0; } @@ -406,17 +389,15 @@ int DvrReadBufferQueue::AcquireBuffer(int timeout, DvrReadBuffer** out_read_buffer, DvrNativeBufferMetadata* out_meta, int* out_fence_fd) { - DvrReadBuffer read_buffer; - int fence_fd; size_t slot; - const int ret = Dequeue(timeout, &read_buffer, &fence_fd, &slot, out_meta, - sizeof(*out_meta)); - if (ret < 0) { - ALOGE_IF( - ret != -ETIMEDOUT, - "DvrReadBufferQueue::AcquireBuffer: Failed to dequeue buffer, error=%d", - ret); - return ret; + pdx::LocalHandle acquire_fence; + auto buffer_status = + consumer_queue_->Dequeue(timeout, &slot, out_meta, &acquire_fence); + if (!buffer_status) { + ALOGE_IF(buffer_status.error() != ETIMEDOUT, + "DvrReadBufferQueue::AcquireBuffer: Failed to dequeue buffer: %s", + buffer_status.GetErrorMessage().c_str()); + return -buffer_status.error(); } if (read_buffers_[slot] == nullptr) { @@ -429,10 +410,10 @@ int DvrReadBufferQueue::AcquireBuffer(int timeout, LOG_FATAL_IF( read_buffers_[slot]->read_buffer, "DvrReadBufferQueue::AcquireBuffer: Buffer slot is not empty: %zu", slot); - read_buffers_[slot]->read_buffer = std::move(read_buffer.read_buffer); + read_buffers_[slot]->read_buffer = std::move(buffer_status.take()); *out_read_buffer = read_buffers_[slot].release(); - *out_fence_fd = fence_fd; + *out_fence_fd = acquire_fence.Release(); return 0; } @@ -457,20 +438,14 @@ int DvrReadBufferQueue::ReleaseBuffer(DvrReadBuffer* read_buffer, if (read_buffer->read_buffer->id() != consumer_queue_->GetBufferId(slot)) { ALOGE( "DvrReadBufferQueue::ReleaseBuffer: Buffer to be released does not " - "belong to this buffer queue."); + "belong to this buffer queue. Releasing buffer: id=%d, buffer in " + "queue: id=%d", + read_buffer->read_buffer->id(), consumer_queue_->GetBufferId(slot)); return -EINVAL; } pdx::LocalHandle fence(release_fence_fd); - int ret = 0; - if (fence) { - ret = read_buffer->read_buffer->Release(fence); - } else { - // TODO(b/65458354): Send metadata back to producer once shared memory based - // metadata is implemented. - // TODO(b/65455724): All BufferHub operations should be async. - ret = read_buffer->read_buffer->ReleaseAsync(); - } + int ret = read_buffer->read_buffer->ReleaseAsync(meta, fence); if (ret < 0) { ALOGE("DvrReadBufferQueue::ReleaseBuffer: Failed to release buffer, ret=%d", ret); @@ -559,9 +534,8 @@ int dvrReadBufferQueueDequeue(DvrReadBufferQueue* read_queue, int timeout, if (meta_size_bytes != 0 && !out_meta) return -EINVAL; - // TODO(b/65469368): Deprecate this API once new AcquireBuffer API is in use. - return read_queue->Dequeue(timeout, read_buffer, out_fence_fd, nullptr, - out_meta, meta_size_bytes); + return read_queue->Dequeue(timeout, read_buffer, out_fence_fd, out_meta, + meta_size_bytes); } int dvrReadBufferQueueAcquireBuffer(DvrReadBufferQueue* read_queue, int timeout, diff --git a/libs/vr/libdvr/dvr_buffer_queue_internal.h b/libs/vr/libdvr/dvr_buffer_queue_internal.h index f9c0bfd7c7..e53a6868ff 100644 --- a/libs/vr/libdvr/dvr_buffer_queue_internal.h +++ b/libs/vr/libdvr/dvr_buffer_queue_internal.h @@ -42,8 +42,7 @@ struct DvrWriteBufferQueue { int GetNativeWindow(ANativeWindow** out_window); int CreateReadQueue(DvrReadBufferQueue** out_read_queue); - int Dequeue(int timeout, DvrWriteBuffer* write_buffer, int* out_fence_fd, - size_t* out_slot); + int Dequeue(int timeout, DvrWriteBuffer* write_buffer, int* out_fence_fd); int GainBuffer(int timeout, DvrWriteBuffer** out_write_buffer, DvrNativeBufferMetadata* out_meta, int* out_fence_fd); int PostBuffer(DvrWriteBuffer* write_buffer, @@ -55,6 +54,7 @@ struct DvrWriteBufferQueue { std::array<std::unique_ptr<DvrWriteBuffer>, BufferHubQueue::kMaxQueueCapacity> write_buffers_; + int64_t next_post_index_ = 0; uint32_t width_; uint32_t height_; uint32_t format_; @@ -75,7 +75,7 @@ struct DvrReadBufferQueue { int CreateReadQueue(DvrReadBufferQueue** out_read_queue); int Dequeue(int timeout, DvrReadBuffer* read_buffer, int* out_fence_fd, - size_t* out_slot, void* out_meta, size_t meta_size_bytes); + void* out_meta, size_t user_metadata_size); int AcquireBuffer(int timeout, DvrReadBuffer** out_read_buffer, DvrNativeBufferMetadata* out_meta, int* out_fence_fd); int ReleaseBuffer(DvrReadBuffer* read_buffer, diff --git a/libs/vr/libdvr/include/dvr/dvr_api.h b/libs/vr/libdvr/include/dvr/dvr_api.h index 8f45ce7e40..499b7c190a 100644 --- a/libs/vr/libdvr/include/dvr/dvr_api.h +++ b/libs/vr/libdvr/include/dvr/dvr_api.h @@ -15,6 +15,12 @@ extern "C" { #endif +#ifdef __GNUC__ +#define ALIGNED_DVR_STRUCT(x) __attribute__((packed, aligned(x))) +#else +#define ALIGNED_DVR_STRUCT(x) +#endif + typedef struct ANativeWindow ANativeWindow; typedef struct DvrPoseAsync DvrPoseAsync; @@ -367,7 +373,24 @@ typedef int (*DvrPerformanceSetSchedulerPolicyPtr)( // existing data members. If new fields need to be added, please take extra care // to make sure that new data field is padded properly the size of the struct // stays same. -struct DvrNativeBufferMetadata { +struct ALIGNED_DVR_STRUCT(8) DvrNativeBufferMetadata { +#ifdef __cplusplus + DvrNativeBufferMetadata() + : timestamp(0), + is_auto_timestamp(0), + dataspace(0), + crop_left(0), + crop_top(0), + crop_right(0), + crop_bottom(0), + scaling_mode(0), + transform(0), + index(0), + user_metadata_size(0), + user_metadata_ptr(0), + release_fence_mask(0), + reserved{0} {} +#endif // Timestamp of the frame. int64_t timestamp; @@ -391,10 +414,32 @@ struct DvrNativeBufferMetadata { // android/native_window.h int32_t transform; - // Reserved bytes for so that the struct is forward compatible. - int32_t reserved[16]; + // The index of the frame. + int64_t index; + + // Size of additional metadata requested by user. + uint64_t user_metadata_size; + + // Raw memory address of the additional user defined metadata. Only valid when + // user_metadata_size is non-zero. + uint64_t user_metadata_ptr; + + // Only applicable for metadata retrieved from GainAsync. This indicates which + // consumer has pending fence that producer should epoll on. + uint64_t release_fence_mask; + + // Reserved bytes for so that the struct is forward compatible and padding to + // 104 bytes so the size is a multiple of 8. + int32_t reserved[8]; }; +#ifdef __cplusplus +// Warning: DvrNativeBufferMetadata is part of the DVR API and changing its size +// will cause compatiblity issues between different DVR API releases. +static_assert(sizeof(DvrNativeBufferMetadata) == 104, + "Unexpected size for DvrNativeBufferMetadata"); +#endif + struct DvrApi_v1 { // Defines an API entry for V1 (no version suffix). #define DVR_V1_API_ENTRY(name) Dvr##name##Ptr name diff --git a/libs/vr/libdvr/tests/Android.bp b/libs/vr/libdvr/tests/Android.bp index a9302a7561..887766a535 100644 --- a/libs/vr/libdvr/tests/Android.bp +++ b/libs/vr/libdvr/tests/Android.bp @@ -42,6 +42,7 @@ cc_test { "dvr_named_buffer-test.cpp", ], + header_libs: ["libdvr_headers"], static_libs: static_libraries, shared_libs: shared_libraries, cflags: [ diff --git a/libs/vr/libdvr/tests/dvr_buffer_queue-test.cpp b/libs/vr/libdvr/tests/dvr_buffer_queue-test.cpp index f1c5e48916..62cd8d4e53 100644 --- a/libs/vr/libdvr/tests/dvr_buffer_queue-test.cpp +++ b/libs/vr/libdvr/tests/dvr_buffer_queue-test.cpp @@ -131,7 +131,7 @@ TEST_F(DvrBufferQueueTest, GainBuffer) { DvrWriteBuffer* wb = nullptr; EXPECT_FALSE(dvrWriteBufferIsValid(wb)); - DvrNativeBufferMetadata meta = {0}; + DvrNativeBufferMetadata meta; int fence_fd = -1; ret = dvrWriteBufferQueueGainBuffer(write_queue_, /*timeout=*/0, &wb, &meta, &fence_fd); @@ -150,8 +150,8 @@ TEST_F(DvrBufferQueueTest, AcquirePostGainRelease) { DvrReadBufferQueue* read_queue = nullptr; DvrReadBuffer* rb = nullptr; DvrWriteBuffer* wb = nullptr; - DvrNativeBufferMetadata meta1 = {0}; - DvrNativeBufferMetadata meta2 = {0}; + DvrNativeBufferMetadata meta1; + DvrNativeBufferMetadata meta2; int fence_fd = -1; ret = dvrWriteBufferQueueCreateReadQueue(write_queue_, &read_queue); @@ -180,7 +180,7 @@ TEST_F(DvrBufferQueueTest, AcquirePostGainRelease) { wb = nullptr; // Acquire buffer for reading. - ret = dvrReadBufferQueueAcquireBuffer(read_queue, /*timeout=*/0, &rb, &meta2, + ret = dvrReadBufferQueueAcquireBuffer(read_queue, /*timeout=*/10, &rb, &meta2, &fence_fd); ASSERT_EQ(ret, 0); ASSERT_NE(rb, nullptr); @@ -245,7 +245,7 @@ TEST_F(DvrBufferQueueTest, ResizeBuffer) { int fence_fd = -1; - DvrNativeBufferMetadata meta = {0}; + DvrNativeBufferMetadata meta; DvrReadBufferQueue* read_queue = nullptr; DvrWriteBuffer* wb1 = nullptr; DvrWriteBuffer* wb2 = nullptr; @@ -400,7 +400,7 @@ TEST_F(DvrBufferQueueTest, StableBufferIdAndHardwareBuffer) { // This test runs the following operations many many times. Thus we prefer to // use ASSERT_XXX rather than EXPECT_XXX to avoid spamming the output. std::function<void(size_t i)> Gain = [&](size_t i) { - int ret = dvrWriteBufferQueueGainBuffer(write_queue_, /*timeout=*/0, + int ret = dvrWriteBufferQueueGainBuffer(write_queue_, /*timeout=*/10, &wbs[i], &metas[i], &fence_fd); ASSERT_EQ(ret, 0); ASSERT_LT(fence_fd, 0); // expect invalid fence. @@ -434,7 +434,7 @@ TEST_F(DvrBufferQueueTest, StableBufferIdAndHardwareBuffer) { }; std::function<void(size_t i)> Acquire = [&](size_t i) { - int ret = dvrReadBufferQueueAcquireBuffer(read_queue, /*timeout=*/0, + int ret = dvrReadBufferQueueAcquireBuffer(read_queue, /*timeout=*/10, &rbs[i], &metas[i], &fence_fd); ASSERT_EQ(ret, 0); ASSERT_LT(fence_fd, 0); // expect invalid fence. diff --git a/libs/vr/libpdx/Android.bp b/libs/vr/libpdx/Android.bp index 8fce140307..10c0b31c57 100644 --- a/libs/vr/libpdx/Android.bp +++ b/libs/vr/libpdx/Android.bp @@ -36,6 +36,7 @@ cc_test { "variant_tests.cpp", ], static_libs: [ + "libcutils", "libgmock", "libpdx", "liblog", diff --git a/libs/vr/libpdx/private/pdx/client_channel.h b/libs/vr/libpdx/private/pdx/client_channel.h index dbfd626d6f..10a49bb8d7 100644 --- a/libs/vr/libpdx/private/pdx/client_channel.h +++ b/libs/vr/libpdx/private/pdx/client_channel.h @@ -1,6 +1,8 @@ #ifndef ANDROID_PDX_CLIENT_CHANNEL_H_ #define ANDROID_PDX_CLIENT_CHANNEL_H_ +#include <vector> + #include <pdx/channel_handle.h> #include <pdx/file_handle.h> #include <pdx/status.h> @@ -20,6 +22,15 @@ class ClientChannel { virtual int event_fd() const = 0; virtual Status<int> GetEventMask(int events) = 0; + struct EventSource { + int event_fd; + int event_mask; + }; + + // Returns a set of event-generating fds with and event mask for each. These + // fds are owned by the ClientChannel and must never be closed by the caller. + virtual std::vector<EventSource> GetEventSources() const = 0; + virtual LocalChannelHandle& GetChannelHandle() = 0; virtual void* AllocateTransactionState() = 0; virtual void FreeTransactionState(void* state) = 0; diff --git a/libs/vr/libpdx/private/pdx/mock_client_channel.h b/libs/vr/libpdx/private/pdx/mock_client_channel.h index 561c939daf..49e0682bc9 100644 --- a/libs/vr/libpdx/private/pdx/mock_client_channel.h +++ b/libs/vr/libpdx/private/pdx/mock_client_channel.h @@ -11,6 +11,7 @@ class MockClientChannel : public ClientChannel { public: MOCK_CONST_METHOD0(GetIpcTag, uint32_t()); MOCK_CONST_METHOD0(event_fd, int()); + MOCK_CONST_METHOD0(GetEventSources, std::vector<EventSource>()); MOCK_METHOD1(GetEventMask, Status<int>(int)); MOCK_METHOD0(GetChannelHandle, LocalChannelHandle&()); MOCK_METHOD0(AllocateTransactionState, void*()); diff --git a/libs/vr/libpdx/private/pdx/trace.h b/libs/vr/libpdx/private/pdx/trace.h index ebe8491ebc..c687fd6259 100644 --- a/libs/vr/libpdx/private/pdx/trace.h +++ b/libs/vr/libpdx/private/pdx/trace.h @@ -1,35 +1,82 @@ #ifndef ANDROID_PDX_TRACE_H_ #define ANDROID_PDX_TRACE_H_ -// Tracing utilities for libpdx. Tracing in the service framework is enabled -// under these conditions: -// 1. ATRACE_TAG is defined, AND -// 2. ATRACE_TAG does not equal ATRACE_TAG_NEVER, AND -// 3. PDX_TRACE_ENABLED is defined, AND -// 4. PDX_TRACE_ENABLED is equal to logical true. -// -// If any of these conditions are not met tracing is completely removed from the -// library and headers. - -// If ATRACE_TAG is not defined, default to never. -#ifndef ATRACE_TAG -#define ATRACE_TAG ATRACE_TAG_NEVER -#endif +#include <array> -// Include tracing functions after the trace tag is defined. #include <utils/Trace.h> -// If PDX_TRACE_ENABLED is not defined, default to off. -#ifndef PDX_TRACE_ENABLED -#define PDX_TRACE_ENABLED 0 +// Enables internal tracing in libpdx. This is disabled by default to avoid +// spamming the trace buffers during normal trace activities. libpdx must be +// built with this set to true to enable internal tracing. +#ifndef PDX_LIB_TRACE_ENABLED +#define PDX_LIB_TRACE_ENABLED false #endif -#if (ATRACE_TAG) != (ATRACE_TAG_NEVER) && (PDX_TRACE_ENABLED) -#define PDX_TRACE_NAME ATRACE_NAME -#else -#define PDX_TRACE_NAME(name) \ - do { \ - } while (0) -#endif +namespace android { +namespace pdx { + +// Utility to generate scoped tracers with arguments. +class ScopedTraceArgs { + public: + template <typename... Args> + ScopedTraceArgs(uint64_t tag, const char* format, Args&&... args) + : tag_{tag} { + if (atrace_is_tag_enabled(tag_)) { + std::array<char, 1024> buffer; + snprintf(buffer.data(), buffer.size(), format, + std::forward<Args>(args)...); + atrace_begin(tag_, buffer.data()); + } + } + + ~ScopedTraceArgs() { atrace_end(tag_); } + + private: + uint64_t tag_; + + ScopedTraceArgs(const ScopedTraceArgs&) = delete; + void operator=(const ScopedTraceArgs&) = delete; +}; + +// Utility to generate scoped tracers. +class ScopedTrace { + public: + template <typename... Args> + ScopedTrace(uint64_t tag, bool enabled, const char* name) + : tag_{tag}, enabled_{enabled} { + if (enabled_) + atrace_begin(tag_, name); + } + + ~ScopedTrace() { + if (enabled_) + atrace_end(tag_); + } + + private: + uint64_t tag_; + bool enabled_; + + ScopedTrace(const ScopedTrace&) = delete; + void operator=(const ScopedTrace&) = delete; +}; + +} // namespace pdx +} // namespace android + +// Macro to define a scoped tracer with arguments. Uses PASTE(x, y) macro +// defined in utils/Trace.h. +#define PDX_TRACE_FORMAT(format, ...) \ + ::android::pdx::ScopedTraceArgs PASTE(__tracer, __LINE__) { \ + ATRACE_TAG, format, ##__VA_ARGS__ \ + } + +// TODO(eieio): Rename this to PDX_LIB_TRACE_NAME() for internal use by libpdx +// and rename internal uses inside the library. This version is only enabled +// when PDX_LIB_TRACE_ENABLED is true. +#define PDX_TRACE_NAME(name) \ + ::android::pdx::ScopedTrace PASTE(__tracer, __LINE__) { \ + ATRACE_TAG, PDX_LIB_TRACE_ENABLED, name \ + } #endif // ANDROID_PDX_TRACE_H_ diff --git a/libs/vr/libpdx_uds/channel_event_set.cpp b/libs/vr/libpdx_uds/channel_event_set.cpp index ebe7cea7e3..c68968e1f2 100644 --- a/libs/vr/libpdx_uds/channel_event_set.cpp +++ b/libs/vr/libpdx_uds/channel_event_set.cpp @@ -1,6 +1,10 @@ #include "private/uds/channel_event_set.h" +#include <errno.h> #include <log/log.h> +#include <poll.h> +#include <sys/epoll.h> +#include <sys/eventfd.h> #include <uds/ipc_helper.h> @@ -8,109 +12,137 @@ namespace android { namespace pdx { namespace uds { +namespace { + +template <typename FileHandleType> +Status<void> SetupHandle(int fd, FileHandleType* handle, + const char* error_name) { + const int error = errno; + handle->Reset(fd); + if (!*handle) { + ALOGE("SetupHandle: Failed to setup %s handle: %s", error_name, + strerror(error)); + return ErrorStatus{error}; + } + return {}; +} + +} // anonymous namespace + ChannelEventSet::ChannelEventSet() { const int flags = EFD_CLOEXEC | EFD_NONBLOCK; - LocalHandle epoll_fd, event_fd; + LocalHandle pollin_event_fd, pollhup_event_fd; - if (!SetupHandle(epoll_create1(EPOLL_CLOEXEC), &epoll_fd, "epoll") || - !SetupHandle(eventfd(0, flags), &event_fd, "event")) { + if (!SetupHandle(eventfd(0, flags), &pollin_event_fd, "pollin_event") || + !SetupHandle(eventfd(0, flags), &pollhup_event_fd, "pollhup_event")) { + return; + } + + pollin_event_fd_ = std::move(pollin_event_fd); + pollhup_event_fd_ = std::move(pollhup_event_fd); +} + +int ChannelEventSet::ModifyEvents(int clear_mask, int set_mask) { + ALOGD_IF(TRACE, "ChannelEventSet::ModifyEvents: clear_mask=%x set_mask=%x", + clear_mask, set_mask); + const int old_bits = event_bits_; + const int new_bits = (event_bits_ & ~clear_mask) | set_mask; + event_bits_ = new_bits; + eventfd_t value; + + // Calculate which bits changed and how. Bits that haven't changed since last + // modification will not change the state of an eventfd. + const int set_bits = new_bits & ~old_bits; + const int clear_bits = ~new_bits & old_bits; + + if (set_bits & EPOLLIN) + eventfd_write(pollin_event_fd_.Get(), 1); + else if (clear_bits & EPOLLIN) + eventfd_read(pollin_event_fd_.Get(), &value); + + if (set_bits & EPOLLHUP) + eventfd_write(pollhup_event_fd_.Get(), 1); + else if (clear_bits & EPOLLHUP) + eventfd_read(pollhup_event_fd_.Get(), &value); + + return 0; +} + +ChannelEventReceiver::ChannelEventReceiver(LocalHandle data_fd, + LocalHandle pollin_event_fd, + LocalHandle pollhup_event_fd) { + LocalHandle epoll_fd; + if (!SetupHandle(epoll_create1(EPOLL_CLOEXEC), &epoll_fd, "epoll")) { return; } epoll_event event; - event.events = 0; + event.events = EPOLLHUP | EPOLLRDHUP; event.data.u32 = 0; - if (epoll_ctl(epoll_fd.Get(), EPOLL_CTL_ADD, event_fd.Get(), &event) < 0) { + if (epoll_ctl(epoll_fd.Get(), EPOLL_CTL_ADD, data_fd.Get(), &event) < 0) { const int error = errno; - ALOGE("ChannelEventSet::ChannelEventSet: Failed to add event_fd: %s", + ALOGE("ChannelEventSet::ChannelEventSet: Failed to add data_fd: %s", strerror(error)); return; } - epoll_fd_ = std::move(epoll_fd); - event_fd_ = std::move(event_fd); -} - -Status<void> ChannelEventSet::AddDataFd(const LocalHandle& data_fd) { - epoll_event event; - event.events = EPOLLHUP | EPOLLRDHUP; - event.data.u32 = event.events; - if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_ADD, data_fd.Get(), &event) < 0) { + event.events = EPOLLIN; + event.data.u32 = 0; + if (epoll_ctl(epoll_fd.Get(), EPOLL_CTL_ADD, pollin_event_fd.Get(), &event) < + 0) { const int error = errno; - ALOGE("ChannelEventSet::ChannelEventSet: Failed to add event_fd: %s", + ALOGE("ChannelEventSet::ChannelEventSet: Failed to add pollin_event_fd: %s", strerror(error)); - return ErrorStatus{error}; - } else { - return {}; + return; } -} -int ChannelEventSet::ModifyEvents(int clear_mask, int set_mask) { - ALOGD_IF(TRACE, "ChannelEventSet::ModifyEvents: clear_mask=%x set_mask=%x", - clear_mask, set_mask); - const int old_bits = event_bits_; - const int new_bits = (event_bits_ & ~clear_mask) | set_mask; - event_bits_ = new_bits; - - // If anything changed clear the event and update the event mask. - if (old_bits != new_bits) { - eventfd_t value; - eventfd_read(event_fd_.Get(), &value); - - epoll_event event; - event.events = POLLIN; - event.data.u32 = event_bits_; - if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_MOD, event_fd_.Get(), &event) < - 0) { - const int error = errno; - ALOGE("ChannelEventSet::AddEventHandle: Failed to update event: %s", - strerror(error)); - return -error; - } + event.events = EPOLLIN; + event.data.u32 = 0; + if (epoll_ctl(epoll_fd.Get(), EPOLL_CTL_ADD, pollhup_event_fd.Get(), &event) < + 0) { + const int error = errno; + ALOGE( + "ChannelEventSet::ChannelEventSet: Failed to add pollhup_event_fd: %s", + strerror(error)); + return; } - // If there are any bits set, re-trigger the eventfd. - if (new_bits) - eventfd_write(event_fd_.Get(), 1); - - return 0; + pollin_event_fd_ = std::move(pollin_event_fd); + pollhup_event_fd_ = std::move(pollhup_event_fd); + data_fd_ = std::move(data_fd); + epoll_fd_ = std::move(epoll_fd); } -Status<void> ChannelEventSet::SetupHandle(int fd, LocalHandle* handle, - const char* error_name) { - const int error = errno; - handle->Reset(fd); - if (!*handle) { - ALOGE("ChannelEventSet::SetupHandle: Failed to setup %s handle: %s", - error_name, strerror(error)); +Status<int> ChannelEventReceiver::PollPendingEvents(int timeout_ms) const { + std::array<pollfd, 3> pfds = {{{pollin_event_fd_.Get(), POLLIN, 0}, + {pollhup_event_fd_.Get(), POLLIN, 0}, + {data_fd_.Get(), POLLHUP | POLLRDHUP, 0}}}; + if (RETRY_EINTR(poll(pfds.data(), pfds.size(), timeout_ms)) < 0) { + const int error = errno; + ALOGE( + "ChannelEventReceiver::PollPendingEvents: Failed to poll for events: " + "%s", + strerror(error)); return ErrorStatus{error}; } - return {}; + + const int event_mask = + ((pfds[0].revents & POLLIN) ? EPOLLIN : 0) | + ((pfds[1].revents & POLLIN) ? EPOLLHUP : 0) | + ((pfds[2].revents & (POLLHUP | POLLRDHUP)) ? EPOLLHUP : 0); + return {event_mask}; } Status<int> ChannelEventReceiver::GetPendingEvents() const { constexpr long kTimeoutMs = 0; - epoll_event event; - const int count = - RETRY_EINTR(epoll_wait(epoll_fd_.Get(), &event, 1, kTimeoutMs)); - - Status<int> status; - if (count < 0) { - status.SetError(errno); - ALOGE("ChannelEventReceiver::GetPendingEvents: Failed to get events: %s", - status.GetErrorMessage().c_str()); - return status; - } else if (count == 0) { - status.SetError(ETIMEDOUT); - return status; - } - - const int mask_out = event.data.u32; - ALOGD_IF(TRACE, "ChannelEventReceiver::GetPendingEvents: mask_out=%x", - mask_out); + return PollPendingEvents(kTimeoutMs); +} - status.SetValue(mask_out); - return status; +std::vector<ClientChannel::EventSource> ChannelEventReceiver::GetEventSources() + const { + return {{data_fd_.Get(), EPOLLHUP | EPOLLRDHUP}, + {pollin_event_fd_.Get(), EPOLLIN}, + {pollhup_event_fd_.Get(), POLLIN}}; } } // namespace uds diff --git a/libs/vr/libpdx_uds/channel_manager.cpp b/libs/vr/libpdx_uds/channel_manager.cpp index afc0a4f041..43ebe05021 100644 --- a/libs/vr/libpdx_uds/channel_manager.cpp +++ b/libs/vr/libpdx_uds/channel_manager.cpp @@ -22,18 +22,26 @@ void ChannelManager::CloseHandle(int32_t handle) { } LocalChannelHandle ChannelManager::CreateHandle(LocalHandle data_fd, - LocalHandle event_fd) { - if (data_fd && event_fd) { + LocalHandle pollin_event_fd, + LocalHandle pollhup_event_fd) { + if (data_fd && pollin_event_fd && pollhup_event_fd) { std::lock_guard<std::mutex> autolock(mutex_); - int32_t handle = data_fd.Get(); - channels_.emplace(handle, - ChannelData{std::move(data_fd), std::move(event_fd)}); + const int32_t handle = data_fd.Get(); + channels_.emplace( + handle, + ChannelEventReceiver{std::move(data_fd), std::move(pollin_event_fd), + std::move(pollhup_event_fd)}); return LocalChannelHandle(this, handle); + } else { + ALOGE( + "ChannelManager::CreateHandle: Invalid arguments: data_fd=%d " + "pollin_event_fd=%d pollhup_event_fd=%d", + data_fd.Get(), pollin_event_fd.Get(), pollhup_event_fd.Get()); + return LocalChannelHandle(nullptr, -1); } - return LocalChannelHandle(nullptr, -1); } -ChannelManager::ChannelData* ChannelManager::GetChannelData(int32_t handle) { +ChannelEventReceiver* ChannelManager::GetChannelData(int32_t handle) { std::lock_guard<std::mutex> autolock(mutex_); auto channel = channels_.find(handle); return channel != channels_.end() ? &channel->second : nullptr; diff --git a/libs/vr/libpdx_uds/client_channel.cpp b/libs/vr/libpdx_uds/client_channel.cpp index 3f785fa62e..2e9c1def3b 100644 --- a/libs/vr/libpdx_uds/client_channel.cpp +++ b/libs/vr/libpdx_uds/client_channel.cpp @@ -33,7 +33,9 @@ struct TransactionState { } else if (static_cast<size_t>(index) < response.channels.size()) { auto& channel_info = response.channels[index]; *handle = ChannelManager::Get().CreateHandle( - std::move(channel_info.data_fd), std::move(channel_info.event_fd)); + std::move(channel_info.data_fd), + std::move(channel_info.pollin_event_fd), + std::move(channel_info.pollhup_event_fd)); } else { return false; } @@ -53,9 +55,9 @@ struct TransactionState { if (auto* channel_data = ChannelManager::Get().GetChannelData(handle.value())) { - ChannelInfo<BorrowedHandle> channel_info; - channel_info.data_fd.Reset(handle.value()); - channel_info.event_fd = channel_data->event_receiver.event_fd(); + ChannelInfo<BorrowedHandle> channel_info{ + channel_data->data_fd(), channel_data->pollin_event_fd(), + channel_data->pollhup_event_fd()}; request.channels.push_back(std::move(channel_info)); return request.channels.size() - 1; } else { diff --git a/libs/vr/libpdx_uds/client_channel_factory.cpp b/libs/vr/libpdx_uds/client_channel_factory.cpp index 433f459769..09dc7beb76 100644 --- a/libs/vr/libpdx_uds/client_channel_factory.cpp +++ b/libs/vr/libpdx_uds/client_channel_factory.cpp @@ -139,20 +139,33 @@ Status<std::unique_ptr<pdx::ClientChannel>> ClientChannelFactory::Connect( RequestHeader<BorrowedHandle> request; InitRequest(&request, opcodes::CHANNEL_OPEN, 0, 0, false); + status = SendData(socket_.Borrow(), request); if (!status) return status.error_status(); + ResponseHeader<LocalHandle> response; status = ReceiveData(socket_.Borrow(), &response); if (!status) return status.error_status(); - int ref = response.ret_code; - if (ref < 0 || static_cast<size_t>(ref) > response.file_descriptors.size()) + else if (response.ret_code < 0 || response.channels.size() != 1) + return ErrorStatus(EIO); + + LocalHandle pollin_event_fd = std::move(response.channels[0].pollin_event_fd); + LocalHandle pollhup_event_fd = + std::move(response.channels[0].pollhup_event_fd); + + if (!pollin_event_fd || !pollhup_event_fd) { + ALOGE( + "ClientChannelFactory::Connect: Required fd was not returned from the " + "service: pollin_event_fd=%d pollhup_event_fd=%d", + pollin_event_fd.Get(), pollhup_event_fd.Get()); return ErrorStatus(EIO); + } - LocalHandle event_fd = std::move(response.file_descriptors[ref]); return ClientChannel::Create(ChannelManager::Get().CreateHandle( - std::move(socket_), std::move(event_fd))); + std::move(socket_), std::move(pollin_event_fd), + std::move(pollhup_event_fd))); } } // namespace uds diff --git a/libs/vr/libpdx_uds/private/uds/channel_event_set.h b/libs/vr/libpdx_uds/private/uds/channel_event_set.h index 1f464d5f91..99e75028d1 100644 --- a/libs/vr/libpdx_uds/private/uds/channel_event_set.h +++ b/libs/vr/libpdx_uds/private/uds/channel_event_set.h @@ -1,11 +1,9 @@ #ifndef ANDROID_PDX_UDS_CHANNEL_EVENT_SET_H_ #define ANDROID_PDX_UDS_CHANNEL_EVENT_SET_H_ -#include <errno.h> -#include <poll.h> -#include <sys/epoll.h> -#include <sys/eventfd.h> +#include <vector> +#include <pdx/client_channel.h> #include <pdx/file_handle.h> #include <pdx/status.h> @@ -19,21 +17,20 @@ class ChannelEventSet { ChannelEventSet(ChannelEventSet&&) = default; ChannelEventSet& operator=(ChannelEventSet&&) = default; - BorrowedHandle event_fd() const { return epoll_fd_.Borrow(); } + BorrowedHandle pollin_event_fd() const { return pollin_event_fd_.Borrow(); } + BorrowedHandle pollhup_event_fd() const { return pollhup_event_fd_.Borrow(); } - explicit operator bool() const { return !!epoll_fd_ && !!event_fd_; } + explicit operator bool() const { + return !!pollin_event_fd_ && !!pollhup_event_fd_; + } - Status<void> AddDataFd(const LocalHandle& data_fd); int ModifyEvents(int clear_mask, int set_mask); private: - LocalHandle epoll_fd_; - LocalHandle event_fd_; + LocalHandle pollin_event_fd_; + LocalHandle pollhup_event_fd_; uint32_t event_bits_ = 0; - static Status<void> SetupHandle(int fd, LocalHandle* handle, - const char* error_name); - ChannelEventSet(const ChannelEventSet&) = delete; void operator=(const ChannelEventSet&) = delete; }; @@ -41,14 +38,31 @@ class ChannelEventSet { class ChannelEventReceiver { public: ChannelEventReceiver() = default; - ChannelEventReceiver(LocalHandle epoll_fd) : epoll_fd_{std::move(epoll_fd)} {} + ChannelEventReceiver(LocalHandle data_fd, LocalHandle pollin_event_fd, + LocalHandle pollhup_event_fd); ChannelEventReceiver(ChannelEventReceiver&&) = default; ChannelEventReceiver& operator=(ChannelEventReceiver&&) = default; + explicit operator bool() const { + return !!pollin_event_fd_ && !!pollhup_event_fd_ && !!data_fd_ && + !!epoll_fd_; + } + BorrowedHandle event_fd() const { return epoll_fd_.Borrow(); } + + BorrowedHandle pollin_event_fd() const { return pollin_event_fd_.Borrow(); } + BorrowedHandle pollhup_event_fd() const { return pollhup_event_fd_.Borrow(); } + BorrowedHandle data_fd() const { return data_fd_.Borrow(); } + Status<int> GetPendingEvents() const; + Status<int> PollPendingEvents(int timeout_ms) const; + + std::vector<ClientChannel::EventSource> GetEventSources() const; private: + LocalHandle data_fd_; + LocalHandle pollin_event_fd_; + LocalHandle pollhup_event_fd_; LocalHandle epoll_fd_; ChannelEventReceiver(const ChannelEventReceiver&) = delete; diff --git a/libs/vr/libpdx_uds/private/uds/channel_manager.h b/libs/vr/libpdx_uds/private/uds/channel_manager.h index 2aca41421a..5f6a514340 100644 --- a/libs/vr/libpdx_uds/private/uds/channel_manager.h +++ b/libs/vr/libpdx_uds/private/uds/channel_manager.h @@ -16,13 +16,11 @@ class ChannelManager : public ChannelManagerInterface { public: static ChannelManager& Get(); - LocalChannelHandle CreateHandle(LocalHandle data_fd, LocalHandle event_fd); - struct ChannelData { - LocalHandle data_fd; - ChannelEventReceiver event_receiver; - }; + LocalChannelHandle CreateHandle(LocalHandle data_fd, + LocalHandle pollin_event_fd, + LocalHandle pollhup_event_fd); - ChannelData* GetChannelData(int32_t handle); + ChannelEventReceiver* GetChannelData(int32_t handle); private: ChannelManager() = default; @@ -30,7 +28,7 @@ class ChannelManager : public ChannelManagerInterface { void CloseHandle(int32_t handle) override; std::mutex mutex_; - std::unordered_map<int32_t, ChannelData> channels_; + std::unordered_map<int32_t, ChannelEventReceiver> channels_; }; } // namespace uds diff --git a/libs/vr/libpdx_uds/private/uds/client_channel.h b/libs/vr/libpdx_uds/private/uds/client_channel.h index 8f607f56a1..7a5ddf40eb 100644 --- a/libs/vr/libpdx_uds/private/uds/client_channel.h +++ b/libs/vr/libpdx_uds/private/uds/client_channel.h @@ -23,11 +23,19 @@ class ClientChannel : public pdx::ClientChannel { uint32_t GetIpcTag() const override { return Endpoint::kIpcTag; } int event_fd() const override { - return channel_data_ ? channel_data_->event_receiver.event_fd().Get() : -1; + return channel_data_ ? channel_data_->event_fd().Get() : -1; } + + std::vector<EventSource> GetEventSources() const override { + if (channel_data_) + return channel_data_->GetEventSources(); + else + return {}; + } + Status<int> GetEventMask(int /*events*/) override { if (channel_data_) - return channel_data_->event_receiver.GetPendingEvents(); + return channel_data_->GetPendingEvents(); else return ErrorStatus(EINVAL); } @@ -74,7 +82,7 @@ class ClientChannel : public pdx::ClientChannel { const iovec* receive_vector, size_t receive_count); LocalChannelHandle channel_handle_; - ChannelManager::ChannelData* channel_data_; + ChannelEventReceiver* channel_data_; std::mutex socket_mutex_; }; diff --git a/libs/vr/libpdx_uds/private/uds/ipc_helper.h b/libs/vr/libpdx_uds/private/uds/ipc_helper.h index 664a0d1a1b..63b5b1078a 100644 --- a/libs/vr/libpdx_uds/private/uds/ipc_helper.h +++ b/libs/vr/libpdx_uds/private/uds/ipc_helper.h @@ -110,10 +110,12 @@ template <typename FileHandleType> class ChannelInfo { public: FileHandleType data_fd; - FileHandleType event_fd; + FileHandleType pollin_event_fd; + FileHandleType pollhup_event_fd; private: - PDX_SERIALIZABLE_MEMBERS(ChannelInfo, data_fd, event_fd); + PDX_SERIALIZABLE_MEMBERS(ChannelInfo, data_fd, pollin_event_fd, + pollhup_event_fd); }; template <typename FileHandleType> diff --git a/libs/vr/libpdx_uds/private/uds/service_endpoint.h b/libs/vr/libpdx_uds/private/uds/service_endpoint.h index a163812702..01ebf6519a 100644 --- a/libs/vr/libpdx_uds/private/uds/service_endpoint.h +++ b/libs/vr/libpdx_uds/private/uds/service_endpoint.h @@ -7,6 +7,7 @@ #include <mutex> #include <string> #include <unordered_map> +#include <utility> #include <vector> #include <pdx/service.h> @@ -139,7 +140,8 @@ class Endpoint : public pdx::Endpoint { Status<void> ReenableEpollEvent(const BorrowedHandle& channel_fd); Channel* GetChannelState(int32_t channel_id); BorrowedHandle GetChannelSocketFd(int32_t channel_id); - BorrowedHandle GetChannelEventFd(int32_t channel_id); + Status<std::pair<BorrowedHandle, BorrowedHandle>> GetChannelEventFd( + int32_t channel_id); int32_t GetChannelId(const BorrowedHandle& channel_fd); Status<void> CreateChannelSocketPair(LocalHandle* local_socket, LocalHandle* remote_socket); diff --git a/libs/vr/libpdx_uds/service_endpoint.cpp b/libs/vr/libpdx_uds/service_endpoint.cpp index 27a56f9fe0..0ee77f43a6 100644 --- a/libs/vr/libpdx_uds/service_endpoint.cpp +++ b/libs/vr/libpdx_uds/service_endpoint.cpp @@ -49,7 +49,9 @@ struct MessageState { } else if (static_cast<size_t>(index) < request.channels.size()) { auto& channel_info = request.channels[index]; *handle = ChannelManager::Get().CreateHandle( - std::move(channel_info.data_fd), std::move(channel_info.event_fd)); + std::move(channel_info.data_fd), + std::move(channel_info.pollin_event_fd), + std::move(channel_info.pollhup_event_fd)); } else { return false; } @@ -69,9 +71,9 @@ struct MessageState { if (auto* channel_data = ChannelManager::Get().GetChannelData(handle.value())) { - ChannelInfo<BorrowedHandle> channel_info; - channel_info.data_fd.Reset(handle.value()); - channel_info.event_fd = channel_data->event_receiver.event_fd(); + ChannelInfo<BorrowedHandle> channel_info{ + channel_data->data_fd(), channel_data->pollin_event_fd(), + channel_data->pollhup_event_fd()}; response.channels.push_back(std::move(channel_info)); return response.channels.size() - 1; } else { @@ -80,12 +82,13 @@ struct MessageState { } Status<ChannelReference> PushChannelHandle(BorrowedHandle data_fd, - BorrowedHandle event_fd) { - if (!data_fd || !event_fd) + BorrowedHandle pollin_event_fd, + BorrowedHandle pollhup_event_fd) { + if (!data_fd || !pollin_event_fd || !pollhup_event_fd) return ErrorStatus{EINVAL}; - ChannelInfo<BorrowedHandle> channel_info; - channel_info.data_fd = std::move(data_fd); - channel_info.event_fd = std::move(event_fd); + ChannelInfo<BorrowedHandle> channel_info{std::move(data_fd), + std::move(pollin_event_fd), + std::move(pollhup_event_fd)}; response.channels.push_back(std::move(channel_info)); return response.channels.size() - 1; } @@ -287,7 +290,6 @@ Status<std::pair<int32_t, Endpoint::ChannelData*>> Endpoint::OnNewChannelLocked( return ErrorStatus(errno); } ChannelData channel_data; - channel_data.event_set.AddDataFd(channel_fd); channel_data.data_fd = std::move(channel_fd); channel_data.channel_state = channel_state; for (;;) { @@ -431,18 +433,21 @@ Status<RemoteChannelHandle> Endpoint::PushChannel(Message* message, return status.error_status(); std::lock_guard<std::mutex> autolock(channel_mutex_); - auto channel_data = OnNewChannelLocked(std::move(local_socket), channel); - if (!channel_data) - return channel_data.error_status(); - *channel_id = channel_data.get().first; + auto channel_data_status = + OnNewChannelLocked(std::move(local_socket), channel); + if (!channel_data_status) + return channel_data_status.error_status(); + + ChannelData* channel_data; + std::tie(*channel_id, channel_data) = channel_data_status.take(); // Flags are ignored for now. // TODO(xiaohuit): Implement those. auto* state = static_cast<MessageState*>(message->GetState()); Status<ChannelReference> ref = state->PushChannelHandle( - remote_socket.Borrow(), - channel_data.get().second->event_set.event_fd().Borrow()); + remote_socket.Borrow(), channel_data->event_set.pollin_event_fd(), + channel_data->event_set.pollhup_event_fd()); if (!ref) return ref.error_status(); state->sockets_to_close.push_back(std::move(remote_socket)); @@ -472,13 +477,15 @@ BorrowedHandle Endpoint::GetChannelSocketFd(int32_t channel_id) { return handle; } -BorrowedHandle Endpoint::GetChannelEventFd(int32_t channel_id) { +Status<std::pair<BorrowedHandle, BorrowedHandle>> Endpoint::GetChannelEventFd( + int32_t channel_id) { std::lock_guard<std::mutex> autolock(channel_mutex_); - BorrowedHandle handle; auto channel_data = channels_.find(channel_id); - if (channel_data != channels_.end()) - handle = channel_data->second.event_set.event_fd().Borrow(); - return handle; + if (channel_data != channels_.end()) { + return {{channel_data->second.event_set.pollin_event_fd(), + channel_data->second.event_set.pollhup_event_fd()}}; + } + return ErrorStatus(ENOENT); } int32_t Endpoint::GetChannelId(const BorrowedHandle& channel_fd) { @@ -593,11 +600,6 @@ Status<void> Endpoint::MessageReceive(Message* message) { } BorrowedHandle channel_fd{event.data.fd}; - if (event.events & (EPOLLRDHUP | EPOLLHUP)) { - BuildCloseMessage(GetChannelId(channel_fd), message); - return {}; - } - return ReceiveMessageForChannel(channel_fd, message); } @@ -616,12 +618,23 @@ Status<void> Endpoint::MessageReply(Message* message, int return_code) { if (return_code < 0) { return CloseChannel(channel_id); } else { - // Reply with the event fd. - auto push_status = state->PushFileHandle(GetChannelEventFd(channel_id)); - state->response_data.clear(); // Just in case... - if (!push_status) - return push_status.error_status(); - return_code = push_status.get(); + // Open messages do not have a payload and may not transfer any channels + // or file descriptors on behalf of the service. + state->response_data.clear(); + state->response.file_descriptors.clear(); + state->response.channels.clear(); + + // Return the channel event-related fds in a single ChannelInfo entry + // with an empty data_fd member. + auto status = GetChannelEventFd(channel_id); + if (!status) + return status.error_status(); + + auto handles = status.take(); + state->response.channels.push_back({BorrowedHandle(), + std::move(handles.first), + std::move(handles.second)}); + return_code = 0; } break; } diff --git a/libs/vr/libpdx_uds/service_framework_tests.cpp b/libs/vr/libpdx_uds/service_framework_tests.cpp index 5943b0a1be..27427162f5 100644 --- a/libs/vr/libpdx_uds/service_framework_tests.cpp +++ b/libs/vr/libpdx_uds/service_framework_tests.cpp @@ -1,5 +1,6 @@ #include <errno.h> #include <fcntl.h> +#include <poll.h> #include <sys/epoll.h> #include <sys/eventfd.h> #include <unistd.h> @@ -506,6 +507,37 @@ TEST_F(ServiceFrameworkTest, Impulse) { EXPECT_EQ(-EINVAL, client->SendAsync(invalid_pointer, sizeof(int))); } +// Test impulses. +TEST_F(ServiceFrameworkTest, ImpulseHangup) { + // Create a test service and add it to the dispatcher. + auto service = TestService::Create(kTestService1); + ASSERT_NE(nullptr, service); + ASSERT_EQ(0, dispatcher_->AddService(service)); + + auto client = TestClient::Create(kTestService1); + ASSERT_NE(nullptr, client); + + const int kMaxIterations = 1000; + for (int i = 0; i < kMaxIterations; i++) { + auto impulse_client = TestClient::Create(kTestService1); + ASSERT_NE(nullptr, impulse_client); + + const uint8_t a = (i >> 0) & 0xff; + const uint8_t b = (i >> 8) & 0xff; + const uint8_t c = (i >> 16) & 0xff; + const uint8_t d = (i >> 24) & 0xff; + ImpulsePayload expected_payload = {{a, b, c, d}}; + EXPECT_EQ(0, impulse_client->SendAsync(expected_payload.data(), 4)); + + // Hangup the impulse test client, then send a sync message over client to + // make sure the hangup message is handled before checking the impulse + // payload. + impulse_client = nullptr; + client->GetThisChannelId(); + EXPECT_EQ(expected_payload, service->GetImpulsePayload()); + } +} + // Test Message::PushChannel/Service::PushChannel API. TEST_F(ServiceFrameworkTest, PushChannel) { // Create a test service and add it to the dispatcher. @@ -574,9 +606,7 @@ TEST_F(ServiceFrameworkTest, Ids) { pid_t process_id2; - std::thread thread([&]() { - process_id2 = client->GetThisProcessId(); - }); + std::thread thread([&]() { process_id2 = client->GetThisProcessId(); }); thread.join(); EXPECT_LT(2, process_id2); @@ -614,15 +644,15 @@ TEST_F(ServiceFrameworkTest, PollIn) { auto client = TestClient::Create(kTestService1); ASSERT_NE(nullptr, client); - epoll_event event; - int count = epoll_wait(client->event_fd(), &event, 1, 0); + pollfd pfd{client->event_fd(), POLLIN, 0}; + int count = poll(&pfd, 1, 0); ASSERT_EQ(0, count); client->SendPollInEvent(); - count = epoll_wait(client->event_fd(), &event, 1, -1); + count = poll(&pfd, 1, 10000 /*10s*/); ASSERT_EQ(1, count); - ASSERT_TRUE((EPOLLIN & event.events) != 0); + ASSERT_TRUE((POLLIN & pfd.revents) != 0); } TEST_F(ServiceFrameworkTest, PollHup) { @@ -635,15 +665,15 @@ TEST_F(ServiceFrameworkTest, PollHup) { auto client = TestClient::Create(kTestService1); ASSERT_NE(nullptr, client); - epoll_event event; - int count = epoll_wait(client->event_fd(), &event, 1, 0); + pollfd pfd{client->event_fd(), POLLIN, 0}; + int count = poll(&pfd, 1, 0); ASSERT_EQ(0, count); client->SendPollHupEvent(); - count = epoll_wait(client->event_fd(), &event, 1, -1); + count = poll(&pfd, 1, 10000 /*10s*/); ASSERT_EQ(1, count); - auto event_status = client->GetEventMask(event.events); + auto event_status = client->GetEventMask(pfd.revents); ASSERT_TRUE(event_status.ok()); ASSERT_TRUE((EPOLLHUP & event_status.get()) != 0); } diff --git a/libs/vr/libvrflinger/display_surface.cpp b/libs/vr/libvrflinger/display_surface.cpp index 3d132c95ea..87c823e5b9 100644 --- a/libs/vr/libvrflinger/display_surface.cpp +++ b/libs/vr/libvrflinger/display_surface.cpp @@ -213,8 +213,8 @@ Status<LocalChannelHandle> ApplicationDisplaySurface::OnCreateQueue( ATRACE_NAME("ApplicationDisplaySurface::OnCreateQueue"); ALOGD_IF(TRACE, "ApplicationDisplaySurface::OnCreateQueue: surface_id=%d, " - "meta_size_bytes=%zu", - surface_id(), config.meta_size_bytes); + "user_metadata_size=%zu", + surface_id(), config.user_metadata_size); std::lock_guard<std::mutex> autolock(lock_); auto producer = ProducerQueue::Create(config, UsagePolicy{}); @@ -280,10 +280,10 @@ std::vector<int32_t> DirectDisplaySurface::GetQueueIds() const { Status<LocalChannelHandle> DirectDisplaySurface::OnCreateQueue( Message& /*message*/, const ProducerQueueConfig& config) { ATRACE_NAME("DirectDisplaySurface::OnCreateQueue"); - ALOGD_IF( - TRACE, - "DirectDisplaySurface::OnCreateQueue: surface_id=%d meta_size_bytes=%zu", - surface_id(), config.meta_size_bytes); + ALOGD_IF(TRACE, + "DirectDisplaySurface::OnCreateQueue: surface_id=%d " + "user_metadata_size=%zu", + surface_id(), config.user_metadata_size); std::lock_guard<std::mutex> autolock(lock_); if (!direct_queue_) { diff --git a/services/vr/bufferhubd/Android.mk b/services/vr/bufferhubd/Android.mk index 97f0332d7a..28cf53dd82 100644 --- a/services/vr/bufferhubd/Android.mk +++ b/services/vr/bufferhubd/Android.mk @@ -22,6 +22,9 @@ sourceFiles := \ consumer_queue_channel.cpp \ producer_queue_channel.cpp \ +headerLibraries := \ + libdvr_headers + staticLibraries := \ libperformance \ libpdx_default_transport \ @@ -41,6 +44,7 @@ LOCAL_SRC_FILES := $(sourceFiles) LOCAL_CFLAGS := -DLOG_TAG=\"bufferhubd\" LOCAL_CFLAGS += -DTRACE=0 LOCAL_CFLAGS += -DATRACE_TAG=ATRACE_TAG_GRAPHICS +LOCAL_HEADER_LIBRARIES := $(headerLibraries) LOCAL_STATIC_LIBRARIES := $(staticLibraries) LOCAL_SHARED_LIBRARIES := $(sharedLibraries) LOCAL_MODULE := bufferhubd diff --git a/services/vr/bufferhubd/buffer_hub.cpp b/services/vr/bufferhubd/buffer_hub.cpp index 26843c99c8..cdb1f91795 100644 --- a/services/vr/bufferhubd/buffer_hub.cpp +++ b/services/vr/bufferhubd/buffer_hub.cpp @@ -20,8 +20,8 @@ using android::pdx::Channel; using android::pdx::ErrorStatus; using android::pdx::Message; using android::pdx::Status; -using android::pdx::rpc::DispatchRemoteMethod; using android::pdx::default_transport::Endpoint; +using android::pdx::rpc::DispatchRemoteMethod; namespace android { namespace dvr { @@ -53,7 +53,15 @@ std::string BufferHubService::DumpState(size_t /*max_length*/) { stream << " "; stream << std::setw(6) << "Format"; stream << " "; - stream << std::setw(11) << "Usage"; + stream << std::setw(10) << "Usage"; + stream << " "; + stream << std::setw(9) << "Pending"; + stream << " "; + stream << std::setw(18) << "State"; + stream << " "; + stream << std::setw(18) << "Signaled"; + stream << " "; + stream << std::setw(10) << "Index"; stream << " "; stream << "Name"; stream << std::endl; @@ -83,46 +91,15 @@ std::string BufferHubService::DumpState(size_t /*max_length*/) { stream << std::setw(8) << info.usage; stream << std::dec << std::setfill(' '); stream << " "; - stream << info.name; - stream << std::endl; - } - } - - stream << "Active Consumer Buffers:\n"; - stream << std::right; - stream << std::setw(6) << "Id"; - stream << " "; - stream << std::setw(14) << "Geometry"; - stream << " "; - stream << "Name"; - stream << std::endl; - - for (const auto& channel : channels) { - if (channel->channel_type() == BufferHubChannel::kConsumerType) { - BufferHubChannel::BufferInfo info = channel->GetBufferInfo(); - - stream << std::right; - stream << std::setw(6) << info.id; + stream << std::setw(9) << info.pending_count; stream << " "; - - if (info.consumer_count == 0) { - // consumer_count is tracked by producer. When it's zero, producer must - // have already hung up and the consumer is orphaned. - stream << std::setw(14) << "Orphaned."; - stream << (" channel_id=" + std::to_string(channel->channel_id())); - stream << std::endl; - continue; - } - - if (info.format == HAL_PIXEL_FORMAT_BLOB) { - std::string size = std::to_string(info.width) + " B"; - stream << std::setw(14) << size; - } else { - std::string dimensions = std::to_string(info.width) + "x" + - std::to_string(info.height) + "x" + - std::to_string(info.layer_count); - stream << std::setw(14) << dimensions; - } + stream << "0x" << std::hex << std::setfill('0'); + stream << std::setw(16) << info.state; + stream << " "; + stream << "0x" << std::setw(16) << info.signaled_mask; + stream << std::dec << std::setfill(' '); + stream << " "; + stream << std::setw(8) << info.index; stream << " "; stream << info.name; stream << std::endl; @@ -184,6 +161,32 @@ std::string BufferHubService::DumpState(size_t /*max_length*/) { } } + stream << std::endl; + stream << "Orphaned Consumer Buffers:\n"; + stream << std::right; + stream << std::setw(6) << "Id"; + stream << " "; + stream << std::setw(14) << "Geometry"; + stream << " "; + stream << "Name"; + stream << std::endl; + + for (const auto& channel : channels) { + BufferHubChannel::BufferInfo info = channel->GetBufferInfo(); + // consumer_count is tracked by producer. When it's zero, producer must have + // already hung up and the consumer is orphaned. + if (channel->channel_type() == BufferHubChannel::kConsumerType && + info.consumer_count == 0) { + stream << std::right; + stream << std::setw(6) << info.id; + stream << " "; + + stream << std::setw(14) << "Orphaned."; + stream << (" channel_id=" + std::to_string(channel->channel_id())); + stream << std::endl; + } + } + return stream.str(); } @@ -444,6 +447,7 @@ void BufferHubChannel::SignalAvailable() { "BufferHubChannel::SignalAvailable: channel_id=%d buffer_id=%d", channel_id(), buffer_id()); if (!IsDetached()) { + signaled_ = true; const auto status = service_->ModifyChannelEvents(channel_id_, 0, POLLIN); ALOGE_IF(!status, "BufferHubChannel::SignalAvailable: failed to signal availability " @@ -460,6 +464,7 @@ void BufferHubChannel::ClearAvailable() { "BufferHubChannel::ClearAvailable: channel_id=%d buffer_id=%d", channel_id(), buffer_id()); if (!IsDetached()) { + signaled_ = false; const auto status = service_->ModifyChannelEvents(channel_id_, POLLIN, 0); ALOGE_IF(!status, "BufferHubChannel::ClearAvailable: failed to clear availability " diff --git a/services/vr/bufferhubd/buffer_hub.h b/services/vr/bufferhubd/buffer_hub.h index b0df11f2ac..270ac95114 100644 --- a/services/vr/bufferhubd/buffer_hub.h +++ b/services/vr/bufferhubd/buffer_hub.h @@ -53,6 +53,10 @@ class BufferHubChannel : public pdx::Channel { uint32_t layer_count = 0; uint32_t format = 0; uint64_t usage = 0; + size_t pending_count = 0; + uint64_t state = 0; + uint64_t signaled_mask = 0; + uint64_t index = 0; std::string name; // Data filed for producer queue. @@ -60,7 +64,9 @@ class BufferHubChannel : public pdx::Channel { UsagePolicy usage_policy{0, 0, 0, 0}; BufferInfo(int id, size_t consumer_count, uint32_t width, uint32_t height, - uint32_t layer_count, uint32_t format, uint64_t usage, const std::string& name) + uint32_t layer_count, uint32_t format, uint64_t usage, + size_t pending_count, uint64_t state, uint64_t signaled_mask, + uint64_t index, const std::string& name) : id(id), type(kProducerType), consumer_count(consumer_count), @@ -69,6 +75,10 @@ class BufferHubChannel : public pdx::Channel { layer_count(layer_count), format(format), usage(usage), + pending_count(pending_count), + state(state), + signaled_mask(signaled_mask), + index(index), name(name) {} BufferInfo(int id, size_t consumer_count, size_t capacity, @@ -101,6 +111,8 @@ class BufferHubChannel : public pdx::Channel { int channel_id() const { return channel_id_; } bool IsDetached() const { return channel_id_ == kDetachedId; } + bool signaled() const { return signaled_; } + void Detach() { if (channel_type_ == kProducerType) channel_id_ = kDetachedId; @@ -124,6 +136,8 @@ class BufferHubChannel : public pdx::Channel { // buffer if it is detached and re-attached to another channel. int channel_id_; + bool signaled_; + ChannelType channel_type_; BufferHubChannel(const BufferHubChannel&) = delete; diff --git a/services/vr/bufferhubd/bufferhubd.cpp b/services/vr/bufferhubd/bufferhubd.cpp index 1613821290..b27f218eb6 100644 --- a/services/vr/bufferhubd/bufferhubd.cpp +++ b/services/vr/bufferhubd/bufferhubd.cpp @@ -2,6 +2,7 @@ #include <unistd.h> #include <log/log.h> +#include <sys/resource.h> #include <dvr/performance_client_api.h> #include <pdx/service_dispatcher.h> @@ -16,6 +17,23 @@ int main(int, char**) { // We need to be able to create endpoints with full perms. umask(0000); + // Bump up the soft limit of open fd to the hard limit. + struct rlimit64 rlim; + ret = getrlimit64(RLIMIT_NOFILE, &rlim); + LOG_ALWAYS_FATAL_IF(ret != 0, "Failed to get nofile limit."); + + ALOGI("Current nofile limit is %llu/%llu.", rlim.rlim_cur, rlim.rlim_max); + rlim.rlim_cur = rlim.rlim_max; + ret = setrlimit64(RLIMIT_NOFILE, &rlim); + ALOGE_IF(ret < 0, "Failed to set nofile limit, error=%s", strerror(errno)); + + rlim.rlim_cur = -1; + rlim.rlim_max = -1; + if (getrlimit64(RLIMIT_NOFILE, &rlim) < 0) + ALOGE("Failed to get nofile limit."); + else + ALOGI("New nofile limit is %llu/%llu.", rlim.rlim_cur, rlim.rlim_max); + dispatcher = android::pdx::ServiceDispatcher::Create(); CHECK_ERROR(!dispatcher, error, "Failed to create service dispatcher\n"); diff --git a/services/vr/bufferhubd/consumer_channel.cpp b/services/vr/bufferhubd/consumer_channel.cpp index ac6896ae84..a6d2dbb60c 100644 --- a/services/vr/bufferhubd/consumer_channel.cpp +++ b/services/vr/bufferhubd/consumer_channel.cpp @@ -19,9 +19,10 @@ namespace android { namespace dvr { ConsumerChannel::ConsumerChannel(BufferHubService* service, int buffer_id, - int channel_id, + int channel_id, uint64_t consumer_state_bit, const std::shared_ptr<Channel> producer) : BufferHubChannel(service, buffer_id, channel_id, kConsumerType), + consumer_state_bit_(consumer_state_bit), producer_(producer) { GetProducer()->AddConsumer(this); } @@ -32,8 +33,6 @@ ConsumerChannel::~ConsumerChannel() { channel_id(), buffer_id()); if (auto producer = GetProducer()) { - if (!released_) // Producer is waiting for our Release. - producer->OnConsumerIgnored(); producer->RemoveConsumer(this); } } @@ -43,6 +42,8 @@ BufferHubChannel::BufferInfo ConsumerChannel::GetBufferInfo() const { if (auto producer = GetProducer()) { // If producer has not hung up, copy most buffer info from the producer. info = producer->GetBufferInfo(); + } else { + info.signaled_mask = consumer_state_bit(); } info.id = buffer_id(); return info; @@ -55,6 +56,9 @@ std::shared_ptr<ProducerChannel> ConsumerChannel::GetProducer() const { void ConsumerChannel::HandleImpulse(Message& message) { ATRACE_NAME("ConsumerChannel::HandleImpulse"); switch (message.GetOp()) { + case BufferHubRPC::ConsumerAcquire::Opcode: + OnConsumerAcquire(message); + break; case BufferHubRPC::ConsumerRelease::Opcode: OnConsumerRelease(message, {}); break; @@ -70,7 +74,7 @@ bool ConsumerChannel::HandleMessage(Message& message) { switch (message.GetOp()) { case BufferHubRPC::GetBuffer::Opcode: DispatchRemoteMethod<BufferHubRPC::GetBuffer>( - *producer, &ProducerChannel::OnGetBuffer, message); + *this, &ConsumerChannel::OnGetBuffer, message); return true; case BufferHubRPC::NewConsumer::Opcode: @@ -98,9 +102,18 @@ bool ConsumerChannel::HandleMessage(Message& message) { } } -Status<std::pair<BorrowedFence, ConsumerChannel::MetaData>> -ConsumerChannel::OnConsumerAcquire(Message& message, - std::size_t metadata_size) { +Status<BufferDescription<BorrowedHandle>> ConsumerChannel::OnGetBuffer( + Message& /*message*/) { + ATRACE_NAME("ConsumerChannel::OnGetBuffer"); + ALOGD_IF(TRACE, "ConsumerChannel::OnGetBuffer: buffer=%d", buffer_id()); + if (auto producer = GetProducer()) { + return {producer->GetBuffer(consumer_state_bit_)}; + } else { + return ErrorStatus(EPIPE); + } +} + +Status<LocalFence> ConsumerChannel::OnConsumerAcquire(Message& message) { ATRACE_NAME("ConsumerChannel::OnConsumerAcquire"); auto producer = GetProducer(); if (!producer) @@ -114,7 +127,7 @@ ConsumerChannel::OnConsumerAcquire(Message& message, producer->buffer_id()); return ErrorStatus(EBUSY); } else { - auto status = producer->OnConsumerAcquire(message, metadata_size); + auto status = producer->OnConsumerAcquire(message); if (status) { ClearAvailable(); acquired_ = true; diff --git a/services/vr/bufferhubd/consumer_channel.h b/services/vr/bufferhubd/consumer_channel.h index 208a002272..55cf96920d 100644 --- a/services/vr/bufferhubd/consumer_channel.h +++ b/services/vr/bufferhubd/consumer_channel.h @@ -12,32 +12,35 @@ namespace dvr { // Consumer channels are attached to a Producer channel class ConsumerChannel : public BufferHubChannel { public: + using BorrowedHandle = pdx::BorrowedHandle; using Channel = pdx::Channel; using Message = pdx::Message; ConsumerChannel(BufferHubService* service, int buffer_id, int channel_id, + uint64_t consumer_state_bit, const std::shared_ptr<Channel> producer); ~ConsumerChannel() override; bool HandleMessage(Message& message) override; void HandleImpulse(Message& message) override; + uint64_t consumer_state_bit() const { return consumer_state_bit_; } BufferInfo GetBufferInfo() const override; bool OnProducerPosted(); void OnProducerClosed(); private: - using MetaData = pdx::rpc::BufferWrapper<std::uint8_t*>; - std::shared_ptr<ProducerChannel> GetProducer() const; - pdx::Status<std::pair<BorrowedFence, MetaData>> OnConsumerAcquire( - Message& message, std::size_t metadata_size); + pdx::Status<BufferDescription<BorrowedHandle>> OnGetBuffer(Message& message); + + pdx::Status<LocalFence> OnConsumerAcquire(Message& message); pdx::Status<void> OnConsumerRelease(Message& message, LocalFence release_fence); pdx::Status<void> OnConsumerSetIgnore(Message& message, bool ignore); + uint64_t consumer_state_bit_{0}; bool acquired_{false}; bool released_{true}; bool ignored_{false}; // True if we are ignoring events. diff --git a/services/vr/bufferhubd/consumer_queue_channel.cpp b/services/vr/bufferhubd/consumer_queue_channel.cpp index f447e00c31..4d430012f1 100644 --- a/services/vr/bufferhubd/consumer_queue_channel.cpp +++ b/services/vr/bufferhubd/consumer_queue_channel.cpp @@ -15,10 +15,11 @@ namespace dvr { ConsumerQueueChannel::ConsumerQueueChannel( BufferHubService* service, int buffer_id, int channel_id, - const std::shared_ptr<Channel>& producer) + const std::shared_ptr<Channel>& producer, bool silent) : BufferHubChannel(service, buffer_id, channel_id, kConsumerQueueType), producer_(producer), - capacity_(0) { + capacity_(0), + silent_(silent) { GetProducer()->AddConsumer(this); } @@ -83,23 +84,30 @@ BufferHubChannel::BufferInfo ConsumerQueueChannel::GetBufferInfo() const { void ConsumerQueueChannel::RegisterNewBuffer( const std::shared_ptr<ProducerChannel>& producer_channel, size_t slot) { ALOGD_IF(TRACE, - "ConsumerQueueChannel::RegisterNewBuffer: buffer_id=%d slot=%zu", - producer_channel->buffer_id(), slot); - pending_buffer_slots_.emplace(producer_channel, slot); - - // Signal the client that there is new buffer available throught POLLIN. - SignalAvailable(); + "ConsumerQueueChannel::RegisterNewBuffer: queue_id=%d buffer_id=%d " + "slot=%zu silent=%d", + buffer_id(), producer_channel->buffer_id(), slot, silent_); + // Only register buffers if the queue is not silent. + if (!silent_) { + pending_buffer_slots_.emplace(producer_channel, slot); + + // Signal the client that there is new buffer available. + SignalAvailable(); + } } Status<std::vector<std::pair<RemoteChannelHandle, size_t>>> ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) { std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles; ATRACE_NAME("ConsumerQueueChannel::OnConsumerQueueImportBuffers"); - ALOGD_IF( - TRACE, - "ConsumerQueueChannel::OnConsumerQueueImportBuffers number of buffers to " - "import: %zu", - pending_buffer_slots_.size()); + ALOGD_IF(TRACE, + "ConsumerQueueChannel::OnConsumerQueueImportBuffers: " + "pending_buffer_slots=%zu", + pending_buffer_slots_.size()); + + // Indicate this is a silent queue that will not import buffers. + if (silent_) + return ErrorStatus(EBADR); while (!pending_buffer_slots_.empty()) { auto producer_channel = pending_buffer_slots_.front().first.lock(); diff --git a/services/vr/bufferhubd/consumer_queue_channel.h b/services/vr/bufferhubd/consumer_queue_channel.h index aa3f531c7f..8437c4cd04 100644 --- a/services/vr/bufferhubd/consumer_queue_channel.h +++ b/services/vr/bufferhubd/consumer_queue_channel.h @@ -19,7 +19,7 @@ class ConsumerQueueChannel : public BufferHubChannel { using RemoteChannelHandle = pdx::RemoteChannelHandle; ConsumerQueueChannel(BufferHubService* service, int buffer_id, int channel_id, - const std::shared_ptr<Channel>& producer); + const std::shared_ptr<Channel>& producer, bool silent); ~ConsumerQueueChannel() override; bool HandleMessage(Message& message) override; @@ -54,6 +54,10 @@ class ConsumerQueueChannel : public BufferHubChannel { // Tracks how many buffers have this queue imported. size_t capacity_; + // A silent queue does not signal or export buffers. It is only used to spawn + // another consumer queue. + bool silent_; + ConsumerQueueChannel(const ConsumerQueueChannel&) = delete; void operator=(const ConsumerQueueChannel&) = delete; }; diff --git a/services/vr/bufferhubd/producer_channel.cpp b/services/vr/bufferhubd/producer_channel.cpp index b2db795717..716db5eeac 100644 --- a/services/vr/bufferhubd/producer_channel.cpp +++ b/services/vr/bufferhubd/producer_channel.cpp @@ -2,6 +2,8 @@ #include <log/log.h> #include <sync/sync.h> +#include <sys/epoll.h> +#include <sys/eventfd.h> #include <sys/poll.h> #include <utils/Trace.h> @@ -24,24 +26,88 @@ using android::pdx::rpc::WrapBuffer; namespace android { namespace dvr { +namespace { + +static inline uint64_t FindNextClearedBit(uint64_t bits) { + return ~bits - (~bits & (~bits - 1)); +} + +} // namespace + ProducerChannel::ProducerChannel(BufferHubService* service, int channel_id, uint32_t width, uint32_t height, uint32_t layer_count, uint32_t format, - uint64_t usage, size_t meta_size_bytes, + uint64_t usage, size_t user_metadata_size, int* error) : BufferHubChannel(service, channel_id, channel_id, kProducerType), pending_consumers_(0), producer_owns_(true), - meta_size_bytes_(meta_size_bytes), - meta_(meta_size_bytes ? new uint8_t[meta_size_bytes] : nullptr) { - const int ret = buffer_.Alloc(width, height, layer_count, format, usage); - if (ret < 0) { + user_metadata_size_(user_metadata_size), + metadata_buf_size_(BufferHubDefs::kMetadataHeaderSize + + user_metadata_size) { + if (int ret = buffer_.Alloc(width, height, layer_count, format, usage)) { ALOGE("ProducerChannel::ProducerChannel: Failed to allocate buffer: %s", strerror(-ret)); *error = ret; return; } + if (int ret = metadata_buffer_.Alloc(metadata_buf_size_, /*height=*/1, + /*layer_count=*/1, + BufferHubDefs::kMetadataFormat, + BufferHubDefs::kMetadataUsage)) { + ALOGE("ProducerChannel::ProducerChannel: Failed to allocate metadata: %s", + strerror(-ret)); + *error = ret; + return; + } + + void* metadata_ptr = nullptr; + if (int ret = metadata_buffer_.Lock(BufferHubDefs::kMetadataUsage, /*x=*/0, + /*y=*/0, metadata_buf_size_, + /*height=*/1, &metadata_ptr)) { + ALOGE("ProducerChannel::ProducerChannel: Failed to lock metadata."); + *error = -ret; + return; + } + metadata_header_ = + reinterpret_cast<BufferHubDefs::MetadataHeader*>(metadata_ptr); + + // Using placement new here to reuse shared memory instead of new allocation + // and also initialize the value to zero. + buffer_state_ = + new (&metadata_header_->buffer_state) std::atomic<uint64_t>(0); + fence_state_ = + new (&metadata_header_->fence_state) std::atomic<uint64_t>(0); + + acquire_fence_fd_.Reset(epoll_create1(EPOLL_CLOEXEC)); + release_fence_fd_.Reset(epoll_create1(EPOLL_CLOEXEC)); + if (!acquire_fence_fd_ || !release_fence_fd_) { + ALOGE("ProducerChannel::ProducerChannel: Failed to create shared fences."); + *error = -EIO; + return; + } + + dummy_fence_fd_.Reset(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)); + if (!dummy_fence_fd_) { + ALOGE("ProducerChannel::ProducerChannel: Failed to create dummy fences."); + *error = -EIO; + return; + } + + epoll_event event; + event.events = 0; + event.data.u64 = 0ULL; + if (epoll_ctl(release_fence_fd_.Get(), EPOLL_CTL_ADD, dummy_fence_fd_.Get(), + &event) < 0) { + ALOGE( + "ProducerChannel::ProducerChannel: Failed to modify the shared " + "release fence to include the dummy fence: %s", + strerror(errno)); + *error = -EIO; + return; + } + // Success. *error = 0; } @@ -49,11 +115,11 @@ ProducerChannel::ProducerChannel(BufferHubService* service, int channel_id, Status<std::shared_ptr<ProducerChannel>> ProducerChannel::Create( BufferHubService* service, int channel_id, uint32_t width, uint32_t height, uint32_t layer_count, uint32_t format, uint64_t usage, - size_t meta_size_bytes) { + size_t user_metadata_size) { int error; std::shared_ptr<ProducerChannel> producer( new ProducerChannel(service, channel_id, width, height, layer_count, - format, usage, meta_size_bytes, &error)); + format, usage, user_metadata_size, &error)); if (error < 0) return ErrorStatus(-error); else @@ -62,16 +128,24 @@ Status<std::shared_ptr<ProducerChannel>> ProducerChannel::Create( ProducerChannel::~ProducerChannel() { ALOGD_IF(TRACE, - "ProducerChannel::~ProducerChannel: channel_id=%d buffer_id=%d", - channel_id(), buffer_id()); + "ProducerChannel::~ProducerChannel: channel_id=%d buffer_id=%d " + "state=%" PRIx64 ".", + channel_id(), buffer_id(), buffer_state_->load()); for (auto consumer : consumer_channels_) consumer->OnProducerClosed(); } BufferHubChannel::BufferInfo ProducerChannel::GetBufferInfo() const { + // Derive the mask of signaled buffers in this producer / consumer set. + uint64_t signaled_mask = signaled() ? BufferHubDefs::kProducerStateBit : 0; + for (const ConsumerChannel* consumer : consumer_channels_) { + signaled_mask |= consumer->signaled() ? consumer->consumer_state_bit() : 0; + } + return BufferInfo(buffer_id(), consumer_channels_.size(), buffer_.width(), buffer_.height(), buffer_.layer_count(), buffer_.format(), - buffer_.usage(), name_); + buffer_.usage(), pending_consumers_, buffer_state_->load(), + signaled_mask, metadata_header_->queue_index, name_); } void ProducerChannel::HandleImpulse(Message& message) { @@ -80,6 +154,9 @@ void ProducerChannel::HandleImpulse(Message& message) { case BufferHubRPC::ProducerGain::Opcode: OnProducerGain(message); break; + case BufferHubRPC::ProducerPost::Opcode: + OnProducerPost(message, {}); + break; } } @@ -121,16 +198,26 @@ bool ProducerChannel::HandleMessage(Message& message) { } } -Status<NativeBufferHandle<BorrowedHandle>> ProducerChannel::OnGetBuffer( +BufferDescription<BorrowedHandle> ProducerChannel::GetBuffer( + uint64_t buffer_state_bit) { + return { + buffer_, metadata_buffer_, buffer_id(), + buffer_state_bit, acquire_fence_fd_.Borrow(), release_fence_fd_.Borrow()}; +} + +Status<BufferDescription<BorrowedHandle>> ProducerChannel::OnGetBuffer( Message& /*message*/) { ATRACE_NAME("ProducerChannel::OnGetBuffer"); - ALOGD_IF(TRACE, "ProducerChannel::OnGetBuffer: buffer=%d", buffer_id()); - return {NativeBufferHandle<BorrowedHandle>(buffer_, buffer_id())}; + ALOGD_IF(TRACE, "ProducerChannel::OnGetBuffer: buffer=%d, state=%" PRIx64 ".", + buffer_id(), buffer_state_->load()); + return {GetBuffer(BufferHubDefs::kProducerStateBit)}; } Status<RemoteChannelHandle> ProducerChannel::CreateConsumer(Message& message) { ATRACE_NAME("ProducerChannel::CreateConsumer"); - ALOGD_IF(TRACE, "ProducerChannel::CreateConsumer: buffer_id=%d", buffer_id()); + ALOGD_IF(TRACE, + "ProducerChannel::CreateConsumer: buffer_id=%d, producer_owns=%d", + buffer_id(), producer_owns_); int channel_id; auto status = message.PushChannel(0, nullptr, &channel_id); @@ -141,8 +228,21 @@ Status<RemoteChannelHandle> ProducerChannel::CreateConsumer(Message& message) { return ErrorStatus(ENOMEM); } - auto consumer = std::make_shared<ConsumerChannel>( - service(), buffer_id(), channel_id, shared_from_this()); + // Try find the next consumer state bit which has not been claimed by any + // consumer yet. + uint64_t consumer_state_bit = FindNextClearedBit( + active_consumer_bit_mask_ | orphaned_consumer_bit_mask_ | + BufferHubDefs::kProducerStateBit); + if (consumer_state_bit == 0ULL) { + ALOGE( + "ProducerChannel::CreateConsumer: reached the maximum mumber of " + "consumers per producer: 63."); + return ErrorStatus(E2BIG); + } + + auto consumer = + std::make_shared<ConsumerChannel>(service(), buffer_id(), channel_id, + consumer_state_bit, shared_from_this()); const auto channel_status = service()->SetChannel(channel_id, consumer); if (!channel_status) { ALOGE( @@ -152,12 +252,14 @@ Status<RemoteChannelHandle> ProducerChannel::CreateConsumer(Message& message) { return ErrorStatus(ENOMEM); } - if (!producer_owns_) { + if (!producer_owns_ && + !BufferHubDefs::IsBufferReleased(buffer_state_->load())) { // Signal the new consumer when adding it to a posted producer. if (consumer->OnProducerPosted()) pending_consumers_++; } + active_consumer_bit_mask_ |= consumer_state_bit; return {status.take()}; } @@ -168,8 +270,7 @@ Status<RemoteChannelHandle> ProducerChannel::OnNewConsumer(Message& message) { } Status<void> ProducerChannel::OnProducerPost( - Message&, LocalFence acquire_fence, - BufferWrapper<std::vector<std::uint8_t>> metadata) { + Message&, LocalFence acquire_fence) { ATRACE_NAME("ProducerChannel::OnProducerPost"); ALOGD_IF(TRACE, "ProducerChannel::OnProducerPost: buffer_id=%d", buffer_id()); if (!producer_owns_) { @@ -177,27 +278,45 @@ Status<void> ProducerChannel::OnProducerPost( return ErrorStatus(EBUSY); } - if (meta_size_bytes_ != metadata.size()) { - ALOGD_IF(TRACE, - "ProducerChannel::OnProducerPost: Expected meta_size_bytes=%zu " - "got size=%zu", - meta_size_bytes_, metadata.size()); - return ErrorStatus(EINVAL); + epoll_event event; + event.events = 0; + event.data.u64 = 0ULL; + int ret = epoll_ctl(release_fence_fd_.Get(), EPOLL_CTL_MOD, + dummy_fence_fd_.Get(), &event); + ALOGE_IF(ret < 0, + "ProducerChannel::OnProducerPost: Failed to modify the shared " + "release fence to include the dummy fence: %s", + strerror(errno)); + + eventfd_t dummy_fence_count = 0ULL; + if (eventfd_read(dummy_fence_fd_.Get(), &dummy_fence_count) < 0) { + const int error = errno; + if (error != EAGAIN) { + ALOGE( + "ProducerChannel::ProducerChannel: Failed to read dummy fence, " + "error: %s", + strerror(error)); + return ErrorStatus(error); + } } - std::copy(metadata.begin(), metadata.end(), meta_.get()); + ALOGW_IF(dummy_fence_count > 0, + "ProducerChannel::ProducerChannel: %" PRIu64 + " dummy fence(s) was signaled during last release/gain cycle " + "buffer_id=%d.", + dummy_fence_count, buffer_id()); + post_fence_ = std::move(acquire_fence); producer_owns_ = false; - // Signal any interested consumers. If there are none, automatically release - // the buffer. + // Signal any interested consumers. If there are none, the buffer will stay + // in posted state until a consumer comes online. This behavior guarantees + // that no frame is silently dropped. pending_consumers_ = 0; for (auto consumer : consumer_channels_) { if (consumer->OnProducerPosted()) pending_consumers_++; } - if (pending_consumers_ == 0) - SignalAvailable(); ALOGD_IF(TRACE, "ProducerChannel::OnProducerPost: %d pending consumers", pending_consumers_); @@ -214,8 +333,13 @@ Status<LocalFence> ProducerChannel::OnProducerGain(Message& /*message*/) { } // There are still pending consumers, return busy. - if (pending_consumers_ > 0) + if (pending_consumers_ > 0) { + ALOGE( + "ProducerChannel::OnGain: Producer (id=%d) is gaining a buffer that " + "still has %d pending consumer(s).", + buffer_id(), pending_consumers_); return ErrorStatus(EBUSY); + } ClearAvailable(); producer_owns_ = true; @@ -223,9 +347,7 @@ Status<LocalFence> ProducerChannel::OnProducerGain(Message& /*message*/) { return {std::move(returned_fence_)}; } -Status<std::pair<BorrowedFence, BufferWrapper<std::uint8_t*>>> -ProducerChannel::OnConsumerAcquire(Message& /*message*/, - std::size_t metadata_size) { +Status<LocalFence> ProducerChannel::OnConsumerAcquire(Message& /*message*/) { ATRACE_NAME("ProducerChannel::OnConsumerAcquire"); ALOGD_IF(TRACE, "ProducerChannel::OnConsumerAcquire: buffer_id=%d", buffer_id()); @@ -236,12 +358,7 @@ ProducerChannel::OnConsumerAcquire(Message& /*message*/, // Return a borrowed fd to avoid unnecessary duplication of the underlying fd. // Serialization just needs to read the handle. - if (metadata_size == 0) - return {std::make_pair(post_fence_.borrow(), - WrapBuffer<std::uint8_t>(nullptr, 0))}; - else - return {std::make_pair(post_fence_.borrow(), - WrapBuffer(meta_.get(), meta_size_bytes_))}; + return {std::move(post_fence_)}; } Status<void> ProducerChannel::OnConsumerRelease(Message&, @@ -273,17 +390,75 @@ Status<void> ProducerChannel::OnConsumerRelease(Message&, } OnConsumerIgnored(); + if (pending_consumers_ == 0) { + // Clear the producer bit atomically to transit into released state. This + // has to done by BufferHub as it requries synchronization among all + // consumers. + BufferHubDefs::ModifyBufferState(buffer_state_, + BufferHubDefs::kProducerStateBit, 0ULL); + ALOGD_IF(TRACE, + "ProducerChannel::OnConsumerRelease: releasing last consumer: " + "buffer_id=%d state=%" PRIx64 ".", + buffer_id(), buffer_state_->load()); + + if (orphaned_consumer_bit_mask_) { + ALOGW( + "ProducerChannel::OnConsumerRelease: orphaned buffer detected " + "during the this acquire/release cycle: id=%d orphaned=0x%" PRIx64 + " queue_index=%" PRIu64 ".", + buffer_id(), orphaned_consumer_bit_mask_, + metadata_header_->queue_index); + orphaned_consumer_bit_mask_ = 0; + } + + SignalAvailable(); + } + + ALOGE_IF(pending_consumers_ && + BufferHubDefs::IsBufferReleased(buffer_state_->load()), + "ProducerChannel::OnConsumerRelease: buffer state inconsistent: " + "pending_consumers=%d, buffer buffer is in releaed state.", + pending_consumers_); return {}; } void ProducerChannel::OnConsumerIgnored() { - if (!--pending_consumers_) - SignalAvailable(); + if (pending_consumers_ == 0) { + ALOGE("ProducerChannel::OnConsumerIgnored: no pending consumer."); + return; + } + + --pending_consumers_; ALOGD_IF(TRACE, "ProducerChannel::OnConsumerIgnored: buffer_id=%d %d consumers left", buffer_id(), pending_consumers_); } +void ProducerChannel::OnConsumerOrphaned(ConsumerChannel* channel) { + // Ignore the orphaned consumer. + OnConsumerIgnored(); + + const uint64_t consumer_state_bit = channel->consumer_state_bit(); + ALOGE_IF(orphaned_consumer_bit_mask_ & consumer_state_bit, + "ProducerChannel::OnConsumerOrphaned: Consumer " + "(consumer_state_bit=%" PRIx64 ") is already orphaned.", + consumer_state_bit); + orphaned_consumer_bit_mask_ |= consumer_state_bit; + + // Atomically clear the fence state bit as an orphaned consumer will never + // signal a release fence. Also clear the buffer state as it won't be released + // as well. + fence_state_->fetch_and(~consumer_state_bit); + BufferHubDefs::ModifyBufferState(buffer_state_, consumer_state_bit, 0ULL); + + ALOGW( + "ProducerChannel::OnConsumerOrphaned: detected new orphaned consumer " + "buffer_id=%d consumer_state_bit=%" PRIx64 " queue_index=%" PRIu64 + " buffer_state=%" PRIx64 " fence_state=%" PRIx64 ".", + buffer_id(), consumer_state_bit, metadata_header_->queue_index, + buffer_state_->load(), fence_state_->load()); +} + Status<void> ProducerChannel::OnProducerMakePersistent(Message& message, const std::string& name, int user_id, @@ -335,6 +510,40 @@ void ProducerChannel::AddConsumer(ConsumerChannel* channel) { void ProducerChannel::RemoveConsumer(ConsumerChannel* channel) { consumer_channels_.erase( std::find(consumer_channels_.begin(), consumer_channels_.end(), channel)); + active_consumer_bit_mask_ &= ~channel->consumer_state_bit(); + + const uint64_t buffer_state = buffer_state_->load(); + if (BufferHubDefs::IsBufferPosted(buffer_state) || + BufferHubDefs::IsBufferAcquired(buffer_state)) { + // The consumer client is being destoryed without releasing. This could + // happen in corner cases when the consumer crashes. Here we mark it + // orphaned before remove it from producer. + OnConsumerOrphaned(channel); + } + + if (BufferHubDefs::IsBufferReleased(buffer_state) || + BufferHubDefs::IsBufferGained(buffer_state)) { + // The consumer is being close while it is suppose to signal a release + // fence. Signal the dummy fence here. + if (fence_state_->load() & channel->consumer_state_bit()) { + epoll_event event; + event.events = EPOLLIN; + event.data.u64 = channel->consumer_state_bit(); + if (epoll_ctl(release_fence_fd_.Get(), EPOLL_CTL_MOD, + dummy_fence_fd_.Get(), &event) < 0) { + ALOGE( + "ProducerChannel::RemoveConsumer: Failed to modify the shared " + "release fence to include the dummy fence: %s", + strerror(errno)); + return; + } + ALOGW( + "ProducerChannel::RemoveConsumer: signal dummy release fence " + "buffer_id=%d", + buffer_id()); + eventfd_write(dummy_fence_fd_.Get(), 1); + } + } } // Returns true if either the user or group ids match the owning ids or both @@ -350,10 +559,12 @@ bool ProducerChannel::CheckAccess(int euid, int egid) { // Returns true if the given parameters match the underlying buffer parameters. bool ProducerChannel::CheckParameters(uint32_t width, uint32_t height, uint32_t layer_count, uint32_t format, - uint64_t usage, size_t meta_size_bytes) { - return meta_size_bytes == meta_size_bytes_ && buffer_.width() == width && - buffer_.height() == height && buffer_.layer_count() == layer_count && - buffer_.format() == format && buffer_.usage() == usage; + uint64_t usage, + size_t user_metadata_size) { + return user_metadata_size == user_metadata_size_ && + buffer_.width() == width && buffer_.height() == height && + buffer_.layer_count() == layer_count && buffer_.format() == format && + buffer_.usage() == usage; } } // namespace dvr diff --git a/services/vr/bufferhubd/producer_channel.h b/services/vr/bufferhubd/producer_channel.h index 5ada47880e..e280f4de8b 100644 --- a/services/vr/bufferhubd/producer_channel.h +++ b/services/vr/bufferhubd/producer_channel.h @@ -33,7 +33,7 @@ class ProducerChannel : public BufferHubChannel { static pdx::Status<std::shared_ptr<ProducerChannel>> Create( BufferHubService* service, int channel_id, uint32_t width, uint32_t height, uint32_t layer_count, uint32_t format, uint64_t usage, - size_t meta_size_bytes); + size_t user_metadata_size); ~ProducerChannel() override; @@ -42,24 +42,25 @@ class ProducerChannel : public BufferHubChannel { BufferInfo GetBufferInfo() const override; - pdx::Status<NativeBufferHandle<BorrowedHandle>> OnGetBuffer(Message& message); + BufferDescription<BorrowedHandle> GetBuffer(uint64_t buffer_state_bit); pdx::Status<RemoteChannelHandle> CreateConsumer(Message& message); pdx::Status<RemoteChannelHandle> OnNewConsumer(Message& message); - pdx::Status<std::pair<BorrowedFence, BufferWrapper<std::uint8_t*>>> - OnConsumerAcquire(Message& message, std::size_t metadata_size); + pdx::Status<LocalFence> OnConsumerAcquire(Message& message); pdx::Status<void> OnConsumerRelease(Message& message, LocalFence release_fence); void OnConsumerIgnored(); + void OnConsumerOrphaned(ConsumerChannel* channel); void AddConsumer(ConsumerChannel* channel); void RemoveConsumer(ConsumerChannel* channel); bool CheckAccess(int euid, int egid); bool CheckParameters(uint32_t width, uint32_t height, uint32_t layer_count, - uint32_t format, uint64_t usage, size_t meta_size_bytes); + uint32_t format, uint64_t usage, + size_t user_metadata_size); pdx::Status<void> OnProducerMakePersistent(Message& message, const std::string& name, @@ -74,11 +75,28 @@ class ProducerChannel : public BufferHubChannel { IonBuffer buffer_; + // IonBuffer that is shared between bufferhubd, producer, and consumers. + IonBuffer metadata_buffer_; + BufferHubDefs::MetadataHeader* metadata_header_ = nullptr; + std::atomic<uint64_t>* buffer_state_ = nullptr; + std::atomic<uint64_t>* fence_state_ = nullptr; + + // All active consumer bits. Valid bits are the lower 63 bits, while the + // highest bit is reserved for the producer and should not be set. + uint64_t active_consumer_bit_mask_{0ULL}; + // All orphaned consumer bits. Valid bits are the lower 63 bits, while the + // highest bit is reserved for the producer and should not be set. + uint64_t orphaned_consumer_bit_mask_{0ULL}; + bool producer_owns_; LocalFence post_fence_; LocalFence returned_fence_; - size_t meta_size_bytes_; - std::unique_ptr<uint8_t[]> meta_; + size_t user_metadata_size_; // size of user requested buffer buffer size. + size_t metadata_buf_size_; // size of the ion buffer that holds metadata. + + pdx::LocalHandle acquire_fence_fd_; + pdx::LocalHandle release_fence_fd_; + pdx::LocalHandle dummy_fence_fd_; static constexpr int kNoCheckId = -1; static constexpr int kUseCallerId = 0; @@ -92,11 +110,10 @@ class ProducerChannel : public BufferHubChannel { ProducerChannel(BufferHubService* service, int channel, uint32_t width, uint32_t height, uint32_t layer_count, uint32_t format, - uint64_t usage, size_t meta_size_bytes, int* error); + uint64_t usage, size_t user_metadata_size, int* error); - pdx::Status<void> OnProducerPost( - Message& message, LocalFence acquire_fence, - BufferWrapper<std::vector<std::uint8_t>> metadata); + pdx::Status<BufferDescription<BorrowedHandle>> OnGetBuffer(Message& message); + pdx::Status<void> OnProducerPost(Message& message, LocalFence acquire_fence); pdx::Status<LocalFence> OnProducerGain(Message& message); ProducerChannel(const ProducerChannel&) = delete; diff --git a/services/vr/bufferhubd/producer_queue_channel.cpp b/services/vr/bufferhubd/producer_queue_channel.cpp index b8bb728b70..c0c48c2dc1 100644 --- a/services/vr/bufferhubd/producer_queue_channel.cpp +++ b/services/vr/bufferhubd/producer_queue_channel.cpp @@ -7,8 +7,8 @@ using android::pdx::ErrorStatus; using android::pdx::Message; -using android::pdx::Status; using android::pdx::RemoteChannelHandle; +using android::pdx::Status; using android::pdx::rpc::DispatchRemoteMethod; namespace android { @@ -96,10 +96,12 @@ BufferHubChannel::BufferInfo ProducerQueueChannel::GetBufferInfo() const { } Status<RemoteChannelHandle> ProducerQueueChannel::OnCreateConsumerQueue( - Message& message) { + Message& message, bool silent) { ATRACE_NAME("ProducerQueueChannel::OnCreateConsumerQueue"); - ALOGD_IF(TRACE, "ProducerQueueChannel::OnCreateConsumerQueue: channel_id=%d", - channel_id()); + ALOGD_IF( + TRACE, + "ProducerQueueChannel::OnCreateConsumerQueue: channel_id=%d slient=%d", + channel_id(), silent); int channel_id; auto status = message.PushChannel(0, nullptr, &channel_id); @@ -112,7 +114,7 @@ Status<RemoteChannelHandle> ProducerQueueChannel::OnCreateConsumerQueue( } auto consumer_queue_channel = std::make_shared<ConsumerQueueChannel>( - service(), buffer_id(), channel_id, shared_from_this()); + service(), buffer_id(), channel_id, shared_from_this(), silent); // Register the existing buffers with the new consumer queue. for (size_t slot = 0; slot < BufferHubRPC::kMaxQueueCapacity; slot++) { @@ -222,7 +224,7 @@ ProducerQueueChannel::AllocateBuffer(Message& message, uint32_t width, auto producer_channel_status = ProducerChannel::Create(service(), buffer_id, width, height, layer_count, - format, usage, config_.meta_size_bytes); + format, usage, config_.user_metadata_size); if (!producer_channel_status) { ALOGE( "ProducerQueueChannel::AllocateBuffer: Failed to create producer " diff --git a/services/vr/bufferhubd/producer_queue_channel.h b/services/vr/bufferhubd/producer_queue_channel.h index fd519c55e0..e825f47774 100644 --- a/services/vr/bufferhubd/producer_queue_channel.h +++ b/services/vr/bufferhubd/producer_queue_channel.h @@ -26,7 +26,7 @@ class ProducerQueueChannel : public BufferHubChannel { // Returns a handle for the service channel, as well as the size of the // metadata associated with the queue. pdx::Status<pdx::RemoteChannelHandle> OnCreateConsumerQueue( - pdx::Message& message); + pdx::Message& message, bool silent); pdx::Status<QueueInfo> OnGetQueueInfo(pdx::Message& message); |