TFX – 시카고 택시

TFX의 시카고 택시의 예제를 이용하여 파이프라인을 실행해 보겠습니다.

TFX(TensorFlow Extended)는 프로덕션 머신러닝 파이프라인을 배포하기 위한 엔드 투 엔드 플랫폼으로서, 텐서플로우를 기반으로 하고 있습니다. 머신 러닝 시스템을 실행 하고 모니터링하는 데 필요한 프레임워크와 공통 라이브러리를 제공하고 있습니다.

TFX의 시카고 택시 예제는 엔드 투 엔드 워크플로우와 데이터 분석, 검증 및 변환, 모델 훈련, 성능 분석 및 서비스에 필요한 단계를 보여주고 있습니다. 그래서 워크플로우와 파이프라인을 구성하는데 좋은 참고가 될 수 있습니다.

이 예제에서는 다음과 같은 TFX 컴포넌트를 사용합니다.

  • ExampleGen 입력 데이터 세트의 수집 및 분할.
  • StatisticsGen 데이터 세트에 대한 통계 계산.
  • SchemaGen examines the statistics and creates a data schema.
  • ExampleValidator 통계를 검사하고 데이터 스키마를 생성함.
  • Transform 데이터 세트를 가지고 피처 엔지니어링을 수행.
  • Trainer TensorFlow Estimators 또는 Keras를 사용하여 모델을 학습.
  • Evaluator 학습 결과에 대한 심층 분석 수행.
  • Pusher 모델을 서비스 인프라에 배포.

다음 그림은 컴포넌트 간의 데이터 흐름을 보여 줍니다.

출처 : https://www.tensorflow.org/tfx

TFX 라이브러리

TFX는 라이브러리와 파이프라인 컴포넌트를 모두 포함하고 있습니다. 다음 그림은 TFX 라이브러리와 파이프라인 컴포넌트 사이의 관계를 설명하고 있습니다.

출처 : https://www.tensorflow.org/tfx

TFX는 파이프라인 컴포넌트를 만드는 데 사용할 수 있는 파이썬 라이브러리를 제공하고 있습니다. 이 라이브러리를 사용하여 파이프라인의 컴포넌트를 생성하면, 보다 쉽게 파이프라인을 구성할 수 있으며, 사용자는 사용자 코드를 작성하는데 보다 더 집중할 수 있습니다.

TFX 라이브러리는 다음과 같습니다.

  • TensorFlow Data Validation (TFDV) : TFDV는 머신러닝 데이터를 분석하고 검증하는 것을 도와주는 라이브러리 입니다. 확장성이 뛰어나고 텐서플로우, TFX와 잘 작동하도록 설계되었습니다. TFDV는 다음을 포함하고 있습니다.
    • 학습 및 테스트 데이터의 요약 통계에 대한 계산.
    • 데이터 분산 및 통계를 위한 뷰어와의 통합 및 데이터 세트 쌍(Facets)의 측면 비교.
    • 필요한 값, 범위 및 어휘와 같은 데이터에 대한 기대치를 설명하는 자동화된 데이터 스키마 생성
    • 스키마를 검사하는 데 도움이 되는 스키마 뷰어.
    • 누락된 피처, 범위를 벗어난 값 또는 잘못된 기능 유형과 같은 이상 징후를 식별하기 위한 이상 징후 탐지.
    • 이상 징후 뷰어를 통해 이상 징후가 있는 피처를 확인.
  • TensorFlow Transform (TFT) : TFT는 TensorFlow로 데이터를 사전 처리하는 라이브러리 입니다. TFT는 다음과 같이 전체 데이터를 변환할 때 유용합니다.
    • 평균 및 표준 편차로 입력 값을 정규화.
    • 모든 입력 값에 대한 어휘를 생성하여 문자열을 정수로 변환.
    • 관측된 데이터 분포를 기반으로 소수 데이터를 정수로 변환.
  • TensorFlow : 텐서플로우는 TFX에서 모델을 학습할 때 사용합니다. 학습 데이터와 모델링 코드를 가지고, SavedModel 이라는 결과를 생성합니다.
  • TensorFlow Model Analysis (TFMA) : TFMA는 텐서플로우 모델을 평가하기 위한 라이브러리 입니다. 텐서플로우로 EvalSavedModel을 생성하는데 사용되며, 이는 분석의 기반이 됩니다. 사용자들은 학습에 정의된 것과 동일한 지표를 사용하여, 분산된 방식으로 대량의 데이터에 대한 모델을 평가할 수 있습니다. 이러한 측정지표는 여러 조각의 데이터를 통해 계산할 수 있으며 주피터 노트북에서 시각화할 수 있습니다.
  • TensorFlow Metadata (TFMD) : TFMD는 텐서플로우로 머신러닝 모델을 학습할 때 유용한 메타데이터에 대한 표준 표현을 제공합니다. 메타데이터는 입력 데이터 분석 중에 생성할 수 있으며, 데이터 검증, 탐색 및 변환을 위해 사용될 수 있습니다. 메타데이터 직렬화 형식은 다음과 같습니다.
    • 표 형식의 데이터를 설명하는 스키마 (예 tf.Examples)
    • 데이터 세트에 대한 통계 요약 모음
  • ML Metadata (MLMD) : MLMD는 머신 러닝 개발자 및 데이터 사이언티스트가 워크플로우와 관련된 메타데이터를 기록하고 검색하기 위한 라이브러리입니다. 대부분의 메타데이터는 TFMD 표현을 사용합니다. MLMD는 SQL-Lite, MySQL 같은 데이터 저장소를 사용하여 데이터를 저장합니다.

데이터 세트

예제에서 사용한 데이터 세트는 시카고 시가 발표한 택시 여행 데이터 세트 입니다. 해당 사이트는 애플리케이션에서 데이터를 사용하기 위해, 원본 데이터를 수정한 데이터를 제공하고 있습니다. 원본 소스는 시카고 시의 공식 웹사이트인 www.cityofchicago.org 에서 확인할 수 있습니다.

Kubeflow에서 TFX 파이프라인 실행하기

TFX는 다양한 환경에서 실행할 수 있습니다. 로컬에서도 실행할 수 있고, Airflow 환경에서 실행할 수 있으며, Kubeflow 에서도 실행할 수 있습니다. 예제에서는 Kubeflow 환경에서 시카고 택시 파이프라인을 실행해 볼 것입니다. 만약 다양환 환경에 대해 관심 있으면, “Chicago Taxi Example” 페이지를 참고하시기 바랍니다.

TFX 컴포넌트 사용하기

설정

필요한 패키지를 추가하고, 사용할 데이터를 다운로드하겠습니다.

먼저 주피터 노트북을 생성하겠습니다.

패키지 설치하기

tfx 패키지를 설치합니다.

In []:

!pip install "tfx>=0.21.1,<0.22"  --user

패키지 추가하기

TFX 컴포넌트와 필요한 패키지를 추가합니다.

In []:

import os
import pprint
import tempfile
import urllib

import absl
import tensorflow as tf
import tensorflow_model_analysis as tfma
tf.get_logger().propagate = False
pp = pprint.PrettyPrinter()

import tfx
from tfx.components import CsvExampleGen
from tfx.components import Evaluator
from tfx.components import ExampleValidator
from tfx.components import Pusher
from tfx.components import ResolverNode
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Trainer
from tfx.components import Transform
from tfx.components.base import executor_spec
from tfx.components.trainer.executor import GenericExecutor
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.orchestration import metadata
from tfx.orchestration import pipeline
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.types import Channel
from tfx.types.standard_artifacts import Model
from tfx.types.standard_artifacts import ModelBlessing
from tfx.utils.dsl_utils import external_input

%load_ext tfx.orchestration.experimental.interactive.notebook_extensions.skip

라이브러리 버전을 확인해 보겠습니다.

In []:

print('TensorFlow version: {}'.format(tf.__version__))
print('TFX version: {}'.format(tfx.__version__))
TensorFlow version: 2.1.0
TFX version: 0.21.4

파이프라인 경로 설정 이동 or 삭제

In []:

# This is the root directory for your TFX pip package installation.
_tfx_root = tfx.__path__[0]

# This is the directory containing the TFX Chicago Taxi Pipeline example.
_taxi_root = os.path.join(_tfx_root, 'examples/chicago_taxi_pipeline')

# This is the path where your model will be pushed for serving.

# Set up logging.
absl.logging.set_verbosity(absl.logging.INFO)

예제 데이터 다운로드

사용할 예제 데이터세트를 다운로드합니다.

In []:

_data_root = tempfile.mkdtemp(prefix='tfx-data')
DATA_PATH = '<https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/data/simple/data.csv>'
_data_filepath = os.path.join(_data_root, "data.csv")
urllib.request.urlretrieve(DATA_PATH, _data_filepath)

InteractiveContext 작성

노트북에서 대화식으로 TFX 컴포넌트를 실행할 수 있는 InteractiveContext를 생성합니다.

In []:

context = InteractiveContext()

TFX 컴포넌트 실행

노트북 셀에서는 TFX 컴포넌트를 하나씩 작성하고 실행해보겠습니다.

ExampleGen

ExampleGen 컴포넌트는 대부분 TFX 파이프라인의 앞에 위치해 있습니다. 해당 컴포넌트는 다음과 같은 역할을 합니다.

  • 데이터를 학습 및 평가 세트로 분할(기본적으론 학습 세트에 2/3, 평가 세트에 1/3가 할당됩니다.)
  • 데이터를 tf.Example 형식으로 변환
  • 데이터를 다른 컴포넌트에서 접근할 수 있도록 _tfx_root 디렉토리로 복사

ExampleGen 은 데이터 원본의 경로를 입력값으로 사용합니다. 다음 예제의 경우 CSV 파일을 다운르도 받은 _data_root 경로를 입력값으로 사용합니다.

In []:

example_gen = CsvExampleGen(input=external_input(_data_root))
context.run(example_gen)

출력 아티팩트를 살펴보겠습니다. 이 컴포넌트는 학습(train)과 평가(eval) 나누어진 아티팩트를 출력합니다.

In []:

artifact = example_gen.outputs['examples'].get()[0]
print(artifact.split_names, artifact.uri)
["train", "eval"] /tmp/tfx-interactive-2020-05-07T09_40_15.820980-gawc9si3/CsvExampleGen/examples/1
This cell will be skipped during export to pipeline.

그리고 다음과 같이 학습에 사용될 데이터 세트를 조회해 볼 수 있습니다.

In []:

# Get the URI of the output artifact representing the training examples, which is a directory
train_uri = os.path.join(example_gen.outputs['examples'].get()[0].uri, 'train')

# Get the list of files in this directory (all compressed TFRecord files)
tfrecord_filenames = [os.path.join(train_uri, name)
                      for name in os.listdir(train_uri)]

# Create a `TFRecordDataset` to read these files
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")

# Iterate over the first 3 records and decode them.
for tfrecord in dataset.take(3):
  serialized_example = tfrecord.numpy()
  example = tf.train.Example()
  example.ParseFromString(serialized_example)
  pp.pprint(example)
features {
  feature {
    key: "company"
    value {
      bytes_list {
        value: "Taxi Affiliation Services"
      }
    }
  }
  feature {
    key: "dropoff_census_tract"
    value {
    }
  }
  feature {
    key: "dropoff_community_area"
    value {
    }
  }
  feature {
    key: "dropoff_latitude"
    value {
    }
  }
...

StatisticsGen

StatisticsGen 컴포넌트는 다운스트림 컴포넌트로서, 데이터 분석을 위해 데이터 세트에 대한 통계를 계산합니다. 텐서플로우 데이터 검증 라이브러리(TFDV:TensorFlow Data Validation)를 사용합니다.

StatisticsGen은 앞서 ExampleGen 에서 생성한 데이터 세트를 입력값으로 사용합니다.

In []:

statistics_gen = StatisticsGen(
    examples=example_gen.outputs['examples'])
context.run(statistics_gen)

StatisticsGen 의 실행이 끝나면, 출력된 통계를 시각화할 수 있습니다.

In []:

context.show(statistics_gen.outputs['statistics'])

‘train’ split:

‘eval’ split:

SchemaGen

SchemaGen 컴포넌트는 데이터 통계를 기반으로 스키마를 생성합니다. 스키마는 데이터 세트에 있는 형상의 예상 한계, 유형 및 속성을 정의합니다. SchemaGen 컴포넌트도 TFDV 라이브러리를 사용합니다.

SchemaGenStatisticsGen으로 생성한 통계를 입력으로 사용합니다.

In []:

schema_gen = SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    infer_feature_shape=False)
context.run(schema_gen)

SchemaGen이 실행이 끝나면, 생성된 스키마를 테이블로 시각화할 수 있습니다.

In []:

context.show(schema_gen.outputs['schema'])

데이터 세트의 각 피쳐는 스키마 테이블의 속성 옆에 행으로 표시합니다. 스키마는 또한 범주형 피처의 해당 영역에 있는 값들을 표시합니다.

ExampleValidator

ExampleValidator 컴포넌트는 스키마에 정의된 기대치를 기반으로, 데이터에서 이상 징후를 탐지합니다. ExampleValidator 컴포넌트도 TFDV 라이브러리를 사용합니다.

ExampleValidatorStatisticsGen의 통계와 SchemaGen의 스키마를 입력값으로 사용합니다.

In []:

example_validator = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema'])
context.run(example_validator)

ExampleValidator 의 실행이 끝나면, 이상 징후를 표로 시각화할 수 있습니다.

In []:

context.show(example_validator.outputs['anomalies'])

이상 징후 표에서, 회사(company) 피처가 학습 데이터 세트에 없는 값들을 가지는 것을 확인할 수 있습니다. 이러한 정보를 이용하여 모델 성능을 디버그하고, 데이터가 시간에 따라 어떻게 발전하는지 이해하며, 데이터 오류를 식별하는 데 사용할 수 있습니다.

Transform

Transform 컴포넌트는 학습과 서빙에 대한 피쳐 엔지니어링을 수행합니다. TensorFlow Transform 라이브러리를 사용합니다.

TransformExampleGen 에서 생성한 데이터와 SchemaGen의 스키마 그리고 사용자가 정의 변환 코드를 입력값으로 사용합니다.

다음은 사용자 정의 변환 코드 입니다. 먼저 피처 엔지니어링을 위하여 몇가지 상수를 정의하겠습니다.

In []:

_taxi_constants_module_file = 'taxi_constants.py'

In []:

%%writefile {_taxi_constants_module_file}

# Categorical features are assumed to each have a maximum value in the dataset.
MAX_CATEGORICAL_FEATURE_VALUES = [24, 31, 12]

CATEGORICAL_FEATURE_KEYS = [
    'trip_start_hour', 'trip_start_day', 'trip_start_month',
    'pickup_census_tract', 'dropoff_census_tract', 'pickup_community_area',
    'dropoff_community_area'
]

DENSE_FLOAT_FEATURE_KEYS = ['trip_miles', 'fare', 'trip_seconds']

# Number of buckets used by tf.transform for encoding each feature.
FEATURE_BUCKET_COUNT = 10

BUCKET_FEATURE_KEYS = [
    'pickup_latitude', 'pickup_longitude', 'dropoff_latitude',
    'dropoff_longitude'
]

# Number of vocabulary terms used for encoding VOCAB_FEATURES by tf.transform
VOCAB_SIZE = 1000

# Count of out-of-vocab buckets in which unrecognized VOCAB_FEATURES are hashed.
OOV_SIZE = 10

VOCAB_FEATURE_KEYS = [
    'payment_type',
    'company',
]

# Keys
LABEL_KEY = 'tips'
FARE_KEY = 'fare'

def transformed_name(key):
  return key + '_xf'

그리고 데이터를 입력 받아 변환 작업을 하는 코드를 작성하겠습니다.

In []:

_taxi_transform_module_file = 'taxi_transform.py'

In []:

%%writefile {_taxi_transform_module_file}

import tensorflow as tf
import tensorflow_transform as tft

import taxi_constants

_DENSE_FLOAT_FEATURE_KEYS = taxi_constants.DENSE_FLOAT_FEATURE_KEYS
_VOCAB_FEATURE_KEYS = taxi_constants.VOCAB_FEATURE_KEYS
_VOCAB_SIZE = taxi_constants.VOCAB_SIZE
_OOV_SIZE = taxi_constants.OOV_SIZE
_FEATURE_BUCKET_COUNT = taxi_constants.FEATURE_BUCKET_COUNT
_BUCKET_FEATURE_KEYS = taxi_constants.BUCKET_FEATURE_KEYS
_CATEGORICAL_FEATURE_KEYS = taxi_constants.CATEGORICAL_FEATURE_KEYS
_FARE_KEY = taxi_constants.FARE_KEY
_LABEL_KEY = taxi_constants.LABEL_KEY
_transformed_name = taxi_constants.transformed_name

def preprocessing_fn(inputs):
  """tf.transform's callback function for preprocessing inputs.
  Args:
    inputs: map from feature keys to raw not-yet-transformed features.
  Returns:
    Map from string feature key to transformed feature operations.
  """
  outputs = {}
  for key in _DENSE_FLOAT_FEATURE_KEYS:
    # Preserve this feature as a dense float, setting nan's to the mean.
    outputs[_transformed_name(key)] = tft.scale_to_z_score(
        _fill_in_missing(inputs[key]))

  for key in _VOCAB_FEATURE_KEYS:
    # Build a vocabulary for this feature.
    outputs[_transformed_name(key)] = tft.compute_and_apply_vocabulary(
        _fill_in_missing(inputs[key]),
        top_k=_VOCAB_SIZE,
        num_oov_buckets=_OOV_SIZE)

  for key in _BUCKET_FEATURE_KEYS:
    outputs[_transformed_name(key)] = tft.bucketize(
        _fill_in_missing(inputs[key]), _FEATURE_BUCKET_COUNT,
        always_return_num_quantiles=False)

  for key in _CATEGORICAL_FEATURE_KEYS:
    outputs[_transformed_name(key)] = _fill_in_missing(inputs[key])

  # Was this passenger a big tipper?
  taxi_fare = _fill_in_missing(inputs[_FARE_KEY])
  tips = _fill_in_missing(inputs[_LABEL_KEY])
  outputs[_transformed_name(_LABEL_KEY)] = tf.where(
      tf.math.is_nan(taxi_fare),
      tf.cast(tf.zeros_like(taxi_fare), tf.int64),
      # Test if the tip was > 20% of the fare.
      tf.cast(
          tf.greater(tips, tf.multiply(taxi_fare, tf.constant(0.2))), tf.int64))

  return outputs

def _fill_in_missing(x):
  """Replace missing values in a SparseTensor.
  Fills in missing values of `x` with '' or 0, and converts to a dense tensor.
  Args:
    x: A `SparseTensor` of rank 2.  Its dense shape should have size at most 1
      in the second dimension.
  Returns:
    A rank 1 tensor where missing values of `x` have been filled in.
  """
  default_value = '' if x.dtype == tf.string else 0
  return tf.squeeze(
      tf.sparse.to_dense(
          tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),
          default_value),
      axis=1)

이제 변환 작업을 하는 코드를 Transform 컴포넌트에 전달하여, 데이터를 변환하겠습니다.

In []:

transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file=os.path.abspath(_taxi_transform_module_file))
context.run(transform)

Transform는 두 개의 출력 아티팩트를 생성합니다.

  • transform_graph : transform_graph는 사전 처리 작업을 수행할 수 있는 그래프입니다.
  • transformed_examples : transformed_examples 는 사전 처리된 학습 데이터와 평가 데이터를 나타냅니다.

Trainer

Trainer 컴포넌트는 텐서플로우로 정의한 모델을 학습 시킵니다. TrainerSchemaGen의 스키마와 Transform 에서 변환된 데이터와 그래프, 학습 파라미터 그리고 사용자 정의 모델 코드를 입력값으로 사용합니다.

다음은 사용자 정의 모델 코드입니다.

In []:

_taxi_trainer_module_file = 'taxi_trainer.py'

In []:

%%writefile {_taxi_trainer_module_file}

from typing import List, Text

import os
import absl
import datetime
import tensorflow as tf
import tensorflow_transform as tft

from tfx.components.trainer.executor import TrainerFnArgs

import taxi_constants

_DENSE_FLOAT_FEATURE_KEYS = taxi_constants.DENSE_FLOAT_FEATURE_KEYS
_VOCAB_FEATURE_KEYS = taxi_constants.VOCAB_FEATURE_KEYS
_VOCAB_SIZE = taxi_constants.VOCAB_SIZE
_OOV_SIZE = taxi_constants.OOV_SIZE
_FEATURE_BUCKET_COUNT = taxi_constants.FEATURE_BUCKET_COUNT
_BUCKET_FEATURE_KEYS = taxi_constants.BUCKET_FEATURE_KEYS
_CATEGORICAL_FEATURE_KEYS = taxi_constants.CATEGORICAL_FEATURE_KEYS
_MAX_CATEGORICAL_FEATURE_VALUES = taxi_constants.MAX_CATEGORICAL_FEATURE_VALUES
_LABEL_KEY = taxi_constants.LABEL_KEY
_transformed_name = taxi_constants.transformed_name

def _transformed_names(keys):
  return [_transformed_name(key) for key in keys]

def _gzip_reader_fn(filenames):
  """Small utility returning a record reader that can read gzip'ed files."""
  return tf.data.TFRecordDataset(
      filenames,
      compression_type='GZIP')

def _get_serve_tf_examples_fn(model, tf_transform_output):
  """Returns a function that parses a serialized tf.Example and applies TFT."""

  model.tft_layer = tf_transform_output.transform_features_layer()

  @tf.function
  def serve_tf_examples_fn(serialized_tf_examples):
    """Returns the output to be used in the serving signature."""
    feature_spec = tf_transform_output.raw_feature_spec()
    feature_spec.pop(_LABEL_KEY)
    parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)

    transformed_features = model.tft_layer(parsed_features)
    transformed_features.pop(_transformed_name(_LABEL_KEY))

    return model(transformed_features)

  return serve_tf_examples_fn

def _input_fn(file_pattern: Text,
              tf_transform_output: tft.TFTransformOutput,
              batch_size: int = 200) -> tf.data.Dataset:
  """Generates features and label for tuning/training.

  Args:
    file_pattern: input tfrecord file pattern.
    tf_transform_output: A TFTransformOutput.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  transformed_feature_spec = (
      tf_transform_output.transformed_feature_spec().copy())

  dataset = tf.data.experimental.make_batched_features_dataset(
      file_pattern=file_pattern,
      batch_size=batch_size,
      features=transformed_feature_spec,
      reader=_gzip_reader_fn,
      label_key=_transformed_name(_LABEL_KEY))

  return dataset

def _build_keras_model(hidden_units: List[int] = None) -> tf.keras.Model:
  """Creates a DNN Keras model for classifying taxi data.

  Args:
    hidden_units: [int], the layer sizes of the DNN (input layer first).

  Returns:
    A keras Model.
  """
  real_valued_columns = [
      tf.feature_column.numeric_column(key, shape=())
      for key in _transformed_names(_DENSE_FLOAT_FEATURE_KEYS)
  ]
  categorical_columns = [
      tf.feature_column.categorical_column_with_identity(
          key, num_buckets=_VOCAB_SIZE + _OOV_SIZE, default_value=0)
      for key in _transformed_names(_VOCAB_FEATURE_KEYS)
  ]
  categorical_columns += [
      tf.feature_column.categorical_column_with_identity(
          key, num_buckets=_FEATURE_BUCKET_COUNT, default_value=0)
      for key in _transformed_names(_BUCKET_FEATURE_KEYS)
  ]
  categorical_columns += [
      tf.feature_column.categorical_column_with_identity(  # pylint: disable=g-complex-comprehension
          key,
          num_buckets=num_buckets,
          default_value=0) for key, num_buckets in zip(
              _transformed_names(_CATEGORICAL_FEATURE_KEYS),
              _MAX_CATEGORICAL_FEATURE_VALUES)
  ]
  indicator_column = [
      tf.feature_column.indicator_column(categorical_column)
      for categorical_column in categorical_columns
  ]

  model = _wide_and_deep_classifier(
      wide_columns=indicator_column,
      deep_columns=real_valued_columns,
      dnn_hidden_units=hidden_units or [100, 70, 50, 25])
  return model

def _wide_and_deep_classifier(wide_columns, deep_columns, dnn_hidden_units):
  """Build a simple keras wide and deep model.

  Args:
    wide_columns: Feature columns wrapped in indicator_column for wide (linear)
      part of the model.
    deep_columns: Feature columns for deep part of the model.
    dnn_hidden_units: [int], the layer sizes of the hidden DNN.

  Returns:
    A Wide and Deep Keras model
  """
  # Following values are hard coded for simplicity in this example,
  # However prefarably they should be passsed in as hparams.

  # Keras needs the feature definitions at compile time.
  input_layers = {
      colname: tf.keras.layers.Input(name=colname, shape=(), dtype=tf.float32)
      for colname in _transformed_names(_DENSE_FLOAT_FEATURE_KEYS)
  }
  input_layers.update({
      colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32')
      for colname in _transformed_names(_VOCAB_FEATURE_KEYS)
  })
  input_layers.update({
      colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32')
      for colname in _transformed_names(_BUCKET_FEATURE_KEYS)
  })
  input_layers.update({
      colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32')
      for colname in _transformed_names(_CATEGORICAL_FEATURE_KEYS)
  })

  deep = tf.keras.layers.DenseFeatures(deep_columns)(input_layers)
  for numnodes in dnn_hidden_units:
    deep = tf.keras.layers.Dense(numnodes)(deep)
  wide = tf.keras.layers.DenseFeatures(wide_columns)(input_layers)

  output = tf.keras.layers.Dense(
      1, activation='sigmoid')(
          tf.keras.layers.concatenate([deep, wide]))

  model = tf.keras.Model(input_layers, output)
  model.compile(
      loss='binary_crossentropy',
      optimizer=tf.keras.optimizers.Adam(lr=0.001),
      metrics=[tf.keras.metrics.BinaryAccuracy()])
  model.summary(print_fn=absl.logging.info)
  return model

# TFX Trainer will call this function.
def run_fn(fn_args: TrainerFnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """
  # Number of nodes in the first layer of the DNN
  first_dnn_layer_size = 100
  num_dnn_layers = 4
  dnn_decay_factor = 0.7

  tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

  train_dataset = _input_fn(fn_args.train_files, tf_transform_output, 40)
  eval_dataset = _input_fn(fn_args.eval_files, tf_transform_output, 40)

  model = _build_keras_model(
      # Construct layers sizes with exponetial decay
      hidden_units=[
          max(2, int(first_dnn_layer_size * dnn_decay_factor**i))
          for i in range(num_dnn_layers)
      ])

  # This log path might change in the future.
  log_dir = os.path.join(os.path.dirname(fn_args.serving_model_dir), 'logs')
  tensorboard_callback = tf.keras.callbacks.TensorBoard(
      log_dir=log_dir, update_freq='batch')
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps,
      callbacks=[tensorboard_callback])

  signatures = {
      'serving_default':
          _get_serve_tf_examples_fn(model,
                                    tf_transform_output).get_concrete_function(
                                        tf.TensorSpec(
                                            shape=[None],
                                            dtype=tf.string,
                                            name='examples')),
  }
  model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)

