跳到主要内容

Python Olap API

本文档主要介绍 OlapBase OlapOnDB 和 OlapOnDisk 在Python中的API用法

1. 概述

本手册将介绍使用TuGraph图计算系统Python接口需要的简单配置,同时结合代码对TuGraph Python API进行解释。关于ParallelBitset、OlapBase各类的作用,详见olap-base-api.md,olap-on-db-api.md和olap-on-disk-api.md

2. 配置要求

如果要使用TuGraph图计算编写以及编译自己的应用程序,需要的配置要求为:

  • linux操作系统,目前在Ubuntu16.04, Ubuntu18.04, Ubuntu20.04和Centos7, Centos8系统上可成功运行。
  • 支持C++17的编译器,要求GCC版本为8.4.0或更新的版本。
  • Cython,版本要求3.0.0以上,已测试可运行版本为3.0.0a11

3. Cython

Cython是一种高效的编程语言,是Python的超集。Cython能将py文件翻译为C/C++代码后编译为Python拓展类,在Python中通过import调用。在TuGraph中,所有的Python plugin都由Cython编译为Python拓展类后使用。

Cython的Pure Python模式在保证Python语法的同时具有C/C++的性能,TuGraph Python接口均使用Cython实现。

Cython 文档

4. Olap API

见procedures/algo_cython/olap_base.pxd文件,用法与功能基本与C++接口相同,olap_base.pxd中声明的接口都由C++实现,在py文件中必须通过from cython.cimports.olap_base import *的方式导入,由Cython编译py文件后才能运行。

原子操作

  • cas[T](ptr: cython.pointer(T), oldv: T, newv: T)-> cython.bint:如果ptr指向的值等于oldv,则将ptr指向的值赋为newv并返回true,否则返回false
  • write_min[T](a: cython.pointer(T), b: T)-> cython.bint:如果b比a指向的值更小,那么将a指向的值赋为b并返回true,否则返回false。
  • write_max[T](a: cython.pointer(T), b: T)-> cython.bint:如果b比a指向的值更大,那么将a指向的值赋为b并返回true,否则返回false。
  • write_add[T](a: cython.pointer(T), b: T)-> cython.bint:将b的值加到a指向的值上。
  • write_sub[T](a: cython.pointer(T), b: T)-> cython.bint:将a指向的值减去b的值。

点集合类ParallelBitset

  • Size()-> size_t:表示Bitmap中的点个数。
  • ParallelBitset(size: size_t):初始化size和data,data长度为(size >> 6)+1
  • Clear()-> cython.void:清空集合
  • Fill()-> cython.void:将所有点加入集合
  • Has(size_t i)-> cython.bint:检查点i是否在集合中
  • Add(size_t i)-> cython.bint:将点i加入集合中
  • Swap(ParallelBitset &other)-> cython.void:和另一组ParallelBitset集合交换元素

点数组类ParallelVector

  • ParallelVector[T](size_t capacity) 构建ParallelVector,capacity为点数组的初始容量大小
  • operator[](i: size_t)-> T:下标为i的数据
  • begin()-> cython.pointer(T):ParallelVector的起始指针
  • end()-> cython.pointer(T):ParallelVector的结束指针。begin和end的用法类似于vector容器的begin和end指针,可以使用这两个指针对数组进行顺序访问
  • Back()-> T:ParallelVector最后一个数据
  • Data()-> cython.pointer(T):表示数组本身数据
  • Destroy()-> cython.void:清空ParallelVector数组内数据并删除数组
  • Size()-> size_t:表示ParallelVector中的数据个数
  • Resize(size: size_t)-> cython.void:更改ParallelVector为size大小,该size应大于等于更改前的大小且小于capacity
  • Clear()-> cython.void:清空ParallelVector内数据
  • ReAlloc(capacity: size_t)-> cython.void:给ParallelVector分配新的容量大小,若数组有数据则将数据迁移至新内存
  • Fill(elem: T)-> cython.void:为ParallelVector的全部数据赋值为elem
  • Append(elem: T, atomic: cython.bint = true)-> cython.void:向ParallelVector结尾添加一个数据
  • Swap(other: ParallelVector[T])-> cython.void:和其他的ParallelVector交换数据
  • Copy()-> ParallelVector[T]:复制当前的ParallelVector数据存至Copy数组中

