aboutsummaryrefslogtreecommitdiff
path: root/client/cros/audio/audio_analysis_unittest.py
blob: 81472c1a24e4d3cbdcdc79844c148dda752f2a14 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
#!/usr/bin/python3
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import logging
import numpy
import os
import unittest

import common
from autotest_lib.client.cros.audio import audio_analysis
from autotest_lib.client.cros.audio import audio_data
from six.moves import range

class SpectralAnalysisTest(unittest.TestCase):
    def setUp(self):
        """Uses the same seed to generate noise for each test."""
        numpy.random.seed(0)


    def stub_peak_detection(self, array, window_size):
        """Detects peaks in an array in simple way.

        A point (i, array[i]) is a peak if array[i] is the maximum among
        array[i - half_window_size] to array[i + half_window_size].
        If array[i - half_window_size] to array[i + half_window_size] are all
        equal, then there is no peak in this window.

        @param window_size: The window to detect peaks.

        @returns: A list of tuples:
                  [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2),
                   ...]
                  where the tuples are sorted by peak values.

        """
        half_window_size = window_size // 2
        length = len(array)

        def mid_is_peak(array, mid, left, right):
            """Checks if value at mid is the largest among left to right.

            @param array: A list of numbers.
            @param mid: The mid index.
            @param left: The left index.
            @param rigth: The right index.

            @returns: True if array[index] is the maximum among numbers in array
                      between index [left, right] inclusively.

            """
            value_mid = array[mid]
            for index in range(left, right + 1):
                if index == mid:
                    continue
                if array[index] >= value_mid:
                    return False
            return True

        results = []
        for mid in range(length):
            left = max(0, mid - half_window_size)
            right = min(length - 1, mid + half_window_size)
            if mid_is_peak(array, mid, left, right):
                results.append((mid, array[mid]))

        # Sort the peaks by values.
        return sorted(results, key=lambda x: x[1], reverse=True)


    def testPeakDetection(self):
        array = [0, 1, 2, 3, 4, 3, 2, 1, 0, 1, 2, 3, 5, 3, 2, 1, 1, 1, 1, 1]
        result = audio_analysis.peak_detection(array, 4)
        golden_answer = [(12, 5), (4, 4)]
        self.assertEqual(result, golden_answer)


    def testPeakDetectionLarge(self):
        array = numpy.random.uniform(0, 1, 1000000)
        window_size = 100
        logging.debug('Test large array using stub peak detection')
        stub_answer = self.stub_peak_detection(array, window_size)
        logging.debug('Test large array using improved peak detection')
        improved_answer = audio_analysis.peak_detection(array, window_size)
        logging.debug('Compare the result')
        self.assertEqual(stub_answer, improved_answer)


    def testSpectralAnalysis(self):
        rate = 48000
        length_in_secs = 0.5
        freq_1 = 490.0
        freq_2 = 60.0
        coeff_1 = 1
        coeff_2 = 0.3
        samples = int(length_in_secs * rate)
        noise = numpy.random.standard_normal(samples) * 0.005
        x = numpy.linspace(0.0, (samples - 1) * 1.0 / rate, samples)
        y = (coeff_1 * numpy.sin(freq_1 * 2.0 * numpy.pi * x) +
             coeff_2 * numpy.sin(freq_2 * 2.0 * numpy.pi * x)) + noise
        results = audio_analysis.spectral_analysis(y, rate)
        # Results should contains
        # [(490, 1*k), (60, 0.3*k), (0, 0.1*k)] where 490Hz is the dominant
        # frequency with coefficient 1, 60Hz is the second dominant frequency
        # with coefficient 0.3, 0Hz is from Gaussian noise with coefficient
        # around 0.1. The k constant is resulted from window function.
        logging.debug('Results: %s', results)
        self.assertTrue(abs(results[0][0]-freq_1) < 1)
        self.assertTrue(abs(results[1][0]-freq_2) < 1)
        self.assertTrue(
                abs(results[0][1] / results[1][1] - coeff_1 / coeff_2) < 0.01)


    def testSpectralAnalysisRealData(self):
        """This unittest checks the spectral analysis works on real data."""
        file_path = os.path.join(
                os.path.dirname(__file__), 'test_data', '1k_2k.raw')
        binary = open(file_path, 'rb').read()
        data = audio_data.AudioRawData(binary, 2, 'S32_LE')
        saturate_value = audio_data.get_maximum_value_from_sample_format(
                'S32_LE')
        golden_frequency = [1000, 2000]
        for channel in [0, 1]:
            normalized_signal = audio_analysis.normalize_signal(
                    data.channel_data[channel],saturate_value)
            spectral = audio_analysis.spectral_analysis(
                    normalized_signal, 48000, 0.02)
            logging.debug('channel %s: %s', channel, spectral)
            self.assertTrue(abs(spectral[0][0] - golden_frequency[channel]) < 5,
                            'Dominant frequency is not correct')


    def testNotMeaningfulData(self):
        """Checks that sepectral analysis handles un-meaningful data."""
        rate = 48000
        length_in_secs = 0.5
        samples = int(length_in_secs * rate)
        noise_amplitude = audio_analysis.MEANINGFUL_RMS_THRESHOLD * 0.5
        noise = numpy.random.standard_normal(samples) * noise_amplitude
        results = audio_analysis.spectral_analysis(noise, rate)
        self.assertEqual([(0, 0)], results)


    def testEmptyData(self):
        """Checks that sepectral analysis rejects empty data."""
        with self.assertRaises(audio_analysis.EmptyDataError):
            results = audio_analysis.spectral_analysis([], 100)


