LSTM 기반 텍스트 분류

​# 모듈 import

import numpy as np

import pandas as pd

import tensorflow as tf

from sklearn.preprocessing import LabelEncoder

from sklearn.model_selection import train_test_split

from tensorflow.keras.preprocessing.text import Tokenizer

from tensorflow.keras.preprocessing.sequence import pad_sequences

from tensorflow.keras.layers import Dense, Flatten, Conv1D, MaxPool2D

from tensorflow.keras.layers import Embedding, Bidirectional, LSTM, SimpleRNN, GRU

from tensorflow.keras.models import Sequential

from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

import copy

# Load data

df0 = pd.read_csv('~~~.csv' ) # csv파일 경로 읽어오기

print(df0.info())

 

#데이터 예시

#   감정    문장

#   분노    일은 왜 해도 해도 끝이 없을까? 화가 난다.

#   분노    이번 달에 또 급여가 깎였어! 물가는 오르는데 월급만 자꾸 깎이니까 너무 화가 나.

 분노    회사에 신입이 들어왔는데 말투가 거슬려. 그런 애를 매일 봐야 한다고 생각하니까 스...

#   분노    직장에서 막내라는 이유로 나에게만 온갖 심부름을 시켜. 일도 많은 데 정말 분하고 ...

#   분노    얼마 전 입사한 신입사원이 나를 무시하는 것 같아서 너무 화가 나.

 

# 데이터 전처리

df = copy.copy(df0)

# "문장" 열에서 숫자, 영문자, 특수 문자등은 없는 것을 확인

number_abnormal = df['문장'][df['문장'].str.contains('[^가-힣 ]')].sum()

print(number_abnormal)

# Null, 중복 제거

# '문장' 열에서 양 끝의 빈공간 삭제

df['문장'] = df['문장'].str.strip()

# 결측값이 있는 행을 제거

df = df.dropna()

# "문장"열에서 중복 데이터를 삭제

df.drop_duplicates(subset=['문장'], inplace=True)

# 데이터 시각화

# label 분포 확인

dist_label = df['감정'].value_counts()

print(dist_label)

# "감정" 열(Label)기준으로 plot Bar차트 그리기

dist_label.plot(kind='bar')

# Label 인코딩

# 감정 리스트

list1 = df['감정'].value_counts().index.values

list1

# 라벨와 클래스을 매핑 작업

label2class = {}

class2label = {}

for cl, la in enumerate(list1):

# print(i, j)

label2class[la] = cl

class2label[cl] = la

print(label2class)

print(class2label)

df['label'] = df['감정'].map(label2class)

df.tail()

## Label과 Class를 Mapping

#encoder = LabelEncoder()

#df['감정'] = encoder.fit_transform(df['감정'])

#encoder.classes_

# X, y 분리

features = df['문장'].values

labels = df['감정'].values

print(f"feature 차원: {features.shape} \n label 차원: {labels.shape} \n")

print('이벤트 문자열 최대 길이 :{}'.format(max(len(l) for l in features)))

print('이벤트 문자열 평균 길이 :{}'.format(sum(map(len, features))/len(features)))

# Train, Test 데이터 셋 분리

x_train, x_test, y_train, y_test = train_test_split(features, labels , test_size=0.2, stratify=labels, random_state=41)

# Tokenizing

tokenizer = Tokenizer()

tokenizer.fit_on_texts(x_train)

max_words = len(tokenizer.index_word)

print(max_words) # 총 단어 갯수 확인

# texts_to_sequences

x_train_seq = tokenizer.texts_to_sequences(x_train)

x_test_seq = tokenizer.texts_to_sequences(x_test)

# 문장을 숫자로 변경후 갯수 확인

# x_train.shape, x_test.shape, y_train.shape, y_test.shape : ((41259,), (10315,), (41259,), (10315,))

print(len(x_train_seq), len(x_test_seq))

# Padding Sequence

# 문장의 최대 길이 파악 (maxlen)

maxlen = max(len(line) for line in x_train_seq)

# 모든 문장을 최대 문장 Seq 길이에 맞춘다.

x_train_pad =pad_sequences(x_train_seq, maxlen=maxlen)

x_test_pad = pad_sequences(x_test_seq, maxlen=maxlen)

# 모델 구축

# Hyper parameters

max_words = max_words+1 #총 단어 갯수 + padding 0 번호

max_len = maxlen # 최대 문장 길이

embedding_dim = 32 # embedding 차원

model = Sequential()

# 단어를 32차원으로 Vector 변경(Embedding)

model.add(Embedding(max_words, embedding_dim, input_length=max_len))

model.add(LSTM(16, return_sequences=True))

model.add(LSTM(16, return_sequences=True))

model.add(Flatten())

model.add(Dense(128, activation='swish'))

model.add(Dense(32, activation='swish'))

model.add(Dense(6, activation='softmax'))

# 모델 compile

model.compile(loss = 'sparse_categorical_crossentropy',

optimizer = 'adam',

metrics = ["accuracy"])

model.summary()

# 조기종료 콜백함수 정의(EarlyStopping)

es = EarlyStopping(monitor='val_loss', patience=10, verbose=1)

# 체크포인트 저장(ModelCheckpoint)

checkpoint_path = "tmp_checkpoint.weights.h5"

cp = ModelCheckpoint(checkpoint_path, monitor='val_loss', verbose=1, save_best_only=True, save_weights_only=True)

# 모델 학습(fit)

history = model.fit(x_train_pad, y_train, epochs=50, batch_size=512,

validation_split=0.2, verbose =1, callbacks=[es, cp])

# Model Performance

epochs = range(1, len(history.history['accuracy']) + 1)

