path: root/apps/CameraITS/tests/sensor_fusion/test_video_stabilization.py
# Copyright 2022 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Verify video is stable during phone movement."""

import logging
import math
import multiprocessing
import os
import time

from mobly import test_runner
import numpy as np

import its_base_test
import camera_properties_utils
import image_processing_utils
import its_session_utils
import sensor_fusion_utils
import video_processing_utils

_ARDUINO_ANGLES = (10, 25)  # degrees
_ARDUINO_MOVE_TIME = 0.30  # seconds
_ARDUINO_SERVO_SPEED = 10
_IMG_FORMAT = 'png'
_MIN_PHONE_MOVEMENT_ANGLE = 5  # degrees
_NAME = os.path.splitext(os.path.basename(__file__))[0]
_NUM_ROTATIONS = 12
_RADS_TO_DEGS = 180/math.pi
_SEC_TO_NSEC = 1E9
_START_FRAME = 30  # give 3A 1s to warm up
_VIDEO_DELAY_TIME = 5.5  # seconds
_VIDEO_DURATION = 5.5  # seconds
_VIDEO_QUALITIES_TESTED = ('CIF:3', '480P:4', '720P:5', '1080P:6', 'QVGA:7',
                           'VGA:9')
_VIDEO_STABILIZATION_FACTOR = 0.6  # 60% of gyro movement allowed
_VIDEO_STABILIZATION_MODE = 1


def _conv_acceleration_to_movement(gyro_events):
  """Convert gyro_events time and speed to movement during video time.

  Args:
    gyro_events: sorted dict of entries with 'time', 'x', 'y', and 'z'

  Returns:
    'z' acceleration converted to movement for times around VIDEO playing.
  """
  gyro_times = np.array([e['time'] for e in gyro_events])
  gyro_speed = np.array([e['z'] for e in gyro_events])
  gyro_time_min = gyro_times[0]
  logging.debug('gyro start time: %dns', gyro_time_min)
  logging.debug('gyro stop time: %dns', gyro_times[-1])
  gyro_rotations = []
  video_time_start = gyro_time_min + _VIDEO_DELAY_TIME*_SEC_TO_NSEC
  video_time_stop = video_time_start + _VIDEO_DURATION*_SEC_TO_NSEC
  logging.debug('video start time: %dns', video_time_start)
  logging.debug('video stop time: %dns', video_time_stop)

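  # Integrate: per-sample rotation (rad) = z-axis angular speed (rad/s) *
  # sample interval (s), keeping only samples within the video window.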
  for i, t in enumerate(gyro_times):
    if video_time_start <= t <= video_time_stop:
      gyro_rotations.append((gyro_times[i]-gyro_times[i-1])/_SEC_TO_NSEC *
                            gyro_speed[i])
  return np.array(gyro_rotations)


def _collect_data(cam, video_profile, video_quality, rot_rig):
  """Capture a new set of data from the device.

  Captures camera frames while the rotation rig moves the device in the
  prescribed manner.

  Args:
    cam: camera object
    video_profile: str; camcorder profile ID, e.g. '6' for 1080P
    video_quality: str; video quality key string, e.g. '1080P'
    rot_rig: dict with 'cntl' and 'ch' defined

  Returns:
    recording object
  """
  logging.debug('Starting sensor event collection')
  props = cam.get_camera_properties()
  props = cam.override_with_hidden_physical_camera_props(props)

  # Start rotation rig movement in a separate process
  p = multiprocessing.Process(
      target=sensor_fusion_utils.rotation_rig,
      args=(rot_rig['cntl'], rot_rig['ch'], _NUM_ROTATIONS,
            _ARDUINO_ANGLES, _ARDUINO_SERVO_SPEED, _ARDUINO_MOVE_TIME,))
  p.start()

  cam.start_sensor_events()

  # Record video and return recording object
  time.sleep(_VIDEO_DELAY_TIME)  # allow time for rig to start moving
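  # Record with video stabilization enabled (_VIDEO_STABILIZATION_MODE = 1,
  # i.e. CONTROL_VIDEO_STABILIZATION_MODE_ON).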
  recording_obj = cam.do_basic_recording(
      video_profile, video_quality, _VIDEO_DURATION, _VIDEO_STABILIZATION_MODE)
  logging.debug('Recorded output path: %s', recording_obj['recordedOutputPath'])
  logging.debug('Tested quality: %s', recording_obj['quality'])

  # Wait for vibration to stop
  p.join()

  return recording_obj


class VideoStabilizationTest(its_base_test.ItsBaseTest):
  """Tests if video is stabilized.

  Camera is moved in sensor fusion rig on an arc of 15 degrees.
  Speed is set to mimic hand movement (and not be too fast.)
  Video is captured after rotation rig starts moving, and the
  gyroscope data is dumped.

  Video is processed to dump all of the frames to PNG files.
  Camera movement is extracted from frames by determining max
  angle of deflection in video movement vs max angle of deflection
  in gyroscope movement. Test is a PASS if rotation is reduced in video.
  """

  def test_video_stability(self):
    rot_rig = {}
    log_path = self.log_path

    with its_session_utils.ItsSession(
        device_id=self.dut.serial,
        camera_id=self.camera_id,
        hidden_physical_id=self.hidden_physical_id) as cam:
      props = cam.get_camera_properties()
      props = cam.override_with_hidden_physical_camera_props(props)
      first_api_level = its_session_utils.get_first_api_level(self.dut.serial)
      supported_stabilization_modes = props[
          'android.control.availableVideoStabilizationModes']

      camera_properties_utils.skip_unless(
          first_api_level >= its_session_utils.ANDROID13_API_LEVEL and
          _VIDEO_STABILIZATION_MODE in supported_stabilization_modes)

      # Raise error if not FRONT or REAR facing camera
      facing = props['android.lens.facing']
      if (facing != camera_properties_utils.LENS_FACING_FRONT and
          facing != camera_properties_utils.LENS_FACING_BACK):
        raise AssertionError(f'Unknown lens facing: {facing}.')

      # Initialize rotation rig
      rot_rig['cntl'] = self.rotator_cntl
      rot_rig['ch'] = self.rotator_ch
      if rot_rig['cntl'].lower() != 'arduino':
        raise AssertionError(
            f'You must use the arduino controller for {_NAME}.')

      # Create list of video qualities to test
      supported_video_qualities = cam.get_supported_video_qualities(
          self.camera_id)
      logging.debug('Supported video qualities: %s', supported_video_qualities)
      tested_video_qualities = list(set(_VIDEO_QUALITIES_TESTED) &
                                    set(supported_video_qualities))

      # Raise error if no video qualities to test
      if not tested_video_qualities:
        raise AssertionError(
            'None of the tested video qualities are supported: '
            f'{supported_video_qualities}')
      else:
        logging.debug('video qualities tested: %s', str(tested_video_qualities))

      max_gyro_angles = []
      max_camera_angles = []
      for video_tested in tested_video_qualities:
        video_profile = video_tested.split(':')[1]
        video_quality = video_tested.split(':')[0]

        # Record video
        recording_obj = _collect_data(
            cam, video_profile, video_quality, rot_rig)

        # Grab the video from the save location on DUT
        self.dut.adb.pull([recording_obj['recordedOutputPath'], log_path])
        file_name = recording_obj['recordedOutputPath'].split('/')[-1]
        logging.debug('file_name: %s', file_name)

        # Get gyro events
        logging.debug('Reading out inertial sensor events')
        gyro_events = cam.get_sensor_events()['gyro']
        logging.debug('Number of gyro samples %d', len(gyro_events))

        # Extract all frames from video
        file_list = video_processing_utils.extract_all_frames_from_video(
            log_path, file_name, _IMG_FORMAT)
        frames = []
        logging.debug('Number of frames %d', len(file_list))
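        # Load each extracted frame and scale 8-bit pixel values to
        # [0, 1] floats for rotation extraction.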
        for file in file_list:
          img = image_processing_utils.convert_image_to_numpy_array(
              os.path.join(log_path, file))
          frames.append(img/255)
        frame_shape = frames[0].shape
        logging.debug('Frame size %d x %d', frame_shape[1], frame_shape[0])

        # Extract camera rotations
        img_h = frames[0].shape[0]
        file_name_stem = f'{os.path.join(log_path, _NAME)}_{video_quality}'
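        # Estimate per-frame camera rotation from the extracted frames,
        # skipping the first _START_FRAME frames so 3A has settled.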
        cam_rots = sensor_fusion_utils.get_cam_rotations(
            frames[_START_FRAME:], facing, img_h,
            file_name_stem, _START_FRAME)
        sensor_fusion_utils.plot_camera_rotations(
            cam_rots, _START_FRAME, video_quality, file_name_stem)
        max_camera_angles.append(sensor_fusion_utils.calc_max_rotation_angle(
            cam_rots, 'Camera'))

        # Extract gyro rotations
        sensor_fusion_utils.plot_gyro_events(
            gyro_events, f'{_NAME}_{video_quality}', log_path)
        gyro_rots = _conv_acceleration_to_movement(gyro_events)
        max_gyro_angles.append(sensor_fusion_utils.calc_max_rotation_angle(
            gyro_rots, 'Gyro'))
        logging.debug(
            'Max deflection (degrees) %s: video: %.3f, gyro: %.3f, ratio: %.4f',
            video_quality, max_camera_angles[-1], max_gyro_angles[-1],
            max_camera_angles[-1] / max_gyro_angles[-1])

        # Assert phone is moved enough during test
        if max_gyro_angles[-1] < _MIN_PHONE_MOVEMENT_ANGLE:
          raise AssertionError(
              f'Phone not moved enough! Movement: {max_gyro_angles[-1]:.3f}, '
              f'THRESH: {_MIN_PHONE_MOVEMENT_ANGLE} degrees')

      # Assert PASS/FAIL criteria
      test_failures = []
      for i, max_camera_angle in enumerate(max_camera_angles):
        if max_camera_angle >= max_gyro_angles[i] * _VIDEO_STABILIZATION_FACTOR:
          test_failures.append(
              f'{tested_video_qualities[i]} video not stabilized enough! '
              f'Max video angle: {max_camera_angle:.3f}, '
              f'Max gyro angle: {max_gyro_angles[i]:.3f}, '
              f'ratio: {max_camera_angle/max_gyro_angles[i]:.3f}, '
              f'THRESH: {_VIDEO_STABILIZATION_FACTOR}.')
      if test_failures:
        raise AssertionError('\n'.join(test_failures))


if __name__ == '__main__':
  test_runner.main()