diff options
Diffstat (limited to 'profcollectd/libprofcollectd/scheduler.rs')
-rw-r--r-- | profcollectd/libprofcollectd/scheduler.rs | 107 |
1 files changed, 18 insertions, 89 deletions
diff --git a/profcollectd/libprofcollectd/scheduler.rs b/profcollectd/libprofcollectd/scheduler.rs index f58c4995..cfd0381f 100644 --- a/profcollectd/libprofcollectd/scheduler.rs +++ b/profcollectd/libprofcollectd/scheduler.rs @@ -16,14 +16,10 @@ //! ProfCollect tracing scheduler. -use std::fs; -use std::mem; -use std::path::Path; use std::sync::mpsc::{sync_channel, SyncSender}; use std::sync::Arc; use std::sync::Mutex; use std::thread; -use std::time::{Duration, Instant}; use crate::config::{Config, PROFILE_OUTPUT_DIR, TRACE_OUTPUT_DIR}; use crate::trace_provider::{self, TraceProvider}; @@ -35,17 +31,12 @@ pub struct Scheduler { termination_ch: Option<SyncSender<()>>, /// The preferred trace provider for the system. trace_provider: Arc<Mutex<dyn TraceProvider + Send>>, - provider_ready_callbacks: Arc<Mutex<Vec<Box<dyn FnOnce() + Send>>>>, } impl Scheduler { pub fn new() -> Result<Self> { let p = trace_provider::get_trace_provider()?; - Ok(Scheduler { - termination_ch: None, - trace_provider: p, - provider_ready_callbacks: Arc::new(Mutex::new(Vec::new())), - }) + Ok(Scheduler { termination_ch: None, trace_provider: p }) } fn is_scheduled(&self) -> bool { @@ -68,13 +59,11 @@ impl Scheduler { Ok(_) => break, Err(_) => { // Did not receive a termination signal, initiate trace event. - if check_space_limit(*TRACE_OUTPUT_DIR, &config).unwrap() { - trace_provider.lock().unwrap().trace( - &TRACE_OUTPUT_DIR, - "periodic", - &config.sampling_period, - ); - } + trace_provider.lock().unwrap().trace( + &TRACE_OUTPUT_DIR, + "periodic", + &config.sampling_period, + ); } } } @@ -94,86 +83,26 @@ impl Scheduler { pub fn one_shot(&self, config: &Config, tag: &str) -> Result<()> { let trace_provider = self.trace_provider.clone(); - if check_space_limit(*TRACE_OUTPUT_DIR, config)? { - trace_provider.lock().unwrap().trace(&TRACE_OUTPUT_DIR, tag, &config.sampling_period); - } + trace_provider.lock().unwrap().trace(&TRACE_OUTPUT_DIR, tag, &config.sampling_period); Ok(()) } - pub fn process(&self, config: &Config) -> Result<()> { + pub fn process(&self, blocking: bool) -> Result<()> { let trace_provider = self.trace_provider.clone(); - trace_provider - .lock() - .unwrap() - .process(&TRACE_OUTPUT_DIR, &PROFILE_OUTPUT_DIR, &config.binary_filter) - .context("Failed to process profiles.")?; + let handle = thread::spawn(move || { + trace_provider + .lock() + .unwrap() + .process(&TRACE_OUTPUT_DIR, &PROFILE_OUTPUT_DIR) + .expect("Failed to process profiles."); + }); + if blocking { + handle.join().map_err(|_| anyhow!("Profile process thread panicked."))?; + } Ok(()) } pub fn get_trace_provider_name(&self) -> &'static str { self.trace_provider.lock().unwrap().get_name() } - - pub fn is_provider_ready(&self) -> bool { - self.trace_provider.lock().unwrap().is_ready() - } - - pub fn register_provider_ready_callback(&self, cb: Box<dyn FnOnce() + Send>) { - let mut locked_callbacks = self.provider_ready_callbacks.lock().unwrap(); - locked_callbacks.push(cb); - if locked_callbacks.len() == 1 { - self.start_thread_waiting_for_provider_ready(); - } - } - - fn start_thread_waiting_for_provider_ready(&self) { - let provider = self.trace_provider.clone(); - let callbacks = self.provider_ready_callbacks.clone(); - - thread::spawn(move || { - let start_time = Instant::now(); - loop { - let elapsed = Instant::now().duration_since(start_time); - if provider.lock().unwrap().is_ready() { - break; - } - // Decide check period based on how long we have waited: - // For the first 10s waiting, check every 100ms (likely to work on EVT devices). - // For the first 10m waiting, check every 10s (likely to work on DVT devices). - // For others, check every 10m. - let sleep_duration = if elapsed < Duration::from_secs(10) { - Duration::from_millis(100) - } else if elapsed < Duration::from_secs(60 * 10) { - Duration::from_secs(10) - } else { - Duration::from_secs(60 * 10) - }; - thread::sleep(sleep_duration); - } - - let mut locked_callbacks = callbacks.lock().unwrap(); - let v = mem::take(&mut *locked_callbacks); - for cb in v { - cb(); - } - }); - } -} - -/// Run if space usage is under limit. -fn check_space_limit(path: &Path, config: &Config) -> Result<bool> { - // Returns the size of a directory, non-recursive. - let dir_size = |path| -> Result<u64> { - fs::read_dir(path)?.try_fold(0, |acc, file| { - let metadata = file?.metadata()?; - let size = if metadata.is_file() { metadata.len() } else { 0 }; - Ok(acc + size) - }) - }; - - if dir_size(path)? > config.max_trace_limit { - log::error!("trace storage exhausted."); - return Ok(false); - } - Ok(true) } |