# code

CUDA Stream and CUDA Graph

📅 2026-02-13 ✍️ Layla

CUDA Stream

在解释什么是CUDA Stream之前,我们先来考虑一个单任务在执行的流程:

时间轴:
───────────────────────────────────────────────────────────►
CPU:  [准备数据A]               [处理结果]        [空闲]
                  ↓                    ↑
GPU:          [传输A]  [计算A]  [传回A]    [空闲GPU]
              └────────串行执行────────┘

在这种模式下,所有的操作都排成一条直线。你会发现,在任何一个垂直的时间点上,只有一个硬件单元在干活。当 GPU 在算数时,PCIe 传输通道(H2D/D2H)完全在带薪休假;而当数据传输时,昂贵的计算核心又在摸鱼。

既然PCIe和计算核心是两个不同的硬件单元,那为什么不能以流水线的方式来异步执行呢?

时间轴:
─────────────────────────────────────────────────►
Stream 0:  [传A0] [算A0] [回A0]
Stream 1:         [传A1] [算A1] [回A1]
Stream 2:                [传A2] [算A2] [回A2]
Stream 3:                       [传A3] [算A3] [回A3]
           └──────重叠!并发!──────┘

在这种方式下,PCIe和计算单元都在一刻不停地工作。

什么是Stream?

你可以把 GPU 想象成一个拥有成千上万个工人的庞大工厂,而 CUDA Stream 就是工厂里的一条条独立的作业流水线

在默认情况下,如果你不特意去创建 stream ,所有的任务(无论是搬运数据还是启动计算核函数)都会被扔进一个默认的“0号 Stream ”里。这就好比整个工厂只有一条传送带,不管你是要把原材料运进来,还是要把零件组装起来,所有人都得在这条传送带上按顺序排队。

这就解释了为什么 Stream 的本质其实是一个硬件指令队列。当你把一个任务放入某个 Stream 时,你实际上是在说:“在这个 Stream 里,必须先完成 A 才能做 B。”但是——这才是重点——不同 Stream 之间的任务是没有必然先后顺序的。如果不特意去创建Stream,就会出现上面所说的第一种情况:数据传输与数据计算在串行执行,效率十分低下。

逻辑独立性与物理竞争

虽然在软件层面 Stream 是完全独立的,但在物理层面,它们还是在竞争同一块蛋糕。比如,所有的 Stream 都要共用 PCIe 总线的带宽,所有的核函数都要去抢那几个显存控制器和计算单元。

这就解释了为什么 Stream 的数量并不是越多越好。如果你开了 1000 个 Stream ,但 PCIe 带宽已经被前 5 个流填满了,剩下的 995 个 Stream 也只能排队等待带宽。当多个 Stream 的核函数同时运行时,如果它们加起来需要的寄存器或共享内存超过了 GPU 的上限,硬件也会根据优先级进行排队。


Pinned Memory

这里我必须再强调一个容易被忽视的细节:如果你想让 Stream 发挥作用,Host 端的内存必须是 Pinned Memory

什么是Pinned Memory?简单来说,Pinned Memory(锁页内存,也叫 Page-locked Memory)就是一块在物理内存中被“钉死”的区域。

在普通的 C++ 编程中,你用 new 或者 malloc 申请的内存叫做 Pageable Memory(可分页内存)。为了让有限的内存装下更多的程序,操作系统有个很聪明的招数:虚拟内存管理。如果你有一块内存暂时没用,操作系统会把它偷偷搬到硬盘的交换区(Swap Space),把宝贵的物理内存腾出来。当你下次再访问它时,系统再把它搬回来。

这意味着,普通内存的物理地址是会随时变化的。

这就解释了为什么 GPU 搬运数据时会遇到麻烦。GPU 搬运数据主要靠 DMA(直接内存访问)引擎,这个引擎很死板,它必须知道确切的物理地址才能干活。如果你在搬运过程中,操作系统突然把这块内存“分页”到了硬盘,或者换了个物理位置,DMA 就会抓瞎,甚至导致系统崩溃。

为了安全起见,当你使用普通的 cudaMemcpy 搬运 Pageable Memory 时,CUDA 驱动程序其实在后台偷偷做了两步工作:

  1. 先申请一块临时的 Pinned Memory。
  2. 把你的数据从普通内存拷贝到这块 Pinned Memory,然后再由 DMA 把它推向 GPU。

