Skip to article frontmatterSkip to article content
Site not loading correctly?

This may be due to an incorrect BASE_URL configuration. See the MyST Documentation for reference.

8.2 随机梯度法

这个 notebook 把第8章-部分2中的图像实验整理成独立版本。目标不是做大规模机器学习,而是先在一个一维非凸函数上看清楚随机梯度的基本行为,再回到 Logistic 回归中观察 mini-batch 训练:

  1. 随机梯度怎样成为完整梯度的无偏估计;

  2. 噪声为什么会让轨迹不平滑;

  3. mini-batch 如何通过平均多个随机梯度降低噪声;

  4. batch size 如何影响噪声大小和示例中的探索行为;

  5. Logistic 回归中 full-batch 与 mini-batch 训练有什么差别。

0. 准备环境

import matplotlib.pyplot as plt
import numpy as np
import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset

SEED = 6
torch.manual_seed(SEED)

plt.rcParams['figure.figsize'] = (6, 3)
plt.rcParams['axes.grid'] = True

1. 一个有限和形式的非凸目标

f(θ)=12f1(θ)+12f2(θ),f1(θ)=2sin(θ),f2(θ)=(0.2)θ2.f(\theta)=\frac{1}{2} f_1(\theta)+\frac{1}{2} f_2(\theta), \qquad f_1(\theta)=2\sin(\theta), \qquad f_2(\theta)=(0.2)\cdot\theta^2.

于是

f(θ)=sin(θ)+(0.1)θ2.f(\theta)=\sin(\theta)+(0.1)\cdot\theta^2.

如果均匀随机选择 i{1,2}i\in\{1,2\},则 fi(θ)\nabla f_i(\theta) 的条件期望正好等于完整梯度 f(θ)\nabla f(\theta)

c = 0.1

def f1(theta):
    return 2 * np.sin(theta)

def f2(theta):
    return 2 * c * theta ** 2

def objective(theta):
    return 0.5 * (f1(theta) + f2(theta))

def grad_f1(theta):
    return 2 * np.cos(theta)

def grad_f2(theta):
    return 4 * c * theta

def grad_full(theta):
    return 0.5 * (grad_f1(theta) + grad_f2(theta))

def sample_gradient(theta, rng, batch_size=1):
    labels = rng.integers(0, 2, size=batch_size)
    gradients = np.where(labels == 0, grad_f1(theta), grad_f2(theta))
    return gradients.mean(), labels

theta_grid = np.linspace(-8, 8, 800)
plt.plot(theta_grid, objective(theta_grid), label=r'$f(\theta)$')
plt.plot(theta_grid, 0.5 * f1(theta_grid), '--', label=r'$\frac{1}{2} f_1(\theta)$')
plt.plot(theta_grid, 0.5 * f2(theta_grid), '--', label=r'$\frac{1}{2} f_2(\theta)$')
plt.xlabel(r'$\theta$')
plt.ylabel('value')
plt.legend()
plt.show()
<Figure size 600x300 with 1 Axes>

2. 检查无偏性

下面固定一个 θ\theta,重复抽样很多次。样本平均值应接近完整梯度。

theta_demo = 2.0
rng = np.random.default_rng(SEED)
samples = np.array([sample_gradient(theta_demo, rng)[0] for _ in range(20_000)])

print('完整梯度:', round(float(grad_full(theta_demo)), 4))
print('随机梯度样本均值:', round(float(samples.mean()), 4))
print('随机梯度样本标准差:', round(float(samples.std()), 4))
完整梯度: -0.0161
随机梯度样本均值: -0.0187
随机梯度样本标准差: 0.8161

3. 比较 GD 与 SGD 的轨迹

完整梯度下降每步使用 f(θ)\nabla f(\theta)。SGD 每步只随机选择一个分量梯度。下面的随机种子会展示一种典型现象:SGD 轨迹更抖,并且在这个例子中有机会离开右侧局部区域。这个现象有助于建立直觉,但不是一般收敛定理的保证。

def run_full_gradient(theta0, steps, lr):
    path = [theta0]
    theta = theta0
    for _ in range(steps):
        theta = theta - lr * grad_full(theta)
        path.append(theta)
    return np.array(path)


def run_stochastic_gradient(theta0, steps, lr, batch_size=1, seed=SEED):
    rng = np.random.default_rng(seed)
    path = [theta0]
    chosen_labels = []
    theta = theta0
    for _ in range(steps):
        gradient, labels = sample_gradient(theta, rng, batch_size=batch_size)
        theta = theta - lr * gradient
        path.append(theta)
        chosen_labels.extend(labels.tolist())
    return np.array(path), np.array(chosen_labels)

steps = 2_000
learning_rate = 0.15
theta0 = 5.0

gd_path = run_full_gradient(theta0, steps, learning_rate)
sgd_path, sgd_labels = run_stochastic_gradient(theta0, steps, learning_rate, seed=SEED)

