在自然语言处理(NLP)任务中,词汇表(vocab)是模型理解文本的基础。一个良好的词汇表可以显著提升模型的性能和泛化能力。
词汇表的修改
加载预训练模型和分词器
首先加载预训练的BERT模型及其对应的分词器(`BertTokenizer`)。以下是代码示例:
from transformers import BertTokenizer
# 加载预训练的分词器
token = BertTokenizer.from_pretrained("/path/to/pretrained/model")
检查现有词汇表
在修改词汇表之前,我们可以先检查现有的词汇表,确保了解当前的词汇情况。可以通过以下代码查看特定词语是否存在于词汇表中:
vocab = token.vocab
print("阳" in vocab) # True
print("光" in vocab) # True
print("阳光" in vocab) # False
添加新词汇
如果发现某些常用词汇不在词汇表中,可以通过`add_tokens`方法将其添加到词汇表中。例如,添加“阳光”这个词:
# 添加新词汇
token.add_tokens(new_tokens=["阳光"])
# 更新词汇表
vocab = token.get_vocab()
print(len(vocab)) # 输出新的词汇表大小
print("阳光" in vocab) # True
验证修改效果
为了验证词汇表修改的效果,可以对包含新词汇的句子进行编码和解码操作:
out = token.batch_encode_plus(
batch_text_or_text_pairs=["阳光洒在大地上"],
add_special_tokens=True,
truncation=True,
padding="max_length",
max_length=20,
return_tensors=None
)
print(token.decode(out["input_ids"][0]))
```
[CLS] 阳光 洒 在 大 地 上 [SEP] [PAD] [PAD] ...
```
基于修改后词汇表的模型训练
加载数据集
from torch.utils.data import Dataset
from datasets import load_dataset
class MyDataset(Dataset):
def __init__(self,split):
#从磁盘加载csv数据
self.dataset = load_dataset(path="csv",data_files=f"data/{split}.csv",split="train")
def __len__(self):
return len(self.dataset)
def __getitem__(self, item):
text = self.dataset[item]["text"]
label = self.dataset[item]["label"]
return text,label
初始化模型
from transformers import BertModel, BertConfig # 导入Bert模型和Bert配置类
import torch # 导入PyTorch库
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu") # 设置设备为CUDA(如果可用),否则为CPU
# pretrained = BertModel.from_pretrained(r"D:\PycharmProjects\demo_15_01\model\bert-base-chinese\models--bert-base-chinese\snapshots\c30a6ed22ab4564dc1e3b2ecbf6e766b0611a33f").to(DEVICE)
# pretrained.embeddings.position_embeddings = torch.nn.Embedding(1500,768).to(DEVICE)
# print(pretrained)
# config = pretrained.config
configuration = BertConfig.from_pretrained(r"D:\PycharmProjects\demo_15_02\model\bert-base-chinese\models--bert-base-chinese\snapshots\c30a6ed22ab4564dc1e3b2ecbf6e766b0611a33f") # 从预训练模型加载配置
configuration.max_position_embeddings = 1500 # 设置最大位置嵌入为1500
print(configuration) # 打印配置信息
# 初始化模型
pretrained = BertModel(configuration).to(DEVICE) # 使用配置初始化Bert模型并移动到指定设备
print(pretrained.embeddings.position_embeddings) # 打印模型的位置嵌入
print(pretrained) # 打印整个模型
# 定义下游任务模型(将主干网络所提取的特征分类)
class Model(torch.nn.Module): # 定义一个PyTorch模型
def __init__(self):
super().__init__() # 调用父类构造函数
self.fc = torch.nn.Linear(768, 10) # 定义一个全连接层,输入768维,输出10维
# def forward(self,input_ids,attention_mask,token_type_ids):
# # with torch.no_grad():
# # out = pretrained(input_ids=input_ids,attention_mask=attention_mask,token_type_ids=token_type_ids)
# out = pretrained(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
# out = self.fc(out.last_hidden_state[:,0])
# out = out.softmax(dim=1)
# return out
# 调整模型的前向计算,让embeddings部分参与到模型的训练过程(修改了embeddings)
def forward(self, input_ids, attention_mask, token_type_ids): # 定义模型的前向传播
# 让embeddings参与训练
embeddings_output = pretrained.embeddings(input_ids=input_ids) # 获取嵌入层的输出
attention_mask = attention_mask.to(torch.float) # 将注意力掩码转换为浮点类型
# 将数据形状转换为[N,1,1,sequence_length]使其匹配attention层的输入形状
attention_mask = attention_mask.unsqueeze(1).unsqueeze(2) # 增加两个维度
attention_mask = attention_mask.to(embeddings_output.dtype) # 将注意力掩码的数据类型与嵌入层输出对齐
# 冻结encoder和pooler,使用torch.no_grade()节省显存
with torch.no_grad(): # 在不计算梯度的情况下执行
encoder_output = pretrained.encoder(embeddings_output, attention_mask=attention_mask) # 获取编码器的输出
out = self.fc(encoder_output.last_hidden_state[:, 0]) # 使用全连接层处理编码器的最后一个隐藏状态
return out # 返回输出
训练模型
使用PyTorch的训练循环进行模型训练。可以利用`transformers`库中的`Trainer`类简化训练过程:
#模型训练
import torch
from MyData import MyDataset
from torch.utils.data import DataLoader
from net import Model
from transformers import BertTokenizer,AdamW
#定义设备信息
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#定义训练的轮次
EPOCH= 30000
token = BertTokenizer.from_pretrained(r"D:\PycharmProjects\disanqi\demo_5\model\bert-base-chinese\models--bert-base-chinese\snapshots\c30a6ed22ab4564dc1e3b2ecbf6e766b0611a33f")
def collate_fn(data):
sents = [i[0]for i in data]
label = [i[1] for i in data]
#编码
data = token.batch_encode_plus(
batch_text_or_text_pairs=sents,
truncation=True,
max_length=500,
padding="max_length",
return_tensors="pt",
return_length=True
)
input_ids = data["input_ids"]
attention_mask = data["attention_mask"]
token_type_ids = data["token_type_ids"]
labels = torch.LongTensor(label)
return input_ids,attention_mask,token_type_ids,labels
#创建数据集
train_dataset = MyDataset("train")
train_loader = DataLoader(
dataset=train_dataset,
batch_size=100,
shuffle=True,
#舍弃最后一个批次的数据,防止形状出错
drop_last=True,
#对加载进来的数据进行编码
collate_fn=collate_fn
)
val_dataset = MyDataset("validation")
val_loader = DataLoader(
dataset=val_dataset,
batch_size=100,
shuffle=True,
#舍弃最后一个批次的数据,防止形状出错
drop_last=True,
#对加载进来的数据进行编码
collate_fn=collate_fn
)
if __name__ == '__main__':
#开始训练
print(DEVICE)
model = Model().to(DEVICE)
#定义优化器
optimizer = AdamW(model.parameters())
#定义损失函数
loss_func = torch.nn.CrossEntropyLoss()
#初始化最佳验证准确率
best_val_acc = 0.0
for epoch in range(EPOCH):
for i,(input_ids,attention_mask,token_type_ids,labels) in enumerate(train_loader):
#将数据存放到DEVICE上
input_ids, attention_mask, token_type_ids, labels = input_ids.to(DEVICE),attention_mask.to(DEVICE),token_type_ids.to(DEVICE),labels.to(DEVICE)
#前向计算(将数据输入模型,得到输出)
out = model(input_ids=input_ids,attention_mask=attention_mask,token_type_ids=token_type_ids)
#根据输出,计算损失
loss = loss_func(out,labels)
#根据损失,优化参数
optimizer.zero_grad()
loss.backward()
optimizer.step()
#每隔5个批次输出训练信息
if i%5==0:
out = out.argmax(dim=1)
acc = (out==labels).sum().item()/len(labels)
print(f"epoch:{epoch},i:{i},loss:{loss.item()},acc:{acc}")
#验证模型(判断是否过拟合)
#设置为评估模式
model.eval()
#不需要模型参与训练
with torch.no_grad():
val_acc = 0.0
val_loss = 0.0
for i, (input_ids, attention_mask, token_type_ids, labels) in enumerate(val_loader):
# 将数据存放到DEVICE上
input_ids, attention_mask, token_type_ids, labels = input_ids.to(DEVICE), attention_mask.to(
DEVICE), token_type_ids.to(DEVICE), labels.to(DEVICE)
# 前向计算(将数据输入模型,得到输出)
out = model(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
# 根据输出,计算损失
val_loss += loss_func(out, labels)
out = out.argmax(dim=1)
val_acc+=(out==labels).sum().item()
val_loss /= len(val_loader)
val_acc /= len(val_loader)
print(f"验证集:loss:{val_loss},acc:{val_acc}")
#根据验证准确率保存最优参数
if val_acc > best_val_acc:
best_val_acc = val_acc
torch.save(model.state_dict(),"params/best_bert.pth")
print(f"Epoch:{epoch}:保存最优参数:acc:{best_val_acc}")
#保存最后一轮参数
torch.save(model.state_dict(),f"params/last_bert.pth")
print(epoch,f"Epcot:{epoch}最后一轮参数保存成功!")
评估模型
训练完成后,可以使用测试集评估模型性能:
import torch
from MyData import MyDataset
from torch.utils.data import DataLoader
from net import Model
from transformers import BertTokenizer
#定义设备信息
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
token = BertTokenizer.from_pretrained(r"D:\PycharmProjects\disanqi\demo_5\model\bert-base-chinese\models--bert-base-chinese\snapshots\c30a6ed22ab4564dc1e3b2ecbf6e766b0611a33f")
def collate_fn(data):
sents = [i[0]for i in data]
label = [i[1] for i in data]
#编码
data = token.batch_encode_plus(
batch_text_or_text_pairs=sents,
truncation=True,
max_length=500,
padding="max_length",
return_tensors="pt",
return_length=True
)
input_ids = data["input_ids"]
attention_mask = data["attention_mask"]
token_type_ids = data["token_type_ids"]
labels = torch.LongTensor(label)
return input_ids,attention_mask,token_type_ids,labels
#创建数据集
test_dataset = MyDataset("test")
test_loader = DataLoader(
dataset=test_dataset,
batch_size=100,
shuffle=True,
#舍弃最后一个批次的数据,防止形状出错
drop_last=True,
#对加载进来的数据进行编码
collate_fn=collate_fn
)
if __name__ == '__main__':
acc = 0.0
total = 0
#开始测试
print(DEVICE)
model = Model().to(DEVICE)
#加载训练参数
model.load_state_dict(torch.load("params/3_bert.pth"))
#开启测试模型
model.eval()
for i, (input_ids, attention_mask, token_type_ids, labels) in enumerate(test_loader):
# 将数据存放到DEVICE上
input_ids, attention_mask, token_type_ids, labels = input_ids.to(DEVICE), attention_mask.to(
DEVICE), token_type_ids.to(DEVICE), labels.to(DEVICE)
# 前向计算(将数据输入模型,得到输出)
out = model(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
out = out.argmax(dim=1)
acc += (out==labels).sum().item()
print(i,(out==labels).sum().item())
total+=len(labels)
print(f"test_acc:{acc/total}")