这就产生了一次额外的内存拷贝开销!而如果你直接申请 Pinned Memory(在 CUDA 里使用 cudaHostAlloc),你实际上是和操作系统签了一个协议:“这块地我要了,无论发生什么,你都不准把它挪走,也不准把它存到硬盘里。”

因为物理地址固定了,DMA 引擎可以直接从这块内存抓数据往 GPU 塞,跳过了中间商赚差价。这就是为什么 Pinned Memory 的传输带宽通常比普通内存高得多。

更核心的一点是,如果想实现 Stream 异步重叠,Pinned Memory 是强制性的前提。

由于 Pinned Memory 不需要 CPU 参与数据搬运的准备工作(因为物理地址已知且固定),CPU 只要下个令,DMA 就可以独立完成任务。这就释放了 CPU,让它能立刻去调度其他的 Stream。如果用的是普通内存,CPU 会被强行留下来配合驱动程序做同步锁定,异步 Stream 也就退化成了一个 Stream 串行执行。

不过,这么好的东西为什么不全部都用?

因为 Pinned Memory 是直接抢占物理内存份额的。它不能被交换到硬盘,意味着如果你申请了太多的 Pinned Memory,系统的可用内存就会急剧萎缩。所以,我们通常只对那些频繁需要与 GPU 交换数据的核心缓冲区使用 cudaHostAlloc

// 错误:异步传输用普通内存(不会报错,但不异步!)
float *h_data = (float*)malloc(size);
cudaMemcpyAsync(d_data, h_data, size, ..., stream);  // 实际是同步!

// 正确:异步传输用Pinned Memory
float *h_data;
cudaMallocHost(&h_data, size);  // Pinned Memory
cudaMemcpyAsync(d_data, h_data, size, ..., stream);  // 真正异步

Stream 异步实现多batch矩阵乘法

假设我们要计算100个矩阵A和同一个矩阵B的计算结果,也就是100个矩阵C。在同步执行中,我们需要将这100次矩阵乘法串行执行。

下面我们来看看如何用 Stream 异步实现。

首先是kernel:

__global__ void gemm_naive(float* A, float* B, float* C, int M, int N, int K) {
    const int idx = threadIdx.x + blockDim.x * blockIdx.x;
    const int idy = threadIdx.y + blockDim.y * blockIdx.y;

    if (idy < M && idx < N) {
        float temp = 0;
        for (int k = 0; k < K; k++) {
            // 核心逻辑:A的行乘以B的列
            temp += A[idy * K + k] * B[k * N + idx];
        }
        C[idy * N + idx] = temp;
    }
}

这里就不多讲了。

批次大小,以及三个矩阵的维度:

const int num_batch = 100;
const int M = 1024, N = 1024, K = 512;
const size_t Asize = M * K, Bsize = K * N, Csize = M * N;

使用 Pinned Memory 分配 Host 端内存。

注意这里的 h_A 和 h_C,实际上是一个二维数组[num_batch][size]压成了一维。为什么写成二维数组然后循环分配内存呢?因为调用一次malloc开销太大了!不如提前把所有的都分配好。

float *h_A, *h_B, *h_C;
cudaMallocHost((void**)&h_A, num_batch * Asize * sizeof(float));
cudaMallocHost((void**)&h_B, Bsize * sizeof(float)); // B 是共享的
cudaMallocHost((void**)&h_C, num_batch * Csize * sizeof(float));

初始化数据:

init_data(h_A, num_batch * Asize);
init_data(h_B, Bsize);

预先分配 Device 端内存,避免在循环中 malloc:

float *d_A, *d_B, *d_C;
cudaMalloc((void**)&d_A, num_batch * Asize * sizeof(float));
cudaMalloc((void**)&d_B, Bsize * sizeof(float));
cudaMalloc((void**)&d_C, num_batch * Csize * sizeof(float));
// 先把共享的 B 矩阵传过去
cudaMemcpy(d_B, h_B, Bsize * sizeof(float), cudaMemcpyHostToDevice);

创建 Stream Pool:

const int pool_size = 8;
cudaStream_t streams[pool_size];
for (int i = 0; i < pool_size; i++) cudaStreamCreate(&streams[i]);

执行异步流水线:

dim3 blocksize(32, 32);
dim3 gridsize((N + blocksize.x - 1) / blocksize.x, (M + blocksize.y - 1) / blocksize.y);

