캐글 – 집 값 예측

머신 러닝 파이프라인을 만들기 위하여, 캐글의 집 값 데이터를 사용해 보도록 하겠습니다.

집 값 예측은, 미국 아이오와 주의 에임스에 있는 주거용 주택의 정보를 이용하여 주택의 판매가격을 예측하는 Competition입니다.

이 장에서는 집 값 예측 결과를 캐글에 제출하기 위한 파이프라인을 구성해 볼 것입니다. 그리고 파이프라인에서 Katib를 이용하여 모델의 하이퍼 파라미터 튜닝을 할 것입니다. 데이터를 저장하기 위해서, PVC와 S3 두 개 모두 사용하겠습니다.

데이터 분석 보다는 전체적인 파이프라인을 만드는 것에 중점을 두고 설명하겠습니다.

사전 준비

House Prices: Advanced Regression Techniques 접속

예제에서 사용할 데이터가 있는 집 값예측 Competition 의 주소는 다음과 같습니다.

<https://www.kaggle.com/c/house-prices-advanced-regression-techniques>

해당 주소로 직접 접속하셔도 되고, 상단에 있는 검색바를 이용하여 검색하셔도 됩니다.

다음은 검색바를 이용하여, “House Prices: Advanced Regression Techniques” 을 검색한 결과입니다.

“House Prices: Advanced Regression Techniques” Competition 페이지 접속하면 다음과 같은 화면을 볼 수 있습니다.

데이터는 “Data” 탭에서 받을 수 있습니다. “Data” 탭을 클릭하면, 데이터에 대한 자세한 설명과 다운로드 받는 방법을 볼 수 있습니다.

주피터 노트북

주피터에서 새로운 Terminal 을 엽니다.

주피터 노트북에서 타이타닉 데이터 다운로드하기

작업 디렉토리를 생성한 다음, 타이타닉 데이터를 다운로드 합니다. house-prices-advanced-regression-techniques.zip 라는 파일이 다운로드 됩니다.

mkdir -p ~/workspace/house-prices/kaggle
cd ~/workspace/house-prices/kaggle
kaggle competitions download -c house-prices-advanced-regression-techniques
Downloading house-prices-advanced-regression-techniques.zip to /home/jovyan/workspace/house-prices/kaggle
  0%|                                                                                          | 0.00/34.1k [00:00<?, ?B/s]
100%|██████████████████████████████████████████████████████████████████████████████████| 34.1k/34.1k [00:00<00:00, 842kB/s]

다운로드한 파일의 압축을 풀겠습니다. 모델 학습을 위한 train.csv 파일과, 예측에 사용할 데이터인 test.csv 그리고, 캐글에 제출할 파일의 형식을 보여주는 sample_submission.csv 파일이 생성됩니다.

unzip house-prices-advanced-regression-techniques.zip
Archive:  house-prices-advanced-regression-techniques.zip
  inflating: data_description.txt
  inflating: sample_submission.csv
  inflating: test.csv
  inflating: train.csv

파이썬 패키지 설치

머신 러닝 모델 코드를 작성하기 위한 패키지들을 설치합니다. 이 장에서는 xgboost 패키지와 pandas_profiling 패키지를 추가로 사용합니다. 만약 설치되어 있지 않다면, 다음 명령어를 실행하여 패키지를 설치합니다.

pip install xgboost pandas_profiling  --user

데이터 전처리와 모델 작성

문제 정의하기

주어진 데이터를 바탕으로, 집 값을 예측하는 문제입니다.

데이터 전처리

머신 러닝 모델에서 데이터를 사용할 수 있도록, 데이터를 전처리 해 보겠습니다. 먼저 캐글에서 제공한 데이터를 살펴보도록 하겠습니다. 이번에는 pandas_profiling 라는 패키지를 사용해 보겠습니다.

집 값 데이터 분석 및 검증

먼저 집 값 데이터에 대해 이해하기 위하여, 간단히 탐색해보도록 하겠습니다.

주피터 노트북을 생성합니다. 노트북의 위치는 ~/workspace/house-prices입니다. 앞으로 작성할 예제에서는 노트북이 ~/workspace/house-prices 에 위치하고 있다고 가정하여, 파일 경로를 설정할 것입니다.

주피터 노트북을 이용하여 데이터를 분석 및 검증해 보겠습니다.

판다스를 이용하여, 다운받은 캐글 데이터를 읽어 오겠습니다.

In[] :

import os
import pandas as pd

input_path='./kaggle'
train_data = pd.read_csv(os.path.join(input_path, 'train.csv'))
test_data = pd.read_csv(os.path.join(input_path, 'test.csv'))

pandas_profiling 을 이용하여, 데이터의 프로파일링 리포트를 출력하겠습니다.

In[] :

import pandas_profiling
train_data.profile_report()

프로파일링 리포트의 결과는 다음과 같습니다. 단 한 줄의 명령어로 데이터를 분석해 볼 수 있습니다.

리포트 결과를 보면 알 수 있듯이, 여러 타입의 데이터가 존재하고, 결측값도 존재하는 것을 확인할 수 있습니다.

데이터 처리