작성한 모델 코드를 Trainer 컴포넌트에 전달하여 모델을 학습시키겠습니다.

In []:

trainer = Trainer(
    module_file=os.path.abspath(_taxi_trainer_module_file),
    custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schema_gen.outputs['schema'],
    train_args=trainer_pb2.TrainArgs(num_steps=10000),
    eval_args=trainer_pb2.EvalArgs(num_steps=5000))
context.run(trainer)

Evaluator

Evaluator 컴포넌트는 평가 데이터 세트를 이용하여 모델 성능 지표를 계산합니다. TFMA(TensorFlow Model Analysis) 라이브러리를 사용합니다. Evaluator 는 새로 학습된 모델이 이전 모델보다 더 나은지 선택적으로 검증할 수 있습니다. 이는 매일 모델을 자동으로 학습하고 검증할 수 있는 생산 파이프라인 설정에서 유용합니다. 현재 노트북에서는 하나의 모델만 학습하므로 Evaluator가 자동으로 모델에 “good”라고 라벨을 붙입니다.

EvaluatorExampleGen 에서 생성한 데이터와 Trainer 가 학습한 모델 그리고 슬라이싱 설정을 입력값으로 사용합니다. 슬라이싱 설정을 통해 피처 값에 대한 메트릭을 슬라이스할 수 있습니다.

In []:

eval_config = tfma.EvalConfig(
    model_specs=[
        # This assumes a serving model with signature 'serving_default'. If
        # using estimator based EvalSavedModel, add signature_name: 'eval' and 
        # remove the label_key.
        tfma.ModelSpec(label_key='tips')
    ],
    metrics_specs=[
        tfma.MetricsSpec(
            # The metrics added here are in addition to those saved with the
            # model (assuming either a keras model or EvalSavedModel is used).
            # Any metrics added into the saved model (for example using
            # model.compile(..., metrics=[...]), etc) will be computed
            # automatically.
            metrics=[
                tfma.MetricConfig(class_name='ExampleCount')
            ],
            # To add validation thresholds for metrics saved with the model,
            # add them keyed by metric name to the thresholds map.
            thresholds = {
                'binary_accuracy': tfma.MetricThreshold(
                    value_threshold=tfma.GenericValueThreshold(
                        lower_bound={'value': 0.5}),
                    change_threshold=tfma.GenericChangeThreshold(
                       direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                       absolute={'value': -1e-10}))
            }
        )
    ],
    slicing_specs=[
        # An empty slice spec means the overall slice, i.e. the whole dataset.
        tfma.SlicingSpec(),
        # Data can be sliced along a feature column. In this case, data is
        # sliced along feature column trip_start_hour.
        tfma.SlicingSpec(feature_keys=['trip_start_hour'])
    ])

Evaluator 에 설정값을 넘겨서 실행 시키겠습니다.

In []:

# Use TFMA to compute a evaluation statistics over features of a model and
# validate them against a baseline.

# The model resolver is only required if performing model validation in addition
# to evaluation. In this case we validate against the latest blessed model. If
# no model has been blessed before (as in this case) the evaluator will make our
# candidate the first blessed model.
model_resolver = ResolverNode(
      instance_name='latest_blessed_model_resolver',
      resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
      model=Channel(type=Model),
      model_blessing=Channel(type=ModelBlessing))
context.run(model_resolver)

evaluator = Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    baseline_model=model_resolver.outputs['model'],
    # Change threshold will be ignored if there is no baseline (first run).
    eval_config=eval_config)
context.run(evaluator)

Pusher

Pusher 컴포넌트는 일반적으로 TFX 파이프라인의 마지막 단계에 위치해 있습니다. 모델이 유효성 검사를 통과했는지 여부를 확인하고, 합격하면 _serving_model_dir로 모델을 내보냅니다.

In []:

_serving_model_dir = os.path.join(tempfile.mkdtemp(), 'serving_model/taxi_simple')

pusher = Pusher(
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    push_destination=pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(
            base_directory=_serving_model_dir)))
context.run(pusher)

파이프라인 생성하기

앞서 작성한 코드들을 이용하여, TFX 파이프라인을 생성해 보겠습니다. 그리고 KubeflowDagRunner 를 이용하여 TFX 파이프라인을 Kubeflow 파이프라인 패키지로 변환하겠습니다.

모듈 파일 생성하기

TFX 파이프라인 컴포넌트에서 사용할 모듈 파일을 생성하겠습니다. 앞서 작성한 코드를 묶어 하나의 파일로 생성하겠습니다. Transform 컴포넌트에서 사용했던 taxi_constants.pytaxi_transform.py 파일 그리고 Trainer 컴포넌트에서 사용했던 taxi_trainer.py 파일을 합쳐서 taxi_utils.py 파일을 생성하겠습니다.

taxi_utils.py

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from typing import List, Text

import absl
import tensorflow as tf
import tensorflow_transform as tft

from tfx.components.trainer.executor import TrainerFnArgs

# Categorical features are assumed to each have a maximum value in the dataset.
_MAX_CATEGORICAL_FEATURE_VALUES = [24, 31, 12]

_CATEGORICAL_FEATURE_KEYS = [
    'trip_start_hour', 'trip_start_day', 'trip_start_month',
    'pickup_census_tract', 'dropoff_census_tract', 'pickup_community_area',
    'dropoff_community_area'
]

_DENSE_FLOAT_FEATURE_KEYS = ['trip_miles', 'fare', 'trip_seconds']

# Number of buckets used by tf.transform for encoding each feature.
_FEATURE_BUCKET_COUNT = 10

_BUCKET_FEATURE_KEYS = [
    'pickup_latitude', 'pickup_longitude', 'dropoff_latitude',
    'dropoff_longitude'
]

# Number of vocabulary terms used for encoding VOCAB_FEATURES by tf.transform
_VOCAB_SIZE = 1000

# Count of out-of-vocab buckets in which unrecognized VOCAB_FEATURES are hashed.
_OOV_SIZE = 10

_VOCAB_FEATURE_KEYS = [
    'payment_type',
    'company',
]

# Keys
_LABEL_KEY = 'tips'
_FARE_KEY = 'fare'

def _transformed_name(key):
  return key + '_xf'

def _transformed_names(keys):
  return [_transformed_name(key) for key in keys]

def _gzip_reader_fn(filenames):
  """Small utility returning a record reader that can read gzip'ed files."""
  return tf.data.TFRecordDataset(
      filenames,
      compression_type='GZIP')

def _fill_in_missing(x):
  """Replace missing values in a SparseTensor.

  Fills in missing values of `x` with '' or 0, and converts to a dense tensor.

  Args:
    x: A `SparseTensor` of rank 2.  Its dense shape should have size at most 1
      in the second dimension.

  Returns:
    A rank 1 tensor where missing values of `x` have been filled in.
  """
  default_value = '' if x.dtype == tf.string else 0
  return tf.squeeze(
      tf.sparse.to_dense(
          tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),
          default_value),
      axis=1)

def _get_serve_tf_examples_fn(model, tf_transform_output):
  """Returns a function that parses a serialized tf.Example and applies TFT."""

  model.tft_layer = tf_transform_output.transform_features_layer()

  @tf.function
  def serve_tf_examples_fn(serialized_tf_examples):
    """Returns the output to be used in the serving signature."""
    feature_spec = tf_transform_output.raw_feature_spec()
    feature_spec.pop(_LABEL_KEY)
    parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)

    transformed_features = model.tft_layer(parsed_features)
    # TODO(b/148082271): Remove this line once TFT 0.22 is used.
    transformed_features.pop(_transformed_name(_LABEL_KEY), None)

    return model(transformed_features)

  return serve_tf_examples_fn

def _input_fn(file_pattern: Text,
              tf_transform_output: tft.TFTransformOutput,
              batch_size: int = 200) -> tf.data.Dataset:
  """Generates features and label for tuning/training.

  Args:
    file_pattern: input tfrecord file pattern.
    tf_transform_output: A TFTransformOutput.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  transformed_feature_spec = (
      tf_transform_output.transformed_feature_spec().copy())

  dataset = tf.data.experimental.make_batched_features_dataset(
      file_pattern=file_pattern,
      batch_size=batch_size,
      features=transformed_feature_spec,
      reader=_gzip_reader_fn,
      label_key=_transformed_name(_LABEL_KEY))

  return dataset

def _build_keras_model(hidden_units: List[int] = None) -> tf.keras.Model:
  """Creates a DNN Keras model for classifying taxi data.

  Args:
    hidden_units: [int], the layer sizes of the DNN (input layer first).

  Returns:
    A keras Model.
  """
  real_valued_columns = [
      tf.feature_column.numeric_column(key, shape=())
      for key in _transformed_names(_DENSE_FLOAT_FEATURE_KEYS)
  ]
  categorical_columns = [
      tf.feature_column.categorical_column_with_identity(
          key, num_buckets=_VOCAB_SIZE + _OOV_SIZE, default_value=0)
      for key in _transformed_names(_VOCAB_FEATURE_KEYS)
  ]
  categorical_columns += [
      tf.feature_column.categorical_column_with_identity(
          key, num_buckets=_FEATURE_BUCKET_COUNT, default_value=0)
      for key in _transformed_names(_BUCKET_FEATURE_KEYS)
  ]
  categorical_columns += [
      tf.feature_column.categorical_column_with_identity(  # pylint: disable=g-complex-comprehension
          key,
          num_buckets=num_buckets,
          default_value=0) for key, num_buckets in zip(
              _transformed_names(_CATEGORICAL_FEATURE_KEYS),
              _MAX_CATEGORICAL_FEATURE_VALUES)
  ]
  indicator_column = [
      tf.feature_column.indicator_column(categorical_column)
      for categorical_column in categorical_columns
  ]

  model = _wide_and_deep_classifier(
      wide_columns=indicator_column,
      deep_columns=real_valued_columns,
      dnn_hidden_units=hidden_units or [100, 70, 50, 25])
  return model

def _wide_and_deep_classifier(wide_columns, deep_columns, dnn_hidden_units):
  """Build a simple keras wide and deep model.

  Args:
    wide_columns: Feature columns wrapped in indicator_column for wide (linear)
      part of the model.
    deep_columns: Feature columns for deep part of the model.
    dnn_hidden_units: [int], the layer sizes of the hidden DNN.

  Returns:
    A Wide and Deep Keras model
  """
  # Following values are hard coded for simplicity in this example,
  # However prefarably they should be passsed in as hparams.

  # Keras needs the feature definitions at compile time.
  input_layers = {
      colname: tf.keras.layers.Input(name=colname, shape=(), dtype=tf.float32)
      for colname in _transformed_names(_DENSE_FLOAT_FEATURE_KEYS)
  }
  input_layers.update({
      colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32')
      for colname in _transformed_names(_VOCAB_FEATURE_KEYS)
  })
  input_layers.update({
      colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32')
      for colname in _transformed_names(_BUCKET_FEATURE_KEYS)
  })
  input_layers.update({
      colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32')
      for colname in _transformed_names(_CATEGORICAL_FEATURE_KEYS)
  })

  # TODO(b/144500510): SparseFeatures for feature columns + Keras.
  deep = tf.keras.layers.DenseFeatures(deep_columns)(input_layers)
  for numnodes in dnn_hidden_units:
    deep = tf.keras.layers.Dense(numnodes)(deep)
  wide = tf.keras.layers.DenseFeatures(wide_columns)(input_layers)

  output = tf.keras.layers.Dense(
      1, activation='sigmoid')(
          tf.keras.layers.concatenate([deep, wide]))

  model = tf.keras.Model(input_layers, output)
  model.compile(
      loss='binary_crossentropy',
      optimizer=tf.keras.optimizers.Adam(lr=0.001),
      metrics=[tf.keras.metrics.BinaryAccuracy()])
  model.summary(print_fn=absl.logging.info)
  return model

# TFX Transform will call this function.
def preprocessing_fn(inputs):
  """tf.transform's callback function for preprocessing inputs.

  Args:
    inputs: map from feature keys to raw not-yet-transformed features.

  Returns:
    Map from string feature key to transformed feature operations.
  """
  outputs = {}
  for key in _DENSE_FLOAT_FEATURE_KEYS:
    # Preserve this feature as a dense float, setting nan's to the mean.
    outputs[_transformed_name(key)] = tft.scale_to_z_score(
        _fill_in_missing(inputs[key]))

  for key in _VOCAB_FEATURE_KEYS:
    # Build a vocabulary for this feature.
    outputs[_transformed_name(key)] = tft.compute_and_apply_vocabulary(
        _fill_in_missing(inputs[key]),
        top_k=_VOCAB_SIZE,
        num_oov_buckets=_OOV_SIZE)

  for key in _BUCKET_FEATURE_KEYS:
    outputs[_transformed_name(key)] = tft.bucketize(
        _fill_in_missing(inputs[key]),
        _FEATURE_BUCKET_COUNT,
        always_return_num_quantiles=False)

  for key in _CATEGORICAL_FEATURE_KEYS:
    outputs[_transformed_name(key)] = _fill_in_missing(inputs[key])

  # Was this passenger a big tipper?
  taxi_fare = _fill_in_missing(inputs[_FARE_KEY])
  tips = _fill_in_missing(inputs[_LABEL_KEY])
  outputs[_transformed_name(_LABEL_KEY)] = tf.where(
      tf.math.is_nan(taxi_fare),
      tf.cast(tf.zeros_like(taxi_fare), tf.int64),
      # Test if the tip was > 20% of the fare.
      tf.cast(
          tf.greater(tips, tf.multiply(taxi_fare, tf.constant(0.2))), tf.int64))

  return outputs

# TFX Trainer will call this function.
def trainer_fn(fn_args: TrainerFnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """
  # Number of nodes in the first layer of the DNN
  first_dnn_layer_size = 100
  num_dnn_layers = 4
  dnn_decay_factor = 0.7

  tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

  train_dataset = _input_fn(fn_args.train_files, tf_transform_output, 40)
  eval_dataset = _input_fn(fn_args.eval_files, tf_transform_output, 40)

  mirrored_strategy = tf.distribute.MirroredStrategy()
  with mirrored_strategy.scope():
    model = _build_keras_model(
        # Construct layers sizes with exponetial decay
        hidden_units=[
            max(2, int(first_dnn_layer_size * dnn_decay_factor**i))
            for i in range(num_dnn_layers)
        ])

  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  signatures = {
      'serving_default':
          _get_serve_tf_examples_fn(model,
                                    tf_transform_output).get_concrete_function(
                                        tf.TensorSpec(
                                            shape=[None],
                                            dtype=tf.string,
                                            name='examples')),
  }
  model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)

패키지 추가하기

새로운 주피터 노트북을 생성하고, TFX 컴포넌트와 필요한 패키지를 추가합니다.

In []:

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
from typing import Text

from kfp import onprem
import tensorflow_model_analysis as tfma

from tfx.components import CsvExampleGen
from tfx.components import Evaluator
from tfx.components import ExampleValidator
from tfx.components import InfraValidator
from tfx.components import Pusher
from tfx.components import ResolverNode
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Trainer
from tfx.components import Transform
from tfx.components.base import executor_spec
from tfx.components.trainer.executor import GenericExecutor
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.orchestration import pipeline
from tfx.orchestration.kubeflow import kubeflow_dag_runner
from tfx.proto import infra_validator_pb2
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.types import Channel
from tfx.types.standard_artifacts import Model
from tfx.types.standard_artifacts import ModelBlessing
from tfx.utils.dsl_utils import external_input

파이프라인 이름 정의하기

사용할 파이프라인 이름을 지정해 줍니다.

In[]:

_pipeline_name = 'chicago_taxi_pipeline_kubeflow_pvc'

파이프라인에서 사용할 PVC 정보를 지정해 줍니다.

In []:

_persistent_volume_claim = 'chicago-taxi-pvc'
_persistent_volume = 'chicago-taxi-pv'
_persistent_volume_mount = '/mnt'

사용할 경로들을 지정해 줍니다. _data_root 퍼시스턴스 볼륨에 저장된 학습 데이터의 경로이고, _pipeline_root 는 파이프라인에서 사용할 경로입니다.

In []:

# All input and output data are kept in the PV.
_input_base = os.path.join(_persistent_volume_mount, 'tfx')
_data_root = os.path.join(_input_base, 'data')

_output_base = os.path.join(_persistent_volume_mount, 'pipelines')
_tfx_root = os.path.join(_output_base, 'tfx')
_pipeline_root = os.path.join(_tfx_root, _pipeline_name)

TFX 컴포넌트에 추가할 사용자 정의 코드 파일의 경로를 지정합니다. Transform 컴포넌트와 Trainer 컴포넌트에서 사용하는 사용자 정의 코드입니다.

In []:

_module_file = os.path.join(_input_base, 'taxi_utils.py')

학습된 모델을 저장할 위치를 정의합니다. Pusher 컴포넌트에서 해당 위치로 모델을 저장합니다.

In []:

_serving_model_dir = os.path.join(_output_base, _pipeline_name, 'serving_model')

TFX 컴포넌트들을 이용하여 TFX 파이프라인 생성 코드를 작성합니다.

In []:

