国外大型网站什么是虚拟网站

张小明 2026/1/3 12:08:40
国外大型网站,什么是虚拟网站,建网站 南京,企业网站的模式前文我们介绍了 Flink 的四种执行图#xff0c;并且通过源码了解了 Flink 的 StreamGraph 是怎么生成的#xff0c;本文我们就一起来看下 Flink 的另一种执行图——JobGraph 是如何生成的。 StreamGraph 和 JobGraph 的区别 在正式开始之前#xff0c;我们再来回顾一下 Stre…前文我们介绍了 Flink 的四种执行图并且通过源码了解了 Flink 的 StreamGraph 是怎么生成的本文我们就一起来看下 Flink 的另一种执行图——JobGraph 是如何生成的。StreamGraph 和 JobGraph 的区别在正式开始之前我们再来回顾一下 StreamGraph 和 JobGraph 的区别。假设我们的任务是建造一座大楼StreamGraph 就像是设计蓝图它描述了每个窗户、每根水管的位置和规格而 JobGraph 像是给到施工队的施工流程图它描述了每个任务模块例如先把地基浇筑好再铺设管线等。总的来说JobGraph 更偏向执行层面它是由 StreamGraph 优化而来。回到 Flink 本身我们通过一个表格来了解两个图的区别。StreamGraphJobGraph生成阶段客户端执行 execute() 时客户端提交前由 StreamGraph 转换生成抽象层级高层逻辑图直接对应 API优化后的执行图为调度做准备核心优化无主要是算子链优化节点StreamNodeJobVertex边StreamEdgeJobEdge提交对象无提交给 JobManager包含资源无包含执行作业所需的 Jar 包、依赖库和资源文件JobVertexJobGraph 中的节点是 JobVertex在 StreamGraph 转换成 JobGraph 的过程中会将多个节点串联起来最终生成 JobVertex。JobVertex包含以下成员变量我们分别看一下这些成员变量及其作用。1、标识符相关// JobVertex的id在作业执行过程中的唯一标识。监控、调度和故障恢复都会使用privatefinalJobVertexIDid;// operator id列表按照深度优先顺序存储。operator 的管理、状态分配都会用到privatefinalListOperatorIDPairoperatorIDs;2、输入输出相关// 定义所有的输入边privatefinalListJobEdgeinputsnewArrayList();// 定义所有的输出数据集privatefinalMapIntermediateDataSetID,IntermediateDataSetresultsnewLinkedHashMap();// 输入分片源主要用于批处理作业定义如何将数据分成多个片privateInputSplitSource?inputSplitSource;3、执行配置相关// 并行度即运行时拆分子任务数量默认使用全局配置privateintparallelismExecutionConfig.PARALLELISM_DEFAULT;// 最大并行度privateintmaxParallelismMAX_PARALLELISM_DEFAULT;// 存储运行时实际执行的类使 Flink 可以灵活处理不同类型的操作符// 流任务可以是org.apache.flink.streaming.runtime.tasks.StreamTask// 批任务可以是org.apache.flink.runtime.operators.BatchTaskprivateStringinvokableClassName;// 自定义配置privateConfigurationconfiguration;// 是否是动态设置并发度privatebooleandynamicParallelismfalse;// 是否支持优雅停止privatebooleanisStoppablefalse;4、资源管理相关// JobVertex 最小资源需求privateResourceSpecminResourcesResourceSpec.DEFAULT;// JobVertex 推荐资源需求privateResourceSpecpreferredResourcesResourceSpec.DEFAULT;// 用于资源优化运行不同的 JobVertex 的子任务运行在同一个 slotNullableprivateSlotSharingGroupslotSharingGroup;// 需要严格共址的 JobVertex 组每个 JobVertex 的第 n 个子任务运行在同一个 TaskManagerNullableprivateCoLocationGroupImplcoLocationGroup;5、协调器// 操作符协调器用于处理全局协调逻辑privatefinalListSerializedValueOperatorCoordinator.ProvideroperatorCoordinatorsnewArrayList();6、显示和描述信息// JobVertex 的名称privateStringname;// 操作符名称比如 Flat Map 或 JoinprivateStringoperatorName;// 操作符的描述比如 Hash Join 或 Sorted Group ReduceprivateStringoperatorDescription;// 提供比 name 更友好的描述信息privateStringoperatorPrettyName;7、状态和行为标志// 是否支持同一个子任务并发多次执行privatebooleansupportsConcurrentExecutionAttemptstrue;// 标记并发度是否被显式设置privatebooleanparallelismConfiguredfalse;// 是否有阻塞型输出privatebooleananyOutputBlockingfalse;8、缓存数据集// 存储该 JobVertex 需要消费的缓存中间数据集的 ID可提高作业执行效率privatefinalListIntermediateDataSetIDintermediateDataSetIdsToConsumenewArrayList();JobEdge在 StreamGraph 中StreamEdge 是连接 StreamNode 的桥梁。在 JobGraph 中与之对应的是 JobEdge不同点在于 JobEdge 中保存的是输入节点和输出结果。1、连接关系成员// 定义数据流向哪个 JobVertexprivatefinalJobVertextarget;// 定义这条边的源数据privatefinalIntermediateDataSetsource;// 输入类型的编号privatefinalinttypeNumber;// 多个输入间的键是否相关如果为 true相同键的数据在一个输入被分割时在其他数据对应的记录也会发送到相同的下游节点privatefinalbooleaninterInputsKeysCorrelated;// 同一输入内相同的键是否必须发送到同一下游任务privatefinalbooleanintraInputKeyCorrelated;2、数据分发模式// 定义数据在并行任务期间的分发模式// 可能值// ALL_TO_ALL全连接每个上游子任务连接所有下游任务// POINTWISE点对点连接一对一或一对多的本地连接privatefinalDistributionPatterndistributionPattern;3、数据传输策略// 是否为广播连接privatefinalbooleanisBroadcast;// 是否为 forward 连接forward 连接最高效直接转发无需序列化网络传输privatefinalbooleanisForward;// 数据传输策略名称用于显示privateStringshipStrategyName;4、状态重分布映射器// 下游状态重分布映射器当作业扩容时决定是否重新分配下游算子的持久化状态privateSubtaskStateMapperdownstreamSubtaskStateMapperSubtaskStateMapper.ROUND_ROBIN;// 上游状态重分布映射器当作业扩容时决定是否重新分配上游算子的持久化状态privateSubtaskStateMapperupstreamSubtaskStateMapperSubtaskStateMapper.ROUND_ROBIN;5、描述和缓存信息// 预处理操作的名称privateStringpreProcessingOperationName;// 操作符级别缓存的描述privateStringoperatorLevelCachingDescription;StreamGraph 转换成 JobGraph现在我们再来看一下 StreamGraph 是如何转换成 JobGraph 的。转换逻辑的入口是 StreamGraph.getJobGraph 方法。它只是调用了 StreamingJobGraphGenerator.createJobGraph核心逻辑在 createJobGraph 方法中。privateJobGraphcreateJobGraph(){// 预验证检查 StreamGraph 配置正确性preValidate(streamGraph,userClassloader);// 【核心】链化操作符setChaining();if(jobGraph.isDynamic()){// 支持动态扩缩容场景为动态图设置并行度setVertexParallelismsForDynamicGraphIfNecessary();}// Note that we set all the non-chainable outputs configuration here because the// setVertexParallelismsForDynamicGraphIfNecessary may affect the parallelism of job// vertices and partition-reusefinalMapInteger,MapStreamEdge,NonChainedOutputopIntermediateOutputsnewHashMap();// 设置不能链化的输出边setAllOperatorNonChainedOutputsConfigs(opIntermediateOutputs,jobVertexBuildContext);setAllVertexNonChainedOutputsConfigs(opIntermediateOutputs);// 设置物理边连接setPhysicalEdges(jobVertexBuildContext);// 设置支持并发执行的 JobVertexmarkSupportingConcurrentExecutionAttempts(jobVertexBuildContext);// 验证混合 shuffle 模式只在批处理模式下使用validateHybridShuffleExecuteInBatchMode(jobVertexBuildContext);// 设置 Slot 共享和协同定位setSlotSharingAndCoLocation(jobVertexBuildContext);// 设置托管内存比例setManagedMemoryFraction(jobVertexBuildContext);// 为 JobVertex 名称添加前缀addVertexIndexPrefixInVertexName(jobVertexBuildContext,newAtomicInteger(0));// 设置操作符描述信息setVertexDescription(jobVertexBuildContext);// Wait for the serialization of operator coordinators and stream config.// 序列化操作符协调器和流配置serializeOperatorCoordinatorsAndStreamConfig(serializationExecutor,jobVertexBuildContext);returnjobGraph;}可以看到在 createJobGraph 方法中调用了 setChaining 方法即进行链化操作。这也是 JobGraph 最核心的优化之一。下面我们来看一下具体怎么做链化。privatevoidsetChaining(){// we separate out the sources that run as inputs to another operator (chained inputs)// from the sources that needs to run as the main (head) operator.finalMapInteger,OperatorChainInfochainEntryPointsbuildChainedInputsAndGetHeadInputs();finalCollectionOperatorChainInfoinitialEntryPointschainEntryPoints.entrySet().stream().sorted(Comparator.comparing(Map.Entry::getKey)).map(Map.Entry::getValue).collect(Collectors.toList());// iterate over a copy of the values, because this map gets concurrently modifiedfor(OperatorChainInfoinfo:initialEntryPoints){createChain(info.getStartNodeId(),1,// operators start at position 1 because 0 is for chained source inputsinfo,chainEntryPoints,true,serializationExecutor,jobVertexBuildContext,null);}}setChaining 方法中主要分为两步第一步是处理 Source 节点将可以链化的 Source 和不能链化的 Source 节点分开。先来看如何判断一个 Source 是否可被链化。publicstaticbooleanisChainableSource(StreamNodestreamNode,StreamGraphstreamGraph){// 最基本的一些判空输出边数量为1if(streamNode.getOperatorFactory()null||!(streamNode.getOperatorFactory()instanceofSourceOperatorFactory)||streamNode.getOutEdges().size()!1){returnfalse;}finalStreamEdgesourceOutEdgestreamNode.getOutEdges().get(0);finalStreamNodetargetstreamGraph.getStreamNode(sourceOutEdge.getTargetId());finalChainingStrategytargetChainingStrategyPreconditions.checkNotNull(target.getOperatorFactory()).getChainingStrategy();// 链化策略必须 HEAD_WITH_SOURCES输出边是可链化的returntargetChainingStrategyChainingStrategy.HEAD_WITH_SOURCESisChainableInput(sourceOutEdge,streamGraph,false);}privatestaticbooleanisChainableInput(StreamEdgeedge,StreamGraphstreamGraph,booleanallowChainWithDefaultParallelism){StreamNodeupStreamVertexstreamGraph.getSourceVertex(edge);StreamNodedownStreamVertexstreamGraph.getTargetVertex(edge);if(!(streamGraph.isChainingEnabled()// 上下游节点是否在同一个 slot 共享组upStreamVertex.isSameSlotSharingGroup(downStreamVertex)// 操作符是否可以链化主要做并行度检查areOperatorsChainable(upStreamVertex,downStreamVertex,streamGraph,allowChainWithDefaultParallelism)// 分区器和交换模式是否支持链化arePartitionerAndExchangeModeChainable(edge.getPartitioner(),edge.getExchangeMode(),streamGraph.isDynamic()))){returnfalse;}// check that we do not have a union operation, because unions currently only work// through the network/byte-channel stack.// we check that by testing that each type (which means input position) is used only once// 检查是否为 Union 操作Union 操作不能链化for(StreamEdgeinEdge:downStreamVertex.getInEdges()){if(inEdge!edgeinEdge.getTypeNumber()edge.getTypeNumber()){returnfalse;}}returntrue;}Source 的链化条件主要就是这些我们结合一些例子来看一下。Source(并行度4) - Map(并行度4) - Filter(并行度4) Source - Map 边 1. isChainingEnabled() true 2. isSameSlotSharingGroup() true (都在默认组) 3. areOperatorsChainable() true (Source可链化Map是HEAD_WITH_SOURCES) 4. arePartitionerAndExchangeModeChainable() true (ForwardPartitioner) 5. Union检查通过 结果可链化 Map - Filter 边 1. isChainingEnabled() true 2. isSameSlotSharingGroup() true 3. areOperatorsChainable() true (Map和Filter都是ALWAYS) 4. arePartitionerAndExchangeModeChainable() true (ForwardPartitioner) 5. Union检查通过 结果可链化 最终Source - Map - Filter 三者链化到一个JobVertex中 Source(并行度2) - Map(并行度4) // 并行度不匹配 Source - Map 边 1. isChainingEnabled() true 2. isSameSlotSharingGroup() true 3. areOperatorsChainable() false (并行度不匹配) 结果不可链化需要网络传输 Source1 --\ Union - Map Source2 --/ Source1 - Union 边 虽然满足前4个条件但Union节点有两个输入边typeNumber相同 Union检查失败不可链化得到了所有入口之后就可以进行后续节点的链化操作了它的逻辑在 createChain 方法中。这里主要是一个递归过程先将节点的输出边分为可链化和不可链化两个 list之后对可链化的边进行递归调用链化。对不可链化的边需要创建出新的链。由于篇幅原因这里只贴一部分核心的代码publicstaticListStreamEdgecreateChain(finalIntegercurrentNodeId,finalintchainIndex,finalOperatorChainInfochainInfo,finalMapInteger,OperatorChainInfochainEntryPoints,finalbooleancanCreateNewChain,finalExecutorserializationExecutor,finalJobVertexBuildContextjobVertexBuildContext,finalNullableConsumerIntegervisitedStreamNodeConsumer){......// 拆分可链化边和不可链化边for(StreamEdgeoutEdge:currentNode.getOutEdges()){if(isChainable(outEdge,streamGraph)){chainableOutputs.add(outEdge);}else{nonChainableOutputs.add(outEdge);}}// 处理可链化边for(StreamEdgechainable:chainableOutputs){StreamNodetargetNodestreamGraph.getStreamNode(chainable.getTargetId());AttributetargetNodeAttributetargetNode.getAttribute();if(isNoOutputUntilEndOfInput){if(targetNodeAttribute!null){targetNodeAttribute.setNoOutputUntilEndOfInput(true);}}transitiveOutEdges.addAll(createChain(chainable.getTargetId(),chainIndex1,chainInfo,chainEntryPoints,canCreateNewChain,serializationExecutor,jobVertexBuildContext,visitedStreamNodeConsumer));// Mark upstream nodes in the same chain as outputBlockingif(targetNodeAttribute!nulltargetNodeAttribute.isNoOutputUntilEndOfInput()){currentNodeAttribute.setNoOutputUntilEndOfInput(true);}}// 处理不可链化边for(StreamEdgenonChainable:nonChainableOutputs){transitiveOutEdges.add(nonChainable);// Used to control whether a new chain can be created, this value is true in the// full graph generation algorithm and false in the progressive generation// algorithm. In the future, this variable can be a boolean type function to adapt// to more adaptive scenarios.if(canCreateNewChain){createChain(nonChainable.getTargetId(),1,// operators start at position 1 because 0 is for chained source// inputschainEntryPoints.computeIfAbsent(nonChainable.getTargetId(),(k)-chainInfo.newChain(nonChainable.getTargetId())),chainEntryPoints,canCreateNewChain,serializationExecutor,jobVertexBuildContext,visitedStreamNodeConsumer);}}// 创建 JobVertexStreamConfigconfig;if(currentNodeId.equals(startNodeId)){JobVertexjobVertexjobVertexBuildContext.getJobVertex(startNodeId);if(jobVertexnull){jobVertexcreateJobVertex(chainInfo,serializationExecutor,jobVertexBuildContext);}confignewStreamConfig(jobVertex.getConfiguration());}else{confignewStreamConfig(newConfiguration());}// 判断是否为起始节点如果不是将对应的配置信息存到链化起始节点的 key 中if(currentNodeId.equals(startNodeId)){chainInfo.setTransitiveOutEdges(transitiveOutEdges);jobVertexBuildContext.addChainInfo(startNodeId,chainInfo);config.setChainStart();config.setChainIndex(chainIndex);config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());config.setTransitiveChainedTaskConfigs(jobVertexBuildContext.getChainedConfigs().get(startNodeId));}else{config.setChainIndex(chainIndex);StreamNodenodestreamGraph.getStreamNode(currentNodeId);config.setOperatorName(node.getOperatorName());jobVertexBuildContext.getOrCreateChainedConfig(startNodeId).put(currentNodeId,config);}......}是否可链化依赖于 isChainable 方法的结果。它主要判断了下游的输入边数量是否为1然后调用了 isChainableInput这个方法我们刚刚已经看过了。publicstaticbooleanisChainable(StreamEdgeedge,StreamGraphstreamGraph){returnisChainable(edge,streamGraph,false);}publicstaticbooleanisChainable(StreamEdgeedge,StreamGraphstreamGraph,booleanallowChainWithDefaultParallelism){StreamNodedownStreamVertexstreamGraph.getTargetVertex(edge);returndownStreamVertex.getInEdges().size()1isChainableInput(edge,streamGraph,allowChainWithDefaultParallelism);}总结本文我们主要介绍了生成 JobGraph 的相关代码。首先了解了 JobGraph 中的节点和边对应的类以及它们和 StreamGraph 中的类的映射关系。然后又看了生成 JobGraph 的核心代码其中重点学习了链化相关的代码。最后补充一个生成 JobGraph 的调用链路感兴趣的同学可以看下。clusterClient.submitJob()→MiniCluster.submitJob()→Dispatcher.submitJob()→JobMasterServiceLeadershipRunnerFactory→DefaultJobMasterServiceFactory→JobMaster→DefaultSchedulerFactory.createInstance()→StreamGraph.getJobGraph()
版权声明:本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!