캐글에서 좋은 점수를 받는게 목적이 아니기 때문에, 과감히 ojbect 타입의 데이터를 삭제하겠습니다.

In []:

train_data = train_data.drop([], axis=1).select_dtypes(exclude=['object'])
train_X = train_data.drop(['SalePrice'], axis=1)
train_y = train_data['SalePrice']
test_data = test_data.drop([], axis=1).select_dtypes(exclude=['object'])

결측값들도 SimpleImputer 를 이용하여 간단히 처리하겠습니다. strategy='median' 을 지정하였기 때문에 결측값들은 중간값으로 대치됩니다.

In []:

imputer = SimpleImputer(strategy='median')
imputer.fit(train_X)

train_X = pd.DataFrame(imputer.transform(train_X), columns=train_X.columns, index=train_X.index)
train_data = pd.concat([train_X, train_y], axis=1)
test_data = pd.DataFrame(imputer.transform(test_data), columns=test_data.columns, index=test_data.index)

train_data = train_data.astype({'Id': int})
test_data = test_data.astype({'Id': int})

모델 학습 (Train)

준비한 데이터를 이용하여 모델을 학습해 보겠습니다.

먼저, 모델 학습을 위하여 데이터를 학습 세트와 테스트 세트로 나누겠습니다. sklearntrain_test_split() 를 사용하면, 간단히 나눌 수 있습니다.

In []:

from sklearn.model_selection import train_test_split

X = train_data.drop(['Id', 'SalePrice'], axis=1)
y = train_data['SalePrice']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

xgboostXGBRegressor을 사용하여, 모델을 학습 시켜 보겠습니다.

In []:

from xgboost import XGBRegressor

model = XGBRegressor()
model.fit(X_train, y_train, eval_set=[(X_test, y_test)])

정상적으로 실행되면, 다음과 같은 결과를 확인할 수 있습니다.

[0]	validation_0-rmse:180608
[1]	validation_0-rmse:164048
...
[98]	validation_0-rmse:25150.7
[99]	validation_0-rmse:25156.1

이제 캐글에서 제공한 test 데이터를 가지고 예측해 보도록 하겠습니다.

pred = model.predict(test_data.drop(['Id'], axis=1))

예측한 결과물을 가지고, 캐글에 제공할 submission.csv 파일을 생성해 보겠습니다.

submission = pd.DataFrame({'Id': test_data['Id'], 'SalePrice': pred})
submission.to_csv('submission.csv', index=False)

submission 을 조회해 보겠습니다.

submission.head()

정상적으로 실행되면, 다음과 같은 결과를 확인할 수 있습니다.

생성한 submission.csv 파일을 캐글에 제출하겠습니다. kaggle 명령어를 사용하여 제출합니다. kaggle 명령어가 PATH 에 포함되어 있지 않기 때문에 전체 경로를 적어 주었습니다.

!/home/jovyan/.local/bin/kaggle competitions submit -c house-prices-advanced-regression-techniques -f submission.csv -m "Message"

캐글의 “My Submissions” 탭을 클릭하면, 제출한 내용들을 확인할 수 있습니다.

파이프라인 만들기

앞서 작성한 코드들을 바탕으로 하여 파이프라인을 구성해 보겠습니다. 각각의 단계를 컴포넌트로 구성한 다음, 파이프라인을 작성하고 실행해 보겠습니다.

파이프라인의 단계는 다음과 같습니다.

  • 데이터 다운로드 : 캐글에서 데이터를 다운로드 합니다.
  • 데이터 압축풀기 : 캐글에서 다운로드한 데이터의 압축을 풉니다.
  • 데이터 변환 : train.csvtest.csv 의 데이터를 머신러닝의 학습에 사용할 수 있도록 변환합니다.
  • 모델 학습 : 변환된 train.csv 데이터를 이용하여 모델을 학습니다.
  • 예측 : 변환된 test.csv 데이터를 이용하여 예측합니다. 그리고 예측한 결과를 submission.csv 파일로 저장합니다.
  • 캐글 제출 : 생성된 submission.csv 파일을 캐글에 제출합니다.

기본 이미지 만들기

데이터 변환, 모덱 학습, 예측 컴포넌트에서 사용할 기본 이미지를 만들어 보겠습니다. 파이프라인에서도 필요한 이미지를 빌드할 수 있지만, 기본 이미지를 만들어서 사용하는게 더 효율적입니다. docker 명령어를 이용하여, 컨테이너 이미지를 빌드하기 때문에, docker 명령어가 실행 가능한 곳에서 작업을 해야합니다.

먼저 베이스 이미지 디렉토리를 만듭니다.

mkdir -p ~/workspace/base/xgboost
cd ~/workspace/base/xgboost

필요한 파이썬 패키지 목록을 requirements.txt 파일로 작성합니다.

requirements.txt

scikit-learn
joblib
numpy
pandas
fire
xgboost

컨테이너 이미지 빌드를 위하여 Dockerfile 파일을 작성합니다. 파이썬 패키지 목록인 requirements.txt 파일을 추가하고, 해당 파일을 이용하여 패키지를 설치합니다.

Dockerfile

FROM python:3.6-slim

