,

1. NLP from Scratch

你将通过三个部分的学习来构建和训练一个用于分词的RNN神经网络,你将会学习如下知识:

  • 如何从头构建一个RNN网络
  • NLP中的数据处理方法
  • 如何训练RNN来分词

1.1 NLP From Scratch: Classifying Names with a Character-Level RNN

Character-Level RNN将单词读成一系列字符——在每个步骤输出预测和“隐藏状态”,将其之前的隐藏状态输入到每个下一步。我们将最终预测为输出,即该单词属于哪个类别,具体来说,我们将对来自18种原始语言的数千个姓氏进行训练,并根据拼写预测名字来自哪种语言。

RNNs的相关知识:LSTM网络, RNNs的一些缺点

Preparing the Data
这里下载数据,并将其提取到当前目录中。data/names目录中包含18个名为[Language].txt的文本文件。每个文件都包含一堆名字,每行一个名字,大部分是罗马化的(但我们仍然需要从Unicode转换为ASCII)。

第一步是定义和清理我们的数据。最初,我们需要将Unicode转换为纯ASCII,以限制RNN输入层。这是通过将Unicode字符串转换为ASCII并只允许一小组允许的字符来实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import string
import unicodedata

# 定义允许的字符集:ASCII字母、空格、基本标点和下划线(用于表示未处理字符)
allowed_characters = string.ascii_letters + " .,;'" + "_" # 例如:'A-Za-z .,;'_'
n_letters = len(allowed_characters) # 计算允许字符的总数

def unicodeToAscii(s):
"""
将Unicode字符串转换为纯ASCII字符(过滤非允许字符并去除重音符号)
参数:
s (str): 输入的Unicode字符串
返回:
str: 处理后的ASCII字符串
"""
return ''.join(
c for c in unicodedata.normalize('NFD', s) # 对字符串进行Unicode规范化分解(NFD格式)
if unicodedata.category(c) != 'Mn' # 过滤掉组合标记(如重音符号)
and c in allowed_characters # 只保留允许字符集中的字符
)

# 你可以通过下面这个示例来测试
print (f"converting 'Ślusàrski' to {unicodeToAscii('Ślusàrski')}")

Turning Names into Tensors

现在我们已经整理好了所有的名字,我们需要将它们变成张量来使用它们。

为了表示单个字母,我们使用大小为<1 x n_letters> 的 one-hot vector。除了当前字母索引处的1外,其余填充了0,例如“b”= <0 1 0 0 0 …>。为了制作一个单词,我们将一堆单词加入到一个二维矩阵<line_length x 1 x n_letters>中。

额外的1个维度是因为PyTorch假设所有东西都是batch——我们在这里只是使用batch_size = 1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 查找字母在allowed_characters中的索引位置,例如"a"返回0
def letterToIndex(letter):
"""
将单个字符转换为在allowed_characters中的索引
参数:
letter (str): 输入字符(长度必须为1)
返回:
int: 字符的索引位置(未知字符返回下划线_的位置)
"""
# 如果字符不在允许的字符集中,返回下划线_的索引(表示未知字符)
if letter not in allowed_characters:
return allowed_characters.find("_") # "_"作为OOV字符的表示
else:
return allowed_characters.find(letter) # 返回字符在allowed_characters中的位置

# 将一行文本转换为形状为<line_length x 1 x n_letters>的张量
# 即每个字符用one-hot向量表示,整个行是这些向量的序列
def lineToTensor(line):
"""
将字符串转换为三维张量(序列长度 x 1 x 字母表大小)
参数:
line (str): 输入文本行
返回:
torch.Tensor: 形状为(L,1,n_letters)的one-hot编码张量
"""
# 初始化全零张量: [序列长度, 1(批处理维度), 字母表大小]
tensor = torch.zeros(len(line), 1, n_letters)
# 遍历字符串中的每个字符
for li, letter in enumerate(line):
# 在当前字符位置对应的one-hot向量中,设置对应索引处为1
tensor[li][0][letterToIndex(letter)] = 1
return tensor

# 下面这是一个示例
print (f"The letter 'a' becomes {lineToTensor('a')}") #notice that the first position in the tensor = 1
print (f"The name 'Ahn' becomes {lineToTensor('Ahn')}") #notice 'A' sets the 27th index to 1

接下来,我们需要将所有examples合并到一个数据集中,以便我们可以训练、测试和验证我们的模型。为此,我们将使用Dataset和DataLoader来存储我们的数据集。每个数据集都需要实现三个函数:initlen__和__getitem

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
from io import open
import glob
import os
import time
import torch
from torch.utils.data import Dataset

class NamesDataset(Dataset):
"""
自定义数据集类,用于加载和处理名称数据
继承自torch.utils.data.Dataset
"""

