summaryrefslogtreecommitdiff
path: root/profcollectd/libprofcollectd/scheduler.rs
diff options
context:
space:
mode:
Diffstat (limited to 'profcollectd/libprofcollectd/scheduler.rs')
-rw-r--r--profcollectd/libprofcollectd/scheduler.rs107
1 files changed, 18 insertions, 89 deletions
diff --git a/profcollectd/libprofcollectd/scheduler.rs b/profcollectd/libprofcollectd/scheduler.rs
index f58c4995..cfd0381f 100644
--- a/profcollectd/libprofcollectd/scheduler.rs
+++ b/profcollectd/libprofcollectd/scheduler.rs
@@ -16,14 +16,10 @@
//! ProfCollect tracing scheduler.
-use std::fs;
-use std::mem;
-use std::path::Path;
use std::sync::mpsc::{sync_channel, SyncSender};
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
-use std::time::{Duration, Instant};
use crate::config::{Config, PROFILE_OUTPUT_DIR, TRACE_OUTPUT_DIR};
use crate::trace_provider::{self, TraceProvider};
@@ -35,17 +31,12 @@ pub struct Scheduler {
termination_ch: Option<SyncSender<()>>,
/// The preferred trace provider for the system.
trace_provider: Arc<Mutex<dyn TraceProvider + Send>>,
- provider_ready_callbacks: Arc<Mutex<Vec<Box<dyn FnOnce() + Send>>>>,
}
impl Scheduler {
pub fn new() -> Result<Self> {
let p = trace_provider::get_trace_provider()?;
- Ok(Scheduler {
- termination_ch: None,
- trace_provider: p,
- provider_ready_callbacks: Arc::new(Mutex::new(Vec::new())),
- })
+ Ok(Scheduler { termination_ch: None, trace_provider: p })
}
fn is_scheduled(&self) -> bool {
@@ -68,13 +59,11 @@ impl Scheduler {
Ok(_) => break,
Err(_) => {
// Did not receive a termination signal, initiate trace event.
- if check_space_limit(*TRACE_OUTPUT_DIR, &config).unwrap() {
- trace_provider.lock().unwrap().trace(
- &TRACE_OUTPUT_DIR,
- "periodic",
- &config.sampling_period,
- );
- }
+ trace_provider.lock().unwrap().trace(
+ &TRACE_OUTPUT_DIR,
+ "periodic",
+ &config.sampling_period,
+ );
}
}
}
@@ -94,86 +83,26 @@ impl Scheduler {
pub fn one_shot(&self, config: &Config, tag: &str) -> Result<()> {
let trace_provider = self.trace_provider.clone();
- if check_space_limit(*TRACE_OUTPUT_DIR, config)? {
- trace_provider.lock().unwrap().trace(&TRACE_OUTPUT_DIR, tag, &config.sampling_period);
- }
+ trace_provider.lock().unwrap().trace(&TRACE_OUTPUT_DIR, tag, &config.sampling_period);
Ok(())
}
- pub fn process(&self, config: &Config) -> Result<()> {
+ pub fn process(&self, blocking: bool) -> Result<()> {
let trace_provider = self.trace_provider.clone();
- trace_provider
- .lock()
- .unwrap()
- .process(&TRACE_OUTPUT_DIR, &PROFILE_OUTPUT_DIR, &config.binary_filter)
- .context("Failed to process profiles.")?;
+ let handle = thread::spawn(move || {
+ trace_provider
+ .lock()
+ .unwrap()
+ .process(&TRACE_OUTPUT_DIR, &PROFILE_OUTPUT_DIR)
+ .expect("Failed to process profiles.");
+ });
+ if blocking {
+ handle.join().map_err(|_| anyhow!("Profile process thread panicked."))?;
+ }
Ok(())
}
pub fn get_trace_provider_name(&self) -> &'static str {
self.trace_provider.lock().unwrap().get_name()
}
-
- pub fn is_provider_ready(&self) -> bool {
- self.trace_provider.lock().unwrap().is_ready()
- }
-
- pub fn register_provider_ready_callback(&self, cb: Box<dyn FnOnce() + Send>) {
- let mut locked_callbacks = self.provider_ready_callbacks.lock().unwrap();
- locked_callbacks.push(cb);
- if locked_callbacks.len() == 1 {
- self.start_thread_waiting_for_provider_ready();
- }
- }
-
- fn start_thread_waiting_for_provider_ready(&self) {
- let provider = self.trace_provider.clone();
- let callbacks = self.provider_ready_callbacks.clone();
-
- thread::spawn(move || {
- let start_time = Instant::now();
- loop {
- let elapsed = Instant::now().duration_since(start_time);
- if provider.lock().unwrap().is_ready() {
- break;
- }
- // Decide check period based on how long we have waited:
- // For the first 10s waiting, check every 100ms (likely to work on EVT devices).
- // For the first 10m waiting, check every 10s (likely to work on DVT devices).
- // For others, check every 10m.
- let sleep_duration = if elapsed < Duration::from_secs(10) {
- Duration::from_millis(100)
- } else if elapsed < Duration::from_secs(60 * 10) {
- Duration::from_secs(10)
- } else {
- Duration::from_secs(60 * 10)
- };
- thread::sleep(sleep_duration);
- }
-
- let mut locked_callbacks = callbacks.lock().unwrap();
- let v = mem::take(&mut *locked_callbacks);
- for cb in v {
- cb();
- }
- });
- }
-}
-
-/// Run if space usage is under limit.
-fn check_space_limit(path: &Path, config: &Config) -> Result<bool> {
- // Returns the size of a directory, non-recursive.
- let dir_size = |path| -> Result<u64> {
- fs::read_dir(path)?.try_fold(0, |acc, file| {
- let metadata = file?.metadata()?;
- let size = if metadata.is_file() { metadata.len() } else { 0 };
- Ok(acc + size)
- })
- };
-
- if dir_size(path)? > config.max_trace_limit {
- log::error!("trace storage exhausted.");
- return Ok(false);
- }
- Ok(true)
}