RUN apt-get update -y && \\
    apt-get install -y libgomp1

WORKDIR /app
COPY requirements.txt /app

RUN pip --no-cache-dir install -r requirements.txt

컨테이너 이미지를 빌드하고, 컨테이너 레지스트리에 푸시하기 위하여 build_image.sh 파일을 작성합니다.

build_image.sh

#!/bin/bash -e

image_name=kangwoo/xgboost
image_tag=0.0.1
full_image_name=${image_name}:${image_tag}
base_image_tag=3.6-slim

cd "$(dirname "$0")"

docker build --build-arg BASE_IMAGE_TAG=$base_image_tag -t "$full_image_name" .
docker push "$full_image_name"

build_image.sh 파일을 실행하여, 컨테이너 이미지를 빌드하고, 컨테이너 레지스트리에 푸시하겠습니다.

chmod +x build_image.sh
./build_image.sh

데이터 전처리 파이프라인 작성하기

데이터 전처리 파이프라인에서는 세개의 컴포넌트를 사용합니다.

  • Download : 캐글에서 데이터를 다운로드 받습니다.
  • Unzip : 압축을 풉니다.
  • Transofrm : 데이터를 전처리한 후 S3에 저장합니다.

모든 데이터를 S3에 저장할 수 있으나, 앞서 만든 Download 컴포넌트와 Unzip 컴포넌트가 S3를 지원하기 않기 때문에, 퍼시스턴스 볼륨과 S3를 같이 사용하였습니다. AWS 접속하여 사용할 S3 버킷을 생성합니다.

파이프라인의 전체적인 흐름은 다음과 같습니다.

파이프라인의 데이터 변환 작업이 kubeflow 네임스페이스 실행되므로, 해당 작업이 S3에 접근할 수 있도록, kubeflow 네임스페이스에 aws-secret 을 생성해줘야합니다.

export AWS_ACCESS_KEY_ID=<YOUR_AWS_ACCESS_KEY_ID>
export AWS_SECRET_ACCESS_KEY=<YOUR_AWS_SECRET_ACCESS_KEY>

kubectl -n kubeflow create secret generic aws-secret \\
    --from-literal=AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} \\
    --from-literal=AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}

데이터 전처리 파이프라인 컴포넌트 작성하기

주피터 노트북을 생성합니다. 파일 이름은 transform_pipeline_s3.ipynb 입니다. 노트북의 위치는 ~/workspace/house-prices입니다. 앞으로 작성할 예제에서는 노트북이 ~/workspace/house-prices 에 위치하고 있다고 가정하여, 파일 경로를 설정할 것입니다.

패키지 추가

파이프라인과 컴포넌트를 사용하기 위한 패키지를 추가합니다.

In []:

from kfp import components
from kfp import dsl

캐글 데이터 다운로드 컴포넌트

캐글에서 데이터를 다운로드 하는 컴포넌트를 정의합니다. 앞서 “타이타닉 생존 예측”에서 만들 컴포넌트를 재사용 합니다.

In []:

download_op = components.load_component_from_file('../components/kaggle/competitions_downloader/component.yaml')

데이터 압축풀기 컴포넌트

unzip 명령어를 사용하여 압축을 풀겠습니다.

In []:

def unzip_op(filename, exdir):
    return dsl.ContainerOp(name='Unzip',
                           image='garthk/unzip:latest',
                           command=['unzip', '-o', filename, '-d', exdir])

데이터 변환 컴포넌트

