一、Hello world

1
2
3
4
5
6
7
8
9
#include <iostream>
#include <mpi.h>
using namespace std;
int main(){
MPI_Init(NULL,NULL);
cout << "hello world" << endl;
MPI_Finalize();
return 0;
}
1
mpicxx hello.cpp -o hello
1
mpirun -np 10 ./hello

代码结构:

  • 头文件
  • 初始化MPI环境:MPI_Init()
  • 分布式代码
  • 终止MPI环境:MPI_Finalize()
  • 结束

二、基本API

2.1 初始化环境 MPI_Init

1
2
3
4
5
6
int MPI_Init(int* argc,char* argv[]);
//参数argc_p和argc_v是指向参数argc和argv的指针,当程序不使用这些参数时可直接写为NULL。
//该函数的返回值一般为一个整型int,我们一般忽略。
//所以一般最常见的用法为:
MPI_Init(NULL,NULL);
//除此之外,该函数一般表示MPI函数的开始,在此之前不应该有其他MPI函数

2.2 是否初始化:MPI_Initialized

1
int MPI_Initialized(int *flag)

唯一可在MPI_Init前使用的函数,用来检测MPI系统是否已经初始化,已经调用MPI_Init,返回flag=true,否则返回flag=false

2.3 终止环境:MPI_Finalized

1
int MPI_Finalize(void)

在一个进程执行完其全部MPI函数调用后,将调用函数MPI_Finalize,从而让系统释放分配给MPI的资源(例如内存等)

2.4 获取进程数:MPI_Comm_size

1
int MPI_Comm_size(MPI_Comm comm, int* size)

通过调用函数来确定一个通信域中的进程总数

如果comm是MPI_COMM_WORLD,那就是当前程序能用的所有进程数

2.5 获取进程ID:MPI_Comm_rank

1
int MPI_Comm_rank(MPI_Comm comm, int* rank)

在一个有p个进程的通信域中,每一个进程有一个唯一的序号(ID号),取值为0~p-1

当MPI初始化后,每一个活动进程变成了一个叫MPI_COMM_WORLD的通信域中的成员。

2.6 获取程序运行的主机名

1
int MPI_Get_processor_name(char* name, int* resultlen)
  • name:返回名称
  • resultlen:返回名称所占用的字节

应提供参数name不少于MPI_MAX_PROCESSOR_NAME个字节的存储空间。

2.7 终止一个comm的所有进程:MPI_Abort

1
int MPI_Abort(MPI_Comm comm, int errorcode)

异常终止MPI程序,在出现了致命错误而希望异常终止MPI程序时执行,MPI系统会设法终止comm通信器中的所有进程,输入整形参数errocode,将被作为进程的退出码返回给系统。

2.8 同步栅栏函数 MPI_Barrier

1
void MPI_Barrier(MPI_Comm communicator)

可以用二叉树、双回环等方式实现,依靠的是Send、Recv阻塞机制

三、通信API

  • MPI提供了消息的缓存机制
  • 消息可以以阻塞或非阻塞的方式发送
  • 顺序性:MPI保证接收者收到消息的顺序和发送者的发送顺序一致
  • 公平性:MPI不保证调度公平性,程序员自己去防止进程饥饿

3.1 MPI消息数据类型

MPI 数据类型 C 语言数据类型 MPI 数据类型 C 语言数据类型
MPI_CHAR signed char MPI_UNSIGNED unsigned int
MPI_SHORT signed short int MPI_UNSIGNED_LONG unsigned long int
MPI_INT signed int MPI_FLOAT float
MPI_LONG signed long int MPI_DOUBLE double
MPI_LONG_LONG signed long long int MPI_LONG_DOUBLE long double
MPI_UNSIGNED_CHAR unsigned char MPI_BYTE
MPI_UNSIGNED_SHORT unsigned short int MPI_PACKED

3.2 点对点通信

