# Copyright (c) 2021, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import numpy as np
import torch
from nemo.utils import AppState, logging
try:
from apex.transformer.log_util import set_logging_level
HAVE_APEX = True
except (ImportError, ModuleNotFoundError):
HAVE_APEX = False
try:
from megatron.core import tensor_parallel
from megatron.core.parallel_state import (
RankGenerator,
get_pipeline_model_parallel_rank,
set_expert_model_parallel_rank,
set_expert_model_parallel_world_size,
set_pipeline_model_parallel_rank,
set_pipeline_model_parallel_world_size,
set_tensor_model_parallel_rank,
set_tensor_model_parallel_world_size,
)
HAVE_MEGATRON_CORE = True
except (ImportError, ModuleNotFoundError):
HAVE_MEGATRON_CORE = False
try:
from megatron.core.num_microbatches_calculator import (
ConstantNumMicroBatchesCalculator,
get_current_global_batch_size,
get_micro_batch_size,
get_num_microbatches,
init_num_microbatches_calculator,
)
MCORE_MB_CALCULATOR = True
except (ImportError, ModuleNotFoundError):
logging.warning("Megatron num_microbatches_calculator not found, using Apex version.")
if HAVE_APEX:
from apex.transformer.microbatches import ConstantNumMicroBatches as ConstantNumMicroBatchesCalculator
from apex.transformer.pipeline_parallel.utils import (
get_current_global_batch_size,
get_micro_batch_size,
get_num_microbatches,
)
from apex.transformer.pipeline_parallel.utils import (
setup_microbatch_calculator as init_num_microbatches_calculator,
)
MCORE_MB_CALCULATOR = False
def initialize_model_parallel_for_nemo(
world_size,
global_rank,
local_rank,
tensor_model_parallel_size=1,
expert_model_parallel_size=1,
expert_tensor_parallel_size=None,
pipeline_model_parallel_size=1,
virtual_pipeline_model_parallel_size=None,
pipeline_model_parallel_split_rank=None,
pipeline_model_parallel_comm_backend=None,
context_parallel_size=1,
encoder_tensor_model_parallel_size=0,
encoder_pipeline_model_parallel_size=0,
micro_batch_size=None,
global_batch_size=None,
rampup_batch_size=None,
use_fp8=False,
init_mpi_proc_group=False,
seed=1234,
apex_transformer_log_level=30,
use_tp_pp_dp_mapping=False,
use_te_rng_tracker=False,
num_distributed_optimizer_instances=1,
nccl_communicator_config_path=None,
use_sharp=False,
use_gloo_process_groups: bool = True,
):
"""Initialize model parallel groups in NeMo."""
assert (
pipeline_model_parallel_split_rank is None or pipeline_model_parallel_split_rank == 0
), "pipeline_model_parallel_split_rank is deprecated."
assert encoder_pipeline_model_parallel_size == 0 and (
encoder_tensor_model_parallel_size == 0 or encoder_tensor_model_parallel_size == tensor_model_parallel_size
), (
"encoder_pipeline_model_parallel_size is temporarily "
"unavailable. We are working on a refactoring to add it back."
)
# updating NeMo globals
app_state = AppState()
app_state.global_rank = global_rank
app_state.world_size = world_size
app_state.local_rank = local_rank
app_state.use_tp_pp_dp_mapping = use_tp_pp_dp_mapping
app_state.expert_model_parallel_size = expert_model_parallel_size
app_state.tensor_model_parallel_size = tensor_model_parallel_size
app_state.pipeline_model_parallel_size = pipeline_model_parallel_size
app_state.virtual_pipeline_model_parallel_size = virtual_pipeline_model_parallel_size
app_state.context_parallel_size = context_parallel_size
app_state.encoder_tensor_model_parallel_size = encoder_tensor_model_parallel_size
app_state.encoder_pipeline_model_parallel_size = encoder_pipeline_model_parallel_size
app_state.pipeline_model_parallel_comm_backend = pipeline_model_parallel_comm_backend
app_state.use_fp8 = use_fp8
app_state.use_sharp = use_sharp
app_state.init_mpi_proc_group = init_mpi_proc_group
app_state.expert_tensor_parallel_size = expert_tensor_parallel_size
app_state.num_distributed_optimizer_instances = num_distributed_optimizer_instances
app_state.nccl_communicator_config_path = nccl_communicator_config_path
app_state.use_gloo_process_groups = use_gloo_process_groups
(
app_state.tensor_model_parallel_rank,
app_state.pipeline_model_parallel_rank,
app_state.expert_model_parallel_rank,
app_state.expert_tensor_parallel_rank,
app_state.model_parallel_size,
app_state.data_parallel_size,
app_state.pipeline_model_parallel_split_rank,
app_state.virtual_pipeline_model_parallel_rank,
) = fake_initialize_model_parallel(
world_size=world_size,
rank=global_rank,
tensor_model_parallel_size_=tensor_model_parallel_size,
pipeline_model_parallel_size_=pipeline_model_parallel_size,
virtual_pipeline_model_parallel_size_=virtual_pipeline_model_parallel_size,
pipeline_model_parallel_split_rank_=pipeline_model_parallel_split_rank,
context_parallel_size_=context_parallel_size,
expert_model_parallel_size_=expert_model_parallel_size,
expert_tensor_parallel_size_=expert_tensor_parallel_size,
encoder_tensor_model_parallel_size_=encoder_tensor_model_parallel_size,
encoder_pipeline_model_parallel_size_=encoder_pipeline_model_parallel_size,
use_tp_pp_dp_mapping=use_tp_pp_dp_mapping,
)
    # update megatron.core parallel_state globals
set_tensor_model_parallel_world_size(app_state.tensor_model_parallel_size)
set_tensor_model_parallel_rank(app_state.tensor_model_parallel_rank)
set_expert_model_parallel_world_size(app_state.expert_model_parallel_size)
set_expert_model_parallel_rank(app_state.expert_model_parallel_rank)
set_pipeline_model_parallel_world_size(
app_state.pipeline_model_parallel_size + app_state.encoder_pipeline_model_parallel_size
)
set_pipeline_model_parallel_rank(app_state.pipeline_model_parallel_rank)
tensor_parallel.random.initialize_rng_tracker(use_te_rng_tracker=use_te_rng_tracker)
if seed is not None:
        # @chcui: not setting the seed is only intended for model conversion; always set the seed for training/inference.
_set_random_seed(seed)
    if global_batch_size is not None and micro_batch_size is not None:
# TODO: add rampup_batch_size here when we have it implemented
if MCORE_MB_CALCULATOR:
from megatron.core.num_microbatches_calculator import _GLOBAL_NUM_MICROBATCHES_CALCULATOR
if _GLOBAL_NUM_MICROBATCHES_CALCULATOR is None:
init_num_microbatches_calculator(
rank=global_rank,
global_batch_size=global_batch_size,
micro_batch_size=micro_batch_size,
data_parallel_size=app_state.data_parallel_size,
rampup_batch_size=rampup_batch_size,
decrease_batch_size_if_needed=False,
)
else:
if isinstance(_GLOBAL_NUM_MICROBATCHES_CALCULATOR, ConstantNumMicroBatchesCalculator):
assert get_current_global_batch_size() == global_batch_size
assert get_micro_batch_size() == micro_batch_size
assert get_num_microbatches() == global_batch_size // (
micro_batch_size * app_state.data_parallel_size
)
else:
raise Exception("Microbatch calculator already initialized.")
else:
from apex.transformer.pipeline_parallel.utils import _GLOBAL_NUM_MICROBATCHES_CALCULATOR
if _GLOBAL_NUM_MICROBATCHES_CALCULATOR is None:
init_num_microbatches_calculator(
rank=global_rank,
global_batch_size=global_batch_size,
micro_batch_size=micro_batch_size,
data_parallel_size=app_state.data_parallel_size,
rampup_batch_size=rampup_batch_size,
decrease_batch_size_if_needed=False,
)
else:
if isinstance(_GLOBAL_NUM_MICROBATCHES_CALCULATOR, ConstantNumMicroBatchesCalculator):
assert get_current_global_batch_size() == global_batch_size
assert get_micro_batch_size() == micro_batch_size
assert get_num_microbatches() == global_batch_size // (
micro_batch_size * app_state.data_parallel_size
)
else:
raise Exception("Microbatch calculator already initialized.")
app_state._is_megatron_initialized = True
if HAVE_APEX:
set_logging_level(apex_transformer_log_level)
def _set_random_seed(seed_):
"""Set random seed for reproducability."""
if seed_ is not None and seed_ > 0:
# Ensure that different pipeline MP stages get different seeds.
seed = seed_ + (100 * get_pipeline_model_parallel_rank())
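        # For example (hypothetical values): a base seed of 1234 on pipeline stage 2 becomes
        # 1234 + 100 * 2 = 1434, so each pipeline stage draws from a different random stream.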
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.device_count() > 0:
tensor_parallel.model_parallel_cuda_manual_seed(seed)
else:
raise ValueError('Seed ({}) should be a positive integer.'.format(seed_))
def set_jit_fusion_options():
"""Set PyTorch JIT layer fusion options."""
# set flags if we are using the 21.10 container
if torch.__version__ == "1.10.0a0+0aef44c":
# nvfuser
torch._C._jit_set_profiling_executor(True)
torch._C._jit_set_profiling_mode(True)
torch._C._jit_override_can_fuse_on_cpu(False)
torch._C._jit_override_can_fuse_on_gpu(False)
torch._C._jit_set_texpr_fuser_enabled(False)
torch._C._jit_set_nvfuser_enabled(True)
torch._C._debug_set_autodiff_subgraph_inlining(False)
def fake_initialize_model_parallel(
world_size,
rank,
tensor_model_parallel_size_,
pipeline_model_parallel_size_,
pipeline_model_parallel_split_rank_=None,
virtual_pipeline_model_parallel_size_=None,
expert_model_parallel_size_=1,
expert_tensor_parallel_size_=None,
context_parallel_size_=1,
encoder_tensor_model_parallel_size_=0,
encoder_pipeline_model_parallel_size_=0,
use_tp_pp_dp_mapping=False,
):
"""
    Fake-initialize the model and data parallel groups so that model-parallel models can be
    instantiated before DDP is initialized. This is needed because the PyTorch Lightning
    execution flow is: init model, init trainer -> call trainer.fit(model), and DDP is only
    initialized during .fit.

    This function is taken from megatron.core.parallel_state and modified so that the
    distributed groups are not actually created.
    We only need the tensor parallel and pipeline parallel ranks to instantiate the model.
Arguments:
tensor_model_parallel_size: number of GPUs used to parallelize model tensor.
pipeline_model_parallel_size: number of GPUs used to parallelize model pipeline.
context_parallel_size: number of GPUs used to parallelize tokens of each input.
Let's say we have a total of 16 GPUs denoted by g0 ... g15 and we
use 2 GPUs to parallelize the model tensor, and 4 GPUs to parallelize
the model pipeline. The present function will
create 8 tensor model-parallel groups, 4 pipeline model-parallel groups
and 8 data-parallel groups as:
8 data_parallel groups:
[g0, g2], [g1, g3], [g4, g6], [g5, g7], [g8, g10], [g9, g11], [g12, g14], [g13, g15]
8 tensor model-parallel groups:
[g0, g1], [g2, g3], [g4, g5], [g6, g7], [g8, g9], [g10, g11], [g12, g13], [g14, g15]
4 pipeline model-parallel groups:
[g0, g4, g8, g12], [g1, g5, g9, g13], [g2, g6, g10, g14], [g3, g7, g11, g15]
Note that for efficiency, the caller should make sure adjacent ranks
    are on the same DGX box. For example, if we are using 2 DGX-1 boxes
    with a total of 16 GPUs, ranks 0 to 7 belong to the first box and
ranks 8 to 15 belong to the second box.
"""
assert pipeline_model_parallel_split_rank_ is None, "pipeline_model_parallel_split_rank is deprecated."
assert encoder_pipeline_model_parallel_size_ == 0 and (
encoder_tensor_model_parallel_size_ == 0 or encoder_tensor_model_parallel_size_ == tensor_model_parallel_size_
), (
"encoder_pipeline_model_parallel_size is temporarily "
"unavailable. We are working on a refactoring to add it back."
)
# Get world size and rank. Ensure some consistencies.
tensor_model_parallel_size = min(tensor_model_parallel_size_, world_size)
pipeline_model_parallel_size = min(pipeline_model_parallel_size_, world_size)
model_parallel_size = tensor_model_parallel_size * pipeline_model_parallel_size
context_parallel_size = min(context_parallel_size_, world_size)
if encoder_pipeline_model_parallel_size_ is None:
encoder_pipeline_model_parallel_size = 0
else:
encoder_pipeline_model_parallel_size = encoder_pipeline_model_parallel_size_
if encoder_tensor_model_parallel_size_ == 0 and encoder_pipeline_model_parallel_size_ > 0:
encoder_tensor_model_parallel_size = tensor_model_parallel_size
else:
encoder_tensor_model_parallel_size = encoder_tensor_model_parallel_size_
if encoder_tensor_model_parallel_size > 0:
assert encoder_pipeline_model_parallel_size > 0
assert (
encoder_tensor_model_parallel_size <= tensor_model_parallel_size
), "We do not support encoders with more TP than the decoder."
encoder_model_size = (
encoder_tensor_model_parallel_size * encoder_pipeline_model_parallel_size * context_parallel_size
)
decoder_model_size = tensor_model_parallel_size * pipeline_model_parallel_size * context_parallel_size
total_model_size = encoder_model_size + decoder_model_size
assert world_size % total_model_size == 0, (
        f'world_size: {world_size} must be divisible by the total model size: '
f'(decoder_)tensor_model_parallel_size {tensor_model_parallel_size} '
f'* (decoder_)pipeline_model_parallel_size {pipeline_model_parallel_size} '
f'* (decoder_)context_parallel_size {context_parallel_size} + '
f'encoder_tensor_model_parallel_size {encoder_tensor_model_parallel_size} '
f'* encoder_pipeline_model_parallel_size {encoder_pipeline_model_parallel_size} '
f'* context_parallel_size {context_parallel_size}'
)
data_parallel_size = world_size // total_model_size
encoder_world_size = encoder_model_size * data_parallel_size
decoder_world_size = decoder_model_size * data_parallel_size
assert encoder_world_size + decoder_world_size == world_size
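    # Example of the sizing above (hypothetical values): 16 GPUs with TP=2, PP=4, CP=1 and no
    # encoder give total_model_size = 2 * 4 * 1 = 8, hence data_parallel_size = 16 // 8 = 2,
    # encoder_world_size = 0 and decoder_world_size = 16.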
virtual_pipeline_model_parallel_rank = None
if virtual_pipeline_model_parallel_size_ is not None:
virtual_pipeline_model_parallel_rank = 0
if encoder_world_size > 0:
encoder_rank_generator = RankGenerator(
tp=encoder_tensor_model_parallel_size,
ep=1,
dp=data_parallel_size,
pp=encoder_pipeline_model_parallel_size,
cp=context_parallel_size,
order='tp-cp-ep-pp-dp' if use_tp_pp_dp_mapping else 'tp-cp-ep-dp-pp',
rank_offset=0,
)
else:
encoder_rank_generator = None
decoder_rank_generator = RankGenerator(
tp=tensor_model_parallel_size,
ep=1,
dp=data_parallel_size,
pp=pipeline_model_parallel_size,
cp=context_parallel_size,
order='tp-cp-ep-pp-dp' if use_tp_pp_dp_mapping else 'tp-cp-ep-dp-pp',
rank_offset=encoder_world_size,
)
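    # Note: the first dimension in the order string varies fastest across consecutive ranks,
    # so the default 'tp-cp-ep-dp-pp' keeps tensor-parallel peers on adjacent ranks and makes
    # pipeline stages the outermost dimension.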
# Build expert rank generator
if expert_tensor_parallel_size_ is None:
expert_tensor_parallel_size_ = tensor_model_parallel_size
expert_tensor_model_pipeline_parallel_size = (
expert_tensor_parallel_size_ * expert_model_parallel_size_ * pipeline_model_parallel_size
)
expert_data_parallel_size = decoder_world_size // expert_tensor_model_pipeline_parallel_size
if decoder_world_size % expert_tensor_model_pipeline_parallel_size != 0:
raise RuntimeError(
f"decoder world_size ({decoder_world_size}) is not divisible by "
f"expert_tensor_model_pipeline_parallel size ({expert_tensor_model_pipeline_parallel_size})"
)
expert_decoder_rank_generator = RankGenerator(
tp=expert_tensor_parallel_size_,
ep=expert_model_parallel_size_,
dp=expert_data_parallel_size,
pp=pipeline_model_parallel_size,
cp=1,
order='tp-cp-ep-pp-dp' if use_tp_pp_dp_mapping else 'tp-cp-ep-dp-pp',
rank_offset=encoder_world_size,
)
assert (
not use_tp_pp_dp_mapping
or pipeline_model_parallel_size == 1
or expert_data_parallel_size == data_parallel_size
), "When not using pp-last rank ordering, the data parallel size of the attention and moe layers must be the same"
    assert decoder_rank_generator.get_ranks("pp") == expert_decoder_rank_generator.get_ranks("pp"), (
        f"Pipeline parallel groups are expected to be the same for Non-Expert and Expert part, "
        f"but got {decoder_rank_generator.get_ranks('pp')} and {expert_decoder_rank_generator.get_ranks('pp')}"
    )
    def generator_wrapper(group_type, is_expert=False, **kwargs):
        """The `RankGenerator` class produces a hyper-rectangle for a given set of
        tensor, pipeline, data, expert, and context parallelism. If we have an encoder,
        in addition to the default decoder, we essentially instantiate two `RankGenerator`
        classes to construct the parallelism for each module separately, and we then have
        to stitch them together for the right groups. For now, this means pp and tp-pp."""
        from itertools import cycle
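        # Hypothetical illustration of the stitching below: with encoder 'pp' groups
        # [[0], [1]] and decoder 'pp' groups [[2, 6], [3, 7], [4, 8], [5, 9]], the encoder
        # groups are reused cyclically to yield [0, 2, 6], [1, 3, 7], [0, 4, 8], [1, 5, 9].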
if is_expert:
d_ranks = expert_decoder_rank_generator.get_ranks(group_type, **kwargs)
else:
d_ranks = decoder_rank_generator.get_ranks(group_type, **kwargs)
if encoder_rank_generator is None:
for x in d_ranks:
yield x
return
e_ranks = encoder_rank_generator.get_ranks(group_type, **kwargs)
if group_type == 'pp':
            # Map each decoder pipeline group to an encoder pipeline group; the encoder may
            # have fewer groups, so encoder groups are reused cyclically.
for x, y in zip(cycle(e_ranks), d_ranks):
yield x + y
elif group_type == 'tp-pp':
# For this group, we can just return the concatenated
# groups together, because their sizes are the same.
assert len(e_ranks) == len(d_ranks)
for x, y in zip(e_ranks, d_ranks):
yield x + y
else:
for x in e_ranks:
yield x
for x in d_ranks:
yield x
# Build the data-parallel groups.
all_data_parallel_group_ranks_with_cp = []
for ranks in generator_wrapper('dp'):
if rank in ranks:
data_parallel_group = list(ranks)
            logging.info(f'Rank {rank} has data parallel group: {data_parallel_group}')
for ranks_with_cp in generator_wrapper('dp-cp'):
all_data_parallel_group_ranks_with_cp.append(ranks_with_cp)
if rank in ranks_with_cp:
data_parallel_group_with_cp = ranks_with_cp
logging.info(
                f'Rank {rank} has combined group of data parallel and context parallel: {data_parallel_group_with_cp}'
)
data_parallel_rank = data_parallel_group.index(rank)
logging.info(
f'All data parallel group ranks with context parallel combined: {all_data_parallel_group_ranks_with_cp}'
)
    logging.info(f'Rank {rank} has data parallel rank: {data_parallel_rank}')
# Build the context-parallel groups.
all_context_parallel_group_ranks = []
for ranks in generator_wrapper('cp'):
all_context_parallel_group_ranks.append(ranks)
if rank in ranks:
context_parallel_group = ranks
logging.info(f'Rank {rank} has context parallel group: {context_parallel_group}')
context_parallel_rank = context_parallel_group.index(rank)
logging.info(f'All context parallel group ranks: {all_context_parallel_group_ranks}')
    logging.info(f'Rank {rank} has context parallel rank: {context_parallel_rank}')
# Build the model-parallel groups.
all_model_parallel_group_ranks = []
for ranks in generator_wrapper('tp-pp'):
all_model_parallel_group_ranks.append(ranks)
if rank in ranks:
logging.info(f'Rank {rank} has model parallel group: {list(ranks)}')
logging.info(f'All model parallel group ranks: {all_model_parallel_group_ranks}')
# Build the tensor model-parallel groups.
all_tensor_model_parallel_group_ranks = []
tensor_model_parallel_group = None
for ranks in generator_wrapper('tp'):
all_tensor_model_parallel_group_ranks.append(ranks)
if rank in ranks:
tensor_model_parallel_group = ranks
logging.info(f'Rank {rank} has tensor model parallel group: {tensor_model_parallel_group}')
tensor_model_parallel_rank = tensor_model_parallel_group.index(rank)
logging.info(f'All tensor model parallel group ranks: {all_tensor_model_parallel_group_ranks}')
logging.info(f'Rank {rank} has tensor model parallel rank: {tensor_model_parallel_rank}')
# EP rank
expert_model_parallel_rank = 0
if expert_model_parallel_size_ is not None and expert_model_parallel_size_ > 1:
all_expert_model_parallel_ranks = []
for ranks in generator_wrapper('ep', is_expert=True):
all_expert_model_parallel_ranks.append(ranks)
if rank in ranks:
expert_model_parallel_rank = list(ranks).index(rank)
logging.info(f'All expert model parallel group ranks: {all_expert_model_parallel_ranks}')
logging.info(f'Rank {rank} has expert model parallel rank: {expert_model_parallel_rank}')
# ETP
expert_tensor_parallel_rank = 0
if expert_tensor_parallel_size_ is not None and expert_tensor_parallel_size_ > 1:
all_expert_tensor_parallel_ranks = []
for ranks in generator_wrapper('tp', is_expert=True):
all_expert_tensor_parallel_ranks.append(ranks)
if rank in ranks:
expert_tensor_parallel_rank = list(ranks).index(rank)
logging.info(f'All expert tensor parallel group ranks: {all_expert_tensor_parallel_ranks}')
logging.info(f'Rank {rank} has expert tensor parallel rank: {expert_tensor_parallel_rank}')
# Build the pipeline model-parallel groups and embedding groups
# (first and last rank in each pipeline model-parallel group).
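    # For example, for pipeline group [g2, g6, g10, g14] the embedding group is [g2, g14].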
all_pipeline_model_parallel_group_ranks = []
all_embedding_group_ranks = []
pipeline_model_parallel_group = None
embedding_group = None
embedding_rank = None
for ranks in generator_wrapper('pp'):
all_pipeline_model_parallel_group_ranks.append(ranks)
if rank in ranks:
pipeline_model_parallel_group = ranks
logging.info(f'Rank {rank} has pipeline model parallel group: {pipeline_model_parallel_group}')
# Setup embedding group (to exchange gradients between
# first and last stages).
if len(ranks) > 1:
embedding_ranks = [ranks[0], ranks[-1]]
all_embedding_group_ranks.append(embedding_ranks)
else:
embedding_ranks = ranks
all_embedding_group_ranks.append(list(embedding_ranks))
if rank in embedding_ranks:
embedding_group = list(embedding_ranks)
logging.info(f'Rank {rank} has embedding group: {embedding_group}')
pipeline_model_parallel_rank = pipeline_model_parallel_group.index(rank)
if embedding_group is not None:
embedding_rank = embedding_group.index(rank)
logging.info(f'All pipeline model parallel group ranks: {all_pipeline_model_parallel_group_ranks}')
logging.info(f'Rank {rank} has pipeline model parallel rank {pipeline_model_parallel_rank}')
    logging.info(f'All embedding group ranks: {all_embedding_group_ranks}')
logging.info(f'Rank {rank} has embedding rank: {embedding_rank}')
return (
tensor_model_parallel_rank,
pipeline_model_parallel_rank,
expert_model_parallel_rank,
expert_tensor_parallel_rank,
model_parallel_size,
data_parallel_size,
pipeline_model_parallel_split_rank_,
virtual_pipeline_model_parallel_rank,
)
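

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the NeMo API surface):
# `fake_initialize_model_parallel` performs pure rank arithmetic and never
# creates distributed groups, so it can be exercised in a single process.
# This assumes megatron.core is importable (for `RankGenerator` above); the
# world size and parallel sizes below are hypothetical.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    hypothetical_world_size = 16
    for hypothetical_rank in range(hypothetical_world_size):
        (
            tp_rank,
            pp_rank,
            ep_rank,
            etp_rank,
            mp_size,
            dp_size,
            _split_rank,
            _vpp_rank,
        ) = fake_initialize_model_parallel(
            world_size=hypothetical_world_size,
            rank=hypothetical_rank,
            tensor_model_parallel_size_=2,
            pipeline_model_parallel_size_=4,
        )
        print(
            f"rank {hypothetical_rank}: tp_rank={tp_rank} pp_rank={pp_rank} "
            f"mp_size={mp_size} dp_size={dp_size}"
        )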