이번에는 [transform.py](<http://transform.py>) 라는 파이썬 파일을 생성한 후, 페어링을 이용하여 컨테이너 이미지를 빌드한 다음, 파이프라인에서 사용하도록 하겠습니다.

train.csvtest.csv 의 데이터를 머신러닝의 학습에 사용할 수 있도록 변환합니다. 캐글 데이터의 경로를 input_path 파라미터로 입력 받아 데이터를 변환합니다. 데이터 변환에 쓰이는 코드들은 앞에서 작성한 코드와 동일합니다. 변환할 데이터를 저장할 경로를 output_path 파라미터로 입력 받아 변환된 데이터를 저장합니다. 컨테이너 이미지를 빠르게 빌드하기 위하여, 필요한 패키지가 포함된 기본 이미지를 미리 만들어서 사용하였습니다.

주피터 노트북 셀에서 %%writefile transform.py 을 이용하여transform.py 파일을 생성합니다.

In []:

%%writefile transform.py
import argparse
import os
from tempfile import TemporaryDirectory

import pandas as pd
from sklearn.impute import SimpleImputer

def transform(input_path, output_path):
    train_data = pd.read_csv(os.path.join(input_path, 'train.csv'))
    test_data = pd.read_csv(os.path.join(input_path, 'test.csv'))

    train_data.dropna(axis=0, subset=['SalePrice'], inplace=True)

    train_data = train_data.drop([], axis=1).select_dtypes(exclude=['object'])
    train_X = train_data.drop(['SalePrice'], axis=1)
    train_y = train_data['SalePrice']
    test_data = test_data.drop([], axis=1).select_dtypes(exclude=['object'])

    imputer = SimpleImputer(strategy='median')
    imputer.fit(train_X)

    train_X = pd.DataFrame(imputer.transform(train_X), columns=train_X.columns, index=train_X.index)
    train_data = pd.concat([train_X, train_y], axis=1)
    test_data = pd.DataFrame(imputer.transform(test_data), columns=test_data.columns, index=test_data.index)
    
    train_data = train_data.astype({'Id': int})
    test_data = test_data.astype({'Id': int})

    
    access_key = os.environ['AWS_ACCESS_KEY_ID']
    secret_key = os.environ['AWS_SECRET_ACCESS_KEY']
    from minio import Minio
    minio_client = Minio('s3.amazonaws.com',
                 access_key=access_key,
                 secret_key=secret_key)
    
    from urllib.parse import urlparse
    url = urlparse(output_path, allow_fragments=False)
    bucket_name = url.netloc
    object_name = url.path.lstrip('/')
    
    with TemporaryDirectory() as tmpdir:
        tmp_train_data = os.path.join(tmpdir, 'train.csv')
        tmp_test_data = os.path.join(tmpdir, 'test.csv')
        train_data.to_csv(tmp_train_data, index=False)
        test_data.to_csv(tmp_test_data, index=False)
        minio_client.fput_object(bucket_name, os.path.join(object_name, 'train.csv'), tmp_train_data)
        minio_client.fput_object(bucket_name, os.path.join(object_name, 'test.csv'), tmp_test_data)

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_path', default='./kaggle', type=str)
    parser.add_argument('--output_path', default='./input', type=str)
    args = parser.parse_args()

    transform(args.input_path, args.output_path)

변환이 완료된 데이터를 S3에 저장하기 위하여, minio 라이브러리를 사용하였습니다. 그리고 S3에 접속하기 위한 자격 증명 정보는 AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEY 라는 환경 변수를 이용하여 넘겨받습니다.

페어링을 이용하여, transform.py 파일이 포함된 컨테이너 이미지를 빌드하고 푸시하겠습니다. 주피터 노트북에서 페어링을 이용하여, 컨테이너 이미지를 푸시하려면, 별도의 설정이 되어 있야 합니다. 만약 설정이 되어 있지 않다면, “05-Kubeflow Fairing – 주피터 노트북에서 Kubeflow Fairing 설정하기“를 참고하셔서 설정하시기 바랍니다.

In []:

import uuid
from kubeflow import fairing
from kubeflow.fairing.kubernetes import utils as k8s_utils
from kubeflow.fairing.preprocessors import base
from kubeflow.fairing.builders.append import append

CONTAINER_REGISTRY = 'kangwoo'

preprocessor = base.BasePreProcessor(executable="transform.py")

builder = append.AppendBuilder(registry=CONTAINER_REGISTRY, image_name="house-prices-transform",
                           base_image="kangwoo/xgboost:0.82", preprocessor=preprocessor)
builder.build()

transform_image = builder.image_tag
print(transform_image)

데이터 전처리 파이프라인 생성하기

PVC와 캐글 토큰 확인하기

파이프라인 컴포넌트에서 이 PVC를 이용하여, 데이터를 저장하고 읽어 올 것입니다. PVC는 dsl.VolumeOp() 를 이용하여 생성한 후, 작업이 끝나면 삭제하도록 하겠습니다.

그리고 캐글 API 사용을 위한 토큰도 앞서 “타이타닉 생존 예측”에서 생성한 kaggle-secret 을 사용하겠습니다.

파이프라인 작성하기

주피터 노트북으로 돌아가서 파이프라인을 작성해 보겠습니다.

In []:

import os
import string

import kfp
import kfp.dsl as dsl
from kfp import components
from kfp import onprem
from kfp import aws
from kubernetes.client.models import V1Volume
from kubernetes.client.models import V1SecretVolumeSource
from kubernetes.client.models import V1VolumeMount

@dsl.pipeline(
    name='House Prices Transofrm Pipeline',
    description='House Prices Transofrm Pipeline'
)
def house_prices_transform_pipeline():
    
    secret_name = "aws-secret"
    
    competition_name = 'house-prices-advanced-regression-techniques'

    kaggle_data_path = os.path.join('/data/competitions', competition_name, 'kaggle')
    version = 'v0.0.1'
    ouput_data_path = os.path.join('s3://kfp-bucket/data/competitions', competition_name, 'input', version)
    
    volume_task = dsl.VolumeOp(
        name="house-prices-volume",
        resource_name="house-prices-pvc",
        modes=dsl.VOLUME_MODE_RWO,
        size="100Mi"
    )

    download_task = download_op(competition=competition_name, path=kaggle_data_path)\\
        .add_pvolumes({"/data": volume_task.volume})\\
        .add_volume(V1Volume(name='kaggle-secret', secret=V1SecretVolumeSource(secret_name='kaggle-secret')))\\
        .add_volume_mount(V1VolumeMount(name='kaggle-secret', mount_path='/root/.kaggle'))

    with dsl.Condition(download_task.outputs['downloaded'] == 'True'):
        unzip_task = unzip_op(os.path.join(kaggle_data_path, competition_name + '.zip'), kaggle_data_path)\\
                         .add_pvolumes({"/data": download_task.pvolume})\\
    
    
    transform_task = dsl.ContainerOp(
        name='Transform',
        image=transform_image,
        command=['python', '/app/transform.py'],
        arguments=['--input_path', kaggle_data_path, '--output_path', ouput_data_path],
        pvolumes={"/data": unzip_task.pvolume}
    ).apply(aws.use_aws_secret(secret_name,
            aws_access_key_id_name='AWS_ACCESS_KEY_ID',
            aws_secret_access_key_name='AWS_SECRET_ACCESS_KEY'))

    volume_task.delete().after(transform_task)
  

if __name__ == '__main__':
    kfp.compiler.Compiler().compile(house_prices_transform_pipeline, 'house-prices-transform.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='House Prices Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'House Prices Transform Pipeline', 'house-prices-transform.zip')

생성된 링크를 클릭하시면, 다음과 같은 화면을 확인할 수 있습니다.

S3 저장소의 해당 버킷을 조회해 보면, 다음과 같이 2개의 데이터 파일이 생성된 것을 확인할 수 있습니다.


모델 학습 및 캐글 제출 파이프라인 컴포넌트 작성하기

모델 학습 및 캐글 제출 파이프라인에서는 세개의 컴포넌트를 사용합니다.

  • Katib Launcher: 하이퍼파리미터 튜닝을 위하여, Experiment 를 생성해 줍니다.
  • HP Out: 하이퍼파리미터 튜닝 결과 값을 출력해 줍니다.
  • Train: 하이퍼파리미터 값을 입력 받아 모델을 학습합니다. 학습한 모델은 S3에 저장합니다.
  • Predict : S3에서 모델을 가져와서, 예측을 수행합니다. 예측 결과 값을 퍼시스턴스 볼륨에 저장합니다.
  • Submit : 예측 결가 값을 캐글에 제출합니다.

모든 데이터를 S3에 저장할 수 있으나, 앞서 만든 Submit 컴포넌트가 S3를 지원하기 않기 때문에, 퍼시스턴스 볼륨과 S3를 같이 사용하였습니다. 그리고 하이퍼파라미터 튜닝을 위한 Experiment 리소스를 파이프라인이 실행되는 kubeflow 네임스가 아닌, 별도의 admin 네임스페이스에 생성하였습니다.

파이프라인의 전체적인 흐름은 다음과 같습니다.

하이퍼 파리미터 튜닝 작업이 admin 네임스페이스 실행되므로, 해당 작업들이 S3에 접근할 수 있도록, admin 네임스페이스에도 aws-secret 을 생성해줘야합니다.

export AWS_ACCESS_KEY_ID=<YOUR_AWS_ACCESS_KEY_ID>
export AWS_SECRET_ACCESS_KEY=<YOUR_AWS_SECRET_ACCESS_KEY>

kubectl -n admin create secret generic aws-secret \\
    --from-literal=AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} \\
    --from-literal=AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}

