spark GraphX 图计算 介绍 教程 入门 手册 调研

更新时间:2023-09-24 18:03:01 阅读量: 综合文库 文档下载

说明:文章内容仅供预览,部分内容可能不全。下载后的文档,内容与下面显示的完全一致。下载之前请确认下面内容是否您想要的,是否完整无缺。

第一章 Graphx

参考:http://book.51cto.com/art/201408/450049.htmSpark+GraphX大规模图计算和图挖掘(V3.0)

本章旨在介绍图计算、Spark GraphX和梳理GraphX学习时的关键知识结构。

1.1 分布式计算

1.1.1 分布式图计算框架的目的

将对于巨型图的各种操作包装为简单的接口,让分布式存储、并行计算等复杂问题对上层透明,从而使复杂网络和图算法的工程师,更加聚焦在图相关的模型设计和使用上,而不用关心底层的分布式细节。

需要解决两个通用问题:图存储模式和图计算模式。 1.1.2 图存储模式

巨型图的存储总体上有边分割和点分割两种存储方式。2013年,GraphLab2.0将其存储方式由边分割变为点分割,在性能上取得重大提升,目前基本上被业界广泛接受并使用。

1.1.2.1 边分割

每个顶点都存储一次,但有的边会被打断分到两台机器上。这样做的好处是节省存储空间;坏处是对图进行基于边的计算时,对于一条两个顶点被分到不同机器上的边来说,要跨机器通信传输数据,内网通信流量大。

1.1.2.2 点分割

每条边只存储一次,都只会出现在一台机器上。邻居多的点会被复制到多台机器上,增加了存储开销,同时会引发数据同步问题。好处是可以大幅减少内网通信量。

现在是点分割占上风,各种分布式图计算框架都将自己底层的存储形式变成了点分割。主要原因有以下两个。

磁盘价格下降,存储空间不再是问题,而内网的通信资源没有突破性进展,集群计算时内网带宽是宝贵的,时间比磁盘更珍贵。这点就类似于常见的空间换时间的策略。

在当前的应用场景中,绝大多数网络都是“无尺度网络”,遵循幂律分布,不同点的邻居数量相差非常悬殊。而边分割会使那些多邻居的点所相连的边大多数被分到不同的机器上,这样的数据分布会使得内网带宽更加捉襟见肘,于是边分割存储方式被渐渐抛弃了。

1.1.3 图计算模型

1

目前的图计算框架基本上都遵循BSP(Bulk Synchronous Parallell)计算模式。在BSP中,一次计算过程由一系列全局超步组成,每一个超步由并发计算、通信和栅栏同步三个步骤组成。同步完成,标志着这个超步的完成及下一个超步的开始。BSP模式很简洁。基于BSP模式,目前有两种比较成熟的图计算模型。

1.1.3.1 Pregel模型——像顶点一样思考

2010年,Google的新的三架马车Caffeine、Pregel、Dremel发布。随着Pregel一起,BSP模型广为人知。Pregel借鉴MapReduce的思想,提出了“像顶点一样思考”(Think Like A Vertex)的图计算模式,让用户无需考虑并行分布式计算的细节,只需要实现一个顶点更新函数,让框架在遍历顶点时进行调用即可。

这个模型虽然简洁,但很容易发现它的缺陷。对于邻居数很多的顶点,它需要处理的消息非常庞大,而且在这个模式下,它们是无法被并发处理的。所以对于符合幂律分布的自然图,这种计算模型下很容易发生假死或者崩溃。

1.1.3.2 GAS模型——邻居更新模型

相比Pregel模型的消息通信范式,GraphLab的GAS模型更偏向共享内存风格。它允许用户的自定义函数访问当前顶点的整个邻域,可抽象成Gather、Apply和Scatter三个阶段,简称为GAS。相对应,用户需要实现三个独立的函数gather、apply和scatter。

由于gather/scatter函数是以单条边为操作粒度,所以对于一个顶点的众多邻边,可以分别由相应的worker独立调用gather/scatter函数。这一设计主要是为了适应点分割的图存储模式,从而避免Pregel模型会遇到的问题。

1.1.3.3 Giraph

Giraph是apache基金会开源项目之一,被定义为迭代式图处理系统。他架构在hadoop之上,提供了图处理接口,专门处理大数据的图问题。