def _create_pipeline(pipeline_name: Text, pipeline_root: Text, data_root: Text,
                     module_file: Text, serving_model_dir: Text,
                     direct_num_workers: int) -> pipeline.Pipeline:
  """Implements the chicago taxi pipeline with TFX and Kubeflow Pipelines."""
  examples = external_input(data_root)

  # Brings data into the pipeline or otherwise joins/converts training data.
  example_gen = CsvExampleGen(input=examples)

  # Computes statistics over data for visualization and example validation.
  statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])

  # Generates schema based on statistics files.
  schema_gen = SchemaGen(
      statistics=statistics_gen.outputs['statistics'],
      infer_feature_shape=False)

  # Performs anomaly detection based on statistics and data schema.
  example_validator = ExampleValidator(
      statistics=statistics_gen.outputs['statistics'],
      schema=schema_gen.outputs['schema'])

  # Performs transformations and feature engineering in training and serving.
  transform = Transform(
      examples=example_gen.outputs['examples'],
      schema=schema_gen.outputs['schema'],
      module_file=module_file)

  # Uses user-provided Python function that implements a model using TF-Learn
  # to train a model on Google Cloud AI Platform.
  trainer = Trainer(
      module_file=module_file,
      custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
      examples=transform.outputs['transformed_examples'],
      transform_graph=transform.outputs['transform_graph'],
      schema=schema_gen.outputs['schema'],
      train_args=trainer_pb2.TrainArgs(num_steps=10000),
      eval_args=trainer_pb2.EvalArgs(num_steps=5000))
  

  # Uses TFMA to compute a evaluation statistics over features of a model and
  # perform quality validation of a candidate model (compared to a baseline).
  eval_config = tfma.EvalConfig(
    model_specs=[
        # This assumes a serving model with signature 'serving_default'. If
        # using estimator based EvalSavedModel, add signature_name: 'eval' and 
        # remove the label_key.
        tfma.ModelSpec(label_key='tips')
    ],
    metrics_specs=[
        tfma.MetricsSpec(
            # The metrics added here are in addition to those saved with the
            # model (assuming either a keras model or EvalSavedModel is used).
            # Any metrics added into the saved model (for example using
            # model.compile(..., metrics=[...]), etc) will be computed
            # automatically.
            metrics=[
                tfma.MetricConfig(class_name='ExampleCount')
            ],
            # To add validation thresholds for metrics saved with the model,
            # add them keyed by metric name to the thresholds map.
            thresholds = {
                'binary_accuracy': tfma.MetricThreshold(
                    value_threshold=tfma.GenericValueThreshold(
                        lower_bound={'value': 0.5}),
                    change_threshold=tfma.GenericChangeThreshold(
                       direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                       absolute={'value': -1e-10}))
            }
        )
    ],
    slicing_specs=[
        # An empty slice spec means the overall slice, i.e. the whole dataset.
        tfma.SlicingSpec(),
        # Data can be sliced along a feature column. In this case, data is
        # sliced along feature column trip_start_hour.
        tfma.SlicingSpec(feature_keys=['trip_start_hour'])
    ])

  # Get the latest blessed model for model validation.
  model_resolver = ResolverNode(
      instance_name='latest_blessed_model_resolver',
      resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
      model=Channel(type=Model),
      model_blessing=Channel(type=ModelBlessing))

  evaluator = Evaluator(
      examples=example_gen.outputs['examples'],
      model=trainer.outputs['model'],
      baseline_model=model_resolver.outputs['model'],
      # Change threshold will be ignored if there is no baseline (first run).
      eval_config=eval_config)

  # Performs infra validation of a candidate model to prevent unservable model
  # from being pushed. In order to use InfraValidator component, persistent
  # volume and its claim that the pipeline is using should be a ReadWriteMany
  # access mode.
  infra_validator = InfraValidator(
      model=trainer.outputs['model'],
      examples=example_gen.outputs['examples'],
      serving_spec=infra_validator_pb2.ServingSpec(
          tensorflow_serving=infra_validator_pb2.TensorFlowServing(
              tags=['latest']),
          kubernetes=infra_validator_pb2.KubernetesConfig()),
      request_spec=infra_validator_pb2.RequestSpec(
          tensorflow_serving=infra_validator_pb2.TensorFlowServingRequestSpec())
  )

  # Checks whether the model passed the validation steps and pushes the model
  # to  Google Cloud AI Platform if check passed.
  pusher = Pusher(
      model=trainer.outputs['model'],
      model_blessing=evaluator.outputs['blessing'],
      infra_blessing=infra_validator.outputs['blessing'],
      push_destination=pusher_pb2.PushDestination(
          filesystem=pusher_pb2.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  return pipeline.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=[
          example_gen,
          statistics_gen,
          schema_gen,
          example_validator,
          transform,
          trainer,
          model_resolver,
          evaluator,
          infra_validator,
          pusher,
      ],
      beam_pipeline_args=['--direct_num_workers=%d' % direct_num_workers],
  )

TFX 에서 사용할 메타데이터 저장소의 정보를 지정합니다. Kubeflow의 메타데이터를 사용하겠습니다.

In []:

from tfx.orchestration.kubeflow.proto import kubeflow_pb2

metadata_config = kubeflow_pb2.KubeflowMetadataConfig()
metadata_config.grpc_config.grpc_service_host.value = 'metadata-grpc-service'
metadata_config.grpc_config.grpc_service_port.value = '8080'

KubeflowDagRunner 를 사용하여, 작성한 TFX 파이프라인을 Kubeflow 파이프라인으로 생성합니다. 해당 셀이 실행되면, chicago_taxi_pipeline_kubeflow_pvc.tar.gz 라는 Kubeflow 파이프라인 패키지가 생성됩니다.

In []:

if __name__ == '__main__':
  # This pipeline automatically injects the Kubeflow TFX image if the
  # environment variable 'KUBEFLOW_TFX_IMAGE' is defined. Currently, the tfx
  # cli tool exports the environment variable to pass to the pipelines.
  tfx_image = os.environ.get('KUBEFLOW_TFX_IMAGE', None)

  runner_config = kubeflow_dag_runner.KubeflowDagRunnerConfig(
      kubeflow_metadata_config=metadata_config,
      # Specify custom docker image to use.
      tfx_image=tfx_image,
      pipeline_operator_funcs=(
          [
              onprem.mount_pvc(_persistent_volume_claim, _persistent_volume,
                               _persistent_volume_mount)
          ]))

  kubeflow_dag_runner.KubeflowDagRunner(config=runner_config).run(
      _create_pipeline(
          pipeline_name=_pipeline_name,
          pipeline_root=_pipeline_root,
          data_root=_data_root,
          module_file=_module_file,
          serving_model_dir=_serving_model_dir,
          # 0 means auto-detect based on the number of CPUs available during
          # execution time.
          direct_num_workers=0))

파이프라인 실행하기

PVC 생성하기

파이프라인에서 사용할 PVC를 생성합니다.

chicago-taxi-pvc.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: chicago-taxi-pvc
spec:
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: 500Mi
kubectl -n kubeflow apply -f chicago-taxi-pvc.yaml

PV에 파일 복사하기

파이프라인을 실행하기에 앞서, 필요한 파일을 퍼시스턴스 볼륨에 복사하도록 하겠습니다. 학습을 위한 데이터 파일과 사용자 정의 코드가 들어 있는 파이썬 파일을 복사합니다.

퍼시스턴스 볼륨에 파일을 업로드하는 방법은 다양하게 존재하기 때문에, 편하신 방법을 사용하시면 됩니다. 예제에서는 퍼시스턴스 볼륨에 파일을 업로드하기 위해서, “PHP File Manager” 를 사용하였습니다. 파일 매니저를 POD로 실행한 다음, 웹 브라우저를 이용해서 파일을 업로드하겠습니다.

먼저 파일 매니저 POD 매니페스트를 작성합니다. chicago-taxi-pvc 를 이용하여 볼륨을 마운트 합니다.

다음은 파일 매니저 POD의 매니페스트 입니다.

filemanager.yaml

apiVersion: v1
kind: Pod
metadata:
  name: filemanager
spec:
  containers:
  - image: smokserwis/filemanager
    imagePullPolicy: IfNotPresent
    name: filemanager
    ports:
    - containerPort: 80
      name: http
      protocol: TCP
    volumeMounts:
    - mountPath: /var/www/mount
      name: chicago-taxi
  volumes:
  - name: chicago-taxi
    persistentVolumeClaim:
      claimName: chicago-taxi-pvc

다음 명령어를 실행하여 파일 매니저 POD를 생성합니다.

kubectl -n kubeflow apply -f filemanager.yaml

파일 매니저에 접근하기 위하여 포트포워딩을 사용합니다.

kubectl -n kubeflow port-forward filemanager 8080:80

웹 브라우저를 이용하여 “http://localhost:8080” 에 접속하면 다음과 같은 로그인 화면을 볼 수 있습니다. 사용자명과 비밀번호의 기본값은 fm_admin / fm_admin 입니다.

https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/data/simple/data.csv 파일을 다운로드 받아서 /tfx/data/data.csv 경로로 파일을 업로드 합니다.

그리고 앞서 생성한 taxi_utils.py 파일을 /tfx/taxi_utils.py 경로로 업로드 합니다.

포트 포워딩을 종료하고, 파일 매니저를 삭제하겠습니다.

kubectl -n kubeflow delete pod filemanager

파이프라인 실행하기

KFP SDK를 이용하여, 생성한 파이프라인 패키지를 실행해 보겠습니다.

In []:

import kfp

run_result = kfp.Client(
    host=None  # replace with Kubeflow Pipelines endpoint if this notebook is run outside of the Kubeflow cluster.
).create_run_from_pipeline_package('chicago_taxi_pipeline_kubeflow_pvc.tar.gz', arguments={})

다음은 파이프라인의 화면입니다.

캐글 – 집 값 예측

머신 러닝 파이프라인을 만들기 위하여, 캐글의 집 값 데이터를 사용해 보도록 하겠습니다.

집 값 예측은, 미국 아이오와 주의 에임스에 있는 주거용 주택의 정보를 이용하여 주택의 판매가격을 예측하는 Competition입니다.

이 장에서는 집 값 예측 결과를 캐글에 제출하기 위한 파이프라인을 구성해 볼 것입니다. 그리고 파이프라인에서 Katib를 이용하여 모델의 하이퍼 파라미터 튜닝을 할 것입니다. 데이터를 저장하기 위해서, PVC와 S3 두 개 모두 사용하겠습니다.

데이터 분석 보다는 전체적인 파이프라인을 만드는 것에 중점을 두고 설명하겠습니다.

사전 준비

House Prices: Advanced Regression Techniques 접속

예제에서 사용할 데이터가 있는 집 값예측 Competition 의 주소는 다음과 같습니다.

<https://www.kaggle.com/c/house-prices-advanced-regression-techniques>

해당 주소로 직접 접속하셔도 되고, 상단에 있는 검색바를 이용하여 검색하셔도 됩니다.

다음은 검색바를 이용하여, “House Prices: Advanced Regression Techniques” 을 검색한 결과입니다.

“House Prices: Advanced Regression Techniques” Competition 페이지 접속하면 다음과 같은 화면을 볼 수 있습니다.

데이터는 “Data” 탭에서 받을 수 있습니다. “Data” 탭을 클릭하면, 데이터에 대한 자세한 설명과 다운로드 받는 방법을 볼 수 있습니다.

주피터 노트북

주피터에서 새로운 Terminal 을 엽니다.

주피터 노트북에서 타이타닉 데이터 다운로드하기

작업 디렉토리를 생성한 다음, 타이타닉 데이터를 다운로드 합니다. house-prices-advanced-regression-techniques.zip 라는 파일이 다운로드 됩니다.

mkdir -p ~/workspace/house-prices/kaggle
cd ~/workspace/house-prices/kaggle
kaggle competitions download -c house-prices-advanced-regression-techniques
Downloading house-prices-advanced-regression-techniques.zip to /home/jovyan/workspace/house-prices/kaggle
  0%|                                                                                          | 0.00/34.1k [00:00<?, ?B/s]
100%|██████████████████████████████████████████████████████████████████████████████████| 34.1k/34.1k [00:00<00:00, 842kB/s]

다운로드한 파일의 압축을 풀겠습니다. 모델 학습을 위한 train.csv 파일과, 예측에 사용할 데이터인 test.csv 그리고, 캐글에 제출할 파일의 형식을 보여주는 sample_submission.csv 파일이 생성됩니다.

unzip house-prices-advanced-regression-techniques.zip
Archive:  house-prices-advanced-regression-techniques.zip
  inflating: data_description.txt
  inflating: sample_submission.csv
  inflating: test.csv
  inflating: train.csv

파이썬 패키지 설치

머신 러닝 모델 코드를 작성하기 위한 패키지들을 설치합니다. 이 장에서는 xgboost 패키지와 pandas_profiling 패키지를 추가로 사용합니다. 만약 설치되어 있지 않다면, 다음 명령어를 실행하여 패키지를 설치합니다.

pip install xgboost pandas_profiling  --user

데이터 전처리와 모델 작성

문제 정의하기

주어진 데이터를 바탕으로, 집 값을 예측하는 문제입니다.

데이터 전처리

머신 러닝 모델에서 데이터를 사용할 수 있도록, 데이터를 전처리 해 보겠습니다. 먼저 캐글에서 제공한 데이터를 살펴보도록 하겠습니다. 이번에는 pandas_profiling 라는 패키지를 사용해 보겠습니다.

집 값 데이터 분석 및 검증

먼저 집 값 데이터에 대해 이해하기 위하여, 간단히 탐색해보도록 하겠습니다.

주피터 노트북을 생성합니다. 노트북의 위치는 ~/workspace/house-prices입니다. 앞으로 작성할 예제에서는 노트북이 ~/workspace/house-prices 에 위치하고 있다고 가정하여, 파일 경로를 설정할 것입니다.

주피터 노트북을 이용하여 데이터를 분석 및 검증해 보겠습니다.

판다스를 이용하여, 다운받은 캐글 데이터를 읽어 오겠습니다.

In[] :

import os
import pandas as pd

input_path='./kaggle'
train_data = pd.read_csv(os.path.join(input_path, 'train.csv'))
test_data = pd.read_csv(os.path.join(input_path, 'test.csv'))

pandas_profiling 을 이용하여, 데이터의 프로파일링 리포트를 출력하겠습니다.

In[] :

import pandas_profiling
train_data.profile_report()

프로파일링 리포트의 결과는 다음과 같습니다. 단 한 줄의 명령어로 데이터를 분석해 볼 수 있습니다.

리포트 결과를 보면 알 수 있듯이, 여러 타입의 데이터가 존재하고, 결측값도 존재하는 것을 확인할 수 있습니다.

데이터 처리

캐글에서 좋은 점수를 받는게 목적이 아니기 때문에, 과감히 ojbect 타입의 데이터를 삭제하겠습니다.

In []:

train_data = train_data.drop([], axis=1).select_dtypes(exclude=['object'])
train_X = train_data.drop(['SalePrice'], axis=1)
train_y = train_data['SalePrice']
test_data = test_data.drop([], axis=1).select_dtypes(exclude=['object'])

결측값들도 SimpleImputer 를 이용하여 간단히 처리하겠습니다. strategy='median' 을 지정하였기 때문에 결측값들은 중간값으로 대치됩니다.

In []:

imputer = SimpleImputer(strategy='median')
imputer.fit(train_X)

train_X = pd.DataFrame(imputer.transform(train_X), columns=train_X.columns, index=train_X.index)
train_data = pd.concat([train_X, train_y], axis=1)
test_data = pd.DataFrame(imputer.transform(test_data), columns=test_data.columns, index=test_data.index)

train_data = train_data.astype({'Id': int})
test_data = test_data.astype({'Id': int})

모델 학습 (Train)

준비한 데이터를 이용하여 모델을 학습해 보겠습니다.

먼저, 모델 학습을 위하여 데이터를 학습 세트와 테스트 세트로 나누겠습니다. sklearntrain_test_split() 를 사용하면, 간단히 나눌 수 있습니다.

In []:

from sklearn.model_selection import train_test_split

X = train_data.drop(['Id', 'SalePrice'], axis=1)
y = train_data['SalePrice']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

xgboostXGBRegressor을 사용하여, 모델을 학습 시켜 보겠습니다.

In []:

from xgboost import XGBRegressor

model = XGBRegressor()
model.fit(X_train, y_train, eval_set=[(X_test, y_test)])

정상적으로 실행되면, 다음과 같은 결과를 확인할 수 있습니다.

[0]	validation_0-rmse:180608
[1]	validation_0-rmse:164048
...
[98]	validation_0-rmse:25150.7
[99]	validation_0-rmse:25156.1

이제 캐글에서 제공한 test 데이터를 가지고 예측해 보도록 하겠습니다.

pred = model.predict(test_data.drop(['Id'], axis=1))

예측한 결과물을 가지고, 캐글에 제공할 submission.csv 파일을 생성해 보겠습니다.

submission = pd.DataFrame({'Id': test_data['Id'], 'SalePrice': pred})
submission.to_csv('submission.csv', index=False)

submission 을 조회해 보겠습니다.

submission.head()

정상적으로 실행되면, 다음과 같은 결과를 확인할 수 있습니다.

생성한 submission.csv 파일을 캐글에 제출하겠습니다. kaggle 명령어를 사용하여 제출합니다. kaggle 명령어가 PATH 에 포함되어 있지 않기 때문에 전체 경로를 적어 주었습니다.

!/home/jovyan/.local/bin/kaggle competitions submit -c house-prices-advanced-regression-techniques -f submission.csv -m "Message"

캐글의 “My Submissions” 탭을 클릭하면, 제출한 내용들을 확인할 수 있습니다.

파이프라인 만들기

앞서 작성한 코드들을 바탕으로 하여 파이프라인을 구성해 보겠습니다. 각각의 단계를 컴포넌트로 구성한 다음, 파이프라인을 작성하고 실행해 보겠습니다.

파이프라인의 단계는 다음과 같습니다.

  • 데이터 다운로드 : 캐글에서 데이터를 다운로드 합니다.
  • 데이터 압축풀기 : 캐글에서 다운로드한 데이터의 압축을 풉니다.
  • 데이터 변환 : train.csvtest.csv 의 데이터를 머신러닝의 학습에 사용할 수 있도록 변환합니다.
  • 모델 학습 : 변환된 train.csv 데이터를 이용하여 모델을 학습니다.
  • 예측 : 변환된 test.csv 데이터를 이용하여 예측합니다. 그리고 예측한 결과를 submission.csv 파일로 저장합니다.
  • 캐글 제출 : 생성된 submission.csv 파일을 캐글에 제출합니다.

기본 이미지 만들기

데이터 변환, 모덱 학습, 예측 컴포넌트에서 사용할 기본 이미지를 만들어 보겠습니다. 파이프라인에서도 필요한 이미지를 빌드할 수 있지만, 기본 이미지를 만들어서 사용하는게 더 효율적입니다. docker 명령어를 이용하여, 컨테이너 이미지를 빌드하기 때문에, docker 명령어가 실행 가능한 곳에서 작업을 해야합니다.

먼저 베이스 이미지 디렉토리를 만듭니다.

mkdir -p ~/workspace/base/xgboost
cd ~/workspace/base/xgboost

필요한 파이썬 패키지 목록을 requirements.txt 파일로 작성합니다.

requirements.txt

scikit-learn
joblib
numpy
pandas
fire
xgboost

컨테이너 이미지 빌드를 위하여 Dockerfile 파일을 작성합니다. 파이썬 패키지 목록인 requirements.txt 파일을 추가하고, 해당 파일을 이용하여 패키지를 설치합니다.

Dockerfile

FROM python:3.6-slim

RUN apt-get update -y && \\
    apt-get install -y libgomp1

WORKDIR /app
COPY requirements.txt /app

RUN pip --no-cache-dir install -r requirements.txt

컨테이너 이미지를 빌드하고, 컨테이너 레지스트리에 푸시하기 위하여 build_image.sh 파일을 작성합니다.

build_image.sh

#!/bin/bash -e

image_name=kangwoo/xgboost
image_tag=0.0.1
full_image_name=${image_name}:${image_tag}
base_image_tag=3.6-slim

cd "$(dirname "$0")"

docker build --build-arg BASE_IMAGE_TAG=$base_image_tag -t "$full_image_name" .
docker push "$full_image_name"

build_image.sh 파일을 실행하여, 컨테이너 이미지를 빌드하고, 컨테이너 레지스트리에 푸시하겠습니다.

chmod +x build_image.sh
./build_image.sh

데이터 전처리 파이프라인 작성하기

데이터 전처리 파이프라인에서는 세개의 컴포넌트를 사용합니다.

  • Download : 캐글에서 데이터를 다운로드 받습니다.
  • Unzip : 압축을 풉니다.
  • Transofrm : 데이터를 전처리한 후 S3에 저장합니다.

모든 데이터를 S3에 저장할 수 있으나, 앞서 만든 Download 컴포넌트와 Unzip 컴포넌트가 S3를 지원하기 않기 때문에, 퍼시스턴스 볼륨과 S3를 같이 사용하였습니다. AWS 접속하여 사용할 S3 버킷을 생성합니다.

파이프라인의 전체적인 흐름은 다음과 같습니다.

파이프라인의 데이터 변환 작업이 kubeflow 네임스페이스 실행되므로, 해당 작업이 S3에 접근할 수 있도록, kubeflow 네임스페이스에 aws-secret 을 생성해줘야합니다.

export AWS_ACCESS_KEY_ID=<YOUR_AWS_ACCESS_KEY_ID>
export AWS_SECRET_ACCESS_KEY=<YOUR_AWS_SECRET_ACCESS_KEY>

kubectl -n kubeflow create secret generic aws-secret \\
    --from-literal=AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} \\
    --from-literal=AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}

데이터 전처리 파이프라인 컴포넌트 작성하기

주피터 노트북을 생성합니다. 파일 이름은 transform_pipeline_s3.ipynb 입니다. 노트북의 위치는 ~/workspace/house-prices입니다. 앞으로 작성할 예제에서는 노트북이 ~/workspace/house-prices 에 위치하고 있다고 가정하여, 파일 경로를 설정할 것입니다.

패키지 추가

파이프라인과 컴포넌트를 사용하기 위한 패키지를 추가합니다.

In []:

from kfp import components
from kfp import dsl

캐글 데이터 다운로드 컴포넌트

캐글에서 데이터를 다운로드 하는 컴포넌트를 정의합니다. 앞서 “타이타닉 생존 예측”에서 만들 컴포넌트를 재사용 합니다.

In []:

download_op = components.load_component_from_file('../components/kaggle/competitions_downloader/component.yaml')

데이터 압축풀기 컴포넌트

unzip 명령어를 사용하여 압축을 풀겠습니다.

In []:

def unzip_op(filename, exdir):
    return dsl.ContainerOp(name='Unzip',
                           image='garthk/unzip:latest',
                           command=['unzip', '-o', filename, '-d', exdir])

데이터 변환 컴포넌트

이번에는 [transform.py](<http://transform.py>) 라는 파이썬 파일을 생성한 후, 페어링을 이용하여 컨테이너 이미지를 빌드한 다음, 파이프라인에서 사용하도록 하겠습니다.

train.csvtest.csv 의 데이터를 머신러닝의 학습에 사용할 수 있도록 변환합니다. 캐글 데이터의 경로를 input_path 파라미터로 입력 받아 데이터를 변환합니다. 데이터 변환에 쓰이는 코드들은 앞에서 작성한 코드와 동일합니다. 변환할 데이터를 저장할 경로를 output_path 파라미터로 입력 받아 변환된 데이터를 저장합니다. 컨테이너 이미지를 빠르게 빌드하기 위하여, 필요한 패키지가 포함된 기본 이미지를 미리 만들어서 사용하였습니다.

주피터 노트북 셀에서 %%writefile transform.py 을 이용하여transform.py 파일을 생성합니다.

In []:

%%writefile transform.py
import argparse
import os
from tempfile import TemporaryDirectory

import pandas as pd
from sklearn.impute import SimpleImputer

def transform(input_path, output_path):
    train_data = pd.read_csv(os.path.join(input_path, 'train.csv'))
    test_data = pd.read_csv(os.path.join(input_path, 'test.csv'))

    train_data.dropna(axis=0, subset=['SalePrice'], inplace=True)

    train_data = train_data.drop([], axis=1).select_dtypes(exclude=['object'])
    train_X = train_data.drop(['SalePrice'], axis=1)
    train_y = train_data['SalePrice']
    test_data = test_data.drop([], axis=1).select_dtypes(exclude=['object'])

    imputer = SimpleImputer(strategy='median')
    imputer.fit(train_X)

    train_X = pd.DataFrame(imputer.transform(train_X), columns=train_X.columns, index=train_X.index)
    train_data = pd.concat([train_X, train_y], axis=1)
    test_data = pd.DataFrame(imputer.transform(test_data), columns=test_data.columns, index=test_data.index)
    
    train_data = train_data.astype({'Id': int})
    test_data = test_data.astype({'Id': int})

    
    access_key = os.environ['AWS_ACCESS_KEY_ID']
    secret_key = os.environ['AWS_SECRET_ACCESS_KEY']
    from minio import Minio
    minio_client = Minio('s3.amazonaws.com',
                 access_key=access_key,
                 secret_key=secret_key)
    
    from urllib.parse import urlparse
    url = urlparse(output_path, allow_fragments=False)
    bucket_name = url.netloc
    object_name = url.path.lstrip('/')
    
    with TemporaryDirectory() as tmpdir:
        tmp_train_data = os.path.join(tmpdir, 'train.csv')
        tmp_test_data = os.path.join(tmpdir, 'test.csv')
        train_data.to_csv(tmp_train_data, index=False)
        test_data.to_csv(tmp_test_data, index=False)
        minio_client.fput_object(bucket_name, os.path.join(object_name, 'train.csv'), tmp_train_data)
        minio_client.fput_object(bucket_name, os.path.join(object_name, 'test.csv'), tmp_test_data)

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_path', default='./kaggle', type=str)
    parser.add_argument('--output_path', default='./input', type=str)
    args = parser.parse_args()

    transform(args.input_path, args.output_path)

변환이 완료된 데이터를 S3에 저장하기 위하여, minio 라이브러리를 사용하였습니다. 그리고 S3에 접속하기 위한 자격 증명 정보는 AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEY 라는 환경 변수를 이용하여 넘겨받습니다.

페어링을 이용하여, transform.py 파일이 포함된 컨테이너 이미지를 빌드하고 푸시하겠습니다. 주피터 노트북에서 페어링을 이용하여, 컨테이너 이미지를 푸시하려면, 별도의 설정이 되어 있야 합니다. 만약 설정이 되어 있지 않다면, “05-Kubeflow Fairing – 주피터 노트북에서 Kubeflow Fairing 설정하기“를 참고하셔서 설정하시기 바랍니다.

In []:

import uuid
from kubeflow import fairing
from kubeflow.fairing.kubernetes import utils as k8s_utils
from kubeflow.fairing.preprocessors import base
from kubeflow.fairing.builders.append import append

CONTAINER_REGISTRY = 'kangwoo'

preprocessor = base.BasePreProcessor(executable="transform.py")

builder = append.AppendBuilder(registry=CONTAINER_REGISTRY, image_name="house-prices-transform",
                           base_image="kangwoo/xgboost:0.82", preprocessor=preprocessor)
builder.build()

transform_image = builder.image_tag
print(transform_image)

데이터 전처리 파이프라인 생성하기

PVC와 캐글 토큰 확인하기

파이프라인 컴포넌트에서 이 PVC를 이용하여, 데이터를 저장하고 읽어 올 것입니다. PVC는 dsl.VolumeOp() 를 이용하여 생성한 후, 작업이 끝나면 삭제하도록 하겠습니다.

그리고 캐글 API 사용을 위한 토큰도 앞서 “타이타닉 생존 예측”에서 생성한 kaggle-secret 을 사용하겠습니다.

파이프라인 작성하기

주피터 노트북으로 돌아가서 파이프라인을 작성해 보겠습니다.

In []:

import os
import string

import kfp
import kfp.dsl as dsl
from kfp import components
from kfp import onprem
from kfp import aws
from kubernetes.client.models import V1Volume
from kubernetes.client.models import V1SecretVolumeSource
from kubernetes.client.models import V1VolumeMount

@dsl.pipeline(
    name='House Prices Transofrm Pipeline',
    description='House Prices Transofrm Pipeline'
)
def house_prices_transform_pipeline():
    
    secret_name = "aws-secret"
    
    competition_name = 'house-prices-advanced-regression-techniques'

    kaggle_data_path = os.path.join('/data/competitions', competition_name, 'kaggle')
    version = 'v0.0.1'
    ouput_data_path = os.path.join('s3://kfp-bucket/data/competitions', competition_name, 'input', version)
    
    volume_task = dsl.VolumeOp(
        name="house-prices-volume",
        resource_name="house-prices-pvc",
        modes=dsl.VOLUME_MODE_RWO,
        size="100Mi"
    )

    download_task = download_op(competition=competition_name, path=kaggle_data_path)\\
        .add_pvolumes({"/data": volume_task.volume})\\
        .add_volume(V1Volume(name='kaggle-secret', secret=V1SecretVolumeSource(secret_name='kaggle-secret')))\\
        .add_volume_mount(V1VolumeMount(name='kaggle-secret', mount_path='/root/.kaggle'))

    with dsl.Condition(download_task.outputs['downloaded'] == 'True'):
        unzip_task = unzip_op(os.path.join(kaggle_data_path, competition_name + '.zip'), kaggle_data_path)\\
                         .add_pvolumes({"/data": download_task.pvolume})\\
    
    
    transform_task = dsl.ContainerOp(
        name='Transform',
        image=transform_image,
        command=['python', '/app/transform.py'],
        arguments=['--input_path', kaggle_data_path, '--output_path', ouput_data_path],
        pvolumes={"/data": unzip_task.pvolume}
    ).apply(aws.use_aws_secret(secret_name,
            aws_access_key_id_name='AWS_ACCESS_KEY_ID',
            aws_secret_access_key_name='AWS_SECRET_ACCESS_KEY'))

    volume_task.delete().after(transform_task)
  

if __name__ == '__main__':
    kfp.compiler.Compiler().compile(house_prices_transform_pipeline, 'house-prices-transform.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='House Prices Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'House Prices Transform Pipeline', 'house-prices-transform.zip')

생성된 링크를 클릭하시면, 다음과 같은 화면을 확인할 수 있습니다.

S3 저장소의 해당 버킷을 조회해 보면, 다음과 같이 2개의 데이터 파일이 생성된 것을 확인할 수 있습니다.


모델 학습 및 캐글 제출 파이프라인 컴포넌트 작성하기

모델 학습 및 캐글 제출 파이프라인에서는 세개의 컴포넌트를 사용합니다.

  • Katib Launcher: 하이퍼파리미터 튜닝을 위하여, Experiment 를 생성해 줍니다.
  • HP Out: 하이퍼파리미터 튜닝 결과 값을 출력해 줍니다.
  • Train: 하이퍼파리미터 값을 입력 받아 모델을 학습합니다. 학습한 모델은 S3에 저장합니다.
  • Predict : S3에서 모델을 가져와서, 예측을 수행합니다. 예측 결과 값을 퍼시스턴스 볼륨에 저장합니다.
  • Submit : 예측 결가 값을 캐글에 제출합니다.

모든 데이터를 S3에 저장할 수 있으나, 앞서 만든 Submit 컴포넌트가 S3를 지원하기 않기 때문에, 퍼시스턴스 볼륨과 S3를 같이 사용하였습니다. 그리고 하이퍼파라미터 튜닝을 위한 Experiment 리소스를 파이프라인이 실행되는 kubeflow 네임스가 아닌, 별도의 admin 네임스페이스에 생성하였습니다.

파이프라인의 전체적인 흐름은 다음과 같습니다.

하이퍼 파리미터 튜닝 작업이 admin 네임스페이스 실행되므로, 해당 작업들이 S3에 접근할 수 있도록, admin 네임스페이스에도 aws-secret 을 생성해줘야합니다.

export AWS_ACCESS_KEY_ID=<YOUR_AWS_ACCESS_KEY_ID>
export AWS_SECRET_ACCESS_KEY=<YOUR_AWS_SECRET_ACCESS_KEY>

kubectl -n admin create secret generic aws-secret \\
    --from-literal=AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} \\
    --from-literal=AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}