모델 학습 및 캐글 제출 파이프라인 컴포넌트 작성하기

주피터 노트북을 생성합니다. 파일 이름은 train_pipeline_s3.ipynb 입니다. 노트북의 위치는 ~/workspace/house-prices입니다. 앞으로 작성할 예제에서는 노트북이 ~/workspace/house-prices 에 위치하고 있다고 가정하여, 파일 경로를 설정할 것입니다.

모델 학습과 예측 컴포넌트

train.py 파일을 생성한 후, 페어링을 이용하여 컨테이너 이미지를 빌드한 다음, 파이프라인에서 사용하도록 하겠습니다.

변환된 train.csv 데이터를 이용하여 모델을 학습니다. 변환된 데이터의 경로를 path 파리미터로 입력받에 데이터를 읽어옵니다. 그리고 데이터를 학습 세트와 테스트 세트로 나눈다음, 모델을 학습니다. 그리고 하이퍼 파라미터로서 n_estimatorslearning_rate 를 입력 받아, 모델 학습시 사용합니다. 그리고 mode 를 입력 받아, train 일 경우에는 모델 학습을, predict 일 경우에는 예측을 실행합니다. S3에서 데이터를 읽고, 저장하기 위하여 minio 라이브러리를 사용하였습니다.

주피터 노트북 셀에서 %%writefile train.py 을 이용하여 train.py 파일을 생성합니다.

In []:

%%writefile train.py
import argparse
import json
import os
from datetime import datetime, timezone
from tempfile import TemporaryDirectory

import pandas as pd
from sklearn.metrics import r2_score
from sklearn.model_selection import train_test_split
from xgboost import XGBRegressor

def read_train_input(input_path):
    minio_client = get_minio_client()
    bucket_name, object_name = get_bucket_name_and_object_name(input_path)
    with TemporaryDirectory() as tmpdir:
        tmp_train_data_file = os.path.join(tmpdir, 'train.csv')
        minio_client.fget_object(bucket_name, os.path.join(object_name, 'train.csv'), tmp_train_data_file)
        data = pd.read_csv(tmp_train_data_file)
        
    X = data.drop(['Id', 'SalePrice'], axis=1)
    y = data['SalePrice']

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

    return X_train, y_train, X_test, y_test

