Spaces:
Runtime error
Runtime error
| # Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| from os.path import basename, splitext | |
| from typing import List, Optional | |
| import nemo_run as run | |
| from nemo.collections.llm.recipes.deepseek_v3 import pretrain_recipe | |
| from nemo.collections.nlp.modules.common.tokenizer_utils import get_nmt_tokenizer | |
| from nemo.lightning.pytorch.callbacks.megatron_enable_experimental_callback import MegatronEnableExperimentalCallback | |
| from nemo.lightning.pytorch.callbacks.moe_token_drop import MegatronTokenDropCallback | |
| from nemo.lightning.run.plugins import MemoryProfilePlugin, NsysPlugin | |
| from ..argument_parser import parse_additional_slurm_params, parse_cli_args | |
| from ..executors import slurm_executor | |
| from ..helpers import ( | |
| args_sanity_check, | |
| build_perf_env_plugin, | |
| get_user_configs, | |
| set_exp_logging_configs, | |
| set_primary_perf_configs, | |
| ) | |
| from ..utils import dump_config_diff_from_base_recipe, hf_tokenizer | |
| HF_MODEL_URI = "deepseek-ai/DeepSeek-V3-Base" | |
| USE_TOKEN_DROP = True # Use token drop callback | |
| def override_recipe_configs( | |
| args: str, | |
| num_nodes: int, | |
| mbs: int, | |
| gbs: int, | |
| tp_size: int, | |
| pp_size: int, | |
| cp_size: int, | |
| vp_size: int, | |
| ep_size: int, | |
| etp_size: int, | |
| enable_cuda_graphs: bool, | |
| use_mcore_fsdp: bool, | |
| recompute_layers: int, | |
| activation_offload_layers: int, | |
| recompute_modules: Optional[List[str]] = None, | |
| use_user_buffer_registration: Optional[bool] = None, | |
| use_sharp: Optional[bool] = None, | |
| ): | |
| """ | |
| DeepSeek V3 pre-train recipe aimed at achieving best possible performance. | |
| """ | |
| recipe = pretrain_recipe(performance_mode=True) | |
| # reset recompute args in the default recipe | |
| if args.recompute_modules is None: | |
| recipe.model.config.recompute_granularity = None | |
| recipe.model.config.recompute_method = None | |
| recipe.model.config.recompute_num_layers = None | |
| recipe.model.config.recompute_modules = None | |
| if not hasattr(recipe.trainer, "callbacks") or recipe.trainer.callbacks is None: | |
| recipe.trainer.callbacks = [] | |
| # Token dispatcher configs. For H100 we use deepEP and for Blackwell, | |
| # because deepEP is not supported yet, we use all-to-all dispatcher with | |
| # token drop. After deepEP is supported, we can use deepEP dispatcher. | |
| if args.gpu.lower() in ['h100']: | |
| recipe.model.config.moe_token_dispatcher_type = "flex" | |
| recipe.model.config.moe_enable_deepep = True | |
| recipe.model.config.moe_shared_expert_overlap = False # not supported for deepEP | |
| # use force load balance for reducing variance in benchmarking | |
| recipe.model.config.moe_router_force_load_balancing = True | |
| else: | |
| recipe.model.config.moe_token_dispatcher_type = "alltoall" | |
| recipe.model.config.moe_enable_deepep = False | |
| recipe.model.config.moe_shared_expert_overlap = True | |
| if USE_TOKEN_DROP: | |
| recipe.trainer.callbacks.append(run.Config(MegatronTokenDropCallback)) | |
| # Performance optimization knobs | |
| recipe.model.config.moe_permute_fusion = True | |
| recipe.model.config.apply_rope_fusion = True | |
| recipe.trainer.callbacks.append(run.Config(MegatronEnableExperimentalCallback)) | |
| # Pipeline parallelism configs. We infer PP layout from the provided PP and VP size | |
| map_pp_vp_to_layout = { | |
| (1, 1): None, | |
| (4, 1): [['embedding'] + ['decoder'] * 16, ['decoder'] * 16, ['decoder'] * 16, ['decoder'] * 13 + ['loss']], | |
| (8, 1): [['embedding'] + ['decoder'] * 8] + [['decoder'] * 8] * 6 + [['decoder'] * 5 + ['loss']], | |
| (4, 2): [['embedding'] + ['decoder'] * 8] + [['decoder'] * 8] * 6 + [['decoder'] * 5 + ['loss']], | |
| (16, 1): [['embedding'] + ['decoder'] * 4] + [['decoder'] * 4] * 14 + [['decoder', 'loss']], | |
| (8, 2): [['embedding'] + ['decoder'] * 4] + [['decoder'] * 4] * 14 + [['decoder', 'loss']], | |
| (4, 4): [['embedding'] + ['decoder'] * 4] + [['decoder'] * 4] * 14 + [['decoder', 'loss']], | |
| } | |
| pp_size = pp_size or 1 | |
| vp_size = vp_size or 1 | |
| if (pp_size, vp_size) not in map_pp_vp_to_layout: | |
| raise ValueError( | |
| f"Invalid PP and VP size: {pp_size} and {vp_size} to infer PP layout " | |
| f"for DeepSeek V3. Known PP and VP combinations: {map_pp_vp_to_layout.keys()}" | |
| ) | |
| layout = map_pp_vp_to_layout[(pp_size, vp_size)] | |
| if layout is not None: | |
| layout = list([list(x) for x in layout]) # yield all the elements | |
| recipe.trainer.strategy.pipeline_model_parallel_layout = layout | |
| # The following knobs are not needed if we specify layout | |
| recipe.trainer.strategy.account_for_embedding_in_pipeline_split = False | |
| recipe.trainer.strategy.account_for_loss_in_pipeline_split = False | |
| recipe.trainer.strategy.num_layers_in_first_pipeline_stage = None | |
| recipe.trainer.strategy.num_layers_in_last_pipeline_stage = None | |
| recipe = set_primary_perf_configs( | |
| recipe, | |
| "pre_train", | |
| num_nodes, | |
| args.gpus_per_node, | |
| mbs, | |
| gbs, | |
| args.max_steps, | |
| tp_size, | |
| pp_size, | |
| cp_size, | |
| vp_size, | |
| ep_size, | |
| etp_size, | |
| enable_cuda_graphs=enable_cuda_graphs, | |
| use_mcore_fsdp=use_mcore_fsdp, | |
| use_fsdp_double_buffer=args.use_fsdp_double_buffer, | |
| use_user_buffer_registration=use_user_buffer_registration, | |
| use_sharp=use_sharp, | |
| recompute_layers=recompute_layers, | |
| activation_offload_layers=activation_offload_layers, | |
| compute_dtype=args.compute_dtype, | |
| fp8_recipe=args.fp8_recipe, | |
| recompute_modules=recompute_modules, | |
| use_te_act_func=args.use_te_act_func, | |
| act_func_fp8_input_store=args.act_func_fp8_input_store, | |
| ) | |
| recipe = set_exp_logging_configs( | |
| recipe, | |
| "pre_train", | |
| "llm", | |
| "deepseekv3", | |
| args.tensorboard, | |
| args.wandb, | |
| args.wandb_prj_name, | |
| args.wandb_job_name, | |
| ) | |
| # data module configs | |
| if args.use_hf_tokenizer: | |
| recipe.data.tokenizer = hf_tokenizer(HF_MODEL_URI) | |
| else: | |
| recipe.data.tokenizer = run.Config( | |
| get_nmt_tokenizer, library="null", model_name="NullTokenizer", vocab_size=129280 | |
| ) | |
| recipe.model.tokenizer = recipe.data.tokenizer | |
| return recipe | |
| if __name__ == "__main__": | |
| args = parse_cli_args().parse_args() | |
| args_sanity_check(args) | |
| # Parse additional SLURM parameters if provided | |
| additional_slurm_params = None | |
| if hasattr(args, 'additional_slurm_params') and args.additional_slurm_params: | |
| additional_slurm_params = parse_additional_slurm_params(args.additional_slurm_params) | |
| kwargs = get_user_configs(args.gpu.lower(), "pre_train", "deepseek", "v3", args) | |
| ( | |
| num_nodes, | |
| mbs, | |
| gbs, | |
| tp_size, | |
| pp_size, | |
| cp_size, | |
| vp_size, | |
| ep_size, | |
| etp_size, | |
| enable_cuda_graphs, | |
| use_mcore_fsdp, | |
| recompute_layers, | |
| activation_offload_layers, | |
| recompute_modules, | |
| _, # keep_fsdp_fp8_transpose_cache | |
| use_user_buffer_registration, | |
| use_sharp, | |
| ) = kwargs[:17] | |
| recipe = override_recipe_configs( | |
| args, | |
| num_nodes, | |
| mbs, | |
| gbs, | |
| tp_size, | |
| pp_size, | |
| cp_size, | |
| vp_size, | |
| ep_size, | |
| etp_size, | |
| enable_cuda_graphs, | |
| use_mcore_fsdp, | |
| recompute_layers, | |
| activation_offload_layers, | |
| recompute_modules, | |
| use_user_buffer_registration, | |
| use_sharp, | |
| ) | |
| exp_config = f"{num_nodes}nodes_tp{tp_size}_pp{pp_size}_cp{cp_size}_vp{vp_size}_ep{ep_size}_{mbs}mbs_{gbs}gbs" | |
| exp_name = f"{splitext(basename(__file__))[0]}_{args.compute_dtype}_{exp_config}" | |
| executor = slurm_executor( | |
| args.gpu.lower(), | |
| args.account, | |
| args.partition, | |
| args.log_dir, | |
| num_nodes, | |
| args.gpus_per_node, | |
| args.time_limit, | |
| args.container_image, | |
| custom_mounts=args.custom_mounts, | |
| custom_env_vars={}, | |
| hf_token=args.hf_token, | |
| nemo_home=args.nemo_home, | |
| wandb_key=args.wandb_key, | |
| network='sharp' if use_sharp else None, | |
| additional_slurm_params=additional_slurm_params, | |
| ) | |
| plugins = [build_perf_env_plugin(args, pp_size=pp_size)] | |
| if args.enable_nsys: | |
| plugins.append(NsysPlugin(start_step=5, end_step=6)) | |
| if args.enable_memory_profile: | |
| assert args.memory_profile_out_path is not None | |
| plugins.append(MemoryProfilePlugin(dir=args.memory_profile_out_path)) | |
| with run.Experiment(exp_name) as exp: | |
| exp.add( | |
| recipe, | |
| executor=executor, | |
| name=exp_name, | |
| plugins=plugins, | |
| ) | |
| if not args.dryrun: | |
| exp.run(sequential=True, detach=args.detach) | |
| else: | |
| exp.dryrun() | |
| if args.dump_config_diff_from_base_recipe: | |
| output_dir = exp.jobs[0].executor.job_dir | |
| # dump difference from base recipe | |
| base_recipe = pretrain_recipe(performance_mode=False) | |
| file_name = f"diff_from_base_recipe_{args.compute_dtype}.diff" | |
| dump_config_diff_from_base_recipe(base_recipe, recipe, output_dir, file_name=file_name) | |
| # dump difference from default perf recipe | |
| default_perf_recipe = pretrain_recipe(performance_mode=True) | |
| file_name = f"diff_from_default_perf_recipe_{args.compute_dtype}.diff" | |
| dump_config_diff_from_base_recipe(default_perf_recipe, recipe, output_dir, file_name=file_name) | |