MagpieTTS_Internal_Demo / tests /collections /common /test_lhotse_dataloading.py
subhankarg's picture
Upload folder using huggingface_hub
0558aa4 verified
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import Counter
from io import BytesIO
from itertools import islice
from pathlib import Path
from typing import Dict, List, Tuple
import lhotse
import numpy as np
import pytest
import torch
from lhotse import CutSet, MonoCut, NumpyFilesWriter, Recording, compute_num_samples
from lhotse.audio import AudioLoadingError
from lhotse.cut import Cut, MixedCut, PaddingCut
from lhotse.dataset import RoundRobinSampler, ZipSampler
from lhotse.shar import JsonlShardWriter
from lhotse.testing.dummies import dummy_recording
from lhotse.testing.random import deterministic_rng
from omegaconf import OmegaConf
from nemo.collections.common.data.lhotse import get_lhotse_dataloader_from_config
from nemo.collections.common.data.lhotse.text_adapters import SourceTargetTextExample, TextExample
from nemo.collections.common.tokenizers.sentencepiece_tokenizer import SentencePieceTokenizer, create_spt_model
@pytest.fixture(scope="session")
def cutset_path(tmp_path_factory) -> Path:
    """
    Session-scoped Lhotse CutSet manifest with ten dummy cuts.

    The cuts are generated by ``DummyManifest`` (ids 0 through 9, with data),
    stripped of features and custom fields, and their audio is saved to disk
    next to the manifest. Returns the path to ``cuts.jsonl.gz``.
    """
    from lhotse import CutSet
    from lhotse.testing.dummies import DummyManifest

    dummy_cuts = DummyManifest(CutSet, begin_id=0, end_id=10, with_data=True)
    for cut in dummy_cuts:
        # Keep only the recording and the bare supervision on each cut.
        cut.features = None
        cut.custom = None
        cut.supervisions[0].custom = None

    data_dir = tmp_path_factory.mktemp("data")
    manifest_path = data_dir / "cuts.jsonl.gz"
    audio_dir = data_dir / "audio"
    dummy_cuts.save_audios(audio_dir).to_file(manifest_path)
    return manifest_path
@pytest.fixture(scope="session")
def cutset_shar_path(cutset_path: Path) -> Path:
    """
    Session-scoped Lhotse Shar export of the cuts stored at ``cutset_path``.

    Recordings are written as ``wav`` with 5 cuts per shard into a ``shar``
    directory next to the source manifest. Returns that directory.
    """
    from lhotse import CutSet

    source_cuts = CutSet.from_file(cutset_path)
    shar_dir = cutset_path.parent / "shar"
    shar_dir.mkdir(exist_ok=True)
    source_cuts.to_shar(shar_dir, fields={"recording": "wav"}, shard_size=5)
    return shar_dir
@pytest.fixture(scope="session")
def cutset_shar_path_other(cutset_path: Path) -> Path:
    """
    Session-scoped Lhotse Shar export of the same cuts, with re-prefixed IDs.

    Every cut ID gets an ``other-`` prefix so tests can tell this dataset apart
    from the one produced by ``cutset_shar_path``. Returns the ``shar-other``
    directory created next to the source manifest.
    """
    from lhotse import CutSet

    renamed_cuts = CutSet.from_file(cutset_path).modify_ids(lambda cut_id: f"other-{cut_id}")
    shar_dir = cutset_path.parent / "shar-other"
    shar_dir.mkdir(exist_ok=True)
    renamed_cuts.to_shar(shar_dir, fields={"recording": "wav"}, shard_size=5)
    return shar_dir
@pytest.fixture(scope="session")
def nemo_manifest_path(cutset_path: Path):
    """
    Session-scoped NeMo-style JSONL manifest derived from ``cutset_path``.

    One entry is written per cut, pointing at the cut's first audio source and
    carrying fixed text / language / custom fields that other tests look up.
    Returns the path to ``nemo_manifest.json``.
    """
    from lhotse import CutSet
    from lhotse.serialization import save_to_jsonl

    entries = [
        {
            "audio_filepath": cut.recording.sources[0].source,
            "text": "irrelevant",
            "text-other": "not relevant",
            "duration": cut.duration,
            "my-custom-field": "irrelevant",
            "lang": "en",
            "custom-lang": "pl",
        }
        for cut in CutSet.from_file(cutset_path)
    ]
    manifest_path = cutset_path.parent / "nemo_manifest.json"
    save_to_jsonl(entries, manifest_path)
    return manifest_path
@pytest.fixture(scope="session")
def nemo_manifest_with_skipme_path(nemo_manifest_path: Path) -> Path:
    """
    Copy of the NeMo manifest in which the last two entries carry ``_skipme``.

    Returns the path to ``nemo_manifest_with_skipme.json``, written next to the
    source manifest.
    """
    from lhotse.serialization import load_jsonl, save_to_jsonl

    entries = list(load_jsonl(nemo_manifest_path))
    for flagged in entries[-2:]:
        flagged["_skipme"] = True

    out_path = nemo_manifest_path.parent / "nemo_manifest_with_skipme.json"
    save_to_jsonl(entries, out_path)
    return out_path
@pytest.fixture(scope="session")
def mc_cutset_path(tmp_path_factory) -> Path:
    """
    Session-scoped Lhotse CutSet manifest with ten two-channel cuts.

    Single-channel dummy cuts are generated first; the audio sources of each
    consecutive pair are then merged into one two-source ``Recording`` that is
    wrapped in a ``MultiCut``. Returns the path to ``mc_cuts.jsonl.gz``.
    """
    from lhotse import CutSet, MultiCut
    from lhotse.testing.dummies import DummyManifest

    num_examples = 10  # number of multi-channel examples
    num_channels = 2  # number of channels per example

    # One single-channel dummy cut per (example, channel) pair.
    mono_cuts = DummyManifest(CutSet, begin_id=0, end_id=num_examples * num_channels, with_data=True)
    template = mono_cuts[0]

    multi_cuts = []
    for example_idx in range(num_examples):
        first_mono = example_idx * num_channels

        # Re-label each mono source with the channel it will occupy.
        channel_sources = []
        for channel in range(num_channels):
            src = mono_cuts[first_mono + channel].recording.sources[0]
            src.channels = [channel]
            channel_sources.append(src)

        # Merge the per-channel sources into a single recording.
        merged_recording = Recording(
            sources=channel_sources,
            id=f'mc-dummy-recording-{example_idx:02d}',
            num_samples=template.num_samples,
            duration=template.duration,
            sampling_rate=template.sampling_rate,
        )
        multi_cuts.append(
            MultiCut(
                recording=merged_recording,
                id=f'mc-dummy-cut-{example_idx:02d}',
                start=0,
                duration=1.0,
                channel=list(range(num_channels)),
            )
        )

    cut_set = CutSet.from_cuts(multi_cuts)
    data_dir = tmp_path_factory.mktemp("data")
    manifest_path = data_dir / "mc_cuts.jsonl.gz"
    audio_dir = data_dir / "mc_audio"
    cut_set.save_audios(audio_dir).to_file(manifest_path)
    return manifest_path