def read_test_input(input_path='./input'):
    minio_client = get_minio_client()
    bucket_name, object_name = get_bucket_name_and_object_name(input_path)
    with TemporaryDirectory() as tmpdir:
        tmp_test_data_file = os.path.join(tmpdir, 'test.csv')
        minio_client.fget_object(bucket_name, os.path.join(object_name, 'test.csv'), tmp_test_data_file)
        data = pd.read_csv(tmp_test_data_file)
        
    X = data.drop(['Id'], axis=1)
    id = data['Id']

    return X, id

def train_model(X_train, y_train,
                X_test, y_test,
                n_estimators, learning_rate, early_stopping_rounds):
    model = XGBRegressor(n_estimators=n_estimators, learning_rate=learning_rate)

    model.fit(X_train, y_train,
              early_stopping_rounds=early_stopping_rounds, eval_set=[(X_test, y_test)])

    print('Best RMSE on eval: {} with {} rounds'.format(model.best_score, model.best_iteration + 1))
    return model

def eval_model(model, X_test, y_test):
    predictions = model.predict(X_test)
    local_time = datetime.now(timezone.utc).astimezone().isoformat()
    score = r2_score(predictions, y_test)
    print('{} r2_score={}'.format(local_time, score))
    return score

MODE_FILENAME = 'model.bst'

def load_model(model_path):
    minio_client = get_minio_client()
    bucket_name, object_name = get_bucket_name_and_object_name(model_path)
    model = XGBRegressor()
    with TemporaryDirectory() as tmpdir:
        tmp_model_file = os.path.join(tmpdir, MODE_FILENAME)
        minio_client.fget_object(bucket_name, os.path.join(object_name, MODE_FILENAME), tmp_model_file)
        model.load_model(tmp_model_file)
        print('Load modelfrom ', os.path.join(model_path, MODE_FILENAME))
    return model

def save_model(model, model_path):
    minio_client = get_minio_client()
    bucket_name, object_name = get_bucket_name_and_object_name(model_path)
    with TemporaryDirectory() as tmpdir:
        tmp_model_file = os.path.join(tmpdir, MODE_FILENAME)
        model.save_model(tmp_model_file)
        minio_client.fput_object(bucket_name, os.path.join(object_name, MODE_FILENAME), tmp_model_file)
        print('Save model to', os.path.join(model_path, MODE_FILENAME))

def get_minio_client():
    access_key = os.environ['AWS_ACCESS_KEY_ID']
    secret_key = os.environ['AWS_SECRET_ACCESS_KEY']

    from minio import Minio
    return Minio('s3.amazonaws.com',access_key=access_key, secret_key=secret_key)

def get_bucket_name_and_object_name(path):
    from urllib.parse import urlparse
    url = urlparse(path, allow_fragments=False)
    bucket_name = url.netloc
    object_name = url.path.lstrip('/')
    return bucket_name, object_name

class ModelServe(object):
    def __init__(self, model_path=None, n_estimators=100, learning_rate=0.1, early_stopping_rounds=40):
        self.model_path = model_path
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.early_stopping_rounds = early_stopping_rounds
        print("model_path={}".format(self.model_path))
        print("n_estimators={}".format(self.n_estimators))
        print("learning_rate={}".format(self.learning_rate))
        print("early_stopping_rounds={}".format(self.early_stopping_rounds))

        self.model = None
        
    #         self._workspace = None
    #         self.exec = self.create_execution()

    def train(self, X_train, y_train, X_test, y_test):

        #         self.exec.log_input(metadata.DataSet(
        #             description="xgboost synthetic data",
        #             name="synthetic data",
        #             owner="someone@kubeflow.org",
        #             uri="file://path/to/dataset",
        #             version="v0.0.1"))

        model = train_model(X_train,
                            y_train,
                            X_test,
                            y_test,
                            self.n_estimators,
                            self.learning_rate,
                            self.early_stopping_rounds)

        score = eval_model(model, X_test, y_test)

        #         self.exec.log_output(metadata.Metrics(
        #             name="traing- valuation",
        #             owner="someone@kubeflow.org",
        #             description="training evaluation for xgboost synthetic",
        #             uri="file://path/to/metrics",
        #             metrics_type=metadata.Metrics.VALIDATION,
        #             values={"mean_absolute_error": mae}))

        if self.model_path:
            save_model(model, self.model_path)

    #         self.exec.log_output(metadata.Model(
    #             name="model",
    #             description="prediction model using synthetic data",
    #             owner="someone@kubeflow.org",
    #             uri='file://path/to/model',
    #             model_type="linear_regression",
    #             training_framework={
    #                 "name": "xgboost",
    #                 "version": "0.90"
    #             },
    #             hyperparameters={
    #                 "learning_rate": self.learning_rate,
    #                 "n_estimators": self.n_estimators
    #             },
    #             version=datetime.utcnow().isoformat("T")))

    def predict(self, X, feature_names):
        if not self.model:
            self.model = load_model(self.model_path)
        # Do any preprocessing
        prediction = self.model.predict(data=X)
        # Do any postprocessing
        return prediction