plt.plot(epochs, history.history['accuracy'])

plt.plot(epochs, history.history['val_accuracy'])

plt.title('model accuracy')

plt.ylabel('loss')

plt.xlabel('epoch')

plt.legend(['train', 'valid'], )

plt.show()

model.evaluate(x_test_pad, y_test)

 

  • 이미지 파일을 읽어 라벨링하여 학습, 검증 데이터 생성
  • CNN 모델 생성하여 이미지 분류기 학습
  • Tranfer learning 기반 이미지 분류기 학습

 

----------------------------------------------------------------------------------------------------

- 데이터 불러오기, 라벨링, 학습/검증 data 분리

 

# 모듈 import

import os

from PIL import Image

import numpy as np

import tensorflow as tf

import matplotlib.pyplot as plt

from tensorflow.keras.preprocessing import image

from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

import pathlib

 

 

#데이터 불러오기

dataset_url = "데이터 url"

data_dir = tf.keras.utils.get_file('data_folder_name', origin=dataset_url, untar=True)

data_dir = pathlib.Path(data_dir)

 

 

# Train, Test Split 분류

# Hyperparameters

input_shape = (224, 224, 3)

batch_size = 32

num_classes = 5

 

# Train Data

train_ds = tf.keras.preprocessing.image_dataset_from_directory(

directory=str(data_dir),

label_mode="categorical", # binary , categorical

batch_size=batch_size,

image_size=(224, 224),

seed=42,

shuffle=True,

validation_split=0.2,

subset="training"

)

# Test Data

test_ds = tf.keras.preprocessing.image_dataset_from_directory(

directory=str(data_dir),

label_mode="categorical", # binary , categorical

batch_size=batch_size,

image_size=(224, 224),

seed=42,

validation_split=0.2,

subset="validation"

)

 

# Class 이름 확인

print(f"Class 이름: {train_ds.class_names} \n")

 

 

# Training data 차원 확인

batch_img, batch_label = next(iter(train_ds))

print(f"이미지의 차원: {batch_img.shape} \n레이블의 차원: {batch_label.shape}")

----------------------------------------------------------------------------------------------------

 

----------------------------------------------------------------------------------------------------

- CNN 모델 생성, 학습

 

# Hyperparameters

num_epochs = 10

batch_size = 32

learning_rate = 0.001

dropout_rate = 0.5

input_shape = (224, 224, 3) # 데이터 사이즈

num_classes = 5

 

# CNN Model

model = Sequential()

model.add(Rescaling(1. / 255))

model.add(Conv2D(32, kernel_size=(5,5), strides=(1,1), padding='same', activation='relu', input_shape=input_shape))

model.add(MaxPooling2D(pool_size=(2,2), strides=(2,2)))

model.add(Conv2D(64,(3,3), activation='relu', padding='same'))

model.add(MaxPooling2D(pool_size=(2,2)))

model.add(Conv2D(64,(3,3), activation='relu', padding='same'))

model.add(MaxPooling2D(pool_size=(2,2)))

model.add(Conv2D(128,(3,3), activation='relu', padding='same'))

model.add(MaxPooling2D(pool_size=(2,2)))

model.add(Dropout(0.2)) model.add(Flatten())

model.add(Dense(128, activation='relu'))

model.add(Dropout(0.3))

model.add(Dense(5, activation='softmax'))

 

# Compile Model

model.compile( optimizer = tf.keras.optimizers.Adam(learning_rate), loss = "categorical_crossentropy", metrics = ["accuracy"] # metrics )

 

# EarlyStopping

es = EarlyStopping( monitor='val_loss', mode='min', verbose=1, patience=3)

 

# 모델 학습

history = model.fit( train_ds, validation_data=(test_ds), epochs=10, callbacks=[es] )

 

----------------------------------------------------------------------------------------------------

 

 

----------------------------------------------------------------------------------------------------

- Transfer learning으로 분류기 학습

 

# Transfer learning 할 모델 종류 확인

print(dir(tf.keras.applications)) # keras.applications에 어떤 종류의 모델이 있는지 확인

 

 

# 학습된 모델 가져오기.

base_model = tf.keras.applications.VGG16(input_shape=(224, 224, 3), weights='imagenet', include_top=False)

base_model.trainable = False

## VGG모델사용, imagenet에 사용된 사전학습된 weight 사용, include_top : 기존 모델의 class 분류기 사용 여부, base_model.trainable : 사전학습된 weight를 학습시킬지 동결시킬지 여부

 

 

 

# 모델 layer 설계

inputs = tf.keras.Input(shape=(224, 224, 3))

x = tf.keras.layers.Rescaling(1./127.5, offset=-1)(inputs)

x = base_model(x, training=False)

 

x = tf.keras.layers.GlobalAveragePooling2D()(x)

# x = tf.keras.layers.Flatten()(x)

 

# 분류 class 수 5개

output = tf.keras.layers.Dense(5, activation='softmax')(x)

 

model = tf.keras.Model(inputs=inputs, outputs=output)

model.summary()

 

 

 

# Compile Model

model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate),

loss='categorical_crossentropy',

metrics=['accuracy'])

 

# EarlyStopping

es = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=3)

 

 

# 모델 학습

history = model.fit(

train_ds,

validation_data = test_ds,

epochs=2,

callbacks=[es]

)

 

 

# Model Performance

plt.plot(history.history['accuracy'], label='Accuracy')

plt.plot(history.history['val_accuracy'], label='Val Accuracy')

plt.xlabel('Epoch')

plt.ylabel('Accuracy')

plt.legend()

plt.title('Model Accuracy')

plt.show()