LSTM 기반 텍스트 분류

​# 모듈 import

import numpy as np

import pandas as pd

import tensorflow as tf

from sklearn.preprocessing import LabelEncoder

from sklearn.model_selection import train_test_split

from tensorflow.keras.preprocessing.text import Tokenizer

from tensorflow.keras.preprocessing.sequence import pad_sequences

from tensorflow.keras.layers import Dense, Flatten, Conv1D, MaxPool2D

from tensorflow.keras.layers import Embedding, Bidirectional, LSTM, SimpleRNN, GRU

from tensorflow.keras.models import Sequential

from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

import copy

# Load data

df0 = pd.read_csv('~~~.csv' ) # csv파일 경로 읽어오기

print(df0.info())

 

#데이터 예시

#   감정    문장

#   분노    일은 왜 해도 해도 끝이 없을까? 화가 난다.

#   분노    이번 달에 또 급여가 깎였어! 물가는 오르는데 월급만 자꾸 깎이니까 너무 화가 나.

 분노    회사에 신입이 들어왔는데 말투가 거슬려. 그런 애를 매일 봐야 한다고 생각하니까 스...

#   분노    직장에서 막내라는 이유로 나에게만 온갖 심부름을 시켜. 일도 많은 데 정말 분하고 ...

#   분노    얼마 전 입사한 신입사원이 나를 무시하는 것 같아서 너무 화가 나.

 

# 데이터 전처리

df = copy.copy(df0)

# "문장" 열에서 숫자, 영문자, 특수 문자등은 없는 것을 확인

number_abnormal = df['문장'][df['문장'].str.contains('[^가-힣 ]')].sum()

print(number_abnormal)

# Null, 중복 제거

# '문장' 열에서 양 끝의 빈공간 삭제

df['문장'] = df['문장'].str.strip()

# 결측값이 있는 행을 제거

df = df.dropna()

# "문장"열에서 중복 데이터를 삭제

df.drop_duplicates(subset=['문장'], inplace=True)

# 데이터 시각화

# label 분포 확인

dist_label = df['감정'].value_counts()

print(dist_label)

# "감정" 열(Label)기준으로 plot Bar차트 그리기

dist_label.plot(kind='bar')

# Label 인코딩

# 감정 리스트

list1 = df['감정'].value_counts().index.values

list1

# 라벨와 클래스을 매핑 작업

label2class = {}

class2label = {}

for cl, la in enumerate(list1):

# print(i, j)

label2class[la] = cl

class2label[cl] = la

print(label2class)

print(class2label)

df['label'] = df['감정'].map(label2class)

df.tail()

## Label과 Class를 Mapping

#encoder = LabelEncoder()

#df['감정'] = encoder.fit_transform(df['감정'])

#encoder.classes_

# X, y 분리

features = df['문장'].values

labels = df['감정'].values

print(f"feature 차원: {features.shape} \n label 차원: {labels.shape} \n")

print('이벤트 문자열 최대 길이 :{}'.format(max(len(l) for l in features)))

print('이벤트 문자열 평균 길이 :{}'.format(sum(map(len, features))/len(features)))

# Train, Test 데이터 셋 분리

x_train, x_test, y_train, y_test = train_test_split(features, labels , test_size=0.2, stratify=labels, random_state=41)

# Tokenizing

tokenizer = Tokenizer()

tokenizer.fit_on_texts(x_train)

max_words = len(tokenizer.index_word)

print(max_words) # 총 단어 갯수 확인

# texts_to_sequences

x_train_seq = tokenizer.texts_to_sequences(x_train)

x_test_seq = tokenizer.texts_to_sequences(x_test)

# 문장을 숫자로 변경후 갯수 확인

# x_train.shape, x_test.shape, y_train.shape, y_test.shape : ((41259,), (10315,), (41259,), (10315,))

print(len(x_train_seq), len(x_test_seq))

# Padding Sequence

# 문장의 최대 길이 파악 (maxlen)

maxlen = max(len(line) for line in x_train_seq)

# 모든 문장을 최대 문장 Seq 길이에 맞춘다.

x_train_pad =pad_sequences(x_train_seq, maxlen=maxlen)

x_test_pad = pad_sequences(x_test_seq, maxlen=maxlen)

# 모델 구축

# Hyper parameters

max_words = max_words+1 #총 단어 갯수 + padding 0 번호

max_len = maxlen # 최대 문장 길이

embedding_dim = 32 # embedding 차원

model = Sequential()

# 단어를 32차원으로 Vector 변경(Embedding)

model.add(Embedding(max_words, embedding_dim, input_length=max_len))

model.add(LSTM(16, return_sequences=True))

model.add(LSTM(16, return_sequences=True))

model.add(Flatten())

model.add(Dense(128, activation='swish'))

model.add(Dense(32, activation='swish'))

model.add(Dense(6, activation='softmax'))

# 모델 compile

model.compile(loss = 'sparse_categorical_crossentropy',

optimizer = 'adam',

metrics = ["accuracy"])

model.summary()

# 조기종료 콜백함수 정의(EarlyStopping)

es = EarlyStopping(monitor='val_loss', patience=10, verbose=1)

# 체크포인트 저장(ModelCheckpoint)

checkpoint_path = "tmp_checkpoint.weights.h5"

cp = ModelCheckpoint(checkpoint_path, monitor='val_loss', verbose=1, save_best_only=True, save_weights_only=True)

# 모델 학습(fit)

history = model.fit(x_train_pad, y_train, epochs=50, batch_size=512,

validation_split=0.2, verbose =1, callbacks=[es, cp])

# Model Performance

epochs = range(1, len(history.history['accuracy']) + 1)

plt.plot(epochs, history.history['accuracy'])

plt.plot(epochs, history.history['val_accuracy'])

plt.title('model accuracy')

plt.ylabel('loss')

plt.xlabel('epoch')

plt.legend(['train', 'valid'], )

plt.show()

model.evaluate(x_test_pad, y_test)