自定义数据结构

  • Empty:内容为空的特殊数据类型。
  • EdgeUnit[EdgeData]:表示权值类型为EdgeData的边,用于解析输入文件,包含三个成员变量:
    • src: size_t:边的起始点
    • dst: size_t:边的终点
    • edge_data: EdgeData:边的权值
  • AdjUnit[EdgeData]:表示权值类型为EdgeData的边,用于批处理计算过程中,包含两个成员变量:
    • neighbour: size_t:边的邻居点
    • edge_data: EdgeData:边的权值
  • AdjList[EdgeData]:权值类型为EdgeData的点的邻接表,常用于表示点的入边和出边集合,包含两个成员变量:
    • begin()-> cython.pointer(AdjUnit[T]):列表的起始指针
    • end()-> cython.pointer(AdjUnit[T]):列表的结束指针。
    • operator[](i: size_t)-> AdjUnit[EdgeData]: 下标为i的数据

图类OlapBase

  • NumVertices()-> size_t:获取点数

  • NumEdges()-> size_t:获取边数

  • OutDegree(size_t vid)-> size_t:点vid的出度

  • InDegree(size_t vid)-> size_t:点vid的入度

  • AllocVertexArray[VertexData]() ->ParallelVector[VertexData]:分配一个类型为VertexData的数组,大小为点个数

  • AllocVertexSubset()-> ParallelBitset:分配一个ParallelBitset集合,用于表示所有点的状态是否激活

  • OutEdges(vid: size_t)-> AdjList[EdgeData]:获取点v的所有出边集合

  • InEdges(vid: size_t)-> AdjList[EdgeData]:获取点v的所有入边集合

  • Transpose()-> cython.void:对有向图进行图反转

  • LoadFromArray(edge_array: cython.p_char, input_vertices: size_t, input_edges: size_t, edge_direction_policy: EdgeDirectionPolicy):从数组中加载图数据,包含四个参数,其含义分别表示:

    • edge_array:将该数组中的数据读入图,一般情况下该数组包含多条边。
    • input_vertices:指定数组读入图的点个数。
    • input_edges:指定数组读入图的边的条数。
    • edge_direction_policy:指定图为有向或无向,包含三种模式,分别为DUAL_DIRECTION、MAKE_SYMMETRIC以及INPUT_SYMMETRIC。对应的详细介绍见include/lgraph/olap_base.h文件的enum EdgeDirectionPolicy
  • AcquireVertexLock(vid: size_t)-> cython.void:对点vid加锁,禁止其它线程对该锁对应的点数据进行访存

  • void ReleaseVertexLock(vid: size_t)-> cython.void:对点vid解锁,所有线程均可访存该锁对应的点数据

TuGraph提供了两个批处理操作来并行地进行以点为中心的批处理过程,在Python中与C++使用方法稍有不同。

# 函数名称:ProcessVertexInRange[ReducedSum, Algorithm](
# work: (algo: Algorithm, vi: size_t)-> ReducedSum,
# lower: size_t, upper: size_t,
# algo: Algorithm,
# zero: ReducedSum = 0,
# reduce: (a: ReducedSum, b: ReducedSum)-> ReducedSum = reduce_plus[ReducedSum])
#
# 函数用途:对Graph中节点编号介于lower和upper之间的节点执行work函数。第四个参数表示累加的基数,默认为0;
# 第五个参数表示对每个work处理后的节点返回值进行迭代reduce函数操作,默认为累加操作。
# 具体实现请参考include/lgraph/olap_base.h中具体代码
#
# 使用示例:统计数组parent数组中有出边的点个数

import cython
from cython.cimports.olap_base import *


@cython.cclass
class CountCore:
graph: cython. pointer(OlapBase[Empty])
parent: ParallelVector[size_t]

@cython.cfunc
@cython.nogil
def Work(self, vi: size_t) -> size_t:
if self.graph.OutDegree(self.parent[vi]) > 0:
return 1
return 0

