ACK One Argo Workflows 在任务编排中实现动态扇出风扇

小夏 科技 更新 2024-02-07

作者:庄宇。

在工作流编排过程中,为了加快大任务处理效率,可以使用扇出扇入任务编排,将大任务分解为小任务,然后并行运行小任务,最后聚合结果。

从上图可以看出,可以使用DAG(Directed Acyclic Graph)来编排扇出扇入任务,子任务分为静态和动态,分别对应静态DAG和动态DAG。 动态 dag 扇出扇入也可以理解为 mapreduce。 每个子任务都是一个映射,最终的聚合结果是递减的。

静态匕首:拆分子任务分类是固定的,例如,在数据采集场景中,同时采集数据库 1 和数据库 2 的数据,最后对结果进行聚合。

动态匕首:拆分子任务分类是动态的,取决于上一个任务的输出,例如,在一个数据处理场景中,任务A可以扫描待处理的数据集,对每个子数据集(例如一个子目录)开始子任务bn处理,当所有子任务bns完成后,将结果聚合到子任务C中,启动多少个子任务b取决于任务A的输出。 您可以根据实际业务场景,自定义任务A中子任务的拆分规则。

在实际业务场景中,为了加快大任务的执行速度,提高效率,往往需要将一个大任务分解成上千个子任务,为了保证上千个子任务的同时运行,需要调度数以万计的CPU资源。 例如,修改后的算法的回归测试需要由一个子任务针对所有驱动场景运行,开发团队要求所有子场景并行测试,以加快迭代速度。

如果您需要使用动态 DAG 来编排数据处理、计算和科学计算场景中的任务,或者需要调度数万个 CPU 资源来加速任务运行,您可以使用阿里云 Ack One 分布式工作流 Argo 集群

ACK ONE分布式工作流Argo集群,产品化托管Argo Workflow提供售后支持,支持动态DAG扇出扇入任务编排,支持云算力按需调度,利用云弹性调度数万CPU资源并行运行大规模子任务,操作完成后及时资源,减少运行时间,节约成本。 支持数据处理、机器学习、计算、科学计算、CICD等业务场景。

Argo Workflow 是一个开源的 CNCF 毕业项目,专注于云原生领域的工作流编排,使用 Kubernetes CRD 编排离线任务和 DAG 工作流,使用 Kubernetes Pod 在集群中调度运行。

本文介绍如何使用 Argo Workflow 编排动态 DAG 扇出扇入任务。

我们将构建一个动态的 DAG 扇出扇入工作流,在 OSS 中读取一个大日志文件,将其拆分为多个小文件,启动多个子任务来计算每个小文件中的关键字数量(count),最后聚合结果(merge)。

1.创建分布式工作流 ARGO 集群

2.通过附加阿里云 OSS 存储卷,您可以像处理本地文件一样操作阿里云 OSS 上的文件。 参考:工作流使用存储卷

3.使用以下工作流 yaml 创建工作流,请参阅: 创建工作流。有关详细信息,请参阅注释。

apiversion: argoproj.io/v1alpha1kind: workflowmetadata: generatename: dynamic-dag-map-reduce-spec: entrypoint: main # claim a oss pvc, workflow can read/write file in oss through pvc. volumes: -name: workdir persistentvolumeclaim: claimname: pvc-oss # how many tasks to split, default is 5. arguments: parameters: -name: numparts value: "5" templates: -name: main # dag definition. dag: tasks: # split log files to several small files, based on numparts. -name: split template: split arguments: parameters: -name: numparts value: "}" # multiple map task to count words in each small file. -name: map template: map arguments: parameters: -name: partid value: '}' depends: "split" # run as a loop, partid from split task json outputs. withparam: '}' - name: reduce template: reduce arguments: parameters: -name: numparts value: "}" depends: "map" # the `split` task split the big log file to several small files. each file has a unique id (partid). # finally, it dumps a list of partid to stdout as output parameters - name: split inputs: parameters: -name: numparts container: image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count command: [python] args: ["split.py"] env: -name: num_parts value: "}" volumemounts: -name: workdir mountpath: /mnt/vol # one `map` per partid is started. finds its own "part file" and processes it. -name: map inputs: parameters: -name: partid container: image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count command: [python] args: ["count.py"] env: -name: part_id value: "}" volumemounts: -name: workdir mountpath: /mnt/vol # the `reduce` task takes the "results directory" and returns a single result. -name: reduce inputs: parameters: -name: numparts container: image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count command: [python] args: ["merge.py"] env: -name: num_parts value: "}" volumemounts: -name: workdir mountpath: /mnt/vol outputs: artifacts: -name: result path: /mnt/vol/result.json
4.动态 DAG 实现。