for (int i = 0; i < num_batch; i++) {
    int s_idx = i % pool_size;	// 每个batch 循环使用 stream pool

    float* cur_h_A = h_A + i * Asize;
    float* cur_d_A = d_A + i * Asize;
    float* cur_d_C = d_C + i * Csize;
    float* cur_h_C = h_C + i * Csize;
	// 异步传输数据(使用PCIe总线 H2D)
    cudaMemcpyAsync(cur_d_A, cur_h_A, Asize * sizeof(float), cudaMemcpyHostToDevice, streams[s_idx]);
	// 计算
    gemm_naive<<<gridsize, blocksize, 0, streams[s_idx]>>>(cur_d_A, d_B, cur_d_C, M, N, K);
	// 异步传输数据(使用PCIe总线 D2H)
    cudaMemcpyAsync(cur_h_C, cur_d_C, Csize * sizeof(float), cudaMemcpyDeviceToHost, streams[s_idx]);
}

最后是验证结果与释放内存:

// 等待所有任务完成并清理
cudaDeviceSynchronize();

std::cout << "正在验证第 0 个 Batch 的计算结果..." << std::endl;
bool is_correct = verify_result(h_A, h_B, h_C, M, N, K);

if (is_correct) {
    std::cout << "验证通过!" << std::endl;
} else {
    std::cout << "错误!" << std::endl;
}

for (int i = 0; i < pool_size; i++) cudaStreamDestroy(streams[i]);
cudaFree(d_A); cudaFree(d_B); cudaFree(d_C);
cudaFreeHost(h_A); cudaFreeHost(h_B); cudaFreeHost(h_C);

完整代码:

#include <cuda_runtime.h>
#include <iostream>
#include <vector>

__global__ void gemm_naive(float* A, float* B, float* C, int M, int N, int K) {
    const int idx = threadIdx.x + blockDim.x * blockIdx.x;
    const int idy = threadIdx.y + blockDim.y * blockIdx.y;

    if (idy < M && idx < N) {
        float temp = 0;
        for (int k = 0; k < K; k++) {
            temp += A[idy * K + k] * B[k * N + idx];
        }
        C[idy * N + idx] = temp;
    }
}

void init_data(float* data, size_t size) {
    for (size_t i = 0; i < size; ++i) {
        data[i] = static_cast<float>(rand() % 5);
    }
}

bool verify_result(float* host_A, float* host_B, float* gpu_res, int M, int N, int K) {
    for (int i = 0; i < M; ++i) {
        for (int j = 0; j < N; ++j) {
            float cpu_sum = 0;
            for (int k = 0; k < K; ++k) {
                cpu_sum += host_A[i * K + k] * host_B[k * N + j];
            }
            if (std::abs(cpu_sum - gpu_res[i * N + j]) > 1e-5) {
                std::cout << "验证失败! 位置: [" << i << "," << j << "] "
                          << "CPU: " << cpu_sum << " GPU: " << gpu_res[i * N + j] << std::endl;
                return false;
            }
        }
    }
    return true;
}