모델 학습 및 캐글 제출 파이프라인 컴포넌트 작성하기

주피터 노트북을 생성합니다. 파일 이름은 train_pipeline_s3.ipynb 입니다. 노트북의 위치는 ~/workspace/house-prices입니다. 앞으로 작성할 예제에서는 노트북이 ~/workspace/house-prices 에 위치하고 있다고 가정하여, 파일 경로를 설정할 것입니다.

모델 학습과 예측 컴포넌트

train.py 파일을 생성한 후, 페어링을 이용하여 컨테이너 이미지를 빌드한 다음, 파이프라인에서 사용하도록 하겠습니다.

변환된 train.csv 데이터를 이용하여 모델을 학습니다. 변환된 데이터의 경로를 path 파리미터로 입력받에 데이터를 읽어옵니다. 그리고 데이터를 학습 세트와 테스트 세트로 나눈다음, 모델을 학습니다. 그리고 하이퍼 파라미터로서 n_estimatorslearning_rate 를 입력 받아, 모델 학습시 사용합니다. 그리고 mode 를 입력 받아, train 일 경우에는 모델 학습을, predict 일 경우에는 예측을 실행합니다. S3에서 데이터를 읽고, 저장하기 위하여 minio 라이브러리를 사용하였습니다.

주피터 노트북 셀에서 %%writefile train.py 을 이용하여 train.py 파일을 생성합니다.

In []:

%%writefile train.py
import argparse
import json
import os
from datetime import datetime, timezone
from tempfile import TemporaryDirectory

import pandas as pd
from sklearn.metrics import r2_score
from sklearn.model_selection import train_test_split
from xgboost import XGBRegressor

def read_train_input(input_path):
    minio_client = get_minio_client()
    bucket_name, object_name = get_bucket_name_and_object_name(input_path)
    with TemporaryDirectory() as tmpdir:
        tmp_train_data_file = os.path.join(tmpdir, 'train.csv')
        minio_client.fget_object(bucket_name, os.path.join(object_name, 'train.csv'), tmp_train_data_file)
        data = pd.read_csv(tmp_train_data_file)
        
    X = data.drop(['Id', 'SalePrice'], axis=1)
    y = data['SalePrice']

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

    return X_train, y_train, X_test, y_test

def read_test_input(input_path='./input'):
    minio_client = get_minio_client()
    bucket_name, object_name = get_bucket_name_and_object_name(input_path)
    with TemporaryDirectory() as tmpdir:
        tmp_test_data_file = os.path.join(tmpdir, 'test.csv')
        minio_client.fget_object(bucket_name, os.path.join(object_name, 'test.csv'), tmp_test_data_file)
        data = pd.read_csv(tmp_test_data_file)
        
    X = data.drop(['Id'], axis=1)
    id = data['Id']

    return X, id

def train_model(X_train, y_train,
                X_test, y_test,
                n_estimators, learning_rate, early_stopping_rounds):
    model = XGBRegressor(n_estimators=n_estimators, learning_rate=learning_rate)

    model.fit(X_train, y_train,
              early_stopping_rounds=early_stopping_rounds, eval_set=[(X_test, y_test)])

    print('Best RMSE on eval: {} with {} rounds'.format(model.best_score, model.best_iteration + 1))
    return model

def eval_model(model, X_test, y_test):
    predictions = model.predict(X_test)
    local_time = datetime.now(timezone.utc).astimezone().isoformat()
    score = r2_score(predictions, y_test)
    print('{} r2_score={}'.format(local_time, score))
    return score

MODE_FILENAME = 'model.bst'

def load_model(model_path):
    minio_client = get_minio_client()
    bucket_name, object_name = get_bucket_name_and_object_name(model_path)
    model = XGBRegressor()
    with TemporaryDirectory() as tmpdir:
        tmp_model_file = os.path.join(tmpdir, MODE_FILENAME)
        minio_client.fget_object(bucket_name, os.path.join(object_name, MODE_FILENAME), tmp_model_file)
        model.load_model(tmp_model_file)
        print('Load modelfrom ', os.path.join(model_path, MODE_FILENAME))
    return model

def save_model(model, model_path):
    minio_client = get_minio_client()
    bucket_name, object_name = get_bucket_name_and_object_name(model_path)
    with TemporaryDirectory() as tmpdir:
        tmp_model_file = os.path.join(tmpdir, MODE_FILENAME)
        model.save_model(tmp_model_file)
        minio_client.fput_object(bucket_name, os.path.join(object_name, MODE_FILENAME), tmp_model_file)
        print('Save model to', os.path.join(model_path, MODE_FILENAME))

def get_minio_client():
    access_key = os.environ['AWS_ACCESS_KEY_ID']
    secret_key = os.environ['AWS_SECRET_ACCESS_KEY']

    from minio import Minio
    return Minio('s3.amazonaws.com',access_key=access_key, secret_key=secret_key)

def get_bucket_name_and_object_name(path):
    from urllib.parse import urlparse
    url = urlparse(path, allow_fragments=False)
    bucket_name = url.netloc
    object_name = url.path.lstrip('/')
    return bucket_name, object_name

class ModelServe(object):
    def __init__(self, model_path=None, n_estimators=100, learning_rate=0.1, early_stopping_rounds=40):
        self.model_path = model_path
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.early_stopping_rounds = early_stopping_rounds
        print("model_path={}".format(self.model_path))
        print("n_estimators={}".format(self.n_estimators))
        print("learning_rate={}".format(self.learning_rate))
        print("early_stopping_rounds={}".format(self.early_stopping_rounds))

        self.model = None
        
    #         self._workspace = None
    #         self.exec = self.create_execution()

    def train(self, X_train, y_train, X_test, y_test):

        #         self.exec.log_input(metadata.DataSet(
        #             description="xgboost synthetic data",
        #             name="synthetic data",
        #             owner="someone@kubeflow.org",
        #             uri="file://path/to/dataset",
        #             version="v0.0.1"))

        model = train_model(X_train,
                            y_train,
                            X_test,
                            y_test,
                            self.n_estimators,
                            self.learning_rate,
                            self.early_stopping_rounds)

        score = eval_model(model, X_test, y_test)

        #         self.exec.log_output(metadata.Metrics(
        #             name="traing- valuation",
        #             owner="someone@kubeflow.org",
        #             description="training evaluation for xgboost synthetic",
        #             uri="file://path/to/metrics",
        #             metrics_type=metadata.Metrics.VALIDATION,
        #             values={"mean_absolute_error": mae}))

        if self.model_path:
            save_model(model, self.model_path)

    #         self.exec.log_output(metadata.Model(
    #             name="model",
    #             description="prediction model using synthetic data",
    #             owner="someone@kubeflow.org",
    #             uri='file://path/to/model',
    #             model_type="linear_regression",
    #             training_framework={
    #                 "name": "xgboost",
    #                 "version": "0.90"
    #             },
    #             hyperparameters={
    #                 "learning_rate": self.learning_rate,
    #                 "n_estimators": self.n_estimators
    #             },
    #             version=datetime.utcnow().isoformat("T")))

    def predict(self, X, feature_names):
        if not self.model:
            self.model = load_model(self.model_path)
        # Do any preprocessing
        prediction = self.model.predict(data=X)
        # Do any postprocessing
        return prediction

#     @property
#     def workspace(self):
#         if not self._workspace:
#             self._workspace = create_workspace()
#         return self._workspace

#     def create_execution(self):
#         r = metadata.Run(
#             workspace=self.workspace,
#             name="xgboost-synthetic-run" + datetime.utcnow().isoformat("T"),
#             description="a notebook run")

#         return metadata.Execution(
#             name = "execution" + datetime.utcnow().isoformat("T"),
#             workspace=self.workspace,
#             run=r,
#             description="execution for training xgboost-synthetic")

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_path', default='./input', type=str)
    parser.add_argument('--hyperparameters', required=False, type=str)
    parser.add_argument('--n_estimators', default='100', type=int)
    parser.add_argument('--learning_rate', default='0.1', type=float)
    parser.add_argument('--model_path', required=False, type=str)
    parser.add_argument('--mode', default='train', choices=['train', 'predict'])
    parser.add_argument('--submission_path', default='./', type=str)
    args = parser.parse_args()

    if args.hyperparameters:
        hp_json = json.loads(args.hyperparameters)
        print("use json hyperparameters", hp_json)
        hyperparameters = {}
        for n in hp_json:
            hyperparameters[n['name']] = n['value']

        n_estimators = int(hyperparameters['--n_estimators'])
        learning_rate = float(hyperparameters['--learning_rate'])
    else:
        n_estimators = args.n_estimators
        learning_rate = args.learning_rate

    if args.mode == 'predict':
        test, id = read_test_input(args.input_path)
        model = ModelServe(model_path=args.model_path)
        pred = model.predict(test, None)
        submission = pd.concat([id, pd.Series(pred, name='SalePrice')], axis=1)
        print(submission)
        if not os.path.exists(args.submission_path):
            os.makedirs(args.submission_path)
        submission.to_csv(os.path.join(args.submission_path, 'submission.csv'), index=False)
    else:
        X_train, y_train, X_test, y_test = read_train_input(args.input_path)

        model = ModelServe(model_path=args.model_path, n_estimators=n_estimators, learning_rate=learning_rate)
        model.train(X_train, y_train, X_test, y_test)

