summaryrefslogtreecommitdiff
path: root/profcollectd/libprofcollectd/scheduler.rs
blob: 31a495a52dd2cb1374dd61875335d13c594b667a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
//
// Copyright (C) 2021 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

//! ProfCollect tracing scheduler.

use std::fs;
use std::path::Path;
use std::sync::mpsc::{sync_channel, SyncSender};
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;

use crate::config::{Config, PROFILE_OUTPUT_DIR, TRACE_OUTPUT_DIR};
use crate::trace_provider::{self, TraceProvider};
use anyhow::{anyhow, ensure, Context, Result};

pub struct Scheduler {
    /// Signal to terminate the periodic collection worker thread, None if periodic collection is
    /// not scheduled.
    termination_ch: Option<SyncSender<()>>,
    /// The preferred trace provider for the system.
    trace_provider: Arc<Mutex<dyn TraceProvider + Send>>,
}

impl Scheduler {
    pub fn new() -> Result<Self> {
        let p = trace_provider::get_trace_provider()?;
        Ok(Scheduler { termination_ch: None, trace_provider: p })
    }

    fn is_scheduled(&self) -> bool {
        self.termination_ch.is_some()
    }

    pub fn schedule_periodic(&mut self, config: &Config) -> Result<()> {
        ensure!(!self.is_scheduled(), "Already scheduled.");

        let (sender, receiver) = sync_channel(1);
        self.termination_ch = Some(sender);

        // Clone config and trace_provider ARC for the worker thread.
        let config = config.clone();
        let trace_provider = self.trace_provider.clone();

        thread::spawn(move || {
            loop {
                match receiver.recv_timeout(config.collection_interval) {
                    Ok(_) => break,
                    Err(_) => {
                        // Did not receive a termination signal, initiate trace event.
                        if check_space_limit(*TRACE_OUTPUT_DIR, &config).unwrap() {
                            trace_provider.lock().unwrap().trace(
                                &TRACE_OUTPUT_DIR,
                                "periodic",
                                &config.sampling_period,
                            );
                        }
                    }
                }
            }
        });
        Ok(())
    }

    pub fn terminate_periodic(&mut self) -> Result<()> {
        self.termination_ch
            .as_ref()
            .ok_or_else(|| anyhow!("Not scheduled"))?
            .send(())
            .context("Scheduler worker disappeared.")?;
        self.termination_ch = None;
        Ok(())
    }

    pub fn one_shot(&self, config: &Config, tag: &str) -> Result<()> {
        let trace_provider = self.trace_provider.clone();
        if check_space_limit(*TRACE_OUTPUT_DIR, config)? {
            trace_provider.lock().unwrap().trace(&TRACE_OUTPUT_DIR, tag, &config.sampling_period);
        }
        Ok(())
    }

    pub fn process(&self, config: &Config) -> Result<()> {
        let trace_provider = self.trace_provider.clone();
        trace_provider
            .lock()
            .unwrap()
            .process(&TRACE_OUTPUT_DIR, &PROFILE_OUTPUT_DIR, &config.binary_filter)
            .context("Failed to process profiles.")?;
        Ok(())
    }

    pub fn get_trace_provider_name(&self) -> &'static str {
        self.trace_provider.lock().unwrap().get_name()
    }
}

/// Run if space usage is under limit.
fn check_space_limit(path: &Path, config: &Config) -> Result<bool> {
    // Returns the size of a directory, non-recursive.
    let dir_size = |path| -> Result<u64> {
        fs::read_dir(path)?.try_fold(0, |acc, file| {
            let metadata = file?.metadata()?;
            let size = if metadata.is_file() { metadata.len() } else { 0 };
            Ok(acc + size)
        })
    };

    if dir_size(path)? > config.max_trace_limit {
        log::error!("trace storage exhausted.");
        return Ok(false);
    }
    Ok(true)
}