# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os import tempfile from typing import List, Type, Union import numpy as np import pytest from numpy.random import default_rng from nemo.collections.asr.parts.preprocessing.segment import AudioSegment from nemo.collections.audio.data.data_simulation import ( ArrayGeometry, check_angle, convert_placement_to_range, convert_rir_to_multichannel, simulate_room_mix, wrap_to_180, ) class TestDataSimulationUtils: @pytest.mark.unit def test_check_angle(self): """Test angle checks.""" num_examples = 100 random = default_rng() assert check_angle('azimuth', random.uniform(low=-180, high=180, size=num_examples)) == True assert check_angle('elevation', random.uniform(low=-90, high=90, size=num_examples)) == True assert check_angle('yaw', random.uniform(low=-180, high=180, size=num_examples)) == True assert check_angle('pitch', random.uniform(low=-90, high=90, size=num_examples)) == True assert check_angle('roll', random.uniform(low=-180, high=180, size=num_examples)) == True with pytest.raises(ValueError): check_angle('azimuth', [-200, 200]) with pytest.raises(ValueError): check_angle('elevation', [-100, 100]) with pytest.raises(ValueError): check_angle('yaw', [-200, 200]) with pytest.raises(ValueError): check_angle('pitch', [-200, 200]) with pytest.raises(ValueError): check_angle('roll', [-200, 200]) @pytest.mark.unit def test_wrap_to_180(self): """Test wrap.""" test_cases = [] test_cases.append({'angle': 0, 'wrapped': 0}) test_cases.append({'angle': 45, 'wrapped': 45}) test_cases.append({'angle': -30, 'wrapped': -30}) test_cases.append({'angle': 179, 'wrapped': 179}) test_cases.append({'angle': -179, 'wrapped': -179}) test_cases.append({'angle': 181, 'wrapped': -179}) test_cases.append({'angle': -181, 'wrapped': 179}) test_cases.append({'angle': 270, 'wrapped': -90}) test_cases.append({'angle': -270, 'wrapped': 90}) test_cases.append({'angle': 359, 'wrapped': -1}) test_cases.append({'angle': 360, 'wrapped': 0}) for test_case in test_cases: assert wrap_to_180(test_case['angle']) == test_case['wrapped'] @pytest.mark.unit def test_placement_range(self): """Test placement range conversion.""" # Setup 1: test_cases = [] test_cases.append( { 'room_dim': [3, 4, 5], 'placement': {'x': None, 'y': None, 'height': None, 'min_to_wall': 0}, 'object_radius': 0, 'expected_range': np.array([[0, 3], [0, 4], [0, 5]]), } ) test_cases.append( { 'room_dim': [3, 4, 5], 'placement': {'x': None, 'y': None, 'height': None, 'min_to_wall': 0}, 'object_radius': 0.1, 'expected_range': np.array([[0.1, 2.9], [0.1, 3.9], [0.1, 4.9]]), } ) test_cases.append( { 'room_dim': [3, 4, 5], 'placement': {'x': None, 'y': None, 'height': None, 'min_to_wall': 0.5}, 'object_radius': 0.1, 'expected_range': np.array([[0.6, 2.4], [0.6, 3.4], [0.6, 4.4]]), } ) test_cases.append( { 'room_dim': [3, 4, 5], 'placement': {'x': [1, 3], 'y': [0.3, 3.0], 'height': [1.5, 1.8], 'min_to_wall': 0.5}, 'object_radius': 0.1, 'expected_range': np.array([[1, 2.4], [0.6, 3.0], [1.5, 1.8]]), } ) test_cases.append( { 'room_dim': [3, 4, 5], 'placement': {'x': 2, 'y': 3, 'height': [1.5, 1.8], 'min_to_wall': 0.5}, 'object_radius': 0.1, 'expected_range': np.array([[2, 2], [3, 3], [1.5, 1.8]]), } ) for test_case in test_cases: placement_range = convert_placement_to_range( test_case['placement'], test_case['room_dim'], test_case['object_radius'] ) assert np.all(placement_range == test_case['expected_range']) with pytest.raises(ValueError): # fail because of negative x convert_placement_to_range( **{ 'room_dim': [3, 4, 5], 'placement': {'x': -1, 'y': None, 'height': None, 'min_to_wall': 0}, 'object_radius': 0.1, } ) with pytest.raises(ValueError): # fail because of negative min_to_wall convert_placement_to_range( **{ 'room_dim': [3, 4, 5], 'placement': {'x': None, 'y': None, 'height': None, 'min_to_wall': -1}, 'object_radius': 0.1, } ) with pytest.raises(ValueError): # fail because height range doesn't have exactly two elements convert_placement_to_range( **{ 'room_dim': [3, 4, 5], 'placement': {'x': None, 'y': None, 'height': [1], 'min_to_wall': 0}, 'object_radius': 0.1, } ) with pytest.raises(ValueError): # fail because the room is too small for constraint convert_placement_to_range( **{ 'room_dim': [1, 2, 3], 'placement': {'x': None, 'y': None, 'height': None, 'min_to_wall': 1}, 'object_radius': 0.1, } ) @pytest.mark.unit @pytest.mark.parametrize("num_mics", [2, 4]) @pytest.mark.parametrize("num_sources", [1, 3]) def test_convert_rir_to_mc(self, num_mics: int, num_sources: int): """Test conversion of a RIR from list of lists to multichannel array.""" len_range = [50, 1000] random = default_rng() rir = [] rir_len = [] # Golden reference for n_mic in range(num_mics): this_rir = [] this_len = [] for n_source in range(num_sources): random_len = np.random.randint(low=len_range[0], high=len_range[1]) this_rir.append(np.random.rand(random_len)) this_len.append(random_len) rir.append(this_rir) rir_len.append(this_len) # UUT mc_rir = convert_rir_to_multichannel(rir) # Compare for n_source in range(num_sources): for n_mic in range(num_mics): # check RIR diff_len = rir_len[n_mic][n_source] diff = mc_rir[n_source][:diff_len, n_mic] - rir[n_mic][n_source] assert np.all(diff == 0.0), f'Original RIR not matching: source={n_source}, channel={n_mic}' # check padding pad = mc_rir[n_source][diff_len:, n_mic] assert np.all(pad == 0.0), f'Original RIR not matching: source={n_source}, channel={n_mic}' class TestArrayGeometry: @pytest.mark.unit @pytest.mark.parametrize('mic_spacing', [0.05]) @pytest.mark.parametrize("num_mics", [2, 4]) @pytest.mark.parametrize("axis", [0, 1, 2]) def test_array_geometry(self, mic_spacing: float, num_mics: int, axis: int): max_abs_tol = 1e-8 random = default_rng() # assume linear arrray along axis mic_positions = np.zeros((num_mics, 3)) mic_positions[:, axis] = mic_spacing * np.arange(num_mics) center = np.mean(mic_positions, axis=0) mic_positions_centered = mic_positions - center uut = ArrayGeometry(mic_positions) # test initialization assert np.max(np.abs(uut.center - center)) < max_abs_tol assert np.max(np.abs(uut.centered_positions - mic_positions_centered)) < max_abs_tol assert np.max(np.abs(uut.positions - mic_positions)) < max_abs_tol # test translation center = random.uniform(low=-10, high=-10, size=3) mic_positions = mic_positions_centered + center uut.translate(to=center) assert np.max(np.abs(uut.center - center)) < max_abs_tol assert np.max(np.abs(uut.centered_positions - mic_positions_centered)) < max_abs_tol assert np.max(np.abs(uut.positions - mic_positions)) < max_abs_tol # test rotation center = uut.center centered_positions = uut.centered_positions test_cases = [] test_cases.append( { 'orientation': {'yaw': 90}, 'new_positions': np.vstack( (-centered_positions[:, 1], centered_positions[:, 0], centered_positions[:, 2]) ).T, } ) test_cases.append( { 'orientation': {'pitch': 90}, 'new_positions': np.vstack( (centered_positions[:, 2], centered_positions[:, 1], -centered_positions[:, 0]) ).T, } ) test_cases.append( { 'orientation': {'roll': 90}, 'new_positions': np.vstack( (centered_positions[:, 0], -centered_positions[:, 2], centered_positions[:, 1]) ).T, } ) for test_case in test_cases: new_array = uut.new_rotated_array(**test_case['orientation']) assert np.max(np.abs(new_array.center - center)) < max_abs_tol assert np.max(np.abs(new_array.centered_positions - test_case['new_positions'])) < max_abs_tol # test radius assert np.max(np.abs(uut.radius - (num_mics - 1) / 2 * mic_spacing)) < max_abs_tol # test conversion to spherical # point on x axis point = np.array([1, 0, 0]) test_cases = [] test_cases.append({'center': 0, 'dist': np.linalg.norm(point - 0), 'azim': 0, 'elev': 0}) test_cases.append( { 'center': np.array([2, 0, 0]), 'dist': np.linalg.norm(point - np.array([2, 0, 0])), 'azim': -180, 'elev': 0, } ) test_cases.append( { 'center': np.array([1, 1, 1]), 'dist': np.linalg.norm(point - np.array([1, 1, 1])), 'azim': -90, 'elev': -45, } ) test_cases.append( { 'center': np.array([1, 2, -2]), 'dist': np.linalg.norm(point - np.array([1, 2, -2])), 'azim': -90, 'elev': 45, } ) for test_case in test_cases: uut.translate(to=test_case['center']) dist, azim, elev = uut.spherical_relative_to_array(point) assert abs(dist - test_case['dist']) < max_abs_tol assert abs(wrap_to_180(azim - test_case['azim'])) < max_abs_tol assert abs(elev - test_case['elev']) < max_abs_tol class TestRoomSimulation: max_diff_tol = 1e-5 @pytest.mark.unit def test_simulate_room_mix(self, test_data_dir): """Test room simulation for fixed parameters.""" # Test setup data_dir = os.path.join(test_data_dir, 'asr', 'data_simulation') # Minimal configuration sample_rate = 16000 target_cfg = { 'room_filepath': os.path.join(data_dir, 'test_room.h5'), 'mic_positions': np.random.rand(6, 3), # random positions 'selected_mics': [0, 1, 2, 3, 4, 5], 'source': 0, 'audio_filepath': os.path.join(data_dir, 'target.wav'), 'duration': 1.5, } interference_cfg = [{'source': 1, 'selected_mics': target_cfg['selected_mics']}] audio_metadata = { 'target': [{'audio_filepath': 'target.wav', 'duration': 1.5, 'offset': 0.8}], 'target_dir': data_dir, 'noise': [{'audio_filepath': 'noise.wav', 'duration': 2.3}], 'noise_dir': data_dir, 'interference': [ {'audio_filepath': 'interference_1.wav', 'duration': 0.8}, {'audio_filepath': 'interference_2.wav', 'duration': 0.75}, ], 'interference_dir': data_dir, } mix_cfg = {'rsnr': 10, 'rsir': 15, 'ref_mic': 0, 'ref_mic_rms': -30, 'min_duration': None, 'save': {}} with tempfile.TemporaryDirectory() as output_dir: # Mix base_output_filepath = os.path.join(output_dir, 'test_output') simulate_room_mix( sample_rate=sample_rate, target_cfg=target_cfg, interference_cfg=interference_cfg, mix_cfg=mix_cfg, audio_metadata=audio_metadata, base_output_filepath=base_output_filepath, ) # Check target + noise + interference = mix mix_from_parts = 0 for suffix in ['_target_reverberant.wav', '_noise.wav', '_interference.wav']: mix_from_parts += AudioSegment.from_file(base_output_filepath + suffix).samples mix_uut = AudioSegment.from_file(base_output_filepath + '_mic.wav') mix_uut_samples = mix_uut.samples # Compare UUT to sum of parts max_diff = np.max(np.abs(mix_uut_samples - mix_from_parts)) assert max_diff < self.max_diff_tol # Compare the UUT to golden reference golden_mix_filepath = os.path.join(data_dir, 'test_output_mic.wav') mix_golden = AudioSegment.from_file(base_output_filepath + '_mic.wav') assert mix_uut.num_samples == mix_golden.num_samples assert mix_uut.num_channels == mix_golden.num_channels assert mix_uut.sample_rate == mix_golden.sample_rate assert mix_uut.duration == mix_golden.duration max_diff = np.max(np.abs(mix_uut_samples - mix_golden.samples)) assert max_diff < self.max_diff_tol