def run(self, pointer_g: cython. pointer(OlapBase[Empty])):
self.graph = pointer_g
self.parent = self.graph.AllocVertexArray[size_t]()
vertex_num: size_t
vertex_num = self.graph.ProcessVertexInRange[size_t, CountCore](self.Work, 0, self.parent.Size(), self)
print("the number is", vertex_num)

if __name__ == "__main__":
count_core = CountCore()
count_core.run(cython.address(g))

其中g为图类OlapBase的实例化对象

# 函数名称:ProcessVertexActive[ReducedSum, Algorithm](
# work: (algo: Algorithm, vi: size_t)-> ReducedSum,
# active: ParallelBitset,
# algo: Algorithm,
# zero: ReducedSum = 0,
# reduce: (a: ReducedSum, b: ReducedSum)-> ReducedSum = reduce_plus[ReducedSum])
#
# 函数用途:对active_vertices中对应为1的节点执行work函数,第三个参数表示累加的基数,默认为0;
# 第四个参数表示对每个work处理后的节点返回值进行迭代reduce函数操作,默认为累加操作。
# 具体实现请参考/include/lgraph/olap_base.h中具体代码
#
# 使用示例:输出Graph中节点1,2,3的所有出度邻居,并统计这三个节点的总出度

import cython
from cython.cimports.olap_base import *
from cython.cimports.libc.stdio import printf


@cython.cclass
class NeighborCore:
graph: cython.pointer(OlapBase[Empty])
active_in: ParallelBitset

@cython.cfunc
@cython.nogil
def Work(self, vi: size_t) -> size_t:
degree = self.graph.OutDegree(vi)
dst: size_t
edges = self.graph.OutEdges(vi)
local_out_degree: size_t
for i in range(degree):
dst = edges[i].neighbour
printf("node %lu has neighbour %lu\n", vi, dst)
local_out_degree += 1
return local_out_degree

def run(self, pointer_g: cython.pointer(OlapBase[Empty])):
self.graph = pointer_g
self.active_in = self.graph.AllocVertexSubset()
self. active_in. Add(1)
self. active_in. Add(2)
self. active_in. Add(3)
total_outdegree = cython.declare(
size_t,
self.graph.ProcessVertexActive[size_t, CountCore](self.Work, self.active_in, self))
printf("total outdegree of node1,2,3 is %lu\n",total_outdegree)

if __name__ == "__main__":
neighbor_core = NeighborCore()
neighbor_core.run(cython.address(g))

如上面两个例子所展示,在Python中ProcessVertexActive与ProcessVertexInRange比在C++中额外需要一个算法类指针参数,Work函数一般也作为该算法类的成员函数,满足Work函数访问成员变量的需求(如图graph,点数组parent),在调用批处理函数时将Work函数和算法类的self指针传入批处理函数。

其中Work函数会在多线程中调用,因此加上修饰器@cython.nogil释放Python全局解释锁,在多线程执行的代码中(例如批处理函数中的Work函数,cython.parallel.prange中),不能包含Python对象,最好通过dst: type或者dst = cython.declare(type)的方式声明变量为C/C++类型。

图类OlapOnDB:

并行化创建有向图:

olapondb = OlapOnDB[Empty](db, txn, SNAPSHOT_PARALLEL)

并行化创建无向图

olapondb = OlapOnDB[Empty](db, txn, SNAPSHOT_PARALLEL | SNAPSHOT_UNDIRECTED)

ID_MAPPING创建有向图

olapondb = OlapOnDB[Empty](db, txn, SNAPSHOT_PARALLEL | SNAPSHOT_IDMAPPING)

图类OlapOnDisk

ConfigBase:

  • ConfigBase(): 构造函数

  • std::string input_dir: 图边表数据路径

  • std::string output_dir: 输出结果路径

  • Load(config: ConfigBase[EdgeData], edge_direction_policy: EdgeDirectionPolicy)-> void: 读入图数据

5. lgraph_db API

见procedures/algo_cython/lgraph_db.pxd与lgraph_db_python.py文件。