페어링을 이용하여, [train.py](<http://train.py>) 파일이 포함된 컨테이너 이미지를 빌드하고 푸시하겠습니다. 주피터 노트북에서 페어링을 이용하여, 컨테이너 이미지를 푸시하려면, 별도의 설정이 되어 있야 합니다. 만약 설정이 되어 있지 않다면, “05-Kubeflow Fairing – 주피터 노트북에서 Kubeflow Fairing 설정하기“를 참고하셔서 설정하시기 바랍니다.

In []:

import uuid
from kubeflow import fairing
from kubeflow.fairing.kubernetes import utils as k8s_utils
from kubeflow.fairing.preprocessors import base
from kubeflow.fairing.builders.append import append

CONTAINER_REGISTRY = 'kangwoo'

preprocessor = base.BasePreProcessor(executable="train.py")

builder = append.AppendBuilder(registry=CONTAINER_REGISTRY, image_name="house-prices-train",
                           base_image="kangwoo/xgboost:0.82", preprocessor=preprocessor)
builder.build()

train_image = builder.image_tag
print(train_image)

파이프라인 생성하기

파이프라인 작성하기

파이프라인을 작성해 보겠습니다. Kubeflow 에서 제공하는 katib-launcher 컴포넌트를 이용하여, 하이퍼 파리미터 튜닝 작업을 실행합니다. 튜닝 작업이 끝난 후, 가장 좋은 점수의 하이퍼 파라미터값을 넘겨 받습니다. 이 값을 이용하여 모델을 다시 학습한 후, 캐글에 제출할 예측 데이터를 생성합니다.

In []:

import os
import string

import kfp
import kfp.dsl as dsl
from kfp import components
from kfp import onprem
from kfp import aws
from kubernetes.client.models import V1Volume
from kubernetes.client.models import V1SecretVolumeSource
from kubernetes.client.models import V1VolumeMount

@dsl.pipeline(
    name='House Prices Train Pipeline',
    description='House Prices Train Pipeline'
)
def house_prices_train_pipeline(name="house-prices-train", namespace="admin",
            goal=0.95, parallel_trial_count=2, max_trial_count=12, experiment_timeout_minutes=60, delete_after_done=True):

    secret_name = "aws-secret"
    
    competition_name = 'house-prices-advanced-regression-techniques'
    data_version='v0.0.1'
    input_data_path = os.path.join('s3://kfp-bucket/data/competitions', competition_name, 'input', data_version)

    objective_config = {
      "type": "maximize",
      "goal": goal,
      "objectiveMetricName": "r2_score"
    }
    algorithm_config = {'algorithmName' : 'random'}
    parameters = [
      {"name": "--learning_rate", "parameterType": "double", "feasibleSpace": {"min": "0.01","max": "0.2"}},
      {"name": "--n_estimators", "parameterType": "int", "feasibleSpace": {"min": "10", "max": "200"}}
    ]
    metrics_collector = {
        "collector": {
            "kind": "StdOut"
        }
    }

    trial_template_params = {'train_image':train_image,
    'secret_name': secret_name,
    'input_data_path': input_data_path}     

    trial_template = string.Template('''
    goTemplate:
        rawTemplate: |-
          apiVersion: batch/v1
          kind: Job
          metadata:
            name: {{.Trial}}
            namespace: {{.NameSpace}}
          spec:
            template:
              spec:
                containers:
                - name: {{.Trial}}
                  image: $train_image
                  command:
                  - "python"
                  - "/app/train.py"
                  - "--input_path"
                  - $input_data_path
                  {{- with .HyperParameters}}
                  {{- range .}}
                  - "{{.Name}}={{.Value}}"
                  {{- end}}
                  {{- end}}
                  env:
                  - name: AWS_ACCESS_KEY_ID
                    valueFrom:
                      secretKeyRef:
                        name: $secret_name
                        key: AWS_ACCESS_KEY_ID
                  - name: AWS_SECRET_ACCESS_KEY
                    valueFrom:
                      secretKeyRef:
                        name: $secret_name
                        key: AWS_SECRET_ACCESS_KEY
                restartPolicy: Never
    ''').safe_substitute(trial_template_params)

    katib_experiment_launcher_op = components.load_component_from_url('<https://raw.githubusercontent.com/kubeflow/pipelines/master/components/kubeflow/katib-launcher/component.yaml>')
    hp_task = katib_experiment_launcher_op(
              experiment_name=name,
              experiment_namespace=namespace,
              parallel_trial_count=parallel_trial_count,
              max_trial_count=max_trial_count,
              objective=str(objective_config),
              algorithm=str(algorithm_config),
              trial_template=str(trial_template),
              parameters=str(parameters),
              metrics_collector=str(metrics_collector),
              experiment_timeout_minutes=experiment_timeout_minutes,
              delete_finished_experiment=delete_after_done)
    
    
    hp_out_task = dsl.ContainerOp(
        name="hp out",
        image="library/bash:4.4.23",
        command=["sh", "-c"],
        arguments=["echo hyperparameter: %s" % hp_task.output],
    )

    model_version = 'v0.0.1'
    model_path = os.path.join('s3://kfp-bucket/data/competitions', competition_name, 'models', model_version)
    
    train_task = dsl.ContainerOp(
        name='train',
        image=train_image,
        command=['python', '/app/train.py'],
        arguments=['--input_path', input_data_path, '--hyperparameters',  hp_task.output, '--model_path', model_path]
    ).apply(aws.use_aws_secret(secret_name,
            aws_access_key_id_name='AWS_ACCESS_KEY_ID',
            aws_secret_access_key_name='AWS_SECRET_ACCESS_KEY'))
    
    
    submission_path = os.path.join('/data/competitions', competition_name, 'submissions', model_version)

    volume_task = dsl.VolumeOp(
        name="house-prices-volume",
        resource_name="house-prices-pvc",
        modes=dsl.VOLUME_MODE_RWO,
        size="100Mi"
    )
    
    predict_task = dsl.ContainerOp(
        name='predict',
        image=train_image,
        command=['python', '/app/train.py'],
        arguments=['--input_path', input_data_path, '--mode',  'predict', '--model_path', model_path,
                  '--submission_path', submission_path],
        pvolumes={"/data": volume_task.volume}
    ).apply(aws.use_aws_secret(secret_name,
            aws_access_key_id_name='AWS_ACCESS_KEY_ID',
            aws_secret_access_key_name='AWS_SECRET_ACCESS_KEY')).after(train_task)
    
    submit_op = components.load_component('../components/kaggle/competitions_submit/component.yaml')
    submit_task = submit_op(competition=competition_name, path=submission_path, message='RunId {{workflow.uid}}')\\
            .add_pvolumes({"/data": predict_task.pvolume})\\
            .add_volume(V1Volume(name='kaggle-secret', secret=V1SecretVolumeSource(secret_name='kaggle-secret')))\\
            .add_volume_mount(V1VolumeMount(name='kaggle-secret', mount_path='/root/.kaggle')).after(predict_task)

    
    volume_task.delete().after(submit_task)

if __name__ == '__main__':
    kfp.compiler.Compiler().compile(house_prices_train_pipeline, 'house-prices-train.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='House Prices Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'House Prices Train Pipeline', 'house-prices-train.zip')

생성된 링크를 클릭하시면, 다음과 같은 화면을 확인할 수 있습니다.

Katib 모니터 화면에서 튜닝 결과도 확인할 수 있습니다.

“hp-out” 단계를 클릭하면, 다음과 같은 하이퍼파리미터 튜닝 결과 값을 확인해 볼 수 있습니다.

hyperparameter: [{name: --learning_rate, value: 0.1985601684459975}, {name: --n_estimators, value: 183}]

파이프라인 실행이 완료되면, 예측 결과가 캐글에 제출 됩니다.

캐글의 “My Submissions” 탭을 클릭하면, 제출한 내용들을 확인할 수 있습니다.

캐글 – 타이타닉 생존 예측 파이프라인 만들기

머신 러닝 파이프라인을 만들기 위하여, 타이타닉 데이터를 사용해 보도록 하겠습니다.

타이타닉 데이터는 캐글(Kaggle)에서 입문자용으로 가장 많이 사용하는 예제입니다. 캐글은 데이터 사이언스나 머신 러닝을 공부하는 사람들이 많이 사용하는 데이터 분석 경연 플랫폼입니다.

타이타닉 데이터는 다양한 사람들이 다양한 관점에서 다양한 방법으로 데이터를 분석하고 있기 때문에, 이 데이터를 가지고 공부를 하다보면, 데이터 분석의 전반적인 과정을 이해하는데 많은 도움이 될 것입니다.

이 장에서는 타이타낵 생존 예측 결과를 캐글에 제출하기 위한 파이프라인을 구성해 볼 것입니다. 그리고 재사용 컴포넌트를 만들고, 파이프라인에서 재사용 컴포넌트를 사용할 것입니다. 데이터 분석 보다는 전체적인 파이프라인을 만드는 것에 중점을 두고 설명하겠습니다.

사전 준비

캐글 접속하기

캐글 사이트의 주소는 https://www.kaggle.com/ 입니다. 원활한 사용을 위하여, 회원 가입이 필요합니다. 회원 가입을 하시고, 로그인하시기 바랍니다.

Titanic: Machine Learning from Disaster 접속

예제에서 사용할 데이터가 있는 타이나닉 예측 Competition 의 주소는 다음과 같습니다.

[<https://www.kaggle.com/c/titanic>](<https://www.kaggle.com/c/titanic>)

해당 주소로 직접 접속하셔도 되고, 상단에 있는 검색바를 이용하여 검색하셔도 됩니다.

다음은 검색바를 이용하여, “Titanic: Machine Learning from Disaster” 을 검색한 결과입니다.

“Titanic: Machine Learning from Disaster” Competition 페이지 접속하면 다음과 같은 화면을 볼 수 있습니다.

데이터는 “Data” 탭에서 받을 수 있습니다. “Data” 탭을 클릭하면, 데이터에 대한 자세한 설명과 다운로드 받는 방법을 볼 수 있습니다.

주피터 노트북

주피터에서 새로운 Terminal 을 엽니다.

kaggle 을 이용하여 데이터를 받기 위해서, kaggle 패키지를 설치합니다.

pip install kaggle --user

정상적으로 설치되면, 다음과 같은 응답 결과를 확인할 수 있습니다.

Collecting kaggle
  Downloading <https://files.pythonhosted.org/packages/62/ab/bb20f9b9e24f9a6250f95a432f8d9a7d745f8d24039d7a5a6eaadb7783ba/kaggle-1.5.6.tar.gz> (58kB)
....
Successfully built kaggle python-slugify
Installing collected packages: urllib3, tqdm, text-unidecode, python-slugify, kaggle
  WARNING: The script tqdm is installed in '/home/jovyan/.local/bin' which is not on PATH.
  Consider adding this directory to PATH or, if you prefer to suppress this warning, use --no-warn-script-location.
  WARNING: The script slugify is installed in '/home/jovyan/.local/bin' which is not on PATH.
  Consider adding this directory to PATH or, if you prefer to suppress this warning, use --no-warn-script-location.
  WARNING: The script kaggle is installed in '/home/jovyan/.local/bin' which is not on PATH.
  Consider adding this directory to PATH or, if you prefer to suppress this warning, use --no-warn-script-location.
Successfully installed kaggle-1.5.6 python-slugify-4.0.0 text-unidecode-1.3 tqdm-4.45.0 urllib3-1.24.3

kaggle 명령어를 실행하기 위해서 PATH 에 추가해 줍니다. kaggleuser 디렉토리에 설치하였기 때문에, PATH 에 안잡혀 있을 수 있습니다. 사용하는 주피터 노트북 이미지에 따라서 다르기 때문에, 이미 PATH 에 추가되어 있다면, 다음 단계로 넘어가셔도 됩니다.

export PATH=${PATH}:/home/jovyan/.local/bin

캐글 API 토큰 생성하기

주피터 터미널에서 kaggle 명령어를 이용하여, 데이터를 받으려면, API 토큰이 필요합니다. 캐글 사이트의 우측 상단에 있는 “My Profile”을 클릭합니다.

My Account

“My Profile” 페이지 접속하면, 토큰을 생성할 수 있습니다.

“Create New API Token”을 클릭하여 토큰을 생성합니다. 토큰이 생성되면 “kaggle.json” 파일로 자동 다운로드 됩니다.

주피터 노트북에 캐글 API 토큰 추가하기

해당 파일을 주피터 노트북의 ~/.kaggle/kaggle.json 로 복사해 줍니다.

mkdir ~/.kaggle
cd ~/.kaggle
cat << EOF > kaggle.json
{"username":"USERNAME","key":"12345678901234567890"}
EOF

주피터 노트북에서 타이타닉 데이터 다운로드하기

작업 디렉토리를 생성한 다음, 타이타닉 데이터를 다운로드 합니다. [titanic.zip](<http://titanic.zip>) 라는 파일이 다운로드 됩니다.

mkdir -p ~/workspace/titanic/kaggle
cd ~/workspace/titanic/kaggle
kaggle competitions download -c titanic
Downloading titanic.zip to /home/jovyan/workspace/titanic/kaggle
  0%|                                                                                          | 0.00/34.1k [00:00<?, ?B/s]
100%|██████████████████████████████████████████████████████████████████████████████████| 34.1k/34.1k [00:00<00:00, 842kB/s]

다운로드한 파일의 압축을 풀겠습니다. 모델 학습을 위한 train.csv 파일과, 예측에 사용할 데이터인 test.csv 그리고, 캐글에 제출할 파일의 형식을 보여주는 gender_submission.csv 파일이 생성됩니다.

unzip titanic.zip
Archive:  titanic.zip
  inflating: gender_submission.csv
  inflating: test.csv
  inflating: train.csv

파이썬 패키지 설치

머신 러닝 모델 코드를 작성하기 위한 패키지들을 설치합니다.

pip install pandas seaborn sklearn --user

Kubeflow 파이프라인을 작성하기 위한 패키지를 설치합니다.

pip install kfp --user

데이터 전처리와 모델 작성

문제 정의하기

주어진 데이터를 바탕으로, 타이타닉호에 탑승했던 승객이 타이타닉 침몰에서 살아 남았는지를 예측하는 문제입니다.

데이터 수집(Data Ingestion)

Data Ingestion은 사용하거나 저장하기 위해서 데이터를 입수하고 가져오는 과정입니다. 간단히 얘기해서 머신 러닝에 사용할 데이터를 가져온다고 할 수 있습니다. 예제에서는 캐글에서 제공하는 데이터를 다운로드 받아 사용합니다.

데이터 분석 및 검증 (Data Analysis and Validation)

데이터 분석이란 데이터의 분포를 이해한다는 것을 의미합니다. 데이터에 대한 통계 정보들, 예를 들어, 각 컬럼들이 어떤한 값들을 얼마 만큼 가지고 있는지, 어떤 컬럼이 포함하거나 포함하지 않는 값들 얼마만큼 가지고 있는지 같은 정보를 파악하는것을 의미합니다. 이런 분석 작업을 통하여 데이터를 검증하게 됩니다.

데이터 검증은 데이터의 품질을 높이기 위하여, 데이터의 오류를 파악하여 수정하는 것을 의미합니다. 데이터 분석에서 얻은 정보들을 기반으로 데이터 검증이 이루어 집니다.

데이터 분석과 검증을 통하여 유효하지 않거나, 유실된 데이터를 처리해야만 데이터의 품질이 좋아질 수 있습니다.

데이터의 품질을 높이기 위해서는 다음과 같은 여러 가지 기능을 사용합니다.

  • 기본 사항(선택, 필터, 중복 제거 등)
  • 샘플링(균형, 계층화 등)
  • 데이터 파티셔닝(학습 세트 + 검증 세트 + 테스트 세트)
  • 변환(일반화, 표준화, 스케일링, 피벗, 등)
  • Binning (결측값 처리 등)

타이타닉 데이터 분석 및 검증

먼저 타이타닉 데이터에 대해 이해하기 위하여, 캐글에서 제공하고 있는 Data DictionaryVariable Notes를 살펴 보겠습니다. Data 탭의 Data Description 부분을 펼치면 다음과 같은 내용을 확인할 수 있습니다.

Data Dictionary

변수 노트

각 변수에 대해서 좀 더 자세한 내용이 적혀 있습니다.

  • pclass :사회 경제적 지위 (SES)
    • 1st = Upper
    • 2nd = Middle
    • 3rd = Lower
  • age : 나이가 1보다 작은 경우는 분수입니다. 추정된 나이일 경우에는 xx.5 형식입니다.
  • sibsp : 데이터 세트는 다음과 같은 방식으로 가족 관계를 정의합니다.
    • 형제 자매 = 형제, 자매, 의붓 형제, 이복 누이
    • 배우자 = 남편, 아내 (정부와 약혼자는 무시 합니다.)
  • parch : 데이터 세트는 다음과 같은 방식으로 가족 관계를 정의합니다.
    • 부모 = 어머니, 아버지
    • 아이 = 딸, 아들, 의붓 딸, 의붓 아들
    • 보모와 같이 여행한 어린이는 parch=0 으로 처리합니다.

이제 각 변수들에 대한 의미를 알게되었으니, 주어진 데이터에 대해 간략하게 살펴보겠습니다.

캐글에서 제공하는 타이타닉 데이터 세트는 분석 모델을 만드는데에는 아직 적합하지 않습니다. 결측 값들이 존재하고, 학습에 사용하기 어려운 문자열 값들이 존재하고 있습니다. 이러한 값들을 잘 처리하여야만, 머신 러닝 알고리즘을 적용할 때 최상의 결과를 얻을 수 있습니다.

주피터 노트북을 생성합니다. 노트북의 위치는 ~/workspace/titanic 입니다. 앞으로 작성할 예제에서는 노트북이 ~/workspace/titanic 에 위치하고 있다고 가정하여, 파일 경로를 설정할 것입니다.

주피터 노트북을 이용하여 데이터를 분석 및 검증해 보겠습니다.

판다스를 이용해서 데이터를 읽어 오겠습니다.

In []:

import pandas as pd
train = pd.read_csv('~/workspace/titanic/kaggle/train.csv')
test = pd.read_csv('~/workspace/titanic/kaggle/test.csv')

head() 메소드를 이용하여 학습용 데이터를 조회해 봅니다.

In []:

train.head()

Cabin 에 Nan(Not a Number) 가 존재하는 것을 알 수가 있습니다.

테스트 데이터도 조회해 봅니다.

In[]:

test.head()

테스트 데이터에는 Survived 컬럼이 없다는 것을 알 수 있습니다. 파일명이 test 이지만 테스트에는 사용할 수 없습니다. 이 데이터는 캐글에 예측 결과값을 제출할때 사용하는 입력 데이터 입니다.

그리고 info()메소드를 이용하에 데이터의 정보를 조회해 봅니다.

In[]:

train.info()

정상적으로 실행되면, 다음과 같은 결과를 확인할 수 있습니다.

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 891 entries, 0 to 890
Data columns (total 12 columns):
 #   Column       Non-Null Count  Dtype  
---  ------       --------------  -----  
 0   PassengerId  891 non-null    int64  
 1   Survived     891 non-null    int64  
 2   Pclass       891 non-null    int64  
 3   Name         891 non-null    object 
 4   Sex          891 non-null    object 
 5   Age          714 non-null    float64
 6   SibSp        891 non-null    int64  
 7   Parch        891 non-null    int64  
 8   Ticket       891 non-null    object 
 9   Fare         891 non-null    float64
 10  Cabin        204 non-null    object 
 11  Embarked     889 non-null    object 
dtypes: float64(2), int64(5), object(5)
memory usage: 83.7+ KB

Non-Null 의 값들이 다른 것을 알 수 있습니다. 즉 Null 인 값들이 존재한다는 것입니다.

Null 값들의 개수를 조회해 보겠습니다.

train.isnull().sum()

정상적으로 실행되면, 다음과 같은 결과를 확인할 수 있습니다.

PassengerId      0
Survived         0
Pclass           0
Name             0
Sex              0
Age            177
SibSp            0
Parch            0
Ticket           0
Fare             0
Cabin          687
Embarked         2
dtype: int64

AgeCabin 그리고 Embarked 에 결측값이 존재하는 것을 알 수 있습니다.

데이터에 대한 간단한 통계 정보를 보고 싶으면 describe() 메소드를 사용할 수 도 있습니다.

train.describe(include='all')

정상적으로 실행되면, 다음과 같은 결과를 확인할 수 있습니다.

조회해 본 결과를 통해서 알 수 있듯이, Age, Cabin, Embarked 는 결측값이 존재하는 것을 확인할 수 있습니다. 그리고 여러 유형의 문자열 데이터가 존재하는 것도 확인할 수 있습니다.

데이터는 크게 숫자형 데이터(Numerical Type)와 범주형 데이터(Categorical Type)로 나눌 수 있습니다. 숫자형 데이터는 연속성을 가지는 숫자로 이루어진 데이터를 의미합니다. 예제에서는 AgeFare 같은 것이 여기에 속합니다. 범주형 데이터는 연속적이지 않는 값을 갖는 데이터를 의미합니다. 대부분의 경우 문자형 데이터가 여기에 속합니다. 하지만, 어떤 경우에는 숫자형 데이터도 개념적으로 범주형으로 처리해야 할 경우도 있습니다. 예제에서는 Sex, Embarked, Pclass 를 범주형 데이터라고 볼 수 있습니다. Pclass 의 경우 숫자형 데이터로 보이지만, 개념적으로 범주형으로 처리하겠습니다.

데이터 변환 (Data Transformation )

머신 러닝 모델을 학습할 때 사용할 수 있도록 데이터를 변환하고, 결측값들을 처리하도록 하겠습니다

범주형 데이터 변환

One-Hot Encoding 을 사용하겠습니다. 판다스에서는 get_dummies() 메소드를 이용하면 One-Hot Encoding 을 손쉽게 할 수 있습니다.

One-Hot Encoding 은 문자를 숫자로 바꾸어 주는 방법 중의 하나로서, 가변수(dummy variable)을 0과 1로 이루어진 가변수를 만들어 주는 것입니다. 1은 있다는 것을, 0은 없다는것을 나타냅니다.

예를 들어 과일이라는 컬럼이 있습니다. 해당 컬럼은 사과, 바나나, 체리라는 세가지 종류의 값을 가지고 있습니다. 이 값을 One-Hot Encoding 할 경우 사과라는 값은 “1,0,0” 같은 형태로 변환시킬수 있습니다.

그림 출처 : (?)

판다스의 get_dummies() 메소드를 이용하여 Sex, Embarked, Pclass 를 One-Hot Encoding 하겠습니다.

train = pd.get_dummies(train, columns=['Sex', 'Embarked', 'Pclass'])
test = pd.get_dummies(test, columns=['Sex', 'Embarked', 'Pclass'])

Sex, Embarked, Pclass 컬럼들이 아래처럼 변환것을 확인할 수 있습니다.

결측 데이터 처리

결측값들을 처리하는 방법은 크게 두 가지가 있습니다. 결측값이 포함된 데이터를 삭제하거나, 다른값으로 치환하는 것입니다. 판다스에서 결측값이 포함된 데이터를 삭제하고 싶으면 dropna() 메소드를 사용하면 됩니다. 만약 다른값으로 치환하고 싶다면 fillna() 메소드를 사용하면 됩니다.

Age 의 결측값을 처리해 보겠습니다. 간단하게 생존자와 사망자의 나이 평균값을 구한다음, 그 값으로 치환하겠습니다.

survived_1_age_mean = train[(train['Survived'] == 1)]['Age'].mean()
survived_0_age_mean = train[(train['Survived'] == 0)]['Age'].mean()

Age 의 결측값을 생존자와 사망자의 나이 평균값을로 치환합니다.

train.loc[train['Survived'] == 1, 'Age'] = train[train['Survived'] == 1].fillna(survived_1_age_mean)
train.loc[train['Survived'] == 0, 'Age'] = train[train['Survived'] == 0].fillna(survived_1_age_mean)
test['Age'].fillna(test['Age'].mean(), inplace=True)

다시 한번 데이터의 정보를 조회해 보겠습니다.

train.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 891 entries, 0 to 890
Data columns (total 17 columns):
 #   Column       Non-Null Count  Dtype  
---  ------       --------------  -----  
 0   PassengerId  891 non-null    int64  
 1   Survived     891 non-null    int64  
 2   Name         891 non-null    object 
 3   Age          891 non-null    float64
 4   SibSp        891 non-null    int64  
 5   Parch        891 non-null    int64  
 6   Ticket       891 non-null    object 
 7   Fare         891 non-null    float64
 8   Cabin        204 non-null    object 
 9   Sex_female   891 non-null    uint8  
 10  Sex_male     891 non-null    uint8  
 11  Embarked_C   891 non-null    uint8  
 12  Embarked_Q   891 non-null    uint8  
 13  Embarked_S   891 non-null    uint8  
 14  Pclass_1     891 non-null    uint8  
 15  Pclass_2     891 non-null    uint8  
 16  Pclass_3     891 non-null    uint8  
dtypes: float64(2), int64(4), object(3), uint8(8)
memory usage: 69.7+ KBtrain['Cabin'].unique()

이제 Cabin 을 제외하고는 결측값을 없습니다.

Cabin 에 어떤 값들이 존재하는 확인해 보겠습니다.

In [] :

train['Cabin'].unique()

Out[] :

array([nan, 'C85', 'C123', 'E46', 'G6', 'C103', 'D56', 'A6',
       'C23 C25 C27', 'B78', 'D33', 'B30', 'C52', 'B28', 'C83', 'F33',
       'F G73', 'E31', 'A5', 'D10 D12', 'D26', 'C110', 'B58 B60', 'E101',
       'F E69', 'D47', 'B86', 'F2', 'C2', 'E33', 'B19', 'A7', 'C49', 'F4',
       'A32', 'B4', 'B80', 'A31', 'D36', 'D15', 'C93', 'C78', 'D35',
       'C87', 'B77', 'E67', 'B94', 'C125', 'C99', 'C118', 'D7', 'A19',
       'B49', 'D', 'C22 C26', 'C106', 'C65', 'E36', 'C54',
       'B57 B59 B63 B66', 'C7', 'E34', 'C32', 'B18', 'C124', 'C91', 'E40',
       'T', 'C128', 'D37', 'B35', 'E50', 'C82', 'B96 B98', 'E10', 'E44',
       'A34', 'C104', 'C111', 'C92', 'E38', 'D21', 'E12', 'E63', 'A14',
       'B37', 'C30', 'D20', 'B79', 'E25', 'D46', 'B73', 'C95', 'B38',
       'B39', 'B22', 'C86', 'C70', 'A16', 'C101', 'C68', 'A10', 'E68',
       'B41', 'A20', 'D19', 'D50', 'D9', 'A23', 'B50', 'A26', 'D48',
       'E58', 'C126', 'B71', 'B51 B53 B55', 'D49', 'B5', 'B20', 'F G63',
       'C62 C64', 'E24', 'C90', 'C45', 'E8', 'B101', 'D45', 'C46', 'D30',
       'E121', 'D11', 'E77', 'F38', 'B3', 'D6', 'B82 B84', 'D17', 'A36',
       'B102', 'B69', 'E49', 'C47', 'D28', 'E17', 'A24', 'C50', 'B42',
       'C148'], dtype=object)

Cabin 즉 객실 번호는 생존 여부에 영향을 미칠 수 있을거 같습니다. 하지만 결측값이 너무 많기 때문에, 예제에서는 값들을 사용하지 않도록 하겠습니다. NameTicket 도 삭제하도록 하겠습니다. 좋은 모델을 만드는게 목적이 아니라, Kubeflow 파이프라인을 만드는것이 목적이므로, 과감히 삭제 하도록 하겠습니다.

Cabin ,Name , Ticket 컬럼을 삭제하겠습니다.

train = train.drop(columns=['Cabin', 'Name', 'Ticket'], axis=1)
test = test.drop(columns=['Cabin', 'Name', 'Ticket'], axis=1)

test 데이터의 정보도 조회해 보겠습니다.

test.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 418 entries, 0 to 417
Data columns (total 13 columns):
 #   Column       Non-Null Count  Dtype  
---  ------       --------------  -----  
 0   PassengerId  418 non-null    int64  
 1   Age          418 non-null    float64
 2   SibSp        418 non-null    int64  
 3   Parch        418 non-null    int64  
 4   Fare         417 non-null    float64
 5   Sex_female   418 non-null    uint8  
 6   Sex_male     418 non-null    uint8  
 7   Embarked_C   418 non-null    uint8  
 8   Embarked_Q   418 non-null    uint8  
 9   Embarked_S   418 non-null    uint8  
 10  Pclass_1     418 non-null    uint8  
 11  Pclass_2     418 non-null    uint8  
 12  Pclass_3     418 non-null    uint8  
dtypes: float64(2), int64(3), uint8(8)
memory usage: 19.7 KB

Fare 에 결측값이 있는 것을 확인할 수 있겠습니다. 평균값으로 치환하겠습니다.

In []

test['Fare'].fillna(test['Fare'].mean(), inplace=True)

각 데이터들간에 어떤 관련성이 있는지를 분석하기 위하여, 피어슨 상관 계수(Pearson correlation coefficient)를 사용하겠습니다.

In []:

corr = train.corr(method='pearson')
print(corr)

정상적으로 실행되면, 다음과 같은 결과를 확인할 수 있습니다.

PassengerId  Survived       Age     SibSp     Parch      Fare  \\
PassengerId     1.000000 -0.005007  0.034016 -0.057527 -0.001652  0.012658   
Survived       -0.005007  1.000000 -0.065915 -0.035322  0.081629  0.257307   
Age             0.034016 -0.065915  1.000000 -0.233212 -0.173876  0.095674   
SibSp          -0.057527 -0.035322 -0.233212  1.000000  0.414838  0.159651   
Parch          -0.001652  0.081629 -0.173876  0.414838  1.000000  0.216225   
Fare            0.012658  0.257307  0.095674  0.159651  0.216225  1.000000   
Sex_female     -0.042939  0.543351 -0.081785  0.114631  0.245489  0.182333   
Sex_male        0.042939 -0.543351  0.081785 -0.114631 -0.245489 -0.182333   
Embarked_C     -0.001205  0.168240  0.030613 -0.059528 -0.011069  0.269335   
Embarked_Q     -0.033606  0.003650 -0.027873 -0.026354 -0.081228 -0.117216   
Embarked_S      0.022148 -0.155660 -0.017186  0.070941  0.063036 -0.166603   
Pclass_1        0.034303  0.285904  0.323163 -0.054582 -0.017633  0.591711   
Pclass_2       -0.000086  0.093349  0.013967 -0.055932 -0.000734 -0.118557   
Pclass_3       -0.029486 -0.322308 -0.289806  0.092548  0.015790 -0.413333   

             Sex_female  Sex_male  Embarked_C  Embarked_Q  Embarked_S  \\
PassengerId   -0.042939  0.042939   -0.001205   -0.033606    0.022148   
Survived       0.543351 -0.543351    0.168240    0.003650   -0.155660   
Age           -0.081785  0.081785    0.030613   -0.027873   -0.017186   
SibSp          0.114631 -0.114631   -0.059528   -0.026354    0.070941   
Parch          0.245489 -0.245489   -0.011069   -0.081228    0.063036   
Fare           0.182333 -0.182333    0.269335   -0.117216   -0.166603   
Sex_female     1.000000 -1.000000    0.082853    0.074115   -0.125722   
Sex_male      -1.000000  1.000000   -0.082853   -0.074115    0.125722   
Embarked_C     0.082853 -0.082853    1.000000   -0.148258   -0.778359   
Embarked_Q     0.074115 -0.074115   -0.148258    1.000000   -0.496624   
Embarked_S    -0.125722  0.125722   -0.778359   -0.496624    1.000000   
Pclass_1       0.098013 -0.098013    0.296423   -0.155342   -0.170379   
Pclass_2       0.064746 -0.064746   -0.125416   -0.127301    0.192061   
Pclass_3      -0.137143  0.137143   -0.153329    0.237449   -0.009511   

             Pclass_1  Pclass_2  Pclass_3  
PassengerId  0.034303 -0.000086 -0.029486  
Survived     0.285904  0.093349 -0.322308  
Age          0.323163  0.013967 -0.289806  
SibSp       -0.054582 -0.055932  0.092548  
Parch       -0.017633 -0.000734  0.015790  
Fare         0.591711 -0.118557 -0.413333  
Sex_female   0.098013  0.064746 -0.137143  
Sex_male    -0.098013 -0.064746  0.137143  
Embarked_C   0.296423 -0.125416 -0.153329  
Embarked_Q  -0.155342 -0.127301  0.237449  
Embarked_S  -0.170379  0.192061 -0.009511  
Pclass_1     1.000000 -0.288585 -0.626738  
Pclass_2    -0.288585  1.000000 -0.565210  
Pclass_3    -0.626738 -0.565210  1.000000

분석 결과를 보기 좋게 그래프로 만들어 보겠습니다.

In []:

import seaborn as sns
sns.set(context='paper', style='whitegrid', palette='muted', font_scale=1, color_codes=True, rc=None)
sns.heatmap(corr,linewidths=.5)

In []:

sns.barplot(x=corr.Survived,y=corr.columns)

모델의 성능은 데이터의 품질에 좌우됩니다. 그래서 위의 상관 그래프나 컬럼별 통계분포 등을 파악하여 보다 많은 데이터의 정제 과정을 거쳐야 합니다. 그리고 피쳐 엔지니어링 과정을 통해서 데이터의 품질을 높여야합니다. 하지만 해당 내용들은 이 책의 범위에서 벗어나기 때문에 다루지 않겠습니다.

모델 학습 (Train)

준비한 데이터를 이용하여 모델을 학습해 보겠습니다.

먼저, 모델 학습을 위하여 데이터를 학습 세트와 테스트 세트로 나누겠습니다. sklearntrain_test_split() 를 사용하면, 간단히 나눌 수 있습니다.

In []:

from sklearn.model_selection import train_test_split

X = train.drop('Survived', axis=1)
y = train['Survived']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=30)

sklearnLogisticRegression 을 사용하여, 모델을 학습 시켜 보겠습니다.

In []:

from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report

model = LogisticRegression()

model.fit(X_train, y_train)
prediction = model.predict(X_test)
cr = classification_report(y_test, prediction, output_dict=True)
print('accuracy =', cr['accuracy'])

정상적으로 실행되면, 다음과 같은 결과를 확인할 수 있습니다.

accuracy = 0.7892376681614349

이제 캐글에서 제공한 test 데이터를 가지고 예측해 보도록 하겠습니다.

pred = model.predict(test)

예측한 결과물을 가지고, 캐글에 제공할 submission.csv 파일을 생성해 보겠습니다.

submission = pd.DataFrame({'PassengerId': test['PassengerId'], 'Survived': pred})
submission.to_csv('submission.csv', index=False)

submission 을 조회해 보겠습니다.

submission.head()

정상적으로 실행되면, 다음과 같은 결과를 확인할 수 있습니다.

생성한 submission.csv 파일을 캐글에 제출하겠습니다. kaggle 명령어를 사용하여 제출합니다. kaggle 명령어가 PATH 에 포함되어 있지 않기 때문에 전체 경로를 적어 주었습니다.

!/home/jovyan/.local/bin/kaggle competitions submit -c titanic -f submission.csv -m "Message"

캐글의 “My Submissions” 탭을 클릭하면, 제출한 내용들을 확인할 수 있습니다.

파이프라인 만들기

앞서 작성한 코드들을 바탕으로 하여 파이프라인을 구성해 보겠습니다. 각각의 단계를 컴포넌트로 구성한 다음, 파이프라인을 작성하고 실행해 보겠습니다.

파이프라인의 단계는 다음과 같습니다.

  • 데이터 다운로드 : 캐글에서 데이터를 다운로드 합니다.
  • 데이터 압축풀기 : 캐글에서 다운로드한 데이터의 압축을 풉니다.
  • 데이터 변환 : train.csvtest.csv 의 데이터를 머신러닝의 학습에 사용할 수 있도록 변환합니다.
  • 모델 학습 : 변환된 train.csv 데이터를 이용하여 모델을 학습니다.
  • 예측 : 변환된 test.csv 데이터를 이용하여 예측합니다. 그리고 예측한 결과를 submission.csv 파일로 저장합니다.
  • 캐글 제출 : 생성된 submission.csv 파일을 캐글에 제출합니다.

파이프라인의 전체적인 흐름은 다음과 같습니다.

기본 이미지 만들기

데이터 변환, 모덱 학습, 예측 컴포넌트에서 사용할 기본 이미지를 만들어 보겠습니다. 파이프라인에서도 필요한 이미지를 빌드할 수 있지만, 기본 이미지를 만들어서 사용하는게 더 효율적입니다. docker 명령어를 이용하여, 컨테이너 이미지를 빌드하기 때문에, docker 명령어가 실행 가능한 곳에서 작업을 해야합니다.

먼저 베이스 이미지 디렉토리를 만듭니다.

mkdir -p ~/workspace/base/sklearn
cd ~/workspace/base/sklearn

필요한 파이썬 패키지 목록을 requirements.txt 파일로 작성합니다.

requirements.txt

scikit-learn
joblib
numpy
pandas
fire

컨테이너 이미지 빌드를 위하여 Dockerfile 파일을 작성합니다. 파이썬 패키지 목록인 requirements.txt 파일을 추가하고, 해당 파일을 이용하여 패키지를 설치합니다.

Dockerfile

FROM python:3.6-slim

WORKDIR /app
COPY requirements.txt /app

RUN pip --no-cache-dir install -r requirements.txt

컨테이너 이미지를 빌드하고, 컨테이너 레지스트리에 푸시하기 위하여 build_image.sh 파일을 작성합니다.

build_image.sh

#!/bin/bash -e

image_name=kangwoo/sklearn
image_tag=0.0.1
full_image_name=${image_name}:${image_tag}
base_image_tag=3.6-slim

cd "$(dirname "$0")"

docker build --build-arg BASE_IMAGE_TAG=$base_image_tag -t "$full_image_name" .
docker push "$full_image_name"

build_image.sh 파일을 실행하여, 컨테이너 이미지를 빌드하고, 컨테이너 레지스트리에 푸시하겠습니다.

chmod +x build_image.sh
./build_image.sh

컴포넌트 만들기

캐글 데이터 다운로드 컴포넌트

캐글에서 데이터 다운로드하는 컴포넌트를 만들어 보겠습니다. docker 명령어를 이용하여, 컨테이너 이미지를 빌드하기 때문에, docker 명령어가 실행 가능한 곳에서 작업을 해야합니다.

먼저 컴포넌트 디렉토리를 만듭니다.

mkdir -p ~/workspace/components/kaggle/competitions_download
mkdir -p ~/workspace/components/kaggle/competitions_download/src
cd ~/workspace/components/kaggle/competitions_download

컴포넌트 디렉토리의 하위 src 디렉토리에, 컴포넌트에서 사용할 애플리케이션 코드인 [download.py](<http://download.py>) 파일을 작성합니다. kaggle 명령어를 실행하여, 데이터를 다운로드 합니다. 이미 데이터가 저장 경로에 존재하면 다운로드 하지 않습니다. 데이터를 다운로드 했는지 여부를 출력 결과값으로 반환합니다.

src/download.py

from __future__ import absolute_import, division, print_function, unicode_literals

import argparse
import os
import subprocess
from distutils.util import strtobool

parser = argparse.ArgumentParser()
parser.add_argument('--competition', required=True, type=str)
parser.add_argument('--path', default='.', type=str)
parser.add_argument('--force', default='False', type=strtobool)
parser.add_argument('--downloaded', default='/tmp/outputs/downloaded', type=str)
args = parser.parse_args()

if not os.path.exists(args.path):
    os.makedirs(args.path)

kaggle_args = ['kaggle', 'competitions', 'download', '--path', args.path]
if args.force:
    kaggle_args.append('--force')
kaggle_args.append(args.competition)
print(kaggle_args)
result = subprocess.check_output(kaggle_args, encoding='utf-8')
print('result:', result)

downloaded = False
if 'Downloading' in result:
    downloaded = True
print('downloaded:', downloaded)

if not os.path.exists(os.path.dirname(args.downloaded)):
    os.makedirs(os.path.dirname(args.downloaded))

with open(args.downloaded, 'w') as writer:
    writer.write(str(downloaded))

컴포넌트 디렉토리에 컴포넌트 설정 정보가 있는 component.yaml 파일을 작성합니다.

component.yaml

name: Kaggle - Competitions downloader
description: Download competition files
inputs:
  - {name: competition, type: String, description: 'Competition URL suffix'}
  - {name: path, type: String, default: '.', description: 'Folder where file(s) will be download, defaults to current working directory'}
  - {name: force, type: String, default: 'False', description: 'Skip check whether local version of files is up to date, force file download'}
outputs:
  - {name: downloaded, type: String, description: 'Downloaded'}
implementation:
  container:
    image: kangwoo/kaggle-competitions-download@sha256:e0c585eaa50d880a0e0ab2245077c9ec487ffc7c5b8c910c7a88798314d6eab9
    command: ['python', 'download.py']
    args: [
      --competition, {inputValue: competition},
      --path, {inputValue: path},
      --force, {inputValue: force},
      --downloaded, {outputPath: downloaded},
    ]

컨테이너 이미지 빌드를 위하여 Dockerfile 파일을 작성합니다. 파이썬을 기본 이미지로 사용하고 있으며, 앞서 작성한 src/download.py 파일을 추가합니다.

Dockerfile

ARG BASE_IMAGE_TAG=3.6-slim
FROM python:$BASE_IMAGE_TAG

RUN pip install kaggle

WORKDIR /app
ADD src/download.py /app/

ENTRYPOINT ['python', 'download.py']

컨테이너 이미지를 빌드하고, 컨테이너 레지스트리에 푸시하기 위하여 build_image.sh 파일을 작성합니다.

build_image.sh

#!/bin/bash -e

image_name=kangwoo/kaggle-competitions-download
image_tag=0.0.1
full_image_name=${image_name}:${image_tag}
base_image_tag=3.6-slim

cd "$(dirname "$0")"

docker build --build-arg BASE_IMAGE_TAG=$base_image_tag -t "$full_image_name" .
docker push "$full_image_name"

# Output the strict image name (which contains the sha256 image digest)
# This name can be used by the subsequent steps to refer to the exact image that was built even if another image with the same name was pushed
image_name_with_digest=$(docker inspect --format="{{index .RepoDigests 0}}" "$full_image_name")
strict_image_name_output_file=./versions/image_digests_for_tags/$image_tag
mkdir -p "$(dirname "$strict_image_name_output_file")"
echo $image_name_with_digest | tee "$strict_image_name_output_file"

build_image.sh 파일을 실행하여, 컨테이너 이미지를 빌드하고, 컨테이너 레지스트리에 푸시하겠습니다.

chmod +x build_image.sh
./build_image.sh

생성한 로컬 개발 환경의 ~/workspace/components/kaggle/competitions_downloader/component.yaml 파일을, 주피터 노트북의 ~/workspace/components/kaggle/competitions_downloader/component.yaml 로 복사합니다.

캐글 제출 컴포넌트

예측 결과를 캐글에 제출하는 컴포넌트를 만들어 보겠습니다. docker 명령어를 이용하여, 컨테이너 이미지를 빌드하기 때문에, docker 명령어가 실행 가능한 곳에서 작업을 해야합니다.

먼저 컴포넌트 디렉토리를 만듭니다.

mkdir -p ~/workspace/components/kaggle/competitions_submit

컴포넌트 디렉토리의 하위 src 디렉토리에, 컴포넌트에서 사용할 애플리케이션 코드인 submit.py 파일을 작성합니다. kaggle 명령어를 실행하여, 예측 결과가 저장된 submission.csv 파일을 캐글에 제출 합니다.

src/submit.py

from __future__ import absolute_import, division, print_function, unicode_literals

import argparse
import os
import subprocess

parser = argparse.ArgumentParser()
parser.add_argument('--competition', required=True, type=str)
parser.add_argument('--path', default='.', type=str)
parser.add_argument('--filename', default='submission.csv', type=str)
parser.add_argument('--message', default='Message', type=str)
parser.add_argument('--submitted', default='/tmp/outputs/submitted', type=str)
args = parser.parse_args()

file = os.path.join(args.path, args.filename)
kaggle_args = ['kaggle', 'competitions', 'submit', '-c', args.competition, '-f', file, '-m', args.message]
print(kaggle_args)
result = subprocess.check_output(kaggle_args, encoding='utf-8')
print('result:', result)

submitted = False
if 'submitted' in result:
    submitted = True
print('submitted:', submitted)

if not os.path.exists(os.path.dirname(args.submitted)):
    os.makedirs(os.path.dirname(args.submitted))

with open(args.submitted, 'w') as writer:
    writer.write(str(submitted))

컴포넌트 디렉토리에 컴포넌트 설정 정보가 있는 component.yaml 파일을 작성합니다.

component.yaml

name: Kaggle - Competitions submitter
description: Submit competition file
inputs:
  - {name: competition, type: String, description: 'Competition URL suffix'}
  - {name: path, type: String, description: 'Path for upload'}
  - {name: filename, type: String, default: 'submission.csv', description: 'Filename for upload'}
  - {name: message, type: String, description: 'Message describing this submission'}
outputs:
  - {name: submitted, type: String, description: 'Submitted'}
implementation:
  container:
    image: kangwoo/kaggle-competitions-submit:0.0.1
    command: ['python', 'submit.py']
    args: [
      --competition, {inputValue: competition},
      --path, {inputValue: path},
      --filename, {inputValue: filename},
      --message, {inputValue: message},
      --submitted, {outputPath: submitted},
    ]

컨테이너 이미지 빌드를 위하여 Dockerfile 파일을 작성합니다. 파이썬을 기본 이미지로 사용하고 있으며, 앞서 작성한 src/submit.py 파일을 추가합니다.

Dockerfile

ARG BASE_IMAGE_TAG=3.6-slim
FROM python:$BASE_IMAGE_TAG

RUN pip install kaggle

WORKDIR /app
ADD src/submit.py /app/

ENTRYPOINT ['python', '/app/submit.py']

컨테이너 이미지를 빌드하고, 컨테이너 레지스트리에 푸시하기 위하여 build_image.sh 파일을 작성합니다.

build_image.sh

#!/bin/bash -e

image_name=kangwoo/kaggle-competitions-submit
image_tag=0.0.1
full_image_name=${image_name}:${image_tag}
base_image_tag=3.6-slim

cd "$(dirname "$0")"

docker build --build-arg BASE_IMAGE_TAG=$base_image_tag -t "$full_image_name" .
docker push "$full_image_name"

# Output the strict image name (which contains the sha256 image digest)
# This name can be used by the subsequent steps to refer to the exact image that was built even if another image with the same name was pushed
image_name_with_digest=$(docker inspect --format="{{index .RepoDigests 0}}" "$full_image_name")
strict_image_name_output_file=./versions/image_digests_for_tags/$image_tag
mkdir -p "$(dirname "$strict_image_name_output_file")"
echo $image_name_with_digest | tee "$strict_image_name_output_file"

build_image.sh 파일을 실행하여, 컨테이너 이미지를 빌드하고, 컨테이너 레지스트리에 푸시하겠습니다.

chmod +x build_image.sh
./build_image.sh

생성한 로컬 개발 환경의 ~/workspace/components/kaggle/competitions_submit/component.yaml 파일을, 주피터 노트북의 ~/workspace/components/kaggle/competitions_submit/component.yaml 로 복사합니다.

컴포넌트 작성하기

주피터 노트북을 생성합니다. 파일 이름은 titanic_pipeline.ipynb 입니다. 노트북의 위치는 ~/workspace/titanic입니다. 앞으로 작성할 예제에서는 노트북이 ~/workspace/titanic 에 위치하고 있다고 가정하여, 파일 경로를 설정할 것입니다.

패키지 추가

파이프라인과 컴포넌트를 사용하기 위한 패키지를 추가합니다.

In []:

import os
import kfp
from kfp import dsl
from kfp import onprem

from kfp import components
from kfp.components import func_to_container_op, InputPath, OutputPath

from kubernetes.client.models import V1Volume
from kubernetes.client.models import V1SecretVolumeSource
from kubernetes.client.models import V1VolumeMount

캐글 데이터 다운로드 컴포넌트

캐글에서 데이터를 다운로드 하는 컴포넌트를 만들어 보겠습니다. 이 기능은 필요에 따라 다른 곳에서도 사용기 가능할거 같기 때문에, 재사용 컴포넌트로 만들겠습니다.

In []:

download_op = components.load_component_from_file('../components/kaggle/competitions_downloader/component.yaml')

데이터 압축풀기 컴포넌트

unzip 명령어를 사용하여 압축을 풀겠습니다.

In []:

def unzip_op(filename, exdir):
    return dsl.ContainerOp(name='Unzip',
                           image='garthk/unzip:latest',
                           command=['unzip', '-o', filename, '-d', exdir])

데이터 변환 컴포넌트

train.csvtest.csv 의 데이터를 머신러닝의 학습에 사용할 수 있도록 변환합니다. 캐글 데이터의 경로를 input_path 파라미터로 입력 받아 데이터를 변환합니다. 데이터 변환에 쓰이는 코드들은 앞에서 작성한 코드와 동일합니다. 변환할 데이터를 저장할 경로를 output_path 파라미터로 입력 받아 변환된 데이터를 저장합니다. 컨테이너 이미지를 빠르게 빌드하기 위하여, 필요한 패키지가 포함된 기본 이미지를 미리 만들어서 사용하였습니다.

In []:

def transform_op(input_path, output_path):
    import os
    import pandas as pd
    train = pd.read_csv(os.path.join(input_path, 'train.csv'))
    test = pd.read_csv(os.path.join(input_path, 'test.csv'))
    print(train.info())
    print("*"*40)
    print(test.info())
    train = pd.get_dummies(train, columns=['Sex', 'Embarked', 'Pclass'])
    test = pd.get_dummies(test, columns=['Sex', 'Embarked', 'Pclass'])
    survived_1_age_mean = train[(train['Survived'] == 1)]['Age'].mean()
    survived_0_age_mean = train[(train['Survived'] == 0)]['Age'].mean()
    train.loc[train['Survived'] == 1, 'Age'] = train[train['Survived'] == 1].fillna(survived_1_age_mean)
    train.loc[train['Survived'] == 0, 'Age'] = train[train['Survived'] == 0].fillna(survived_0_age_mean)
    test['Age'].fillna(test['Age'].mean(), inplace=True)
    train = train.drop(columns=['Cabin','Name','Ticket'], axis=1)
    test = test.drop(columns=['Cabin','Name','Ticket'], axis=1)
    test['Fare'].fillna(test['Fare'].mean(), inplace=True)
    
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    
    train.to_csv(os.path.join(output_path, 'train.csv'))
    test.to_csv(os.path.join(output_path, 'test.csv'))

transform_op = components.func_to_container_op(transform_op, base_image='kangwoo/sklearn:0.0.1')

모델 학습 컴포넌트

변환된 train.csv 데이터를 이용하여 모델을 학습니다. 변환된 데이터의 경로를 path 파리미터로 입력받에 데이터를 읽어옵니다. 그리고 데이터를 학습 세트와 테스트 세트로 나눈다음, 모델을 학습니다. 사용할 모델 이름은 model_name 파라미터로 입력 받게 하였습니다. 학습된 모델의 분류 결과를 classification_report.json 파일로 저장하고, 학습된 모델을 model.joblib 파일로 저장하였습니다. 각 파일든은 export_path 파라미터로 입력하 경로에 저장됩니다.

In []:

def transform_op(input_path, output_path):
    import os
    import pandas as pd
    train = pd.read_csv(os.path.join(input_path, 'train.csv'))
    test = pd.read_csv(os.path.join(input_path, 'test.csv'))
    print(train.info())
    print("*"*40)
    print(test.info())
    train = pd.get_dummies(train, columns=['Sex', 'Embarked', 'Pclass'])
    test = pd.get_dummies(test, columns=['Sex', 'Embarked', 'Pclass'])
    survived_1_age_mean = train[(train['Survived'] == 1)]['Age'].mean()
    survived_0_age_mean = train[(train['Survived'] == 0)]['Age'].mean()
    train.loc[train['Survived'] == 1, 'Age'] = train[train['Survived'] == 1].fillna(survived_1_age_mean)
    train.loc[train['Survived'] == 0, 'Age'] = train[train['Survived'] == 0].fillna(survived_0_age_mean)
    test['Age'].fillna(test['Age'].mean(), inplace=True)
    train = train.drop(columns=['Cabin','Name','Ticket'], axis=1)
    test = test.drop(columns=['Cabin','Name','Ticket'], axis=1)
    test['Fare'].fillna(test['Fare'].mean(), inplace=True)
    
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    
    train.to_csv(os.path.join(output_path, 'train.csv'))
    test.to_csv(os.path.join(output_path, 'test.csv'))

transform_op = components.func_to_container_op(transform_op, base_image='kangwoo/sklearn:0.0.1')

예측 컴포넌트

변환된 test.csv 데이터를 이용하여 예측합니다. 그리고 예측한 결과를 submission.csv 파일로 저장합니다.

In []:

def predict_op(model_path, test_data_path, submission_path):
    import argparse
    import json
    import os

    import joblib
    import pandas as pd

    reports = {}
    model_names = os.listdir(model_path)
    for model_name in model_names:
        file = os.path.join(model_path, model_name, 'classification_report.json')
        print(file)
        if os.path.isfile(file):
            with open(file) as json_file:
                report = json.load(json_file)
                reports[model_name] = report

    print('{} found'.format(len(reports)))

    for item in reports.items():
        print('item :', item)
        print('{} :  accuracy={}'.format(item[0], item[1]['accuracy']))

    def accuracy(x):
        return reports[x]['accuracy']

    best_model = max(reports.keys(), key=accuracy)
    print('Best model is', best_model, reports[best_model]['accuracy'])

    model = joblib.load(os.path.join(model_path, best_model, 'model.joblib'))
    print(model)

    test = pd.read_csv(os.path.join(test_data_path, 'titanic-test.csv'))
    pred = model.predict(test)

    submission_file = os.path.join(submission_path, 'submission.csv')
    if not os.path.isdir(os.path.dirname(submission_file)):
        os.makedirs(os.path.dirname(submission_file))

    submission = pd.DataFrame({'PassengerId': test['PassengerId'], 'Survived': pred})
    submission.to_csv(submission_file, index=False)
    print("Saved submission :", submission_file)

predict_op= components.func_to_container_op(predict_op, base_image='kangwoo/sklearn:0.0.1')

캐글 제출 컴포넌트

생성된 submission.csv 파일을 캐글에 제출합니다.

In[] :

submit_op = components.load_component_from_file('../components/kaggle/competitions_submit/component.yaml')

파이프라인 생성하기

PVC 생성하기

PVC를 생성하기 위한 매니페스트를 작성합니다. 파이프라인 컴포넌트에서 이 PVC를 이용하여, 데이터를 저장하고 읽어 올 것입니다.

kaggle-pvc.yaml

kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: kaggle-pvc
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 200Mi

다음은 kubeflow 네임스페이스에 PVC 리소스를 생성하는 명령어 입니다.

kubectl -n kubeflow apply -f kaggle-pvc.yaml

캐글 토큰 Secret 생성하기

캐글 API 토큰이 저장되어 있는 kaggle.json 파일을 이용하여, 쿠버네티스 Secret 리소스를 생성합니다. 생성한 Secret 리소스는 파이프라인의 “캐글에서 데이터 다운로드 컴포넌트”와 “캐글 제출 컴포넌트” 에서 사용될 것입니다.

다음은 kubeflow 네임스페이스에 kaggle-secret 이라는 리소스를 생성하는 명령어 입니다.

kubectl -n kubeflow create secret generic kaggle-secret --from-file=kaggle.json

파이프라인 작성하기

주피터 노트북으로 돌아가서 파이프라인을 작성해 보겠습니다.

In []:

@dsl.pipeline(
    name='Titanic pipeline',
    description='Titanic pipeline'
)
def titanic_pipeline():
    pvc_name = "kaggle-pvc"
    volume_name = 'pipeline'
    volume_mount_path = '/data'

    competition_name = 'titanic'
    kaggle_data_path = os.path.join('/data/competitions', competition_name, 'kaggle')
    input_data_path = os.path.join('/data/competitions', competition_name, 'input')

    download_task = download_op(competition=competition_name, path=kaggle_data_path)\\
        .add_volume(V1Volume(name='kaggle-secret', secret=V1SecretVolumeSource(secret_name='kaggle-secret')))\\
        .add_volume_mount(V1VolumeMount(name='kaggle-secret', mount_path='/root/.kaggle'))

    with dsl.Condition(download_task.outputs['downloaded'] == 'True'):
        unzip_task = unzip_op(os.path.join(kaggle_data_path, 'titanic.zip'), kaggle_data_path).after(download_task)
        
    transform_task = transform_op(input_path=kaggle_data_path, output_path=input_data_path).after(unzip_task)

    models = ['sklearn.linear_model.LogisticRegression',
          'sklearn.linear_model.SGDClassifier',
          'sklearn.naive_bayes.GaussianNB',
          'sklearn.neighbors.KNeighborsClassifier',
          'sklearn.tree.DecisionTreeClassifier',
          'sklearn.svm.SVC',
          'sklearn.ensemble.AdaBoostClassifier',
          'sklearn.ensemble.RandomForestClassifier']
    
    export_path = os.path.join('/data/competitions', competition_name, 'models')
    submit_path = os.path.join('/data/competitions', competition_name, 'submit')
    
    with dsl.ParallelFor(models) as model:
        train_task = train_op(input_data_path, model, export_path).after(transform_task)

    predict_task = predict_op(export_path, input_data_path, submit_path).after(train_task)
    
    submit_task = submit_op(competition='titanic', path=submit_path, message='RunId : {{workflow.uid}}')\\
            .add_volume(V1Volume(name='kaggle-secret', secret=V1SecretVolumeSource(secret_name='kaggle-secret')))\\
            .add_volume_mount(V1VolumeMount(name='kaggle-secret', mount_path='/root/.kaggle')).after(predict_task)

    
    steps = [download_task, unzip_task, transform_task, train_task, predict_task, submit_task]
    for step in steps:
        step.apply(onprem.mount_pvc(pvc_name, volume_name=volume_name, volume_mount_path=volume_mount_path))
        

if __name__ == '__main__':
    kfp.compiler.Compiler().compile(titanic_pipeline, 'titanic-pipeline.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Kaggle Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Titanic Pipeline', 'titanic-pipeline.zip')

생성된 링크를 클릭하시면, 다음과 같은 화면을 확인할 수 있습니다.

파이프라인 실행이 완료되면, 예측 결과가 캐글에 제출 됩니다.

캐글의 “My Submissions” 탭을 클릭하면, 제출한 내용들을 확인할 수 있습니다.

Kubeflow 1.0.0에서 InferenceService 의 상태 갱신이 실패할때

Kubeflow 1.0.0 이나 1.0.1에서 kfserving이 정상적으로 작동하지 않는 분을 위해서 참고삼아 적어 봅니다.

Kubeflow 설치 환경이 워낙 다양하고, 설치 환경 파일도 다양하기 때문에 이 글이 의미가 없을 수도 있습니다.

그리고, 이 방법은 정상적인 방법이 아니기 때문에, 단순히 테스트 삼아 KFServing을 사용하는게 목적일때만 사용하시기 바랍니다.

혹시 업그레이드가 가능하신 분들은 최신 버전으로 업데이트해 보시기 바랍니다.


KFServing는 InferenceService 라는 사용자 리소스를 사용합니다. InferenceService 는 정의한 추론 서버를 실행시킨 다음, 서버가 정상적으로 구동되었는지를 체크하는데, 환경에 따라서 이 상태 체크가 정상적으로 안되는 문제가 있습니다. 그 이유는 istio 때문입니다. Kubeflow에서 인증/권한을 위해서 istio를 사용합니다. InferenceService 를 관리하는 컨트롤러에서 상태 체크를 위해서 URL을 호출하는데, istio에서 인증/권한 체크 부분 때문에 200 OK가 반환되지 않는 문제가 생기는 것이죠. 그래서 실패한 상태로 인식이 되고, InferenceService 가 정상작동하지 않습니다.

이 문제를 제대로 해결하기 위해서는, Kubeflow에서 해결해 줄때까지 기다리던지, 아니면, kfserving + istio 전문가를 부르면 됩니다.

하지만 그럴 여건이 안되거나, 단순히 취미 생활로 KFServing 을 사용할 예정이라면, 다음과 같이 작동하게는 바꿀 수 있습니다. (권장하는 방법은 아닙니다.)

KFServing 에서 사용하는 istio-ingressgateway를 만들고, 보안 설정 부분을 삭제하는 것입니다.

먼저 kfserving-ingressgateway 을 생성합니다. 만약 생성되어 있다면 무시하시면 됩니다. 보통 istio-system 네임스페이스나, knative-serving 네임스페이스 존재할 수 있습니다. DeploymentService 를 확인해 보시면 됩니다.

 $ kubectl -n istio-system get deploy
NAME                       READY   UP-TO-DATE   AVAILABLE   AGE
cluster-local-gateway      1/1     1            1           54d
istio-citadel              1/1     1            1           54d
istio-galley               1/1     1            1           54d
istio-ingressgateway       1/1     1            1           54d
istio-pilot                1/1     1            1           54d
istio-policy               1/1     1            1           54d
istio-sidecar-injector     1/1     1            1           54d
istio-telemetry            1/1     1            1           54d
kfserving-ingressgateway   1/1     1            1           54d
prometheus                 1/1     1            1           54d
$ kubectl -n istio-system get service
NAME                       TYPE           CLUSTER-IP       EXTERNAL-IP   PORT(S)                                                                                                                                                                                   AGE
authservice                ClusterIP      10.98.242.196    <none>        8080/TCP                                                                                                                                                                                  54d
cluster-local-gateway      ClusterIP      10.109.28.157    <none>        80/TCP,443/TCP,31400/TCP,15011/TCP,8060/TCP,15029/TCP,15030/TCP,15031/TCP,15032/TCP                                                                                                       54d
istio-citadel              ClusterIP      10.102.226.47    <none>        8060/TCP,15014/TCP                                                                                                                                                                        54d
istio-galley               ClusterIP      10.111.115.206   <none>        443/TCP,15014/TCP,9901/TCP                                                                                                                                                                54d
istio-ingressgateway       NodePort       10.103.205.239   <none>        15020:30536/TCP,80:31380/TCP,443:31390/TCP,31400:31400/TCP,15029:32168/TCP,15030:32077/TCP,15031:31505/TCP,15032:32021/TCP,15443:31546/TCP                                                54d
istio-pilot                ClusterIP      10.110.53.9      <none>        15010/TCP,15011/TCP,8080/TCP,15014/TCP                                                                                                                                                    54d
istio-policy               ClusterIP      10.106.248.16    <none>        9091/TCP,15004/TCP,15014/TCP                                                                                                                                                              54d
istio-sidecar-injector     ClusterIP      10.105.132.134   <none>        443/TCP,15014/TCP                                                                                                                                                                         54d
istio-telemetry            ClusterIP      10.105.24.245    <none>        9091/TCP,15004/TCP,15014/TCP,42422/TCP                                                                                                                                                    54d
kfserving-ingressgateway   LoadBalancer   10.101.141.37    <pending>     15020:30543/TCP,80:32380/TCP,443:32390/TCP,31400:32400/TCP,15011:30263/TCP,8060:32119/TCP,853:32180/TCP,15029:32156/TCP,15030:30674/TCP,15031:30230/TCP,15032:32563/TCP,15443:30995/TCP   54d
prometheus                 ClusterIP      10.101.81.54     <none>        9090/TCP                                                                                                                                                                                  54d

만약 없다면 생성해 줍니다.

kubeflow 네임스페이스에 있는 inferenceservice-config라는 ConfigMap을 수정해 줍니다. inferenceservice-configingress 부분의 ingressService 를 앞서 생성했거나, 존재하는 kfserving-ingressgateway 으로 수정해 줍니다. 주소 형식은 “서비스명.네임스페이스.svc.cluster.local” 입니다.

$ kubectl -n kubeflow edit cm inferenceservice-config 

apiVersion: v1
data:
...
  ingress: |-
    {
        "ingressGateway" : "knative-ingress-gateway.knative-serving",
        "ingressService" : "kfserving-ingressgateway.istio-system.svc.cluster.local"
    }
...

설정이 변경이 되었으면, 혹시 모르니 kfserving-controller-manager 를 재시작해 줍니다.

그리고, istio RBAC 설정이 담겨 있는 clusterrbacconfig 을 삭제합니다.

$ kubectl get clusterrbacconfig
$ kubectl delete clusterrbacconfig XXXX

이제 kfserving-ingressgateway 의 주소로 요청을 보낼 수 있고, 상태가 갱신되는것을 확인할 수 있습니다. 아.마.도….

Seldon Core – Tensorflow Serving

학습이 완료된 Tensorflow 모델을 저장 한 경우, Seldon의 사전 패키지 된 TensorFlow 서버를 사용하여 간단히 배포 할 수 있습니다.

전제 조건

  • REST의 경우 다음에 대한 파라미터를지정해야합니다.
    • signature_name
    • model_name
apiVersion: machinelearning.seldon.io/v1alpha2
kind: SeldonDeployment
metadata:
  name: tfserving
spec:
  name: mnist
  predictors:
  - graph:
      children: []
      implementation: TENSORFLOW_SERVER
      modelUri: pvc://seldon-models-pvc/tensorflow/mnist/model
      name: mnist-model
      parameters:
        - name: signature_name
          type: STRING
          value: serving_default
        - name: model_name
          type: STRING
          value: mnist-model
    name: default
    replicas: 1

  • GRPC의 경우 다음에 대한 파라미터를 지정해야합니다.
    • signature_name
    • model_name
    • model_input
    • model_output
kind: SeldonDeployment
metadata:
  name: tfserving
spec:
  name: mnist
  predictors:
  - graph:
      children: []
      implementation: TENSORFLOW_SERVER
      modelUri: pvc://seldon-models-pvc/tensorflow/mnist/modell
      name: mnist-model
      endpoint:
        type: GRPC
      parameters:
        - name: signature_name
          type: STRING
          value: serving_default
        - name: model_name
          type: STRING
          value: mnist-model
        - name: model_input
          type: STRING
          value: images
        - name: model_output
          type: STRING
          value: scores
    name: default
    replicas: 1

모델 생성

Tensorflow 서버를 테스트 하려면 먼저 파이썬을 사용하여 간단한 Tensorflow 모델을 생성해야 합니다.

텐서플로우 모델을 만들고 훈련하기 위한 고수준 API인 tf.keras를 사용합니다. 모델의 save() 메소드를 이용하여, 전체 모델을 지정한 위치에 저장합니다. 여기에는 가중치, 모델 구성 등이 포함됩니다. 모델을 저장할 때 주의해야할 점은 모델 저장 위치의 마지막 디렉토리에 모델의 버전이 포함되어야 합니다. 모델 버전은 숫자를 사용해야합니다.

케라스의 데이터셋 중의 하나인 mnist 데이터를 분류하는 모델을 작성해 보겠습니다.

from __future__ import absolute_import, division, print_function, unicode_literals

import argparse
import os

import tensorflow as tf

def train():
    print("TensorFlow version: ", tf.__version__)

    parser = argparse.ArgumentParser()
    parser.add_argument('--model_path', default='/mnt/pv/tensorflow/mnist/model', type=str)
    args = parser.parse_args()

    version = 1
    export_path = os.path.join(args.model_path, str(version))

    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    x_train, x_test = x_train / 255.0, x_test / 255.0

    model = tf.keras.models.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10, activation='softmax')
    ])

    model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=0.01),
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

    print("Training...")
    training_history = model.fit(x_train, y_train, batch_size=64, epochs=10,
                                 validation_split=0.2)

    print('\\nEvaluate on test data')
    results = model.evaluate(x_test, y_test, batch_size=128)
    print('test loss, test acc:', results)

    model.save(export_path)
    print('"Saved model to {}'.format(export_path))