status参数用于指出接收的消息的源和标记(status.MPI_SOURCE、status.MPI_TAG)如果不关心这些参数,可以用MPI_STATUS_IGNORE

  • 阻塞发送/接收:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    int MPI_Send(void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm)
    // buf: 发送缓冲区起始地址
    // count: 发送消息的数据单元个数
    // datatype: 发送消息的数据类型
    // dest: 目标进程号
    // tag: 发送消息的标签
    // comm: 通信域
    int MPI_Recv(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Status *status)
    //status可以用MPI_STATUS_IGNORE忽略使用
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    #include <stdio.h>
    #include <string.h>
    #include "mpi.h"
    void main(int argc, char* argv[])
    {
    int numprocs, myid, source;
    MPI_Status status;
    char message[100];
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    if (myid != 0) { //非0号进程发送消息
    strcpy(message, "Hello World!");
    MPI_Send(message, strlen(message) + 1, MPI_CHAR, 0, 99,
    MPI_COMM_WORLD);
    }
    else { // myid == 0,即0号进程接收消息
    for (source = 1; source < numprocs; source++) {
    MPI_Recv(message, 100, MPI_CHAR, source, 99,
    MPI_COMM_WORLD, &status);
    printf("接收到第%d号进程发送的消息:%s\n", source, message);
    }
    }
    MPI_Finalize();
    }
  • 非阻塞发送/接收:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    int MPI_Isend(const void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request* request)
    // buf: 发送缓冲区起始地址
    // count: 发送消息的数据单元个数
    // datatype: 发送消息的数据类型
    // dest: 目标进程号
    // tag: 发送消息的标签
    // comm: 通信域
    // request: 非阻塞发送请求对象
    int MPI_Irecv(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Request *request)
    //通过request可以知道Isend或Irecv的状态(是否完成)

    MPI_Wait用于等待某一个通信的完成,MPI_Waitall等待一组通信的完成

    1
    2
    int MPI_Wait(MPI_Request *request, MPI_Status *status)
    //status可用`MPI_STATUS_IGNORE`代替
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    #include <mpi.h>
    #include <iostream>
    #include <vector>
    int main(int argc, char** argv) {
    MPI_Init(&argc, &argv);
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    if (size != 2) {
    std::cerr << "This program requires exactly 2 MPI processes." << std::endl;
    MPI_Abort(MPI_COMM_WORLD, 1);
    }
    const int N = 10;
    std::vector<int> data(N);
    MPI_Request request;
    MPI_Status status;

    if (rank == 0) {
    for (int i = 0; i < N; ++i) {
    data[i] = i;
    }
    MPI_Isend(data.data(), N, MPI_INT, 1, 0, MPI_COMM_WORLD, &request);
    MPI_Wait(&request, &status);
    } else if (rank == 1) {
    MPI_Irecv(data.data(), N, MPI_INT, 0, 0, MPI_COMM_WORLD, &request);
    MPI_Wait(&request, &status);
    std::cout << "Process 1: Data received." << std::endl;
    std::cout << "Process 1: Received data = ";
    for (int i = 0; i < N; ++i) {
    std::cout << data[i] << " ";
    }
    std::cout << std::endl;
    }
    MPI_Finalize();
    return 0;
    }

3.3 集合通信

3.3.1 广播 Bcast

从指定的一个根进程中把相同的数据广播发送给组中的所有其他进程

img
1
2
3
4
5
6
MPI_Bcast(
void *buffer,
int count,
MPI_Datatype datatype,
int root,
MPI_Comm comm)

for循环调用MPI_Send和MPI_Recv也能实现广播的效果,但是时间复杂度有很大的区别,for循环实现时间复杂度为O(n),Bcast实现时间复杂度为O(logn)

Bcast的实现示意图:

MPI_Bcast tree
  • T0时刻:0号进程将数据传递给1号进程
  • T1时刻:0号进程和1号进程都有了数据,0号进程将数据发送给2号进程,1号进程发送给3号进程
  • T2时刻:0->4,1->5,2->6,3->7