int main() {
    const int num_batch = 100;
    const int M = 1024, N = 1024, K = 512;
    const size_t Asize = M * K, Bsize = K * N, Csize = M * N;

    // 使用 Pinned Memory 分配 Host 端内存
    float *h_A, *h_B, *h_C;
    cudaMallocHost((void**)&h_A, num_batch * Asize * sizeof(float));
    cudaMallocHost((void**)&h_B, Bsize * sizeof(float)); // B 是共享的
    cudaMallocHost((void**)&h_C, num_batch * Csize * sizeof(float));

    // 初始化数据
    init_data(h_A, num_batch * Asize);
    init_data(h_B, Bsize);

    // 预先分配 Device 端内存,避免在循环中 malloc
    float *d_A, *d_B, *d_C;
    cudaMalloc((void**)&d_A, num_batch * Asize * sizeof(float));
    cudaMalloc((void**)&d_B, Bsize * sizeof(float));
    cudaMalloc((void**)&d_C, num_batch * Csize * sizeof(float));

    // 先把共享的 B 矩阵传过去
    cudaMemcpy(d_B, h_B, Bsize * sizeof(float), cudaMemcpyHostToDevice);

    const int pool_size = 8;
    cudaStream_t streams[pool_size];
    for (int i = 0; i < pool_size; i++) cudaStreamCreate(&streams[i]);

    // 执行异步流水线
    dim3 blocksize(32, 32);
    dim3 gridsize((N + blocksize.x - 1) / blocksize.x, (M + blocksize.y - 1) / blocksize.y);

    for (int i = 0; i < num_batch; i++) {
        int s_idx = i % pool_size;
        
        float* cur_h_A = h_A + i * Asize;
        float* cur_d_A = d_A + i * Asize;
        float* cur_d_C = d_C + i * Csize;
        float* cur_h_C = h_C + i * Csize;

        cudaMemcpyAsync(cur_d_A, cur_h_A, Asize * sizeof(float), cudaMemcpyHostToDevice, streams[s_idx]);
        
        gemm_naive<<<gridsize, blocksize, 0, streams[s_idx]>>>(cur_d_A, d_B, cur_d_C, M, N, K);
        
        cudaMemcpyAsync(cur_h_C, cur_d_C, Csize * sizeof(float), cudaMemcpyDeviceToHost, streams[s_idx]);
    }

    // 等待所有任务完成并清理
    cudaDeviceSynchronize();
    
    std::cout << "正在验证第 0 个 Batch 的计算结果..." << std::endl;
    bool is_correct = verify_result(h_A, h_B, h_C, M, N, K);

    if (is_correct) {
        std::cout << "验证通过!" << std::endl;
    } else {
        std::cout << "错误!" << std::endl;
    }

    for (int i = 0; i < pool_size; i++) cudaStreamDestroy(streams[i]);
    cudaFree(d_A); cudaFree(d_B); cudaFree(d_C);
    cudaFreeHost(h_A); cudaFreeHost(h_B); cudaFreeHost(h_C);
}

CUDA Graph

什么是CUDA Graph

在介绍完 Stream 之后,继续写点与 Stream 有着密切连续的另一项技术:CUDA Graph

CUDA Graph 实际上是为了解决 CPU 与 GPU 之间的沟通内耗问题。当我们让 GPU 执行任务时,每进行一次计算,CPU 都要发送一个指令(比如启动一个 Kernel、拷贝一段内存),GPU 接到指令后开始干活,干完了再等下一个。这种“下达命令——执行——再下达命令”的流式模式(Stream)虽然灵活,但在处理那种由成百上千个小任务组成的复杂任务时,问题就暴露出来了。这时候,最拖后腿的往往不是 GPU 的算力,而是 CPU 准备发射指令的延迟(Launch Overhead)。在这种情况下,CPU 与 GPU 之间的指令通信消耗的时间远大于 GPU kernel 计算所消耗的时间。

CUDA Graph 的核心思想就是预编译和重用。简单来说,它允许我们将一连串的 GPU 操作(包括 Kernel 启动、内存拷贝、信号量等待等)定义为一个完整的、有向无环图(DAG)。在这个图里,节点代表任务,边代表任务之间的依赖关系。这种模式将任务的定义和执行完全解耦了。你只需要在程序初始化阶段,把整个复杂的运算逻辑录制下来,形成一个静态的可执行图。这就好比你提前规划好了工厂的自动化流水线,每一台机器什么时候启动、零件怎么流转都定死了。一旦这条流水线(Graph)建立好并上传到 GPU 的显存里,CPU 以后只需要下达一个极其简单的指令:“启动那条流水线”。这就解释了为什么 CUDA Graph 能显著降低 CPU 的开销,因为它把原本成百上千次的交互压缩成了一次点击。

不过,这种高性能并不是到处都适用。正如刚才提到的“自动化流水线”,它的优势在于稳定和高效,但劣势在于死板。如果计算逻辑经常变动,比如模型的分支走向依赖于实时的计算结果,或者输入的张量形状变幻莫测,那么重新捕获和构建 Graph 的开销反而会得不偿失。

录制CUDA Graph的流程

和上一个例子相同,使用 stream 和 cuda graph 来运行100次矩阵乘法。

前期准备工作