if __name__ == '__main__':
    train()

생성 된 모델을 사용하여 Tensorflow 서버를 실행하고 예측을 수행 할 수 있습니다. 모델은 PV, S3 호환 가능 개체 저장소, Azure Blob 저장소 또는 Google Cloud Storage에 있을 수 있습니다.

모델 저장하기

쿠버네티스의 퍼시스턴스 볼륨에 모델을 저장해 보겠습니다. PVC 는 앞서 생성한 seldon-models-pvc 을 사용하겠습니다. 모델을 학습시키기 위해서 쿠버네티스 잡(Job)을 사용하겠습니다. Job을 생성할 때 모델을 저장하기 위한 PVC를 마운트 해줍니다.

모델 코드 작성하기

mnist 이미지를 분류하는 모델입니다. 모델을 저장할 위치를 --model_path 파라미터로 입력받게 하였습니다.

tensorflow_mnist.py

from __future__ import absolute_import, division, print_function, unicode_literals

import argparse
import os

import tensorflow as tf

def train():
    print("TensorFlow version: ", tf.__version__)

    parser = argparse.ArgumentParser()
    parser.add_argument('--model_path', default='/mnt/pv/tensorflow/mnist/model', type=str)
    args = parser.parse_args()

    version = 1
    export_path = os.path.join(args.model_path, str(version))

    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    x_train, x_test = x_train / 255.0, x_test / 255.0

    model = tf.keras.models.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10, activation='softmax')
    ])

    model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=0.01),
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

    print("Training...")
    training_history = model.fit(x_train, y_train, batch_size=64, epochs=10,
                                 validation_split=0.2)

    print('\\nEvaluate on test data')
    results = model.evaluate(x_test, y_test, batch_size=128)
    print('test loss, test acc:', results)

    model.save(export_path)
    print('"Saved model to {}'.format(export_path))