装修公司网站设计规划福州网站建设推广平台

从驱动安装到量产烧录:J-Link J-Flash 实战全解析 你有没有遇到过这样的场景?新项目刚上电,J-Flash 却提示“无法连接目标”;产线批量烧录时,每块板子要等半分钟,效率低得让人抓狂;甚至在调试…

张小明 2025/12/30 21:44:28 网站建设

昆明网页建站模板嘉兴做网站费用

BiliRaffle:B站动态抽奖自动化解决方案 【免费下载链接】BiliRaffle B站动态抽奖组件 项目地址: https://gitcode.com/gh_mirrors/bi/BiliRaffle 还在为B站动态抽奖的繁琐流程而烦恼吗?手动统计参与者、筛选有效用户、随机抽取中奖者,…

张小明 2025/12/31 7:55:22 网站建设

有网站怎样做推广网站建设技术部职责

多用户环境下如何稳住Multisim数据库路径?实战避坑全解析你有没有遇到过这样的场景:团队里某位同事打开一个原本正常的Multisim工程,突然满屏“未知器件”,仿真跑不起来,报错提示:“multisim数据库未找到”…

张小明 2025/12/30 20:21:37 网站建设

中华商标交易网官方网站devexpress网站开发

计算机系统全方位指南:从基础操作到安全维护 1. 账户管理与安全设置 在计算机使用中,账户管理是保障系统安全的重要环节。可以创建不同类型的账户,如管理员账户、标准用户账户、来宾账户和 Live ID 账户等。创建账户时,需注意设置强密码,遵循包含字母、数字和符号的原则…

张小明 2025/12/31 14:26:22 网站建设

成都大型网站建设公司排名php5 mysql网站开发基础与应用

第一章:Azure CLI 量子作业的提交日志在使用 Azure Quantum 进行量子计算开发时,通过 Azure CLI 提交量子作业是核心操作之一。提交日志不仅记录了作业的执行状态,还提供了调试和性能分析的关键信息。掌握如何查看和解析这些日志,…

张小明 2025/12/31 23:54:13 网站建设

哪个网站做民宿更好呢工商注册费用多少钱

目录具体实现截图项目介绍论文大纲核心代码部分展示项目运行指导结论源码获取详细视频演示 :文章底部获取博主联系方式!同行可合作具体实现截图 本系统(程序源码数据库调试部署讲解)同时还支持java、ThinkPHP、Node.js、Spring B…

张小明 2025/12/31 12:25:18 网站建设