def __init__(self, data_dir):
"""
初始化数据集
参数:
data_dir (str): 包含.txt文件的目录路径
"""
self.data_dir = data_dir # 存储数据目录路径(用于数据溯源)
self.load_time = time.localtime # 记录数据加载时间(用于数据溯源)

labels_set = set() # 用于存储所有唯一的类别标签

# 初始化数据存储容器
self.data = [] # 存储原始名称数据
self.data_tensors = [] # 存储名称的tensor表示
self.labels = [] # 存储原始标签
self.labels_tensors = [] # 存储标签的tensor表示

# 读取指定目录下所有.txt文件
text_files = glob.glob(os.path.join(data_dir, '*.txt'))
for filename in text_files:
# 从文件名获取类别标签(去掉扩展名)
label = os.path.splitext(os.path.basename(filename))[0]
labels_set.add(label) # 添加到类别集合

# 读取文件内容并处理每行
lines = open(filename, encoding='utf-8').read().strip().split('\n')
for name in lines:
self.data.append(name) # 存储原始名称
self.data_tensors.append(lineToTensor(name)) # 存储名称的tensor表示
self.labels.append(label) # 存储原始标签

# 缓存标签的tensor表示
self.labels_uniq = list(labels_set) # 唯一标签列表
for idx in range(len(self.labels)):
# 将标签转换为索引tensor
temp_tensor = torch.tensor([self.labels_uniq.index(self.labels[idx])], dtype=torch.long)
self.labels_tensors.append(temp_tensor)

def __len__(self):
"""返回数据集中的样本数量"""
return len(self.data)

def __getitem__(self, idx):
"""
获取指定索引的数据样本
参数:
idx (int): 样本索引
返回:
tuple: 包含4个元素的元组:
- label_tensor: 标签的tensor表示
- data_tensor: 名称的tensor表示
- data_label: 原始标签字符串
- data_item: 原始名称字符串
"""
data_item = self.data[idx] # 获取原始名称
data_label = self.labels[idx] # 获取原始标签
data_tensor = self.data_tensors[idx] # 获取名称tensor
label_tensor = self.labels_tensors[idx] # 获取标签tensor

return label_tensor, data_tensor, data_label, data_item

# 下面是一个测试的示例:
alldata = NamesDataset("data/names")
print(f"loaded {len(alldata)} items of data")
print(f"example = {alldata[0]}")

使用数据集对象使我们能够轻松地将数据拆分为训练集和测试集。在这里,我们创造了一个80/20拆分,但torch.utils.data有更有用的实用工具。在这里,我们指定了一个generator,因为我们需要使用与PyTorch相同的设备默认为上述。

1
2
3
train_set, test_set = torch.utils.data.random_split(alldata, [.85, .15], generator=torch.Generator(device=device).manual_seed(2024))

print(f"train examples = {len(train_set)}, validation examples = {len(test_set)}")

Creating the Network

这个CharRNN类实现了一个包含三个组件的RNN。首先,我们使用nn.RNN实现。接下来,我们定义了一个层,将RNN隐藏层映射到我们的输出。最后,我们应用了softmax函数。使用nn.RNN可以显著提高性能,例如cuDNN加速内核,而不是将每个层实现为nn.Linear。它还简化了forward()中的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import torch.nn as nn
import torch.nn.functional as F

class CharRNN(nn.Module):
"""
字符级RNN模型,用于处理序列数据(如名称分类)
继承自torch.nn.Module
"""

def __init__(self, input_size, hidden_size, output_size):
"""
初始化模型结构
参数:
input_size (int): 输入特征维度(字符表大小)
hidden_size (int): 隐藏层维度
output_size (int): 输出类别数
"""
super(CharRNN, self).__init__() # 调用父类初始化

# 定义RNN层: input_size → hidden_size
self.rnn = nn.RNN(input_size, hidden_size)

# 定义线性层: hidden_size → output_size
self.h2o = nn.Linear(hidden_size, output_size)

# 定义LogSoftmax层(在分类任务中更稳定)
self.softmax = nn.LogSoftmax(dim=1)

def forward(self, line_tensor):
"""
前向传播过程
参数:
line_tensor (Tensor): 输入张量,形状为(seq_len, batch=1, input_size)
返回:
Tensor: 输出预测结果,形状为(1, output_size)
"""
# RNN处理: 返回所有时间步输出和最终隐藏状态
# rnn_out形状: (seq_len, 1, hidden_size)
# hidden形状: (1, 1, hidden_size)
rnn_out, hidden = self.rnn(line_tensor)

# 取最后一个隐藏状态进行预测
# hidden[0]形状: (1, hidden_size)
output = self.h2o(hidden[0]) # 线性变换