@pytest.fixture(scope="session")
def nemo_tarred_manifest_path(nemo_manifest_path: Path) -> Tuple[Path, str]:
    """
    Session-scoped NeMo tarred dataset built from ``nemo_manifest_path``.

    Audio files are packed into tar shards of 5 files each and a single JSONL
    manifest is written whose entries reference the audio by file name and
    record the shard they were packed into.

    Returns:
        A pair ``(manifest_path, tar_pattern)``: the manifest path as reported
        by the JSONL writer (dependent fixtures use its ``.parent``), and the
        tar path pattern covering shards 0..1.
    """
    from lhotse.serialization import SequentialJsonlWriter, load_jsonl
    from lhotse.shar.writers import TarWriter

    shard_size = 5
    root = nemo_manifest_path.parent / "nemo_tar"
    root.mkdir(exist_ok=True)
    with (
        TarWriter(f"{root}/audios_%01d.tar", shard_size=shard_size) as tar_writer,
        SequentialJsonlWriter(root / "tarred_audio_filepaths.jsonl") as mft_writer,
    ):
        for idx, item in enumerate(load_jsonl(nemo_manifest_path)):
            audio_file = item["audio_filepath"]
            name = Path(audio_file).name
            with open(audio_file, "rb") as f:
                tar_writer.write(name, BytesIO(f.read()))
            # Derive the shard index from the shard size instead of a hard-coded
            # threshold so the two cannot drift apart.
            mft_writer.write({**item, "audio_filepath": name, "shard_id": idx // shard_size})
    return mft_writer.path, f"{root}/audios__OP_0..1_CL_.tar"
@pytest.fixture(scope="session")
def nemo_tarred_manifest_with_skipme_path(nemo_tarred_manifest_path: Tuple[Path, str]) -> Tuple[Path, str]:
    """
    Tarred NeMo manifest in which the last two entries carry ``_skipme``.

    Args:
        nemo_tarred_manifest_path: ``(manifest_path, tar_pattern)`` pair from
            the ``nemo_tarred_manifest_path`` fixture.

    Returns:
        ``(new_manifest_path, tar_pattern)`` where the new manifest is written
        next to the original one and the tar pattern is passed through.
    """
    from lhotse.serialization import load_jsonl, save_to_jsonl

    json_p, tar_p = nemo_tarred_manifest_path
    all_items = list(load_jsonl(json_p))
    for item in all_items[-2:]:
        item['_skipme'] = True
    out_path = json_p.parent / "tarred_audio_filepaths_with_skipme.jsonl"
    save_to_jsonl(all_items, out_path)
    return out_path, tar_p
@pytest.fixture(scope="session")
def nemo_tarred_manifest_path_multi(nemo_tarred_manifest_path: tuple[str, str]) -> Tuple[str, str]:
    """
    Tarred NeMo dataset whose manifest is split into one JSONL file per shard.

    The single manifest from ``nemo_tarred_manifest_path`` is re-written in
    chunks of 5 entries under ``shard_manifests``. Returns the manifest path
    pattern covering shards 0..1 together with the unchanged tar pattern.
    """
    from lhotse.serialization import load_jsonl
    from lhotse.shar.writers import JsonlShardWriter

    single_manifest, tar_pattern = nemo_tarred_manifest_path
    shard_dir = single_manifest.parent / "shard_manifests"
    shard_dir.mkdir(exist_ok=True)

    with JsonlShardWriter(f"{shard_dir}/manifest_%d.jsonl", shard_size=5) as shard_writer:
        for entry in load_jsonl(single_manifest):
            shard_writer.write(entry)

    return f"{shard_dir}/manifest__OP_0..1_CL_.jsonl", tar_pattern
@pytest.fixture(scope="session")
def nemo_tarred_manifest_subset_path(nemo_tarred_manifest_path: Tuple[str, str]) -> Tuple[str, str, List[dict]]:
    """
    Per-shard manifests that list only a subset of the tarred contents.

    The selection is deterministic: the last 3 entries of each 5-entry tar
    shard are kept (6 of the 10 items) and written as shard manifests of
    3 entries each under ``shard_manifests_subset``.

    Returns:
        A triple ``(manifest_pattern, tar_pattern, subset_items)`` where
        ``subset_items`` is the list of manifest entries that were kept.
    """
    from lhotse.serialization import load_jsonl
    from lhotse.shar.writers import JsonlShardWriter

    json_p, tar_p = nemo_tarred_manifest_path
    json_dir = json_p.parent / "shard_manifests_subset"
    json_dir.mkdir(exist_ok=True)

    kept_per_shard = 3
    all_items = list(load_jsonl(json_p))
    # The tar shards hold items [0:5] and [5:10]; keep the tail of each one.
    tarr_0_data = all_items[:5]
    tarr_1_data = all_items[5:]
    subset_items = tarr_0_data[-kept_per_shard:] + tarr_1_data[-kept_per_shard:]

    # shard_size equals the per-shard subset size so manifest shard i lines up with tar shard i.
    with JsonlShardWriter(f"{json_dir}/manifest_%d.jsonl", shard_size=kept_per_shard) as mft_writer:
        for item in subset_items:
            mft_writer.write(item)
    return f"{json_dir}/manifest__OP_0..1_CL_.jsonl", tar_p, subset_items
class UnsupervisedAudioDataset(torch.utils.data.Dataset):
    """Dataset that maps a mini-batch of cuts to collated audio, audio lengths and cut IDs."""

    def __getitem__(self, cuts: lhotse.CutSet) -> Dict[str, torch.Tensor]:
        """Collate the audio of ``cuts`` and return it with the lengths and the cut IDs."""
        collated = lhotse.dataset.collation.collate_audio(cuts)
        audio, audio_lens = collated
        cut_ids = [cut.id for cut in cuts]
        return {"audio": audio, "audio_lens": audio_lens, "ids": cut_ids}
def test_dataloader_from_lhotse_cuts(cutset_path: Path):
    """Bucketed dataloading from a Lhotse cuts manifest yields batches of 3, 3, 3 and 1 cuts."""
    config = OmegaConf.create(
        {
            "cuts_path": cutset_path,
            "sample_rate": 16000,
            "shuffle": True,
            "use_lhotse": True,
            "num_workers": 0,
            # lhotse specific
            "use_bucketing": True,
            "concurrent_bucketing": False,
            "num_buckets": 2,
            "drop_last": False,
            "batch_duration": 4.0,  # seconds
            "quadratic_duration": 15.0,  # seconds
            "shuffle_buffer_size": 10,
            "bucket_buffer_size": 100,
            "seed": 0,
        }
    )

    dl = get_lhotse_dataloader_from_config(
        config=config, global_rank=0, world_size=1, dataset=UnsupervisedAudioDataset()
    )

    batches = list(dl)
    assert len(batches) == 4

    expected_batch_sizes = (3, 3, 3, 1)
    for batch, expected_size in zip(batches, expected_batch_sizes):
        assert set(batch.keys()) == {"audio", "audio_lens", "ids"}
        assert batch["audio"].shape[0] == batch["audio_lens"].shape[0] == expected_size
def test_dataloader_from_lhotse_cuts_truncate(cutset_path: Path):
    """With ``truncate_duration=0.5`` every collated example is 8000 samples long."""
    config = OmegaConf.create(
        {
            "cuts_path": cutset_path,
            "truncate_duration": 0.5,
            "sample_rate": 16000,
            "shuffle": True,
            "use_lhotse": True,
            "num_workers": 0,
            "batch_size": 4,
            "seed": 0,
        }
    )

    dl = get_lhotse_dataloader_from_config(
        config=config, global_rank=0, world_size=1, dataset=UnsupervisedAudioDataset()
    )

    audio_shapes = [tuple(batch["audio"].shape) for batch in dl]
    # 0.5s = 8000 samples, note the constant duration and batch size except for last batch;
    # 4 + 4 + 2 means exactly 10 cuts were used
    assert audio_shapes == [(4, 8000), (4, 8000), (2, 8000)]
def test_dataloader_from_lhotse_cuts_cut_into_windows(cutset_path: Path):
    """With ``cut_into_windows_duration=0.5`` the dataloader yields five full batches of 0.5s windows."""
    config = OmegaConf.create(
        {
            "cuts_path": cutset_path,
            "cut_into_windows_duration": 0.5,
            "sample_rate": 16000,
            "shuffle": True,
            "use_lhotse": True,
            "num_workers": 0,
            "batch_size": 4,
            "seed": 0,
        }
    )

    dl = get_lhotse_dataloader_from_config(
        config=config, global_rank=0, world_size=1, dataset=UnsupervisedAudioDataset()
    )

    audio_shapes = [tuple(batch["audio"].shape) for batch in dl]
    # 0.5s = 8000 samples, note the constant duration and batch size;
    # exactly 20 cuts were used because we cut 10x 1s cuts into 20x 0.5s cuts
    assert audio_shapes == [(4, 8000)] * 5
def test_dataloader_from_lhotse_cuts_pad_min_duration(cutset_path: Path):
    """With ``pad_min_duration=21.0`` and left padding, a cut becomes a 21s MixedCut: padding track, then audio."""
    config = OmegaConf.create(
        {
            "cuts_path": cutset_path,
            "pad_min_duration": 21.0,
            "pad_direction": "left",
            "sample_rate": 16000,
            "shuffle": True,
            "use_lhotse": True,
            "num_workers": 0,
            "batch_size": 1,
            "seed": 0,
        }
    )

    dl = get_lhotse_dataloader_from_config(config=config, global_rank=0, world_size=1, dataset=Identity())

    first_batch = next(iter(dl))
    (padded_cut,) = first_batch  # batch_size is 1, so exactly one cut is expected
    assert padded_cut.duration == 21.0
    assert isinstance(padded_cut, MixedCut)

    tracks = padded_cut.tracks
    assert len(tracks) == 2
    # Left padding: the padding track precedes the original mono cut.
    assert isinstance(tracks[0].cut, PaddingCut)
    assert isinstance(tracks[1].cut, MonoCut)
def test_dataloader_from_lhotse_cuts_channel_selector(mc_cutset_path: Path):
    """``channel_selector`` picks a single channel of two-channel cuts; ``None`` leaves the audio unchanged."""

    def build_dataloader(**extra):
        # ``extra`` is spliced in right after ``cuts_path`` to keep the original key order.
        cfg = OmegaConf.create(
            {
                "cuts_path": mc_cutset_path,
                **extra,
                "sample_rate": 16000,
                "shuffle": True,
                "use_lhotse": True,
                "num_workers": 0,
                "batch_size": 4,
                "seed": 0,
            }
        )
        return get_lhotse_dataloader_from_config(
            config=cfg, global_rank=0, world_size=1, dataset=UnsupervisedAudioDataset()
        )

    # Dataloader without channel selector
    batches = list(build_dataloader())
    assert len(batches) == 3
    # 1.0s = 16000 samples, two channels, note the constant duration and batch size
    assert batches[0]["audio"].shape == (4, 2, 16000)
    assert batches[1]["audio"].shape == (4, 2, 16000)
    assert batches[2]["audio"].shape == (2, 2, 16000)
    # exactly 10 cuts were used

    # Apply channel selector
    for channel_selector in [None, 0, 1]:
        dl_cs = build_dataloader(channel_selector=channel_selector)

        for n, b_cs in enumerate(dl_cs):
            reference_audio = batches[n]["audio"]
            if channel_selector is not None:
                # channel selector, needs to match the selected channel
                reference_audio = reference_audio[:, channel_selector, :]
            # otherwise: no channel selector, needs to match the original dataset
            assert torch.equal(b_cs["audio"], reference_audio)
def test_dataloader_from_lhotse_shar_cuts(cutset_shar_path: Path):
    """Bucketed dataloading from a Lhotse Shar directory: the first four batches hold 3 cuts each."""
    config = OmegaConf.create(
        {
            "shar_path": cutset_shar_path,
            "sample_rate": 16000,
            "shuffle": True,
            "use_lhotse": True,
            "num_workers": 0,
            # lhotse specific
            "use_bucketing": True,
            "concurrent_bucketing": False,
            "num_buckets": 2,
            "drop_last": False,
            "batch_duration": 4.0,  # seconds
            "quadratic_duration": 15.0,  # seconds
            "shuffle_buffer_size": 10,
            "bucket_buffer_size": 100,
            "seed": 0,
            "shard_seed": 0,
        }
    )

    dl = get_lhotse_dataloader_from_config(
        config=config, global_rank=0, world_size=1, dataset=UnsupervisedAudioDataset()
    )

    # Note: we use islice here because with Lhotse Shar the dataloader will always be infinite.
    batches = list(islice(dl, 4))
    assert len(batches) == 4

    for batch in batches:
        assert set(batch.keys()) == {"audio", "audio_lens", "ids"}
        assert batch["audio"].shape[0] == batch["audio_lens"].shape[0] == 3
def test_dataloader_from_lhotse_shar_cuts_via_fields(cutset_shar_path: Path):
    """Lhotse Shar data can be specified per field (``cuts`` / ``recording``) with shard path patterns."""
    shar_fields = {
        "cuts": f"{cutset_shar_path}/cuts._OP_000000..000001_CL_.jsonl.gz",
        "recording": f"{cutset_shar_path}/recording._OP_000000..000001_CL_.tar",
    }
    config = OmegaConf.create(
        {
            "shar_path": shar_fields,
            "sample_rate": 16000,
            "num_workers": 0,
            "shuffle": False,
            "batch_size": 4,
            "seed": 0,
            "shard_seed": 0,
        }
    )

    dl = get_lhotse_dataloader_from_config(config=config, global_rank=0, world_size=1, dataset=Identity())

    first_batch = next(iter(dl))
    assert len(first_batch) == 4
    # The audio of a returned cut must be loadable as a numpy array.
    loaded = first_batch[0].load_audio()
    assert isinstance(loaded, np.ndarray)
def test_dataloader_from_lhotse_shar_cuts_add_new_field(tmp_path_factory, cutset_shar_path: Path):
    """An extra sharded JSONL field (``wer``) listed in ``shar_path`` is attached to the loaded cuts."""
    # We're creating a new field called "wer" that will be dynamically attached to Lhotse Shar cuts.
    # Each "wer" shard is a jsonl manifest that has to match the "cuts" sharded manifest.
    # It must have a "cut_id" field used for runtime check that the user provided correct paths.
    # "wer" will be attached to each cut under `cut.wer` / cut.custom["wer"].
    wer_dir = tmp_path_factory.mktemp("wer_dir")
    with JsonlShardWriter(f"{wer_dir}/wer.%06d.jsonl.gz", shard_size=5) as wer_writer:
        for cut_idx in range(10):
            wer_writer.write({"cut_id": f"dummy-mono-cut-{cut_idx:04d}", "wer": 0.5})

    shar_fields = {
        "cuts": f"{cutset_shar_path}/cuts._OP_000000..000001_CL_.jsonl.gz",
        "recording": f"{cutset_shar_path}/recording._OP_000000..000001_CL_.tar",
        "wer": f"{wer_dir}/wer._OP_000000..000001_CL_.jsonl.gz",
    }
    config = OmegaConf.create(
        {
            "shar_path": shar_fields,
            "sample_rate": 16000,
            "num_workers": 0,
            "shuffle": False,
            "batch_size": 4,
            "seed": 0,
            "shard_seed": 0,
        }
    )

    dl = get_lhotse_dataloader_from_config(config=config, global_rank=0, world_size=1, dataset=Identity())

    first_batch = next(iter(dl))
    assert len(first_batch) == 4
    assert first_batch[0].wer == 0.5
def test_dataloader_from_nemo_manifest(nemo_manifest_path: Path):
    """Bucketed dataloading from a NeMo manifest yields batches of 3, 3, 3 and 1 cuts."""
    config = OmegaConf.create(
        {
            "manifest_filepath": nemo_manifest_path,
            "sample_rate": 16000,
            "shuffle": True,
            "use_lhotse": True,
            "num_workers": 0,
            # lhotse specific
            "use_bucketing": True,
            "concurrent_bucketing": False,
            "num_buckets": 2,
            "drop_last": False,
            "batch_duration": 4.0,  # seconds
            "quadratic_duration": 15.0,  # seconds
            "shuffle_buffer_size": 10,
            "bucket_buffer_size": 100,
            "seed": 0,
            "shard_seed": 0,
        }
    )

    dl = get_lhotse_dataloader_from_config(
        config=config, global_rank=0, world_size=1, dataset=UnsupervisedAudioDataset()
    )

    batches = list(dl)
    assert len(batches) == 4

    expected_batch_sizes = (3, 3, 3, 1)
    for batch, expected_size in zip(batches, expected_batch_sizes):
        assert set(batch.keys()) == {"audio", "audio_lens", "ids"}
        assert batch["audio"].shape[0] == batch["audio_lens"].shape[0] == expected_size
class _Identity:
    """Pass-through dataset stand-in: hands back whatever the sampler provides."""

    def __getitem__(self, cuts):
        """Return ``cuts`` unchanged."""
        return cuts
def test_dataloader_from_nemo_manifest_has_custom_fields(nemo_manifest_path: Path):
    """Extra keys of a NeMo manifest entry are available in ``cut.custom`` of the loaded cuts."""
    config = OmegaConf.create(
        {
            "manifest_filepath": nemo_manifest_path,
            "sample_rate": 16000,
            "shuffle": True,
            "use_lhotse": True,
            "num_workers": 0,
            # lhotse specific
            "use_bucketing": False,
            "batch_duration": 4.0,  # seconds
            "shuffle_buffer_size": 10,
            "seed": 0,
            "shard_seed": 0,
        }
    )

    dl = get_lhotse_dataloader_from_config(config=config, global_rank=0, world_size=1, dataset=_Identity())

    first_batch = next(iter(dl))
    for loaded_cut in first_batch:
        custom_fields = loaded_cut.custom
        assert isinstance(custom_fields, dict)
        assert "my-custom-field" in custom_fields
def test_dataloader_from_tarred_nemo_manifest(nemo_tarred_manifest_path: tuple[str, str]):
    """Bucketed dataloading from a tarred NeMo manifest: the first four batches hold 3 cuts each."""
    json_mft, tar_mft = nemo_tarred_manifest_path
    config = OmegaConf.create(
        {
            "manifest_filepath": json_mft,
            "tarred_audio_filepaths": tar_mft,
            "sample_rate": 16000,
            "shuffle": True,
            "use_lhotse": True,
            "num_workers": 0,
            # lhotse specific
            "use_bucketing": True,
            "concurrent_bucketing": False,
            "num_buckets": 2,
            "drop_last": False,
            "batch_duration": 4.0,  # seconds
            "quadratic_duration": 15.0,  # seconds
            "shuffle_buffer_size": 10,
            "bucket_buffer_size": 100,
            "seed": 0,
            "shard_seed": 0,
        }
    )

    dl = get_lhotse_dataloader_from_config(
        config=config, global_rank=0, world_size=1, dataset=UnsupervisedAudioDataset()
    )

    batches = list(islice(dl, 4))
    assert len(batches) == 4

    for batch in batches:
        assert set(batch.keys()) == {"audio", "audio_lens", "ids"}
        assert batch["audio"].shape[0] == batch["audio_lens"].shape[0] == 3
def test_dataloader_from_tarred_nemo_manifest_weighted_combination(nemo_tarred_manifest_path: tuple[str, str]):
    """The same tarred manifest listed twice with weights 0.8 / 0.2 still produces a valid first batch of 3."""
    json_mft, tar_mft = nemo_tarred_manifest_path
    weighted_manifests = [[json_mft, 0.8], [json_mft, 0.2]]
    tar_groups = [[tar_mft], [tar_mft]]
    config = OmegaConf.create(
        {
            "manifest_filepath": weighted_manifests,
            "tarred_audio_filepaths": tar_groups,
            "sample_rate": 16000,
            "shuffle": True,
            "use_lhotse": True,
            "num_workers": 0,
            # lhotse specific
            "use_bucketing": True,
            "concurrent_bucketing": False,
            "num_buckets": 2,
            "drop_last": False,
            "batch_duration": 4.0,  # seconds
            "quadratic_duration": 15.0,  # seconds
            "shuffle_buffer_size": 10,
            "bucket_buffer_size": 100,
            "seed": 0,
            "shard_seed": 0,
        }
    )

    dl = get_lhotse_dataloader_from_config(
        config=config, global_rank=0, world_size=1, dataset=UnsupervisedAudioDataset()
    )

    first_batch = next(iter(dl))
    assert set(first_batch.keys()) == {"audio", "audio_lens", "ids"}
    assert first_batch["audio"].shape[0] == first_batch["audio_lens"].shape[0] == 3
def test_dataloader_from_tarred_nemo_manifest_multi(nemo_tarred_manifest_path_multi: tuple[str, str]):
    """Bucketed dataloading from per-shard tarred NeMo manifests: the first four batches hold 3 cuts each."""
    json_mft, tar_mft = nemo_tarred_manifest_path_multi
    config = OmegaConf.create(
        {
            "manifest_filepath": json_mft,
            "tarred_audio_filepaths": tar_mft,
            "sample_rate": 16000,
            "shuffle": True,
            "use_lhotse": True,
            "num_workers": 0,
            # lhotse specific
            "use_bucketing": True,
            "concurrent_bucketing": False,
            "num_buckets": 2,
            "drop_last": False,
            "batch_duration": 4.0,  # seconds
            "quadratic_duration": 15.0,  # seconds
            "shuffle_buffer_size": 10,
            "bucket_buffer_size": 100,
            "seed": 0,
            "shard_seed": 0,
        }
    )

    dl = get_lhotse_dataloader_from_config(
        config=config, global_rank=0, world_size=1, dataset=UnsupervisedAudioDataset()
    )

    batches = list(islice(dl, 4))
    assert len(batches) == 4

    for batch in batches:
        assert set(batch.keys()) == {"audio", "audio_lens", "ids"}
        assert batch["audio"].shape[0] == batch["audio_lens"].shape[0] == 3
def test_dataloader_from_tarred_nemo_manifest_multi_max_open_streams(nemo_tarred_manifest_path_multi: tuple[str, str]):
    """Smoke test: with ``max_open_streams=1`` and two tarred inputs, a first batch can still be fetched."""
    json_mft, tar_mft = nemo_tarred_manifest_path_multi
    manifest_groups = [[json_mft], [json_mft]]
    tar_groups = [[tar_mft], [tar_mft]]
    config = OmegaConf.create(
        {
            "manifest_filepath": manifest_groups,
            "tarred_audio_filepaths": tar_groups,
            "sample_rate": 16000,
            "shuffle": True,
            "use_lhotse": True,
            "num_workers": 0,
            # lhotse specific
            "use_bucketing": True,
            "concurrent_bucketing": False,
            "num_buckets": 2,
            "max_open_streams": 1,
            "drop_last": False,
            "batch_duration": 4.0,  # seconds
            "quadratic_duration": 15.0,  # seconds
            "shuffle_buffer_size": 10,
            "bucket_buffer_size": 100,
            "seed": 0,
            "shard_seed": 0,
        }
    )

    dl = get_lhotse_dataloader_from_config(
        config=config, global_rank=0, world_size=1, dataset=UnsupervisedAudioDataset()
    )

    # Only checks that fetching one batch does not raise; its content is not inspected.
    next(iter(dl))
def test_dataloader_from_tarred_nemo_manifest_concat(nemo_tarred_manifest_path: tuple[str, str]):
    """With ``concatenate_samples`` each of the first four batches has lengths [33600, 16000]."""
    json_mft, tar_mft = nemo_tarred_manifest_path
    config = OmegaConf.create(
        {
            "manifest_filepath": json_mft,
            "tarred_audio_filepaths": tar_mft,
            "sample_rate": 16000,
            "shuffle": True,
            "use_lhotse": True,
            "num_workers": 0,
            # lhotse specific
            "concatenate_samples": True,
            "concatenate_duration_factor": 3.0,
            "batch_duration": 4.0,
            "quadratic_duration": 15.0,  # seconds
            "use_bucketing": False,
            "drop_last": False,
            "shuffle_buffer_size": 10,
            "seed": 0,
            "shard_seed": 0,
        }
    )

    dl = get_lhotse_dataloader_from_config(
        config=config, global_rank=0, world_size=1, dataset=UnsupervisedAudioDataset()
    )

    batches = list(islice(dl, 4))
    assert len(batches) == 4

    # the first element has been concatenated: 2x16000 speech (2x1s) + 1600 gap (0.1s)
    expected_audio_lens = torch.tensor([33600, 16000], dtype=torch.int32)

    for batch in batches:
        assert set(batch.keys()) == {"audio", "audio_lens", "ids"}
        assert batch["audio"].shape[0] == batch["audio_lens"].shape[0] == 2
        torch.testing.assert_close(batch["audio_lens"], expected_audio_lens)
def test_dataloader_from_lhotse_shar_cuts_combine_datasets_unweighted(
    cutset_shar_path: Path, cutset_shar_path_other: Path
):
    """
    Note: if we iterated more mini-batches in this test, in the expectation there
    will be 50-50 % mini-batch occupancy of examples from both datasets.
    """
    config = OmegaConf.create(
        {
            "shar_path": [cutset_shar_path, cutset_shar_path_other],
            "sample_rate": 16000,
            "shuffle": True,
            "use_lhotse": True,
            "num_workers": 0,
            # lhotse specific
            "use_bucketing": True,
            "concurrent_bucketing": False,
            "num_buckets": 2,
            "drop_last": False,
            "batch_duration": 4.0,  # seconds
            "quadratic_duration": 15.0,  # seconds
            "shuffle_buffer_size": 10,
            "bucket_buffer_size": 100,
            "seed": 0,
            "shard_seed": 0,
        }
    )

    dl = get_lhotse_dataloader_from_config(
        config=config, global_rank=0, world_size=1, dataset=UnsupervisedAudioDataset()
    )

    # Note: we use islice here because with Lhotse Shar the dataloader will always be infinite.
    batches = list(islice(dl, 4))
    assert len(batches) == 4

    # Per batch: (cuts from dataset 1 with "dummy" ids, cuts from dataset 2 with "other" ids)
    expected_counts = [(1, 2), (0, 3), (2, 1), (1, 2)]
    for batch, (num_dummy, num_other) in zip(batches, expected_counts):
        cut_ids = batch["ids"]
        assert sum(1 for cid in cut_ids if cid.startswith("dummy")) == num_dummy  # dataset 1
        assert sum(1 for cid in cut_ids if cid.startswith("other")) == num_other  # dataset 2
def test_dataloader_from_lhotse_shar_cuts_combine_datasets_weighted(
    cutset_shar_path: Path, cutset_shar_path_other: Path
):
    """
    Note: if we iterated more mini-batches in this test, in the expectation there
    will be 90-10 % mini-batch occupancy of examples from both datasets.
    """
    config = OmegaConf.create(
        {
            "shar_path": [[cutset_shar_path, 90], [cutset_shar_path_other, 10]],
            "sample_rate": 16000,
            "shuffle": True,
            "use_lhotse": True,
            "num_workers": 0,
            # lhotse specific
            "use_bucketing": True,
            "concurrent_bucketing": False,
            "num_buckets": 2,
            "drop_last": False,
            "batch_duration": 4.0,  # seconds
            "quadratic_duration": 15.0,  # seconds
            "shuffle_buffer_size": 10,
            "bucket_buffer_size": 100,
            "seed": 0,
            "shard_seed": 0,
        }
    )

    dl = get_lhotse_dataloader_from_config(
        config=config, global_rank=0, world_size=1, dataset=UnsupervisedAudioDataset()
    )

    # Note: we use islice here because with Lhotse Shar the dataloader will always be infinite.
    batches = list(islice(dl, 6))
    assert len(batches) == 6

    # Per batch: (cuts from dataset 1 with "dummy" ids, cuts from dataset 2 with "other" ids)
    expected_counts = [(3, 0), (1, 2), (2, 1), (3, 0), (3, 0), (3, 0)]
    for batch, (num_dummy, num_other) in zip(batches, expected_counts):
        cut_ids = batch["ids"]
        assert sum(1 for cid in cut_ids if cid.startswith("dummy")) == num_dummy  # dataset 1
        assert sum(1 for cid in cut_ids if cid.startswith("other")) == num_other  # dataset 2
class TextDataset(torch.utils.data.Dataset):
    """Dataset that maps a mini-batch of cuts to the text of each cut's first supervision."""

    def __getitem__(self, cuts: lhotse.CutSet) -> List[str]:
        """Return one transcript per cut, taken from ``supervisions[0].text``."""
        texts = []
        for cut in cuts:
            first_supervision = cut.supervisions[0]
            texts.append(first_supervision.text)
        return texts
@pytest.mark.parametrize(["text_field", "text_value"], [(None, "irrelevant"), ("text-other", "not relevant")])
def test_dataloader_from_nemo_manifest_with_text_field(nemo_manifest_path: Path, text_field: str, text_value: str):
    """The supervision text is read from ``text_field`` when configured, otherwise from the default key."""
    overrides = {}
    if text_field is not None:
        overrides["text_field"] = text_field

    config = OmegaConf.create(
        {
            "manifest_filepath": nemo_manifest_path,
            "sample_rate": 16000,
            "shuffle": True,
            "use_lhotse": True,
            "num_workers": 0,
            "batch_size": 2,
            # lhotse specific
            "use_bucketing": False,
            **overrides,
        }
    )

    dl = get_lhotse_dataloader_from_config(config=config, global_rank=0, world_size=1, dataset=TextDataset())

    first_batch = next(iter(dl))
    assert first_batch == [text_value, text_value]
class LangDataset(torch.utils.data.Dataset):
    """Dataset that maps a mini-batch of cuts to the language of each cut's first supervision."""

    def __getitem__(self, cuts: lhotse.CutSet) -> List[str]:
        """Return one language tag per cut, taken from ``supervisions[0].language``."""
        languages = []
        for cut in cuts:
            first_supervision = cut.supervisions[0]
            languages.append(first_supervision.language)
        return languages
@pytest.mark.parametrize(["lang_field", "lang_value"], [(None, "en"), ("custom-lang", "pl")])
def test_dataloader_from_nemo_manifest_with_lang_field(nemo_manifest_path: Path, lang_field: str, lang_value: str):
    """The supervision language is read from ``lang_field`` when configured, otherwise from the default key."""
    overrides = {}
    if lang_field is not None:
        overrides["lang_field"] = lang_field

    config = OmegaConf.create(
        {
            "manifest_filepath": nemo_manifest_path,
            "sample_rate": 16000,
            "shuffle": True,
            "use_lhotse": True,
            "num_workers": 0,
            "batch_size": 2,
            # lhotse specific
            "use_bucketing": False,
            **overrides,
        }
    )

    dl = get_lhotse_dataloader_from_config(config=config, global_rank=0, world_size=1, dataset=LangDataset())

    first_batch = next(iter(dl))
    assert first_batch == [lang_value, lang_value]
def test_lazy_nemo_iterator_with_offset_field(tmp_path: Path):
    """
    ``LazyNeMoIterator`` honours the ``offset`` / ``duration`` keys of a NeMo manifest.

    Two manifest entries reference the two halves of one 1s, 16 kHz file; each
    resulting cut must expose the right start/duration and load the matching
    slice of the audio.
    """
    import numpy as np
    import soundfile as sf

    from nemo.collections.common.data.lhotse.nemo_adapters import LazyNeMoIterator

    # Have to generate as INT16 to avoid quantization error after saving to 16-bit WAV.
    # ``randint`` includes ``low`` and excludes ``high``, so [-2**15, 2**15) is exactly
    # the int16 range; a lower bound of -2**15 - 1 could draw a value that cannot be
    # stored in 16 bits and would make the exact comparison below fail intermittently.
    INT16MAX = 2**15
    expected_audio = np.random.randint(low=-INT16MAX, high=INT16MAX, size=(16000,)).astype(np.float32) / INT16MAX
    audio_path = str(tmp_path / "dummy.wav")
    sf.write(audio_path, expected_audio, 16000)

    manifest_path = str(tmp_path / "manifest.json")
    lhotse.serialization.save_to_jsonl(
        [
            {"audio_filepath": audio_path, "offset": 0.0, "duration": 0.5, "text": "irrelevant"},
            {"audio_filepath": audio_path, "offset": 0.5, "duration": 0.5, "text": "irrelevant"},
        ],
        manifest_path,
    )

    cuts = lhotse.CutSet(LazyNeMoIterator(manifest_path))

    # First half of the file.
    cut = cuts[0]
    assert isinstance(cut, lhotse.MonoCut)
    assert cut.start == 0.0
    assert cut.duration == 0.5
    assert cut.sampling_rate == 16000
    assert cut.num_samples == 8000
    assert cut.supervisions[0].text == "irrelevant"
    audio = cut.load_audio()
    assert audio.shape == (1, 8000)
    np.testing.assert_equal(audio[0], expected_audio[:8000])

    # Second half of the file.
    cut = cuts[1]
    assert isinstance(cut, lhotse.MonoCut)
    assert cut.start == 0.5
    assert cut.duration == 0.5
    assert cut.sampling_rate == 16000
    assert cut.num_samples == 8000
    assert cut.supervisions[0].text == "irrelevant"
    audio = cut.load_audio()
    assert audio.shape == (1, 8000)
    np.testing.assert_allclose(audio[0], expected_audio[8000:], atol=5e-5)

    # Both cuts come from the same file but must still get distinct IDs.
    assert cuts[0].id != cuts[1].id
def test_lazy_nemo_iterator_with_relative_paths(tmp_path: Path):
    """
    ``LazyNeMoIterator`` can load audio referenced by a relative ``audio_filepath``.

    The manifest entry names only ``dummy.wav``, which is stored in the same
    directory as the manifest; the cut must still load the expected samples.
    """
    import numpy as np
    import soundfile as sf

    from nemo.collections.common.data.lhotse.nemo_adapters import LazyNeMoIterator

    # Have to generate as INT16 to avoid quantization error after saving to 16-bit WAV.
    # ``randint`` includes ``low`` and excludes ``high``, so [-2**15, 2**15) is exactly
    # the int16 range; a lower bound of -2**15 - 1 could draw a value that cannot be
    # stored in 16 bits and would make the exact comparison below fail intermittently.
    INT16MAX = 2**15
    expected_audio = np.random.randint(low=-INT16MAX, high=INT16MAX, size=(16000,)).astype(np.float32) / INT16MAX
    audio_path = str(tmp_path / "dummy.wav")
    sf.write(audio_path, expected_audio, 16000)

    manifest_path = str(tmp_path / "manifest.json")
    lhotse.serialization.save_to_jsonl(
        [
            # note: relative path
            {"audio_filepath": "dummy.wav", "offset": 0.0, "duration": 0.5, "text": "irrelevant"},
        ],
        manifest_path,
    )

    cuts = lhotse.CutSet(LazyNeMoIterator(manifest_path))
    cut = cuts[0]
    audio = cut.load_audio()

    assert isinstance(cut, lhotse.MonoCut)
    assert cut.start == 0.0
    assert cut.duration == 0.5
    assert cut.sampling_rate == 16000
    assert cut.num_samples == 8000
    assert cut.supervisions[0].text == "irrelevant"
    assert audio.shape == (1, 8000)
    np.testing.assert_equal(audio[0], expected_audio[:8000])
def test_lhotse_cuts_resolve_relative_paths(tmp_path: Path):
    """Relative paths stored in a cut manifest become loadable through the NeMo dataloader.

    A cut is built whose recording, custom ``target_recording`` and custom
    ``some_array`` all carry relative paths. Loading directly from the cut is
    expected to fail; after saving the manifest and reading it back via
    ``get_lhotse_dataloader_from_config`` every field must load.
    """
    cuts_path = tmp_path / "cuts.jsonl.gz"
    audio_path = tmp_path / "_relative_test_audio_.wav"
    lhotse.audio.save_audio(audio_path, np.random.rand(16000) - 0.5, 16000)

    cut = Recording.from_file(audio_path).to_cut()
    cut.recording.sources[0].source = str(audio_path.name)  # make the path relative
    cut.target_recording = cut.recording  # assign a custom field with relative path
    with NumpyFilesWriter(tmp_path) as w:
        cut.some_array = w.store_array(cut.id, np.random.randn(32))
        cut.some_array.storage_path = ""  # relative path

    # Lhotse doesn't know about what the path should be relative to.
    # Each call gets its own ``pytest.raises`` block: when both calls shared one
    # block, the first exception ended the block and the second call never ran.
    with pytest.raises(AudioLoadingError):
        cut.load_audio()
    with pytest.raises(AudioLoadingError):
        cut.load_target_recording()

    CutSet([cut]).to_file(cuts_path)

    config = OmegaConf.create(
        {
            "cuts_path": cuts_path,
            "sample_rate": 16000,
            "use_lhotse": True,
            "num_workers": 0,
            "batch_size": 2,
        }
    )
    dl = get_lhotse_dataloader_from_config(config=config, global_rank=0, world_size=1, dataset=_Identity())

    batches = list(dl)
    assert len(batches) == 1

    for cut in batches[0]:
        assert cut.has_recording
        cut.load_audio()  # works
        assert cut.has_custom("target_recording")
        cut.load_target_recording()
        assert cut.has_custom("some_array")
        cut.load_some_array()
class Identity(torch.utils.data.Dataset):
    """Pass-through dataset: indexing it returns the received ``CutSet`` unchanged.

    The tests in this module pass it as ``dataset=`` so that they can inspect the
    sampled mini-batches directly.
    """

    def __getitem__(self, cuts: lhotse.CutSet) -> lhotse.CutSet:
        # No loading or collation is done here; the argument is handed straight back.
        return cuts
def test_extended_data_input_cfg(cutset_shar_path, nemo_tarred_manifest_path_multi):
    """Two equally weighted ``input_cfg`` sources are mixed and their tags land on each cut."""
    nemo_tarred_source = {
        "type": "nemo_tarred",
        "manifest_filepath": nemo_tarred_manifest_path_multi[0],
        "tarred_audio_filepaths": nemo_tarred_manifest_path_multi[1],
        "weight": 0.5,
        "tags": {"language": "en", "modality": "audio", "dataset_name": "D1"},
    }
    shar_source = {
        "type": "lhotse_shar",
        "shar_path": cutset_shar_path,
        "weight": 0.5,
        "tags": {"language": "en", "modality": "audio", "dataset_name": "D2"},
    }
    config = OmegaConf.create(
        {
            "input_cfg": [nemo_tarred_source, shar_source],
            "sample_rate": 16000,
            "shuffle": True,
            "num_workers": 0,
            "batch_size": 4,
            "seed": 0,
            "shard_seed": 0,
        }
    )

    dl = get_lhotse_dataloader_from_config(config=config, global_rank=0, world_size=1, dataset=Identity())

    # The dataloader is infinite, so only the first two mini-batches are taken.
    first, second = islice(dl, 2)

    # (batch, expected number of D1 cuts, expected number of D2 cuts) for the fixed seeds above
    for batch, num_d1, num_d2 in ((first, 2, 2), (second, 1, 3)):
        assert isinstance(batch, lhotse.CutSet)
        assert all(c.custom["language"] == "en" for c in batch)
        assert all(c.custom["modality"] == "audio" for c in batch)
        dataset_names = [c.custom["dataset_name"] for c in batch]
        assert dataset_names.count("D1") == num_d1
        assert dataset_names.count("D2") == num_d2
def test_extended_data_input_cfg_subgroup(cutset_shar_path, nemo_tarred_manifest_path_multi):
    """Nested ``group`` entries in ``input_cfg`` are sampled according to the product of weights.

    Two groups (weights 0.2 and 0.8) each contain two equally weighted datasets;
    the empirical frequencies over 100 mini-batches must match within 0.02.
    """

    def tarred_source(dataset_name):
        # NeMo tarred dataset entry tagged with the given dataset name.
        return {
            "type": "nemo_tarred",
            "manifest_filepath": nemo_tarred_manifest_path_multi[0],
            "tarred_audio_filepaths": nemo_tarred_manifest_path_multi[1],
            "weight": 0.5,
            "tags": {"language": "en", "modality": "audio", "dataset_name": dataset_name},
        }

    def shar_source(dataset_name):
        # Lhotse Shar dataset entry tagged with the given dataset name.
        return {
            "type": "lhotse_shar",
            "shar_path": cutset_shar_path,
            "weight": 0.5,
            "tags": {"language": "en", "modality": "audio", "dataset_name": dataset_name},
        }

    config = OmegaConf.create(
        {
            "input_cfg": [
                {
                    "type": "group",
                    "input_cfg": [tarred_source("D1"), shar_source("D2")],
                    "weight": 0.2,
                    "tags": {"group_name": "G1"},
                },
                {
                    "type": "group",
                    "weight": 0.8,
                    "input_cfg": [tarred_source("D3"), shar_source("D4")],
                    "tags": {"group_name": "G2"},
                },
            ],
            "sample_rate": 16000,
            "shuffle": True,
            "num_workers": 0,
            "batch_size": 32,
            "seed": 0,
            "shard_seed": 0,
        }
    )

    dl = get_lhotse_dataloader_from_config(config=config, global_rank=0, world_size=1, dataset=Identity())

    # Sample 100 mini-batches and test statistical properties
    group_counts, dataset_counts = Counter(), Counter()
    for batch in islice(dl, 100):
        for cut in batch:
            group_counts[cut.group_name] += 1
            dataset_counts[cut.dataset_name] += 1

    # Turn raw counts into relative frequencies over all sampled cuts.
    total = sum(group_counts.values())
    group_freq = {name: count / total for name, count in group_counts.items()}
    dataset_freq = {name: count / total for name, count in dataset_counts.items()}

    # Expected share of a group is its weight.
    for name, share in (("G1", 0.2), ("G2", 0.8)):
        assert group_freq.get(name, 0) == pytest.approx(share, abs=0.02)

    # Expected share of a dataset is group weight * dataset weight (0.5 each).
    for name, share in (("D1", 0.1), ("D2", 0.1), ("D3", 0.4), ("D4", 0.4)):
        assert dataset_freq.get(name, 0) == pytest.approx(share, abs=0.02)
def test_extended_data_input_cfg_yaml_path(tmp_path, cutset_shar_path, nemo_tarred_manifest_path_multi):
    """``input_cfg`` may be given as a path to a YAML file instead of an inline list.

    The dataset list is serialized to ``input_cfg.yaml`` and the dataloader config
    points at that file; every cut of the first batch must carry one of the two
    dataset tags.
    """
    input_cfg = [
        {
            "type": "nemo_tarred",
            "manifest_filepath": str(nemo_tarred_manifest_path_multi[0]),
            "tarred_audio_filepaths": str(nemo_tarred_manifest_path_multi[1]),
            "weight": 0.5,
            "tags": {
                "language": "en",
                "modality": "audio",
                "dataset_name": "D1",
            },
        },
        {
            "type": "lhotse_shar",
            "shar_path": str(cutset_shar_path),
            "weight": 0.5,
            "tags": {
                "language": "en",
                "modality": "audio",
                "dataset_name": "D2",
            },
        },
    ]

    yaml_path = tmp_path / "input_cfg.yaml"
    lhotse.serialization.save_to_yaml(input_cfg, yaml_path)

    config = OmegaConf.create(
        {
            # Pass the YAML file path, not the in-memory list: previously the list
            # itself was passed, so the file written above was never read and the
            # path-resolution branch under test was not exercised.
            "input_cfg": str(yaml_path),
            "sample_rate": 16000,
            "shuffle": True,
            "num_workers": 0,
            "batch_size": 32,
            "seed": 0,
            "shard_seed": 0,
        }
    )

    dl = get_lhotse_dataloader_from_config(config=config, global_rank=0, world_size=1, dataset=Identity())

    batch = next(iter(dl))
    assert isinstance(batch, lhotse.CutSet)
    for cut in batch:
        assert cut.dataset_name in ("D1", "D2")
@pytest.fixture(scope="session")
def txt_en_path(tmp_path_factory):
    """Session-scoped text file ``text.en`` with two English sentences, one per line."""
    en_path = tmp_path_factory.mktemp("text_data") / "text.en"
    en_path.write_text(
        """Example text in English.
Another sentence.
"""
    )
    return en_path
@pytest.fixture(scope="session")
def txt_es_path(tmp_path_factory):
    """Session-scoped text file ``text.es`` with two Spanish sentences, one per line."""
    es_path = tmp_path_factory.mktemp("text_data") / "text.es"
    es_path.write_text(
        """Otro texto en ingles.
Otra frase."""
    )
    return es_path
@pytest.fixture(scope="session")
def questions_path(tmp_path_factory) -> str:
    """Path (as ``str``) to a one-line text file holding a translation instruction.

    NOTE(review): a second fixture named ``questions_path`` is defined further down
    in this module. Being defined later, it rebinds the module-level name, so this
    definition appears to be shadowed -- confirm which one is intended.
    """
    questions_file = tmp_path_factory.mktemp("questions") / "questions.txt"
    questions_file.write_text("translate the following to spanish")
    return str(questions_file)
def test_text_file_input(txt_en_path, txt_es_path):
    """A ``txt`` input source yields batches of ``TextExample`` tagged with the language."""
    text_source = {
        "type": "txt",
        "paths": txt_en_path,
        "language": "en",
    }
    config = OmegaConf.create(
        {
            "input_cfg": [text_source],
            "shuffle": True,
            "num_workers": 0,
            "batch_size": 4,
            "seed": 0,
            "shard_seed": 0,
        }
    )

    # No tokenizer is passed: batch sizes are static in this test.
    dl = get_lhotse_dataloader_from_config(config=config, global_rank=0, world_size=1, dataset=Identity())

    # The dataloader is infinite, so only the first two mini-batches are taken.
    first, second = islice(dl, 2)

    for batch in (first, second):
        assert isinstance(batch, lhotse.CutSet)
        assert all(isinstance(c, TextExample) for c in batch)
        assert all(c.language == "en" for c in batch)
def test_text_file_pairs_input(txt_en_path, txt_es_path, questions_path):
    """A ``txt_pair`` input source yields ``SourceTargetTextExample`` with both languages set."""
    pair_source = {
        "type": "txt_pair",
        "source_paths": txt_en_path,
        "target_paths": txt_es_path,
        "questions_path": questions_path,
        "source_language": "en",
        "target_language": "es",
        "questions_language": "en",
    }
    config = OmegaConf.create(
        {
            "input_cfg": [pair_source],
            "shuffle": True,
            "num_workers": 0,
            "batch_size": 4,
            "seed": 0,
            "shard_seed": 0,
        }
    )

    # No tokenizer is passed: batch sizes are static in this test.
    dl = get_lhotse_dataloader_from_config(config=config, global_rank=0, world_size=1, dataset=Identity())

    # The dataloader is infinite, so only the first two mini-batches are taken.
    first, second = islice(dl, 2)

    for batch in (first, second):
        assert isinstance(batch, lhotse.CutSet)
        assert all(isinstance(c, SourceTargetTextExample) for c in batch)
        assert all(c.source.language == "en" for c in batch)
        assert all(c.target.language == "es" for c in batch)
@pytest.fixture(scope="session")
def txt_pair_paths_shards(tmp_path_factory, txt_en_path, txt_es_path):
    """Split the English and Spanish text files into two shards each.

    Shard 0 receives the first five lines and shard 1 the remainder. Returns a pair
    of path patterns (English, Spanish) covering both shards; ``_OP_0..1_CL_`` is
    presumably NeMo's escaped spelling of ``{0..1}`` -- confirm against the reader.
    """
    shard_dir = tmp_path_factory.mktemp("text_data_shards")
    for lang, text_path in (("en", txt_en_path), ("es", txt_es_path)):
        lines = text_path.read_text().splitlines()
        for shard_idx, shard_lines in enumerate((lines[:5], lines[5:])):
            (shard_dir / f"{lang}_{shard_idx}.txt").write_text("\n".join(shard_lines))
    return f"{shard_dir}/en__OP_0..1_CL_.txt", f"{shard_dir}/es__OP_0..1_CL_.txt"
def test_text_file_pairs_shards_input(txt_pair_paths_shards: tuple[str, str], questions_path):
    """A ``txt_pair`` source given as sharded path patterns yields source/target text examples."""
    en_paths, es_paths = txt_pair_paths_shards
    pair_source = {
        "type": "txt_pair",
        "source_paths": en_paths,
        "target_paths": es_paths,
        "questions_path": questions_path,
        "source_language": "en",
        "target_language": "es",
        "questions_language": "en",
    }
    config = OmegaConf.create(
        {
            "input_cfg": [pair_source],
            "shuffle": True,
            "num_workers": 0,
            "batch_size": 4,
            "seed": 0,
            "shard_seed": 0,
        }
    )

    # No tokenizer is passed: batch sizes are static in this test.
    dl = get_lhotse_dataloader_from_config(config=config, global_rank=0, world_size=1, dataset=Identity())

    # The dataloader is infinite, so only the first two mini-batches are taken.
    first, second = islice(dl, 2)

    for batch in (first, second):
        assert isinstance(batch, lhotse.CutSet)
        assert all(isinstance(c, SourceTargetTextExample) for c in batch)
        assert all(c.source.language == "en" for c in batch)
        assert all(c.target.language == "es" for c in batch)
@pytest.fixture(scope="session")
def en_es_tokenizer(tmp_path_factory, txt_en_path, txt_es_path) -> SentencePieceTokenizer:
    """SentencePiece tokenizer (vocab size 128) trained on the English and Spanish text fixtures."""
    workdir = tmp_path_factory.mktemp("en_es_tokenizer")

    # Training corpus: English text followed by Spanish text, separated by a newline.
    corpus_path = workdir / "text.txt"
    corpus_path.write_text("\n".join((txt_en_path.read_text(), txt_es_path.read_text())))

    create_spt_model(corpus_path, vocab_size=128, sample_size=-1, do_lower_case=False, output_dir=str(workdir))
    model_path = workdir / "tokenizer.model"
    return SentencePieceTokenizer(str(model_path))
def test_multimodal_text_audio_dataloading(
    txt_pair_paths_shards: tuple[str, str],
    nemo_tarred_manifest_path_multi: tuple[str, str],
    en_es_tokenizer: SentencePieceTokenizer,
    questions_path: str,
):
    """Text pairs and tarred audio are sampled into one token-budgeted mini-batch.

    The first batch must contain both modalities, respect the ``batch_tokens``
    budget after the quadratic-factor correction, and expose per-example fields.
    """
    en_paths, es_paths = txt_pair_paths_shards
    manifest_filepath, tarred_audio_filepaths = nemo_tarred_manifest_path_multi
    QF, BT = 50, 1024

    text_source = {
        "type": "txt_pair",
        "source_paths": en_paths,
        "target_paths": es_paths,
        "source_language": "en",
        "target_language": "es",
        "questions_path": questions_path,
        "questions_language": "en",
        "tags": {"modality": "text"},
    }
    audio_source = {
        "type": "nemo_tarred",
        "manifest_filepath": manifest_filepath,
        "tarred_audio_filepaths": tarred_audio_filepaths,
        "tags": {"modality": "audio"},
    }
    config = OmegaConf.create(
        {
            "input_cfg": [text_source, audio_source],
            "shuffle": True,
            "num_workers": 0,
            "use_multimodal_sampling": True,
            "prompt_format": "plain",
            "batch_tokens": BT,
            # Choosing token_equivalent_duration for real training:
            #   fbank frames: 0.01 frame shift, x8 subsampling gives 0.08;
            #   discrete audio tokens at a 50Hz frame rate give 0.02.
            # This test simply uses 0.1.
            "token_equivalent_duration": 0.1,
            "quadratic_factor": QF,
            "seed": 0,
            "shard_seed": 0,
        }
    )

    dl = get_lhotse_dataloader_from_config(
        config=config,
        global_rank=0,
        world_size=1,
        dataset=Identity(),
        tokenizer=en_es_tokenizer,
    )

    b = next(iter(dl))
    assert isinstance(b, lhotse.CutSet)
    assert len(b)
    # Both modalities are present in the same batch.
    assert any(isinstance(ex, Cut) for ex in b)
    assert any(isinstance(ex, SourceTargetTextExample) for ex in b)
    # Batch tokens is not exceeded after applying the quadratic factor correction
    assert sum(ex.num_tokens**2 / QF for ex in b) <= BT

    tokenized_fields = ("input_ids", "context_ids", "answer_ids", "mask")
    for ex in b:
        if isinstance(ex, Cut):
            assert ex.modality == "audio"
            assert isinstance(ex.load_audio(), np.ndarray)
            assert isinstance(ex.supervisions[0].text, str)
        if isinstance(ex, SourceTargetTextExample):
            assert ex.modality == "text"
            assert ex.source.language == "en"
            assert ex.target.language == "es"
            assert isinstance(ex.source.text, str)
            assert isinstance(ex.target.text, str)
            assert isinstance(ex.question.text, str)
            for field in tokenized_fields:
                assert torch.is_tensor(getattr(ex, field))
def test_multimodal_text_audio_dataloading_zip_strategy(
    txt_pair_paths_shards: tuple[str, str],
    nemo_tarred_manifest_path_multi: tuple[str, str],
    en_es_tokenizer: SentencePieceTokenizer,
    questions_path: str,
):
    """``sampler_fusion: zip`` stitches one audio and one text batch into each mini-batch."""
    en_paths, es_paths = txt_pair_paths_shards
    manifest_filepath, tarred_audio_filepaths = nemo_tarred_manifest_path_multi
    QF, BT = 50, 64

    # Sampling options shared by the audio and the text sub-config.
    # Choosing token_equivalent_duration for real training:
    #   fbank frames: 0.01 frame shift, x8 subsampling gives 0.08;
    #   discrete audio tokens at a 50Hz frame rate give 0.02.
    # This test simply uses 0.1.
    sampling_opts = {
        "use_multimodal_sampling": True,
        "prompt_format": "plain",
        "batch_tokens": BT,
        "token_equivalent_duration": 0.1,
        "quadratic_factor": QF,
    }
    audio_cfg = {
        "input_cfg": [
            {
                "type": "nemo_tarred",
                "manifest_filepath": manifest_filepath,
                "tarred_audio_filepaths": tarred_audio_filepaths,
                "tags": {"modality": "audio"},
            },
        ],
        **sampling_opts,
    }
    text_cfg = {
        "input_cfg": [
            {
                "type": "txt_pair",
                "source_paths": en_paths,
                "target_paths": es_paths,
                "source_language": "en",
                "target_language": "es",
                "questions_path": questions_path,
                "questions_language": "en",
                "tags": {"modality": "text"},
            },
        ],
        **sampling_opts,
    }
    config = OmegaConf.create(
        {
            "multi_config": True,
            "sampler_fusion": "zip",  # <---- !!! this option is being tested here !!!
            "seed": 0,
            "shard_seed": 0,
            "shuffle": True,
            "num_workers": 0,
            "audio": audio_cfg,
            "text": text_cfg,
        }
    )

    dl = get_lhotse_dataloader_from_config(
        config=config,
        global_rank=0,
        world_size=1,
        dataset=Identity(),
        tokenizer=en_es_tokenizer,
    )

    assert isinstance(dl.dataset.sampler, ZipSampler)

    def check_mixed_batch(b):
        # A zipped batch holds both modalities and stays within twice the token budget.
        assert isinstance(b, lhotse.CutSet)
        assert len(b)
        assert any(isinstance(ex, Cut) for ex in b)
        assert any(isinstance(ex, SourceTargetTextExample) for ex in b)
        # Batch tokens is not exceeded after applying the quadratic factor correction
        # Note: zip samples stitches together two batches hence * 2
        assert sum(ex.num_tokens**2 / QF for ex in b) <= BT * 2
        for ex in b:
            if isinstance(ex, Cut):
                assert ex.modality == "audio"
                assert isinstance(ex.load_audio(), np.ndarray)
                assert isinstance(ex.supervisions[0].text, str)
            if isinstance(ex, SourceTargetTextExample):
                assert ex.modality == "text"
                assert ex.source.language == "en"
                assert ex.target.language == "es"
                for field in ("input_ids", "context_ids", "answer_ids", "mask"):
                    assert torch.is_tensor(getattr(ex, field))

    # The dataloader is infinite, so only the first two mini-batches are taken.
    first, second = islice(dl, 2)
    check_mixed_batch(first)
    check_mixed_batch(second)
def test_multimodal_text_audio_dataloading_round_robin_strategy(
    txt_pair_paths_shards: tuple[str, str],
    nemo_tarred_manifest_path_multi: tuple[str, str],
    en_es_tokenizer: SentencePieceTokenizer,
    questions_path: str,
):
    """``sampler_fusion: round_robin`` alternates audio-only and text-only mini-batches."""
    en_paths, es_paths = txt_pair_paths_shards
    manifest_filepath, tarred_audio_filepaths = nemo_tarred_manifest_path_multi
    QF, BT = 50, 64

    # Sampling options shared by the audio and the text sub-config.
    # Choosing token_equivalent_duration for real training:
    #   fbank frames: 0.01 frame shift, x8 subsampling gives 0.08;
    #   discrete audio tokens at a 50Hz frame rate give 0.02.
    # This test simply uses 0.1.
    sampling_opts = {
        "use_multimodal_sampling": True,
        "prompt_format": "plain",
        "batch_tokens": BT,
        "token_equivalent_duration": 0.1,
        "quadratic_factor": QF,
    }
    audio_cfg = {
        "input_cfg": [
            {
                "type": "nemo_tarred",
                "manifest_filepath": manifest_filepath,
                "tarred_audio_filepaths": tarred_audio_filepaths,
                "tags": {"modality": "audio"},
            },
        ],
        **sampling_opts,
    }
    text_cfg = {
        "input_cfg": [
            {
                "type": "txt_pair",
                "source_paths": en_paths,
                "target_paths": es_paths,
                "source_language": "en",
                "target_language": "es",
                "questions_path": questions_path,
                "questions_language": "en",
                "tags": {"modality": "text"},
            },
        ],
        **sampling_opts,
    }
    config = OmegaConf.create(
        {
            "multi_config": True,
            "sampler_fusion": "round_robin",  # <---- !!! this option is being tested here !!!
            "seed": 0,
            "shard_seed": 0,
            "shuffle": True,
            "num_workers": 0,
            "audio": audio_cfg,
            "text": text_cfg,
        }
    )

    dl = get_lhotse_dataloader_from_config(
        config=config,
        global_rank=0,
        world_size=1,
        dataset=Identity(),
        tokenizer=en_es_tokenizer,
    )

    assert isinstance(dl.dataset.sampler, RoundRobinSampler)

    # The dataloader is infinite, so only the first two mini-batches are taken.
    audio_batch, text_batch = islice(dl, 2)

    # Batch 0 is audio-only
    assert isinstance(audio_batch, lhotse.CutSet)
    assert len(audio_batch)
    assert all(isinstance(ex, Cut) for ex in audio_batch)
    # Batch tokens is not exceeded after applying the quadratic factor correction
    assert sum(ex.num_tokens**2 / QF for ex in audio_batch) <= BT
    for ex in audio_batch:
        assert ex.modality == "audio"
        assert isinstance(ex.load_audio(), np.ndarray)
        assert isinstance(ex.supervisions[0].text, str)

    # Batch 1 is text-only
    assert isinstance(text_batch, lhotse.CutSet)
    assert len(text_batch)
    assert all(isinstance(ex, SourceTargetTextExample) for ex in text_batch)
    # Batch tokens is not exceeded after applying the quadratic factor correction
    assert sum(ex.num_tokens**2 / QF for ex in text_batch) <= BT
    for ex in text_batch:
        assert ex.modality == "text"
        assert ex.source.language == "en"
        assert ex.target.language == "es"
        for field in ("input_ids", "context_ids", "answer_ids", "mask"):
            assert torch.is_tensor(getattr(ex, field))
def test_multimodal_text_audio_dataloading_randomized_round_robin_strategy(
    deterministic_rng,
    txt_pair_paths_shards: tuple[str, str],
    nemo_tarred_manifest_path_multi: tuple[str, str],
    en_es_tokenizer: SentencePieceTokenizer,
    questions_path: str,
):
    """``sampler_fusion: randomized_round_robin`` with equal weights yields single-modality batches.

    With the deterministic RNG fixture and the seeds below, the first batch is
    expected to be audio-only and the second text-only.
    """
    en_paths, es_paths = txt_pair_paths_shards
    manifest_filepath, tarred_audio_filepaths = nemo_tarred_manifest_path_multi
    QF, BT = 50, 64

    # Sampling options shared by the audio and the text sub-config.
    # Choosing token_equivalent_duration for real training:
    #   fbank frames: 0.01 frame shift, x8 subsampling gives 0.08;
    #   discrete audio tokens at a 50Hz frame rate give 0.02.
    # This test simply uses 0.1.
    sampling_opts = {
        "use_multimodal_sampling": True,
        "prompt_format": "plain",
        "batch_tokens": BT,
        "token_equivalent_duration": 0.1,
        "quadratic_factor": QF,
    }
    audio_cfg = {
        "input_cfg": [
            {
                "type": "nemo_tarred",
                "manifest_filepath": manifest_filepath,
                "tarred_audio_filepaths": tarred_audio_filepaths,
                "tags": {"modality": "audio"},
            },
        ],
        **sampling_opts,
    }
    text_cfg = {
        "input_cfg": [
            {
                "type": "txt_pair",
                "source_paths": en_paths,
                "target_paths": es_paths,
                "source_language": "en",
                "target_language": "es",
                "questions_path": questions_path,
                "questions_language": "en",
                "tags": {"modality": "text"},
            },
        ],
        **sampling_opts,
    }
    config = OmegaConf.create(
        {
            "multi_config": True,
            "sampler_fusion": "randomized_round_robin",  # <---- !!! this option is being tested here !!!
            "sampler_weights": {
                "audio": 0.5,
                "text": 0.5,
            },
            "seed": 0,
            "shard_seed": 0,
            "shuffle": True,
            "num_workers": 0,
            "audio": audio_cfg,
            "text": text_cfg,
        }
    )

    dl = get_lhotse_dataloader_from_config(
        config=config,
        global_rank=0,
        world_size=1,
        dataset=Identity(),
        tokenizer=en_es_tokenizer,
    )

    assert isinstance(dl.dataset.sampler, RoundRobinSampler)

    # The dataloader is infinite, so only the first two mini-batches are taken.
    audio_batch, text_batch = islice(dl, 2)

    # Batch 0 is audio-only
    assert isinstance(audio_batch, lhotse.CutSet)
    assert len(audio_batch)
    assert all(isinstance(ex, Cut) for ex in audio_batch)
    # Batch tokens is not exceeded after applying the quadratic factor correction
    assert sum(ex.num_tokens**2 / QF for ex in audio_batch) <= BT
    for ex in audio_batch:
        assert ex.modality == "audio"
        assert isinstance(ex.load_audio(), np.ndarray)
        assert isinstance(ex.supervisions[0].text, str)

    # Batch 1 is text-only
    assert isinstance(text_batch, lhotse.CutSet)
    assert len(text_batch)
    assert all(isinstance(ex, SourceTargetTextExample) for ex in text_batch)
    # Batch tokens is not exceeded after applying the quadratic factor correction
    assert sum(ex.num_tokens**2 / QF for ex in text_batch) <= BT
    for ex in text_batch:
        assert ex.modality == "text"
        assert ex.source.language == "en"
        assert ex.target.language == "es"
        for field in ("input_ids", "context_ids", "answer_ids", "mask"):
            assert torch.is_tensor(getattr(ex, field))
def test_dataloader_with_noise_nemo_json(cutset_path: Path, nemo_manifest_path: Path):
    """Noise taken from a NeMo JSON manifest is mixed into every cut with an SNR inside the range."""
    snr_low, snr_high = -5.0, 5.0
    config = OmegaConf.create(
        {
            "cuts_path": str(cutset_path),
            "noise_path": str(nemo_manifest_path),
            "noise_mix_prob": 1.0,
            "noise_snr": [snr_low, snr_high],
            "batch_size": 2,
            "seed": 0,
            "shard_seed": 0,
        }
    )
    dl = get_lhotse_dataloader_from_config(
        config=config,
        global_rank=0,
        world_size=1,
        dataset=Identity(),
    )

    batch = next(iter(dl))
    assert isinstance(batch, CutSet)
    assert len(batch) == 2

    # With noise_mix_prob=1.0 each cut is a mix; track 1 carries the SNR being checked.
    for idx in range(2):
        cut = batch[idx]
        assert isinstance(cut, MixedCut)
        assert snr_low < cut.tracks[1].snr < snr_high
def test_dataloader_with_noise_lhotse_jsonl(cutset_path: Path):
    """Noise taken from a Lhotse cut manifest is mixed into every cut with an SNR inside the range."""
    snr_low, snr_high = -5.0, 5.0
    config = OmegaConf.create(
        {
            "cuts_path": str(cutset_path),
            # The same manifest serves as both the data and the noise source.
            "noise_path": str(cutset_path),
            "noise_mix_prob": 1.0,
            "noise_snr": [snr_low, snr_high],
            "batch_size": 2,
            "seed": 0,
            "shard_seed": 0,
        }
    )
    dl = get_lhotse_dataloader_from_config(
        config=config,
        global_rank=0,
        world_size=1,
        dataset=Identity(),
    )

    batch = next(iter(dl))
    assert isinstance(batch, CutSet)
    assert len(batch) == 2

    # With noise_mix_prob=1.0 each cut is a mix; track 1 carries the SNR being checked.
    for idx in range(2):
        cut = batch[idx]
        assert isinstance(cut, MixedCut)
        assert snr_low < cut.tracks[1].snr < snr_high
def test_dataloader_with_noise_nemo_tar(cutset_path: Path, nemo_tarred_manifest_path_multi: tuple[str, str]):
    """Noise taken from a NeMo tarred manifest is mixed into every cut with an SNR inside the range.

    ``nemo_tarred_manifest_path_multi`` is unpacked into a manifest path and a tar
    path, so it is annotated as a pair (it was previously annotated as ``Path``),
    matching the other tests in this module that use the same fixture.
    """
    noise_json, noise_tar = nemo_tarred_manifest_path_multi
    config = OmegaConf.create(
        {
            "cuts_path": str(cutset_path),
            # Tarred noise is described by a manifest/tar pair rather than a single path.
            "noise_path": {
                "manifest_filepath": noise_json,
                "tarred_audio_filepaths": noise_tar,
            },
            "noise_mix_prob": 1.0,
            "noise_snr": [-5.0, 5.0],
            "batch_size": 2,
            "seed": 0,
            "shard_seed": 0,
        }
    )
    dl = get_lhotse_dataloader_from_config(
        config=config,
        global_rank=0,
        world_size=1,
        dataset=Identity(),
    )

    batch = next(iter(dl))
    assert isinstance(batch, CutSet)
    assert len(batch) == 2

    # With noise_mix_prob=1.0 each cut is a mix; track 1 carries the SNR being checked.
    for idx in range(2):
        cut = batch[idx]
        assert isinstance(cut, MixedCut)
        assert -5.0 < cut.tracks[1].snr < 5.0
def test_dataloader_with_synth_rir(cutset_path: Path):
    """With ``rir_enabled`` and ``rir_prob=0.5`` some cuts of the batch get a reverb transform.

    For the seeds used here the first two cuts are expected to be untouched and
    the last two to carry exactly one ``ReverbWithImpulseResponse`` transform.
    """
    from lhotse.augmentation import ReverbWithImpulseResponse

    config = OmegaConf.create(
        {
            "cuts_path": str(cutset_path),
            "rir_enabled": True,
            "rir_prob": 0.5,
            "batch_size": 4,
            "seed": 0,
            "shard_seed": 0,
        }
    )
    dl = get_lhotse_dataloader_from_config(
        config=config,
        global_rank=0,
        world_size=1,
        dataset=Identity(),
    )

    batch = next(iter(dl))
    assert isinstance(batch, CutSet)
    assert len(batch) == 4

    # Cuts without reverb keep an empty transform record.
    for idx in (0, 1):
        cut = batch[idx]
        assert isinstance(cut, MonoCut)
        assert cut.recording.transforms is None

    # Cuts with reverb record a single transform on their recording.
    for idx in (2, 3):
        cut = batch[idx]
        assert isinstance(cut, MonoCut)
        assert isinstance(cut.recording.transforms, list) and len(cut.recording.transforms) == 1
        tfnm = cut.recording.transforms[0]
        if isinstance(tfnm, dict):  # lhotse<=1.23.0
            assert tfnm["name"] == "ReverbWithImpulseResponse"
        else:  # lhotse>=1.24.0
            assert isinstance(tfnm, ReverbWithImpulseResponse)
def test_dataloader_bucket_batch_size(nemo_tarred_manifest_path_multi: tuple[str, str]):
    """Per-bucket batch sizes are honoured: every batch has the size of the first bucket."""
    json_mft, tar_mft = nemo_tarred_manifest_path_multi
    bucketing_opts = {
        "use_bucketing": True,
        "concurrent_bucketing": False,
        # Note: all input cuts belong to the first bucket so the batch size will always be 2.
        "bucket_duration_bins": [2.0, 4.0],
        "bucket_batch_size": [2, 1],
        "drop_last": False,
        "shuffle_buffer_size": 10,
        "bucket_buffer_size": 100,
    }
    config = OmegaConf.create(
        {
            "manifest_filepath": json_mft,
            "tarred_audio_filepaths": tar_mft,
            "sample_rate": 16000,
            "shuffle": True,
            "use_lhotse": True,
            "num_workers": 0,
            # lhotse specific
            **bucketing_opts,
            "seed": 0,
            "shard_seed": 0,
        }
    )

    dl = get_lhotse_dataloader_from_config(config=config, global_rank=0, world_size=1, dataset=Identity())

    # Inspect the first ten mini-batches.
    for batch in islice(dl, 10):
        batch_size = len(batch)
        assert batch_size == 2
def test_dataloader_2d_bucketing(nemo_tarred_manifest_path_multi: tuple[str, str], en_es_tokenizer):
    """2D bucketing on (audio duration, token count) selects the batch size of the matching bin."""
    json_mft, tar_mft = nemo_tarred_manifest_path_multi
    bucketing_opts = {
        "use_bucketing": True,
        "concurrent_bucketing": False,
        # Here each bin has the format: [audio_duration, token_sequence_length]
        "bucket_duration_bins": [[0.5, 1], [0.5, 2], [2.0, 5], [2.0, 15], [4.0, 10], [4.0, 20]],
        "bucket_batch_size": [7, 6, 5, 4, 3, 2],
        "drop_last": False,
        "shuffle_buffer_size": 10,
        "bucket_buffer_size": 100,
    }
    config = OmegaConf.create(
        {
            "manifest_filepath": json_mft,
            "tarred_audio_filepaths": tar_mft,
            "sample_rate": 16000,
            "shuffle": True,
            "use_lhotse": True,
            "num_workers": 0,
            # lhotse specific
            **bucketing_opts,
            "seed": 0,
            "shard_seed": 0,
        }
    )

    dl = get_lhotse_dataloader_from_config(
        config=config, global_rank=0, world_size=1, dataset=Identity(), tokenizer=en_es_tokenizer
    )

    # All of our data have duration 1.0 and 10 tokens so they will fall to bin[3] with batch_size=4
    for batch in islice(dl, 10):
        batch_size = len(batch)
        assert batch_size == 4
@pytest.fixture(scope="session")
def questions_path(tmp_path_factory) -> Path:
    """Session-scoped text file with ten lines, ``some question number 0`` through ``... 9``.

    NOTE(review): a fixture with the same name is also defined earlier in this
    module; this later definition rebinds the module-level name.
    """
    questions_file = tmp_path_factory.mktemp("questions") / "questions.txt"
    lines = [f"some question number {i}" for i in range(10)]
    questions_file.write_text("\n".join(lines))
    return questions_file
def test_dataloader_from_nemo_nontarred_manifest_with_extra_questions_field_iter(
    nemo_manifest_path: Path, questions_path: Path
):
    """A ``text_iter`` extra field attaches questions-file lines to non-tarred NeMo cuts in order."""
    question_field = {
        "type": "text_iter",
        "name": "question",
        "path": questions_path,
    }
    config = OmegaConf.create(
        {
            "input_cfg": [
                {
                    "manifest_filepath": nemo_manifest_path,
                    "type": "nemo",
                    "extra_fields": [question_field],
                },
            ],
            "sample_rate": 16000,
            "shuffle": False,
            "use_lhotse": True,
            "num_workers": 0,
            "batch_size": 2,
            "use_bucketing": False,
        }
    )

    dl = get_lhotse_dataloader_from_config(config=config, global_rank=0, world_size=1, dataset=Identity())
    batch = next(iter(dl))

    # The i-th cut of the first batch is paired with the i-th question line.
    for idx in range(2):
        c = batch[idx]
        assert isinstance(c, MonoCut)
        assert hasattr(c, "question")
        assert c.question == f"some question number {idx}"
def test_dataloader_from_nemo_manifest_with_extra_questions_field_iter(
    nemo_tarred_manifest_path: tuple, questions_path: Path
):
    """A ``text_iter`` extra field attaches questions-file lines to tarred NeMo cuts in order."""
    question_field = {
        "type": "text_iter",
        "name": "question",
        "path": questions_path,
    }
    config = OmegaConf.create(
        {
            "input_cfg": [
                {
                    "manifest_filepath": nemo_tarred_manifest_path[0],
                    "tarred_audio_filepaths": nemo_tarred_manifest_path[1],
                    "type": "nemo_tarred",
                    "extra_fields": [question_field],
                },
            ],
            "sample_rate": 16000,
            "shuffle": False,
            "use_lhotse": True,
            "num_workers": 0,
            "batch_size": 2,
            "use_bucketing": False,
        }
    )

    dl = get_lhotse_dataloader_from_config(config=config, global_rank=0, world_size=1, dataset=Identity())
    batch = next(iter(dl))

    # The i-th cut of the first batch is paired with the i-th question line.
    for idx in range(2):
        c = batch[idx]
        assert isinstance(c, MonoCut)
        assert hasattr(c, "question")
        assert c.question == f"some question number {idx}"
def test_dataloader_from_nemo_manifest_with_extra_questions_field_sample(
    nemo_tarred_manifest_path: tuple, questions_path: Path
):
    """A ``text_sample`` extra field attaches sampled questions-file lines to tarred NeMo cuts.

    The expected question numbers below are the ones observed for ``seed=0`` /
    ``shard_seed=0``.
    """
    question_field = {
        "type": "text_sample",
        "name": "question",
        "path": questions_path,
    }
    config = OmegaConf.create(
        {
            "input_cfg": [
                {
                    "manifest_filepath": nemo_tarred_manifest_path[0],
                    "tarred_audio_filepaths": nemo_tarred_manifest_path[1],
                    "type": "nemo_tarred",
                    "extra_fields": [question_field],
                },
            ],
            "sample_rate": 16000,
            "shuffle": False,
            "use_lhotse": True,
            "num_workers": 0,
            "batch_size": 5,
            "seed": 0,
            "shard_seed": 0,
            "use_bucketing": False,
        }
    )

    # Note: the config sets shuffle=False, yet the questions are not sequential
    # because type "text_sample" samples lines from questions_path.
    dl = get_lhotse_dataloader_from_config(config=config, global_rank=0, world_size=1, dataset=Identity())
    batch = next(iter(dl))

    expected_question_numbers = (6, 6, 0, 4, 8)
    for idx, number in enumerate(expected_question_numbers):
        c = batch[idx]
        assert isinstance(c, MonoCut)
        assert hasattr(c, "question")
        assert c.question == f"some question number {number}"
@pytest.fixture(scope="session")
def nemo_tarred_manifest_path_with_offset(tmp_path_factory) -> Tuple[str, str]:
    """NeMo tarred manifest whose entries address one 10 s recording through offsets.

    A single audio file is written to the tar shard; the JSONL manifest lists
    three entries: 0-3 s, 4-9 s, and the full 0-10 s recording. Returns the
    manifest path and the path of the tar shard.
    """
    from lhotse.serialization import SequentialJsonlWriter
    from lhotse.shar.writers import TarWriter

    root = tmp_path_factory.mktemp("nemo_tar_offset")
    root.mkdir(exist_ok=True)
    recording = dummy_recording(0, duration=10.0, with_data=True)

    base_name = f"{recording.id}.wav"
    # (audio_filepath, offset, duration, text) for each manifest entry.
    # Only ``base_name`` is written to the tar; the ``-subN`` names are presumably
    # mapped back to it by the NeMo tarred reader -- TODO confirm.
    segments = [
        (base_name, 0.0, 3.0, "irrelevant"),  # segment 0-3s
        (f"{recording.id}-sub1.wav", 4.0, 5.0, "irrelevant-2"),  # segment 4-9s
        (f"{recording.id}-sub2.wav", 0.0, 10.0, "irrelevant irrelevant-2"),  # full recording - for reference
    ]

    with (
        TarWriter(f"{root}/audios_0.tar", shard_size=None) as tar_writer,
        SequentialJsonlWriter(root / "tarred_audio_filepaths.jsonl") as mft_writer,
    ):
        tar_writer.write(base_name, BytesIO(recording.sources[0].source))
        for audio_filepath, offset, duration, text in segments:
            mft_writer.write(
                {
                    "audio_filepath": audio_filepath,
                    "offset": offset,
                    "duration": duration,
                    "text": text,
                    "lang": "en",
                    "shard_id": 0,
                }
            )
    return mft_writer.path, tar_writer.output_paths[0]
def test_dataloader_from_tarred_nemo_manifest_with_offset(nemo_tarred_manifest_path_with_offset: tuple[str, str]):
    """Check that manifest ``offset``/``duration`` entries select the right audio slices.

    All three manifest entries are loaded in one mini-batch; the audio of both
    segment cuts is compared sample-by-sample with the matching slice of the
    full-recording cut.
    """
    json_mft, tar_mft = nemo_tarred_manifest_path_with_offset
    config = OmegaConf.create(
        {
            "manifest_filepath": json_mft,
            "tarred_audio_filepaths": tar_mft,
            "sample_rate": 16000,
            "shuffle": False,
            "num_workers": 0,
            "batch_size": 3,
            "seed": 0,
            "shard_seed": 0,
            "force_finite": True,
        }
    )
    dl = get_lhotse_dataloader_from_config(config=config, global_rank=0, world_size=1, dataset=Identity())

    # batch_size=3, so all three examples arrive in a single mini-batch.
    batches = [b for b in dl]
    assert len(batches) == 1
    batch = batches[0]
    assert len(batch) == 3

    def check_cut_and_load(cut, expected_duration, expected_text):
        # Shared metadata checks; returns the loaded audio for further comparison.
        assert cut.start == 0.0
        assert cut.duration == expected_duration
        supervision = cut.supervisions[0]
        assert supervision.text == expected_text
        assert supervision.language == "en"
        samples = cut.load_audio()
        assert samples.shape[1] == cut.num_samples
        return samples

    # Validate example containing full 10s recording.
    full_cut = batch[1]
    full_audio = check_cut_and_load(full_cut, 10.0, "irrelevant irrelevant-2")
    assert full_cut.num_samples == 160000  # 10s * 16kHz

    # Validate segment 0-3s.
    head_cut = batch[2]
    head_audio = check_cut_and_load(head_cut, 3.0, "irrelevant")
    # The segment audio must be identical to the leading slice of the full audio.
    head_end = compute_num_samples(head_cut.duration, head_cut.sampling_rate)
    np.testing.assert_equal(head_audio, full_audio[:, :head_end])

    # Validate segment 4-9s.
    # Note: LazyNeMoTarredIterator removes the offset information, as it creates a new recording
    #       that's a "subset" of the original recording as a memory saving optimization.
    #       Hence, we will not see cut.start == 4.0.
    middle_cut = batch[0]
    middle_audio = check_cut_and_load(middle_cut, 5.0, "irrelevant-2")
    # The segment audio must be identical to the 4s-9s slice of the full audio.
    middle_begin = compute_num_samples(4.0, middle_cut.sampling_rate)
    middle_end = compute_num_samples(9.0, middle_cut.sampling_rate)
    np.testing.assert_equal(middle_audio, full_audio[:, middle_begin:middle_end])
def test_force_iterable_dataset(cutset_path: Path):
    """Compare a ``cuts_path`` dataloader with and without ``force_iterable_dataset``.

    Both dataloaders use two workers. The forced-iterable variant is expected to
    yield twice as many batches while covering exactly the same cut IDs.
    """
    shared_options = {"cuts_path": cutset_path, "batch_size": 2, "num_workers": 2}

    # Reference run without the override.
    dl = get_lhotse_dataloader_from_config(
        config=OmegaConf.create(shared_options), global_rank=0, world_size=1, dataset=Identity()
    )
    batches_map = [batch for batch in dl]

    # Same input, but with the iterable dataset forced.
    dl = get_lhotse_dataloader_from_config(
        config=OmegaConf.create({**shared_options, "force_iterable_dataset": True}),
        global_rank=0,
        world_size=1,
        dataset=Identity(),
    )
    batches_iter = [batch for batch in dl]

    # 2x duplicated data due to iterable dataset lack of deduplication
    assert len(batches_iter) == 2 * len(batches_map)

    # Both runs must cover the same data (same IDs).
    ids_iter = {cut.id for batch in batches_iter for cut in batch}
    ids_map = {cut.id for batch in batches_map for cut in batch}
    assert ids_iter == ids_map
def test_force_map_dataset(cutset_shar_path: Path):
    """Compare a ``shar_path`` dataloader with and without ``force_map_dataset``.

    Both dataloaders use two workers and ``force_finite``. The run without the
    override is expected to yield twice as many batches as the forced-map run,
    while both cover exactly the same cut IDs.
    """
    # Reference run without the override.
    plain_options = {"shar_path": cutset_shar_path, "batch_size": 2, "num_workers": 2, "force_finite": True}
    dl = get_lhotse_dataloader_from_config(
        config=OmegaConf.create(plain_options), global_rank=0, world_size=1, dataset=Identity()
    )
    batches_iter = [batch for batch in dl]

    # Same input, but with the map dataset forced.
    forced_map_options = {
        "shar_path": cutset_shar_path,
        "batch_size": 2,
        "num_workers": 2,
        "force_map_dataset": True,
        "force_finite": True,
    }
    dl = get_lhotse_dataloader_from_config(
        config=OmegaConf.create(forced_map_options), global_rank=0, world_size=1, dataset=Identity()
    )
    batches_map = [batch for batch in dl]

    # 2x duplicated data due to iterable dataset lack of deduplication
    assert len(batches_iter) == 2 * len(batches_map)

    # Both runs must cover the same data (same IDs).
    ids_iter = {cut.id for batch in batches_iter for cut in batch}
    ids_map = {cut.id for batch in batches_map for cut in batch}
    assert ids_iter == ids_map
def test_dataloader_from_tarred_nemo_subset_manifest(
    nemo_tarred_manifest_subset_path: tuple[str, str, list[dict]],
):
    """Check that a tarred NeMo subset manifest yields exactly the listed items, once each.

    The fixture provides three values (manifest path, tar path, and the manifest
    items of the subset), so it is annotated as a 3-tuple. The dataloader runs
    with bucketing and ``tarred_random_access`` enabled; the IDs collected over
    all batches must be unique and equal to the ``audio_filepath`` values of the
    subset items.
    """
    json_mft, tar_mft, subset_items = nemo_tarred_manifest_subset_path
    config = OmegaConf.create(
        {
            "manifest_filepath": json_mft,
            "tarred_audio_filepaths": tar_mft,
            "sample_rate": 16000,
            "shuffle": True,
            "use_lhotse": True,
            "num_workers": 0,
            # lhotse specific
            "use_bucketing": True,
            "concurrent_bucketing": False,
            "num_buckets": 2,
            "drop_last": False,
            "batch_duration": 4.0,  # seconds
            "quadratic_duration": 15.0,  # seconds
            "shuffle_buffer_size": 10,
            "bucket_buffer_size": 100,
            "seed": 0,
            "shard_seed": 0,
            "tarred_random_access": True,
            "force_finite": True,
        }
    )

    dl = get_lhotse_dataloader_from_config(
        config=config, global_rank=0, world_size=1, dataset=UnsupervisedAudioDataset()
    )

    # Gather the IDs of every example across all batches, keeping duplicates.
    seen_ids = []
    for batch in dl:
        seen_ids.extend(batch["ids"])

    expected_ids = {data['audio_filepath'] for data in subset_items}
    seen_ids_set = set(seen_ids)
    # A size mismatch between the list and its set means some ID was yielded more than once.
    assert len(seen_ids_set) == len(seen_ids), "Duplicate IDs found in the batch."
    assert seen_ids_set == expected_ids, "The set of IDs in the batches does not match the input JSON manifests."
def test_dataloader_from_nemo_manifest_with_skipme(nemo_manifest_with_skipme_path: Path):
    """Check that no cut with a truthy ``_skipme`` value is yielded from a NeMo manifest.

    With ``batch_size=1`` the dataloader must produce exactly 8 batches.
    """
    options = {
        "manifest_filepath": nemo_manifest_with_skipme_path,
        "sample_rate": 16000,
        "shuffle": True,
        "use_lhotse": True,
        "num_workers": 0,
        "batch_size": 1,
        # lhotse specific
        "use_bucketing": False,
    }
    config = OmegaConf.create(options)

    dl = get_lhotse_dataloader_from_config(config=config, global_rank=0, world_size=1, dataset=_Identity())
    batches = [b for b in dl]
    # A missing "_skipme" key counts as "not skipped".
    skipme_s = [c.custom.get('_skipme', 0) for b in batches for c in b]

    assert len(batches) == 8
    assert not any(skipme_s)
def test_dataloader_from_tarred_nemo_manifest_with_skipme(nemo_tarred_manifest_with_skipme_path: tuple[Path, str]):
    """Check that no cut with a truthy ``_skipme`` value is yielded from a tarred NeMo manifest.

    With ``batch_size=1`` and ``force_finite`` the dataloader must produce exactly 8 batches.
    """
    json_mft, tar_mft = nemo_tarred_manifest_with_skipme_path
    options = {
        "manifest_filepath": json_mft,
        "tarred_audio_filepaths": tar_mft,
        "sample_rate": 16000,
        "shuffle": True,
        "use_lhotse": True,
        "num_workers": 0,
        "batch_size": 1,
        # lhotse specific
        "use_bucketing": False,
        "force_finite": True,
    }
    config = OmegaConf.create(options)

    dl = get_lhotse_dataloader_from_config(config=config, global_rank=0, world_size=1, dataset=_Identity())
    batches = [b for b in dl]
    # A missing "_skipme" key counts as "not skipped".
    skipme_s = [c.custom.get('_skipme', 0) for b in batches for c in b]

    assert len(batches) == 8
    assert not any(skipme_s)
def test_dataloader_from_data_input_cfg_yaml_path_with_skipme(cutset_shar_path, nemo_tarred_manifest_with_skipme_path):
    """Check that no cut with a truthy ``_skipme`` value is yielded from a mixed ``input_cfg``.

    The input mixes a tarred NeMo manifest (tagged as dataset ``D1``) with a
    Lhotse Shar dataset (tagged as dataset ``D2``), each with weight 0.5.
    """

    def audio_tags(dataset_name):
        # Both sources share language/modality tags and differ only in the dataset name.
        return {
            "language": "en",
            "modality": "audio",
            "dataset_name": dataset_name,
        }

    nemo_source = {
        "type": "nemo_tarred",
        "manifest_filepath": nemo_tarred_manifest_with_skipme_path[0],
        "tarred_audio_filepaths": nemo_tarred_manifest_with_skipme_path[1],
        "weight": 0.5,
        "tags": audio_tags("D1"),
    }
    shar_source = {
        "type": "lhotse_shar",
        "shar_path": cutset_shar_path,
        "weight": 0.5,
        "tags": audio_tags("D2"),
    }
    config = OmegaConf.create(
        {
            "input_cfg": [nemo_source, shar_source],
            "sample_rate": 16000,
            "shuffle": True,
            "num_workers": 0,
            "batch_size": 4,
            "seed": 0,
            "shard_seed": 0,
            "force_finite": True,
        }
    )

    dl = get_lhotse_dataloader_from_config(config=config, global_rank=0, world_size=1, dataset=Identity())
    batches = [b for b in dl]
    # A missing "_skipme" key counts as "not skipped".
    skipme_s = [c.custom.get('_skipme', 0) for b in batches for c in b]

    assert not any(skipme_s)
def test_dataloader_lhotse_shar_nemo_tarred_slice_length(
    nemo_tarred_manifest_path_multi: tuple[str, str], cutset_shar_path: Path
):
    """Check that ``slice_length=2`` takes two examples from every shard of both sources.

    A tarred NeMo source and a Lhotse Shar source are mixed. The test expects
    2 batches of 4 cuts, 4 cuts per source and 2 cuts per shard/manifest file.
    """
    json_mft, tar_mft = nemo_tarred_manifest_path_multi
    nemo_source = {
        # 2 shards, 5 utterances each
        "type": "nemo_tarred",
        "manifest_filepath": json_mft,
        "tarred_audio_filepaths": tar_mft,
        "tags": {"origin": "nemo_tarred"},
    }
    shar_source = {
        # 2 shards, 5 utterances each
        "type": "lhotse_shar",
        "shar_path": cutset_shar_path,
        "tags": {"origin": "lhotse_shar"},
    }
    config = OmegaConf.create(
        {
            "input_cfg": [nemo_source, shar_source],
            "slice_length": 2,
            "shuffle": True,
            "num_workers": 0,
            "seed": 0,
            "shard_seed": 0,
            "batch_size": 4,
            "force_finite": True,
        }
    )

    dl = get_lhotse_dataloader_from_config(config=config, global_rank=0, world_size=1, dataset=Identity())
    batches = [batch for batch in dl]
    assert len(batches) == 2

    # We expect to sample a total of 8 examples (4 shards, 2 examples each), in 2 batches of size 4 each,
    # half of it coming from nemo tarred dataset, and the other half from lhotse shar dataset.
    origin_tags = Counter()
    origin_shards = Counter()
    for batch in batches:
        assert len(batch) == 4
        for cut in batch:
            origin_tags.update([cut.origin])
            # The shard is read from a different attribute depending on the cut's origin tag.
            shard_location = cut.shard_origin if cut.origin == "lhotse_shar" else cut.manifest_origin
            origin_shards.update([Path(shard_location).name])

    for tag in ("lhotse_shar", "nemo_tarred"):
        assert origin_tags[tag] == 4
    for shard_file in (
        "cuts.000000.jsonl.gz",
        "cuts.000001.jsonl.gz",
        "manifest_0.jsonl",
        "manifest_1.jsonl",
    ):
        assert origin_shards[shard_file] == 2
def test_dataloader_lhotse_shar_slice_length_multi_epoch_different_sample(cutset_shar_path: Path):
    """Check that ``slice_length`` picks a different slice of Lhotse Shar shards in each epoch.

    Four batches (two epochs) are read from a non-finite dataloader; the cut IDs
    of the two epochs must differ, and their concatenation must not be sorted.
    """
    shar_source = {
        # 2 shards, 5 utterances each
        "type": "lhotse_shar",
        "shar_path": cutset_shar_path,
    }
    config = OmegaConf.create(
        {
            "input_cfg": [shar_source],
            "slice_length": 2,
            "shuffle": False,  # shuffle is disabled, but the slice offset still must be random!
            "num_workers": 0,
            "seed": 0,
            "shard_seed": 0,
            "batch_size": 2,
        }
    )

    dl = get_lhotse_dataloader_from_config(config=config, global_rank=0, world_size=1, dataset=Identity())

    # 2 batches == 1 epoch => 4 batches == 2 epochs
    batches = [batch for batch in islice(dl, 4)]
    assert len(batches) == 4

    first_epoch, second_epoch = batches[:2], batches[2:]
    epoch0_ids = [cut.id for batch in first_epoch for cut in batch]
    epoch1_ids = [cut.id for batch in second_epoch for cut in batch]
    assert epoch0_ids != epoch1_ids

    all_ids = epoch0_ids + epoch1_ids
    assert all_ids != sorted(all_ids)  # true when slice_length=None
def test_dataloader_nemo_tarred_slice_length_multi_epoch_different_sample(
    nemo_tarred_manifest_path_multi: tuple[str, str],
):
    """Check that ``slice_length`` picks a different slice of NeMo tarred shards in each epoch.

    Four batches (two epochs) are read from a non-finite dataloader; the cut IDs
    of the two epochs must differ, and their concatenation must not be sorted.
    """
    json_mft, tar_mft = nemo_tarred_manifest_path_multi
    nemo_source = {
        # 2 shards, 5 utterances each
        "type": "nemo_tarred",
        "manifest_filepath": json_mft,
        "tarred_audio_filepaths": tar_mft,
    }
    config = OmegaConf.create(
        {
            "input_cfg": [nemo_source],
            "slice_length": 2,
            "shuffle": False,  # shuffle is disabled, but the slice offset still must be random!
            "num_workers": 0,
            "seed": 0,
            "shard_seed": 0,
            "batch_size": 2,
        }
    )

    dl = get_lhotse_dataloader_from_config(config=config, global_rank=0, world_size=1, dataset=Identity())

    # 2 batches == 1 epoch => 4 batches == 2 epochs
    batches = [batch for batch in islice(dl, 4)]
    assert len(batches) == 4

    first_epoch, second_epoch = batches[:2], batches[2:]
    epoch0_ids = [cut.id for batch in first_epoch for cut in batch]
    epoch1_ids = [cut.id for batch in second_epoch for cut in batch]
    assert epoch0_ids != epoch1_ids

    all_ids = epoch0_ids + epoch1_ids
    assert all_ids != sorted(all_ids)  # true when slice_length=None