#     @property
#     def workspace(self):
#         if not self._workspace:
#             self._workspace = create_workspace()
#         return self._workspace

#     def create_execution(self):
#         r = metadata.Run(
#             workspace=self.workspace,
#             name="xgboost-synthetic-run" + datetime.utcnow().isoformat("T"),
#             description="a notebook run")

#         return metadata.Execution(
#             name = "execution" + datetime.utcnow().isoformat("T"),
#             workspace=self.workspace,
#             run=r,
#             description="execution for training xgboost-synthetic")

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_path', default='./input', type=str)
    parser.add_argument('--hyperparameters', required=False, type=str)
    parser.add_argument('--n_estimators', default='100', type=int)
    parser.add_argument('--learning_rate', default='0.1', type=float)
    parser.add_argument('--model_path', required=False, type=str)
    parser.add_argument('--mode', default='train', choices=['train', 'predict'])
    parser.add_argument('--submission_path', default='./', type=str)
    args = parser.parse_args()

    if args.hyperparameters:
        hp_json = json.loads(args.hyperparameters)
        print("use json hyperparameters", hp_json)
        hyperparameters = {}
        for n in hp_json:
            hyperparameters[n['name']] = n['value']

        n_estimators = int(hyperparameters['--n_estimators'])
        learning_rate = float(hyperparameters['--learning_rate'])
    else:
        n_estimators = args.n_estimators
        learning_rate = args.learning_rate

    if args.mode == 'predict':
        test, id = read_test_input(args.input_path)
        model = ModelServe(model_path=args.model_path)
        pred = model.predict(test, None)
        submission = pd.concat([id, pd.Series(pred, name='SalePrice')], axis=1)
        print(submission)
        if not os.path.exists(args.submission_path):
            os.makedirs(args.submission_path)
        submission.to_csv(os.path.join(args.submission_path, 'submission.csv'), index=False)
    else:
        X_train, y_train, X_test, y_test = read_train_input(args.input_path)

        model = ModelServe(model_path=args.model_path, n_estimators=n_estimators, learning_rate=learning_rate)
        model.train(X_train, y_train, X_test, y_test)

