# 在训练作业中使用checkpoint

在机器学习模型训练过程中，往往需要较长的时间完成训练数据的迭代，实现模型的收敛，然而训练过程可能会因为各种原因中断，例如机器故障、网络问题、或是代码原因等。为了避免中断后需要重头开始训练，开发者通常会在训练过程中，定期将模型的状态保存为`checkpoint`文件，以便在训练中断后，能够从保存的`checkpoint`文件获取模型参数，优化器状态，训练步数等训练状态，恢复训练。

本文档介绍如何在PAI的训练作业中使用checkpoint。


## 准备工作

我们需要首先安装PAI Python SDK以运行本示例。

In [None]:
!python -m pip install --upgrade alipai



SDK 需要配置访问阿里云服务需要的 AccessKey，以及当前使用的工作空间和OSS Bucket。在 PAI Python SDK 安装之后，通过在 **命令行终端** 中执行以下命令，按照引导配置密钥，工作空间等信息。


```shell

# 以下命令，请在 命令行终端 中执行.

python -m pai.toolkit.config

```


我们可以通过以下代码验证当前的配置。

## 使用checkpoint保存和恢复训练作业

当使用SDK提供的`pai.estimator.Estimator` 提交训练作业时，训练作业默认会挂载用户的OSS Bucket路径到训练作业的`/ml/output/checkpoints`目录。训练代码可以将checkpoint文件写出到对应的路径，从而保存到OSS中。提交训练作业之后，可以通过 `estimator.checkpoints_data()` 方法可以获取`checkpoints`保存的OSS路径。

当需要使用已有的`checkpoint`时，用户可以通过 `checkpoints_path` 参数指定一个OSS Bucket路径，PAI会将该路径挂载到训练作业的`/ml/output/checkpoints`目录，训练作业可以通过读取对应数据路径下的checkpoint文件来恢复训练。



```python

from pai.estimator import Estimator


# 1. 使用默认的checkpoints路径保存模型的checkpoints
est = Estimator(
	image_uri="<TrainingImageUri>",
	command="python train.py",
)

# 训练作业默认会挂载一个OSS Bucket路径到 /ml/output/checkpoints
# 用户训练代码可以通过写文件到 /ml/output/checkpoints 保存checkpoint
est.fit()

# 查看训练作业的checkpoints路径
print(est.checkpoints_data())

# 2. 使用其他训练作业产出的checkpoints恢复训练
est_load = Estimator(
	image_uri="<TrainingImageUri>",
	command="python train.py",
	# 指定使用上一个训练作业输出的checkpoints.
	checkpoints_path=est.checkpoints_data(),
)

# 训练代码从 /ml/output/checkpoints 中加载checkpoint
est_load.fit()

```





## 在PyTorch中使用checkpoint

在PyTorch中，通常使用`torch.save`方法将模型的参数、优化器的状态、训练进度等信息，以字典的形式作为`checkpoint`进行保存。保存的`checkpoint`文件可以通过 `torch.load` 进行加载。PyTorch提供了如何在训练中保存和加载checkpoint的教程：[Save And Loading A General Checkpoint In PyTorch](https://pytorch.org/tutorials/recipes/recipes/saving_and_loading_a_general_checkpoint.html)。

我们将基于PyTorch的示例教程，演示如何在PAI的训练作业中使用checkpoint。


训练作业使用的代码如下:

1. 在训练开始之前，通过 `/ml/output/checkpoints/` 路径加载checkpoint获取初始化模型参数，优化器，以及训练进度。

2. 基于checkpoint的状态信息训练模型，在训练过程中，定期保存checkpoint到 `/ml/output/checkpoints/` 路径。
   

In [None]:
!mkdir -p train_src

In [None]:
%%writefile train_src/train.py
# Additional information
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F


EPOCH = 5
CHECKPOINT_NAME = "checkpoint.pt"
LOSS = 0.4

# Define a custom mock dataset
class RandomDataset(Dataset):
    def __init__(self, num_samples=1000):
        self.num_samples = num_samples

    def __len__(self):
        return self.num_samples

    def __getitem__(self, idx):
        x = torch.randn(10)  # Generating random input tensor
        y = torch.randint(0, 2, (1,)).item()  # Generating random target label (0 or 1)
        return x, y


# Define your model
class MyModel(nn.Module):
    def __init__(self):
        super(MyModel, self).__init__()
        self.fc = nn.Linear(10, 2)
    
    def forward(self, x):
        return self.fc(x)


net = MyModel()
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.001)
start_epoch = 0

