1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
|
// Copyright (C) 2023 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Instant;
use crate::{
buffers::BufferPool, subscriptions::SharedBufferSubscription, BufferPublisher,
BufferSubscriber, Frame, StreamConfig,
};
/// The [BufferPoolPublisher] submits buffers from a [BufferPool] to a single [BufferSubscriber].
///
/// Nothing is sent on its own: a [Frame] is only handed to the subscriber from
/// [BufferPoolPublisher::send_next_frame], and only when the shared subscription reports a
/// pending request and the pool yields a buffer.
pub struct BufferPoolPublisher {
/// Configuration passed to the [BufferPool] at construction and reported by
/// `get_publisher_stream_config`.
stream_config: StreamConfig,
/// Pool that supplies the buffer for every frame that is sent.
buffer_pool: BufferPool,
/// Subscription state; a clone of it is handed to the subscriber in `subscribe`, and it is
/// asked for a request before each frame is sent.
subscription: SharedBufferSubscription,
/// The attached subscriber; `None` until `subscribe` is called.
subscriber: Option<Box<dyn BufferSubscriber>>,
}
impl BufferPoolPublisher {
    /// Builds a publisher together with the [BufferPool] it draws buffers from.
    ///
    /// `size` and `stream_config` are forwarded to [BufferPool::new]; the [BufferPool] creates
    /// all of its buffers at initialization using `stream_config`. The publisher starts with a
    /// fresh [SharedBufferSubscription] and no subscriber attached.
    ///
    /// Returns `None` when [BufferPool::new] returns `None`.
    pub fn new(stream_config: StreamConfig, size: usize) -> Option<Self> {
        let buffer_pool = BufferPool::new(size, stream_config)?;
        Some(Self {
            stream_config,
            buffer_pool,
            subscription: SharedBufferSubscription::new(),
            subscriber: None,
        })
    }

    /// Attempts to deliver one [Frame] to the attached [BufferSubscriber].
    ///
    /// The following conditions are checked in order, and the first one that fails stops the
    /// attempt:
    /// 1. a subscriber has been attached,
    /// 2. the [SharedBufferSubscription] has a request to take,
    /// 3. the [BufferPool] provides a buffer.
    ///
    /// The frame carries the pool's buffer, the given `present_time` and a `fence` of 0.
    ///
    /// Returns `true` if a frame was passed to the subscriber's `on_next`, `false` otherwise.
    pub fn send_next_frame(&mut self, present_time: Instant) -> bool {
        let Some(subscriber) = self.subscriber.as_mut() else {
            return false;
        };

        if !self.subscription.take_request() {
            return false;
        }

        // NOTE(review): the request has already been taken at this point, so it is not handed
        // back if the pool has no buffer to offer - confirm this is intended.
        let Some(buffer) = self.buffer_pool.next_buffer() else {
            return false;
        };

        subscriber.on_next(Frame { buffer, present_time, fence: 0 });
        true
    }
}
impl BufferPublisher for BufferPoolPublisher {
    /// Returns the [StreamConfig] stored in this publisher.
    fn get_publisher_stream_config(&self) -> StreamConfig {
        self.stream_config
    }

    /// Attaches `subscriber` and notifies it through `on_subscribe`, passing it a clone of the
    /// publisher's subscription obtained from `clone_for_subscriber`.
    ///
    /// # Panics
    ///
    /// Panics if a subscriber is already attached.
    fn subscribe(&mut self, subscriber: impl BufferSubscriber + 'static) {
        assert!(self.subscriber.is_none());
        // Store the boxed subscriber first, then notify it through the stored reference.
        let attached = self.subscriber.insert(Box::new(subscriber));
        attached.on_subscribe(self.subscription.clone_for_subscriber());
    }
}
#[cfg(test)]
mod test {
    use nativewindow::{AHardwareBuffer_Format, AHardwareBuffer_UsageFlags};

    use super::*;
    use crate::{
        subscribers::{
            testing::{TestSubscriber, TestingSubscriberEvent},
            SharedSubscriber,
        },
        StreamConfig,
    };

    /// Smallest configuration used by the tests: a 1x1, single-layer R8G8B8A8_UNORM stream.
    const STREAM_CONFIG: StreamConfig = StreamConfig {
        width: 1,
        height: 1,
        layers: 1,
        format: AHardwareBuffer_Format::AHARDWAREBUFFER_FORMAT_R8G8B8A8_UNORM,
        usage: AHardwareBuffer_UsageFlags::AHARDWAREBUFFER_USAGE_CPU_READ_OFTEN,
        stride: 0,
    };

    /// After one request, `send_next_frame` succeeds, the subscriber's last recorded event is a
    /// `Next`, and no request remains pending on the publisher's subscription.
    #[test]
    fn test_send_next_frame() {
        let shared_subscriber = SharedSubscriber::new(TestSubscriber::new(STREAM_CONFIG));
        let mut publisher = BufferPoolPublisher::new(STREAM_CONFIG, 1).unwrap();
        publisher.subscribe(shared_subscriber.clone());

        shared_subscriber.map_inner(|inner| inner.request(1));
        let frame_sent = publisher.send_next_frame(Instant::now());
        assert!(frame_sent);

        let recorded = shared_subscriber.map_inner_mut(|inner| inner.take_events());
        let newest = recorded.last().unwrap();
        assert!(matches!(newest, TestingSubscriberEvent::Next(_)));
        assert_eq!(publisher.subscription.pending_requests(), 0);
    }
}
|