3.3.2 分发 Scatter

把指定的根进程中的数据分散发送给组中的所有进程(包括自己

img
1
2
3
4
MPI_Scatter(
void* send_data,int send_count,MPI_Datatype send_datatype,
void* recv_data,int recv count,MPI_Datatype recv_datatype,
int root,MPI_Comm communicator)

3.3.3 收集 Gather

在组中指定一个进程收集组中进程发来的消息,这个函数操作与MPI_Scatter函数操作相反 所有进程调用该函数,把指定位置的数据发送给根进程的指定位置

img
1
2
3
4
MPI_Gather(
void *sendbuf,int sent_count,MPI_Datatype send_datatype,
void *recv_data,int recv_count,MPI_Datatype recv_datatype,
int root,MPI_Comm communicator)

3.3.4 全收集 Allgather

将所有的数据聚合到每个进程中。

image-20240526132009884
1
2
3
4
MPI_Allgather(
void* send_data,int send_count,MPI_Datatype send_datatype,
void* recv_data,int recv_count,MPI_Datatype recv_datatype,
MPI_Comm communicator)

与gather的区别就是没有root参数

3.3.5 规约 Reduce

将每个进程中的数据按给定的操作op进行运算,并将其结果返回到序列号为root的进程。

image-20240526132613368

可以处理多个值

img
1
2
3
4
5
6
7
8
MPI_Reduce(
void *send_data,
void *recv_data,
int count,
MPI_Datatype datatype,
MPI_Op operator,/*操作*/
int root,
MPI_Comm comm);
opration

3.3.6 全规约 Allreduce

MPI,OpenMPI与深度学习-有驾
1
2
3
4
5
6
7
MPI_Allreduce(
void* send_data,
void* recv_data,
int count,
MPI_Datatype datatype,
MPI_Op op,
MPI_Comm communicator)

3.4 通信域和进程组

进程组就是一组并行运行的MPI进程的集合,同一进程可以属于不同的进程组

一个进程组可以包含任意数量的进程,而一个通信域由一个进程组构成,但一个进程组可以用于创建多个不同的通信域。

在之前的MPI学习中一直使用的通信域MPI_COMM_WORLD所对应的进程组就是全体进程的集合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#include <mpi.h>
#include <stdio.h>

int main(int argc, char *argv[]) {
MPI_Init(&argc, &argv);

int rank, size;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);

// 获取默认通信域的进程组
MPI_Group world_group;
MPI_Comm_group(MPI_COMM_WORLD, &world_group);

// 创建包含前半部分进程的新进程组
int half_size = size / 2;
int *ranks = (int *)malloc(half_size * sizeof(int));
for (int i = 0; i < half_size; i++) {
ranks[i] = i;
}

MPI_Group new_group;
MPI_Group_incl(world_group, half_size, ranks, &new_group);

// 使用新进程组创建新的通信域
MPI_Comm new_comm;
MPI_Comm_create(MPI_COMM_WORLD, new_group, &new_comm);

// 在新通信域中进行操作
if (new_comm != MPI_COMM_NULL) {
int new_rank, new_size;
MPI_Comm_rank(new_comm, &new_rank);
MPI_Comm_size(new_comm, &new_size);

int data = new_rank;
// 广播数据
MPI_Bcast(&data, 1, MPI_INT, 0, new_comm);
printf("Process %d in new_comm received data %d\n", new_rank, data);

// 规约操作
int sum;
MPI_Reduce(&data, &sum, 1, MPI_INT, MPI_SUM, 0, new_comm);
if (new_rank == 0) {
printf("Sum of ranks in new_comm: %d\n", sum);
}
}

// 释放通信域
MPI_Comm_free(&new_comm);

// 释放进程组
MPI_Group_free(&world_group);
MPI_Group_free(&new_group);

MPI_Finalize();
return 0;
}