<<返回python首页 python

《Python 应用案例》

使用Keras对文本进行情感分类

文本分类用电脑对文本集(或其他实体或物件)按照一定的分类体系或标准进行自动分类标记,本节我们将使用Keras库对给定的文本数据进行情感分类,本节需要您具备 相应的Python语言基础,和一定的深度学习基础。

基于CNN的文本分类

安装keras库

pip3 install keras -i https://pypi.tuna.tsinghua.edu.cn/simple/  

导入相应库

import numpy as np
import re
import itertools
from collections import Counter

建立文本处理函数

def clean_str(string):
    """
    对数据集中的字符串做清洗.
    """
    string = re.sub(r"[^A-Za-z0-9(),!?\'\`]", " ", string)
    string = re.sub(r"\'s", " \'s", string)
    string = re.sub(r"\'ve", " \'ve", string)
    string = re.sub(r"n\'t", " n\'t", string)
    string = re.sub(r"\'re", " \'re", string)
    string = re.sub(r"\'d", " \'d", string)
    string = re.sub(r"\'ll", " \'ll", string)
    string = re.sub(r",", " , ", string)
    string = re.sub(r"!", " ! ", string)
    string = re.sub(r"\(", " \( ", string)
    string = re.sub(r"\)", " \) ", string)
    string = re.sub(r"\?", " \? ", string)
    string = re.sub(r"\s{2,}", " ", string)
    return string.strip().lower()


def load_data_and_labels():
    """
    从文件加载正负面的训练数据集,将数据拆分为词汇并标签。

    返回被拆分的句子和标签。
    """
    # 从文件中加载数据
    positive_examples = list(open("/share/datasets/text_cnn/rt-polarity.pos", "r", encoding='latin-1').readlines())
    positive_examples = [s.strip() for s in positive_examples]
    negative_examples = list(open("/share/datasets/text_cnn/rt-polarity.neg", "r", encoding='latin-1').readlines())
    negative_examples = [s.strip() for s in negative_examples]
    # 分拆词汇
    x_text = positive_examples + negative_examples
    x_text = [clean_str(sent) for sent in x_text]
    x_text = [s.split(" ") for s in x_text]
    # 生成标签
    positive_labels = [[0, 1] for _ in positive_examples]
    negative_labels = [[1, 0] for _ in negative_examples]
    y = np.concatenate([positive_labels, negative_labels], 0)
    return [x_text, y]


def pad_sentences(sentences, padding_word="<PAD/>"):
    """
    将所有句子padding到相同的长度。 长度由数据集中最长的句子确定。

    返回padding后的句子。
    """
    sequence_length = max(len(x) for x in sentences)
    padded_sentences = []
    for i in range(len(sentences)):
        sentence = sentences[i]
        num_padding = sequence_length - len(sentence)
        new_sentence = sentence + [padding_word] * num_padding
        padded_sentences.append(new_sentence)
    return padded_sentences


def build_vocab(sentences):
    """
    根据句子构建从词汇到索引(index)的词汇映射表。

    返回词汇映射表和反向词汇映射表。
    """
    # 构建词汇表(vocabulary)
    word_counts = Counter(itertools.chain(*sentences))
    # 从index映射到word
    vocabulary_inv = [x[0] for x in word_counts.most_common()]
    vocabulary_inv = list(sorted(vocabulary_inv))
    # 从word映射到index
    vocabulary = {x: i for i, x in enumerate(vocabulary_inv)}
    return [vocabulary, vocabulary_inv]


def build_input_data(sentences, labels, vocabulary):
    """
    基于词汇表将语句和标签映射为向量(vectors)。
    """
    x = np.array([[vocabulary[word] for word in sentence] for sentence in sentences])
    y = np.array(labels)
    return [x, y]


def load_data():
    """
    加载数据集的和做文本预处理数据。
    返回输入向量、标签、词汇映射表和反向词汇映射表。
    """
    # 载入数据集并对其进行预处理
    sentences, labels = load_data_and_labels()
    sentences_padded = pad_sentences(sentences)
    vocabulary, vocabulary_inv = build_vocab(sentences_padded)
    x, y = build_input_data(sentences_padded, labels, vocabulary)
    return [x, y, vocabulary, vocabulary_inv]

CNN_text模型部分