gd_loss = objective(gd_path)
sgd_loss = objective(sgd_path)

print(f'GD final theta={gd_path[-1]:.3f}, loss={gd_loss[-1]:.3f}')
print(f'SGD final theta={sgd_path[-1]:.3f}, loss={sgd_loss[-1]:.3f}')
print(f'SGD chose component 2 about {sgd_labels.mean():.3f} of the time')
GD final theta=3.837, loss=0.832
SGD final theta=-0.999, loss=-0.741
SGD chose component 2 about 0.504 of the time
fig, axes = plt.subplots(1, 2, figsize=(10, 3.5))

axes[0].plot(gd_path, label='GD')
axes[0].plot(sgd_path, label='SGD', alpha=0.8)
axes[0].set_xlabel('iteration')
axes[0].set_ylabel(r'$\theta^{(k)}$')
axes[0].legend()

axes[1].plot(gd_loss, label='GD')
axes[1].plot(sgd_loss, label='SGD', alpha=0.8)
axes[1].set_xlabel('iteration')
axes[1].set_ylabel(r'$f(\theta^{(k)})$')
axes[1].legend()

plt.tight_layout()
plt.show()
<Figure size 1000x350 with 2 Axes>
best_index = np.argmin(objective(theta_grid))
best_theta = theta_grid[best_index]
best_loss = objective(best_theta)

plt.figure(figsize=(7, 3.5))
plt.plot(theta_grid, objective(theta_grid), label=r'$f(\theta)$')
plt.scatter(best_theta, best_loss, label='grid minimum', s=70)
plt.scatter(gd_path[-1], gd_loss[-1], label='GD final', marker='s', s=70)
plt.scatter(sgd_path[-1], sgd_loss[-1], label='SGD final', marker='^', s=70)
plt.xlabel(r'$\theta$')
plt.ylabel('loss')
plt.legend()
plt.show()
<Figure size 700x350 with 1 Axes>

4. Mini-batch SGD:batch size 与噪声

现在正式引入 mini-batch SGD。这里的目标只有两个分量,所以 batch size 大于 1 时可以理解为“有放回地多抽几次再平均”。batch size 越大,单次随机梯度的方差越小,方向通常越接近完整梯度;但每次更新也更贵,而且在非凸问题中不保证最终函数值一定更低。

part4_iterations = 7_000
learning_rate = 0.15
seed = 42

batch_runs = {
    'batch=1': \
        run_stochastic_gradient(theta0, part4_iterations, learning_rate, batch_size=1, seed=seed)[0],
    'batch=2': \
        run_stochastic_gradient(theta0, part4_iterations, learning_rate, batch_size=2, seed=seed)[0],
    'batch=4': \
        run_stochastic_gradient(theta0, part4_iterations, learning_rate, batch_size=4, seed=seed)[0],
    'full GD': \
        run_full_gradient(theta0, part4_iterations, learning_rate),
}

plt.figure(figsize=(7, 3.5))
for name, path in batch_runs.items():
    plt.plot(objective(path), label=name)
plt.xlabel('iteration')
plt.ylabel('loss')
plt.legend()
plt.show()

for name, path in batch_runs.items():
    print(f'{name:8s}: final theta={path[-1]: .3f}, final loss={objective(path[-1]): .3f}')
<Figure size 700x350 with 1 Axes>
batch=1 : final theta=-1.305, final loss=-0.795
batch=2 : final theta=-1.046, final loss=-0.756
batch=4 : final theta= 4.095, final loss= 0.862
full GD : final theta= 3.837, final loss= 0.832

5. Logistic 回归:mini-batch 训练

第8.1节中我们用 full-batch 训练 Logistic 回归。现在只改变 DataLoaderbatch_size:full-batch 每个 epoch 更新一次,mini-batch 每个 epoch 会更新多次。因此按 epoch 画图时,mini-batch 曲线同时反映了“更多更新次数”和“随机梯度噪声”两个因素。训练集和测试集分开评估,测试准确率只在未参与训练的测试集上计算。

def make_binary_data(n_per_class, noise_count=0, seed=42, class_std=0.9):
    generator = torch.Generator().manual_seed(seed)
    class_0 = torch.randn(n_per_class, 2, generator=generator) * class_std + torch.tensor([-1.0, -1.0])
    class_1 = torch.randn(n_per_class, 2, generator=generator) * class_std + torch.tensor([1.0, 1.0])

    X = torch.cat([class_0, class_1])
    y = torch.cat([torch.zeros(n_per_class), torch.ones(n_per_class)])

    if noise_count > 0:
        noisy_index = torch.randperm(len(y), generator=generator)[:noise_count]
        y[noisy_index] = 1 - y[noisy_index]

    shuffle_index = torch.randperm(len(y), generator=generator)
    return X[shuffle_index], y[shuffle_index]


