summaryrefslogtreecommitdiff
path: root/libs/bufferstreams/rust/src/publishers/buffer_pool_publisher.rs
blob: c5c1fd37c1f9f6f6e357ea0454ed1e7530bd1e5f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// Copyright (C) 2023 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Instant;

use crate::{
    buffers::BufferPool, subscriptions::SharedBufferSubscription, BufferPublisher,
    BufferSubscriber, Frame, StreamConfig,
};

/// The [BufferPoolPublisher] submits buffers from a pool over to the subscriber.
pub struct BufferPoolPublisher {
    stream_config: StreamConfig,
    buffer_pool: BufferPool,
    subscription: SharedBufferSubscription,
    subscriber: Option<Box<dyn BufferSubscriber>>,
}

impl BufferPoolPublisher {
    /// The [BufferPoolPublisher] needs to initialize a [BufferPool], the [BufferPool] will create
    /// all buffers at initialization using the stream_config.
    pub fn new(stream_config: StreamConfig, size: usize) -> Option<Self> {
        BufferPool::new(size, stream_config).map(|buffer_pool| Self {
            stream_config,
            buffer_pool,
            subscription: SharedBufferSubscription::new(),
            subscriber: None,
        })
    }

    /// If the [SharedBufferSubscription] is ready for a [Frame], a buffer will be requested from
    /// [BufferPool] and sent over to the [BufferSubscriber].
    pub fn send_next_frame(&mut self, present_time: Instant) -> bool {
        if let Some(subscriber) = self.subscriber.as_mut() {
            if self.subscription.take_request() {
                if let Some(buffer) = self.buffer_pool.next_buffer() {
                    let frame = Frame { buffer, present_time, fence: 0 };

                    subscriber.on_next(frame);
                    return true;
                }
            }
        }
        false
    }
}

impl BufferPublisher for BufferPoolPublisher {
    fn get_publisher_stream_config(&self) -> StreamConfig {
        self.stream_config
    }

    fn subscribe(&mut self, subscriber: impl BufferSubscriber + 'static) {
        assert!(self.subscriber.is_none());

        self.subscriber = Some(Box::new(subscriber));
        self.subscriber.as_mut().unwrap().on_subscribe(self.subscription.clone_for_subscriber());
    }
}

#[cfg(test)]
mod test {
    use nativewindow::{AHardwareBuffer_Format, AHardwareBuffer_UsageFlags};

    use super::*;

    use crate::{
        subscribers::{
            testing::{TestSubscriber, TestingSubscriberEvent},
            SharedSubscriber,
        },
        StreamConfig,
    };

    const STREAM_CONFIG: StreamConfig = StreamConfig {
        width: 1,
        height: 1,
        layers: 1,
        format: AHardwareBuffer_Format::AHARDWAREBUFFER_FORMAT_R8G8B8A8_UNORM,
        usage: AHardwareBuffer_UsageFlags::AHARDWAREBUFFER_USAGE_CPU_READ_OFTEN,
        stride: 0,
    };

    #[test]
    fn test_send_next_frame() {
        let subscriber = SharedSubscriber::new(TestSubscriber::new(STREAM_CONFIG));

        let mut buffer_pool_publisher = BufferPoolPublisher::new(STREAM_CONFIG, 1).unwrap();
        buffer_pool_publisher.subscribe(subscriber.clone());

        subscriber.map_inner(|s| s.request(1));

        assert!(buffer_pool_publisher.send_next_frame(Instant::now()));

        let events = subscriber.map_inner_mut(|s| s.take_events());
        assert!(matches!(events.last().unwrap(), TestingSubscriberEvent::Next(_)));
        assert_eq!(buffer_pool_publisher.subscription.pending_requests(), 0);
    }
}