from keras.layers import Input, Dense, Embedding, Conv2D, MaxPool2D
from keras.layers import Reshape, Flatten, Dropout, Concatenate
from keras.callbacks import ModelCheckpoint
from keras.optimizers import Adam
from keras.models import Model
from sklearn.model_selection import train_test_split

print('载入数据集...')
x, y, vocabulary, vocabulary_inv = load_data()

# x.shape -> (10662, 56)
# y.shape -> (10662, 2)
# len(vocabulary) -> 18765
# len(vocabulary_inv) -> 18765


print('数据集加载成功!')
vocabulary
vocabulary_inv[:10]
X_train, X_test, y_train, y_test = train_test_split( x, y, test_size=0.2, random_state=42)
sequence_length = x.shape[1] # 56
vocabulary_size = len(vocabulary_inv) # 18765
embedding_dim = 80
filter_sizes = [1,2,3,4]
num_filters = 128
drop = 0.5

epochs = 30
batch_size = 128

创建CNN文本分类模型

# 这将返回tensor
print("创建CNN文本分类模型...")
inputs = Input(shape=(sequence_length,), dtype='int32')
embedding = Embedding(input_dim=vocabulary_size, output_dim=embedding_dim, input_length=sequence_length)(inputs)
reshape = Reshape((sequence_length,embedding_dim,1))(embedding)

conv_0 = Conv2D(num_filters, kernel_size=(filter_sizes[0], embedding_dim), padding='valid',
   kernel_initializer='normal', activation='relu')(reshape)
conv_1 = Conv2D(num_filters, kernel_size=(filter_sizes[1], embedding_dim), padding='valid', 
   kernel_initializer='normal', activation='relu')(reshape)
conv_2 = Conv2D(num_filters, kernel_size=(filter_sizes[2], embedding_dim), padding='valid',
   kernel_initializer='normal', activation='relu')(reshape)

#添加dropout层,放置过拟合
conv_0 = Dropout(drop)(conv_0)
conv_1 = Dropout(drop)(conv_1)
conv_2 = Dropout(drop)(conv_2)


maxpool_0 = MaxPool2D(pool_size=(sequence_length - filter_sizes[0] + 1, 1), strides=(1,1), padding='valid')(conv_0)
maxpool_1 = MaxPool2D(pool_size=(sequence_length - filter_sizes[1] + 1, 1), strides=(1,1), padding='valid')(conv_1)
maxpool_2 = MaxPool2D(pool_size=(sequence_length - filter_sizes[2] + 1, 1), strides=(1,1), padding='valid')(conv_2)

#将3个不同filter_size的层拼接起来
concatenated_tensor = Concatenate(axis=1)([maxpool_0, maxpool_1, maxpool_2])
#铺平层,便于喂进前馈的dense层中
flatten = Flatten()(concatenated_tensor)
dropout = Dropout(drop)(flatten)
output = Dense(units=2, activation='softmax')(dropout)

# 由此创建好模型
model = Model(inputs=inputs, outputs=output)
print("模型创建成功!")

在Keras中使用词向量的相似词查询

import tensorflow as tf
'''Of course you need to know the indices of the words. 
I assume you have a word2idx mapping, so you can get them like this: [vocabulary_inv[w] for w in pos_words].'''


def most_similar(emb_layer, pos_words = [], neg_words = [], top_n=10):
    weights = emb_layer.weights[0]
    inverse_dict = dict([val,key] for key,val in vocabulary.items())
    pos_word_idxs = [vocabulary[w] for w in pos_words]
    neg_word_idxs = [vocabulary[w] for w in neg_words]

    mean = []
    for idx in pos_word_idxs:
        mean.append(weights.value()[idx, :])

    for idx in neg_word_idxs:
        mean.append(weights.value()[idx, :] * -1)

    mean = tf.reduce_mean(mean, 0)

    dists = tf.tensordot(weights, mean, 1)
    best = tf.math.top_k(dists, top_n)

    # Mask words used as pos or neg
    mask = []

    for v in set(pos_word_idxs + neg_word_idxs):
        mask.append(tf.cast(tf.equal(best.indices, v), tf.int8))
    mask = tf.less(tf.reduce_sum(mask, 0), 1)

    idxs = tf.boolean_mask(best.indices, mask)
    vals =tf.boolean_mask(best.values, mask)


    with tf.Session() as sess:
        init = tf.global_variables_initializer()
        sess.run(init)
        idxs = sess.run(idxs)
        vals = sess.run(vals)
    return [(inverse_dict[i],j) for i,j in list(zip(idxs,vals))]
