Sentiment Classification of Text with Keras
Text classification is the automatic labelling of a collection of texts (or other entities) according to a given classification scheme or standard. In this section we use the Keras library to perform sentiment classification on a text dataset. It assumes a basic command of Python and some grounding in deep learning.
Install the Keras library
pip3 install keras -i https://pypi.tuna.tsinghua.edu.cn/simple/
The code in this section targets standalone Keras 2.x on a TensorFlow 1.x backend; the session-based similarity query later on relies on graph mode.
Import the required libraries
import numpy as np
import re
import itertools
from collections import Counter
Define the text-processing functions
def clean_str(string):
    """
    Clean a raw string from the dataset: split contractions and
    punctuation into standalone tokens, then lowercase.
    """
    string = re.sub(r"[^A-Za-z0-9(),!?\'\`]", " ", string)
    string = re.sub(r"\'s", " \'s", string)
    string = re.sub(r"\'ve", " \'ve", string)
    string = re.sub(r"n\'t", " n\'t", string)
    string = re.sub(r"\'re", " \'re", string)
    string = re.sub(r"\'d", " \'d", string)
    string = re.sub(r"\'ll", " \'ll", string)
    string = re.sub(r",", " , ", string)
    string = re.sub(r"!", " ! ", string)
    string = re.sub(r"\(", " \( ", string)
    string = re.sub(r"\)", " \) ", string)
    string = re.sub(r"\?", " \? ", string)
    string = re.sub(r"\s{2,}", " ", string)
    return string.strip().lower()
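For example, applying clean_str to a hypothetical review fragment separates contractions and punctuation into their own tokens:
print(clean_str("I haven't seen it, but they're saying it's great!"))
# -> i have n't seen it , but they 're saying it 's great !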
def load_data_and_labels():
    """
    Load positive and negative training examples from files, split each
    sentence into words, and generate one-hot labels.
    Returns the split sentences and labels.
    """
    # Load data from files
    positive_examples = list(open("/share/datasets/text_cnn/rt-polarity.pos", "r", encoding='latin-1').readlines())
    positive_examples = [s.strip() for s in positive_examples]
    negative_examples = list(open("/share/datasets/text_cnn/rt-polarity.neg", "r", encoding='latin-1').readlines())
    negative_examples = [s.strip() for s in negative_examples]
    # Split sentences into words
    x_text = positive_examples + negative_examples
    x_text = [clean_str(sent) for sent in x_text]
    x_text = [s.split(" ") for s in x_text]
    # Generate labels: [0, 1] marks a positive example, [1, 0] a negative one
    positive_labels = [[0, 1] for _ in positive_examples]
    negative_labels = [[1, 0] for _ in negative_examples]
    y = np.concatenate([positive_labels, negative_labels], 0)
    return [x_text, y]
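If the rt-polarity files are present at the paths above, a quick check (expected output shown as a comment) confirms the label convention:
sentences, labels = load_data_and_labels()
print(labels[0], labels[-1])  # [0 1] [1 0] — positive examples first, negative last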
def pad_sentences(sentences, padding_word="<PAD/>"):
    """
    Pad all sentences to the same length, determined by the longest
    sentence in the dataset.
    Returns the padded sentences.
    """
    sequence_length = max(len(x) for x in sentences)
    padded_sentences = []
    for i in range(len(sentences)):
        sentence = sentences[i]
        num_padding = sequence_length - len(sentence)
        new_sentence = sentence + [padding_word] * num_padding
        padded_sentences.append(new_sentence)
    return padded_sentences
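A small illustration with two toy sentences; the shorter one is padded up to the length of the longer:
print(pad_sentences([["a", "nice", "movie"], ["terrible"]]))
# [['a', 'nice', 'movie'], ['terrible', '<PAD/>', '<PAD/>']]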
def build_vocab(sentences):
    """
    Build a word-to-index vocabulary mapping from the sentences.
    Returns the vocabulary mapping and the inverse (index-to-word) mapping.
    """
    # Build the vocabulary
    word_counts = Counter(itertools.chain(*sentences))
    # Index-to-word mapping
    vocabulary_inv = [x[0] for x in word_counts.most_common()]
    vocabulary_inv = list(sorted(vocabulary_inv))
    # Word-to-index mapping
    vocabulary = {x: i for i, x in enumerate(vocabulary_inv)}
    return [vocabulary, vocabulary_inv]
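On a toy corpus, the inverse mapping is the alphabetically sorted word list, and the vocabulary maps each word to its position in that list:
vocab, vocab_inv = build_vocab([["good", "movie"], ["bad", "movie"]])
print(vocab_inv)  # ['bad', 'good', 'movie']
print(vocab)      # {'bad': 0, 'good': 1, 'movie': 2}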
def build_input_data(sentences, labels, vocabulary):
    """
    Map sentences and labels to vectors based on the vocabulary.
    """
    x = np.array([[vocabulary[word] for word in sentence] for sentence in sentences])
    y = np.array(labels)
    return [x, y]
def load_data():
    """
    Load the dataset and preprocess the text.
    Returns the input vectors, labels, vocabulary mapping and inverse vocabulary mapping.
    """
    # Load the dataset and preprocess it
    sentences, labels = load_data_and_labels()
    sentences_padded = pad_sentences(sentences)
    vocabulary, vocabulary_inv = build_vocab(sentences_padded)
    x, y = build_input_data(sentences_padded, labels, vocabulary)
    return [x, y, vocabulary, vocabulary_inv]
The CNN_text model
from keras.layers import Input, Dense, Embedding, Conv2D, MaxPool2D
from keras.layers import Reshape, Flatten, Dropout, Concatenate
from keras.callbacks import ModelCheckpoint
from keras.optimizers import Adam
from keras.models import Model
from sklearn.model_selection import train_test_split
print('Loading dataset...')
x, y, vocabulary, vocabulary_inv = load_data()
# x.shape -> (10662, 56)
# y.shape -> (10662, 2)
# len(vocabulary) -> 18765
# len(vocabulary_inv) -> 18765
print('Dataset loaded successfully!')
# Inspect the mappings: the full word-to-index dict and the first ten index-to-word entries
vocabulary
vocabulary_inv[:10]
X_train, X_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=42)
sequence_length = x.shape[1] # 56
vocabulary_size = len(vocabulary_inv) # 18765
embedding_dim = 80
filter_sizes = [1, 2, 3]  # filter heights for the three convolution branches below
num_filters = 128
drop = 0.5
epochs = 30
batch_size = 128
Build the CNN text-classification model
# This returns a tensor
print("Building the CNN text-classification model...")
inputs = Input(shape=(sequence_length,), dtype='int32')
embedding = Embedding(input_dim=vocabulary_size, output_dim=embedding_dim, input_length=sequence_length)(inputs)
reshape = Reshape((sequence_length, embedding_dim, 1))(embedding)
conv_0 = Conv2D(num_filters, kernel_size=(filter_sizes[0], embedding_dim), padding='valid',
                kernel_initializer='normal', activation='relu')(reshape)
conv_1 = Conv2D(num_filters, kernel_size=(filter_sizes[1], embedding_dim), padding='valid',
                kernel_initializer='normal', activation='relu')(reshape)
conv_2 = Conv2D(num_filters, kernel_size=(filter_sizes[2], embedding_dim), padding='valid',
                kernel_initializer='normal', activation='relu')(reshape)
# Add dropout layers to prevent overfitting
conv_0 = Dropout(drop)(conv_0)
conv_1 = Dropout(drop)(conv_1)
conv_2 = Dropout(drop)(conv_2)
maxpool_0 = MaxPool2D(pool_size=(sequence_length - filter_sizes[0] + 1, 1), strides=(1,1), padding='valid')(conv_0)
maxpool_1 = MaxPool2D(pool_size=(sequence_length - filter_sizes[1] + 1, 1), strides=(1,1), padding='valid')(conv_1)
maxpool_2 = MaxPool2D(pool_size=(sequence_length - filter_sizes[2] + 1, 1), strides=(1,1), padding='valid')(conv_2)
# Concatenate the outputs of the three branches with different filter sizes
concatenated_tensor = Concatenate(axis=1)([maxpool_0, maxpool_1, maxpool_2])
# Flatten so the output can be fed into the final dense layer
flatten = Flatten()(concatenated_tensor)
dropout = Dropout(drop)(flatten)
output = Dense(units=2, activation='softmax')(dropout)
# Create the model from the input and output tensors
model = Model(inputs=inputs, outputs=output)
print("模型创建成功!")
Similar-word queries with the embedding vectors in Keras
import tensorflow as tf
from keras import backend as K
'''You need to know the indices of the query words. Since `vocabulary` is the
word-to-index mapping, you can get them with [vocabulary[w] for w in pos_words].
This function assumes TensorFlow 1.x graph mode.'''
def most_similar(emb_layer, pos_words=[], neg_words=[], top_n=10):
    weights = emb_layer.weights[0]
    inverse_dict = dict([val, key] for key, val in vocabulary.items())
    pos_word_idxs = [vocabulary[w] for w in pos_words]
    neg_word_idxs = [vocabulary[w] for w in neg_words]
    # Average the positive vectors and the negated negative vectors
    mean = []
    for idx in pos_word_idxs:
        mean.append(weights.value()[idx, :])
    for idx in neg_word_idxs:
        mean.append(weights.value()[idx, :] * -1)
    mean = tf.reduce_mean(mean, 0)
    # Score every word by its dot product with the mean vector
    dists = tf.tensordot(weights, mean, 1)
    best = tf.math.top_k(dists, top_n)
    # Mask out the words used as pos or neg queries
    mask = []
    for v in set(pos_word_idxs + neg_word_idxs):
        mask.append(tf.cast(tf.equal(best.indices, v), tf.int8))
    mask = tf.less(tf.reduce_sum(mask, 0), 1)
    idxs = tf.boolean_mask(best.indices, mask)
    vals = tf.boolean_mask(best.values, mask)
    # Evaluate in the Keras session so the trained weights are used; a fresh
    # session with global_variables_initializer() would reset them to random values.
    sess = K.get_session()
    idxs, vals = sess.run([idxs, vals])
    return [(inverse_dict[i], j) for i, j in list(zip(idxs, vals))]
'''Some potential improvements for this function:
1. Make sure it returns top_n words (after masking it may return fewer).
2. gensim uses L2-normalised embeddings.'''
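A minimal sketch of improvement 2, assuming the same TensorFlow 1.x setup as above: L2-normalising the embedding matrix turns the dot-product scores into cosine similarities, matching gensim's behaviour.
# Drop-in replacement for the first line of most_similar: normalise each
# embedding vector to unit length so `dists` becomes a cosine similarity.
weights = tf.nn.l2_normalize(emb_layer.weights[0], axis=1)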
# Run after training for meaningful neighbours; pass the Embedding layer
# (model.layers[1]) rather than the whole model, matching the emb_layer parameter.
most_similar(model.layers[1], pos_words=['enjoy', 'happy'], neg_words=['sad'], top_n=30)
# Custom metrics callback: compute F1, precision and recall on the validation set after every epoch
import numpy as np
from keras.callbacks import Callback,EarlyStopping
from keras.engine.training import Model
from sklearn.metrics import confusion_matrix, f1_score, precision_score, recall_score
class Metrics(Callback):
    def on_train_begin(self, logs={}):
        self.val_f1s = []
        self.val_recalls = []
        self.val_precisions = []
    def on_epoch_end(self, epoch, logs={}):
        val_predict = (np.asarray(self.model.predict(self.validation_data[0]))).round()
        val_targ = self.validation_data[1]
        _val_f1 = f1_score(val_targ, val_predict, average='weighted')
        _val_recall = recall_score(val_targ, val_predict, average='weighted')
        _val_precision = precision_score(val_targ, val_predict, average='weighted')
        self.val_f1s.append(_val_f1)
        self.val_recalls.append(_val_recall)
        self.val_precisions.append(_val_precision)
        print(' — val_f1: %f — val_precision: %f — val_recall: %f' % (_val_f1, _val_precision, _val_recall))
        return
metrics = Metrics()
checkpoint = ModelCheckpoint('weights.{epoch:03d}-{val_acc:.4f}.hdf5', monitor='val_acc', verbose=1, save_best_only=True, mode='auto')
earlyStopping = EarlyStopping(monitor='val_acc', patience=4, verbose=1, mode='max')
callbacks_list = [checkpoint, earlyStopping, metrics]
adam = Adam(lr=1e-4, beta_1=0.9, beta_2=0.999, epsilon=1e-08, decay=0.0)
Train the model
# With a two-unit softmax and one-hot labels, binary_crossentropy is
# equivalent to categorical_crossentropy here.
model.compile(optimizer=adam, loss='binary_crossentropy', metrics=['accuracy'])
print("开始训练模型...")
history = model.fit(
    X_train,
    y_train,
    batch_size=batch_size,
    epochs=epochs,
    verbose=1,
    callbacks=callbacks_list,
    validation_data=(X_test, y_test))
print('Training has completed!')
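With training complete, here is a minimal sketch of classifying a new review (the helper and the sample sentence are hypothetical; unknown words are mapped to the padding token, one simple choice among several):
def predict_sentiment(text):
    tokens = clean_str(text).split(" ")
    # Pad or truncate to the model's fixed input length
    tokens = (tokens + ["<PAD/>"] * sequence_length)[:sequence_length]
    idxs = [vocabulary.get(w, vocabulary["<PAD/>"]) for w in tokens]
    probs = model.predict(np.array([idxs]))[0]
    # Index 1 is the positive class, matching the [0, 1] label convention
    return "positive" if probs[1] > probs[0] else "negative"

print(predict_sentiment("a moving and well acted film"))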
CNN loss curve
import matplotlib.pyplot as plt
# Use the inline notebook backend; switching to 'agg' would suppress plt.show()
%matplotlib inline
fig1 = plt.figure()
plt.plot(history.history['loss'],'r',linewidth=3.0)
plt.plot(history.history['val_loss'],'b',linewidth=3.0)
plt.legend(['Training Loss', 'Validation Loss'], fontsize=18)
plt.xlabel('Epochs', fontsize=16)
plt.ylabel('Loss', fontsize=16)
plt.title('CNN Loss', fontsize=16)
fig1.savefig('loss_cnn.png')
plt.show()
CNN accuracy curve
fig2=plt.figure()
plt.plot(history.history['acc'],'r',linewidth=3.0)
plt.plot(history.history['val_acc'],'b',linewidth=3.0)
plt.legend(['Training Accuracy', 'Validation Accuracy'], fontsize=18)
plt.xlabel('Epochs', fontsize=16)
plt.ylabel('Accuracy', fontsize=16)
plt.title('CNN Accuracy', fontsize=16)
fig2.savefig('accuracy_cnn.png')
plt.show()