def load_checkpoint():
    """Load checkpoint if exists."""
    global net, optimizer, start_epoch, LOSS
    checkpoint_dir = os.environ.get("PAI_OUTPUT_CHECKPOINTS")
    if not checkpoint_dir:
        return
    checkpoint_path = os.path.join(checkpoint_dir, CHECKPOINT_NAME)
    if not os.path.exists(checkpoint_path):
        return
    data = torch.load(checkpoint_path)

    net.load_state_dict(data["model_state_dict"])
    optimizer.load_state_dict(data["optimizer_state_dict"])
    start_epoch = data["epoch"]


def save_checkpoint(epoch):
    global net, optimizer, start_epoch, LOSS
    checkpoint_dir = os.environ.get("PAI_OUTPUT_CHECKPOINTS")
    if not checkpoint_dir:
        return
    checkpoint_path = os.path.join(checkpoint_dir, CHECKPOINT_NAME)
    torch.save({
        'epoch': epoch + 1,
        'model_state_dict': net.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
    }, checkpoint_path)


def parse_args():
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--epochs", type=int, default=10)
    args = parser.parse_args()
    return args


def train():
    args = parse_args()
    load_checkpoint()
    batch_size = 4
    dataloader = DataLoader(RandomDataset(), batch_size=batch_size, shuffle=True)
    num_epochs = args.epochs
    print(num_epochs)
    for epoch in range(start_epoch, num_epochs):
        net.train()
        for i, (inputs, targets) in enumerate(dataloader):
            # Forward pass
            outputs = net(inputs)
            loss = criterion(outputs, targets)
            
            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            # Print training progress
            if (i+1) % 10 == 0:
                print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{len(dataloader)}], Loss: {loss.item()}')
        
        # Save checkpoint
        save_checkpoint(epoch=epoch)
    # save the model
    torch.save(net.state_dict(), os.path.join(os.environ.get("PAI_OUTPUT_MODEL", "."), "model.pt"))
    


if __name__ == "__main__":
    train()

我们将以上的代码提交到PAI执行，训练作业最终提供挂载的OSS路径保存模型。

In [None]:
from pai.estimator import Estimator
from pai.image import retrieve


epochs = 10


# 训练作业默认会挂载一个OSS Bucket路径到 /ml/output/checkpoints/
est = Estimator(
    command="python train.py --epochs {}".format(epochs),
    source_dir="./train_src/",
    image_uri=retrieve("PyTorch", "latest").image_uri,
    instance_type="ecs.c6.large",
    base_job_name="torch_checkpoint",
)

est.fit()

  from tqdm.autonotebook import tqdm


Uploading file: /var/folders/hc/5w4bg25j1ns2mm0yb06zzzbh0000gp/T/tmpt3_0rsuf/source.tar.gz:   0%|          | 0…