int main() {
    const int num_batch = 100;
    const int M = 1024, N = 1024, K = 512;
    const size_t Asize = M * K, Bsize = K * N, Csize = M * N;

    // 使用 Pinned Memory 分配 Host 端内存
    float *h_A, *h_B, *h_C;
    cudaMallocHost((void**)&h_A, num_batch * Asize * sizeof(float));
    cudaMallocHost((void**)&h_B, Bsize * sizeof(float)); // B 是共享的
    cudaMallocHost((void**)&h_C, num_batch * Csize * sizeof(float));

    // 初始化数据
    init_data(h_A, num_batch * Asize);
    init_data(h_B, Bsize);

    // 预先分配 Device 端内存,避免在循环中 malloc
    float *d_A, *d_B, *d_C;
    cudaMalloc((void**)&d_A, num_batch * Asize * sizeof(float));
    cudaMalloc((void**)&d_B, Bsize * sizeof(float));
    cudaMalloc((void**)&d_C, num_batch * Csize * sizeof(float));

    // 先把共享的 B 矩阵传过去
    cudaMemcpy(d_B, h_B, Bsize * sizeof(float), cudaMemcpyHostToDevice);

    // 初始化stream pool
    const int pool_size = 8;
    cudaStream_t graph_stream_pool[pool_size];
    for (int i = 0; i < pool_size; ++i)
    {
        cudaStreamCreate(&graph_stream_pool[i]);
    }
    
	...
        
}

step1:capture

在前期操作中,程序的首要意图是建立有向无环图的根节点以及初期的发散结构。通过在主干流(graph_stream_pool[0])上调用 cudaStreamBeginCapture,开启了一个全局的录制会话,主干流就顺理成章地成为了整张执行图的唯一入口。这就解释了为什么后续的所有操作都能被底层系统准确地追踪并关联起来。

// step1: capture
cudaGraph_t graph;
cudaGraphExec_t graphExec;
cudaStreamBeginCapture(graph_stream_pool[0], cudaStreamCaptureModeGlobal);

step2:fork

创建了一个 fork_event,并在主干流中记录下它,随后强制让池子里的所有其他辅助流去等待这个事件。在常规的异步执行模式下,这只是一种普通的跨流同步机制,但在录制模式中,CUDA 的捕获引擎会把所有执行了 wait 操作的辅助流自动纳入到当前的捕获上下文中。从这一刻起,原本单线的逻辑就在图表里正式分叉,变成了多条可以并行推进的高速分支。

// step2: fork
cudaEvent_t fork_event;
cudaEventCreate(&fork_event);
cudaEventRecord(fork_event, graph_stream_pool[0]);

for (int i = 1; i < pool_size; ++i)
{
    cudaStreamWaitEvent(graph_stream_pool[i], fork_event);
}

这么说也许会有点抽象,我最开始没有理解到底什么是cudaEvent,以及cudaStreamWaitEvent的作用是什么。下面来详细讲一下。

先来解决,cudaEvent是干啥的?

在 CUDA 的异步执行架构中,所有的指令(比如核函数启动、内存拷贝)都是像流水线一样发给 GPU 的。由于 CPU 发送指令的速度远快于 GPU 执行的速度,CPU 往往不知道 GPU 到底干到哪一步了。

cudaEvent_t 就是为了解决这个“信息不对称”而存在的。它是一个硬件级别的信号量。当你在代码里创建一个 Event 时,你实际上是在显存里申请了一个极小的空间,用来存放一个状态位。这个状态位只有两个意思:“还没到这儿”或者“已经办完了”。而当你调用cudaEventRecord时,它会触发一个硬件信号,把这个 Event 的状态从“未完成”改为“已完成”。例如cudaEventRecord(fork_event, graph_stream_pool[0]);,意思就是“0号stream的fork_event已经完成了”。

弄明白了cudaEvent,接下来就是循环里的cudaStreamWaitEvent

在普通的非捕获模式下,这只是简单的流同步。但在 Capture 模式 下,这一步操作会产生一个连锁反应:CUDA 发现这些子流正在等待一个“处于捕获状态的流”所发出的信号。为了维持逻辑的完整性,CUDA 会自动将这些子流也拉进捕获状态。通过 WaitEvent建立了一种显式的依赖关系:所有子流的任务都必须在 fork_event 之后开始。这给 CUDA 编译器提供了一个极其清晰的信号——“这些流的任务没有先后顺序,它们只依赖于主流的起点”。而子流与主流产生了依赖关系,也就被capture一并捕获了。

当完成这个同步动作之后,捕获器的视角就瞬间拓展开了。原本它只记录 0 号流,现在它会顺着 fork_event 这条线,把 1 号、2 号、一直到最后号流里的动作全部纳入镜头范围。