Giraph计算的输入是由点和直连的边组成的图。例如,点可以表示人,边可以表示朋友请求。每个顶点保存一个值,每个边也保存一个值。输入不仅取决于图的拓扑逻辑,也包括定点和边的初始值。

Giraph应用还需要考虑Hadoop的兼容性。 Giraph的存在很有必要,现在的大数据的图问题又很多,例如表达人与人之间的关系的有社交网络,搜索引擎需要经常计算网页与网页之间的关系,而map-reduce接口不太适合实现图算法。构成图的基本元素有哪些?

构成图的基本元素有顶点和边,边可以有权重和方向。Giraph如何表示图?

Giraph里同样也有顶点和边,Giraph中的顶点也是有值的,可以用于保存计算的中间结果。在Giraph图处理模型中,顶点是一切的中心,运算也是围绕着顶点进行的,顶点可以

2

错误!文档中没有指定样式的文字。

收发消息,这里的消息跟通讯中的消息不同,这里的消息指的是从上一个计算步骤接收信息,在顶点进行计算,并把计算好的结果通过消息发给下一个顶点。而顶点计算的输入数据就是输入的消息、顶点的当前的值。一个顶点可以有多条输入的消息,也可以有多条输出的消息,把这些消息发给与当前顶点相连的边的另一端的顶点。Giraph的应用:用于分析用户或者内容之间的联系或重要性。 1.1.3.4 GraphX

在设计GraphX时,点分割和GAS都已成熟,并在设计和编码中针对它们进行了优化,在功能和性能之间寻找最佳的平衡点。如同Spark本身,每个子模块都有一个核心抽象。GraphX的核心抽象是Resilient Distributed Property Graph,一种点和边都带属性的有向多重图。它扩展了Spark RDD的抽象,有Table和Graph两种视图,而只需要一份物理存储。两种视图都有自己独有的操作符,从而获得了灵活操作和执行效率。

如同Spark,GraphX的代码非常简洁。GraphX的核心代码只有3千多行,而在此之上实现的Pregel模型,只要短短的20多行。GraphX的代码结构其中大部分的实现,都是围绕Partition的优化进行的。这在某种程度上说明了点分割的存储和相应的计算优化,的确是图计算框架的重点和难点。

可以看到,GraphX设计这个模型的用意。它综合了Pregel和GAS两者的优点,即接口相对简单,又保证性能,可以应对点分割的图存储模式,胜任符合幂律分布的自然图的大型计算。

GraphX也提供了一套图算法工具包,方便用户对图进行分析。

1.1.3.5 GraphLab

GraphLab是面向机器学习的流处理并行框架。GraphLab将数据抽象成Graph结构,将算法的执行过程抽象成Gather、Apply、Scatter三个步骤。其并行的核心思想是对顶点的切分。

数据模型:GRAPH

顶点是其最小并行粒度和通信粒度,边是机器学习算法中数据依赖性的表现方式。

对于某个顶点,其被部署到多台机器,一台机器作为master顶点,其余机器上作为mirror。Master作为所有mirror的管理者,负责给mirror安排具体计算任务;mirror作为该顶点在各台机器上的代理执行者,与master数据的保持同步。

对于某条边,GraphLab将其唯一部署在某一台机器上,而对边关联的顶点进行多份存储,解了边数据量大的问题。

同一台机器上的所有edge(边)和vertex(顶点)构成local graph,在每台机器上,存在本地id到全局id的映射表。vertex是一个进程上所有线程共享的,在并行计算过程中,各个线程分摊进程中所有顶点的gather->apply->scatter操作。

执行模型:GATHER-APPLY-SCATTER

每个顶点每一轮迭代经过gather->apple->scatter三个阶段。

1) Gather阶段

3

工作顶点的边 (可能是所有边,也有可能是入边或者出边)从领接顶点和自身收集数据,记为gather_data_i,各个边的数据graphlab会求和,记为sum_data。这一阶段对工作顶点、边都是只读的。

2) Apply阶段

Mirror将gather计算的结果sum_data发送给master顶点,master进行汇总为total。Master利用total和上一步的顶点数据,按照业务需求进行进一步的计算,然后更新master的顶点数据,并同步mirror。Apply阶段中,工作顶点可修改,边不可修改。

