# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Example:
  python scripts/vlm/neva_generate.py --load_from_hf
  python scripts/vlm/neva_generate.py --local_model_path <path_to_checkpoint> --enable_quantization
"""

import argparse

import requests
import torch
from megatron.core.inference.common_inference_params import CommonInferenceParams
from PIL import Image
from transformers import AutoProcessor

import nemo.lightning as nl
from nemo.collections.vlm import Llava15Config7B, LlavaModel
from nemo.collections.vlm.inference import generate as vlm_generate
from nemo.collections.vlm.inference import setup_inference_wrapper
from nemo.utils import logging

try:
    import modelopt.torch.quantization as mtq
    from megatron.core.post_training.modelopt.gpt.model_specs import get_gpt_modelopt_spec

    HAVE_MODELOPT = True
except (ImportError, ModuleNotFoundError):
    HAVE_MODELOPT = False


def load_image(image_url: str) -> Image.Image:
    # pylint: disable=C0115,C0116
    try:
        response = requests.get(image_url, stream=True)
        response.raise_for_status()
        image = Image.open(response.raw)
        return image
    except requests.exceptions.RequestException as e:
        print(f"Error loading image from {image_url}: {e}")
        return None


def generate(model, processor, images, text, params):
    # pylint: disable=C0115,C0116
    conversation = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": text},
                {"type": "image"},
            ],
        },
    ]
    input_text = processor.apply_chat_template(conversation, add_generation_prompt=True)

    class NevaTokenizer:
        # pylint: disable=C0115,C0116
        def __init__(self, tokenizer):
            self._tokenizer = tokenizer
            self.vocab_size = tokenizer.vocab_size
            self.eos_token_id = tokenizer.eos_token_id

        def decode(self, tokens, **kwargs):
            # Replace image placeholders (-200) with token id 0 and drop the BOS token (id 1) before decoding.
            modified_tokens = []
            for x in tokens:
                if x == -200:
                    modified_tokens.append(0)
                elif x != 1:
                    modified_tokens.append(x)
            return self._tokenizer.decode(modified_tokens, skip_special_tokens=False)

        def encode(self, prompt, **kwargs):
            prompts_tokens = self._tokenizer.encode(prompt, add_special_tokens=True)
            # Map the HF <image> token id (32000) to the image token index (-200) expected by the NeMo model.
            return [-200 if x == 32000 else x for x in prompts_tokens]

    model = setup_inference_wrapper(model, processor.tokenizer)

    prompts = [input_text]
    images = [images]
    result = vlm_generate(
        model,
        NevaTokenizer(processor.tokenizer),
        processor.image_processor,
        prompts,
        images,
        inference_params=params,
    )

    generated_texts = list(result)[0].generated_text
    if torch.distributed.get_rank() == 0:
        print("======== GENERATED TEXT OUTPUT ========")
        print(f"{generated_texts}")
        print("=======================================")
    return generated_texts


def legacy_generate(model, processor, raw_image, text, num_tokens_to_generate):
    # pylint: disable=C0115,C0116
    conversation = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": text},
                {"type": "image"},
            ],
        },
    ]
    prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)
    hf_tokenizer = processor.tokenizer

    inputs = processor(prompt, raw_image, return_tensors='pt').to(0, torch.float16)
    input_ids = hf_tokenizer(prompt, return_tensors='pt')['input_ids'].cuda()
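    # Replace the HF <image> token id (32000) with the image token index (-200) used by the NeMo model.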
    input_ids[input_ids == 32000] = -200
    images = inputs['pixel_values'].cuda()
    # LLaVA 1.5 uses a 336x336 vision encoder, so reshape the pixel values accordingly.
    images = images.reshape(images.size(0), 3, 336, 336)
    position_ids = (
        torch.arange(input_ids.size(1), dtype=torch.long, device=input_ids.device).unsqueeze(0).expand_as(input_ids)
    )

    model = model.module.cuda()
    model.eval()

    generated_ids = input_ids.clone()

    # Greedy generation loop
    for _ in range(num_tokens_to_generate):
        with torch.no_grad():
            output = model(
                images=images,
                input_ids=input_ids,
                position_ids=position_ids,
                attention_mask=None,
            )

            next_token_ids = torch.argmax(output[:, -1], dim=-1, keepdim=True)
            generated_ids = torch.cat([generated_ids, next_token_ids], dim=-1)

            input_ids = generated_ids
            position_ids = (
                torch.arange(input_ids.size(1), dtype=torch.long, device=input_ids.device)
                .unsqueeze(0)
                .expand_as(input_ids)
            )

            # If the generated token is the end of sequence token, stop generating
            if next_token_ids.item() == hf_tokenizer.eos_token_id:
                break

    generated_ids[generated_ids == -200] = 0
    generated_texts = hf_tokenizer.batch_decode(generated_ids, skip_special_tokens=False)
    logging.info("======== GENERATED TEXT OUTPUT ========")
    logging.info(f"{generated_texts}")
    logging.info("=======================================")


def main(args) -> None:
    # pylint: disable=C0115,C0116
    if args.enable_quantization and not HAVE_MODELOPT:
        # Quantization depends on the ModelOpt import above; fail early with a clear message.
        raise RuntimeError("--enable_quantization requires modelopt.torch.quantization to be installed.")

    strategy = nl.MegatronStrategy(
        tensor_model_parallel_size=1,
        ckpt_include_optimizer=False,
    )

    trainer = nl.Trainer(
        devices=1,
        max_steps=1000,
        accelerator="gpu",
        strategy=strategy,
        plugins=nl.MegatronMixedPrecision(precision="bf16-mixed"),
        val_check_interval=1000,
        limit_val_batches=50,
    )

    processor = AutoProcessor.from_pretrained("llava-hf/llava-1.5-7b-hf")
    hf_tokenizer = processor.tokenizer

    # Load the image
    raw_image = load_image(args.image_url)
    if raw_image is None:
        return  # Exit if the image can't be loaded

    fabric = trainer.to_fabric()

    # Decide whether to import or load the model based on the input arguments
    if args.load_from_hf:
        model = fabric.import_model("hf://llava-hf/llava-1.5-7b-hf", LlavaModel)
    else:
        config = Llava15Config7B()
        if args.enable_quantization:
            new_transformer_layer_spec = get_gpt_modelopt_spec(
                config.language_transformer_config, local_core_attention=False, remap_te_layernorm=True
            )
            config.language_transformer_config.transformer_layer_spec = new_transformer_layer_spec
        model = LlavaModel(config, tokenizer=hf_tokenizer)
        model = fabric.load_model(args.local_model_path, model)

    params = CommonInferenceParams(
        temperature=args.temperature,
        top_p=args.top_p,
        top_k=args.top_k,
        num_tokens_to_generate=args.num_tokens_to_generate,
    )
    if args.legacy_generate:
        legacy_generate(model, processor, raw_image, args.prompt, args.num_tokens_to_generate)
    else:
        generate(model, processor, images=raw_image, text=args.prompt, params=params)

    if args.enable_quantization:
        # Calibration images from the COCO val2017 set used to drive the quantization forward loop.
        base_img_url = "http://images.cocodataset.org/val2017/"
        images = [
            "000000039769.jpg", "000000002685.jpg", "000000004495.jpg", "000000005001.jpg",
            "000000003845.jpg", "000000011615.jpg", "000000010977.jpg", "000000010764.jpg",
            "000000010707.jpg", "000000010583.jpg", "000000010363.jpg", "000000010092.jpg",
            "000000009914.jpg", "000000009891.jpg", "000000009769.jpg", "000000009590.jpg",
            "000000009483.jpg", "000000009448.jpg", "000000009378.jpg", "000000008899.jpg",
        ]
        quantization_images_url = [base_img_url + img_id for img_id in images]

        def forward_loop():
            for img_url in quantization_images_url:
                raw_image = load_image(img_url)
                response = generate(
                    model, processor, images=raw_image, text="can you describe this image?", params=params
                )
                print(img_url, "->", response)

        # Please see
        # https://nvidia.github.io/TensorRT-Model-Optimizer/guides/_choosing_quant_methods.html
        # for the selection of quantization algorithms
        if args.quant_alg == "int8_sq":
            mtq_config = mtq.INT8_SMOOTHQUANT_CFG
        elif args.quant_alg == "fp8":
            mtq_config = mtq.FP8_DEFAULT_CFG
        elif args.quant_alg == "awq":
            mtq_config = mtq.INT4_AWQ_CFG
        else:
            raise ValueError(f"Unsupported quantization algorithm: {args.quant_alg}")

        logging.info("-------- Start Quantization --------")
        mtq.quantize(model, mtq_config, forward_loop)
        logging.info("-------- End Quantization --------")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="LLaVA Multimodal Inference")
    parser.add_argument(
        "--load_from_hf",
        action="store_true",
        help="Flag to indicate whether to load the model from Hugging Face hub.",
    )
    parser.add_argument(
        "--local_model_path",
        type=str,
        default=None,
        help="Local path to the model if not loading from Hugging Face.",
    )
    parser.add_argument(
        "--image_url",
        type=str,
        default="http://images.cocodataset.org/val2017/000000039769.jpg",
        help="URL of the image to use for inference.",
    )
    parser.add_argument(
        "--prompt",
        type=str,
        default="What are these?",
        help="Input prompt",
    )
    parser.add_argument(
        "--temperature",
        type=float,
        default=1.0,
        help="""Temperature to be used in megatron.core.inference.common_inference_params.CommonInferenceParams""",
    )
    parser.add_argument(
        "--top_p",
        type=float,
        default=0.0,
        help="""top_p to be used in megatron.core.inference.common_inference_params.CommonInferenceParams""",
    )
    parser.add_argument(
        "--top_k",
        type=int,
        default=1,
        help="""top_k to be used in megatron.core.inference.common_inference_params.CommonInferenceParams""",
    )
    parser.add_argument(
        "--num_tokens_to_generate",
        type=int,
        default=20,
        help="""Number of tokens to generate per prompt""",
    )
    parser.add_argument(
        "--legacy_generate",
        action="store_true",
        help="Flag to indicate whether to use legacy generation function.",
    )
    parser.add_argument(
        "--enable_quantization",
        action="store_true",
        help="Flag to indicate whether to enable quantization.",
    )
    parser.add_argument(
        "--quant_alg",
        type=str,
        default="fp8",
        help="Quantization algorithm to apply: one of 'int8_sq', 'fp8', or 'awq'.",
    )

    args = parser.parse_args()
    main(args)