lgraph_db.pxd中接口用法与功能基本与C++接口相同,lgraph_db.pxd中声明的接口都由C++实现,在py文件中必须通过from cython.cimports.olap_base import *的方式导入,由Cython编译py文件后才能运行。

VertexIndexIterator

  • GetVid()-> int64_t: 获取点的vid

Galaxy

  • Galaxy(dir_path: std::string): 构造函数,dir_path为db路径
  • SetCurrentUser(user: std::string, password: std::string)-> cython.void: 设置用户
  • SetUser(user: std::string)-> cython.void: 设置用户
  • OpenGraph(graph: std::string, read_only: bint)-> GraphDB: 创建GraphDB

GraphDB:

  • CreateReadTxn()-> Transaction: 创建只读事务
  • CreateWriteTxn()-> Transaction: 创建写事务
  • ForkTxn(txn: Transaction)-> Transaction: 复制事务,只能复制读事务

Transaction:

GetVertexIndexIterator(
label: std::string,
field: std::string,
key_start: std::string,
key_end: std::string)-> VertexIndexIterator

获取索引迭代器。迭代器的field值为 [key_start, key_end]。所以在key_start=key_end=v时,返回指向field值为v的点的迭代器

lgraph_db_python.py是lgraph_db.pxd中C++类 Galaxy与GraphDB的包装,将C++类包装为Python类,将lgraph_db_python.py编译为Python拓展后,可以直接在Python文件或Python命令行中import lgraph_db_python访问lgraph_db_python.PyGraphDB与PyGraphDB.PyGalaxy。

PyGalaxy:

  • PyGalaxy(self, dir_path: str): 构造函数,dir_path为db路径
  • SetCurrentUser(self, user: str password: str)-> void: 设置用户
  • SetUser(self, user: std::string)-> void: 设置用户
  • OpenGraph(self, graph: str, read_only: bool)-> PyGraphDB: 创建PyGraphDB

PyGraphDB:

  • get_pointer(self)-> cython.Py_ssize_t: C++ 类GraphDB的地址

6. 算法插件示例

下面为Python实现的BFS算法的代码示例:

# cython: language_level=3, cpp_locals=True, boundscheck=False, wraparound=False, initializedcheck=False
# distutils: language = c++

# 注释作用如下:
# language_level=3: 使用Python3
# cpp_locals=True: 需要c++17,使用std::optional管理Python代码中的C++对象,可以避免C++对象的拷贝构造
# boundscheck=False: 关闭索引的边界检查
# wraparound=False: 关闭负数下标的处理(类似Python List)
# initializedcheck=False: 关闭检查内存是否初始化,关闭检查后运行性能更快
# language = c++: 将此py文件翻译为C++而不是C文件,TuGraph使用大量模板函数,所以都应该使用C++

import json

import cython
from cython.cimports.olap_base import *
from cython.cimports.lgraph_db import *
# 从procedures/algo_cython/ 中cimportolap_base.pxd与lgraph_db.pxd, 类似C++中#include "xxx.h"

from cython.cimports.libc.stdio import printf
# 类似C++中#include <stdio.h>
# 其他常见的还有cython.cimports.libcpp.unordered_map等

import time


@cython.cclass
# cython.cclass 表示BFSCore为C类型的Class
class BFSCore:
graph: cython.pointer(OlapBase[Empty])
# cython.pointer(OlapBase[Empty])表示OlapBase[Empty]的指针,类似C++中OlapBase[Empty]*
# cython提供了常见类型的指针,如cython.p_int, cython.p_char等,表示int*, char*, ...
parent: ParallelVector[size_t]
active_in: ParallelBitset
active_out: ParallelBitset
root: size_t
# root: size_t 声明root为C++ size_t类型变量,等效于root = cython.declare(size_t)
# 不声明类型的变量为Python object类型
# 声明变量类型会大幅提高性能,同时在多线程部分,只有C/C++类型的变量可以访问