3) Scatter阶段

工作顶点更新完成之后,更新边上的数据,并通知对其有依赖的邻结顶点更新状态。这scatter过程中,工作顶点只读,边上数据可写。

在执行模型中,graphlab通过控制三个阶段的读写权限来达到互斥的目的。在gather阶段只读,apply对顶点只写,scatter对边只写。并行计算的同步通过master和mirror来实现,mirror相当于每个顶点对外的一个接口人,将复杂的数据通信抽象成顶点的行为。

基础组件层

提供Graphlab数据传输、多线程管理等基础并行结构的组件模块,下面将主要介绍其通信、数据序列化、数据交换、多线程管理四个功能模块。

1) 通信(dc_tcp_comm.cpp)

Graphlab基于TCP协议的长连接在机器之间进行数据通信。在Graphlab初始化阶段,所有机器建立连接,将socket数据存储在std::vector sock 结构中。

Graphlab使用单独的线程来接收和发送数据,其中接收或发送都可以配置多个线程,默认每个线程中负责与64台机器进行通信。在接收连接中,tcp_comm基于libevent采用epoll的方式获取连接到达的通知,效率高。

需要补充的是,Graphlab在数据通信中,并没有采用MPI的接口,但在源码中封装了MPI_tools,其用途是在distributed_control::init时,获取系统参数(包括机器IP和端口)提供两种方式,一种是系统配置中初始化,一种是通过MPI接口实现(dc_init_from_mpi::init_param_from_mpi)。

2) 数据序列化(oarchive & iarchive)

Oarchive通过重载操作符>>将对象序列化后写入ostream中,在Graphlab中对于POD( Plain Old Data)和非POD数据区分对待, POD类型的数据直接转为为char*写入ostream, 而非POD数据需要用户实现save方法,否则将抛出异常。iarchive的过程与oarchive的过程相反。

所有通过rpc传输的数据都通过oarchive和iarchive转化为stream,比如vertex_program, vertex_data。

4

错误!文档中没有指定样式的文字。

3) 数据传输流(buffered_stream_send2.cpp)

Oarchive,iarchive是数据序列化的工具, 在实际的传输过程中,数据并没有立即发送出去,而是缓存在buffered_stream_send。

4) Pthread_tools:

Thread类封装了lpthread的方法 提供thread_group管理线程队列

封装了锁、信号量、条件变量等同步方法。

1.2 学习GraphX之前要学什么

1.2.1 RDD

详见3.2小节。 1.2.2 Scala

GraphX很多函数操作和Scala是一样的。

1.3 Spark Graphx是什么

参考:http://book.51cto.com/art/201408/450049.htm Spark GraphX是一个分布式图处理框架,Spark GraphX基于Spark平台提供对图计算和图挖掘简洁易用的而丰富多彩的接口,极大的方便了大家对分布式图处理的需求。

大家都知道,社交网络中人与人之间有很多关系链,例如Twitter、Facebook、微博、微信,这些都是大数据产生的地方,都需要图计算,现在的图处理基本都是分布式的图处理,而并非单机处理,Spark GraphX由于底层是基于Spark来处理的,所以天然就是一个分布式的图处理系统。

图的分布式或者并行处理其实是把这张图拆分成很多的子图,然后我们分别对这些子图进行计算,计算的时候可以分别迭代进行分阶段的计算,即对图进行并行计算。

下面我们看一下图计算的简单示例:

5

从源码中可以看出EdgeRDD继承的RDD的类型是Edge[ED].

1.6.3 Triplets

Triplets的属性有:源顶点ID、源顶点属性、边属性、目标顶点ID、目标顶点属性,Triplets其实是对Vertices和Edges做了join操作,如下图所示:

Vertices具有顶点ID和属性,Edges具有原定点ID、目标顶点ID和自己的属性,Triplets对Vertices和Edges的join操作使得Triplets具备源顶点的ID和属性、目标顶点的ID和属性以及自己的属性,也就是说Triplets把包含源顶点和目标顶点的属性以及自身的属性全部连在一起了,如果我们需要使用顶点及自己的属性以及和顶点关联的边的属性,那就必须使用Triplets。Triplets的RDD的类型是EdgeTriplet,其源码实现如下:

