Diffstat (limited to 'utils/frozen_chromite/lib/ts_mon_config.py')
-rw-r--r--  utils/frozen_chromite/lib/ts_mon_config.py  397
1 file changed, 397 insertions, 0 deletions
diff --git a/utils/frozen_chromite/lib/ts_mon_config.py b/utils/frozen_chromite/lib/ts_mon_config.py
new file mode 100644
index 0000000000..c9cc9a4fb8
--- /dev/null
+++ b/utils/frozen_chromite/lib/ts_mon_config.py
@@ -0,0 +1,397 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Wrapper for inframon's command-line flag based configuration."""
+
+from __future__ import print_function
+
+import argparse
+import contextlib
+import multiprocessing
+import os
+import socket
+import signal
+import time
+
+from six.moves import queue as Queue
+
+import six
+
+from autotest_lib.utils.frozen_chromite.lib import cros_logging as logging
+from autotest_lib.utils.frozen_chromite.lib import metrics
+from autotest_lib.utils.frozen_chromite.lib import parallel
+
+try:
+ from infra_libs.ts_mon import config
+ from infra_libs.ts_mon import BooleanField
+ from infra_libs.ts_mon import IntegerField
+ from infra_libs.ts_mon import StringField
+ import googleapiclient.discovery
+except (ImportError, RuntimeError) as e:
+ config = None
+ logging.warning('Failed to import ts_mon, monitoring is disabled: %s', e)
+
+
+_WasSetup = False
+_CommonMetricFields = {}
+
+FLUSH_INTERVAL = 60
+
+
+@contextlib.contextmanager
+def TrivialContextManager():
+ """Context manager with no side effects."""
+ yield
+
+
+def GetMetricFieldSpec(fields=None):
+ """Return the corresponding field_spec for metric fields.
+
+ Args:
+ fields: Dictionary containing metric fields.
+
+ Returns:
+ field_spec: List containing any *Field object associated with metric.
+ """
+ field_spec = []
+ if fields:
+ for key, val in fields.items():
+ if isinstance(val, bool):
+ field_spec.append(BooleanField(key))
+ elif isinstance(val, int):
+ field_spec.append(IntegerField(key))
+ elif isinstance(val, six.string_types):
+ field_spec.append(StringField(key))
+ else:
+ logging.error("Couldn't classify the metric field %s:%s",
+ key, val)
+
+ return field_spec
+
+def AddCommonFields(fields=None, field_spec=None):
+ """Add cbuildbot-wide common fields to a given field set.
+
+ Args:
+ fields: Dictionary containing metric fields to which common metric fields
+ will be added.
+ field_spec: List containing any *Field object associated with metric.
+
+ Returns:
+ Dictionary containing complete set of metric fields to be applied to
+ metric and a list of corresponding field_spec.
+ """
+ metric_fields = (dict(_CommonMetricFields) if _CommonMetricFields
+ else {})
+
+ if metric_fields:
+ metric_fields.update(fields or {})
+ return metric_fields, GetMetricFieldSpec(metric_fields)
+ else:
+ return fields, field_spec
+
+
+def SetupTsMonGlobalState(service_name,
+ indirect=False,
+ suppress_exception=True,
+ short_lived=False,
+ auto_flush=True,
+ common_metric_fields=None,
+ debug_file=None,
+ task_num=0):
+ """Uses a dummy argument parser to get the default behavior from ts-mon.
+
+ Args:
+ service_name: The name of the task we are sending metrics from.
+ indirect: Whether to create a metrics.MESSAGE_QUEUE object and a separate
+ process for indirect metrics flushing. Useful for forking,
+ because forking would normally create a duplicate ts_mon thread.
+ suppress_exception: True to silence any exception during the setup. Default
+ is set to True.
+ short_lived: Whether this process is short-lived and should use the autogen
+ hostname prefix.
+ auto_flush: Whether to create a thread to automatically flush metrics every
+ minute.
+ common_metric_fields: Dictionary containing the metric fields that will be
+ added to all metrics.
+ debug_file: If non-none, send metrics to this path instead of to PubSub.
+ task_num: (Default 0) The task_num target field of the metrics to emit.
+ """
+ if not config:
+ return TrivialContextManager()
+
+ # The flushing subprocess calls .flush manually.
+ if indirect:
+ auto_flush = False
+
+ if common_metric_fields:
+ _CommonMetricFields.update(common_metric_fields)
+
+ # google-api-client has too much noisy logging.
+ options = _GenerateTsMonArgparseOptions(
+ service_name, short_lived, auto_flush, debug_file, task_num)
+
+ if indirect:
+ return _CreateTsMonFlushingProcess(options)
+ else:
+ _SetupTsMonFromOptions(options, suppress_exception)
+ return TrivialContextManager()
+
+
+def _SetupTsMonFromOptions(options, suppress_exception):
+ """Sets up ts-mon global state given parsed argparse options.
+
+ Args:
+ options: An argparse options object containing ts-mon flags.
+ suppress_exception: True to silence any exception during the setup. Default
+ is set to True.
+ """
+ googleapiclient.discovery.logger.setLevel(logging.WARNING)
+ try:
+ config.process_argparse_options(options)
+ logging.notice('ts_mon was set up.')
+ global _WasSetup # pylint: disable=global-statement
+ _WasSetup = True
+ except Exception as e:
+ logging.warning('Failed to configure ts_mon, monitoring is disabled: %s', e,
+ exc_info=True)
+ if not suppress_exception:
+ raise
+
+
+def _GenerateTsMonArgparseOptions(service_name, short_lived,
+ auto_flush, debug_file, task_num):
+ """Generates an arg list for ts-mon to consume.
+
+ Args:
+ service_name: The name of the task we are sending metrics from.
+ short_lived: Whether this process is short-lived and should use the autogen
+ hostname prefix.
+ auto_flush: Whether to create a thread to automatically flush metrics every
+ minute.
+ debug_file: If non-none, send metrics to this path instead of to PubSub.
+ task_num: Override the default task num of 0.
+ """
+ parser = argparse.ArgumentParser()
+ config.add_argparse_options(parser)
+
+ args = [
+ '--ts-mon-target-type', 'task',
+ '--ts-mon-task-service-name', service_name,
+ '--ts-mon-task-job-name', service_name,
+ ]
+
+ if debug_file:
+ args.extend(['--ts-mon-endpoint', 'file://' + debug_file])
+
+ # Short-lived processes will have autogen: prepended to their hostname and
+ # use task-number=PID, which triggers shorter retention policies under
+ # chrome-infra@ and is used by a Monarch precomputation to group across the
+ # task number.
+ # Furthermore, we assume they manually call ts_mon.Flush(), because the
+ # ts_mon thread will drop messages if the process exits before it flushes.
+ if short_lived:
+ auto_flush = False
+ fqdn = socket.getfqdn().lower()
+ host = fqdn.split('.')[0]
+ args.extend(['--ts-mon-task-hostname', 'autogen:' + host,
+ '--ts-mon-task-number', str(os.getpid())])
+ elif task_num:
+ args.extend(['--ts-mon-task-number', str(task_num)])
+
+ args.extend(['--ts-mon-flush', 'auto' if auto_flush else 'manual'])
+ return parser.parse_args(args=args)
+
+
+@contextlib.contextmanager
+def _CreateTsMonFlushingProcess(options):
+ """Creates a separate process to flush ts_mon metrics.
+
+ Useful for multiprocessing scenarios where we don't want multiple ts-mon
+ threads sending contradictory metrics. Instead, functions in
+ chromite.lib.metrics will send their calls to a Queue, which is consumed by a
+ dedicated flushing process.
+
+ Args:
+ options: An argparse options object to configure ts-mon with.
+
+ Side effects:
+ Sets chromite.lib.metrics.MESSAGE_QUEUE, which causes the metric functions
+ to send their calls to the Queue instead of creating the metrics.
+ """
+ # If this is nested, we don't need to create another queue and another
+ # message consumer. Do nothing to continue to use the existing queue.
+ if metrics.MESSAGE_QUEUE or metrics.FLUSHING_PROCESS:
+ return
+
+ with parallel.Manager() as manager:
+ message_q = manager.Queue()
+
+ metrics.FLUSHING_PROCESS = multiprocessing.Process(
+ target=lambda: _SetupAndConsumeMessages(message_q, options))
+ metrics.FLUSHING_PROCESS.start()
+
+ # This makes the chromite.lib.metrics functions use the queue.
+ # Note: we have to do this *after* forking the ConsumeMessages process.
+ metrics.MESSAGE_QUEUE = message_q
+
+ try:
+ yield message_q
+ finally:
+ _CleanupMetricsFlushingProcess()
+
+
+def _CleanupMetricsFlushingProcess():
+ """Sends sentinal value to flushing process and .joins it."""
+ # Now that there is no longer a process to listen to the Queue, re-set it
+ # to None so that any future metrics are created within this process.
+ message_q = metrics.MESSAGE_QUEUE
+ flushing_process = metrics.FLUSHING_PROCESS
+ metrics.MESSAGE_QUEUE = None
+ metrics.FLUSHING_PROCESS = None
+
+ # If the process has already died, we don't need to try to clean it up.
+ if not flushing_process.is_alive():
+ return
+
+ # Send the sentinel value for "flush one more time and exit".
+ try:
+ message_q.put(None)
+ # If the flushing process quits, the message Queue can become full.
+ except IOError:
+ if not flushing_process.is_alive():
+ return
+
+ logging.info('Waiting for ts_mon flushing process to finish...')
+ flushing_process.join(timeout=FLUSH_INTERVAL*2)
+ if flushing_process.is_alive():
+ flushing_process.terminate()
+ if flushing_process.exitcode:
+ logging.warning('ts_mon_config flushing process did not exit cleanly.')
+ logging.info('Finished waiting for ts_mon process.')
+
+
+def _SetupAndConsumeMessages(message_q, options):
+ """Sets up ts-mon, and starts a MetricConsumer loop.
+
+ Args:
+ message_q: The metric multiprocessing.Queue to read from.
+ options: An argparse options object to configure ts-mon with.
+ """
+ # Configure ts-mon, but don't start up a sending thread.
+ _SetupTsMonFromOptions(options, suppress_exception=True)
+ if not _WasSetup:
+ return
+
+ return MetricConsumer(message_q).Consume()
+
+
+class MetricConsumer(object):
+ """Configures ts_mon and gets metrics from a message queue.
+
+ This class is meant to be used in a subprocess. It configures itself
+ to receive a SIGHUP signal when the parent process dies, and catches the
+ signal in order to have a chance to flush any pending metrics one more time
+ before quitting.
+ """
+ def __init__(self, message_q):
+ # If our parent dies, finish flushing before exiting.
+ self.reset_after_flush = []
+ self.last_flush = 0
+ self.pending = False
+ self.message_q = message_q
+
+ if parallel.ExitWithParent(signal.SIGHUP):
+ signal.signal(signal.SIGHUP, lambda _sig, _stack: self._WaitToFlush())
+
+
+ def Consume(self):
+ """Emits metrics from self.message_q, flushing periodically.
+
+ The loop is terminated by a None entry on the Queue, which is a friendly
+ signal from the parent process that it's time to shut down. Before
+ returning, we wait to flush one more time to make sure that all the
+ metrics were sent.
+ """
+ message = self.message_q.get()
+ while message:
+ self._CallMetric(message)
+ message = self._WaitForNextMessage()
+
+ if self.pending:
+ self._WaitToFlush()
+
+
+ def _CallMetric(self, message):
+ """Calls the metric method from |message|, ignoring exceptions."""
+ try:
+ cls = getattr(metrics, message.metric_name)
+ message.method_kwargs.setdefault('fields', {})
+ message.metric_kwargs.setdefault('field_spec', [])
+ message.method_kwargs['fields'], message.metric_kwargs['field_spec'] = (
+ AddCommonFields(message.method_kwargs['fields'],
+ message.metric_kwargs['field_spec']))
+ metric = cls(*message.metric_args, **message.metric_kwargs)
+ if message.reset_after:
+ self.reset_after_flush.append(metric)
+ getattr(metric, message.method)(
+ *message.method_args,
+ **message.method_kwargs)
+ self.pending = True
+ except Exception:
+ logging.exception('Caught an exception while running %s',
+ _MethodCallRepr(message))
+
+
+ def _WaitForNextMessage(self):
+ """Waits for a new message, flushing every |FLUSH_INTERVAL| seconds."""
+ while True:
+ time_delta = self._FlushIfReady()
+ try:
+ timeout = FLUSH_INTERVAL - time_delta
+ message = self.message_q.get(timeout=timeout)
+ return message
+ except Queue.Empty:
+ pass
+
+
+ def _WaitToFlush(self):
+ """Sleeps until the next time we can call metrics.Flush(), then flushes."""
+ time_delta = time.time() - self.last_flush
+ time.sleep(max(0, FLUSH_INTERVAL - time_delta))
+ metrics.Flush(reset_after=self.reset_after_flush)
+
+
+ def _FlushIfReady(self):
+ """Call metrics.Flush() if we are ready and have pending metrics.
+
+ This allows us to only call flush every FLUSH_INTERVAL seconds.
+ """
+ now = time.time()
+ time_delta = now - self.last_flush
+ if time_delta > FLUSH_INTERVAL:
+ self.last_flush = now
+ time_delta = 0
+ metrics.Flush(reset_after=self.reset_after_flush)
+ self.pending = False
+ return time_delta
+
+
+def _MethodCallRepr(message):
+ """Gives a string representation of |obj|.|method|(*|args|, **|kwargs|)
+
+ Args:
+ message: A MetricCall object.
+ """
+ if not message:
+ return repr(message)
+ obj = message.metric_name
+ method = message.method
+ args = message.method_args
+ kwargs = message.method_kwargs
+
+ args_strings = ([repr(x) for x in args] +
+ [(str(k) + '=' + repr(v))
+ for k, v in kwargs.items()])
+ return '%s.%s(%s)' % (repr(obj), method, ', '.join(args_strings))