# 应用log softmax得到对数概率
output = self.softmax(output) # 形状: (1, output_size)

return output

n_hidden = 128
rnn = CharRNN(n_letters, n_hidden, len(alldata.labels_uniq))
print(rnn)

之后,我们可以将张量传递给RNN,以获得预测的输出。随后,我们使用函数label_from_output来为类导出文本标签。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def label_from_output(output, output_labels):
"""
从模型输出中获取预测标签
参数:
output (Tensor): 模型输出张量(对数概率)
output_labels (list): 所有可能的类别标签列表
返回:
tuple: (预测标签名称, 预测标签索引)
"""
# 获取输出中概率最大的类别
top_n, top_i = output.topk(1) # topk(1)返回最大的1个值及其索引
label_i = top_i[0].item() # 提取索引值(转为Python标量)
return output_labels[label_i], label_i # 返回标签名称和索引

# 示例使用
input = lineToTensor('Albert') # 将名字转换为模型输入张量
output = rnn(input) # 获取模型预测输出(等价于rnn.forward(input))
print(output) # 打印原始输出(对数概率)
print(label_from_output(output,alldata.labels_uniq)) # 打印(预测标签名称, 标签索引)

Training
现在,训练这个网络只需要给它看examples,让它做出猜测,如果它错了,就告诉它。

我们通过定义一个train()函数来做到这一点,该函数使用minibatches在给定数据集中训练模型。RNNs的训练方式与其他网络相似;因此,为了完整性,我们在这里包括了一个批量训练方法。循环(用于批次中的i)在调整权重之前计算批次中每个项目的损失。重复此操作,直到达到epochs数量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import random
import numpy as np

def train(rnn, training_data, n_epoch = 10, n_batch_size = 64, report_every = 50, learning_rate = 0.2, criterion = nn.NLLLoss()):
"""
Learn on a batch of training_data for a specified number of iterations and reporting thresholds
"""
# Keep track of losses for plotting
current_loss = 0
all_losses = []
rnn.train()
optimizer = torch.optim.SGD(rnn.parameters(), lr=learning_rate)

start = time.time()
print(f"training on data set with n = {len(training_data)}")

for iter in range(1, n_epoch + 1):
rnn.zero_grad() # clear the gradients

