Python Olap API
本文档主要介绍 OlapBase OlapOnDB 和 OlapOnDisk 在Python中的API用法
1. 概述
本手册将介绍使用TuGraph图计算系统Python接口需要的简单配置,同时结合 代码对TuGraph Python API进行解释。关于ParallelBitset、OlapBase各类的作用,详见olap-base-api.md,olap-on-db-api.md和olap-on-disk-api.md
2. 配置要求
如果要使用TuGraph图计算编写以及编译自己的应用程序,需要的配置要求为:
- linux操作系统,目前在Ubuntu16.04, Ubuntu18.04, Ubuntu20.04和Centos7, Centos8系统上可成功运行。
- 支持C++17的编译器,要求GCC版本为8.4.0或更新的版本。
- Cython,版本要求3.0.0以上,已测试可运行版本为3.0.0a11
3. Cython
Cython是一种高效的编程语言,是Python的超集。Cython能将py文件翻译为C/C++代码后编译为Python拓展类,在Python中通过import调用。在TuGraph中,所有的Python plugin都由Cython编译为Python拓展类后使用。
Cython的Pure Python模式在保证Python语法的同时具有C/C++的性能,TuGraph Python接口均使用Cython实现。
4. Olap API
见procedures/algo_cython/olap_base.pxd文件,用法与功能基本与C++接口相同,olap_base.pxd中声明的接口都由C++实现,在py文件中必须通过from cython.cimports.olap_base import *
的方式导入,由Cython编译py文件后才能运行。
原子操作
cas[T](ptr: cython.pointer(T), oldv: T, newv: T)-> cython.bint
:如果ptr指向的值等于oldv,则将ptr指向的值赋为newv并返回true,否则返回falsewrite_min[T](a: cython.pointer(T), b: T)-> cython.bint
:如果b比a指向的值更小,那么将a指向的值赋为b并返回true,否则返回false。write_max[T](a: cython.pointer(T), b: T)-> cython.bint
:如果b比a指向的值更大,那么将a指向的值赋为b并返回true,否则返回false。write_add[T](a: cython.pointer(T), b: T)-> cython.bint
:将b的值加到a指向的值上。write_sub[T](a: cython.pointer(T), b: T)-> cython.bint
:将a指向的值减去b的值。
点集合类ParallelBitset
Size()-> size_t
:表示Bitmap中的点个数。ParallelBitset(size: size_t)
:初始化size和data,data长度为(size >> 6)+1Clear()-> cython.void
:清空集合Fill()-> cython.void
:将所有点加入集合Has(size_t i)-> cython.bint
:检查点i是否在集合中Add(size_t i)-> cython.bint
:将点i加入集合中Swap(ParallelBitset &other)-> cython.void
:和另一组ParallelBitset集合交换元素
点数组类ParallelVector
ParallelVector[T](size_t capacity)
构建ParallelVector,capacity为点数组的初始容量大小operator[](i: size_t)-> T
:下标为i的数据begin()-> cython.pointer(T)
:ParallelVector的起始指针end()-> cython.pointer(T)
:ParallelVector的结束指针。begin和end的用法类似于vector容器的begin和end指针,可以使用这两个指针对数组进行顺序访问Back()-> T
:ParallelVector最后一个数据Data()-> cython.pointer(T)
:表示数组本身数据Destroy()-> cython.void
:清空ParallelVector数组内数据并删除数组Size()-> size_t
:表示ParallelVector中的数据个数Resize(size: size_t)-> cython.void
:更改ParallelVector为size大小,该size应大于等于更改前的大小且小于capacityClear()-> cython.void
:清空ParallelVector内数据ReAlloc(capacity: size_t)-> cython.void
:给ParallelVector分配新的容量大小,若数组有数据则将数据迁移至新内存Fill(elem: T)-> cython.void
:为ParallelVector的全部数据赋值为elemAppend(elem: T, atomic: cython.bint = true)-> cython.void
:向ParallelVector结尾添加一个数据Swap(other: ParallelVector[T])-> cython.void
:和其他的ParallelVector交换数据Copy()-> ParallelVector[T]
:复制当前的ParallelVector数据存至Copy数组中
自定义数据结构
Empty
:内容为空的特殊数据类型。EdgeUnit[EdgeData]
:表示权值类型为EdgeData的边,用于解析输入文件,包含三个成员变量:src: size_t
:边的起始点dst: size_t
:边的终点edge_data: EdgeData
:边的权值
AdjUnit[EdgeData]
:表示权值类型为EdgeData的边,用于批处理计算过程中,包含两个成员变量:neighbour: size_t
:边的邻居点edge_data: EdgeData
:边的权值
AdjList[EdgeData]
:权值类型为EdgeData的点的邻接表,常用于表示点的入边和出边集合,包含两个成员变量:begin()-> cython.pointer(AdjUnit[T])
:列表的起始指针end()-> cython.pointer(AdjUnit[T])
:列表的结束指针。operator[](i: size_t)-> AdjUnit[EdgeData]
: 下标为i的数据
图类OlapBase
-
NumVertices()-> size_t
:获取点数 -
NumEdges()-> size_t
:获取边数 -
OutDegree(size_t vid)-> size_t
:点vid的出度 -
InDegree(size_t vid)-> size_t
:点vid的入度 -
AllocVertexArray[VertexData]() ->ParallelVector[VertexData]
:分配一个类型为VertexData的数组,大小为点个数 -
AllocVertexSubset()-> ParallelBitset
:分配一个ParallelBitset集合,用于表示所有点的状态是否激活 -
OutEdges(vid: size_t)-> AdjList[EdgeData]
:获取点v的所有出边集合 -
InEdges(vid: size_t)-> AdjList[EdgeData]
:获取点v的所有 入边集合 -
Transpose()-> cython.void
:对有向图进行图反转 -
LoadFromArray(edge_array: cython.p_char, input_vertices: size_t, input_edges: size_t, edge_direction_policy: EdgeDirectionPolicy)
:从数组中加载图数据,包含四个参数,其含义分别表示:edge_array
:将该数组中的数据读入图,一般情况下该数组包含多条边。input_vertices
:指定数组读入图的点个数。input_edges
:指定数组读入图的边的条数。edge_direction_policy
:指定图为有向或无向,包含三种模式,分别为DUAL_DIRECTION、MAKE_SYMMETRIC以及INPUT_SYMMETRIC。对应的详细介绍见include/lgraph/olap_base.h文件的enum EdgeDirectionPolicy
。
-
AcquireVertexLock(vid: size_t)-> cython.void
:对点vid加锁,禁止其它线程对该锁对应的点数据进行访存 -
void ReleaseVertexLock(vid: size_t)-> cython.void
:对点vid解锁,所有线程均可访存该锁对应的点数据
TuGraph提供了两个批处理操作来并行地进行以点为中心的批处理过程,在Python中与C++使用方法稍有不同。
# 函数名称:ProcessVertexInRange[ReducedSum, Algorithm](
# work: (algo: Algorithm, vi: size_t)-> ReducedSum,
# lower: size_t, upper: size_t,
# algo: Algorithm,
# zero: ReducedSum = 0,
# reduce: (a: ReducedSum, b: ReducedSum)-> ReducedSum = reduce_plus[ReducedSum])
#
# 函数用途:对Graph中节点编号介于lower和upper之间的节点执行work函数。第四个参数表示累加的基数,默认为0;
# 第五个参数表示对每个work处理后的节点返回值进行迭代reduce函数操作,默认为累加操作。
# 具体实现请参考include/lgraph/olap_base.h中具体代码
#
# 使用示例:统计数组parent数组中有出边的点个数
import cython
from cython.cimports.olap_base import *
@cython.cclass
class CountCore:
graph: cython. pointer(OlapBase[Empty])
parent: ParallelVector[size_t]
@cython.cfunc
@cython.nogil
def Work(self, vi: size_t) -> size_t:
if self.graph.OutDegree(self.parent[vi]) > 0:
return 1
return 0
def run(self, pointer_g: cython. pointer(OlapBase[Empty])):
self.graph = pointer_g
self.parent = self.graph.AllocVertexArray[size_t]()
vertex_num: size_t
vertex_num = self.graph.ProcessVertexInRange[size_t, CountCore](self.Work, 0, self.parent.Size(), self)
print("the number is", vertex_num)
if __name__ == "__main__":
count_core = CountCore()
count_core.run(cython.address(g))
其中g为图类OlapBase的实例化对象
# 函数名称:ProcessVertexActive[ReducedSum, Algorithm](
# work: (algo: Algorithm, vi: size_t)-> ReducedSum,
# active: ParallelBitset,
# algo: Algorithm,
# zero: ReducedSum = 0,
# reduce: (a: ReducedSum, b: ReducedSum)-> ReducedSum = reduce_plus[ReducedSum])
#
# 函数用途:对active_vertices中对应为1的节点执行work函数,第三个参数表示累加的基数, 默认为0;
# 第四个参数表示对每个work处理后的节点返回值进行迭代reduce函数操作,默认为累加操作。
# 具体实现请参考/include/lgraph/olap_base.h中具体代码
#
# 使用示例:输出Graph中节点1,2,3的所有出度邻居,并统计这三个节点的总出度
import cython
from cython.cimports.olap_base import *
from cython.cimports.libc.stdio import printf
@cython.cclass
class NeighborCore:
graph: cython.pointer(OlapBase[Empty])
active_in: ParallelBitset
@cython.cfunc
@cython.nogil
def Work(self, vi: size_t) -> size_t:
degree = self.graph.OutDegree(vi)
dst: size_t
edges = self.graph.OutEdges(vi)
local_out_degree: size_t
for i in range(degree):
dst = edges[i].neighbour
printf("node %lu has neighbour %lu\n", vi, dst)
local_out_degree += 1
return local_out_degree
def run(self, pointer_g: cython.pointer(OlapBase[Empty])):
self.graph = pointer_g
self.active_in = self.graph.AllocVertexSubset()
self. active_in. Add(1)
self. active_in. Add(2)
self. active_in. Add(3)
total_outdegree = cython.declare(
size_t,
self.graph.ProcessVertexActive[size_t, CountCore](self.Work, self.active_in, self))
printf("total outdegree of node1,2,3 is %lu\n",total_outdegree)
if __name__ == "__main__":
neighbor_core = NeighborCore()
neighbor_core.run(cython.address(g))
如上面两个例子所展示,在Python中ProcessVertexActive与ProcessVertexInRange比在C++中额外需要一个算法类指针参数,Work函数一般也作为该算法类的成员函数,满足Work函数访问成员变量的需求(如图graph,点数组parent),在调用批处理函数时将Work函数和算法类的self指针传入批处理函数。
其中Work函数会在多线程中调用,因此加上修饰器@cython.nogil
释放Python全局解释锁,在多线程执行的代码中(例如批处理函数中的Work函数,cython.parallel.prange
中),不能包含Python对象,最好通过dst: type
或者dst = cython.declare(type)
的方式声明变量为C/C++类型。
图类OlapOnDB:
并行化创建有向图:
olapondb = OlapOnDB[Empty](db, txn, SNAPSHOT_PARALLEL)
并行化创建无向图
olapondb = OlapOnDB[Empty](db, txn, SNAPSHOT_PARALLEL | SNAPSHOT_UNDIRECTED)
ID_MAPPING创建有向图
olapondb = OlapOnDB[Empty](db, txn, SNAPSHOT_PARALLEL | SNAPSHOT_IDMAPPING)
图类OlapOnDisk
ConfigBase:
-
ConfigBase()
: 构造函数 -
std::string input_dir
: 图边表数据路径 -
std::string output_dir
: 输出结果路径 -
Load(config: ConfigBase[EdgeData], edge_direction_policy: EdgeDirectionPolicy)-> void
: 读入图数据
5. lgraph_db API
见procedures/algo_cython/lgraph_db.pxd与lgraph_db_python.py文件。
lgraph_db.pxd中接口用法与功能基本与C++接口相同,lgraph_db.pxd中声明的接口都由C++实现,在py文件中必须通过from cython.cimports.olap_base import *
的方式导入,由Cython编译py文件后才能运行。
VertexIndexIterator
GetVid()-> int64_t
: 获取点的vid
Galaxy
Galaxy(dir_path: std::string)
: 构造函数,dir_path为db路径SetCurrentUser(user: std::string, password: std::string)-> cython.void
: 设置用户SetUser(user: std::string)-> cython.void
: 设置用户OpenGraph(graph: std::string, read_only: bint)-> GraphDB
: 创建GraphDB
GraphDB:
CreateReadTxn()-> Transaction
: 创建只读事务CreateWriteTxn()-> Transaction
: 创建写事务ForkTxn(txn: Transaction)-> Transaction
: 复制事务,只能复制读事务
Transaction:
GetVertexIndexIterator(
label: std::string,
field: std::string,
key_start: std::string,
key_end: std::string)-> VertexIndexIterator