View the job detail by accessing the console URI: https://pai.console.aliyun.com/?regionId=cn-hangzhou&workspaceId=58670#/training/jobs/train1u1it512gqg
TrainingJob launch starting
MAX_PARALLELISM=0
C_INCLUDE_PATH=/home/pai/include
KUBERNETES_PORT=tcp://10.192.0.1:443
KUBERNETES_SERVICE_PORT=443
LANGUAGE=en_US.UTF-8
PIP_TRUSTED_HOST=mirrors.cloud.aliyuncs.com
MASTER_ADDR=train1u1it512gqg-master-0
HOSTNAME=train1u1it512gqg-master-0
LD_LIBRARY_PATH=:/lib/x86_64-linux-gnu:/home/pai/lib:/home/pai/jre/lib/amd64/server
MASTER_PORT=23456
HOME=/root
PAI_USER_ARGS=
PYTHONUNBUFFERED=0
PAI_OUTPUT_CHECKPOINTS=/ml/output/checkpoints/
PAI_CONFIG_DIR=/ml/input/config/
WORLD_SIZE=1
REGION_ID=cn-hangzhou
CPLUS_INCLUDE_PATH=/home/pai/include
RANK=0
OPAL_PREFIX=/home/pai/
PAI_TRAINING_JOB_ID=train1u1it512gqg
TERM=xterm-color
KUBERNETES_PORT_443_TCP_ADDR=10.192.0.1
PAI_OUTPUT_MODEL=/ml/output/model/
ELASTIC_TRAINING_ENABLED=false
PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/home/pai/

In [None]:
# 训练作业的checkpoints目录
print(est.checkpoints_data())


以上训练作业对训练数据做了10次迭代，通过使用checkpoint，我们可以在原先模型的基础上继续训练，例如使用训练数据继续迭代20次迭代。


In [None]:
from pai.estimator import Estimator
from pai.image import retrieve


# 训练数据的总迭代次数为30
epochs = 30

resume_est = Estimator(
    command="python train.py --epochs {}".format(epochs),
    source_dir="./train_src/",
    image_uri=retrieve("PyTorch", "latest").image_uri,
    instance_type="ecs.c6.large",
    # 使用上一个训练作业的checkpoints，相应的OSS Bucket路径会被挂载到 /ml/output/checkpoints 路径下
    checkpoints_path=est.checkpoints_data(),
    base_job_name="torch_resume_checkpoint",
)

resume_est.fit()

Uploading file: /var/folders/hc/5w4bg25j1ns2mm0yb06zzzbh0000gp/T/tmpshzpdx_z/source.tar.gz:   0%|          | 0…

View the job detail by accessing the console URI: https://pai.console.aliyun.com/?regionId=cn-hangzhou&workspaceId=58670#/training/jobs/trainu90lc57j1vm
TrainingJob launch starting
MAX_PARALLELISM=0
C_INCLUDE_PATH=/home/pai/include
KUBERNETES_SERVICE_PORT=443
KUBERNETES_PORT=tcp://10.192.0.1:443
LANGUAGE=en_US.UTF-8
PIP_TRUSTED_HOST=mirrors.cloud.aliyuncs.com
MASTER_ADDR=trainu90lc57j1vm-master-0
HOSTNAME=trainu90lc57j1vm-master-0
LD_LIBRARY_PATH=:/lib/x86_64-linux-gnu:/home/pai/lib:/home/pai/jre/lib/amd64/server
MASTER_PORT=23456
HOME=/root
PAI_USER_ARGS=
PYTHONUNBUFFERED=0
PAI_OUTPUT_CHECKPOINTS=/ml/output/checkpoints/
PAI_CONFIG_DIR=/ml/input/config/
WORLD_SIZE=1
REGION_ID=cn-hangzhou
CPLUS_INCLUDE_PATH=/home/pai/include
RANK=0
OPAL_PREFIX=/home/pai/
PAI_TRAINING_JOB_ID=trainu90lc57j1vm
TERM=xterm-color
KUBERNETES_PORT_443_TCP_ADDR=10.192.0.1
PAI_OUTPUT_MODEL=/ml/output/model/
ELASTIC_TRAINING_ENABLED=false
PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/home/pai/

通过训练作业日志的，我们可以看到训练作业加载了之前训练作业的checkpoint，在此基础上，从第11个epoch开始继续训练。

## 结语

本文以`PyTorch`为示例，介绍了如何在PAI的训练作业中使用`checkpoint`：训练代码可以通过`/ml/output/checkpoints/`路径保存和加载`checkpoints`文件，`checkpoints`文件将被保存到OSS Bucket上。当用户使用其他的训练框架，例如`TensorFlow`、`HuggingFace transformers`、`ModelScope`等，也可以通过类似的方式在PAI的训练作业中使用`checkpoint`。