페어링을 이용하여, [train.py](<http://train.py>) 파일이 포함된 컨테이너 이미지를 빌드하고 푸시하겠습니다. 주피터 노트북에서 페어링을 이용하여, 컨테이너 이미지를 푸시하려면, 별도의 설정이 되어 있야 합니다. 만약 설정이 되어 있지 않다면, “05-Kubeflow Fairing – 주피터 노트북에서 Kubeflow Fairing 설정하기“를 참고하셔서 설정하시기 바랍니다.

In []:

import uuid
from kubeflow import fairing
from kubeflow.fairing.kubernetes import utils as k8s_utils
from kubeflow.fairing.preprocessors import base
from kubeflow.fairing.builders.append import append

CONTAINER_REGISTRY = 'kangwoo'

preprocessor = base.BasePreProcessor(executable="train.py")

builder = append.AppendBuilder(registry=CONTAINER_REGISTRY, image_name="house-prices-train",
                           base_image="kangwoo/xgboost:0.82", preprocessor=preprocessor)
builder.build()

train_image = builder.image_tag
print(train_image)

파이프라인 생성하기

파이프라인 작성하기

파이프라인을 작성해 보겠습니다. Kubeflow 에서 제공하는 katib-launcher 컴포넌트를 이용하여, 하이퍼 파리미터 튜닝 작업을 실행합니다. 튜닝 작업이 끝난 후, 가장 좋은 점수의 하이퍼 파라미터값을 넘겨 받습니다. 이 값을 이용하여 모델을 다시 학습한 후, 캐글에 제출할 예측 데이터를 생성합니다.

In []:

import os
import string

import kfp
import kfp.dsl as dsl
from kfp import components
from kfp import onprem
from kfp import aws
from kubernetes.client.models import V1Volume
from kubernetes.client.models import V1SecretVolumeSource
from kubernetes.client.models import V1VolumeMount

@dsl.pipeline(
    name='House Prices Train Pipeline',
    description='House Prices Train Pipeline'
)
def house_prices_train_pipeline(name="house-prices-train", namespace="admin",
            goal=0.95, parallel_trial_count=2, max_trial_count=12, experiment_timeout_minutes=60, delete_after_done=True):

    secret_name = "aws-secret"
    
    competition_name = 'house-prices-advanced-regression-techniques'
    data_version='v0.0.1'
    input_data_path = os.path.join('s3://kfp-bucket/data/competitions', competition_name, 'input', data_version)

    objective_config = {
      "type": "maximize",
      "goal": goal,
      "objectiveMetricName": "r2_score"
    }
    algorithm_config = {'algorithmName' : 'random'}
    parameters = [
      {"name": "--learning_rate", "parameterType": "double", "feasibleSpace": {"min": "0.01","max": "0.2"}},
      {"name": "--n_estimators", "parameterType": "int", "feasibleSpace": {"min": "10", "max": "200"}}
    ]
    metrics_collector = {
        "collector": {
            "kind": "StdOut"
        }
    }

    trial_template_params = {'train_image':train_image,
    'secret_name': secret_name,
    'input_data_path': input_data_path}     

    trial_template = string.Template('''
    goTemplate:
        rawTemplate: |-
          apiVersion: batch/v1
          kind: Job
          metadata:
            name: {{.Trial}}
            namespace: {{.NameSpace}}
          spec:
            template:
              spec:
                containers:
                - name: {{.Trial}}
                  image: $train_image
                  command:
                  - "python"
                  - "/app/train.py"
                  - "--input_path"
                  - $input_data_path
                  {{- with .HyperParameters}}
                  {{- range .}}
                  - "{{.Name}}={{.Value}}"
                  {{- end}}
                  {{- end}}
                  env:
                  - name: AWS_ACCESS_KEY_ID
                    valueFrom:
                      secretKeyRef:
                        name: $secret_name
                        key: AWS_ACCESS_KEY_ID
                  - name: AWS_SECRET_ACCESS_KEY
                    valueFrom:
                      secretKeyRef:
                        name: $secret_name
                        key: AWS_SECRET_ACCESS_KEY
                restartPolicy: Never
    ''').safe_substitute(trial_template_params)

    katib_experiment_launcher_op = components.load_component_from_url('<https://raw.githubusercontent.com/kubeflow/pipelines/master/components/kubeflow/katib-launcher/component.yaml>')
    hp_task = katib_experiment_launcher_op(
              experiment_name=name,
              experiment_namespace=namespace,
              parallel_trial_count=parallel_trial_count,
              max_trial_count=max_trial_count,
              objective=str(objective_config),
              algorithm=str(algorithm_config),
              trial_template=str(trial_template),
              parameters=str(parameters),
              metrics_collector=str(metrics_collector),
              experiment_timeout_minutes=experiment_timeout_minutes,
              delete_finished_experiment=delete_after_done)
    
    
    hp_out_task = dsl.ContainerOp(
        name="hp out",
        image="library/bash:4.4.23",
        command=["sh", "-c"],
        arguments=["echo hyperparameter: %s" % hp_task.output],
    )

    model_version = 'v0.0.1'
    model_path = os.path.join('s3://kfp-bucket/data/competitions', competition_name, 'models', model_version)
    
    train_task = dsl.ContainerOp(
        name='train',
        image=train_image,
        command=['python', '/app/train.py'],
        arguments=['--input_path', input_data_path, '--hyperparameters',  hp_task.output, '--model_path', model_path]
    ).apply(aws.use_aws_secret(secret_name,
            aws_access_key_id_name='AWS_ACCESS_KEY_ID',
            aws_secret_access_key_name='AWS_SECRET_ACCESS_KEY'))
    
    
    submission_path = os.path.join('/data/competitions', competition_name, 'submissions', model_version)

    volume_task = dsl.VolumeOp(
        name="house-prices-volume",
        resource_name="house-prices-pvc",
        modes=dsl.VOLUME_MODE_RWO,
        size="100Mi"
    )
    
    predict_task = dsl.ContainerOp(
        name='predict',
        image=train_image,
        command=['python', '/app/train.py'],
        arguments=['--input_path', input_data_path, '--mode',  'predict', '--model_path', model_path,
                  '--submission_path', submission_path],
        pvolumes={"/data": volume_task.volume}
    ).apply(aws.use_aws_secret(secret_name,
            aws_access_key_id_name='AWS_ACCESS_KEY_ID',
            aws_secret_access_key_name='AWS_SECRET_ACCESS_KEY')).after(train_task)
    
    submit_op = components.load_component('../components/kaggle/competitions_submit/component.yaml')
    submit_task = submit_op(competition=competition_name, path=submission_path, message='RunId {{workflow.uid}}')\\
            .add_pvolumes({"/data": predict_task.pvolume})\\
            .add_volume(V1Volume(name='kaggle-secret', secret=V1SecretVolumeSource(secret_name='kaggle-secret')))\\
            .add_volume_mount(V1VolumeMount(name='kaggle-secret', mount_path='/root/.kaggle')).after(predict_task)

    
    volume_task.delete().after(submit_task)

if __name__ == '__main__':
    kfp.compiler.Compiler().compile(house_prices_train_pipeline, 'house-prices-train.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='House Prices Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'House Prices Train Pipeline', 'house-prices-train.zip')

생성된 링크를 클릭하시면, 다음과 같은 화면을 확인할 수 있습니다.

Katib 모니터 화면에서 튜닝 결과도 확인할 수 있습니다.

“hp-out” 단계를 클릭하면, 다음과 같은 하이퍼파리미터 튜닝 결과 값을 확인해 볼 수 있습니다.

hyperparameter: [{name: --learning_rate, value: 0.1985601684459975}, {name: --n_estimators, value: 183}]

파이프라인 실행이 완료되면, 예측 결과가 캐글에 제출 됩니다.

캐글의 “My Submissions” 탭을 클릭하면, 제출한 내용들을 확인할 수 있습니다.

댓글 남기기

이메일은 공개되지 않습니다. 필수 입력창은 * 로 표시되어 있습니다