# create some minibatches
# we cannot use dataloaders because each of our names is a different length
batches = list(range(len(training_data)))
random.shuffle(batches)
batches = np.array_split(batches, len(batches) //n_batch_size )

for idx, batch in enumerate(batches):
batch_loss = 0
for i in batch: #for each example in this batch
(label_tensor, text_tensor, label, text) = training_data[i]
output = rnn.forward(text_tensor)
loss = criterion(output, label_tensor)
batch_loss += loss

# optimize parameters
batch_loss.backward()
nn.utils.clip_grad_norm_(rnn.parameters(), 3)
optimizer.step()
optimizer.zero_grad()

current_loss += batch_loss.item() / len(batch)

all_losses.append(current_loss / len(batches) )
if iter % report_every == 0:
print(f"{iter} ({iter / n_epoch:.0%}): \t average batch loss = {all_losses[-1]}")
current_loss = 0

return all_losses

我们现在来仔细分析一下训练的过程
1. 初始化阶段

1
2
3
4
current_loss = 0           # 累计当前轮次的损失
all_losses = [] # 存储每轮平均损失
rnn.train() # 设置模型为训练模式(启用Dropout等)
optimizer = SGD(rnn.parameters(), lr=0.2) # 定义优化器

2. 外层循环(Epoch)

1
2
for iter in range(1, n_epoch + 1):  # 遍历每一轮训练
rnn.zero_grad() # 清空上一轮的梯度
  • 作用:模型需要多轮学习才能收敛,每一轮(Epoch)会完整遍历所有数据一次。
  • 关键点:每轮开始时必须清空梯度(否则梯度会累加导致错误)。

3. 数据准备,创建随机批次(batches)

1
2
3
batches = list(range(len(training_data)))  # 生成索引列表 [0,1,2,...]
random.shuffle(batches) # 打乱索引顺序
batches = np.array_split(batches, len(batches)//n_batch_size) # 分割成小批次
  • 为什么手动分批次?
    • 因为文本长度不同,无法直接用PyTorch的DataLoader合并成固定大小的张量。
  • 示例:
    • 如果有1000个样本,n_batch_size=64 → 分成15个批次(前14个含64个样本,最后1个含40个样本)。

4. 内部循环,处理每个批次batch

1
2
3
4
5
6
7
8
9
for idx, batch in enumerate(batches):  # 遍历每个小批次
batch_loss = 0 # 初始化当前批次损失
for i in batch: # 遍历批次内的每个样本
# 获取数据
label_tensor, text_tensor, _, _ = training_data[i]
# 前向传播
output = rnn(text_tensor) # 模型预测
loss = criterion(output, label_tensor) # 计算损失
batch_loss += loss # 累加损失值
  • 关键操作:
    • 对批次内每个样本单独计算损失(因为长度不同,无法向量化处理)
    • batch_loss是所有样本损失的总和

5. 梯度计算与参数更新

1
2
3
4
batch_loss.backward()                  # 反向传播计算梯度
nn.utils.clip_grad_norm_(rnn.parameters(), 3) # 梯度裁剪(防爆炸)
optimizer.step() # 更新模型参数
optimizer.zero_grad() # 清空当前梯度
  • 梯度裁剪:
    • 限制梯度最大值,避免RNN中常见的梯度爆炸问题(这里限制为3)。
  • 参数更新:
    • optimizer.step()根据梯度调整模型参数。

6. 损失记录与日志

1
2
3
4
current_loss += batch_loss.item() / len(batch)  # 计算平均批次损失

if iter % report_every == 0:
print(f"轮次 {iter}: 平均损失 = {all_losses[-1]:.4f}")
  • 损失计算逻辑:
    batch_loss.item() / len(batch) → 当前批次的平均样本损失
    current_loss / len(batches) → 当前轮次的平均批次损失
  • 日志输出:
    每隔report_every轮打印一次进度(如每50轮)。

可以用matplotlib绘制训练的曲线

1
2
3
4
5
6
7
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker

plt.figure()
plt.plot(all_losses)
plt.show()`
plt.plot(all_losses)

Evaluating the Results
我们可以通过创建评估函数(evaluate)来判断模型的能力,通过一个热力图,可视化来查看我们的模型预测的结果情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def evaluate(rnn, testing_data, classes):
"""
评估模型性能并生成混淆矩阵

参数:
rnn: 训练好的RNN模型
testing_data: 测试数据集,包含(标签张量, 文本张量, 标签名, 文本)的元组
classes: 所有类别标签的列表
"""
# 初始化混淆矩阵(类别数×类别数)
confusion = torch.zeros(len(classes), len(classes))

# 设置模型为评估模式(关闭Dropout等训练专用层)
rnn.eval()

# 禁用梯度计算(加速评估并减少内存占用)
with torch.no_grad():
# 遍历测试集中的每个样本
for i in range(len(testing_data)):
# 解包测试数据
label_tensor, text_tensor, label, text = testing_data[i]

# 模型预测
output = rnn(text_tensor)

# 从输出获取预测结果(返回标签名和索引)
guess, guess_i = label_from_output(output, classes)

# 获取真实标签的索引
label_i = classes.index(label)

# 在混淆矩阵中对应位置+1
confusion[label_i][guess_i] += 1

# 标准化混淆矩阵(每行除以其总和,得到百分比)
for i in range(len(classes)):
denom = confusion[i].sum() # 计算当前类别的总样本数
if denom > 0:
confusion[i] = confusion[i] / denom # 转换为比例

# ----------------- 可视化部分 -----------------
# 创建图形和坐标轴
fig = plt.figure(figsize=(10, 10))
ax = fig.add_subplot(111)

# 绘制混淆矩阵热力图(需要将Tensor转CPU再转numpy)
cax = ax.matshow(confusion.cpu().numpy(), cmap='Blues')
fig.colorbar(cax) # 添加颜色条

# 设置坐标轴
ax.set_xticks(np.arange(len(classes))) # X轴刻度位置
ax.set_xticklabels(classes, rotation=90) # X轴标签(旋转90度防重叠)
ax.set_yticks(np.arange(len(classes))) # Y轴刻度位置
ax.set_yticklabels(classes) # Y轴标签

# 强制每个刻度显示标签
ax.xaxis.set_major_locator(ticker.MultipleLocator(1))
ax.yaxis.set_major_locator(ticker.MultipleLocator(1))

# 添加标题和标签
ax.set_title("Confusion Matrix")
ax.set_xlabel("Predicted Label")
ax.set_ylabel("True Label")

plt.tight_layout() # 自动调整布局
plt.show()

# 调用评估函数
evaluate(rnn, test_set, classes=alldata.labels_uniq)

More

  • 用更大或者结构更好的模型来训练从而得到更好的结果
    1.调整不同的超参数
    2.尝试LSTM和GRU网络
    3.调整网络大小,增加或者减少隐藏层的节点
    4.将多个RNN结合组成一个更高级的网络

  • 尝试不同的数据集

  1. any word -> language
  2. First name -> gender
  3. Character name -> writer
  4. Page title -> blog or subreddit