16

错误!文档中没有指定样式的文字。

从源码中我们看到EdgeTriplet继承自Edge[ED],同时EdgeTriplet有srcAttr来表示源顶点的属性和dstAttr来表示目标顶点的属性,我们看一下其父类Edge的源代码:

从Edge的源代码中我们可以看到构造方法中包含了srcId来标志源顶点的ID、dstId来标志目标顶点的ID、attr来表示edge的属性,这样作为Edge的继承者EdgeTriplet就拥有五个属性,而前面分析的VertexRDD只有两个属性集Vertex的ID和属性。

1.7 以最原始的方式构建graph

最原始的方式构建graph其实是使用Graph的伴生对象的apply的方式,其源码如下所示:

17

从源码中我们可以看到构建graph首先需要Vertices的RDD,此时的Vertices的类型是RDD[(VertexId, VD)],当然我们也可以直接使用VertexRDD,因为VertexRDD继承了RDD[(VertexId, VD)],第二点是要放入边edges,第三个元素是放入默认顶点的属性,当然可以null,需呀注意的是默认的顶点属性只是给我们使用,并不在Vertices里面,接下来是edgeStorageLevel和vertexStorageLevel,它们是分来指定存储策略的是的,也就是说图的边和 顶点并不是在一起Cache的,这里默认的存储策略都是StorageLevel.MEMORY_ONLY,最后我们发现创建一个Graph实例的时候使用的是GraphImpl来完成的。

1.8 Vertices、edges、triplets操作示例

本文中我们构建Graph的数据来源有两种,一个是本地的数据集,一个是来自Google的数据集。

我们先看一下本地数据集构建graph的操作,该内容也来自Spark 1.0.2 GraphX的官方文档:http://spark.apache.org/docs/latest/graphx-programming-guide.html

首先在本地启动Spark Shell,如下所示:

18

错误!文档中没有指定样式的文字。

首先把相关应该导入的类导入进来:

可以看出我们导入了org.apache.spark下面的内容,同时也导入了org.apache.spark.graphx下面的内容,最后为了方便后续的RDD操作,我们还导入了org.apache.spark.rdd.RDD。

作为第一个例子,我们用的是下图中表示的数据:

我们看一下Spark GraphX官方文档给我们提供的代码:

19

对于users这个RDD而言,其每一个元素包含一个ID和属性,属性是由name和occupation构成的元组,因为要生产RDD,所以我们使用了sc.parallelize函数来把Array转换一下,parallelize方法的源码如下所示:

从官方给出的第一个GraphX的源码同样可以看出relationships每个元素有源顶点ID、目标顶点ID和边的属性等三部分构成;

接下来有个非常重要的对象为defaultUser,其主要作用就在于当如果想描述一种ralationships中不存在的目标顶点的时候就会使用这个defaultUser,例如5到0这个ralationship是不存在的,那就会默认指向defaultUser,这就是defaultUser的用途,可能有些朋友说不想要defaultUser,这也是可以的;

我们在Spark shell中首先生成users这个RDD,如下所示:

20

错误!文档中没有指定样式的文字。

我们也可以直接查看一下graph边的情况:

1.9 GraphX 重要图算法

1.9.1 ShortestPaths

1.9.2 PageRank 社交网络的推荐

PageRank记录了图中每个顶点的重要性,假设一条边从u到v,代表从u传递给v的重

31

要性。例如,如果一个Twitter用户有很多粉丝,用户排名将很高。

http://book.51cto.com/art/201409/452634.htm 1.9.3 TriangleCount 社区发现

例如说在微博上你关注的人也互相关注,大家的关注关系中就会有很多三角形,这说明社区很强很稳定,大家的联系都比较紧密;如果说只是你一个人关注很多人,这说明你的社交群体是非常小的。

1.10 GraphX重要的图操作

1.10.1 Property Operators 属性操作 1.10.2 Structural Operators 结构化操作 1.10.3 Computing Degree 计算度信息

一个常见的聚合任务是计算每个顶点的度:每个顶点相邻边的数目。在有向图的情况下,往往需要知道入度,出度,以及总度。 1.10.4 Collecting Neighbors 收集邻居