X_train, y_train = make_binary_data(200, noise_count=10, seed=42)
X_test, y_test = make_binary_data(100, noise_count=5, seed=43)
train_data = TensorDataset(X_train, y_train)
test_data = TensorDataset(X_test, y_test)
train_eval_loader = DataLoader(train_data, batch_size=len(train_data), shuffle=False)
test_loader = DataLoader(test_data, batch_size=len(test_data), shuffle=False)

criterion = nn.BCEWithLogitsLoss()

def predict_logits(X, a, b):
    return X @ a + b

def evaluate_logistic(data_loader, a, b):
    total_loss = 0.0
    total_correct = 0
    total_count = 0

    with torch.no_grad():
        for X_batch, y_batch in data_loader:
            logits = predict_logits(X_batch, a, b)
            loss = criterion(logits, y_batch)
            predictions = (torch.sigmoid(logits) >= 0.5).float()

            total_loss += loss.item() * len(y_batch)
            total_correct += (predictions == y_batch).sum().item()
            total_count += len(y_batch)

    return total_loss / total_count, total_correct / total_count


def train_logistic(batch_size, epochs=100, learning_rate=0.02, seed=SEED):
    generator = torch.Generator().manual_seed(seed)
    train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, generator=generator)

    a = nn.Parameter(torch.tensor([-1.0, -1.0]))
    b = nn.Parameter(torch.zeros(()))
    optimizer = torch.optim.SGD([a, b], lr=learning_rate)

    train_losses = []
    train_accuracies = []
    test_accuracies = []

    train_loss, train_accuracy = evaluate_logistic(train_eval_loader, a, b)
    _, test_accuracy = evaluate_logistic(test_loader, a, b)
    train_losses.append(train_loss)
    train_accuracies.append(train_accuracy)
    test_accuracies.append(test_accuracy)

    for _ in range(epochs):
        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            loss = criterion(predict_logits(X_batch, a, b), y_batch)
            loss.backward()
            optimizer.step()

        train_loss, train_accuracy = evaluate_logistic(train_eval_loader, a, b)
        _, test_accuracy = evaluate_logistic(test_loader, a, b)
        train_losses.append(train_loss)
        train_accuracies.append(train_accuracy)
        test_accuracies.append(test_accuracy)

    return train_losses, train_accuracies, test_accuracies


batch_sizes = {
    "full-batch": len(train_data),
    "batch=100": 100,
    "batch=50": 50,
}

logistic_runs = {}
for name, batch_size in batch_sizes.items():
    train_losses, train_accuracies, test_accuracies = train_logistic(batch_size=batch_size)
    logistic_runs[name] = {
        "train_loss": train_losses,
        "train_accuracy": train_accuracies,
        "test_accuracy": test_accuracies,
    }
    print(
        f"{name:10s}: initial test accuracy={test_accuracies[0]:.1%}, "
        f"final train accuracy={train_accuracies[-1]:.1%}, "
        f"final test accuracy={test_accuracies[-1]:.1%}"
    )

fig, axes = plt.subplots(1, 3, figsize=(12, 3))

for name, result in logistic_runs.items():
    axes[0].plot(result["train_loss"], label=name)
    axes[1].plot(result["train_accuracy"], label=name)
    axes[2].plot(result["test_accuracy"], label=name)

axes[0].set_title("Training loss")
axes[0].set_xlabel("epoch")
axes[0].set_ylabel("BCE loss")

axes[1].set_title("Training accuracy")
axes[1].set_xlabel("epoch")
axes[1].set_ylabel("accuracy")
axes[1].set_ylim(0, 1.05)

axes[2].set_title("Test accuracy")
axes[2].set_xlabel("epoch")
axes[2].set_ylabel("accuracy")
axes[2].set_ylim(0, 1.05)

for ax in axes:
    ax.legend()

plt.tight_layout()
plt.show()
full-batch: initial test accuracy=7.5%, final train accuracy=91.5%, final test accuracy=91.0%
batch=100 : initial test accuracy=7.5%, final train accuracy=91.5%, final test accuracy=91.5%
batch=50  : initial test accuracy=7.5%, final train accuracy=91.5%, final test accuracy=91.5%
<Figure size 1200x300 with 3 Axes>

6. 小结

  1. 随机梯度不是随便乱走,而是在无偏性假设下近似完整梯度;

  2. SGD 的轨迹通常更抖,这正是随机梯度噪声的表现;

  3. mini-batch SGD 通过平均多个样本梯度降低噪声;

  4. 增大 batch size 会降低噪声,但也会提高单次更新成本;

  5. Logistic 回归中,减小 batch size 会让每个 epoch 内有更多次参数更新;

  6. 非凸问题中,噪声有时帮助探索,但不保证一定找到全局最优解。