南京建网站公司,企业推广专员招聘,怎么在wordpress建站,网站首页轮播图片如果问 mapreduce 和 spark 什么关系#xff0c;或者说有什么共同属性#xff0c;你可能会回答他们都是大数据处理引擎。如果问 spark 与 tensorflow 呢#xff0c;就可能有点迷糊#xff0c;这俩关注的领域不太一样啊。但是再问 spark 与 MPI 呢#xff1f;这个就更远了。…如果问 mapreduce 和 spark 什么关系或者说有什么共同属性你可能会回答他们都是大数据处理引擎。如果问 spark 与 tensorflow 呢就可能有点迷糊这俩关注的领域不太一样啊。但是再问 spark 与 MPI 呢这个就更远了。虽然这样问多少有些不严谨但是它们都有共同的一部分这就是我们今天谈论的一个话题一个比较大的话题分布式计算框架。
不管是 mapreduce还是 spark 亦或 tensorflow它们都是利用分布式的能力运行某些计算解决一些特定的问题。从这个 level 讲它们都定义了一种“分布式计算模型”即提出了一种计算的方法通过这种计算方法就能够解决大量数据的分布式计算问题。它们的区别在于提出的分布式计算模型不同。Mapreduce 正如其名是一个很基本的 map-reduce 式的计算模型好像没说一样。Spark 定义了一套 RDD 模型本质上是一系列的 map/reduce 组成的一个 DAG 图。Tensorflow 的计算模型也是一张图但是 tensorflow 的图比起 spark 来显得更“复杂”一点。你需要为图中的每个节点和边作出定义。根据这些定义可以指导 tensorflow 如何计算这张图。Tensorflow 的这种具体化的定义使它比较适合处理特定类型的的计算对 tensorflow 来讲就是神经网络。而 spark 的 RDD 模型使它比较适合那种没有相互关联的的数据并行任务。那么有没有一种通用的、简单的、性能还高的分布式计算模型我觉着挺难。通用往往意味着性能不能针对具体情形作出优化。而为专门任务写的分布式任务又做不到通用当然也做不到简单。
插一句题外话分布式计算模型有一块伴随的内容就是调度。虽然不怎么受关注但这是分布式计算引擎必备的东西。mapreduce 的调度是 yarnspark 的调度有自己内嵌的调度器tensorflow 也一样。MPI 呢它的调度就是几乎没有调度一切假设集群有资源靠 ssh 把所有任务拉起来。调度实际上应当分为资源调度器和任务调度器。前者用于向一些资源管理者申请一些硬件资源后者用于将计算图中的任务下发到这些远程资源进行计算其实也就是所谓的两阶段调度。近年来有一些 TensorflowOnSpark 之类的项目。这类项目的本质实际上是用 spark 的资源调度加上 tensorflow 的计算模型。
当我们写完一个单机程序而面临数据量上的问题的时候一个自然的想法就是我能不能让它运行在分布式的环境中如果能够不加改动或稍加改动就能让它分布式化那就太好了。当然现实是比较残酷的。通常情况下对于一个一般性的程序用户需要自己手动编写它的分布式版本利用比如 MPI 之类的框架自己控制数据的分发、汇总自己对任务的失败做容灾通常没有容灾。如果要处理的目标是恰好是对一批数据进行批量化处理那么 可以用 mapreduce 或者 spark 预定义的 api。对于这一类任务计算框架已经帮我们把业务之外的部分脚手架代码做好了。同样的如果我们的任务是训练一个神经网络那么用 tensorflow pytorch 之类的框架就好了。这段话的意思是如果你要处理的问题已经有了对应框架那么拿来用就好了。但是如果没有呢除了自己实现之外有没有什么别的办法呢
今天注意到一个项目Ray声称你只需要稍微修改一下你的代码就能让它变为分布式的实际上这个项目早就发布了只是一直没有刻意关注它。当然这个代码仅局限于 python比如下面这个例子
| **Basic Python** | **Distributed with Ray** |
----------------------------------------------------------------------------------------------------
| | |
| # Execute f serially. | # Execute f in parallel. |
| | |
| | ray.remote |
| def f(): | def f(): |
| time.sleep(1) | time.sleep(1) |
| return 1 | return 1 |
| | |
| | |
| | ray.init() |
| results [f() for i in range(4)] | results ray.get([f.remote() for i in range(4)]) |
----------------------------------------------------------------------------------------------------
这么简单这样笔者想到了 openmp注意不是 openmpi。来看看
#includeiostream
#includeomp.husing namespace std;void main() {
#pragma omp parallel forfor(int i 0; i 10; i) {cout Test endl;}system(pause);
}
把头文件导入添加一行预处理指令就可以了这段代码立马变为并行执行。当然 openmp 不是分布式只是借助编译器将代码中需要并行化的部分编译为多线程运行本身还是一个进程因此其并行度收到 CPU 线程数量所限。如果 CPU 是双线程那只能 2 倍加速。在一些服务器上CPU 可以是单核 32 线程自然能够享受到 32 倍加速被并行化的部分。不过这些都不重要在用户看来Ray 的这个做法和 openmp 是不是有几分相似之处你不需要做过多的代码改动就能将代码变为分布式执行当然 openmp 要更绝一点因为对于不支持 openmp 的编译器它就是一行注释而已。
那么 Ray 是怎么做到这一点的呢其实 Ray 的做法说起来也比较简单就是定义了一些 API类似于 MPI 中的定义的通信原语。使用的时候将这些 API “注入”到代码合适的位置那么代码就变成了用户代码夹杂着一些 Ray 框架层的 API 调用整个代码实际上就形成了一张计算图。接下来的事情就是等待 Ray 把这张计算图完成返回就好了。Ray 的论文给了个例子
ray.remote
def create_policy():# Initialize the policy randomly.return policy
ray.remote(num_gpus1)
class Simulator(object):def __init__(self):# Initialize the environment.self.env Environment()def rollout(self, policy, num_steps):observations []observation self.env.current_state()for _ in range(num_steps):action policy(observation)observation self.env.step(action)observations.append(observation)return observations
ray.remote(num_gpus2)
def update_policy(policy, *rollouts):# Update the policy.return policy
ray.remote
def train_policy():# Create a policy.policy_id create_policy.remote()# Create 10 actors.simulators [Simulator.remote() for _ in range(10)]# Do 100 steps of training.for _ in range(100):# Perform one rollout on each actor.rollout_ids [s.rollout.remote(policy_id)for s in simulators]# Update the policy with the rollouts.policy_id update_policy.remote(policy_id, *rollout_ids)return ray.get(policy_id)生成的计算图为 所以用户要做的事情就是在自己的代码里加入适当的 Ray API 调用然后自己的代码就实际上变成了一张分布式计算图了。作为对比我们再来看看 tensorflow 对图的定义
import tensorflow as tf
# 创建数据流图y W * x b其中W和b为存储节点x为数据节点。
x tf.placeholder(tf.float32)
W tf.Variable(1.0)
b tf.Variable(1.0)
y W * x b
with tf.Session() as sess:tf.global_variables_initializer().run() # Operation.runfetch y.eval(feed_dict{x: 3.0}) # Tensor.evalprint(fetch) # fetch 1.0 * 3.0 1.0输出
4.0可以看出tensorflow 中是自己需要自己显式的、明确的定义出图的节点placeholder Variable 等等这些都是图节点的具体类型而 Ray 中图是以一种隐式的方式定义的。我认为后者是一种更自然的方式站在开发者的角度看问题而前者更像是为了使用 tensorflow 把自己代码逻辑去适配这个轮子。
那么 ray 是不是就我们要寻找的那个即通用、又简单、还灵活的分布式计算框架呢由于笔者没有太多的 ray 的使用经验这个问题不太好说。从官方介绍来看有限的几个 API 确实是足够简单的。仅靠这几个 API 能不能达成通用且灵活的目的还不好讲。本质上来说Tensorflow 对图的定义也足够 General但是它并不是一个通用的分布式计算框架。由于某些问题不在于框架而在于问题本身的分布式化就存在困难所以试图寻求一种通用分布式计算框架解决单机问题可能是个伪命题。
话扯远了。假设 ray 能够让我们以一种比较容易的方式分布式地执行程序那么会怎么样呢前不久 Databricks 开源了一个新项目Koalas试图以 RDD 的框架并行化 pandas。由于 pandas 的场景是数据分析和 spark 面对的场景类似两者的底层存储结构、概念也是很相似的因此用 RDD 来分布式化 pandas 也是可行的。我想如果 ray 足够简单好用在 pandas 里加一些 ray 的 api 调用花费的时间精力可能会远远小于开发一套 koalas。但是在 pandas 里加 ray 就把 pandas 绑定到了 ray 上即便单机也是这样因为 ray 做不到像 openmp 那样如果支持很好不支持也不影响代码运行。
啰嗦这么多其实就想从这么多引擎的细节中跳出来思考一下到底什么是分布式计算框架每种框架又是设计的解决什么问题有什么优缺点。最后拿大佬的一个观点结束本文。David Patterson 在演讲 “New Golden Age For Computer Architecture” 中提到通用硬件越来越逼近极限要想要达到更高的效率我们需要设计面向领域的架构Domain Specific Architectures。这是一个计算架构层出不穷的时代每种架构都是为了解决其面对的领域问题出现的必然包含对其问题的特殊优化。通用性不是用户解决问题的出发点而更多的是框架设计者的“一厢情愿”用户关注的永远是领域问题。从这个意义上讲面向领域的计算架构应该才是正确的方向。
原文链接 本文为云栖社区原创内容未经允许不得转载。