if __name__ == '__main__':
    train()

컨테이너 이미지를 만들기

컨테이너 이미지를 만들기 위한 Dockerfile 입니다. 텐서플로우를 기본 이미지로 사용합니다.

Dockerfile

FROM tensorflow/tensorflow:2.1.0-py3

RUN mkdir -p /app
ADD tensorflow_mnist.py /app/

쿠버네티스 잡 실행하기

컨테이너 이미지를 빌드하고, 컨테이너 이미지 레지스트리에 푸시 한 다음, 쿠버네티스 잡(Job)을 생성하겠습니다.

Job을 생성할 때는 모델을 저장하기 위해서 PVC를 마운트 해줍니다. 이 일련의 작업들은 직접 실행 할 수 있습니다. 하지만 좀 더 편하게 하기 위해서 앞서 배운 Kubeflow Fairing을 사용하겠습니다.

다음은 로컬 개발 환경에서 Fairing을 사용하여 컨테이너 이미지를 만들고, 쿠버네티스 잡을 실행하는 예제입니다.

fairing-local-docker.py

import uuid

from kubeflow import fairing
from kubeflow.fairing.kubernetes import utils as k8s_utils

CONTAINER_REGISTRY = 'kangwoo'

namespace = 'admin'
job_name = f'tensorflow-mnist-job-{uuid.uuid4().hex[:4]}'

command = ["python", "tensorflow_mnist.py", "--model_path", "/mnt/pv/tensorflow/mnist/models"]
output_map = {
    "Dockerfile": "Dockerfile",
    "tensorflow_mnist.py": "tensorflow_mnist.py"
}

fairing.config.set_preprocessor('python', command=command, path_prefix="/app", output_map=output_map)

fairing.config.set_builder('docker', registry=CONTAINER_REGISTRY, image_name="tensorflow-mnist",
                           dockerfile_path="Dockerfile")

fairing.config.set_deployer('job', namespace=namespace, job_name=job_name,
                            pod_spec_mutators=[
                                k8s_utils.mounting_pvc(pvc_name='seldon-models-pvc', pvc_mount_path='/mnt/pv')],
                            cleanup=True, stream_log=True)

fairing.config.run()

fairing을 실행하면 쿠버네티스 잡이 생성되고, 학습이 완료된 모델이 지정한 경로에 저장됩니다.

Tensorflow을 사용하는 SeldonDeployment 로 배포 하기

SeldonDeployment 생성

SeldonDeployment 매니페스트를 작성합니다. predictor의 구현체를 SKLEARN_SERVER 로 사용합니다. modelUri 필드로 모델 저장 위치를 지정해 줍니다. pvc 의 이름이 kfserving-models-pvc 이고 저장 위치가 models/sklearn/iris 이므로, pvc://kfserving-models-pvc/models/sklearn/iris 라고 지정해 줍니다.

기본적으로 모델 서버는 로드한 모델의 predict_proba 메소드를 호출합니다. 만약 다른 메소드를 사용하고 싶다면 파라미터로 변경할 수 있습니다. 예를 들어 predict 메소드를 호출하게 하라면, parameters 섹션에 method 란 이름으로 값을 지정해 주면 됩니다. 다음 예제는 predict 메소드를 호출하게 설정하였습니다.

tensorflow-mnist.yaml

apiVersion: machinelearning.seldon.io/v1alpha2
kind: SeldonDeployment
metadata:
  name: tensorflow-mnist
spec:
  name: mnist
  predictors:
  - graph:
      children: []
      implementation: TENSORFLOW_SERVER
      modelUri: pvc://seldon-models-pvc/tensorflow/mnist/model
      name: mnist-model
      parameters:
        - name: signature_name
          type: STRING
          value: serving_default
        - name: model_name
          type: STRING
          value: mnist-model
    name: default
    replicas: 1

SeldonDeployment 를 생성합니다.

다음은 admin 네임스페이스 SeldonDeployment 를 생성하는 예제입니다.

kubectl -n admin apply -f tensorflow-mnist.yaml

생성한 SeldonDeployment를 조회해 보겠습니다.

kubectl -n admin get seldondeployment tensorflow-mnist -o yaml

SeldonDeployment 가 정상적으로 생성되면 다음과 같은 응답 결과를 확인할 수 있습니다.

apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  ...
spec:
  ...
status:
  deploymentStatus:
    mnist-default-725903e:
      availableReplicas: 1
      replicas: 1
  serviceStatus:
    mnist-default-mnist-model-seldonio-tfserving-proxy-rest-0-7:
      httpEndpoint: mnist-default-mnist-model-seldonio-tfserving-proxy-rest-0-7.admin:9000
      svcName: mnist-default-mnist-model-seldonio-tfserving-proxy-rest-0-7
    tensorflow-mnist-mnist-default:
      grpcEndpoint: tensorflow-mnist-mnist-default.admin:5001
      httpEndpoint: tensorflow-mnist-mnist-default.admin:8000
      svcName: tensorflow-mnist-mnist-default
  state: Available

SeldonDeploymentstateAvailable 이면 예측을 요청 할 수 있습니다.

예측 실행하기

예측을 요청하기 위해서는 모델 서버에 접근해야 합니다. 모델 서버는 ingressgateway 를 통해서 접근할 수 있습니다. ingressgateway 는 모델 서버들을 구분하기 위해서 호스트 이름을 사용합니다. ingressgateway에 접근하 기 위한 주소는 앞서 정의한 CLUSTER_IP 를 사용하겠습니다.

예측을 요청할 데이터를 json 파일로 작성합니다.

데이터의 크기가 크기 때문에 git 에 있는 파일을 다운받아서 사용해주세요.

mnist-input.json

{
  "data": {
    "ndarray": [
      [...],
       ...
      [...]
    ]
  }
}

다음은 admin 네임스페이스의 tensorflow-mnist SeldonDeployment 에 예측을 요청하는 예제입니다.

MODEL_NAME=tensorflow-mnist
NAMESPACE=admin

INPUT_PATH=@./mnist-input.json
curl -v -H "Content-Type: application/json" http://$CLUSTER_IP/seldon/${NAMESPACE}/${MODEL_NAME}/api/v1.0/predictions -d $INPUT_PATH

정상적으로 실행되면 다음과 같은 응답 결과를 확인 할 수 있습니다.

