This is the 40th article of 2024
( Estimated reading time: 15 minutes )
01
Introduction
As LLMs keep growing, a single GPU can no longer hold an entire model. Take Qwen-14B-Chat as an example: its weights take roughly 28 GB in BF16 (about 14B parameters × 2 bytes each), while a single NVIDIA A10 has only 24 GB of memory. To deploy Qwen-14B-Chat on A10s, the model has to be sharded and deployed across two A10 machines, with each card loading half of the weights. This approach is called distributed inference.
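As a quick sanity check on these numbers, here is a small back-of-the-envelope sketch (plain Python, not part of vLLM; the 2 bytes/parameter figure assumes BF16/FP16 weights and ignores the KV cache and activation memory):

```python
import math

def estimate_weight_memory_gb(num_params: float, bytes_per_param: int = 2) -> float:
    """Memory needed just for the weights, in GiB."""
    return num_params * bytes_per_param / 2**30

if __name__ == "__main__":
    weights_gb = estimate_weight_memory_gb(14e9)   # Qwen-14B-Chat in BF16
    print(f"weights: {weights_gb:.1f} GB")         # ~26 GB; closer to 28 GB with extra buffers
    print(f"A10 (24 GB) cards needed: {math.ceil(weights_gb / 24)}")  # 2
```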
Many frameworks that support distributed inference have emerged in the community, such as vLLM, DeepSpeed-MII, and rtp-llm. This article picks vLLM and walks through its source code to show how vLLM + Ray implement distributed inference for LLMs.
02
Deployment
2.1 Model Preparation
Download Qwen-14B-Chat to OSS and create the corresponding PV and PVC in the cluster. The PVC is named llm-model.
```bash
kubectl apply -f- << EOF
apiVersion: v1
kind: Secret
metadata:
  name: oss-secret
stringData:
  akId: ${your-accesskey-id} # AccessKey ID used to access OSS
  akSecret: ${your-accesskey-secret} # AccessKey Secret used to access OSS
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: llm-model
  labels:
    alicloud-pvname: llm-model
spec:
  capacity:
    storage: 30Gi
  accessModes:
  - ReadOnlyMany
  persistentVolumeReclaimPolicy: Retain
  csi:
    driver: ossplugin.csi.alibabacloud.com
    volumeHandle: model-oss
    nodePublishSecretRef:
      name: oss-secret
      namespace: default
    volumeAttributes:
      bucket: ${your-bucket-name}
      url: ${your-bucket-endpoint} # e.g. oss-cn-hangzhou.aliyuncs.com
      otherOpts: "-o umask=022 -o max_stat_cache_size=0 -o allow_other"
      path: "/"
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: llm-model
spec:
  accessModes:
  - ReadOnlyMany
  resources:
    requests:
      storage: 30Gi
  selector:
    matchLabels:
      alicloud-pvname: llm-model
EOF
```
2.2 Deploying the Distributed vLLM Application
1. Run the following command to deploy the vLLM application.
```bash
kubectl apply -f- <<EOF
apiVersion: apps/v1
kind: Deployment
metadata:
  name: vllm
  labels:
    app: vllm
spec:
  replicas: 2
  selector:
    matchLabels:
      app: vllm
  template:
    metadata:
      labels:
        app: vllm
    spec:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
          - labelSelector:
              matchExpressions:
              - key: app
                operator: In
                values:
                - vllm
            topologyKey: kubernetes.io/hostname
      volumes:
      - name: model
        persistentVolumeClaim:
          claimName: llm-model
      containers:
      - name: vllm
        image: kube-ai-registry.cn-shanghai.cr.aliyuncs.com/kube-ai/vllm:0.4.1
        command:
        - "sh"
        - "-c"
        - "sleep 7d"
        ports:
        - containerPort: 8080
        readinessProbe:
          tcpSocket:
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 30
        resources:
          limits:
            nvidia.com/gpu: "1"
          requests:
            cpu: 4
            memory: 8Gi
            nvidia.com/gpu: "1"
        volumeMounts:
        - mountPath: /mnt/models
          name: model
EOF
```
2. Run the following commands to start the vLLM application.
Start Ray.
Run on Pod 1:
```bash
ray start --head
# After startup, the ray-head-address is printed in the log
```
Run on Pod 2:
```bash
# Set <ray-head-address> to the ray-head-address printed in Pod 1's log
ray start --address=<ray-head-address>
```
```bash
python3 model_init.py
```
where model_init.py is:
```python
from transformers import AutoModelForCausalLM, AutoTokenizer, AutoConfig

config = AutoConfig.from_pretrained(
    "/mnt/models/Qwen-14B-Chat",
    trust_remote_code=True)
tokenizer = AutoTokenizer.from_pretrained("/mnt/models/Qwen-14B-Chat",
                                          trust_remote_code=True)
```
Start the OpenAI-compatible API server (on Pod 1, where the Ray head runs):
```bash
python3 -m vllm.entrypoints.openai.api_server \
    --port 8080 \
    --trust-remote-code \
    --served-model-name qwen \
    --model /mnt/models/Qwen-14B-Chat \
    --gpu-memory-utilization 0.95 \
    --tensor-parallel-size 2
```
3. Run the following commands to send a test request to the service.
```bash
kubectl -n <your-namespace> exec -it <pod1-name> -- bash

curl -H "Content-Type: application/json" \
     http://localhost:8080/v1/chat/completions -X POST \
     -d '{"model": "qwen", "messages": [{"role": "user", "content": "你好"}], "max_tokens": 512, "temperature": 0.7, "top_p": 0.9, "seed": 10, "stop":["<|endoftext|>", "<|im_end|>", "<|im_start|>"]}'
```
03
Source Code Analysis
1. Entry point: main in vllm/entrypoints/openai/api_server.py
```python
if __name__ == "__main__":
    # Build the engine args
    engine_args = AsyncEngineArgs.from_cli_args(args)
    # Build the engine
    engine = AsyncLLMEngine.from_engine_args(
        engine_args, usage_context=UsageContext.OPENAI_API_SERVER)

    openai_serving_chat = OpenAIServingChat(engine, served_model_names,
                                            args.response_role,
                                            args.lora_modules,
                                            args.chat_template)

    openai_serving_completion = OpenAIServingCompletion(
        engine, served_model_names, args.lora_modules)

    app.root_path = args.root_path
    uvicorn.run(app)
```
2. Building the LLM engine
```python
engine = AsyncLLMEngine.from_engine_args(
    engine_args, usage_context=UsageContext.OPENAI_API_SERVER)

def from_engine_args():
    """Creates an async LLM engine from the engine arguments."""
    # Create the engine configs.
    engine_config = engine_args.create_engine_config()

    # Initialize the Ray cluster:
    # 1. ray.init()
    # 2. set the Ray placement strategy based on the number of GPUs in the
    #    cluster and the tensor-parallel degree
    initialize_ray_cluster(engine_config.parallel_config)
    from vllm.executor.ray_gpu_executor import RayGPUExecutorAsync
    executor_class = RayGPUExecutorAsync

    # Create the async LLM engine.
    engine = cls(...)  # create an AsyncLLMEngine instance
    # AsyncLLMEngine.__init__ -> self._init_engine -> _AsyncLLMEngine.__init__
    #   -> LLMEngine.__init__ -> executor_class(), i.e. RayGPUExecutorAsync.__init__
```
3. Initializing the Ray cluster
Ray initialization consists of two parts: Ray cluster initialization and Ray worker initialization. The model is loaded in a distributed fashion while the Ray workers are initialized.
```python
# RayGPUExecutorAsync inherits from RayGPUExecutor and ExecutorAsyncBase;
# its initialization calls RayGPUExecutor's self._init_executor method.
def _init_executor(self) -> None:
    # Create the parallel GPU workers. Core code for initializing the workers.
    self._init_workers_ray(placement_group)

def _init_workers_ray():
    # Define a worker: the Worker class in the vllm.worker.worker module.
    # The actor is a RayWorkerWrapper.
    worker = ray.remote(
        num_cpus=0,
        num_gpus=num_gpus,
        scheduling_strategy=scheduling_strategy,
        **ray_remote_kwargs,
    )(RayWorkerWrapper).remote(
        worker_module_name="vllm.worker.worker",
        worker_class_name="Worker",
        trust_remote_code=self.model_config.trust_remote_code,
    )

    # Run the following methods on the Ray workers, in order.
    self._run_workers("get_node_and_gpu_ids",
                      use_dummy_driver=True)
    self._run_workers("update_environment_variables",
                      all_args=all_args_to_update_environment_variables)
    self._run_workers("init_worker", all_kwargs=init_worker_all_kwargs)
    self._run_workers("init_device")
    self._run_workers(
        "load_model",
        max_concurrent_workers=self.parallel_config.
        max_parallel_loading_workers,
    )

def _run_workers():
    # Start the ray workers first.
    ray_worker_outputs = [
        # worker is the RayWorkerWrapper actor defined above.
        # This calls RayWorkerWrapper.execute_method, which runs the named
        # method on the remote actor.
        worker.execute_method.remote(method, *worker_args,
                                     **worker_kwargs)
        for (worker, worker_args, worker_kwargs
             ) in zip(self.workers, all_worker_args, all_worker_kwargs)
    ]

def init_worker():
    # worker_module_name is "vllm.worker.worker", as passed in _init_workers_ray
    mod = importlib.import_module(self.worker_module_name)
    # Worker
    worker_class = getattr(mod, self.worker_class_name)
    self.worker = worker_class(*args, **kwargs)
    # Worker.__init__ -> ModelRunner.__init__

def init_device():
    # Set up the machine info for distributed inference.
    """Initialize the distributed environment."""
    init_distributed_environment(parallel_config.world_size, rank,
                                 distributed_init_method, local_rank)

def load_model():
    self.model_runner.load_model()
    # ModelRunner.load_model() -> vllm.model_executor.model_loader.loader.load_model
```
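The `_run_workers` dispatch above boils down to a simple Ray actor pattern: wrap the real worker in an actor and forward method calls to it by name. Below is a minimal standalone sketch of that pattern under illustrative names (it is not vLLM's actual RayWorkerWrapper and only needs ray installed):

```python
import ray

@ray.remote
class WorkerWrapper:
    """Mimics the wrapper role: hold per-worker state and dispatch methods by name."""

    def __init__(self):
        self.state = {}

    def init_worker(self, rank: int):
        self.state["rank"] = rank

    def load_model(self):
        return f"rank {self.state['rank']}: model loaded"

    def execute_method(self, method: str, *args, **kwargs):
        # Same idea as RayWorkerWrapper.execute_method: look the method up on
        # the actor instance and call it with the given arguments.
        return getattr(self, method)(*args, **kwargs)

if __name__ == "__main__":
    ray.init()
    workers = [WorkerWrapper.remote() for _ in range(2)]
    # Fan a named method out to every worker, then gather the results,
    # just like _run_workers does.
    ray.get([w.execute_method.remote("init_worker", rank)
             for rank, w in enumerate(workers)])
    print(ray.get([w.execute_method.remote("load_model") for w in workers]))
```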
After load_model() finishes, the expected log output looks like the following. Each of the two pods loaded 13.2845 GB, i.e. half of the model.
```
INFO 04-26 09:39:46 model_runner.py:173] Loading model weights took 13.2845 GB
(RayWorkerWrapper pid=3327, ip=192.168.12.132) INFO 04-26 09:39:51 model_runner.py:173] Loading model weights took 13.2845 GB
```
4. Serving requests
Create the OpenAIServingChat and OpenAIServingCompletion instances and start uvicorn to serve requests.
```python
# Routes and the serving objects that back them:
# POST /v1/chat/completions -> openai_serving_chat
openai_serving_chat = OpenAIServingChat(engine, served_model_names,
                                        args.response_role,
                                        args.lora_modules,
                                        args.chat_template)

# POST /v1/completions -> openai_serving_completion
openai_serving_completion = OpenAIServingCompletion(
    engine, served_model_names, args.lora_modules)

app.root_path = args.root_path
uvicorn.run(app)
```
3.1 Distributed Inference Process
When the launch parameter --tensor-parallel-size is greater than 1, Ray-based distributed deployment is triggered automatically.
1. The Ray cluster is initialized while the LLM engine is being built.
```python
# Initialize the Ray cluster
initialize_ray_cluster(engine_config.parallel_config)
```
The parallel_config is as follows: pp=1, tp=2, world_size=2.
```
{'pipeline_parallel_size': 1, 'tensor_parallel_size': 2, 'worker_use_ray': True,
 'max_parallel_loading_workers': None, 'disable_custom_all_reduce': False,
 'tokenizer_pool_config': None, 'ray_workers_use_nsight': False,
 'placement_group': None, 'world_size': 2}
```
During initialization, a placement group is created for the worker processes (see the sketch after this list):
1) Get the total number of GPUs in the Ray cluster.
2) Request GPUs according to the world size: placement_group_specs = ([{"GPU": 1}] * parallel_config.world_size).
3) Create the placement_group; Ray then starts the actors on the corresponding nodes according to the placement group.
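The following is a minimal sketch of that placement-group logic (assumptions: a running Ray cluster with at least two GPUs, illustrative names, and the "PACK" strategy; it is not vLLM's exact initialize_ray_cluster code):

```python
import ray
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init()

world_size = 2  # pipeline_parallel_size * tensor_parallel_size

# 1) Check that the Ray cluster has enough GPUs for the requested world size.
num_gpus_in_cluster = int(ray.cluster_resources().get("GPU", 0))
assert num_gpus_in_cluster >= world_size, "not enough GPUs in the Ray cluster"

# 2) One {"GPU": 1} bundle per worker.
pg = placement_group([{"GPU": 1}] * world_size, strategy="PACK")
ray.get(pg.ready())

# 3) Actors created with this scheduling strategy are started on the nodes
#    that hold the placement-group bundles.
strategy = PlacementGroupSchedulingStrategy(placement_group=pg)

@ray.remote(num_gpus=1, scheduling_strategy=strategy)
class Worker:
    def node(self):
        return ray.get_runtime_context().get_node_id()

workers = [Worker.remote() for _ in range(world_size)]
print(ray.get([w.node.remote() for w in workers]))
```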
2. Run the get_node_and_gpu_ids method on each worker
```python
# Get the node ID and the GPUs assigned on that node
def get_node_and_gpu_ids(self) -> Tuple[str, List[int]]:
    node_id = ray.get_runtime_context().get_node_id()
    gpu_ids = ray.get_gpu_ids()
    return node_id, gpu_ids
```
3. Run update_environment_variables on each worker
```python
# The worker nodes and GPU info obtained in step 2
worker_node_and_gpu_ids = self._run_workers("get_node_and_gpu_ids",
                                            use_dummy_driver=True)

# Set environment variables for the driver and workers.
all_args_to_update_environment_variables = [({
    "CUDA_VISIBLE_DEVICES":
    ",".join(map(str, node_gpus[node_id])),
    "VLLM_INSTANCE_ID":
    VLLM_INSTANCE_ID,
    "VLLM_TRACE_FUNCTION":
    os.getenv("VLLM_TRACE_FUNCTION", "0"),
}, ) for (node_id, _) in worker_node_and_gpu_ids]
```
4. Run the init_device method on each worker
```python
# Launch arguments for the workers
init_worker_all_kwargs = []
# worker_node_and_gpu_ids is the per-worker GPU info obtained in step 2
for rank, (node_id, _) in enumerate(worker_node_and_gpu_ids):
    local_rank = node_workers[node_id].index(rank)
    init_worker_all_kwargs.append(
        collect_arg_helper_func(
            model_config=self.model_config,
            parallel_config=self.parallel_config,
            scheduler_config=self.scheduler_config,
            device_config=self.device_config,
            cache_config=self.cache_config,
            load_config=self.load_config,
            local_rank=local_rank,
            rank=rank,
            distributed_init_method=distributed_init_method,
            lora_config=self.lora_config,
            vision_language_config=self.vision_language_config,
            is_driver_worker=rank == 0,
        ))

def init_device(self) -> None:
    if self.device_config.device.type == "cuda":
        # torch.distributed.all_reduce does not free the input tensor until
        # the synchronization point. This causes the memory usage to grow
        # as the number of all_reduce calls increases. This env var disables
        # this behavior.
        # Related issue:
        # https://discuss.pytorch.org/t/cuda-allocation-lifetime-for-inputs-to-distributed-all-reduce/191573
        os.environ["TORCH_NCCL_AVOID_RECORD_STREAMS"] = "1"

        # This env var set by Ray causes exceptions with graph building.
        os.environ.pop("NCCL_ASYNC_ERROR_HANDLING", None)
        self.device = torch.device(f"cuda:{self.local_rank}")
        torch.cuda.set_device(self.device)

        _check_if_gpu_supports_dtype(self.model_config.dtype)
        torch.cuda.empty_cache()
        self.init_gpu_memory = torch.cuda.mem_get_info()[0]
    else:
        raise RuntimeError(
            f"Not support device type: {self.device_config.device}")
    # Initialize the distributed environment.
    init_worker_distributed_environment(self.parallel_config, self.rank,
                                        self.distributed_init_method,
                                        self.local_rank)
    # Set random seed.
    set_random_seed(self.model_config.seed)
```
The core method init_worker_distributed_environment builds the world info of the distributed cluster, similar to the world info in frameworks such as Horovod and DeepSpeed.
Its arguments are as follows:
worker 1: self.rank=0, self.local_rank=0, self.distributed_init_method="tcp://192.168.12.120:42167" (the Ray head)
```
{'pipeline_parallel_size': 1, 'tensor_parallel_size': 2, 'worker_use_ray': True, 'max_parallel_loading_workers': None, 'disable_custom_all_reduce': False, 'tokenizer_pool_config': None, 'ray_workers_use_nsight': False, 'placement_group': <ray.util.placement_group.PlacementGroup object at 0x7fdeaa896ce0>, 'world_size': 2}, {'id': PlacementGroupID(51489eb26a9335f31ed1bdb4eace04000000), 'bundle_cache': [{'GPU': 1.0}, {'GPU': 1.0}]}, self.rank=0, tcp://192.168.12.120:42167, self.local_rank=0
```
worker 2: self.rank=1, self.local_rank=0, self.distributed_init_method="tcp://192.168.12.120:42167"
```
{'pipeline_parallel_size': 1, 'tensor_parallel_size': 2, 'worker_use_ray': True, 'max_parallel_loading_workers': None, 'disable_custom_all_reduce': False, 'tokenizer_pool_config': None, 'ray_workers_use_nsight': False, 'world_size': 2}, self.rank=1, tcp://192.168.12.120:42167, self.local_rank=0
```
self.rank increases globally across all workers, while self.local_rank is the index of the GPU within its pod.
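In essence, each worker joins the same torch.distributed process group through a TCP rendezvous hosted by the driver, using its own rank. A minimal sketch of that step (illustrative function name, not vLLM's exact code; the address mirrors the logs above):

```python
import torch.distributed as dist

def init_distributed_worker(world_size: int, rank: int,
                            distributed_init_method: str) -> None:
    # NCCL backend for GPU collectives; rank 0 hosts the TCP store at the given address.
    dist.init_process_group(backend="nccl",
                            init_method=distributed_init_method,  # e.g. "tcp://192.168.12.120:42167"
                            world_size=world_size,
                            rank=rank)

# worker 1 (pod 1): init_distributed_worker(2, 0, "tcp://192.168.12.120:42167")
# worker 2 (pod 2): init_distributed_worker(2, 1, "tcp://192.168.12.120:42167")
```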
5. Run the load_model method on each worker
load_model loads the model in a distributed way. It is fairly involved, so it is covered separately in the next section.
3.2 Distributed Model Loading
The load_model method is executed on each worker:
```python
def load_model():
    self.model_runner.load_model()

# ModelRunner.load_model() -> vllm.model_executor.model_loader.loader.load_model
def load_model(self) -> None:
    with CudaMemoryProfiler() as m:
        # get_model fetches the model
        self.model = get_model(
            model_config=self.model_config,
            device_config=self.device_config,
            load_config=self.load_config,
            lora_config=self.lora_config,
            vision_language_config=self.vision_language_config,
            parallel_config=self.parallel_config,
            scheduler_config=self.scheduler_config,
        )

    self.model_memory_usage = m.consumed_memory
    logger.info(f"Loading model weights took "
                f"{self.model_memory_usage / float(2**30):.4f} GB")

# get_model -> loader.load_model -> DefaultModelLoader.load_model
def load_model(self, *, model_config: ModelConfig,
               device_config: DeviceConfig,
               lora_config: Optional[LoRAConfig],
               vision_language_config: Optional[VisionLanguageConfig],
               parallel_config: ParallelConfig,
               scheduler_config: SchedulerConfig) -> nn.Module:
    with set_default_torch_dtype(model_config.dtype):
        with torch.device(device_config.device):
            """Initialize a model with the given configurations."""
            # Initialize the model
            model = _initialize_model(model_config, self.load_config,
                                      lora_config, vision_language_config)

            # Call the model's load_weights method
            model.load_weights(
                self._get_weights_iterator(model_config.model,
                                           model_config.revision,
                                           fall_back_to_pt=getattr(
                                               model,
                                               "fall_back_to_pt_during_load",
                                               True)), )
            for _, module in model.named_modules():
                linear_method = getattr(module, "linear_method", None)
                if linear_method is not None:
                    linear_method.process_weights_after_loading(module)
                if hasattr(module, "process_weights_after_loading"):
                    module.process_weights_after_loading()
    return model.eval()

# Resolve the concrete model class from the model config
def _initialize_model(
        model_config: ModelConfig, load_config: LoadConfig,
        lora_config: Optional[LoRAConfig],
        vision_language_config: Optional[VisionLanguageConfig]) -> nn.Module:
    """Initialize a model with the given configurations."""
    # The "architectures" field in Qwen-14B-Chat/config.json
    model_class = get_model_architecture(model_config)[0]
    linear_method = _get_linear_method(model_config, load_config)

    return model_class(config=model_config.hf_config,
                       linear_method=linear_method,
                       **_get_model_initialization_kwargs(
                           model_class, lora_config, vision_language_config))

# model_class is <class 'vllm.model_executor.models.qwen.QWenLMHeadModel'>
```
model.load_weights calls QWenLMHeadModel's load_weights method.
```python
# QWenLMHeadModel.load_weights
def load_weights(self, weights: Iterable[Tuple[str, torch.Tensor]]):
    stacked_params_mapping = [
        # (param_name, shard_name, shard_id)
        ("gate_up_proj", "w2", 0),
        ("gate_up_proj", "w1", 1),
    ]

    # Every layer's weights and their names.
    # self.named_parameters is model.named_parameters()
    params_dict = dict(self.named_parameters())

    for name, loaded_weight in weights:
        # name: transformer.h.27.mlp.c_proj.weight
        # loaded_weight: tensor(xxx)
        if "rotary_emb.inv_freq" in name:
            continue
        for (param_name, weight_name, shard_id) in stacked_params_mapping:
            if weight_name not in name:
                continue
            # If the name is in stacked_params_mapping, replace shard_name with param_name.
            # E.g. transformer.h.0.mlp.w1.weight becomes transformer.h.0.mlp.gate_up_proj.weight
            name = name.replace(weight_name, param_name)
            # Skip loading extra bias for GPTQ models.
            if name.endswith(".bias") and name not in params_dict:
                continue
            param = params_dict[name]
            weight_loader = param.weight_loader
            weight_loader(param, loaded_weight, shard_id)
            break
        else:
            # Python's for-else: reaching here means the loop never hit `break`.
            # Skip loading extra bias for GPTQ models.
            if name.endswith(".bias") and name not in params_dict:
                continue
            param = params_dict[name]
            # Look up the weight_loader method for this parameter
            weight_loader = getattr(param, "weight_loader",
                                    default_weight_loader)
            weight_loader(param, loaded_weight)
```
The model's layer weights and their weight_loader methods:
```
# param, weight_loader
lm_head.weight, weight_loader <bound method VocabParallelEmbedding.weight_loader of ParallelLMHead()>
transformer.h.0.attn.c_attn.weight, weight_loader <bound method QKVParallelLinear.weight_loader of QKVParallelLinear()>
transformer.h.0.attn.c_proj.weight, weight_loader <bound method RowParallelLinear.weight_loader of RowParallelLinear()>
transformer.h.0.ln_1.weight, weight_loader <function default_weight_loader at 0x7f66201ee0e0>
transformer.h.0.ln_2.weight, weight_loader <function default_weight_loader at 0x7f66201ee0e0>
transformer.h.0.mlp.c_proj.weight, weight_loader <bound method RowParallelLinear.weight_loader of RowParallelLinear()>
transformer.h.0.mlp.gate_up_proj.weight, weight_loader <bound method MergedColumnParallelLinear.weight_loader of MergedColumnParallelLinear()>
transformer.ln_f.weight, weight_loader <function default_weight_loader at 0x7f66201ee0e0>
transformer.wte.weight, weight_loader <bound method VocabParallelEmbedding.weight_loader of VocabParallelEmbedding()>
```
Each layer of the model has its own distributed loading method. For example, the weight transformer.h.0.attn.c_proj.weight is loaded with RowParallelLinear.weight_loader.
```python
class RowParallelLinear(torch.nn.Module):

    def weight_loader(self, param: Parameter, loaded_weight: torch.Tensor):
        # Get this worker's tp_rank and use it to compute which slice of the
        # full weight this rank should load.
        tp_rank = get_tensor_model_parallel_rank()
        input_dim = getattr(param, "input_dim", None)
        param_data = param.data
        if input_dim is not None:
            shard_size = param_data.shape[input_dim]
            start_idx = tp_rank * shard_size
            loaded_weight = loaded_weight.narrow(input_dim, start_idx,
                                                 shard_size)
        assert param_data.shape == loaded_weight.shape
        param_data.copy_(loaded_weight)
```
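A small standalone illustration of the narrow() call above (assumed toy shapes, plain torch, not vLLM code): for row parallelism the full weight is split along the input dimension, and each tensor-parallel rank copies only its own slice.

```python
import torch

tp_size = 2
# Full weight with shape [output_dim, input_dim]
full_weight = torch.arange(4 * 8, dtype=torch.float32).reshape(4, 8)
input_dim = 1
shard_size = full_weight.shape[input_dim] // tp_size  # 8 / 2 = 4 columns per rank

for tp_rank in range(tp_size):
    start_idx = tp_rank * shard_size
    shard = full_weight.narrow(input_dim, start_idx, shard_size)
    print(f"tp_rank {tp_rank} loads columns {start_idx}..{start_idx + shard_size - 1}, "
          f"shape {tuple(shard.shape)}")
```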
Model partitioning follows the Megatron-LM scheme; see the paper in reference [04] at the end of this article for details.
04
Model Partitioning Principles
4.1 Inter-node Communication: AllReduce
https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/usage/collectives.html#
1) Reduce: aggregate the results computed on each GPU onto one designated GPU.
2) Broadcast: copy data from one GPU to all GPUs.
3) AllReduce = Reduce + Broadcast (a minimal runnable sketch follows).
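Here is a minimal AllReduce example with torch.distributed (using the gloo backend so it runs on CPU; the port is arbitrary): each of the two processes contributes a tensor, and after the collective both hold the sum.

```python
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def worker(rank: int, world_size: int):
    dist.init_process_group("gloo", init_method="tcp://127.0.0.1:29500",
                            rank=rank, world_size=world_size)
    x = torch.tensor([float(rank + 1)])        # rank 0 holds [1.], rank 1 holds [2.]
    dist.all_reduce(x, op=dist.ReduceOp.SUM)   # Reduce + Broadcast: both now hold [3.]
    print(f"rank {rank}: {x.item()}")
    dist.destroy_process_group()

if __name__ == "__main__":
    mp.spawn(worker, args=(2,), nprocs=2)
```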
4.2 Partitioning the Transformer
A Transformer layer consists of a self-attention block followed by a two-layer multi-layer perceptron (MLP).
MLP
The MLP consists of two GEMMs, Y = GeLU(X A) and Z = Y B, followed by dropout. Because GeLU is nonlinear, GeLU(X1 A1 + X2 A2) ≠ GeLU(X1 A1) + GeLU(X2 A2), so A cannot be split by rows; it has to be split by columns.
B then has to be split by rows. If B were split by columns instead, an extra synchronization of the intermediate activations would be needed before the second GEMM.
Dropout randomly zeroes a fraction of its input, so the partial results must be summed with one all-reduce before dropout is applied.
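A small numerical check of this split (plain torch, no vLLM): A is split by columns, B by rows, and a single summation, which is what the all-reduce performs across GPUs, reproduces the unsharded result.

```python
import torch
import torch.nn.functional as F

torch.manual_seed(0)
X = torch.randn(4, 16)    # [batch, hidden]
A = torch.randn(16, 32)   # first MLP weight
B = torch.randn(32, 16)   # second MLP weight

# Unsharded reference: Z = GeLU(X A) B
Z_ref = F.gelu(X @ A) @ B

# Tensor parallelism with tp_size = 2: A split column-wise, B split row-wise.
A1, A2 = A.chunk(2, dim=1)
B1, B2 = B.chunk(2, dim=0)
Z1 = F.gelu(X @ A1) @ B1  # computed on GPU 0
Z2 = F.gelu(X @ A2) @ B2  # computed on GPU 1
Z_tp = Z1 + Z2            # the all-reduce step

print(torch.allclose(Z_ref, Z_tp, atol=1e-5))  # True
```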
Self-Attention
In multi-head attention each head has its own independent Q, K, V matrices, so each GPU only needs to compute a subset of the heads. This requires that the number of attention heads be divisible by tp_size; otherwise an error like the following is raised (Qwen-14B with tp=3):
```
ValueError: Total number of attention heads (40) must be divisible by tensor parallel size (3).
```
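A tiny illustration of that constraint (not vLLM's exact check): with 40 heads, tp=2 gives each rank 20 heads, while tp=3 fails with the divisibility error above.

```python
total_num_heads = 40  # Qwen-14B

for tensor_parallel_size in (2, 3):
    if total_num_heads % tensor_parallel_size != 0:
        print(f"tp={tensor_parallel_size}: Total number of attention heads "
              f"({total_num_heads}) must be divisible by tensor parallel size "
              f"({tensor_parallel_size}).")
    else:
        print(f"tp={tensor_parallel_size}: "
              f"{total_num_heads // tensor_parallel_size} heads per GPU")
```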
As with the MLP, one all-reduce is required before the dropout.
Therefore, one pass through a Transformer layer needs 2 all-reduce operations. Qwen-14B has 40 Transformer layers, so a single inference step performs 80 all-reduce operations. When the inference service is deployed across nodes, this network communication becomes a significant overhead.
This article focuses on how vLLM implements distributed inference. For details of vLLM's inference process itself, see reference [01] below.
References
[01] 推理过程解析
https://zhuanlan.zhihu.com/p/649974825
[02] 【深度学习】【分布式训练】一文捋顺千亿模型训练技术:流水线并行、张量并行和3D并行
https://zhuanlan.zhihu.com/p/617087561
[03] Hugging Face高效训练技术四:多GPU分布式训练(DP、PP、TP、ZeRO)
https://blog.csdn.net/qq_56591814/article/details/134099476
[04] Megatron-LM: Training Multi-Billion Parameter Language Models Using Model Parallelism
https://arxiv.org/pdf/1909.08053
Feel free to leave a comment and join the discussion~