class NormalizeTest(unittest.TestCase):
    def testNormalize(self):
        y = [1, 2, 3, 4, 5]
        normalized_y = audio_analysis.normalize_signal(y, 10)
        expected = numpy.array([0.1, 0.2, 0.3, 0.4, 0.5])
        for i in range(len(y)):
            self.assertEqual(expected[i], normalized_y[i])


class AnomalyTest(unittest.TestCase):
    def setUp(self):
        """Creates a test signal of sine wave."""
        # Use the same seed for each test case.
        numpy.random.seed(0)

        self.block_size = 120
        self.rate = 48000
        self.freq = 440
        length_in_secs = 0.25
        self.samples = int(length_in_secs * self.rate)
        x = numpy.linspace(
                0.0, (self.samples - 1) * 1.0 / self.rate, self.samples)
        self.y = numpy.sin(self.freq * 2.0 * numpy.pi * x)


    def add_noise(self):
        """Add noise to the test signal."""
        noise_amplitude = 0.3
        noise = numpy.random.standard_normal(len(self.y)) * noise_amplitude
        self.y = self.y + noise


    def insert_anomaly(self):
        """Inserts an anomaly to the test signal.

        The anomaly self.anomaly_samples should be created before calling this
        method.

        """
        self.anomaly_start_secs = 0.1
        self.y = numpy.insert(self.y, int(self.anomaly_start_secs * self.rate),
                              self.anomaly_samples)


    def generate_skip_anomaly(self):
        """Skips a section of test signal."""
        self.anomaly_start_secs = 0.1
        self.anomaly_duration_secs = 0.005
        anomaly_append_secs = self.anomaly_start_secs + self.anomaly_duration_secs
        anomaly_start_index = int(self.anomaly_start_secs * self.rate)
        anomaly_append_index = int(anomaly_append_secs * self.rate)
        self.y = numpy.append(self.y[:anomaly_start_index], self.y[anomaly_append_index:])


    def create_constant_anomaly(self, amplitude):
        """Creates an anomaly of constant samples.

        @param amplitude: The amplitude of the constant samples.

        """
        self.anomaly_duration_secs = 0.005
        self.anomaly_samples = (
                [amplitude] * int(self.anomaly_duration_secs * self.rate))


    def run_analysis(self):
        """Runs the anomaly detection."""
        self.results = audio_analysis.anomaly_detection(
                self.y, self.rate, self.freq, self.block_size)
        logging.debug('Results: %s', self.results)


    def check_no_anomaly(self):
        """Verifies that there is no anomaly in detection result."""
        self.run_analysis()
        self.assertFalse(self.results)


    def check_anomaly(self):
        """Verifies that there is anomaly in detection result.

        The detection result should contain anomaly time stamps that are
        close to where anomaly was inserted. There can be multiple anomalies
        since the detection depends on the block size.

        """
        self.run_analysis()
        self.assertTrue(self.results)
        # Anomaly can be detected as long as the detection window of block size
        # overlaps with anomaly.
        expected_detected_range_secs = (
                self.anomaly_start_secs - float(self.block_size) / self.rate,
                self.anomaly_start_secs + self.anomaly_duration_secs)
        for detected_secs in self.results:
            self.assertTrue(detected_secs <= expected_detected_range_secs[1])
            self.assertTrue(detected_secs >= expected_detected_range_secs[0] )


    def testGoodSignal(self):
        """Sine wave signal with no noise or anomaly."""
        self.check_no_anomaly()


    def testGoodSignalNoise(self):
        """Sine wave signal with noise."""
        self.add_noise()
        self.check_no_anomaly()


    def testZeroAnomaly(self):
        """Sine wave signal with no noise but with anomaly.

        This test case simulates underrun in digital data where there will be
        one block of samples with 0 amplitude.

        """
        self.create_constant_anomaly(0)
        self.insert_anomaly()
        self.check_anomaly()


    def testZeroAnomalyNoise(self):
        """Sine wave signal with noise and anomaly.

        This test case simulates underrun in analog data where there will be
        one block of samples with amplitudes close to 0.

        """
        self.create_constant_anomaly(0)
        self.insert_anomaly()
        self.add_noise()
        self.check_anomaly()


    def testLowConstantAnomaly(self):
        """Sine wave signal with low constant anomaly.

        The anomaly is one block of constant values.

        """
        self.create_constant_anomaly(0.05)
        self.insert_anomaly()
        self.check_anomaly()


    def testLowConstantAnomalyNoise(self):
        """Sine wave signal with low constant anomaly and noise.

        The anomaly is one block of constant values.

        """
        self.create_constant_anomaly(0.05)
        self.insert_anomaly()
        self.add_noise()
        self.check_anomaly()


    def testHighConstantAnomaly(self):
        """Sine wave signal with high constant anomaly.

        The anomaly is one block of constant values.

        """
        self.create_constant_anomaly(2)
        self.insert_anomaly()
        self.check_anomaly()


    def testHighConstantAnomalyNoise(self):
        """Sine wave signal with high constant anomaly and noise.

        The anomaly is one block of constant values.

        """
        self.create_constant_anomaly(2)
        self.insert_anomaly()
        self.add_noise()
        self.check_anomaly()


    def testSkippedAnomaly(self):
        """Sine wave signal with skipped anomaly.

        The anomaly simulates the symptom where a block is skipped.

        """
        self.generate_skip_anomaly()
        self.check_anomaly()


    def testSkippedAnomalyNoise(self):
        """Sine wave signal with skipped anomaly with noise.

        The anomaly simulates the symptom where a block is skipped.

        """
        self.generate_skip_anomaly()
        self.add_noise()
        self.check_anomaly()


    def testEmptyData(self):
        """Checks that anomaly detection rejects empty data."""
        self.y = []
        with self.assertRaises(audio_analysis.EmptyDataError):
            self.check_anomaly()


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    unittest.main()