*   Trying 192.168.21.38...
* TCP_NODELAY set
* Connected to 192.168.21.38 (192.168.21.38) port 32380 (#0)
> POST /seldon/admin/tensorflow-mnist/api/v1.0/predictions HTTP/1.1
> Host: 192.168.21.38:32380
> User-Agent: curl/7.64.1
> Accept: */*
> Content-Type: application/json
> Content-Length: 5725
> Expect: 100-continue
> 
< HTTP/1.1 100 Continue
* We are completely uploaded and fine
< HTTP/1.1 200 OK
< x-content-type-options: nosniff
< vary: Origin,Access-Control-Request-Method,Access-Control-Request-Headers
< content-type: application/json;charset=utf-8
< content-length: 470
< date: Thu, 09 Apr 2020 15:51:27 GMT
< x-envoy-upstream-service-time: 142
< server: istio-envoy
< 
{
  "meta": {
    "puid": "ufdopha1s5gnemt86h06d4jg5e",
    "tags": {
    },
    "routing": {
    },
    "requestPath": {
      "mnist-model": "seldonio/tfserving-proxy_rest:0.7"
    },
    "metrics": []
  },
  "data": {
    "names": ["t:0", "t:1", "t:2", "t:3", "t:4", "t:5", "t:6", "t:7", "t:8", "t:9"],
    "ndarray": [[3.01518681E-4, 1.00308341E-6, 4.13124333E-4, 0.00133548444, 4.15516388E-6, 7.8677E-5, 5.88266346E-7, 0.996478, 3.98369411E-5, 0.00134761049]]
  }
}

Seldon Core – MLflow Server

학습이 완료된 MLflow모델을 저장 한 경우, Seldon의 사전 패키지 된 MLflow 서버를 사용하여 간단히 배포 할 수 있습니다. 그리고 conda.yaml 파일을 이용하면, MLflow 서버의 초기화 중 Conda 환경을 만들 수 도 있습니다.

전제 조건

MLflow 서버를 사용하려면 다음 전제 조건이 충족되어야합니다.

MFLow에서 제공하는 모델 형태

MLFlow 에서 제공하고 있는 주요 모델 형태는 다음과 같습니다.

  • Python Function (python_function)
  • Keras (keras)
  • PyTorch (pytorch)
  • Scikit-learn (sklearn)
  • Spark MLlib (spark)
  • TensorFlow (tensorflow)
  • XGBoost (xgboost)

Conda 환경 생성

MLflow 서버는 여러가지 머신러닝 프레임워크를 지원합니다. 그래서 모델 서버를 실행할 때 필요한 패키지들을 설치할 필요가 있습니다. MLflow 서버는 conda.yaml 파일을 이용하여 모델 서버에 실행에 필요한 환경을 만듭니다. SeldonDeployment 에 의해서 모델 서버가 생성될 때, 모델 저장 위치에 있는 conda.yaml 파일을 읽어와서 Conda 환경을 구성합니다.

모델 생성

MLflow 서버를 사용하기 위해 파이썬을 사용한 간단한 scikit-learn 모델을 생성한 후, MLFlow를 이용하여 저장하겠습니다.

Scikit-learn의 기본적인 데이터셋 중의 하나인 아이리스 꽃 데이터를 사용하여, 아이리스 꽃을 분류하는 모델을 작성해 보겠습니다. mlflow.sklearn.save_model() 메소드를 이용하여 MLflow 형식으로 모델을 저장합니다.

from joblib import dump
from sklearn import datasets
from sklearn import svm

clf = svm.SVC(gamma='scale')
iris = datasets.load_iris()
X, y = iris.data, iris.target
clf.fit(X, y)
mlflow.sklearn.save_model(clf, path=model_path)

모델 저장하기

쿠버네티스의 퍼시스턴스 볼륨에 모델을 저장해 보겠습니다. PVC 는 앞서 생성한 seldon-models-pvc 을 사용하겠습니다. 모델을 학습시키기 위해서 쿠버네티스 잡(Job)을 사용하겠습니다. Job을 생성할 때 모델을 저장하기 위한 PVC를 마운트 해줍니다.

모델 코드 작성하기

아이리스 꽃을 분류하는 간단한 모델입니다. 모델을 저장할 위치를 --model_path 파라미터로 입력받게 하였습니다.

mlflow_sklearn_iris.py

import argparse
import os

import mlflow
import mlflow.sklearn
from sklearn import datasets
from sklearn import svm

def train():
    parser = argparse.ArgumentParser()
    parser.add_argument('--model_path', default='/mnt/pv/mlflow/sklearn/iris/model', type=str)
    args = parser.parse_args()

    # if not (os.path.isdir(args.model_path)):
    #     os.makedirs(args.model_path)
    # os.rmdir(args.model_path)

    clf = svm.SVC(gamma='scale')
    iris = datasets.load_iris()
    X, y = iris.data, iris.target
    clf.fit(X, y)
    print('Finished Training')

    conda_env = {
        'name': 'mlflow-env',
        'channels': ['defaults'],
        'dependencies': [
            'python=3.7.0',
            'scikit-learn=0.20.3'
        ]
    }

    mlflow.sklearn.save_model(clf, path=args.model_path, conda_env=conda_env)

if __name__ == '__main__':
    train()

컨테이너 이미지를 만들기

컨테이너 이미지를 만들기 위한 Dockerfile 입니다. 파이썬을 기본 이미지로 사용하고, scikit-learn 패키지를 추가로 설치합니다.

Dockerfile

FROM python:3.7-slim

RUN pip install scikit-learn==0.20.3 mlflow

RUN mkdir -p /app
ADD mlflow_sklearn_iris.py /app/

쿠버네티스 잡 실행하기

컨테이너 이미지를 빌드하고, 컨테이너 이미지 레지스트리에 푸시 한 다음, 쿠버네티스 잡(Job)을 생성하겠습니다.

Job을 생성할 때는 모델을 저장하기 위해서 PVC를 마운트 해줍니다. 이 일련의 작업들은 직접 실행 할 수 있습니다. 하지만 좀 더 편하게 하기 위해서 앞서 배운 Kubeflow Fairing을 사용하겠습니다.

다음은 로컬 개발 환경에서 Fairing을 사용하여 컨테이너 이미지를 만들고, 쿠버네티스 잡을 실행하는 예제입니다.

fairing-local-docker.py

import uuid
from kubeflow import fairing
from kubeflow.fairing.kubernetes import utils as k8s_utils

CONTAINER_REGISTRY = 'kangwoo'

namespace = 'admin'
job_name = f'mlflow-sklean-iris-job-{uuid.uuid4().hex[:4]}'

command=["python", "mlflow_sklearn_iris.py", "--model_path", "/mnt/pv/mlflow/sklearn/iris/model"]
output_map = {
    "Dockerfile": "Dockerfile",
    "mlflow_sklearn_iris.py": "mlflow_sklearn_iris.py"
}

fairing.config.set_preprocessor('python', command=command, path_prefix="/app", output_map=output_map)

fairing.config.set_builder('docker', registry=CONTAINER_REGISTRY, image_name="mlflow-sklean-iris", dockerfile_path="Dockerfile")

fairing.config.set_deployer('job', namespace=namespace, job_name=job_name,
                            pod_spec_mutators=[k8s_utils.mounting_pvc(pvc_name='seldon-models-pvc', pvc_mount_path='/mnt/pv')],
                            cleanup=True, stream_log=True)

fairing.config.run()

fairing을 실행하면 쿠버네티스 잡이 생성되고, 학습이 완료된 모델이 지정한 경로에 저장됩니다.

MLflow을 사용하는 SeldonDeployment 로 배포 하기

SeldonDeployment 생성

SeldonDeployment 매니페스트를 작성합니다. predictor의 구현체를 MLFLOW_SERVER 로 사용합니다. modelUri 필드로 모델 저장 위치를 지정해 줍니다. pvc 의 이름이 seldon-models-pvc 이고 저장 위치가 mlflow/sklearn/iris/model 이므로, pvc://seldon-models-pvc/mlflow/sklearn/iris/model 라고 지정해 줍니다.

mlflow-sklearn-iris.yaml

apiVersion: machinelearning.seldon.io/v1alpha2
kind: SeldonDeployment
metadata:
  name: mlflow-sklearn-iris
spec:
  name: mlflow-sklearn-iris
  predictors:
    - graph:
        children: []
        implementation: MLFLOW_SERVER
        modelUri: pvc://seldon-models-pvc/mlflow/sklearn/iris/model
        name: classifier
      name: default
      replicas: 1

SeldonDeployment 를 생성합니다.

다음은 admin 네임스페이스 SeldonDeployment 를 생성하는 예제입니다.

kubectl -n admin apply -f mlflow-sklearn-iris.yaml

생성한 SeldonDeployment를 조회해 보겠습니다.

kubectl -n admin get seldondeployment mlflow-sklearn-iris -o yaml

SeldonDeployment 가 정상적으로 생성되면 다음과 같은 응답 결과를 확인할 수 있습니다.

apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  ...
spec:
  ...
status:
  deploymentStatus:
    mlflow-sklearn-iris-default-8c791aa:
      availableReplicas: 1
      replicas: 1
  serviceStatus:
    mlflow-sklearn-iris-mlflow-sklearn-iris-default:
      grpcEndpoint: mlflow-sklearn-iris-mlflow-sklearn-iris-default.admin:5001
      httpEndpoint: mlflow-sklearn-iris-mlflow-sklearn-iris-default.admin:8000
      svcName: mlflow-sklearn-iris-mlflow-sklearn-iris-default
    seldon-d9062e953c9534d009e3dc84a7d3707a:
      httpEndpoint: seldon-d9062e953c9534d009e3dc84a7d3707a.admin:9000
      svcName: seldon-d9062e953c9534d009e3dc84a7d3707a
  state: Available

SeldonDeploymentstateAvailable 이면 예측을 요청 할 수 있습니다.

예측 실행하기

예측을 요청하기 위해서는 모델 서버에 접근해야 합니다. 모델 서버는 ingressgateway 를 통해서 접근할 수 있습니다. ingressgateway 는 모델 서버들을 구분하기 위해서 호스트 이름을 사용합니다. ingressgateway에 접근하 기 위한 주소는 앞서 정의한 CLUSTER_IP 를 사용하겠습니다.

예측을 요청할 데이터를 json 파일로 작성합니다.

iris-input.json

{
  "data": {
    "ndarray": [
      [6.8,  2.8,  4.8,  1.4],
      [6.0,  3.4,  4.5,  1.6]
    ]
  }
}

다음은 admin 네임스페이스의 sklearn-iris SeldonDeployment 에 예측을 요청하는 예제입니다.

MODEL_NAME=mlflow-sklearn-iris
NAMESPACE=admin

INPUT_PATH=@./iris-input.json
curl -v -H "Content-Type: application/json" http://$CLUSTER_IP/seldon/${NAMESPACE}/${MODEL_NAME}/api/v1.0/predictions -d $INPUT_PATH

정상적으로 실행되면 다음과 같은 응답 결과를 확인 할 수 있습니다.

*   Trying 192.168.21.38...
* TCP_NODELAY set
* Connected to 192.168.21.38 (192.168.21.38) port 32380 (#0)
> POST /seldon/admin/mlflow-sklearn-iris/api/v1.0/predictions HTTP/1.1
> Host: 192.168.21.38:32380
> User-Agent: curl/7.64.1
> Accept: */*
> Content-Type: application/json
> Content-Length: 96
> 
* upload completely sent off: 96 out of 96 bytes
< HTTP/1.1 200 OK
< x-content-type-options: nosniff
< vary: Origin,Access-Control-Request-Method,Access-Control-Request-Headers
< content-type: application/json;charset=utf-8
< content-length: 261
< date: Thu, 09 Apr 2020 16:02:46 GMT
< x-envoy-upstream-service-time: 100
< server: istio-envoy
< 
{
  "meta": {
    "puid": "geuhtnu2stgad08ngp9c0382oi",
    "tags": {
    },
    "routing": {
    },
    "requestPath": {
      "classifier": "seldonio/mlflowserver_rest:0.2"
    },
    "metrics": []
  },
  "data": {
    "names": [],
    "ndarray": [1, 1]
  }
}

Seldon Core – XGBoost Server

학습이 완료된 XGBoost 모델을 저장 한 경우에는 Seldon의 사전 패키지 된 XGBoost 서버를 사용하여 간단히 배포 할 수 있습니다.

전제 조건

  • 모델 피클 파일명은 model.bst 이어야 합니다.
  • xgboost v0.82 버전을 사용합니다.

모델 생성

XGBoost 서버를 테스트하려면 먼저 파이썬을 사용하여 간단한 XGBoost 모델을 생성해야 합니다.

Scikit-learn의 기본적인 데이터셋 중의 하나인 아이리스 꽃 데이터를 사용하여, 아이리스 꽃을 분류하는 모델을 작성해 보겠습니다. 모델 피클의 이름 model.bst 이어야 합니다.

import xgboost as xgb
from sklearn.datasets import load_iris

iris = load_iris()
X = iris['data']
y = iris['target']
dtrain = xgb.DMatrix(X, label=y)
param = {'max_depth': 6,
         'eta': 0.1,
         'silent': 1,
         'nthread': 4,
         'num_class': 10,
         'objective': 'multi:softmax'
         }
xgb_model = xgb.train(params=param, dtrain=dtrain)
xgb_model.save_model('model.bst')

생성 된 모델을 사용하여 XGBoost 서버를 실행하고 예측을 수행 할 수 있습니다. 모델은 PV, S3 호환 가능 개체 저장소, Azure Blob 저장소 또는 Google Cloud Storage에 있을 수 있습니다.

모델 저장하기

쿠버네티스의 퍼시스턴스 볼륨에 모델을 저장해 보겠습니다. PVC 는 앞서 생성한 seldon-models-pvc 을 사용하겠습니다. 모델을 학습시키기 위해서 쿠버네티스 잡(Job)을 사용하겠습니다. Job을 생성할 때 모델을 저장하기 위한 PVC를 마운트 해줍니다.

모델 코드 작성하기

아이리스 꽃을 분류하는 간단한 모델입니다. 모델을 저장할 위치를 --model_path 파라미터로 입력받게 하였습니다.

iris.py

import argparse
import os

from joblib import dump
from sklearn import datasets
from sklearn import svm

def train():
    parser = argparse.ArgumentParser()
    parser.add_argument('--model_path', default='/mnt/pv/xgboost/iris/model', type=str)
    args = parser.parse_args()

    if not (os.path.isdir(args.model_path)):
        os.makedirs(args.model_path)

    model_file = os.path.join(args.model_path, 'model.joblib')

    clf = svm.SVC(gamma='scale')
    iris = datasets.load_iris()
    X, y = iris.data, iris.target
    clf.fit(X, y)
    dump(clf, model_file)

if __name__ == '__main__':
    train()

컨테이너 이미지를 만들기

컨테이너 이미지를 만들기 위한 Dockerfile 입니다. 파이썬을 기본 이미지로 사용하고, xgboostscikit-learn 패키지를 추가로 설치합니다.

Dockerfile

FROM python:3.6-slim

RUN pip install xgboost==0.82 scikit-learn

RUN mkdir -p /app
ADD xgboost_iris.py.py /app/

쿠버네티스 잡 실행하기

컨테이너 이미지를 빌드하고, 컨테이너 이미지 레지스트리에 푸시 한 다음, 쿠버네티스 잡(Job)을 생성하겠습니다.

Job을 생성할 때는 모델을 저장하기 위해서 PVC를 마운트 해줍니다. 이 일련의 작업들은 직접 실행 할 수 있습니다. 하지만 좀 더 편하게 하기 위해서 앞서 배운 Kubeflow Fairing을 사용하겠습니다.

다음은 로컬 개발 환경에서 Fairing을 사용하여 컨테이너 이미지를 만들고, 쿠버네티스 잡을 실행하는 예제입니다.

fairing-local-docker.py

import uuid
from kubeflow import fairing
from kubeflow.fairing.kubernetes import utils as k8s_utils

CONTAINER_REGISTRY = 'kangwoo'

namespace = 'admin'
job_name = f'xgboost-iris-job-{uuid.uuid4().hex[:4]}'

command=["python", "xgboost_iris.py", "--model_path", "/mnt/pv/xgboost/iris/model"]
output_map = {
    "Dockerfile": "Dockerfile",
    "xgboost_iris.py": "xgboost_iris.py"
}

fairing.config.set_preprocessor('python', command=command, path_prefix="/app", output_map=output_map)

fairing.config.set_builder('docker', registry=CONTAINER_REGISTRY, image_name="xgboost-iris", dockerfile_path="Dockerfile")

fairing.config.set_deployer('job', namespace=namespace, job_name=job_name,
                            pod_spec_mutators=[k8s_utils.mounting_pvc(pvc_name='seldon-models-pvc', pvc_mount_path='/mnt/pv')],
                            cleanup=True, stream_log=True)

fairing.config.run()

fairing을 실행하면 쿠버네티스 잡이 생성되고, 학습이 완료된 모델이 지정한 경로에 저장됩니다.

XGBoost를 사용하는 SeldonDeployment 로 배포 하기

SeldonDeployment 생성

SeldonDeployment 매니페스트를 작성합니다. predictor의 구현체를 XGBOOST_SERVER 로 사용합니다. modelUri 필드로 모델 저장 위치를 지정해 줍니다. pvc 의 이름이 selon-models-pvc 이고 저장 위치가 xgboost/iris/model 이므로, pvc://seldon-models-pvc/xgboost/iris/model 라고 지정해 줍니다.

기본적으로 모델 서버는 로드한 모델의 predict_proba 메소드를 호출합니다. 만약 다른 메소드를 사용하고 싶다면 파라미터로 변경할 수 있습니다. 예를 들어 predict 메소드를 호출하게 하라면, parameters 섹션에 method 란 이름으로 값을 지정해 주면 됩니다. 다음 예제는 predict 메소드를 호출하게 설정하였습니다.

xgboost.yaml

apiVersion: machinelearning.seldon.io/v1alpha2
kind: SeldonDeployment
metadata:
  name: xgboost-iris
spec:
  name: xgboost-iris
  predictors:
  - graph:
      children: []
      implementation: XGBOOST_SERVER
      modelUri: pvc://seldon-models-pvc/xgboost/iris/model
      name: classifier
    name: default
    replicas: 1

SeldonDeployment 를 생성합니다.

다음은 admin 네임스페이스 SeldonDeployment 를 생성하는 예제입니다.

kubectl -n admin apply -f xgboost.yaml

생성한 SeldonDeployment를 조회해 보겠습니다.

kubectl -n admin get seldondeployment xgboost-iris -o yaml

SeldonDeployment 가 정상적으로 생성되면 다음과 같은 응답 결과를 확인할 수 있습니다.

apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  ...
spec:
  ...
status:
  deploymentStatus:
    xgboost-iris-default-af1783b:
      availableReplicas: 1
      replicas: 1
  serviceStatus:
    xgboost-iris-default-classifier-seldonio-xgboostserver-rest-0-2:
      httpEndpoint: xgboost-iris-default-classifier-seldonio-xgboostserver-rest-0-2.admin:9000
      svcName: xgboost-iris-default-classifier-seldonio-xgboostserver-rest-0-2
    xgboost-iris-xgboost-iris-default:
      grpcEndpoint: xgboost-iris-xgboost-iris-default.admin:5001
      httpEndpoint: xgboost-iris-xgboost-iris-default.admin:8000
      svcName: xgboost-iris-xgboost-iris-default
  state: Available

SeldonDeploymentstateAvailable 이면 예측을 요청 할 수 있습니다.

예측 실행하기

예측을 요청하기 위해서는 모델 서버에 접근해야 합니다. 모델 서버는 ingressgateway 를 통해서 접근할 수 있습니다. ingressgateway 는 모델 서버들을 구분하기 위해서 호스트 이름을 사용합니다. ingressgateway에 접근하 기 위한 주소는 앞서 정의한 CLUSTER_IP 를 사용하겠습니다.

예측을 요청할 데이터를 json 파일로 작성합니다.

iris-input.json

{
  "data": {
    "ndarray": [
      [6.8,  2.8,  4.8,  1.4],
      [6.0,  3.4,  4.5,  1.6]
    ]
  }
}

다음은 admin 네임스페이스의 sklearn-iris SeldonDeployment 에 예측을 요청하는 예제입니다.

MODEL_NAME=xgboost-iris
NAMESPACE=admin

INPUT_PATH=@./iris-input.json
curl -v -H "Content-Type: application/json" http://$CLUSTER_IP/seldon/${NAMESPACE}/${MODEL_NAME}/api/v1.0/predictions -d $INPUT_PATH

정상적으로 실행되면 다음과 같은 응답 결과를 확인 할 수 있습니다.

*   Trying 192.168.21.38...
* TCP_NODELAY set
* Connected to 192.168.21.38 (192.168.21.38) port 32380 (#0)
> POST /seldon/admin/xgboost-iris/api/v1.0/predictions HTTP/1.1
> Host: 192.168.21.38:32380
> User-Agent: curl/7.64.1
> Accept: */*
> Content-Type: application/json
> Content-Length: 96
> 
* upload completely sent off: 96 out of 96 bytes
< HTTP/1.1 200 OK
< x-content-type-options: nosniff
< vary: Origin,Access-Control-Request-Method,Access-Control-Request-Headers
< content-type: application/json;charset=utf-8
< content-length: 262
< date: Thu, 09 Apr 2020 14:48:15 GMT
< x-envoy-upstream-service-time: 162
< server: istio-envoy
< 
{
  "meta": {
    "puid": "td0j9lrb8e9gk9620bmqp2u8i4",
    "tags": {
    },
    "routing": {
    },
    "requestPath": {
      "classifier": "seldonio/xgboostserver_rest:0.2"
    },
    "metrics": []
  },
  "data": {
    "names": [],
    "ndarray": [1, 1]
  }
}

Seldon Core – SKLearn Server

학습이 완료된 SKLearn 모델을 피클(pickle)로 저장 한 경우에는 Seldon의 사전 패키지 된 SKLearn 서버를 사용하여 간단히 배포 할 수 있습니다.

전제 조건

  • 모델 피클은 joblib을 사용하여 저장해야 합니다. 그리고 파일명은 model.joblib 이어야 합니다.
  • 현재 sklearn 0.20.3 버전을 사용합니다. 피클 모델은 이 버전과 호환되어야 합니다.

모델 생성

SKLearn 서버를 사용하기 위해 파이썬을 사용한 간단한 scikit-learn 모델을 생성하겠습니다.

Scikit-learn의 기본적인 데이터셋 중의 하나인 아이리스 꽃 데이터를 사용하여, 아이리스 꽃을 분류하는 모델을 작성해 보겠습니다. 모델 피클은 joblib을 사용하여 저장해야 하고, 파일명은 model.joblib 이어야 합니다.

from joblib import dump
from sklearn import datasets
from sklearn import svm

clf = svm.SVC(gamma='scale')
iris = datasets.load_iris()
X, y = iris.data, iris.target
clf.fit(X, y)
dump(clf, 'model.joblib')

생성 된 모델을 사용하여 scikit-learn 서버를 실행하고 예측을 수행 할 수 있습니다. 모델은 PV, S3 호환 가능 개체 저장소, Azure Blob 저장소 또는 Google Cloud Storage에 있을 수 있습니다.

모델 저장하기

쿠버네티스의 퍼시스턴스 볼륨에 모델을 저장해 보겠습니다. PVC 는 앞서 생성한 seldon-models-pvc 을 사용하겠습니다. 모델을 학습시키기 위해서 쿠버네티스 잡(Job)을 사용하겠습니다. Job을 생성할 때 모델을 저장하기 위한 PVC를 마운트 해줍니다.

모델 코드 작성하기

아이리스 꽃을 분류하는 간단한 모델입니다. 모델을 저장할 위치를 --model_path 파라미터로 입력받게 하였습니다.

iris.py

import argparse
import os

from joblib import dump
from sklearn import datasets
from sklearn import svm

def train():
    parser = argparse.ArgumentParser()
    parser.add_argument('--model_path', default='/mnt/pv/models/sklearn/iris', type=str)
    args = parser.parse_args()

    if not (os.path.isdir(args.model_path)):
        os.makedirs(args.model_path)

    model_file = os.path.join(args.model_path, 'model.joblib')

    clf = svm.SVC(gamma='scale')
    iris = datasets.load_iris()
    X, y = iris.data, iris.target
    clf.fit(X, y)
    dump(clf, model_file)

if __name__ == '__main__':
    train()

컨테이너 이미지를 만들기

컨테이너 이미지를 만들기 위한 Dockerfile 입니다. 파이썬을 기본 이미지로 사용하고, scikit-learn 패키지를 추가로 설치합니다.

Dockerfile

FROM python:3.6-slim

RUN pip install scikit-learn==0.20.3 joblib

RUN mkdir -p /app
ADD iris.py /app/

쿠버네티스 잡 실행하기

컨테이너 이미지를 빌드하고, 컨테이너 이미지 레지스트리에 푸시 한 다음, 쿠버네티스 잡(Job)을 생성하겠습니다.

Job을 생성할 때는 모델을 저장하기 위해서 PVC를 마운트 해줍니다. 이 일련의 작업들은 직접 실행 할 수 있습니다. 하지만 좀 더 편하게 하기 위해서 앞서 배운 Kubeflow Fairing을 사용하겠습니다.

다음은 로컬 개발 환경에서 Fairing을 사용하여 컨테이너 이미지를 만들고, 쿠버네티스 잡을 실행하는 예제입니다.

fairing-local-docker.py

import uuid
from kubeflow import fairing
from kubeflow.fairing.kubernetes import utils as k8s_utils

CONTAINER_REGISTRY = 'kangwoo'

namespace = 'admin'
job_name = f'sklean-iris-job-{uuid.uuid4().hex[:4]}'

command=["python", "iris.py", "--model_path", "/mnt/pv/models/sklearn/iris"]
output_map = {
    "Dockerfile": "Dockerfile",
    "iris.py": "iris.py"
}

fairing.config.set_preprocessor('python', command=command, path_prefix="/app", output_map=output_map)

fairing.config.set_builder('docker', registry=CONTAINER_REGISTRY, image_name="sklean-iris", dockerfile_path="Dockerfile")

fairing.config.set_deployer('job', namespace=namespace, job_name=job_name,
                            pod_spec_mutators=[k8s_utils.mounting_pvc(pvc_name='kfserving-models-pvc', pvc_mount_path='/mnt/pv')],
                            cleanup=False, stream_log=True)

fairing.config.run()

fairing을 실행하면 쿠버네티스 잡이 생성되고, 학습이 완료된 모델이 지정한 경로에 저장됩니다.

SKLearn을 사용하는 SeldonDeployment 로 배포 하기

SeldonDeployment 생성

SeldonDeployment 매니페스트를 작성합니다. predictor의 구현체를 SKLEARN_SERVER 로 사용합니다. modelUri 필드로 모델 저장 위치를 지정해 줍니다. pvc 의 이름이 kfserving-models-pvc 이고 저장 위치가 models/sklearn/iris 이므로, pvc://kfserving-models-pvc/models/sklearn/iris 라고 지정해 줍니다.

기본적으로 모델 서버는 로드한 모델의 predict_proba 메소드를 호출합니다. 만약 다른 메소드를 사용하고 싶다면 파라미터로 변경할 수 있습니다. 예를 들어 predict 메소드를 호출하게 하라면, parameters 섹션에 method 란 이름으로 값을 지정해 주면 됩니다. 다음 예제는 predict 메소드를 호출하게 설정하였습니다.

sklearn.yaml

apiVersion: machinelearning.seldon.io/v1alpha2
kind: SeldonDeployment
metadata:
  name: sklearn-iris
spec:
  name: sklearn-iris
  predictors:
  - graph:
      children: []
      implementation: SKLEARN_SERVER
      modelUri: pvc://seldon-models-pvc/sklearn/iris/model
      name: classifier
      parameters:
        - name: method
          type: STRING
          value: predict
    name: default
    replicas: 1

SeldonDeployment 를 생성합니다.

다음은 admin 네임스페이스 SeldonDeployment 를 생성하는 예제입니다.

kubectl -n admin apply -f sklearn.yaml

생성한 SeldonDeployment를 조회해 보겠습니다.

kubectl -n admin get seldondeployment sklearn-iris -o yaml

SeldonDeployment 가 정상적으로 생성되면 다음과 같은 응답 결과를 확인할 수 있습니다.

apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  ...
spec:
  ...
status:
  deploymentStatus:
    sklearn-iris-default-4903e3c:
      availableReplicas: 1
      replicas: 1
  serviceStatus:
    sklearn-iris-default-classifier-seldonio-sklearnserver-rest-0-2:
      httpEndpoint: sklearn-iris-default-classifier-seldonio-sklearnserver-rest-0-2.admin:9000
      svcName: sklearn-iris-default-classifier-seldonio-sklearnserver-rest-0-2
    sklearn-iris-sklearn-iris-default:
      grpcEndpoint: sklearn-iris-sklearn-iris-default.admin:5001
      httpEndpoint: sklearn-iris-sklearn-iris-default.admin:8000
      svcName: sklearn-iris-sklearn-iris-default
  state: Available

SeldonDeploymentstateAvailable 이면 예측을 요청 할 수 있습니다.

예측 실행하기

예측을 요청하기 위해서는 모델 서버에 접근해야 합니다. 모델 서버는 ingressgateway 를 통해서 접근할 수 있습니다. ingressgateway 는 모델 서버들을 구분하기 위해서 호스트 이름을 사용합니다. ingressgateway에 접근하 기 위한 주소는 앞서 정의한 CLUSTER_IP 를 사용하겠습니다.

예측을 요청할 데이터를 json 파일로 작성합니다.

iris-input.json

{
  "data": {
    "ndarray": [
      [6.8,  2.8,  4.8,  1.4],
      [6.0,  3.4,  4.5,  1.6]
    ]
  }
}

다음은 admin 네임스페이스의 sklearn-iris SeldonDeployment 에 예측을 요청하는 예제입니다.

MODEL_NAME=sklearn-iris
NAMESPACE=admin

INPUT_PATH=@./iris-input.json
curl -v -H "Content-Type: application/json" http://$CLUSTER_IP/seldon/${NAMESPACE}/${MODEL_NAME}/api/v1.0/predictions -d $INPUT_PATH

정상적으로 실행되면 다음과 같은 응답 결과를 확인 할 수 있습니다.

*   Trying 192.168.21.38...
* TCP_NODELAY set
* Connected to 192.168.21.38 (192.168.21.38) port 32380 (#0)
> POST /seldon/admin/sklearn-iris/api/v1.0/predictions HTTP/1.1
> Host: 192.168.21.38:32380
> User-Agent: curl/7.64.1
> Accept: */*
> Content-Type: application/json
> Content-Length: 96
> 
* upload completely sent off: 96 out of 96 bytes
< HTTP/1.1 200 OK
< x-content-type-options: nosniff
< vary: Origin,Access-Control-Request-Method,Access-Control-Request-Headers
< content-type: application/json;charset=utf-8
< content-length: 262
< date: Thu, 09 Apr 2020 11:28:59 GMT
< x-envoy-upstream-service-time: 13
< server: istio-envoy
< 
{
  "meta": {
    "puid": "qnac0ge2lgb3m069fbk1pf04vq",
    "tags": {
    },
    "routing": {
    },
    "requestPath": {
      "classifier": "seldonio/sklearnserver_rest:0.2"
    },
    "metrics": []
  },
  "data": {
    "names": [],
    "ndarray": [1, 1]
  }
}

Seldon Core – 사전 패키지된 추론 서버들

Kubeflow 에 함께 포함된 Selcon Core 를 이용하여 추론 서버를 배포하는 방법에 대해서 알아 보겠습니다.

Istio IngressGateway에 접근하기

Kubeflow 에 함께 포함된 Selcon Core는 트래픽을 전달하기 위해서 Istio를 사용하고 있습니다. 그래서 추론 서버를 배포할 네임스페이스에 Istio와의 연결 통로인 게이트웨이를 먼저 만들어야합니다.

네임스페이스에 kubeflow-gateway라는 게이트웨이를 만듭니다. 이 게이트웨이를 통해서 요청을 전달 받게 됩니다. 게이트웨이를 정의할 때, selector 를 이용하여 실제 트래픽을 받을 ingressgateway 를 지정해줘야 합니다. 기본적으로는 istio-system 네임스페이스에 있는 istio-ingressgateway 포드를 사용합니다. 그래서 서비스에 정의된 레이블인 istio: ingressgatewayselector 를 통해서 지정하였습니다. 별도의 ingressgateway 를 사용하려면 selector 의 조회 조건을 변경하면 됩니다. 예를 들어 kfserving-ingressgateway를 사용하려면, selectorkfserving: ingressgateway 를 지정하면 됩니다.

kubeflow-gateway.yaml

apiVersion: networking.istio.io/v1alpha3
kind: Gateway
metadata:
  name: kubeflow-gateway
spec:
  selector:
    istio: ingressgateway
  servers:
  - hosts:
    - '*'
    port:
      name: http
      number: 80
      protocol: HTTP

admin 네임스페이스에 Gateway 를 생성합니다.

kubectl -n admin apply -f kubeflow-gateway.yaml

kfserving-ingressgateway를 조회해 보겠습니다.

다음은 istio-system 네임스페이스에 있는 kfserving-ingressgateway을 조회하는 예제입니다.

kubectl -n istio-system get service kfserving-ingressgateway 

KFServing이 설치된 쿠버네티스 클러스터에 따라 결과가 다르게 나옵니다. 응답 결과에 따른 크게 세가지 방법으로 접근 할 수 있습니다.

  • LoadBalancer 를 통해서 접근하기
  • NodePort를 통해서 접근하기
  • kubectl port-forward를 통해서 접근하기

LoadBalancer

쿠버네티스 클러스터가 LoadBalancer 를 지원하면 다음과 같은 결과를 얻을 수 있습니다. 서비스의 타입이 LoadBalancer 이고, EXTERNAL-IP 에 IP가 할당되어 있습니다. 이럴 경우에는 EXTERNAL-IP 를 통해서 ingressgateway에 접근할 수 있습니다.

NAME                       TYPE           CLUSTER-IP      EXTERNAL-IP   PORT(S)                                                                                                                                                                                   AGE
kfserving-ingressgateway   LoadBalancer   10.101.141.37   10.201.121.4  15020:30543/TCP,80:32380/TCP,443:32390/TCP,31400:32400/TCP,15011:30263/TCP,8060:32119/TCP,853:32180/TCP,15029:32156/TCP,15030:30674/TCP,15031:30230/TCP,15032:32563/TCP,15443:30995/TCP   2d23h

앞으로 만들 예제에서 사용하기 위해서 ingressgateway 의 접근 주소를 다음과 같이 정의하겠습니다. EXTERNAL-IP 주소를 사용합니다.

CLUSTER_IP=10.201.121.4

NodePort

쿠버네티스 클러스터가 LoadBalancer 를 지원하지 않거나, 서비스의 타입이 NodePort 인 경우 EXTERNAL-IP 의 값이 비어 있습니다. 이럴 경우에는 클러스터의 노드 IP 와 NodePort를 통해서 접근할 수 있습니다.

NAME                       TYPE           CLUSTER-IP      EXTERNAL-IP   PORT(S)                                                                                                                                                                                   AGE
kfserving-ingressgateway   LoadBalancer   10.101.141.37   <pending>     15020:30543/TCP,80:32380/TCP,443:32390/TCP,31400:32400/TCP,15011:30263/TCP,8060:32119/TCP,853:32180/TCP,15029:32156/TCP,15030:30674/TCP,15031:30230/TCP,15032:32563/TCP,15443:30995/TCP   2d23h

노드 IP는 노드를 조회하면 알 수 있습니다.

다음은 노드를 조회 하는 예제입니다.

kubectl get node -o wide

정상적으로 조회되면 다음과 같은 응답 결과가 나옵니다.

NAME     STATUS   ROLES    AGE   VERSION    INTERNAL-IP     EXTERNAL-IP   OS-IMAGE             KERNEL-VERSION      CONTAINER-RUNTIME
mortar   Ready    master   13d   v1.15.10   192.168.21.38   <none>        Ubuntu 18.04.3 LTS   4.15.0-91-generic   docker://18.9.9

노드가 한 개가 아닌 경우에는 여러개의 노드 정보가 출력됩니다. 해당 노드들 중에서 아무 노드의 INTERNAL-IP 를 사용하면 됩니다.

앞으로 만들 예제에서 사용하기 위해서 ingressgateway 의 접근 주소를 다음과 같이 정의하겠습니다. 노드의 IP 와 80 PORT(80:32380/TCP)의 노드 포트인 32380을 포트로 사용합니다.

CLUSTER_IP=192.168.21.38:32380

port-forward

외부에서 쿠버네티스 클러스터의 서비스에 접근할 수 없는 경우, kubectl 의 port-forward를 사용할 수 있습니다. 접근 하려는 외부 시스템에서 다음 명령어 실행하면 로컬 포트를 경유 해서 쿠버네티스 서비스에 접근할 수 있습니다.

kubectl -n istio-system port-forward svc/kfserving-ingressgateway 8080:80

포트 포워딩이 정상적으로 실행되면, 로컬포트로 ingressgateay 서비스로 접근할 수 있습니다. http://localhost:8080 처럼 선언한 로컬 포트의 주소로 접근하면, 쿠버네티스 ingressgateway 의 80 포트로 포워딩 됩니다.

앞으로 만들 예제에서 사용하기 위해서 ingressgateway 의 접근 주소를 다음과 같이 정의하겠습니다.

CLUSTER_IP=localhost:8080

PVC 생성하기

SeldonDeployment 에 사용할 모델은 PVC에 저장하겠습니다. 만약 클라우드 스토리지와 같은 다른 저장소를 사용하려면, “클라우드 저장소를 이용하여 InfeerneceService 배포와 예측”을 참조하시기 바랍니다.

seldon-models-pvc라는 PVC 매니페스트를 작성합니다.

seldon-models-pvc.yaml

kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: seldon-models-pvc
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi

다음 명령어를 실행하여, admin 네임스페이스에 seldon-models-pvc라는 PVC를 생성하겠습니다.

kubectl -n admin apply seldon-models-pvc.yaml

SKLearn Server

XGBoost Server

MLflow Server

Tensorflow Serving

Seldon Core – 설치

Seldon Core는 Kubeflow의 구성 요소로 포함되어 있습니다. 별도로 설치가 필요 없이 사용할 수 있습니다. 물론 Kubeflow 없이 독립적으로 설치해서 사용할 수도 있습니다.

Seldon Core 설치

Kubeflow와 함께 Seldon Core 설치

Selcon Core은 Kubeflow를 설치할때 기본적으로 설치됩니다. Kubeflow 매니페스트에 Selcon Core를 설치하는 부분이 포함되어 있습니다. Kubeflow와 함께 설치되는 KFServing의 경우 KFServing 컨트롤러는 kubeflow  네임스페이스에 배포됩니다.

Seldon Core를 사용하려면, 모델 서버를 생성할 네임스페이스가 다음과 같은지 확인해야합니다.

  • kubeflow-gateway라는 Istio 게이트웨이 가 있어야 합니다.
  • [serving.kubeflow.org/inferenceservice=enabled](<http://serving.kubeflow.org/inferenceservice=enabled>) 레이블이 추가 되어 있어야 합니다.

Kubeflow의 대시보드나 프로필 컨트롤러(Profile Controller)를 사용하여, 사용자 네임스페이스를 만드는 경우에는 Seldon Core에서 모델을 배포할 수 있도록 serving.kubeflow.org/inferenceservice: enabled 레이블이 자동으로 추가됩니다. 만약 네임스페이스를 직접 생성하는 경우에는 해당 네임스페이스에 serving.kubeflow.org/inferenceservice: enabled 레이블을 추가해야만, Selcon Core를 정상적으로 사용할 수 있습니다.

다음은 my-namespace 네임스페이스에 레이블을 추가하는 예제입니다.

kubectl label namespace my-namespace serving.kubeflow.org/inferenceservice=enabled

Kubeflow와 함게 설치된 Seldon Core는 istio를 사용하고 있습니다. Istio는 Seldon Core가 새롭게 배포한 대상들을 자동으로 연결할 수 있는 수신 게이트웨이를 제공합니다. kubeflow-gateway 라는 Istio 게이트웨이를 사용합니다.

다음은my-namespace라는 네임스페이스에 kubeflow-gateway라는 게이트웨이를 만드는 예제입니다.

apiVersion: networking.istio.io/v1alpha3
kind: Gateway
metadata:
  name: kubeflow-gateway
  namespace: my-namespace
spec:
  selector:
    istio: ingressgateway
  servers:
  - hosts:
    - '*'
    port:
      name: http
      number: 80
      protocol: HTTP

독립형 Seldon Core 설치

독립형 Seldon Core는 Helm 3를 이용하여 간단히 설치할 수 있습니다.

먼저 Seldon Core를 설치할 네임스페이스를 생성합니다.

kubectl create namespace seldon-system

helm과 seldon-charts를 이용하여 Seldon Core를 설치합니다.

helm install seldon-core seldon-core-operator \\
    --repo <https://storage.googleapis.com/seldon-charts> \\
    --set usageMetrics.enabled=true \\
    --namespace seldon-system \\
    --set istio.enabled=true