OlapOnDB API
此文档主要详细介绍了OlapOnDB API的使用说明
1. 简介
一般用户需要自己实现的只是将需要分析的子图抽取出来的过程。用户 也可以通过使用TuGraph中丰富的辅助接口实现自己的图分析算法。
该文档主要介绍Procedure及Embed的接口设计,并介绍部分常用接口,具体的接口信息参见include/lgraph/olap_on_db.h文件。
2. 模型
Procedure及Embed使用到的辅助函数主要包含在OlapOnDB类,还有一些使用频率较高的函数都会逐一介绍
在TuGraph中OLAP相关的有以下常用的数据结构:
- DB图分析类 OlapOnDB<EdgeData>
- 点数组ParallelVector<VertexData>
- 点集合ParallelBitset
- 边数据结构AdjUnit/AdjUnit<Empty>
- 边集合数据结构AdjList<EdgeData>
2.1 基于快照的存储结构
TuGraph中的OlapOnDB类能够提供数据“快照”,即建立一个对指定数据集的完全可用拷贝,该拷贝包括相应数据在某个时间点(拷贝开始的时间点)的镜像。由于OLAP的操作仅涉及读操作而不涉及写操作,OlapOnDB会以一种更紧凑的方式对数据进行排布,在节省空间的同时,提高数据访问的局部性。
2.2 BSP计算模型
TuGraph在计算的过程中使用了BSP(Bulk Synchronous Parallel)模型,使得该过程能够并行执行,极大的提高了程序运行效率。
BSP计算模型的核心思路为超步(Super Step)的提出和使用。在OlapOnDB创建后,在该数据上的计算分为多个超步,比如PageRank,分为多轮迭代,每轮迭代就是一个超步。不同超步之间用存在显式同步,从而保证所有线程在完成同一超步后同时进入下一个超步。在一个超步内部,所有的线程异步执行,利用并行提升计算效率。
利用BSP计算模型能够有效避免死锁,通过障碍同步的方式能够以硬件方式实现粗粒度的全局同步,使得图计算能够并行化执行,而程序员无需在同步互斥上大费周章。
3. 算法举例
在这里对PageRank算法分块做解释,大体上分为主函数Process和PageRank算法流程PageRank函数
3.1 主函数
主函数输入有三个参数,TuGraph数据库参数db,从网页端获取的请求request,给网页端的返回值response,整体流程可以分为一下几步:
- 相关参数的获取
- 快照类的创建
- PageRank算法主流程
- 网页端返回值的获取和发送
extern "C" bool Process(GraphDB & db, const std::string & request, std::string & response) {
    
    // 从网页端请求中获取迭代次数(num_iterations),
    int num_iterations = 20;
    try {
        json input = json::parse(request);
        num_iterations = input["num_iterations"].get<int>();
    } catch (std::exception & e) {
        throw std::runtime_error("json parse error");
        return false;
    }
    // 读事务的创建以及快照类的创建
    auto txn = db.CreateReadTxn();
    OlapOnDB<Empty> olapondb(
        db,
        txn,
        SNAPSHOT_PARALLEL
    );
	
    // 创建pr数组用于存储每个节点的pr值
    ParallelVector<double> pr = olapondb.AllocVertexArray<double>();
    // pagerank算法主流程,获取每个节点的pagerank值
    PageRankCore(olapondb, num_iterations, pr);
    
    auto all_vertices = olapondb.AllocVertexSubset();
    all_vertices.Fill();
    /*
        函数用途:从所有节点中获取pagerank值最大的节点编号
    
        函数流程描述:该函数对点集合all_vertices中所有为1的位对应的节点vi(又称为活跃点)执行Func A,再将Func A的返回值作为Func B的第二个输入参数,得到局部最大值(因为第一个输入参数为0,因此实际上返回值就是每个节点的pagerank值),最后再将所有线程的返回值汇总,再次 执行Func B得到全局返回值,并存入max_pr_vi变量中
    */
    size_t max_pr_vi = olapondb.ProcessVertexActive<size_t>(
        
        //Func A
        [&](size_t vi) {
            return vi;
        },
        all_vertices,
        0,
        
        //Func B
        [&](size_t a, size_t b) {
            return pr[a] > pr[b] ? a : b;
        }
    );
    
    // 网页端返回值的获取和发送
    json output;
    output["max_pr_vid"] = olapondb.OriginalVid(max_pr_vi);
    output["max_pr_val"] = pr[max_pr_vi];
    response = output.dump();
    return true;
}
3.2 PageRank算法流程
pagerank主流程有两个输入参数,快照类(子图)还有迭代次数,整体流程可以分为以下几步:
- 相关数据结构的初始化
- 每个节点pagerank值的初始化
- 每个节点pagerank值的计算,活跃点为所有点(意味着所有点都需要计算pagerank值)
- 得到每个节点经过num_iterations次迭代后的pagerank值
void PageRankCore(OlapBase<Empty>& graph, int num_iterations, ParallelVector<double>& curr) {
    
    // 相关数据结构的初始化
    auto all_vertices = olapondb.AllocVertexSubset();
    all_vertices.Fill();
    auto curr = olapondb.AllocVertexArray<double>();
    auto next = olapondb.AllocVertexArray<double>();
    size_t num_vertices = olapondb.NumVertices();
    double one_over_n = (double)1 / num_vertices;
    // 每个节点pagerank值的初始化,和该节点的出度成反比
    double delta = graph.ProcessVertexActive<double>(
        [&](size_t vi) {
            curr[vi] = one_over_n;
            if (olapondb.OutDegree(vi) > 0) {
                curr[vi] /= olapondb.OutDegree(vi);
            }
            return one_over_n;
        },
        all_vertices);
    // 总迭代过程
    double d = (double)0.85;
        for (int ii = 0;ii < num_iterations;ii ++) {
        printf("delta(%d)=%lf\n", ii, delta);
        next.Fill((double)0);
        /*
            函数用途:计算所有节点的pagerank值
            函数流程描述:该函数用于计算所有节点的pagerank值,对all_vertices中所有为1的位对应的节点vi执行Func C,得到本轮迭代中vi的pagerank值,并返回vi节点的pagerank变化值,最终经过函数内部处理汇总所有活跃节点的总变化值并返回,该值被存储在delta变量中
        */
        delta = graph.ProcessVertexActive<double>(
            // Func C
            [&](size_t vi) {
                double sum = 0;
                // 从邻居中获取当前节点的pagerank值
                for (auto & edge : olapondb.InEdges(vi)) {
                    size_t src = edge.neighbour;
                    sum += curr[src];
                }
                next[vi] = sum;
                // pagerank值计算核心公式
                next[vi] = (1 - d) * one_over_n + d * next[vi];
                if (ii == num_iterations - 1) {
                    return (double)0;
                } else {
    
                    // 相关中间变量统计
                    if (olapondb.OutDegree(vi) > 0) {
                        next[vi] /= olapondb.OutDegree(vi);
                        return fabs(next[vi] - curr[vi]) * olapondb.OutDegree(vi);
                    } else {
                        return fabs(next[vi] - curr[vi]);
                    }
                }
            },
            all_vertices
        );
        // 将本轮迭代得到的pagerank值输出作为下一轮迭代的输入
        curr.Swap(next);
    }
}
4. 其他常用函数功能描述
4.1 事务的创建
//读事务的创建
auto txn = db.CreateReadTxn();
//写事务的创建
auto txn = db.CreateWriteTxn();
4.2 并行化创建有向图
OlapOnDB<Empty> olapondb(
    db,
    txn,
    SNAPSHOT_PARALLEL
)
4.3 并行化创建无向图
OlapOnDB<Empty> olapondb(
    db,
    txn,
    SNAPSHOT_PARALLEL | SNAPSHOT_UNDIRECTED
)
4.4 获取出度
size_t OutDegree(size_t vid)
4.5 获取入度
size_t InDegree(size_t vid)
4.6 获取出边集合
/*
    函数名称:AdjList<EdgeData> OutEdges(size_t vid)
    数据结构:
        AdjList 可以理解为类型为AdjUnit结构体的数组
        AdjUnit 有两个成员变量: 1. size_t neighbour 2. edge_data,其中neighbour表示该出边指向的目标节点编号,如果为有权图,则edge_data数据类型和输入文件中边的权重值相同,否则数据类型为Empty
    使用示例:输出节点vid的所有出度邻居
*/
for (auto & edge : olapondb.OutEdges(vid)) {
    size_t dst = edge.neighbour;
    printf("src = %lu,dst = %lu\n",vid,dst);
}