1)拆分大文件后,拆分任务会在标准输出中输出一个json字符串,包括:子任务要处理的partid,例如:

2)map任务使用withparam引用拆分任务的输出,解析json字符串得到所有的},并以每个}作为输入参数启动多个map任务。

- name: map template: map arguments: parameters: -name: partid value: '}' depends: "split" withparam: '}'
有关如何定义它的更多信息,请参阅开源 Argo 工作流文档5.工作流运行后,通过分布式工作流 Argo 集群控制台运行查看任务 DAG 进程和运行结果。

6.阿里云OSS文件列表,log-count-datatxt 是输入日志文件,split-output,cout-output 中间结果目录,resultjson 是最终结果文件。

7.示例中源信息详见aliyuncontainerservice github argo-workflow-examples

Argo Workflow 是一个开源的 CNCF 毕业项目,专注于云原生领域的工作流编排,使用 Kubernetes CRD 编排离线任务和 DAG 工作流,使用 Kubernetes Pod 在集群中调度运行。

阿里云 ACK One 分布式工作流 Argo Cluster,产品化托管 Argo Workflow,提供售后支持,加强控制面,实现数万个子任务(Pod)的稳定高效调度和运营,数据面支持云上大规模算力的无服务器调度,无需运维集群或节点,支持云算力按需调度, 利用云弹性,调度数万CPU资源并行运行大规模子任务,减少运行时间,支持数据处理、机器学习、**计算、科学计算、CICD等业务场景。

欢迎加入ACK ONE客户沟通钉钉群与我们交流。 (钉钉群号:35688562)。

1] 阿里云 Ack One 分布式工作流 Argo 集群。

2] argo workflow

3] 创建分布式工作流 argo 集群。

4] 工作流使用存储卷。

5] 创建工作流。

6] 开源 Argo 工作流文档。

walk-through/loops/

7] 分布式工作流 Argo 集群控制台。

8] aliyuncontainerservice github argo-workflow-examples

相似文章

    企业服务 HA 系统审批流程 工作流的产品设计

    为了满足企业业务管控的需要,审批流程存在于各种各样的业务系统中。审批一般分为一般审批和业务审批。一般的审批一般可以通过钉钉 飞书等办公OA自带的审批功能来完成,非常成熟,不用多说业务审批往往与业务文档耦合,如采购订单的审批等,毫无疑问,相当一部分业务审批的可用性相对较差,今天我们就来谈谈如何设计一个...

    智能卧式回转柜产品优势及工作流程

    智能水平转盘是一种现代存储设备,具有许多优点和高效的工作流程。科明智库将为您介绍智能卧式回转柜的产品优势和工作流程。智能卧式旋转柜的产品优势之一是空间利用率高。智能水平转盘采用垂直旋转设计,通过最大限度地利用垂直空间来存放物品。每个柜子都配有多层货架,可以灵活地调节和分配货架的高度,以容纳不同尺寸和...

    日本游戏制造商公布了 AI 工作流程示例

    虽然我们还在期待AI将如何改变生活和工作,但刻板印象中 自给自足 的日本游戏开发者已经 开放 并且已经使用了AI工作流程。无论你是游戏玩家还是业内人士,这篇最新发布的数据分析一定让你有所收获,一起来看看吧来自知名厂商Level 的AI应用在游戏开发和推广中的应用 关于级 首先,我们来介绍一下本文的主...

    5 种 AI 工具,让您的工作流程更智能

    生成式人工智能的热潮始于去年ChatGPT的出现,在短短一年的时间里,该技术已经集成到各种生产力平台中,大大降低了进入门槛和操作我们日常工作流程的难度。我知道,很多朋友在听到 AI在工作场景中 时,首先担心的是会不会被新技术取代。别担心,本文中讨论的工具并不是要取代人类,而是旨在提高工作效率。这些人...

    使用一个易于销售的工作流程,高效处理 N 个复杂的业务场景

    随着企业业务的不断扩展,企业现有的业务流程已无法支撑复杂的业务需求现有流程无法支持日益复杂的业务需求,定制化新业务流程的开发费时费力,业务流程效率低下为了帮助企业更好地适应不断变化的业务需求,SalesEasy推出了工作流功能,支持个性化流程的构建,支持业务的快速发展,为提高企业业务运营效率提供 起...