
张小明 2026/1/9 23:36:22
This walkthrough follows verl's implementation of DAPO. Let's start with the entry-point .sh and .py files. The ./recipe/dapo/ folder contains:

```
.
├── config
│   ├── dapo_megatron_trainer.yaml
│   └── dapo_trainer.yaml
├── dapo_ray_trainer.py
├── main_dapo.py
├── prepare_dapo_data.sh
├── README.md
├── run_dapo_qwen2.5_32b.sh
```

Overall execution order:

- main_dapo.py: set up data loading, initialize the actor_rollout model and the rm model, load the reward_manager.
- dapo_ray_trainer.py: the RL training loop.
  - Repeat the batch so that each prompt (q) is sampled n times.
  - Record each sample's log-probs together with its reward_score and advantage.
  - Filter out every q whose samples all score 1 or all score 0, and keep fetching new q to sample until the number of qualifying prompts reaches train_prompt_bsz. Note that the generation batch size is gen_prompt_bsz = 3 * train_prompt_bsz: sampling more prompts makes it less likely that fewer than train_prompt_bsz of them survive the filter.
  - Update the model once per mini_batch of data.
  - Run the forward pass (token-mean loss) and the gradient computation once per micro_batch of data.

The concrete code follows.

main_dapo.py

```python
# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Note that we don't combine the main with ray_trainer as ray_trainer is used by other main.
"""

import os
import socket

import hydra
import ray
from omegaconf import OmegaConf

from verl.trainer.ppo.reward import load_reward_manager
from verl.utils.device import is_cuda_available

from .dapo_ray_trainer import RayDAPOTrainer


@hydra.main(config_path="config", config_name="dapo_trainer", version_base=None)
def main(config):
    run_ppo(config)


#################################################################
# RL training entry point
#################################################################
def run_ppo(config) -> None:
    if not ray.is_initialized():
        # this is for local ray cluster
        default_runtime_env = {
            "env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN", "VLLM_LOGGING_LEVEL": "WARN"}
        }
        ray_init_kwargs = config.ray_kwargs.get("ray_init", {})
        runtime_env_kwargs = ray_init_kwargs.get("runtime_env", {})
        runtime_env = OmegaConf.merge(default_runtime_env, runtime_env_kwargs)
        ray_init_kwargs = OmegaConf.create({**ray_init_kwargs, "runtime_env": runtime_env})
        print(f"ray init kwargs: {ray_init_kwargs}")
        ray.init(**OmegaConf.to_container(ray_init_kwargs))

    try:
        if (
            is_cuda_available
            and config.global_profiler.tool == "nsys"
            and OmegaConf.select(config.global_profiler, "steps") is not None
            and len(OmegaConf.select(config.global_profiler, "steps")) > 0
        ):
            nsight_options = OmegaConf.to_container(
                config.global_profiler.global_tool_config.nsys.controller_nsight_options
            )
            runner = TaskRunner.options(runtime_env={"nsight": nsight_options}).remote()
        else:
            runner = TaskRunner.remote()
        ray.get(runner.run.remote(config))
    finally:
        if ray.is_initialized():
            ray.shutdown()


@ray.remote(num_cpus=1)  # please make sure main_task is not scheduled on head
class TaskRunner:
    def run(self, config):
        # print initial config
        from pprint import pprint

        from omegaconf import OmegaConf

        from verl.utils.fs import copy_to_local

        print(f"TaskRunner hostname: {socket.gethostname()}, PID: {os.getpid()}")

        pprint(OmegaConf.to_container(config, resolve=True))  # resolve=True will eval symbol values
        OmegaConf.resolve(config)

        # download the checkpoint from hdfs
        local_path = copy_to_local(config.actor_rollout_ref.model.path)

        # instantiate tokenizer
        from verl.utils import hf_processor, hf_tokenizer

        tokenizer = hf_tokenizer(local_path)
        processor = hf_processor(local_path, use_fast=True)  # used for multimodal LLM, could be none

        from verl.single_controller.ray import RayWorkerGroup

        #################################################################
        # Load the actor worker
        #################################################################
        # define worker classes
        if config.actor_rollout_ref.actor.strategy in {"fsdp", "fsdp2"}:
            assert config.critic.strategy in {"fsdp", "fsdp2"}

            from verl.workers.fsdp_workers import ActorRolloutRefWorker, CriticWorker

            ray_worker_group_cls = RayWorkerGroup

        elif config.actor_rollout_ref.actor.strategy == "megatron":
            assert config.actor_rollout_ref.actor.strategy == config.critic.strategy
            from verl.workers.megatron_workers import ActorRolloutRefWorker, CriticWorker

            ray_worker_group_cls = RayWorkerGroup

        else:
            raise NotImplementedError

        from verl.trainer.ppo.ray_trainer import ResourcePoolManager, Role

        role_worker_mapping = {
            Role.ActorRollout: ray.remote(ActorRolloutRefWorker),
            Role.Critic: ray.remote(CriticWorker),
        }

        global_pool_id = "global_pool"
        resource_pool_spec = {
            global_pool_id: [config.trainer.n_gpus_per_node] * config.trainer.nnodes,
        }
        mapping = {
            Role.ActorRollout: global_pool_id,
            Role.Critic: global_pool_id,
        }

        # we should adopt a multi-source reward function here
        # - for rule-based rm, we directly call a reward score
        # - for model-based rm, we call a model
        # - for code related prompt, we send to a sandbox if there are test cases
        # - finally, we combine all the rewards together
        # - The reward type depends on the tag of the data
        if config.reward_model.enable:
            if config.reward_model.strategy in {"fsdp", "fsdp2"}:
                from verl.workers.fsdp_workers import RewardModelWorker
            elif config.reward_model.strategy == "megatron":
                from verl.workers.megatron_workers import RewardModelWorker
            else:
                raise NotImplementedError
            role_worker_mapping[Role.RewardModel] = ray.remote(RewardModelWorker)
            mapping[Role.RewardModel] = global_pool_id

        # reference model
        if config.algorithm.use_kl_in_reward or config.actor_rollout_ref.actor.use_kl_loss:
            role_worker_mapping[Role.RefPolicy] = ray.remote(ActorRolloutRefWorker)
            mapping[Role.RefPolicy] = global_pool_id

        #################################################################
        # Load the reward manager, which computes the reward score for the data
        #################################################################
        reward_fn = load_reward_manager(
            config,
            tokenizer,
            0,
            max_resp_len=config.data.max_response_length,
            overlong_buffer_cfg=config.reward_model.overlong_buffer,
        )

        # Note that we always use function-based RM for validation
        val_reward_fn = load_reward_manager(
            config,
            tokenizer,
            1,
            max_resp_len=config.data.max_response_length,
            overlong_buffer_cfg=config.reward_model.overlong_buffer,
        )
        resource_pool_manager = ResourcePoolManager(resource_pool_spec=resource_pool_spec, mapping=mapping)

        #################################################################
        # Build the main DAPO RL trainer and run .fit()
        #################################################################
        trainer = RayDAPOTrainer(
            config=config,
            tokenizer=tokenizer,
            processor=processor,
            role_worker_mapping=role_worker_mapping,
            resource_pool_manager=resource_pool_manager,
            ray_worker_group_cls=ray_worker_group_cls,
            reward_fn=reward_fn,
            val_reward_fn=val_reward_fn,
        )
        trainer.init_workers()
        trainer.fit()


if __name__ == "__main__":
    main()
```

Next, let's look at from verl.trainer.ppo.reward import load_reward_manager. The launch script verl/recipe/dapo/run_dapo_qwen2.5_32b.sh sets the reward type:

```bash
enable_overlong_buffer=True
overlong_buffer_len=$((1024 * 4))  # overlong soft
overlong_penalty_factor=1.0

    reward_model.reward_manager=dapo \
    reward_model.overlong_buffer.enable=${enable_overlong_buffer} \
    reward_model.overlong_buffer.len=${overlong_buffer_len} \
    reward_model.overlong_buffer.penalty_factor=${overlong_penalty_factor} \
```

verl/trainer/ppo/reward.py

```python
def load_reward_manager(
    config: DictConfig, tokenizer: Any, num_examine: int, **reward_kwargs: Any
) -> AbstractRewardManager:
    """
    Load and initialize a reward manager based on the configuration.

    Args:
        config: PPO trainer configuration object containing reward_model fields.
        tokenizer: Tokenizer object used for processing text.
        num_examine: Number of samples to examine.
        **reward_kwargs: Additional keyword arguments for the reward manager.

    Returns:
        An instance of the specified reward manager class.
    """

    # Try to get a custom reward function based on the configuration
    # user defined reward manager can be registered in custom_reward_fn
    compute_score = get_custom_reward_fn(config)
    final_compute_score = compute_score

    # The list of pre-defined reward managers are defined in `verl/workers/reward_manager/`:
    # naive: NaiveRewardManager
    # prime: PrimeRewardManager
    # batch: BatchRewardManager
    # dapo: DAPORewardManager
    # Note(haibin.lin): For custom reward managers, please make sure they are imported and
    # registered via `verl.workers.reward_manager.register`
    # By default reward_manager is set to naive (NaiveRewardManager)

    #################################################################
    # The concrete reward manager class is looked up here
    #################################################################
    reward_manager_name = config.reward_model.get("reward_manager", "naive")
    reward_manager_cls = get_reward_manager_cls(reward_manager_name)

    if compute_score is None:
        sandbox_config = config.reward_model.get("sandbox_fusion")
        sandbox_url = sandbox_config.get("url") if sandbox_config else None
        # NOTE: as quoted, this line raises AttributeError when sandbox_config is None
        memory_limit_mb = sandbox_config.get("memory_limit_mb", 1024)
        if sandbox_url:
            sandbox_manager = multiprocessing.Manager()
            # Create a semaphore to control concurrent access to the sandbox
            _concurrent_semaphore = sandbox_manager.Semaphore(sandbox_config.get("max_concurrent", 64))
            final_compute_score = partial(
                default_compute_score,
                sandbox_fusion_url=sandbox_url,
                concurrent_semaphore=_concurrent_semaphore,
                memory_limit_mb=memory_limit_mb,
            )
        else:
            final_compute_score = default_compute_score

    #################################################################
    # With the DAPO script above, reward_manager_cls is DAPORewardManager
    #################################################################
    # Instantiate and return the reward manager with the specified parameters
    return reward_manager_cls(
        tokenizer=tokenizer,
        num_examine=num_examine,
        compute_score=final_compute_score,
        reward_fn_key=config.data.reward_fn_key,
        **reward_kwargs,
    )
```

We need to know what the reward_manager_cls for dapo actually is. Since a reward can only be computed once there is batch data, let's set the reward manager aside for now (the class for dapo lives in verl/verl/workers/reward_manager/dapo.py). First we go to dapo_ray_trainer.py to see how a batch is sampled, then come back and read the reward computation closely.

dapo_ray_trainer.py

```python
#################################################################
# RayDAPOTrainer inherits from RayPPOTrainer.
# fit() runs DAPO training, which includes:
#   1) dynamic sampling, 2) overlong soft reward, 3) token-level loss
#################################################################
class RayDAPOTrainer(RayPPOTrainer):
    """
    Note that this trainer runs on the driver process on a single CPU/GPU node.
    """

    def fit(self):
        """
        The training loop of PPO.
        The driver process only need to call the compute functions of the worker group through RPC
        to construct the PPO dataflow.
        The light-weight advantage computation is done on the driver process.
        """
        from omegaconf import OmegaConf

        from verl.utils.tracking import Tracking

        logger = Tracking(
            project_name=self.config.trainer.project_name,
            experiment_name=self.config.trainer.experiment_name,
            default_backend=self.config.trainer.logger,
            config=OmegaConf.to_container(self.config, resolve=True),
        )

        self.global_steps = 0
        self.gen_steps = 0

        # load checkpoint before doing anything
        self._load_checkpoint()

        # perform validation before training
        # currently, we only support validation using the reward_function.
        if self.val_reward_fn is not None and self.config.trainer.get("val_before_train", True):
            val_metrics = self._validate()
            assert val_metrics, f"{val_metrics=}"
            pprint(f"Initial validation metrics: {val_metrics}")
            logger.log(data=val_metrics, step=self.global_steps)
            if self.config.trainer.get("val_only", False):
                return

        if self.config.actor_rollout_ref.rollout.get("skip_rollout", False):
            rollout_skip = RolloutSkip(self.config, self.actor_rollout_wg)
            rollout_skip.wrap_generate_sequences()

        # add tqdm
        progress_bar = tqdm(total=self.total_training_steps, initial=self.global_steps, desc="Training Progress")

        # we start from step 1
        self.global_steps += 1
        self.gen_steps += 1
        last_val_metrics = None

        prev_step_profile = False
        curr_step_profile = (
            self.global_steps in self.config.global_profiler.steps
            if self.config.global_profiler.steps is not None
            else False
        )
        next_step_profile = False

        timing_raw = defaultdict(float)
        batch = None
        #################################################################
        # num_prompt_in_batch: number of prompts whose reward std is non-zero
        #   after filtering; reset to 0 after each model update
        # num_gen_batches: number of gen_batches consumed so far;
        #   reset to 0 after each model update
        #################################################################
        num_prompt_in_batch = 0
        num_gen_batches = 0
        #################################################################
        # Training loop: for each epoch, iterate over every gen_batch
        #################################################################
        for epoch in range(self.config.trainer.total_epochs):
            for batch_dict in self.train_dataloader:
                metrics = {}

                with marked_timer("start_profile", timing_raw):
                    self._start_profiling(
                        not prev_step_profile and curr_step_profile
                        if self.config.global_profiler.profile_continuous_steps
                        else curr_step_profile
                    )

                #################################################################
                # new_batch is a DataProto (see verl/verl/protocol.py);
                # new_batch.batch is a TensorDict.
                # new_batch holds 3x as many prompts as the trainable batch,
                # i.e. the number of sampled prompts is deliberately increased
                #################################################################
                new_batch: DataProto = DataProto.from_single_dict(batch_dict)
                num_gen_batches += 1
                # pop those keys for generation
                if "multi_modal_data" in new_batch.non_tensor_batch.keys():
                    gen_batch = new_batch.pop(
                        batch_keys=["input_ids", "attention_mask", "position_ids"],
                        non_tensor_batch_keys=["raw_prompt_ids", "multi_modal_data"],
                    )
                else:
                    # extract the generation keys from new_batch to build gen_batch
                    gen_batch = new_batch.pop(
                        batch_keys=["input_ids", "attention_mask", "position_ids"],
                        non_tensor_batch_keys=["raw_prompt_ids"],
                    )
                # Why repeat? Each prompt is sampled n times, so it is repeated n times
                # (with interleave=True the copies of one prompt stay adjacent).
                # gen_batch has bsz rows; gen_batch_output has bsz * n rows
                gen_batch_output = gen_batch.repeat(
                    repeat_times=self.config.actor_rollout_ref.rollout.n, interleave=True
                )

                is_last_step = self.global_steps >= self.total_training_steps

                with marked_timer("step", timing_raw):
                    # generate a batch
                    with marked_timer("gen", timing_raw, "red"):
                        gen_batch_output = self.actor_rollout_wg.generate_sequences(gen_batch_output)
                        timing_raw.update(gen_batch_output.meta_info["timing"])
                        gen_batch_output.meta_info.pop("timing", None)

                    # This branch can be ignored for now. ReMax first generates a greedy
                    # sample whose reward serves as the baseline in the later advantage computation
                    if self.config.algorithm.adv_estimator == AdvantageEstimator.REMAX:
                        with marked_timer("gen_max", timing_raw, "red"):
                            gen_baseline_batch = deepcopy(gen_batch)
                            # greedy-decoding baseline: do_sample = False
                            gen_baseline_batch.meta_info["do_sample"] = False
                            gen_baseline_output = self.actor_rollout_wg.generate_sequences(gen_baseline_batch)

                            new_batch = new_batch.union(gen_baseline_output)
                            # compute reward model score on new_batch
                            rm_scores = None
                            if self.use_rm and "rm_scores" not in new_batch.batch.keys():
                                rm_scores = self.rm_wg.compute_rm_score(new_batch)
                                new_batch = new_batch.union(rm_scores)
                            reward_baseline_tensor, _ = compute_reward(new_batch, self.reward_fn)
                            reward_baseline_tensor = reward_baseline_tensor.sum(dim=-1)

                            keys_to_pop = set(gen_baseline_output.batch.keys())
                            if rm_scores is not None:
                                keys_to_pop.update(rm_scores.batch.keys())
                            new_batch.pop(batch_keys=list(keys_to_pop))

                            new_batch.batch["reward_baselines"] = reward_baseline_tensor

                            del rm_scores, gen_baseline_batch, gen_baseline_output

                    #################################################################
                    # new_batch has gen_prompt_bsz prompts.
                    # Give every prompt its own identifier, uid: later, rewards are
                    # normalized across the n samples that share the same prompt
                    #################################################################
                    new_batch.non_tensor_batch["uid"] = np.array(
                        [str(uuid.uuid4()) for _ in range(len(new_batch.batch))], dtype=object
                    )
                    # repeat every key in the batch (mainly the uid here)
                    # repeat to align with repeated responses in rollout
                    new_batch = new_batch.repeat(repeat_times=self.config.actor_rollout_ref.rollout.n, interleave=True)
                    # merge the generated samples into new_batch
                    new_batch = new_batch.union(gen_batch_output)

                    with marked_timer("reward", timing_raw, "yellow"):
                        # compute scores. Support both model and function-based.
                        # We first compute the scores using reward model. Then, we call reward_fn to combine
                        # the results from reward model and rule-based results.
                        if self.use_rm and "rm_scores" not in new_batch.batch.keys():
                            # we first compute reward model score
                            reward_tensor = self.rm_wg.compute_rm_score(new_batch)
                            new_batch = new_batch.union(reward_tensor)

                        # compute the reward of every sample in new_batch with self.reward_fn
                        # we combine with rule-based rm
                        reward_tensor, reward_extra_infos_dict = compute_reward(new_batch, self.reward_fn)

                        new_batch.batch["token_level_scores"] = reward_tensor

                        if reward_extra_infos_dict:
                            new_batch.non_tensor_batch.update(
                                {k: np.array(v) for k, v in reward_extra_infos_dict.items()}
                            )

                        # compute rewards. apply_kl_penalty if available
                        if self.config.algorithm.use_kl_in_reward:
                            new_batch, kl_metrics = apply_kl_penalty(
                                new_batch, kl_ctrl=self.kl_ctrl_in_reward, kl_penalty=self.config.algorithm.kl_penalty
                            )
                            metrics.update(
                                kl_metrics
                            )  # TODO: This will be cleared if we use multiple genenration batches
                        else:
                            new_batch.batch["token_level_rewards"] = new_batch.batch["token_level_scores"]

                    #################################################################
                    # DAPO's filter: the dynamic sampling part
                    #################################################################
                    if not self.config.algorithm.filter_groups.enable:
                        batch = new_batch
                    else:  # NOTE: When prompts after filtering is less than train batch size,
                        # we skip to the next generation batch
                        metric_name = self.config.algorithm.filter_groups.metric
                        if metric_name == "seq_final_reward":
                            # Turn to numpy for easier filtering
                            new_batch.non_tensor_batch["seq_final_reward"] = (
                                new_batch.batch["token_level_rewards"].sum(dim=-1).numpy()
                            )
                        elif metric_name == "seq_reward":
                            new_batch.non_tensor_batch["seq_reward"] = (
                                new_batch.batch["token_level_scores"].sum(dim=-1).numpy()
                            )

                        # {uid: [r1, r2, ..., rn], uid: [...], ...}: rewards of all samples per prompt
                        # Collect the sequence reward for each trajectory
                        prompt_uid2metric_vals = defaultdict(list)
                        for uid, metric_val in zip(
                            new_batch.non_tensor_batch["uid"], new_batch.non_tensor_batch[metric_name], strict=True
                        ):
                            prompt_uid2metric_vals[uid].append(metric_val)

                        # reward std of each prompt
                        prompt_uid2metric_std = {}
                        for prompt_uid, metric_vals in prompt_uid2metric_vals.items():
                            prompt_uid2metric_std[prompt_uid] = np.std(metric_vals)

                        # keep the uids of prompts whose reward std is non-zero
                        kept_prompt_uids = [
                            uid
                            for uid, std in prompt_uid2metric_std.items()
                            if std > 0 or len(prompt_uid2metric_vals[uid]) == 1
                        ]
                        # accumulate the count of prompts with non-zero std
                        num_prompt_in_batch += len(kept_prompt_uids)

                        # indices of the samples that belong to kept prompts
                        kept_traj_idxs = []
                        for idx, traj_from_prompt_uid in enumerate(new_batch.non_tensor_batch["uid"]):
                            if traj_from_prompt_uid in kept_prompt_uids:
                                kept_traj_idxs.append(idx)

                        # select those trajectories from new_batch
                        new_batch = new_batch[kept_traj_idxs]
                        # batch accumulates the kept trajectories
                        batch = new_batch if batch is None else DataProto.concat([batch, new_batch])

                        # minimum trainable batch size (in prompts), set in the .sh file
                        prompt_bsz = self.config.data.train_batch_size
                        # if fewer prompts than that have been accumulated so far,
                        # continue and accumulate from the next new_batch
                        if num_prompt_in_batch < prompt_bsz:
                            print(f"{num_prompt_in_batch=} < {prompt_bsz=}")
                            max_num_gen_batches = self.config.algorithm.filter_groups.max_num_gen_batches
                            # max_num_gen_batches caps how many gen_batches may be consumed;
                            # a value <= 0 means no limit. Otherwise keep going while
                            # num_gen_batches < max_num_gen_batches
                            if max_num_gen_batches <= 0 or num_gen_batches < max_num_gen_batches:
                                print(f"{num_gen_batches=}. Keep generating...")
                                self.gen_steps += 1
                                is_last_step = self.global_steps >= self.total_training_steps
                                continue
                            else:
                                raise ValueError(
                                    f"{num_gen_batches=} >= {max_num_gen_batches=}."
                                    + " Generated too many. Please check if your data are too difficult."
                                    + " You could also try set max_num_gen_batches=0 to enable endless trials."
                                )
                        # enough qualifying prompts accumulated for one training batch
                        else:
                            # Align the batch
                            traj_bsz = self.config.data.train_batch_size * self.config.actor_rollout_ref.rollout.n
                            #################################################################
                            # Alignment: surplus trajectories are dropped. I am not sure whether
                            # this lowers sampling efficiency, or whether some trajectories
                            # end up never being trained on
                            #################################################################
                            batch = batch[:traj_bsz]

                    #################################################################
                    # Actor model update
                    #################################################################
                    # === Updating ===

                    batch.batch["response_mask"] = compute_response_mask(batch)

                    # Balance the number of valid tokens across DP ranks.
                    # NOTE: This usually changes the order of data in the `batch`,
                    # which won't affect the advantage calculation (since it's based on uid),
                    # but might affect the loss calculation (due to the change of mini-batching).
                    # TODO: Decouple the DP balancing and mini-batching.
                    if self.config.trainer.balance_batch:
                        self._balance_batch(batch, metrics=metrics)

                    # compute global_valid tokens
                    batch.meta_info["global_token_num"] = torch.sum(batch.batch["attention_mask"], dim=-1).tolist()

                    #################################################################
                    # Record the token-level log-probs of every kept trajectory under
                    # the sampling-time (old) policy; used for the importance-sampling ratio
                    #################################################################
                    # recompute old_log_probs
                    with marked_timer("old_log_prob", timing_raw, "blue"):
                        old_log_prob = self.actor_rollout_wg.compute_log_prob(batch)
                        entropys = old_log_prob.batch["entropys"]
                        response_masks = batch.batch["response_mask"]
                        loss_agg_mode = self.config.actor_rollout_ref.actor.loss_agg_mode
                        # for DAPO, loss_agg_mode is "token-mean"
                        entropy_agg = agg_loss(loss_mat=entropys, loss_mask=response_masks, loss_agg_mode=loss_agg_mode)
                        old_log_prob_metrics = {"actor/entropy": entropy_agg.detach().item()}
                        metrics.update(old_log_prob_metrics)
                        old_log_prob.batch.pop("entropys")
                        batch = batch.union(old_log_prob)

                    if self.use_reference_policy:
                        # compute reference log_prob
                        with marked_timer("ref", timing_raw, "olive"):
                            ref_log_prob = self.ref_policy_wg.compute_ref_log_prob(batch)
                            batch = batch.union(ref_log_prob)

                    # compute values
                    if self.use_critic:
                        with marked_timer("values", timing_raw, "cyan"):
                            values = self.critic_wg.compute_values(batch)
                            batch = batch.union(values)

                    # token-level importance-sampling weights
                    # Compute rollout IS weights and mismatch metrics (inherited from RayPPOTrainer)
                    batch, is_metrics = self.compute_rollout_importance_weights_and_add_to_batch(batch)
                    # IS and mismatch metrics already have mismatch/ prefix
                    metrics.update(is_metrics)

                    #################################################################
                    # Compute the advantage
                    #################################################################
                    with marked_timer("adv", timing_raw, "brown"):
                        # compute advantages, executed on the driver process
                        norm_adv_by_std_in_grpo = self.config.algorithm.get("norm_adv_by_std_in_grpo", True)
                        batch = compute_advantage(
                            batch,
                            adv_estimator=self.config.algorithm.adv_estimator,
                            gamma=self.config.algorithm.gamma,
                            lam=self.config.algorithm.lam,
                            num_repeat=self.config.actor_rollout_ref.rollout.n,
                            norm_adv_by_std_in_grpo=norm_adv_by_std_in_grpo,
                        )

                    # update critic
                    if self.use_critic:
                        with marked_timer("update_critic", timing_raw, "pink"):
                            critic_output = self.critic_wg.update_critic(batch)
                        critic_output_metrics = reduce_metrics(critic_output.meta_info["metrics"])
                        metrics.update(critic_output_metrics)

                    # implement critic warmup
                    if self.config.trainer.critic_warmup <= self.global_steps:
                        #################################################################
                        # Update the actor model; the batch holds train_prompt_bsz prompts.
                        # Parameters are updated once per mini-batch (from accumulated gradients);
                        # gradients are accumulated once per micro-batch
                        #################################################################
                        # update actor
                        with marked_timer("update_actor", timing_raw, "red"):
                            actor_output = self.actor_rollout_wg.update_actor(batch)
                        actor_output_metrics = reduce_metrics(actor_output.meta_info["metrics"])
                        metrics.update(actor_output_metrics)

                    # Log rollout generations if enabled
                    rollout_data_dir = self.config.trainer.get("rollout_data_dir", None)
                    if rollout_data_dir:
                        self._log_rollout_data(batch, reward_extra_infos_dict, timing_raw, rollout_data_dir)

                    # validate
                    if (
                        self.val_reward_fn is not None
                        and self.config.trainer.test_freq > 0
                        and (is_last_step or self.global_steps % self.config.trainer.test_freq == 0)
                    ):
                        with marked_timer("testing", timing_raw, "green"):
                            val_metrics: dict = self._validate()
                            if is_last_step:
                                last_val_metrics = val_metrics
                        metrics.update(val_metrics)

                    if self.config.trainer.save_freq > 0 and (
                        is_last_step or self.global_steps % self.config.trainer.save_freq == 0
                    ):
                        with marked_timer("save_checkpoint", timing_raw, "green"):
                            self._save_checkpoint()

                with marked_timer("stop_profile", timing_raw):
                    next_step_profile = (
                        self.global_steps + 1 in self.config.global_profiler.steps
                        if self.config.global_profiler.steps is not None
                        else False
                    )
                    self._stop_profiling(
                        curr_step_profile and not next_step_profile
                        if self.config.global_profiler.profile_continuous_steps
                        else curr_step_profile
                    )
                    prev_step_profile = curr_step_profile
                    curr_step_profile = next_step_profile

                # collect metrics
                metrics.update(compute_data_metrics(batch=batch, use_critic=self.use_critic))
                metrics.update(compute_timing_metrics(batch=batch, timing_raw=timing_raw))
                # TODO: implement actual tflpo and theoretical tflpo
                n_gpus = self.resource_pool_manager.get_n_gpus()
                metrics.update(compute_throughout_metrics(batch=batch, timing_raw=timing_raw, n_gpus=n_gpus))
                timing_raw = defaultdict(float)  # clear timing

                metrics["train/num_gen_batches"] = num_gen_batches
                batch = None
                num_prompt_in_batch = 0
                num_gen_batches = 0

                # TODO: make a canonical logger that supports various backend
                logger.log(data=metrics, step=self.global_steps)

                if is_last_step:
                    pprint(f"Final validation metrics: {last_val_metrics}")
                    progress_bar.close()
                    return

                progress_bar.update(1)
                self.global_steps += 1
                self.gen_steps += 1

        # check if last step checkpint exists
        checkpoint_dir = os.path.join(self.config.trainer.default_local_dir, f"global_step_{self.global_steps}")
        if not os.path.exists(checkpoint_dir):
            # save last step checkpoint
            timing_raw = defaultdict(float)
            with marked_timer("save_checkpoint", timing_raw, "green"):
                self._save_checkpoint()
            metrics = {f"timing/{k}": v for k, v in timing_raw.items()}
            logger.log(data=metrics, step=self.global_steps)
```

Now let's return to DAPO's reward manager. Its main difference from the PPO one is that it uses overlong_buffer to compute a length-based reward.

verl/verl/workers/reward_manager/dapo.py

```python
#################################################################
# DAPORewardManager is registered under the name "dapo", which is why
#   reward_manager_cls = get_reward_manager_cls(reward_manager_name)
# can look it up
#################################################################
@register("dapo")
class DAPORewardManager(AbstractRewardManager):
    """The reward manager."""

    def __init__(
        self,
        tokenizer,
        num_examine,
        compute_score=None,
        reward_fn_key="data_source",
        max_resp_len=None,
        overlong_buffer_cfg=None,
    ) -> None:
        self.tokenizer = tokenizer
        self.num_examine = num_examine  # the number of batches of decoded responses to print to the console
        self.compute_score = compute_score or default_compute_score
        self.reward_fn_key = reward_fn_key
        self.overlong_buffer_cfg = overlong_buffer_cfg
        self.max_resp_len = max_resp_len

        if self.overlong_buffer_cfg is not None:
            assert self.max_resp_len is not None, (
                f"max_resp_len must be provided if {overlong_buffer_cfg=}, but got None"
            )
            assert self.max_resp_len >= self.overlong_buffer_cfg.len, (
                "max_resp_len must be larger than overlong_buffer.len"
            )

    #################################################################
    # Main function of the DAPO reward manager
    #################################################################
    def __call__(self, data: DataProto, return_dict: bool = False):
        """We will expand this function gradually based on the available datasets"""

        # If there is rm score, we directly return rm score. Otherwise, we compute via rm_score_fn
        if "rm_scores" in data.batch.keys():
            if return_dict:
                reward_extra_keys = data.meta_info.get("reward_extra_keys", [])
                reward_extra_info = {key: data.non_tensor_batch[key] for key in reward_extra_keys}
                return {"reward_tensor": data.batch["rm_scores"], "reward_extra_info": reward_extra_info}
            else:
                return data.batch["rm_scores"]

        reward_tensor = torch.zeros_like(data.batch["responses"], dtype=torch.float32)
        reward_extra_info = defaultdict(list)

        already_print_data_sources = {}

        for i in range(len(data)):
            data_item = data[i]  # DataProtoItem

            prompt_ids = data_item.batch["prompts"]

            prompt_length = prompt_ids.shape[-1]

            ########################################################
            # Note: prompt_ids are left-padded,
            # response_ids are right-padded
            ########################################################
            valid_prompt_length = data_item.batch["attention_mask"][:prompt_length].sum()
            valid_prompt_ids = prompt_ids[-valid_prompt_length:]

            response_ids = data_item.batch["responses"]
            valid_response_length = data_item.batch["attention_mask"][prompt_length:].sum()
            valid_response_ids = response_ids[:valid_response_length]

            # decode
            prompt_str = self.tokenizer.decode(valid_prompt_ids, skip_special_tokens=True)
            response_str = self.tokenizer.decode(valid_response_ids, skip_special_tokens=True)
            eos_token = self.tokenizer.eos_token
            if response_str.endswith(eos_token):
                response_str = response_str[: -len(eos_token)]

            ground_truth = data_item.non_tensor_batch["reward_model"]["ground_truth"]

            data_source = data_item.non_tensor_batch[self.reward_fn_key]

            extra_info = data_item.non_tensor_batch.get("extra_info", {})

            rollout_reward_scores = data_item.non_tensor_batch.get("reward_scores", {})

            extra_info["rollout_reward_scores"] = rollout_reward_scores

            result = self.compute_score(
                data_source=data_source,
                solution_str=response_str,
                ground_truth=ground_truth,
                extra_info=extra_info,
            )

            score: float
            if isinstance(result, dict):
                score = result["score"]
                # Store the information including original reward
                for key, value in result.items():
                    reward_extra_info[key].append(value)
            else:
                score = result
                reward_extra_info["acc"].append(score)

            reward = score

            ########################################################
            # The overlong reward is computed here
            ########################################################
            if self.overlong_buffer_cfg.enable:
                overlong_buffer_len = self.overlong_buffer_cfg.len
                expected_len = self.max_resp_len - overlong_buffer_len
                exceed_len = valid_response_length - expected_len
                overlong_penalty_factor = self.overlong_buffer_cfg.penalty_factor
                overlong_reward = min(-exceed_len / overlong_buffer_len * overlong_penalty_factor, 0)
                reward += overlong_reward
                if self.overlong_buffer_cfg.log:
                    reward_extra_info["overlong_reward"].append(overlong_reward)
                    reward_extra_info["overlong"].append(overlong_reward < 0)

            reward_tensor[i, valid_response_length - 1] = reward

            if data_source not in already_print_data_sources:
                already_print_data_sources[data_source] = 0

            if already_print_data_sources[data_source] < self.num_examine:
                already_print_data_sources[data_source] += 1
                print("[prompt]", prompt_str)
                print("[response]", response_str)
                print("[ground_truth]", ground_truth)
                if isinstance(result, dict):
                    for key, value in result.items():
                        print(f"[{key}]", value)
                else:
                    print("[score]", score)

        if return_dict:
            return {
                "reward_tensor": reward_tensor,
                "reward_extra_info": reward_extra_info,
            }
        else:
            return reward_tensor
```
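To make the two DAPO-specific pieces easier to poke at in isolation, here is a minimal standalone sketch of the overlong soft penalty and the dynamic-sampling group filter, following the formulas in DAPORewardManager.__call__ and RayDAPOTrainer.fit. The function names overlong_penalty and keep_informative_groups are hypothetical helpers made up for this illustration, not part of verl, and the max_resp_len of 20480 in the usage lines is an assumed example value (only the 4096-token buffer comes from the script quoted earlier).

```python
from collections import defaultdict

import numpy as np


def overlong_penalty(response_len, max_resp_len, buffer_len, penalty_factor=1.0):
    """Soft length penalty (hypothetical helper mirroring the reward manager's formula).

    Responses up to (max_resp_len - buffer_len) tokens get no penalty; inside the
    buffer the penalty grows linearly down to -penalty_factor at max_resp_len.
    """
    expected_len = max_resp_len - buffer_len
    exceed_len = response_len - expected_len
    return min(-exceed_len / buffer_len * penalty_factor, 0.0)


def keep_informative_groups(uids, rewards):
    """Dynamic-sampling filter (hypothetical helper mirroring the trainer's logic).

    Returns the indices of samples whose prompt group has a non-zero reward std
    (or contains a single sample), i.e. groups that are not all-correct or all-wrong.
    """
    groups = defaultdict(list)
    for uid, reward in zip(uids, rewards):
        groups[uid].append(reward)
    kept = {uid for uid, vals in groups.items() if np.std(vals) > 0 or len(vals) == 1}
    return [idx for idx, uid in enumerate(uids) if uid in kept]


# Assumed example numbers: 20480-token response limit, 4096-token buffer.
halfway = overlong_penalty(18432, max_resp_len=20480, buffer_len=4096)
at_limit = overlong_penalty(20480, max_resp_len=20480, buffer_len=4096)
short = overlong_penalty(1000, max_resp_len=20480, buffer_len=4096)

# Three prompts with two samples each: only prompt "b" has mixed rewards.
kept_idxs = keep_informative_groups(["a", "a", "b", "b", "c", "c"], [1, 1, 0, 1, 0, 0])
```

With these inputs, a response halfway into the buffer is penalized by half the penalty factor, and only the two samples of prompt "b" survive the filter, because groups "a" and "c" have zero reward variance and would give zero advantage under group normalization.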