'''Some potential improvements for that function:

1、Make sure it returns top_n words (after the mask it returns less words)
2、gensim uses normalised embeddings (L2_norm)'''

most_similar(model, pos_words =['enjoy','happy'], neg_words =['sad'], top_n= 30)
#自建召回率计算函数,每一Epochs都进行F1计算
import numpy as np
from keras.callbacks import Callback,EarlyStopping
from keras.engine.training import Model
from sklearn.metrics import confusion_matrix, f1_score, precision_score, recall_score
class Metrics(Callback):
    def on_train_begin(self, logs={}):
        self.val_f1s = []
        self.val_recalls = []
        self.val_precisions = []

    def on_epoch_end(self, epoch, logs={}):
        val_predict = (np.asarray(self.model.predict(self.validation_data[0]))).round()
        val_targ = self.validation_data[1]
        _val_f1 = f1_score(val_targ, val_predict,average='weighted')
        _val_recall = recall_score(val_targ, val_predict,average='weighted')
        _val_precision = precision_score(val_targ, val_predict,average='weighted')
        self.val_f1s.append(_val_f1)
        self.val_recalls.append(_val_recall)
        self.val_precisions.append(_val_precision)
        print( ' — val_f1: %f — val_precision: %f — val_recall %f' %(_val_f1, _val_precision, _val_recall))
        return

metrics = Metrics()
checkpoint = ModelCheckpoint('weights.{epoch:03d}-{val_acc:.4f}.hdf5', monitor='val_acc', verbose=1, save_best_only=True, mode='auto')
earlyStopping=EarlyStopping(monitor='val_acc', patience=4, verbose=1, mode='max')
callbacks_list = [checkpoint,earlyStopping,metrics]

adam = Adam(lr=1e-4, beta_1=0.9, beta_2=0.999, epsilon=1e-08, decay=0.0)

开始训练模型

model.compile(optimizer=adam, loss='binary_crossentropy', metrics=['accuracy'])
print("开始训练模型...")
history=model.fit(
    X_train,
    y_train, 
    batch_size=batch_size, 
   epochs=epochs, 
   verbose=1,
   callbacks=callbacks_list, 
   validation_data=(X_test, y_test))

print('Training has completed!')

CNN损失曲线

import matplotlib.pyplot as plt
plt.switch_backend('agg')
%matplotlib inline

fig1 = plt.figure()
plt.plot(history.history['loss'],'r',linewidth=3.0)
plt.plot(history.history['val_loss'],'b',linewidth=3.0)
plt.legend(['Training loss', 'Validation Loss'],fontsize=18)
plt.xlabel('Epochs ',fontsize=16)
plt.ylabel('Loss',fontsize=16)
plt.title('CNN损失曲线',fontsize=16)
fig1.savefig('loss_cnn.png')
plt.show()

CNN准确率曲线

fig2=plt.figure()
plt.plot(history.history['acc'],'r',linewidth=3.0)
plt.plot(history.history['val_acc'],'b',linewidth=3.0)
plt.legend(['Training Accuracy', 'Validation Accuracy'],fontsize=18)
plt.xlabel('Epochs ',fontsize=16)
plt.ylabel('Accuracy',fontsize=16)
plt.title('CNN准确率曲线',fontsize=16)
fig2.savefig('accuracy_cnn.png')
plt.show()
移动端设备除iPad Pro外,其它移动设备仅能阅读基础的文本文字。
建议使用PC或笔记本电脑,浏览器使用Chrome或FireFox进行浏览,以开启左侧互动实验区来提升学习效率,推荐使用的分辨率为1920x1080或更高。
我们坚信最好的学习是参与其中这一理念,并致力成为中文互联网上体验更好的学练一体的IT技术学习交流平台。
您可加QQ群:575806994,一起学习交流技术,反馈网站使用中遇到问题。
内容、课程、广告等相关合作请扫描右侧二维码添加好友。

狐狸教程 Copyright 2021

进入全屏