step3:使用所有的stream处理batch

这一步和前面Steam中的处理方式一模一样,这里就不再赘述了。

// step3: 使用所有的stream处理batch
dim3 blocksize(32, 32);
dim3 gridsize((N + blocksize.x - 1) / blocksize.x, (M + blocksize.x - 1) / blocksize.x);

for (int i = 0; i < num_batch; ++i)
{
    int stream_idx = i % pool_size;
    float* curr_h_A = h_A + i * Asize;
    float* curr_h_C = h_C + i * Csize;
    float* curr_d_A = d_A + i * Asize;
    float* curr_d_C = d_C + i * Csize;

    cudaMemcpyAsync(curr_d_A, curr_h_A, Asize * sizeof(float), cudaMemcpyHostToDevice, graph_stream_pool[stream_idx]);
    gemm_naive<<<gridsize, blocksize, 0, graph_stream_pool[stream_idx]>>>(curr_d_A, d_B, curr_d_C, M, N, K);
    cudaMemcpyAsync(curr_h_C, curr_d_C, Csize * sizeof(float), cudaMemcpyDeviceToHost, graph_stream_pool[stream_idx]);
}

step4:join

这一步与前面的fork相对应。有了前面的解释,下面的代码就很清晰了。

// step4: join
std::vector<cudaEvent_t> join_event(pool_size);
for (int i = 0; i < pool_size; ++i)
{
    cudaEventCreate(&join_event[i]);
    cudaEventRecord(join_event[i], graph_stream_pool[i]);
}
for (int i = 1; i < pool_size; ++i) cudaStreamWaitEvent(graph_stream_pool[0], join_event[i], 0);

当每个 stream 的任务完成时,我们给每个 stream 都创建一个join_event并Record。主流(graph_stream_pool[0])得等待其余所有子流的join_event完成,然后收束在一起。这就解释了为什么 Graph 能够知道什么时候才算真正执行完。通过这种循环等待,拓扑图里建立了一组多对一的依赖关系

为什么要在捕获结束前做这件事?

如果你不做这一步就直接调用 cudaStreamEndCapture,那么捕获器可能会认为这张图在 0 号流任务结束时就完成了。结果就是,当你启动(Launch)这个 Graph 时,由于子流的任务没有被汇聚回来,主流可能会在子流还没算完的时候就“早退”了,导致后续获取的数据出现错误。

step5:结束capture, 实例化graph

// step5: 结束capture, 实例化graph
cudaStreamEndCapture(graph_stream_pool[0], &graph);
cudaGraphInstantiate(&graphExec, graph, nullptr, nullptr, 0);

这一部分挺简单的。讲讲cudaGraphInstantiate函数的最后三个参数吧。

  • 第三个参数 (cudaGraphNode_t *errorNode): 这是一个“指控位”。如果实例化过程因为某个节点配置错误(比如 Kernel 参数不对或者依赖关系自相矛盾)而失败,CUDA 会把那个导致失败的具体节点句柄写进这个指针。如果你传了 nullptr,你就只知道失败了,却不知道是图里的哪块砖出了问题。在调试几千个节点的巨型图时,这个参数就是救命稻草。

  • 第四个参数 (char *logBuffer): 这是“诊断书”。它指向一个你预先分配好的字符数组。当实例化出错时,CUDA 会在这里写下详细的错误文本描述,告诉你是显存不足、参数非法还是拓扑冲突。配合 errorNode 使用,能让你瞬间锁定 Bug 的根源。

  • 第五个参数 (size_t bufferSize): 这很简单,就是你给上面那个 logBuffer 分配的大小。如果你把第四个参数设为 nullptr,这个值自然也就填 0 了。

step6:launch

// step6: launch
cudaGraphLaunch(graphExec, graph_stream_pool[0]);

第二个参数stream应该填之前capture的主流。

完整代码:

int main() {
    const int num_batch = 100;
    const int M = 1024, N = 1024, K = 512;
    const size_t Asize = M * K, Bsize = K * N, Csize = M * N;

    // 使用 Pinned Memory 分配 Host 端内存
    float *h_A, *h_B, *h_C;
    cudaMallocHost((void**)&h_A, num_batch * Asize * sizeof(float));
    cudaMallocHost((void**)&h_B, Bsize * sizeof(float)); // B 是共享的
    cudaMallocHost((void**)&h_C, num_batch * Csize * sizeof(float));

    // 初始化数据
    init_data(h_A, num_batch * Asize);
    init_data(h_B, Bsize);

    // 2. 预先分配 Device 端内存,避免在循环中 malloc
    float *d_A, *d_B, *d_C;
    cudaMalloc((void**)&d_A, num_batch * Asize * sizeof(float));
    cudaMalloc((void**)&d_B, Bsize * sizeof(float));
    cudaMalloc((void**)&d_C, num_batch * Csize * sizeof(float));

    // 先把共享的 B 矩阵传过去
    cudaMemcpy(d_B, h_B, Bsize * sizeof(float), cudaMemcpyHostToDevice);

    const int pool_size = 8;
    cudaStream_t graph_stream_pool[pool_size];
    for (int i = 0; i < pool_size; ++i)
    {
        cudaStreamCreate(&graph_stream_pool[i]);
    }

    cudaGraph_t graph;
    cudaGraphExec_t graphExec;
    // step1: capture
    cudaStreamBeginCapture(graph_stream_pool[0], cudaStreamCaptureModeGlobal);

    // step2: fork
    cudaEvent_t fork_event;
    cudaEventCreate(&fork_event);
    cudaEventRecord(fork_event, graph_stream_pool[0]);

    for (int i = 1; i < pool_size; ++i)
    {
        cudaStreamWaitEvent(graph_stream_pool[i], fork_event);
    }

    // step3: 使用所有的stream处理batch
    dim3 blocksize(32, 32);
    dim3 gridsize((N + blocksize.x - 1) / blocksize.x, (M + blocksize.x - 1) / blocksize.x);

    for (int i = 0; i < num_batch; ++i)
    {
        int stream_idx = i % pool_size;
        float* curr_h_A = h_A + i * Asize;
        float* curr_h_C = h_C + i * Csize;
        float* curr_d_A = d_A + i * Asize;
        float* curr_d_C = d_C + i * Csize;

        cudaMemcpyAsync(curr_d_A, curr_h_A, Asize * sizeof(float), cudaMemcpyHostToDevice, graph_stream_pool[stream_idx]);
        gemm_naive<<<gridsize, blocksize, 0, graph_stream_pool[stream_idx]>>>(curr_d_A, d_B, curr_d_C, M, N, K);
        cudaMemcpyAsync(curr_h_C, curr_d_C, Csize * sizeof(float), cudaMemcpyDeviceToHost, graph_stream_pool[stream_idx]);
    }

    // step4: join
    std::vector<cudaEvent_t> join_event(pool_size);
    for (int i = 0; i < pool_size; ++i)
    {
        cudaEventCreate(&join_event[i]);
        cudaEventRecord(join_event[i], graph_stream_pool[i]);
    }
    for (int i = 1; i < pool_size; ++i) cudaStreamWaitEvent(graph_stream_pool[0], join_event[i], 0);

    // step5: 结束capture, 实例化graph
    cudaStreamEndCapture(graph_stream_pool[0], &graph);
    cudaGraphInstantiate(&graphExec, graph, nullptr, nullptr, 0);

    // step6: launch
    cudaGraphLaunch(graphExec, graph_stream_pool[0]);

    // 等待所有任务完成并清理
    cudaStreamSynchronize(graph_stream_pool[0]);
    
    std::cout << "正在验证第 0 个 Batch 的计算结果..." << std::endl;
    bool is_correct = verify_result(h_A, h_B, h_C, M, N, K);

    if (is_correct) {
        std::cout << "验证通过!" << std::endl;
    } else {
        std::cout << "错误!" << std::endl;
    }

    for (int i = 0; i < pool_size; i++) cudaStreamDestroy(graph_stream_pool[i]);
    for (int i = 0; i < pool_size; i++) cudaEventDestroy(join_event[i]);
    cudaEventDestroy(fork_event);
    cudaGraphDestroy(graph);
    cudaFree(d_A); cudaFree(d_B); cudaFree(d_C);
    cudaFreeHost(h_A); cudaFreeHost(h_B); cudaFreeHost(h_C);
}

本文参考:https://github.com/hengshan/Cuda-Tutorials/blob/main/%E8%AE%B2%E4%B9%89-%E7%AC%AC11%E8%AF%BE.md

# 研究讨论 / Discussions