Python/Pytorch

[Pytorch] BERT로 감성 분석하기. 기초 설명까지

언킴 2022. 12. 3. 14:27
반응형

Contents

     

    BERT(Bidirectional Encoder Representation from Transformers)는 Transformer를 기반으로 양방향 학습을 하는 사전학습 모델이다. 사전학습 모델이란 미리 사전에 많은 양의 코퍼스를 학습하고, 그 후 파인튜닝(Fine-tuning)을 통해 본인이 사용하고자 하는 도메인에 적용하는 모델이다. Transformer가 나오기 이전에도 Bi-LSTM 등과 같이 양방향 학습 모델이 존재했지만, 사전 학습 모델은 아니었으며, BERT는 Self-supervised Learning 중 하나인 Making 기법을 사용하여 학습의 성능을 향상시키고, 지금까지도 많이 사용된다.

     

    이번 글에서는 BERT를 사용하는 방법에 대해서 알아보고, 각 코드가 어떤 의미를 나타내는지 알아보자. Yelp.com 데이터를 활용하여 알아본다. Yelp.com 데이터는 레스토랑 리뷰 데이터를 담고 있으며, 사용자, 제품, 평점, 리뷰 등의 데이터가 존재한다. 추천 시스템에서는 감성 분석을 통해 실제 사용자가 남긴 리뷰와 평점 간의 차이를 토대로 학습한 후 사용자가 선호할 만한 레스토랑을 추천하는 형태로 진행된다.

     

    Import-Package

    import time, os
    import tqdm 
    import numpy as np
    import random 
    import pandas as pd 
    
    import torch
    import torch.nn as nn 
    import torch.optim as optim 
    
    from sklearn.model_selection import train_test_split
    
    from torch.utils.data import Dataset, DataLoader
    from transformers import BertTokenizer, BertModel

    분석에 필요한 패키지를 호출한다. time과 tqdm은 학습에 걸리는 경과 시간을 측정하기 위해 사용되고, os는 경로, torch과 transformers, sklearn는 모델을 구축하고 데이터셋을 만들기 위해 사용된다.

     

    Data Load

    data_path = os.path.join('data', 'yelp_sample.csv')
    dataset = pd.read_csv(data_path, encoding='utf-8-sig')

    Yelp.com 데이터는 Sample 데이터로 가지고 왔다. 실제 데이터를 활용하고 싶으면 Yelp.com 홈페이지에서 다운 받으면 된다. Sample 데이터는 [여기]에 업로드해두었다.

     

    Preprocessing

    def sentiment_score(x):
        if x >= 3.5 : return 1
        elif x < 3.5 : return 0
        
    
    train, test = train_test_split(dataset, test_size=0.2, random_state=42)
    train, valid = train_test_split(dataset, test_size=0.1, random_state=42)
    
    train.user_id = train.loc[:, 'user_id'].astype('category')
    train.business_id = train.loc[:, 'business_id'].astype('category')
    
    total_user_category = train.loc[:, 'user_id'].cat.categories
    total_rest_category = train.loc[:, 'business_id'].cat.categories
    
    valid.user_id = valid.loc[:, 'user_id'].astype('category')
    valid.business_id = valid.loc[:, 'business_id'].astype('category')
    
    test.user_id = test.loc[:, 'user_id'].astype('category')
    test.business_id = test.loc[:, 'business_id'].astype('category')
    
    valid.user_id= valid.loc[:, 'user_id'].cat.set_categories(total_user_category)
    valid.business_id = valid.loc[:, 'business_id'].cat.set_categories(total_rest_category)
    
    test.user_id = test.loc[:, 'user_id'].cat.set_categories(total_user_category)
    test.business_id = test.loc[:, 'business_id'].cat.set_categories(total_rest_category)
    
    train.user_id = train.loc[:, 'user_id'].cat.codes
    train.business_id = train.loc[:, 'business_id'].cat.codes
    
    valid.user_id = valid.loc[:, 'user_id'].cat.codes
    valid.business_id = valid.loc[:, 'business_id'].cat.codes
    
    test.user_id = test.loc[:, 'user_id'].cat.codes
    test.business_id = test.loc[:, 'business_id'].cat.codes
    
    
    train.loc[:,'stars'] = train.loc[:, 'stars'].apply(sentiment_score)
    valid.loc[:, 'stars'] = valid.loc[:, 'stars'].apply(sentiment_score)
    test.loc[:, 'stars'] = test.loc[:, 'stars'].apply(sentiment_score)
    
    train = train.dropna().reset_index(drop=True)
    valid = train.dropna().reset_index(drop=True)
    test = train.dropna().reset_index(drop=True)
    
    
    train.loc[:, 'text'] = train.loc[:, 'text'].apply(lambda x: '[CLS] ' + str(x) + ' [SEP]' )
    valid.loc[:, 'text'] = valid.loc[:, 'text'].apply(lambda x: '[CLS] ' + str(x) + ' [SEP]' )
    test.loc[:, 'text'] = test.loc[:, 'text'].apply(lambda x: '[CLS] ' + str(x) + ' [SEP]' )

    Yelp.com 데이터는 user ID, restaurant ID, review, stars로 구성되어 있다. 따라서, user ID와 restaurant ID를 정량화 즉, 숫자로 변환하는 작업이 필요하다. 또한, 이번 글에서는 평점을 예측하는 회귀 문제가 아니기 때문에 선행 연구에서 평점 정보를 긍정인 경우에는 1, 부정인 경우에는 0으로 변환해주어야 한다. 대부분의 선행 연구에서는 평점 3.5를 기준으로 3.5 이상인 경우에는 1로 변환하고, 아닌 경우에는 0으로 표기하기 때문에 동일한 방식으로 진행하였다. 또한, 학습 데이터에는 없는데 검증 데이터와 테스트 데이터에는 존재하는 경우 Cold Start 문제로 학습할 수 없기 때문에 이와 같은 방식으로 전처리를 수행한다.

     

    [CLS] token과 [SEP] token은 문장의 시작, 그리고 끝을 나타내주는 특별한 token을 의미한다. [CLS], [SEP]를 입력으로 넣어주어야 BERT는 이를 인식하고 학습을 시작하고, 끝낼 수 있다. 

     

    Load model and tokenizer

    bert_model = BertModel.from_pretrained('huawei-noah/TinyBERT_General_4L_312D')
    tokenizer = BertTokenizer.from_pretrained('huawei-noah/TinyBERT_General_4L_312D')
    
    
    # bert_model = BertModel.from_pretrained('bert-base-uncased')
    # tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    
    bert_model.config
    
    # BertConfig {
    #  "_name_or_path": "huawei-noah/TinyBERT_General_4L_312D",
    #  "attention_probs_dropout_prob": 0.1,
    #  "cell": {},
    #  "classifier_dropout": null,
    #  "emb_size": 312,
    #  "hidden_act": "gelu",
    #  "hidden_dropout_prob": 0.1,
    #  "hidden_size": 312,
    #  "initializer_range": 0.02,
    #  "intermediate_size": 1200,
    #  "layer_norm_eps": 1e-12,
    #  "max_position_embeddings": 512,
    #  "model_type": "bert",
    #  "num_attention_heads": 12,
    #  "num_hidden_layers": 4,
    #  "pad_token_id": 0,
    #  "position_embedding_type": "absolute",
    #  "pre_trained": "",
    #  "structure": [],
    #  "transformers_version": "4.24.0",
    #  "type_vocab_size": 2,
    #  "use_cache": true,
    #  "vocab_size": 30522
    #}

    transformers에서는 버트를 쉽게 불러올 수 있게 도와준다. TinyBERT 뿐만 아니라 다양한 모델을 호출할 수 있다. 현재 컴퓨터의 성능의 한계로 인해 BERT-base 모델을 사용하지 않고 TinyBERT로 경량화된 BERT 를 사용한다. uncased는 대소문자를 구분하지 않겠다는 뜻이다. 연구의 목적에 맞게 적절한 모델을 호출해서 사용하면 된다. 중요한 것은 모델의 이름과 토크나이저의 이름이 동일해야 한다. 즉, 'bert-base-uncased'를 사용하고자 한다면, 토크나이저도 'bert-base-uncased'를 사용해야 한다는 것이다. Config를 수정해서 다양한 값을 조정할수도 있다. 

     

    class BERTClassifier(nn.Module):
        def __init__(self, args, bertmodel):
            super(BERTClassifier, self).__init__()
            self.bert = bertmodel
            self.classifier = nn.Linear(312, 1) # tinybert=312, bert-base=768
            self.dropout = nn.Dropout(0.3)
    
        
        def forward(self, input_ids, attention_mask, token_type_ids):
            output = self.bert(input_ids, attention_mask = attention_mask, token_type_ids = token_type_ids)
            pooler = output['pooler_output']
            pooler = self.dropout(pooler)
            fc_layer = self.classifier(pooler)
            return fc_layer
            
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    
    model = BERTClassifier(bert_model)
    model.to(device)

    그다음으로는 Classifier를 수정하여야 한다. 기본적으로 BERT를 사용하기는 하지만 BERT 자체는 기계 번역 NLU를 하기 위해 제안된 모델이다. 따라서, 감성 분석을 수행하기 위해서는 위와 같은 형태로 코드를 변환하여야 한다. BERT의 self.classifier에서 nn.Linear는 마지막 출력층 Fully Connected Layer를 의미한다. 이때 FC Layer는 이진 분류인 경우에는 1로 설정하고, 다진 분류인 경우에는 Class의 수 만큼 지정한다. Dropout을 사용하고 싶은 경우에는 dr_rate를 적절히 설정하면 된다.  

     

    그 다음으로 forward 부분은 실제 모델이 작동되는 부분이다. 입력으로 input_ids, attention_mask, token_type_ids를 사용한다. input_ids는 token의 ids를 의미하고, attention_mask는 어떤 단어를 masking할지 index로 설정되어 있다. token_type_ids는 segment_id를 의미한다. BERT의 출력은 hidden_state와 pooler로 구성되어 있다. pooler는 [CLS] token 즉, 문장의 입력으로 사용되는 token의 representation이다. [CLS]의 경우 모델을 학습할 때 모든 단어를 예측할 때 사용되기 때문에 실제 분류과정에서 [CLS] token을 이용하여 분류한다.

     

     

    Build Dataset and DataLoader

    class BERTDataset(Dataset):
        def __init__(self, dataframe, tokenizer):
            self.tokenizer = tokenizer 
            self.data = dataframe 
            self.reviews = dataframe.text 
            self.labels = dataframe.stars
            self.max_seq_length = 512 # max_seq_length
    
            
    
        def __len__(self):
            return len(self.labels)
    
        def __getitem__(self, idx):
    
            review = self.reviews[idx]
    
            inputs = self.tokenizer.encode_plus(
                review, 
                None,
                add_special_tokens=True, 
                max_length=self.max_seq_length, 
                padding='max_length', 
                return_token_type_ids=True, 
                truncation=True
            )
    
            input_ids = inputs['input_ids']
            masks = inputs['attention_mask']
            token_type_ids = inputs['token_type_ids']
    
            return (
                torch.tensor(input_ids, dtype=torch.long), # token_ids
                torch.tensor(masks, dtype=torch.long), # attention_mask
                torch.tensor(token_type_ids, dtype=torch.long), # token_type_ids
                torch.tensor(self.labels[idx], dtype = float) # labels
            )
            
    train_dataset = BERTDataset( train, tokenizer)
    valid_dataset = BERTDataset( valid, tokenizer)
    
    train_dataloader = DataLoader(train_dataset, batch_size=32, num_workers=1)
    valid_dataloader = DataLoader(valid_dataset, batch_size=32, num_workers=1)

    BERTDataset으로 처리해도되고, TensorDataset을 통해 사용하여도 무방하다.  BERTDataset은 __init__을 통해 초기치를 설정한 후 __getitem__ 즉 index를 호출하면 __getitem__아래의 값을 통과하여 값을 반환해준다. 리뷰가 들어가면 tokenizer에서 encoding 작업을 수행해 review를 다 숫자로 변환한다. review의 길이가 max sequence length 보다 작은 경우에는 나머지를 전부 0으로 padding해주고, 긴 경우에는 truncate해서 짤라준다.  최종적으로 input_ids, masks, token_tyep_ids가 출력이되고 이를 tensor로 변환하여 return해준다. 그런다음 DataLoader 함수에 입력으로 사용한다. DataLoader에서는 batch_size, shuffle, collate_fn, sampler, num_wokers 등 다양한 인자들이 존재한다. collate_fn은 데이터를 어떻게 반환할 것인지에 대한 것을 함수로 지정하여 사용할 수 있으며, 필수로 사용하지는 않는다. sampler는 RandomSampler, SequenceSampler 등 다양한 형태로 Sampling하는 것을 의미한다. 마지막으로 num_worker는 CPU와 GPU 간의 전달 과정에서 효율적으로 학습하기 위해 예비 작업을 수행하는 head 수를 의미한다. GPU를 따로 사용하지 않는다면 num_workers=1, GPU를 사용한다면 2이상의 값을 사용하는게 좋다.

     

    Training

    def set_seed(seed):
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    
    def epoch_time(start_time, end_time):
        elapsed_time = end_time - start_time 
        elapsed_mins = int(elapsed_time/60)
        elapsed_secs = elapsed_time - elapsed_mins*60 
    
        return elapsed_mins, elapsed_secs
        
        
    num_epochs = 50
    learning_rate = 1e-5
    
    criterion = nn.BCEWithLogitsLoss().to(device)
    optimizer =optim.Adam(model.parameters(), lr = learning_rate, weight_decay = 0.001)

     

    학습을 하기에 앞서 먼저 학습에 필요한 함수들을 선언하고, 하이퍼파라미터도 설정해야 한다. Optimizer는 Adam을 사용하였으며, Loss Function으로는 BCEWithLogitsLoss를 사용하였다. Loss에 대한 내용은 [여기]를 참고하면 된다.

     

    train_loss_list, train_acc_list = [], []
    test_loss_list, test_acc_list = [], []
    
    best_loss = float('inf')
    set_seed(42)
    for epoch in range(1, num_epochs+1):
        train_loss = 0 
        train_acc = 0
    
        test_loss = 0
        test_acc = 0 
    
        model.train()
        start_time = time.time()
        for batch in tqdm.tqdm(train_dataloader, desc = 'training...'):
            batch = tuple(t.to(device) for t in batch)
    
            inputs = {'input_ids':      batch[0], 
                      'attention_mask': batch[1], 
                      'token_type_ids':     batch[2]}
            label = batch[3]
    
            pred_y = model(**inputs).squeeze()
            loss = criterion(pred_y, label)
    
            train_acc += calc_accuracy(pred_y, label)
            train_loss += loss.item()
    
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
    
        train_acc /= len(train_dataset)
        train_loss /= len(train_dataset)
        train_acc_list.append(train_acc)
        train_loss_list.append(train_loss)

    코드 자체는 BERT도 다른 모델들과 매우 유사하다. DataLoader를 통해 Batch Size 만큼의 데이터를 불러와서 model의 입력으로 사용하고, Loss Function을 사용해 학습한다. 그 후 정확도를 측정하고 역전파알고리즘을 통해 손실값을 계산한 후 optimizer.step을 통해 기울기의 반대방향으로 이동하는 형태이다. optimizer.zero_grad를 하지 않는다면, 그 전에 계산된 기울기 정보가 optimizer에 들어가 있을 수 있기 때문에 zero_grad를 한다. 

     

    Evaluating

        model.eval()
        with torch.no_grad():
            for batch in tqdm.tqdm(valid_dataloader, desc = 'evaluating...'):
                batch = tuple(t.to(args.device) for t in batch)
    
                inputs = {'input_ids':      batch[0], 
                          'attention_mask': batch[1], 
                          'token_type_ids':     batch[2]}
                label = batch[3]
    
                pred_y = model(**inputs).squeeze()
                loss = criterion(pred_y, label)
    
                test_acc += calc_accuracy(pred_y, label)
                test_loss += loss.item()
    
        test_acc /= len(valid_dataset)
        test_loss /= len(valid_dataset)
        test_acc_list.append(test_acc)
        test_loss_list.append(test_loss)
    
    
        end_time = time.time()
        elapsed_mins, elapsed_secs = epoch_time(start_time, end_time)
        print(f'epoch [{epoch}/{args.num_epochs}], elapsed time: {elapsed_mins}m, {elapsed_secs:.2f}s')
        print(f'train loss: {train_loss:.4f}\ttrain accuracy: {train_acc*100:.2f}%')
        print(f'test loss: {test_loss:.4f}\ttest accuracy: {test_acc*100:.2f}%\n')
    
        if best_loss > test_loss :
            best_loss = test_loss 
            bert_param_path = os.path.join(BASE_DIR, 'baseline_parameters')
            if not os.path.exists(bert_param_path):
                os.makedirs(bert_param_path)
    
            torch.save(model.state_dict(), os.path.join(bert_param_path, f'bert_parameters.pt'))
        
    df = pd.DataFrame([train_loss_list, test_loss_list, train_acc_list, test_acc_list], index = ['train_loss', 'test_loss', 'train_acc', 'test_acc']).T 
    
    
    save_path = os.path.join(BASE_DIR, 'baseline')
    if not os.path.exists(save_path):
        os.makedirs(save_path)
    
    df.to_csv(f'{save_path}/bert_results.csv', encoding='utf-8-sig', index=False)

    Evaluating 단계도 Training 단계와 매우 유사하다. 그러나 with no_grad라는 함수를 사용한다. 이 함수는 학습할 때 기울기를 따로 계산하지 마라는 명령어이다. 또한 model.eval()을 통해 모델을 학습하는 것이 아니라 evaluating하는 것이라고 선언해주어야 한다. 그 후 동일한 방식으로 학습하고, Test Loss를 기준으로 Loss 값이 가장 낮은 경우의 모델 구조를 저장하는 형태로 구성되어 있다. 최종적으로 DataFrame 형식으로 저장해 결과를 확인할 수 있다. BERT의 경우 매우 무거운 모델이기 때문에 Local에서 gpu가 없는 경우 학습하는 것이 어렵다. Colab에서 지원하는 GPU를 사용해서 돌려보는 것을 권장한다. 

     

    전체 코드는 [여기]에 자세히 작성해두었다.