@cython.cfunc
# cython.cfunc 表示Work为C类型的函数,参数与返回值应声明
# cfunc性能好,能接受C/C++对象为参数、返回值,但是不能在其他python文件中调用
# 类似的有cython.ccall,如Standalone函数,可以在其他python文件中调用
@cython.nogil
# cython.nogil 表示释放Python全局解释锁,在nogil修饰的部分,不能访问Python对象
# 在多线程部分,都应有nogil修饰器
@cython.exceptval(check=False)
# cython.exceptval(check=False) 表示禁用异常传播,将忽略函数内部引发的Python异常
def Work(self, vi: size_t) -> size_t:
degree = cython.declare(size_t, self.graph.OutDegree(vi))
out_edges = cython.declare(AdjList[Empty], self.graph.OutEdges(vi))
i = cython.declare(size_t, 0)
local_num_activations = cython.declare(size_t, 0)
dst: size_t
for i in range(degree):
dst = out_edges[i].neighbour
if self.parent[dst] == cython.cast(size_t, -1):
# parent[dst] == -1 表示dst没有被bfs访问过
if self.active_out.Add(dst):
# 将dst设置为为活跃节点;ParallelBitmap.Add为原子操作,防止重复计算
self.parent[dst] = vi
local_num_activations += 1
return local_num_activations

@cython.cfunc
@cython.nogil
@cython.exceptval(check=False)
def run(self, g: cython.pointer(OlapBase[Empty]), r: size_t) -> cython.size_t:
self.graph = g
self.root = r
self.active_in = g.AllocVertexSubset()
self.active_out = g.AllocVertexSubset()
self.parent = g.AllocVertexArray[size_t]()
self.parent.Fill(-1)
num_vertices = cython.declare(size_t, self.graph.NumVertices())
printf("num_vertices = %lu\n", num_vertices)
self.parent[self.root] = self.root
num_activations = cython.declare(size_t, 1)
discovered_vertices = cython.declare(size_t, num_activations)
self.active_in.Add(self.root)
while num_activations > 0:
self.active_out.Clear()
num_activations = g.ProcessVertexActive[size_t, BFSCore](self.Work, self.active_in, self)
discovered_vertices += num_activations
self.active_out.Swap(self.active_in)
printf("num_activations = %lu\n", num_activations)
return discovered_vertices


@cython.cfunc
def procedure_process(db: cython.pointer(GraphDB), request: dict, response: dict) -> cython.bint:
cost = time.time()
root_id = "0"
label = "node"
field = "id"
if "root" in request:
root_id = request["root"]
if "label" in request:
label = request["label"]
if "field" in request:
field = request["field"]

txn = db.CreateReadTxn()
olapondb = OlapOnDB[Empty](db[0], txn, SNAPSHOT_PARALLEL)
# 并行创建OlapOnDB
# Cython不支持如 *db 的解引用操作,通过db[0]来解引用
root_vid = txn.GetVertexIndexIterator(
label.encode('utf-8'), field.encode('utf-8'),
root_id.encode('utf-8'), root_id.encode('utf-8')
).GetVid()
# 通过 GetVertexIndexIterator 根据root节点label名和filed名与filed值(root_id)
# 获取root节点的迭代器,通过迭代器获取vid,在无ID_MAPPING时,该vid与OlapOnDB中的id相同
cost = time.time() - cost
printf("prepare_cost = %lf s\n", cython.cast(cython.double, cost))
a = BFSCore()
cost = time.time()
count = a.run(cython.address(olapondb), root_vid)
cost = time.time() - cost
printf("core_cost = %lf s\n", cython.cast(cython.double, cost))
response["found_vertices"] = count
response["num_vertices"] = olapondb.NumVertices()
response["num_edges"] = olapondb.NumEdges()
return True


@cython.ccall
def Process(db: lgraph_db_python.PyGraphDB, inp: bytes):
# Process为embed模式和procedure模式下插件入口,用cython.ccall修饰
# Process函数必须名为Process,参数为lgraph_db_python.PyGraphDB与bytes
# 返回值必须为(bool, str)
_inp = inp.decode("utf-8")
request = json.loads(_inp)
response = {}
addr = cython.declare(cython.Py_ssize_t, db.get_pointer())
# 获取PyGraphDB中GraphDB对象的地址,转换为指针后传递
procedure_process(cython.cast(cython.pointer(GraphDB), addr),
request, response)
return (True, json.dumps(response))