作者:庄宇。
在工作流编排过程中,为了加快大任务处理效率,可以使用扇出扇入任务编排,将大任务分解为小任务,然后并行运行小任务,最后聚合结果。
从上图可以看出,可以使用DAG(Directed Acyclic Graph)来编排扇出扇入任务,子任务分为静态和动态,分别对应静态DAG和动态DAG。 动态 dag 扇出扇入也可以理解为 mapreduce。 每个子任务都是一个映射,最终的聚合结果是递减的。
静态匕首:拆分子任务分类是固定的,例如,在数据采集场景中,同时采集数据库 1 和数据库 2 的数据,最后对结果进行聚合。
动态匕首:拆分子任务分类是动态的,取决于上一个任务的输出,例如,在一个数据处理场景中,任务A可以扫描待处理的数据集,对每个子数据集(例如一个子目录)开始子任务bn处理,当所有子任务bns完成后,将结果聚合到子任务C中,启动多少个子任务b取决于任务A的输出。 您可以根据实际业务场景,自定义任务A中子任务的拆分规则。
在实际业务场景中,为了加快大任务的执行速度,提高效率,往往需要将一个大任务分解成上千个子任务,为了保证上千个子任务的同时运行,需要调度数以万计的CPU资源。 例如,修改后的算法的回归测试需要由一个子任务针对所有驱动场景运行,开发团队要求所有子场景并行测试,以加快迭代速度。
如果您需要使用动态 DAG 来编排数据处理、计算和科学计算场景中的任务,或者需要调度数万个 CPU 资源来加速任务运行,您可以使用阿里云 Ack One 分布式工作流 Argo 集群
ACK ONE分布式工作流Argo集群,产品化托管Argo Workflow提供售后支持,支持动态DAG扇出扇入任务编排,支持云算力按需调度,利用云弹性调度数万CPU资源并行运行大规模子任务,操作完成后及时资源,减少运行时间,节约成本。 支持数据处理、机器学习、计算、科学计算、CICD等业务场景。
Argo Workflow 是一个开源的 CNCF 毕业项目,专注于云原生领域的工作流编排,使用 Kubernetes CRD 编排离线任务和 DAG 工作流,使用 Kubernetes Pod 在集群中调度运行。
本文介绍如何使用 Argo Workflow 编排动态 DAG 扇出扇入任务。
我们将构建一个动态的 DAG 扇出扇入工作流,在 OSS 中读取一个大日志文件,将其拆分为多个小文件,启动多个子任务来计算每个小文件中的关键字数量(count),最后聚合结果(merge)。
1.创建分布式工作流 ARGO 集群
2.通过附加阿里云 OSS 存储卷,您可以像处理本地文件一样操作阿里云 OSS 上的文件。 参考:工作流使用存储卷
3.使用以下工作流 yaml 创建工作流,请参阅: 创建工作流。有关详细信息,请参阅注释。
apiversion: argoproj.io/v1alpha1kind: workflowmetadata: generatename: dynamic-dag-map-reduce-spec: entrypoint: main # claim a oss pvc, workflow can read/write file in oss through pvc. volumes: -name: workdir persistentvolumeclaim: claimname: pvc-oss # how many tasks to split, default is 5. arguments: parameters: -name: numparts value: "5" templates: -name: main # dag definition. dag: tasks: # split log files to several small files, based on numparts. -name: split template: split arguments: parameters: -name: numparts value: "}" # multiple map task to count words in each small file. -name: map template: map arguments: parameters: -name: partid value: '}' depends: "split" # run as a loop, partid from split task json outputs. withparam: '}' - name: reduce template: reduce arguments: parameters: -name: numparts value: "}" depends: "map" # the `split` task split the big log file to several small files. each file has a unique id (partid). # finally, it dumps a list of partid to stdout as output parameters - name: split inputs: parameters: -name: numparts container: image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count command: [python] args: ["split.py"] env: -name: num_parts value: "}" volumemounts: -name: workdir mountpath: /mnt/vol # one `map` per partid is started. finds its own "part file" and processes it. -name: map inputs: parameters: -name: partid container: image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count command: [python] args: ["count.py"] env: -name: part_id value: "}" volumemounts: -name: workdir mountpath: /mnt/vol # the `reduce` task takes the "results directory" and returns a single result. -name: reduce inputs: parameters: -name: numparts container: image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count command: [python] args: ["merge.py"] env: -name: num_parts value: "}" volumemounts: -name: workdir mountpath: /mnt/vol outputs: artifacts: -name: result path: /mnt/vol/result.json
4.动态 DAG 实现。
1)拆分大文件后,拆分任务会在标准输出中输出一个json字符串,包括:子任务要处理的partid,例如:
2)map任务使用withparam引用拆分任务的输出,解析json字符串得到所有的},并以每个}作为输入参数启动多个map任务。
- name: map template: map arguments: parameters: -name: partid value: '}' depends: "split" withparam: '}'
有关如何定义它的更多信息,请参阅开源 Argo 工作流文档5.工作流运行后,通过分布式工作流 Argo 集群控制台运行查看任务 DAG 进程和运行结果。
6.阿里云OSS文件列表,log-count-datatxt 是输入日志文件,split-output,cout-output 中间结果目录,resultjson 是最终结果文件。
7.示例中源信息详见aliyuncontainerservice github argo-workflow-examples
Argo Workflow 是一个开源的 CNCF 毕业项目,专注于云原生领域的工作流编排,使用 Kubernetes CRD 编排离线任务和 DAG 工作流,使用 Kubernetes Pod 在集群中调度运行。
阿里云 ACK One 分布式工作流 Argo Cluster,产品化托管 Argo Workflow,提供售后支持,加强控制面,实现数万个子任务(Pod)的稳定高效调度和运营,数据面支持云上大规模算力的无服务器调度,无需运维集群或节点,支持云算力按需调度, 利用云弹性,调度数万CPU资源并行运行大规模子任务,减少运行时间,支持数据处理、机器学习、**计算、科学计算、CICD等业务场景。
欢迎加入ACK ONE客户沟通钉钉群与我们交流。 (钉钉群号:35688562)。
1] 阿里云 Ack One 分布式工作流 Argo 集群。
2] argo workflow
3] 创建分布式工作流 argo 集群。
4] 工作流使用存储卷。
5] 创建工作流。
6] 开源 Argo 工作流文档。
walk-through/loops/
7] 分布式工作流 Argo 集群控制台。
8] aliyuncontainerservice github argo-workflow-examples