在某些情况下可能更容易通过收集相邻顶点和它们的属性来表达在每个顶点表示的计算。这可以通过使用容易地实现 collectNeighborIds和 collectNeighbors运算。 1.10.5 Join Operators

许多情况下,有必要从外部集合(RDDS)中加入图形数据。例如,我们可能有额外的用户属性,想要与现有的图形合并,或者我们可能需要从一个图选取一些顶点属性到另一个图。这些任务都可以使用来join经操作完成。 1.10.6 MapReduceTriplets 邻居聚集

图形计算的一个关键部分是聚集每个顶点的邻域信息。例如,我们可能想要知道每个用户追随者的数量或每个用户的追随者的平均年龄。许多图迭代算法(如PageRank,最短路径,连通分量等)反复聚集邻居节点的属性, (例如,当前的PageRank值,到源节点的最短路径,最小可达顶点ID)。 1.10.7 Pregel API 迭代计算

在Spark中,RDDS默认并不保存在内存中。为了避免重复计算,当他们需要多次使用时,必须明确地使用缓存(见Spark编程指南)。在GraphX 中Graphs行为方式相同。当需要多次使用图形时,一定要首先调用Graph.cache()以

在迭代计算,为了最佳性能,也可能需要清空缓存。默认情况下,缓存的RDDS和图表将保留在内存中,直到内存压力迫使他们按照LRU顺序被删除。对于迭代计算,之前的迭代的中间结果将填补缓存。虽然他们最终将被删除,内存中的不必要的数据会使垃圾收集

32

错误!文档中没有指定样式的文字。

机制变慢。一旦它们不再需要缓存,就立即清空中间结果的缓存,这将会更加有效。这涉及物化(缓存和强迫)图形或RDD每次迭代,清空所有其他数据集,并且只使用物化数据集在未来的迭代中。然而,由于图形是由多个RDDS的组成的,正确地持续化他们将非常困难。对于迭代计算,我们推荐使用Pregel API,它正确地unpersists中间结果。

33

错误!文档中没有指定样式的文字。

上述代码生成的是Vertex Table:

接下来生成ralationships这个RDD:

代码中我们使用了Edge这个Case class,其源码为:

上述代码生成的是Edge Table:

21

接下来放入defaultUser:

此时我们使用Graph的apply方法即可构造出图,代码如下所示:

再次看一下Graph实例构造的源代码:

从源代码中可以看出使用了Graph类的伴生对象的apply方法产生了Graph实例对象,具体的实现为GraphImpl,在代码中依旧是使用了GraphImpl伴生对象的apply方法,如下

22

错误!文档中没有指定样式的文字。

所示:

继续跟踪源代码:

进入fromExistingRDDs中:

23

此时使用GraphImpl的类构成出了Graph对象实例:

其中的ReplicatedVertexView的代码如下所示:

24

错误!文档中没有指定样式的文字。

我们可以看到实际使用的GraphImpl来构造出了图的实例,此时我们就构建出了一张如下所示的图:

下面我们按照Spark GraphX官方文档提供的方式对构造出来的graph对象进行操作,例如我们要看一下occupation为pst.doc.的顶点数目,使用如下代码即可:

25

通过计算发现只有User的pst.doc.,此时我们看一下Property Graph,可以发现时顶点为7的这个User:

26

错误!文档中没有指定样式的文字。

例如我们要计算一下生成的graph中源顶点ID大于目标顶点ID的数量,直接使用如下代码即可:

计算结果表明只有一个srcID大于dstID的情况,此时我们看一下前面生成的graph图:

27

从图中可以发现只有5大于3,其它都是srcID小于dstID。

其实对构建出来的Graph除了可以对其Vertices和edges进行操作外,还可以对它的triplets进行操作,其实当我们的graph构建出来的时候,graph本身就有三个非常重要的属性,源码如下所示:

28

错误!文档中没有指定样式的文字。

接下来我们查看一下triplets:

为什么EdgeTriplet的类型是EdgeTriplet[(String, String),String]呢,我们看一下源码:

从源码中可以第一个元素是顶点属性类型,在我们的例子中就是(name, occupation)的元组,第二个元素是边属性类型。因为其数量比较少,接下来我们进行一下collect操作:

29

下面我们看一下顶点:

30

本文来源:https://www.bwwdw.com/article/mfsd.html

Top