TFX – 시카고 택시

TFX의 시카고 택시의 예제를 이용하여 파이프라인을 실행해 보겠습니다.

TFX(TensorFlow Extended)는 프로덕션 머신러닝 파이프라인을 배포하기 위한 엔드 투 엔드 플랫폼으로서, 텐서플로우를 기반으로 하고 있습니다. 머신 러닝 시스템을 실행 하고 모니터링하는 데 필요한 프레임워크와 공통 라이브러리를 제공하고 있습니다.

TFX의 시카고 택시 예제는 엔드 투 엔드 워크플로우와 데이터 분석, 검증 및 변환, 모델 훈련, 성능 분석 및 서비스에 필요한 단계를 보여주고 있습니다. 그래서 워크플로우와 파이프라인을 구성하는데 좋은 참고가 될 수 있습니다.

이 예제에서는 다음과 같은 TFX 컴포넌트를 사용합니다.

  • ExampleGen 입력 데이터 세트의 수집 및 분할.
  • StatisticsGen 데이터 세트에 대한 통계 계산.
  • SchemaGen examines the statistics and creates a data schema.
  • ExampleValidator 통계를 검사하고 데이터 스키마를 생성함.
  • Transform 데이터 세트를 가지고 피처 엔지니어링을 수행.
  • Trainer TensorFlow Estimators 또는 Keras를 사용하여 모델을 학습.
  • Evaluator 학습 결과에 대한 심층 분석 수행.
  • Pusher 모델을 서비스 인프라에 배포.

다음 그림은 컴포넌트 간의 데이터 흐름을 보여 줍니다.

출처 : https://www.tensorflow.org/tfx

TFX 라이브러리

TFX는 라이브러리와 파이프라인 컴포넌트를 모두 포함하고 있습니다. 다음 그림은 TFX 라이브러리와 파이프라인 컴포넌트 사이의 관계를 설명하고 있습니다.

출처 : https://www.tensorflow.org/tfx

TFX는 파이프라인 컴포넌트를 만드는 데 사용할 수 있는 파이썬 라이브러리를 제공하고 있습니다. 이 라이브러리를 사용하여 파이프라인의 컴포넌트를 생성하면, 보다 쉽게 파이프라인을 구성할 수 있으며, 사용자는 사용자 코드를 작성하는데 보다 더 집중할 수 있습니다.

TFX 라이브러리는 다음과 같습니다.

  • TensorFlow Data Validation (TFDV) : TFDV는 머신러닝 데이터를 분석하고 검증하는 것을 도와주는 라이브러리 입니다. 확장성이 뛰어나고 텐서플로우, TFX와 잘 작동하도록 설계되었습니다. TFDV는 다음을 포함하고 있습니다.
    • 학습 및 테스트 데이터의 요약 통계에 대한 계산.
    • 데이터 분산 및 통계를 위한 뷰어와의 통합 및 데이터 세트 쌍(Facets)의 측면 비교.
    • 필요한 값, 범위 및 어휘와 같은 데이터에 대한 기대치를 설명하는 자동화된 데이터 스키마 생성
    • 스키마를 검사하는 데 도움이 되는 스키마 뷰어.
    • 누락된 피처, 범위를 벗어난 값 또는 잘못된 기능 유형과 같은 이상 징후를 식별하기 위한 이상 징후 탐지.
    • 이상 징후 뷰어를 통해 이상 징후가 있는 피처를 확인.
  • TensorFlow Transform (TFT) : TFT는 TensorFlow로 데이터를 사전 처리하는 라이브러리 입니다. TFT는 다음과 같이 전체 데이터를 변환할 때 유용합니다.
    • 평균 및 표준 편차로 입력 값을 정규화.
    • 모든 입력 값에 대한 어휘를 생성하여 문자열을 정수로 변환.
    • 관측된 데이터 분포를 기반으로 소수 데이터를 정수로 변환.
  • TensorFlow : 텐서플로우는 TFX에서 모델을 학습할 때 사용합니다. 학습 데이터와 모델링 코드를 가지고, SavedModel 이라는 결과를 생성합니다.
  • TensorFlow Model Analysis (TFMA) : TFMA는 텐서플로우 모델을 평가하기 위한 라이브러리 입니다. 텐서플로우로 EvalSavedModel을 생성하는데 사용되며, 이는 분석의 기반이 됩니다. 사용자들은 학습에 정의된 것과 동일한 지표를 사용하여, 분산된 방식으로 대량의 데이터에 대한 모델을 평가할 수 있습니다. 이러한 측정지표는 여러 조각의 데이터를 통해 계산할 수 있으며 주피터 노트북에서 시각화할 수 있습니다.
  • TensorFlow Metadata (TFMD) : TFMD는 텐서플로우로 머신러닝 모델을 학습할 때 유용한 메타데이터에 대한 표준 표현을 제공합니다. 메타데이터는 입력 데이터 분석 중에 생성할 수 있으며, 데이터 검증, 탐색 및 변환을 위해 사용될 수 있습니다. 메타데이터 직렬화 형식은 다음과 같습니다.
    • 표 형식의 데이터를 설명하는 스키마 (예 tf.Examples)
    • 데이터 세트에 대한 통계 요약 모음
  • ML Metadata (MLMD) : MLMD는 머신 러닝 개발자 및 데이터 사이언티스트가 워크플로우와 관련된 메타데이터를 기록하고 검색하기 위한 라이브러리입니다. 대부분의 메타데이터는 TFMD 표현을 사용합니다. MLMD는 SQL-Lite, MySQL 같은 데이터 저장소를 사용하여 데이터를 저장합니다.

데이터 세트

예제에서 사용한 데이터 세트는 시카고 시가 발표한 택시 여행 데이터 세트 입니다. 해당 사이트는 애플리케이션에서 데이터를 사용하기 위해, 원본 데이터를 수정한 데이터를 제공하고 있습니다. 원본 소스는 시카고 시의 공식 웹사이트인 www.cityofchicago.org 에서 확인할 수 있습니다.

Kubeflow에서 TFX 파이프라인 실행하기

TFX는 다양한 환경에서 실행할 수 있습니다. 로컬에서도 실행할 수 있고, Airflow 환경에서 실행할 수 있으며, Kubeflow 에서도 실행할 수 있습니다. 예제에서는 Kubeflow 환경에서 시카고 택시 파이프라인을 실행해 볼 것입니다. 만약 다양환 환경에 대해 관심 있으면, “Chicago Taxi Example” 페이지를 참고하시기 바랍니다.

TFX 컴포넌트 사용하기

설정

필요한 패키지를 추가하고, 사용할 데이터를 다운로드하겠습니다.

먼저 주피터 노트북을 생성하겠습니다.

패키지 설치하기

tfx 패키지를 설치합니다.

In []:

!pip install "tfx>=0.21.1,<0.22"  --user

패키지 추가하기

TFX 컴포넌트와 필요한 패키지를 추가합니다.

In []:

import os
import pprint
import tempfile
import urllib

import absl
import tensorflow as tf
import tensorflow_model_analysis as tfma
tf.get_logger().propagate = False
pp = pprint.PrettyPrinter()

import tfx
from tfx.components import CsvExampleGen
from tfx.components import Evaluator
from tfx.components import ExampleValidator
from tfx.components import Pusher
from tfx.components import ResolverNode
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Trainer
from tfx.components import Transform
from tfx.components.base import executor_spec
from tfx.components.trainer.executor import GenericExecutor
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.orchestration import metadata
from tfx.orchestration import pipeline
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.types import Channel
from tfx.types.standard_artifacts import Model
from tfx.types.standard_artifacts import ModelBlessing
from tfx.utils.dsl_utils import external_input

%load_ext tfx.orchestration.experimental.interactive.notebook_extensions.skip

라이브러리 버전을 확인해 보겠습니다.

In []:

print('TensorFlow version: {}'.format(tf.__version__))
print('TFX version: {}'.format(tfx.__version__))
TensorFlow version: 2.1.0
TFX version: 0.21.4

파이프라인 경로 설정 이동 or 삭제

In []:

# This is the root directory for your TFX pip package installation.
_tfx_root = tfx.__path__[0]

# This is the directory containing the TFX Chicago Taxi Pipeline example.
_taxi_root = os.path.join(_tfx_root, 'examples/chicago_taxi_pipeline')

# This is the path where your model will be pushed for serving.

# Set up logging.
absl.logging.set_verbosity(absl.logging.INFO)

예제 데이터 다운로드

사용할 예제 데이터세트를 다운로드합니다.

In []:

_data_root = tempfile.mkdtemp(prefix='tfx-data')
DATA_PATH = '<https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/data/simple/data.csv>'
_data_filepath = os.path.join(_data_root, "data.csv")
urllib.request.urlretrieve(DATA_PATH, _data_filepath)

InteractiveContext 작성

노트북에서 대화식으로 TFX 컴포넌트를 실행할 수 있는 InteractiveContext를 생성합니다.

In []:

context = InteractiveContext()

TFX 컴포넌트 실행

노트북 셀에서는 TFX 컴포넌트를 하나씩 작성하고 실행해보겠습니다.

ExampleGen

ExampleGen 컴포넌트는 대부분 TFX 파이프라인의 앞에 위치해 있습니다. 해당 컴포넌트는 다음과 같은 역할을 합니다.

  • 데이터를 학습 및 평가 세트로 분할(기본적으론 학습 세트에 2/3, 평가 세트에 1/3가 할당됩니다.)
  • 데이터를 tf.Example 형식으로 변환
  • 데이터를 다른 컴포넌트에서 접근할 수 있도록 _tfx_root 디렉토리로 복사

ExampleGen 은 데이터 원본의 경로를 입력값으로 사용합니다. 다음 예제의 경우 CSV 파일을 다운르도 받은 _data_root 경로를 입력값으로 사용합니다.

In []:

example_gen = CsvExampleGen(input=external_input(_data_root))
context.run(example_gen)

출력 아티팩트를 살펴보겠습니다. 이 컴포넌트는 학습(train)과 평가(eval) 나누어진 아티팩트를 출력합니다.

In []:

artifact = example_gen.outputs['examples'].get()[0]
print(artifact.split_names, artifact.uri)
["train", "eval"] /tmp/tfx-interactive-2020-05-07T09_40_15.820980-gawc9si3/CsvExampleGen/examples/1
This cell will be skipped during export to pipeline.

그리고 다음과 같이 학습에 사용될 데이터 세트를 조회해 볼 수 있습니다.

In []:

# Get the URI of the output artifact representing the training examples, which is a directory
train_uri = os.path.join(example_gen.outputs['examples'].get()[0].uri, 'train')

# Get the list of files in this directory (all compressed TFRecord files)
tfrecord_filenames = [os.path.join(train_uri, name)
                      for name in os.listdir(train_uri)]

# Create a `TFRecordDataset` to read these files
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")

# Iterate over the first 3 records and decode them.
for tfrecord in dataset.take(3):
  serialized_example = tfrecord.numpy()
  example = tf.train.Example()
  example.ParseFromString(serialized_example)
  pp.pprint(example)
features {
  feature {
    key: "company"
    value {
      bytes_list {
        value: "Taxi Affiliation Services"
      }
    }
  }
  feature {
    key: "dropoff_census_tract"
    value {
    }
  }
  feature {
    key: "dropoff_community_area"
    value {
    }
  }
  feature {
    key: "dropoff_latitude"
    value {
    }
  }
...

StatisticsGen

StatisticsGen 컴포넌트는 다운스트림 컴포넌트로서, 데이터 분석을 위해 데이터 세트에 대한 통계를 계산합니다. 텐서플로우 데이터 검증 라이브러리(TFDV:TensorFlow Data Validation)를 사용합니다.

StatisticsGen은 앞서 ExampleGen 에서 생성한 데이터 세트를 입력값으로 사용합니다.

In []:

statistics_gen = StatisticsGen(
    examples=example_gen.outputs['examples'])
context.run(statistics_gen)

StatisticsGen 의 실행이 끝나면, 출력된 통계를 시각화할 수 있습니다.

In []:

context.show(statistics_gen.outputs['statistics'])

‘train’ split:

‘eval’ split:

SchemaGen

SchemaGen 컴포넌트는 데이터 통계를 기반으로 스키마를 생성합니다. 스키마는 데이터 세트에 있는 형상의 예상 한계, 유형 및 속성을 정의합니다. SchemaGen 컴포넌트도 TFDV 라이브러리를 사용합니다.

SchemaGenStatisticsGen으로 생성한 통계를 입력으로 사용합니다.

In []:

schema_gen = SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    infer_feature_shape=False)
context.run(schema_gen)

SchemaGen이 실행이 끝나면, 생성된 스키마를 테이블로 시각화할 수 있습니다.

In []:

context.show(schema_gen.outputs['schema'])

데이터 세트의 각 피쳐는 스키마 테이블의 속성 옆에 행으로 표시합니다. 스키마는 또한 범주형 피처의 해당 영역에 있는 값들을 표시합니다.

ExampleValidator

ExampleValidator 컴포넌트는 스키마에 정의된 기대치를 기반으로, 데이터에서 이상 징후를 탐지합니다. ExampleValidator 컴포넌트도 TFDV 라이브러리를 사용합니다.

ExampleValidatorStatisticsGen의 통계와 SchemaGen의 스키마를 입력값으로 사용합니다.

In []:

example_validator = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema'])
context.run(example_validator)

ExampleValidator 의 실행이 끝나면, 이상 징후를 표로 시각화할 수 있습니다.

In []:

context.show(example_validator.outputs['anomalies'])

이상 징후 표에서, 회사(company) 피처가 학습 데이터 세트에 없는 값들을 가지는 것을 확인할 수 있습니다. 이러한 정보를 이용하여 모델 성능을 디버그하고, 데이터가 시간에 따라 어떻게 발전하는지 이해하며, 데이터 오류를 식별하는 데 사용할 수 있습니다.

Transform

Transform 컴포넌트는 학습과 서빙에 대한 피쳐 엔지니어링을 수행합니다. TensorFlow Transform 라이브러리를 사용합니다.

TransformExampleGen 에서 생성한 데이터와 SchemaGen의 스키마 그리고 사용자가 정의 변환 코드를 입력값으로 사용합니다.

다음은 사용자 정의 변환 코드 입니다. 먼저 피처 엔지니어링을 위하여 몇가지 상수를 정의하겠습니다.

In []:

_taxi_constants_module_file = 'taxi_constants.py'

In []:

%%writefile {_taxi_constants_module_file}

# Categorical features are assumed to each have a maximum value in the dataset.
MAX_CATEGORICAL_FEATURE_VALUES = [24, 31, 12]

CATEGORICAL_FEATURE_KEYS = [
    'trip_start_hour', 'trip_start_day', 'trip_start_month',
    'pickup_census_tract', 'dropoff_census_tract', 'pickup_community_area',
    'dropoff_community_area'
]

DENSE_FLOAT_FEATURE_KEYS = ['trip_miles', 'fare', 'trip_seconds']

# Number of buckets used by tf.transform for encoding each feature.
FEATURE_BUCKET_COUNT = 10

BUCKET_FEATURE_KEYS = [
    'pickup_latitude', 'pickup_longitude', 'dropoff_latitude',
    'dropoff_longitude'
]

# Number of vocabulary terms used for encoding VOCAB_FEATURES by tf.transform
VOCAB_SIZE = 1000

# Count of out-of-vocab buckets in which unrecognized VOCAB_FEATURES are hashed.
OOV_SIZE = 10

VOCAB_FEATURE_KEYS = [
    'payment_type',
    'company',
]

# Keys
LABEL_KEY = 'tips'
FARE_KEY = 'fare'

def transformed_name(key):
  return key + '_xf'

그리고 데이터를 입력 받아 변환 작업을 하는 코드를 작성하겠습니다.

In []:

_taxi_transform_module_file = 'taxi_transform.py'

In []:

%%writefile {_taxi_transform_module_file}

import tensorflow as tf
import tensorflow_transform as tft

import taxi_constants

_DENSE_FLOAT_FEATURE_KEYS = taxi_constants.DENSE_FLOAT_FEATURE_KEYS
_VOCAB_FEATURE_KEYS = taxi_constants.VOCAB_FEATURE_KEYS
_VOCAB_SIZE = taxi_constants.VOCAB_SIZE
_OOV_SIZE = taxi_constants.OOV_SIZE
_FEATURE_BUCKET_COUNT = taxi_constants.FEATURE_BUCKET_COUNT
_BUCKET_FEATURE_KEYS = taxi_constants.BUCKET_FEATURE_KEYS
_CATEGORICAL_FEATURE_KEYS = taxi_constants.CATEGORICAL_FEATURE_KEYS
_FARE_KEY = taxi_constants.FARE_KEY
_LABEL_KEY = taxi_constants.LABEL_KEY
_transformed_name = taxi_constants.transformed_name

def preprocessing_fn(inputs):
  """tf.transform's callback function for preprocessing inputs.
  Args:
    inputs: map from feature keys to raw not-yet-transformed features.
  Returns:
    Map from string feature key to transformed feature operations.
  """
  outputs = {}
  for key in _DENSE_FLOAT_FEATURE_KEYS:
    # Preserve this feature as a dense float, setting nan's to the mean.
    outputs[_transformed_name(key)] = tft.scale_to_z_score(
        _fill_in_missing(inputs[key]))

  for key in _VOCAB_FEATURE_KEYS:
    # Build a vocabulary for this feature.
    outputs[_transformed_name(key)] = tft.compute_and_apply_vocabulary(
        _fill_in_missing(inputs[key]),
        top_k=_VOCAB_SIZE,
        num_oov_buckets=_OOV_SIZE)

  for key in _BUCKET_FEATURE_KEYS:
    outputs[_transformed_name(key)] = tft.bucketize(
        _fill_in_missing(inputs[key]), _FEATURE_BUCKET_COUNT,
        always_return_num_quantiles=False)

  for key in _CATEGORICAL_FEATURE_KEYS:
    outputs[_transformed_name(key)] = _fill_in_missing(inputs[key])

  # Was this passenger a big tipper?
  taxi_fare = _fill_in_missing(inputs[_FARE_KEY])
  tips = _fill_in_missing(inputs[_LABEL_KEY])
  outputs[_transformed_name(_LABEL_KEY)] = tf.where(
      tf.math.is_nan(taxi_fare),
      tf.cast(tf.zeros_like(taxi_fare), tf.int64),
      # Test if the tip was > 20% of the fare.
      tf.cast(
          tf.greater(tips, tf.multiply(taxi_fare, tf.constant(0.2))), tf.int64))

  return outputs

def _fill_in_missing(x):
  """Replace missing values in a SparseTensor.
  Fills in missing values of `x` with '' or 0, and converts to a dense tensor.
  Args:
    x: A `SparseTensor` of rank 2.  Its dense shape should have size at most 1
      in the second dimension.
  Returns:
    A rank 1 tensor where missing values of `x` have been filled in.
  """
  default_value = '' if x.dtype == tf.string else 0
  return tf.squeeze(
      tf.sparse.to_dense(
          tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),
          default_value),
      axis=1)

이제 변환 작업을 하는 코드를 Transform 컴포넌트에 전달하여, 데이터를 변환하겠습니다.

In []:

transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file=os.path.abspath(_taxi_transform_module_file))
context.run(transform)

Transform는 두 개의 출력 아티팩트를 생성합니다.

  • transform_graph : transform_graph는 사전 처리 작업을 수행할 수 있는 그래프입니다.
  • transformed_examples : transformed_examples 는 사전 처리된 학습 데이터와 평가 데이터를 나타냅니다.

Trainer

Trainer 컴포넌트는 텐서플로우로 정의한 모델을 학습 시킵니다. TrainerSchemaGen의 스키마와 Transform 에서 변환된 데이터와 그래프, 학습 파라미터 그리고 사용자 정의 모델 코드를 입력값으로 사용합니다.

다음은 사용자 정의 모델 코드입니다.

In []:

_taxi_trainer_module_file = 'taxi_trainer.py'

In []:

%%writefile {_taxi_trainer_module_file}

from typing import List, Text

import os
import absl
import datetime
import tensorflow as tf
import tensorflow_transform as tft

from tfx.components.trainer.executor import TrainerFnArgs

import taxi_constants

_DENSE_FLOAT_FEATURE_KEYS = taxi_constants.DENSE_FLOAT_FEATURE_KEYS
_VOCAB_FEATURE_KEYS = taxi_constants.VOCAB_FEATURE_KEYS
_VOCAB_SIZE = taxi_constants.VOCAB_SIZE
_OOV_SIZE = taxi_constants.OOV_SIZE
_FEATURE_BUCKET_COUNT = taxi_constants.FEATURE_BUCKET_COUNT
_BUCKET_FEATURE_KEYS = taxi_constants.BUCKET_FEATURE_KEYS
_CATEGORICAL_FEATURE_KEYS = taxi_constants.CATEGORICAL_FEATURE_KEYS
_MAX_CATEGORICAL_FEATURE_VALUES = taxi_constants.MAX_CATEGORICAL_FEATURE_VALUES
_LABEL_KEY = taxi_constants.LABEL_KEY
_transformed_name = taxi_constants.transformed_name

def _transformed_names(keys):
  return [_transformed_name(key) for key in keys]

def _gzip_reader_fn(filenames):
  """Small utility returning a record reader that can read gzip'ed files."""
  return tf.data.TFRecordDataset(
      filenames,
      compression_type='GZIP')

def _get_serve_tf_examples_fn(model, tf_transform_output):
  """Returns a function that parses a serialized tf.Example and applies TFT."""

  model.tft_layer = tf_transform_output.transform_features_layer()

  @tf.function
  def serve_tf_examples_fn(serialized_tf_examples):
    """Returns the output to be used in the serving signature."""
    feature_spec = tf_transform_output.raw_feature_spec()
    feature_spec.pop(_LABEL_KEY)
    parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)

    transformed_features = model.tft_layer(parsed_features)
    transformed_features.pop(_transformed_name(_LABEL_KEY))

    return model(transformed_features)

  return serve_tf_examples_fn

def _input_fn(file_pattern: Text,
              tf_transform_output: tft.TFTransformOutput,
              batch_size: int = 200) -> tf.data.Dataset:
  """Generates features and label for tuning/training.

  Args:
    file_pattern: input tfrecord file pattern.
    tf_transform_output: A TFTransformOutput.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  transformed_feature_spec = (
      tf_transform_output.transformed_feature_spec().copy())

  dataset = tf.data.experimental.make_batched_features_dataset(
      file_pattern=file_pattern,
      batch_size=batch_size,
      features=transformed_feature_spec,
      reader=_gzip_reader_fn,
      label_key=_transformed_name(_LABEL_KEY))

  return dataset

def _build_keras_model(hidden_units: List[int] = None) -> tf.keras.Model:
  """Creates a DNN Keras model for classifying taxi data.

  Args:
    hidden_units: [int], the layer sizes of the DNN (input layer first).

  Returns:
    A keras Model.
  """
  real_valued_columns = [
      tf.feature_column.numeric_column(key, shape=())
      for key in _transformed_names(_DENSE_FLOAT_FEATURE_KEYS)
  ]
  categorical_columns = [
      tf.feature_column.categorical_column_with_identity(
          key, num_buckets=_VOCAB_SIZE + _OOV_SIZE, default_value=0)
      for key in _transformed_names(_VOCAB_FEATURE_KEYS)
  ]
  categorical_columns += [
      tf.feature_column.categorical_column_with_identity(
          key, num_buckets=_FEATURE_BUCKET_COUNT, default_value=0)
      for key in _transformed_names(_BUCKET_FEATURE_KEYS)
  ]
  categorical_columns += [
      tf.feature_column.categorical_column_with_identity(  # pylint: disable=g-complex-comprehension
          key,
          num_buckets=num_buckets,
          default_value=0) for key, num_buckets in zip(
              _transformed_names(_CATEGORICAL_FEATURE_KEYS),
              _MAX_CATEGORICAL_FEATURE_VALUES)
  ]
  indicator_column = [
      tf.feature_column.indicator_column(categorical_column)
      for categorical_column in categorical_columns
  ]

  model = _wide_and_deep_classifier(
      wide_columns=indicator_column,
      deep_columns=real_valued_columns,
      dnn_hidden_units=hidden_units or [100, 70, 50, 25])
  return model

def _wide_and_deep_classifier(wide_columns, deep_columns, dnn_hidden_units):
  """Build a simple keras wide and deep model.

  Args:
    wide_columns: Feature columns wrapped in indicator_column for wide (linear)
      part of the model.
    deep_columns: Feature columns for deep part of the model.
    dnn_hidden_units: [int], the layer sizes of the hidden DNN.

  Returns:
    A Wide and Deep Keras model
  """
  # Following values are hard coded for simplicity in this example,
  # However prefarably they should be passsed in as hparams.

  # Keras needs the feature definitions at compile time.
  input_layers = {
      colname: tf.keras.layers.Input(name=colname, shape=(), dtype=tf.float32)
      for colname in _transformed_names(_DENSE_FLOAT_FEATURE_KEYS)
  }
  input_layers.update({
      colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32')
      for colname in _transformed_names(_VOCAB_FEATURE_KEYS)
  })
  input_layers.update({
      colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32')
      for colname in _transformed_names(_BUCKET_FEATURE_KEYS)
  })
  input_layers.update({
      colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32')
      for colname in _transformed_names(_CATEGORICAL_FEATURE_KEYS)
  })

  deep = tf.keras.layers.DenseFeatures(deep_columns)(input_layers)
  for numnodes in dnn_hidden_units:
    deep = tf.keras.layers.Dense(numnodes)(deep)
  wide = tf.keras.layers.DenseFeatures(wide_columns)(input_layers)

  output = tf.keras.layers.Dense(
      1, activation='sigmoid')(
          tf.keras.layers.concatenate([deep, wide]))

  model = tf.keras.Model(input_layers, output)
  model.compile(
      loss='binary_crossentropy',
      optimizer=tf.keras.optimizers.Adam(lr=0.001),
      metrics=[tf.keras.metrics.BinaryAccuracy()])
  model.summary(print_fn=absl.logging.info)
  return model

# TFX Trainer will call this function.
def run_fn(fn_args: TrainerFnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """
  # Number of nodes in the first layer of the DNN
  first_dnn_layer_size = 100
  num_dnn_layers = 4
  dnn_decay_factor = 0.7

  tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

  train_dataset = _input_fn(fn_args.train_files, tf_transform_output, 40)
  eval_dataset = _input_fn(fn_args.eval_files, tf_transform_output, 40)

  model = _build_keras_model(
      # Construct layers sizes with exponetial decay
      hidden_units=[
          max(2, int(first_dnn_layer_size * dnn_decay_factor**i))
          for i in range(num_dnn_layers)
      ])

  # This log path might change in the future.
  log_dir = os.path.join(os.path.dirname(fn_args.serving_model_dir), 'logs')
  tensorboard_callback = tf.keras.callbacks.TensorBoard(
      log_dir=log_dir, update_freq='batch')
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps,
      callbacks=[tensorboard_callback])

  signatures = {
      'serving_default':
          _get_serve_tf_examples_fn(model,
                                    tf_transform_output).get_concrete_function(
                                        tf.TensorSpec(
                                            shape=[None],
                                            dtype=tf.string,
                                            name='examples')),
  }
  model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)

작성한 모델 코드를 Trainer 컴포넌트에 전달하여 모델을 학습시키겠습니다.

In []:

trainer = Trainer(
    module_file=os.path.abspath(_taxi_trainer_module_file),
    custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schema_gen.outputs['schema'],
    train_args=trainer_pb2.TrainArgs(num_steps=10000),
    eval_args=trainer_pb2.EvalArgs(num_steps=5000))
context.run(trainer)

Evaluator

Evaluator 컴포넌트는 평가 데이터 세트를 이용하여 모델 성능 지표를 계산합니다. TFMA(TensorFlow Model Analysis) 라이브러리를 사용합니다. Evaluator 는 새로 학습된 모델이 이전 모델보다 더 나은지 선택적으로 검증할 수 있습니다. 이는 매일 모델을 자동으로 학습하고 검증할 수 있는 생산 파이프라인 설정에서 유용합니다. 현재 노트북에서는 하나의 모델만 학습하므로 Evaluator가 자동으로 모델에 “good”라고 라벨을 붙입니다.

EvaluatorExampleGen 에서 생성한 데이터와 Trainer 가 학습한 모델 그리고 슬라이싱 설정을 입력값으로 사용합니다. 슬라이싱 설정을 통해 피처 값에 대한 메트릭을 슬라이스할 수 있습니다.

In []:

eval_config = tfma.EvalConfig(
    model_specs=[
        # This assumes a serving model with signature 'serving_default'. If
        # using estimator based EvalSavedModel, add signature_name: 'eval' and 
        # remove the label_key.
        tfma.ModelSpec(label_key='tips')
    ],
    metrics_specs=[
        tfma.MetricsSpec(
            # The metrics added here are in addition to those saved with the
            # model (assuming either a keras model or EvalSavedModel is used).
            # Any metrics added into the saved model (for example using
            # model.compile(..., metrics=[...]), etc) will be computed
            # automatically.
            metrics=[
                tfma.MetricConfig(class_name='ExampleCount')
            ],
            # To add validation thresholds for metrics saved with the model,
            # add them keyed by metric name to the thresholds map.
            thresholds = {
                'binary_accuracy': tfma.MetricThreshold(
                    value_threshold=tfma.GenericValueThreshold(
                        lower_bound={'value': 0.5}),
                    change_threshold=tfma.GenericChangeThreshold(
                       direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                       absolute={'value': -1e-10}))
            }
        )
    ],
    slicing_specs=[
        # An empty slice spec means the overall slice, i.e. the whole dataset.
        tfma.SlicingSpec(),
        # Data can be sliced along a feature column. In this case, data is
        # sliced along feature column trip_start_hour.
        tfma.SlicingSpec(feature_keys=['trip_start_hour'])
    ])

Evaluator 에 설정값을 넘겨서 실행 시키겠습니다.

In []:

# Use TFMA to compute a evaluation statistics over features of a model and
# validate them against a baseline.

# The model resolver is only required if performing model validation in addition
# to evaluation. In this case we validate against the latest blessed model. If
# no model has been blessed before (as in this case) the evaluator will make our
# candidate the first blessed model.
model_resolver = ResolverNode(
      instance_name='latest_blessed_model_resolver',
      resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
      model=Channel(type=Model),
      model_blessing=Channel(type=ModelBlessing))
context.run(model_resolver)

evaluator = Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    baseline_model=model_resolver.outputs['model'],
    # Change threshold will be ignored if there is no baseline (first run).
    eval_config=eval_config)
context.run(evaluator)

Pusher

Pusher 컴포넌트는 일반적으로 TFX 파이프라인의 마지막 단계에 위치해 있습니다. 모델이 유효성 검사를 통과했는지 여부를 확인하고, 합격하면 _serving_model_dir로 모델을 내보냅니다.

In []:

_serving_model_dir = os.path.join(tempfile.mkdtemp(), 'serving_model/taxi_simple')

pusher = Pusher(
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    push_destination=pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(
            base_directory=_serving_model_dir)))
context.run(pusher)

파이프라인 생성하기

앞서 작성한 코드들을 이용하여, TFX 파이프라인을 생성해 보겠습니다. 그리고 KubeflowDagRunner 를 이용하여 TFX 파이프라인을 Kubeflow 파이프라인 패키지로 변환하겠습니다.

모듈 파일 생성하기

TFX 파이프라인 컴포넌트에서 사용할 모듈 파일을 생성하겠습니다. 앞서 작성한 코드를 묶어 하나의 파일로 생성하겠습니다. Transform 컴포넌트에서 사용했던 taxi_constants.pytaxi_transform.py 파일 그리고 Trainer 컴포넌트에서 사용했던 taxi_trainer.py 파일을 합쳐서 taxi_utils.py 파일을 생성하겠습니다.

taxi_utils.py

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from typing import List, Text

import absl
import tensorflow as tf
import tensorflow_transform as tft

from tfx.components.trainer.executor import TrainerFnArgs

# Categorical features are assumed to each have a maximum value in the dataset.
_MAX_CATEGORICAL_FEATURE_VALUES = [24, 31, 12]

_CATEGORICAL_FEATURE_KEYS = [
    'trip_start_hour', 'trip_start_day', 'trip_start_month',
    'pickup_census_tract', 'dropoff_census_tract', 'pickup_community_area',
    'dropoff_community_area'
]

_DENSE_FLOAT_FEATURE_KEYS = ['trip_miles', 'fare', 'trip_seconds']

# Number of buckets used by tf.transform for encoding each feature.
_FEATURE_BUCKET_COUNT = 10

_BUCKET_FEATURE_KEYS = [
    'pickup_latitude', 'pickup_longitude', 'dropoff_latitude',
    'dropoff_longitude'
]

# Number of vocabulary terms used for encoding VOCAB_FEATURES by tf.transform
_VOCAB_SIZE = 1000

# Count of out-of-vocab buckets in which unrecognized VOCAB_FEATURES are hashed.
_OOV_SIZE = 10

_VOCAB_FEATURE_KEYS = [
    'payment_type',
    'company',
]

# Keys
_LABEL_KEY = 'tips'
_FARE_KEY = 'fare'

def _transformed_name(key):
  return key + '_xf'

def _transformed_names(keys):
  return [_transformed_name(key) for key in keys]

def _gzip_reader_fn(filenames):
  """Small utility returning a record reader that can read gzip'ed files."""
  return tf.data.TFRecordDataset(
      filenames,
      compression_type='GZIP')

def _fill_in_missing(x):
  """Replace missing values in a SparseTensor.

  Fills in missing values of `x` with '' or 0, and converts to a dense tensor.

  Args:
    x: A `SparseTensor` of rank 2.  Its dense shape should have size at most 1
      in the second dimension.

  Returns:
    A rank 1 tensor where missing values of `x` have been filled in.
  """
  default_value = '' if x.dtype == tf.string else 0
  return tf.squeeze(
      tf.sparse.to_dense(
          tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),
          default_value),
      axis=1)

def _get_serve_tf_examples_fn(model, tf_transform_output):
  """Returns a function that parses a serialized tf.Example and applies TFT."""

  model.tft_layer = tf_transform_output.transform_features_layer()

  @tf.function
  def serve_tf_examples_fn(serialized_tf_examples):
    """Returns the output to be used in the serving signature."""
    feature_spec = tf_transform_output.raw_feature_spec()
    feature_spec.pop(_LABEL_KEY)
    parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)

    transformed_features = model.tft_layer(parsed_features)
    # TODO(b/148082271): Remove this line once TFT 0.22 is used.
    transformed_features.pop(_transformed_name(_LABEL_KEY), None)

    return model(transformed_features)

  return serve_tf_examples_fn

def _input_fn(file_pattern: Text,
              tf_transform_output: tft.TFTransformOutput,
              batch_size: int = 200) -> tf.data.Dataset:
  """Generates features and label for tuning/training.

  Args:
    file_pattern: input tfrecord file pattern.
    tf_transform_output: A TFTransformOutput.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  transformed_feature_spec = (
      tf_transform_output.transformed_feature_spec().copy())

  dataset = tf.data.experimental.make_batched_features_dataset(
      file_pattern=file_pattern,
      batch_size=batch_size,
      features=transformed_feature_spec,
      reader=_gzip_reader_fn,
      label_key=_transformed_name(_LABEL_KEY))

  return dataset

def _build_keras_model(hidden_units: List[int] = None) -> tf.keras.Model:
  """Creates a DNN Keras model for classifying taxi data.

  Args:
    hidden_units: [int], the layer sizes of the DNN (input layer first).

  Returns:
    A keras Model.
  """
  real_valued_columns = [
      tf.feature_column.numeric_column(key, shape=())
      for key in _transformed_names(_DENSE_FLOAT_FEATURE_KEYS)
  ]
  categorical_columns = [
      tf.feature_column.categorical_column_with_identity(
          key, num_buckets=_VOCAB_SIZE + _OOV_SIZE, default_value=0)
      for key in _transformed_names(_VOCAB_FEATURE_KEYS)
  ]
  categorical_columns += [
      tf.feature_column.categorical_column_with_identity(
          key, num_buckets=_FEATURE_BUCKET_COUNT, default_value=0)
      for key in _transformed_names(_BUCKET_FEATURE_KEYS)
  ]
  categorical_columns += [
      tf.feature_column.categorical_column_with_identity(  # pylint: disable=g-complex-comprehension
          key,
          num_buckets=num_buckets,
          default_value=0) for key, num_buckets in zip(
              _transformed_names(_CATEGORICAL_FEATURE_KEYS),
              _MAX_CATEGORICAL_FEATURE_VALUES)
  ]
  indicator_column = [
      tf.feature_column.indicator_column(categorical_column)
      for categorical_column in categorical_columns
  ]

  model = _wide_and_deep_classifier(
      wide_columns=indicator_column,
      deep_columns=real_valued_columns,
      dnn_hidden_units=hidden_units or [100, 70, 50, 25])
  return model

def _wide_and_deep_classifier(wide_columns, deep_columns, dnn_hidden_units):
  """Build a simple keras wide and deep model.

  Args:
    wide_columns: Feature columns wrapped in indicator_column for wide (linear)
      part of the model.
    deep_columns: Feature columns for deep part of the model.
    dnn_hidden_units: [int], the layer sizes of the hidden DNN.

  Returns:
    A Wide and Deep Keras model
  """
  # Following values are hard coded for simplicity in this example,
  # However prefarably they should be passsed in as hparams.

  # Keras needs the feature definitions at compile time.
  input_layers = {
      colname: tf.keras.layers.Input(name=colname, shape=(), dtype=tf.float32)
      for colname in _transformed_names(_DENSE_FLOAT_FEATURE_KEYS)
  }
  input_layers.update({
      colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32')
      for colname in _transformed_names(_VOCAB_FEATURE_KEYS)
  })
  input_layers.update({
      colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32')
      for colname in _transformed_names(_BUCKET_FEATURE_KEYS)
  })
  input_layers.update({
      colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32')
      for colname in _transformed_names(_CATEGORICAL_FEATURE_KEYS)
  })

  # TODO(b/144500510): SparseFeatures for feature columns + Keras.
  deep = tf.keras.layers.DenseFeatures(deep_columns)(input_layers)
  for numnodes in dnn_hidden_units:
    deep = tf.keras.layers.Dense(numnodes)(deep)
  wide = tf.keras.layers.DenseFeatures(wide_columns)(input_layers)

  output = tf.keras.layers.Dense(
      1, activation='sigmoid')(
          tf.keras.layers.concatenate([deep, wide]))

  model = tf.keras.Model(input_layers, output)
  model.compile(
      loss='binary_crossentropy',
      optimizer=tf.keras.optimizers.Adam(lr=0.001),
      metrics=[tf.keras.metrics.BinaryAccuracy()])
  model.summary(print_fn=absl.logging.info)
  return model

# TFX Transform will call this function.
def preprocessing_fn(inputs):
  """tf.transform's callback function for preprocessing inputs.

  Args:
    inputs: map from feature keys to raw not-yet-transformed features.

  Returns:
    Map from string feature key to transformed feature operations.
  """
  outputs = {}
  for key in _DENSE_FLOAT_FEATURE_KEYS:
    # Preserve this feature as a dense float, setting nan's to the mean.
    outputs[_transformed_name(key)] = tft.scale_to_z_score(
        _fill_in_missing(inputs[key]))

  for key in _VOCAB_FEATURE_KEYS:
    # Build a vocabulary for this feature.
    outputs[_transformed_name(key)] = tft.compute_and_apply_vocabulary(
        _fill_in_missing(inputs[key]),
        top_k=_VOCAB_SIZE,
        num_oov_buckets=_OOV_SIZE)

  for key in _BUCKET_FEATURE_KEYS:
    outputs[_transformed_name(key)] = tft.bucketize(
        _fill_in_missing(inputs[key]),
        _FEATURE_BUCKET_COUNT,
        always_return_num_quantiles=False)

  for key in _CATEGORICAL_FEATURE_KEYS:
    outputs[_transformed_name(key)] = _fill_in_missing(inputs[key])

  # Was this passenger a big tipper?
  taxi_fare = _fill_in_missing(inputs[_FARE_KEY])
  tips = _fill_in_missing(inputs[_LABEL_KEY])
  outputs[_transformed_name(_LABEL_KEY)] = tf.where(
      tf.math.is_nan(taxi_fare),
      tf.cast(tf.zeros_like(taxi_fare), tf.int64),
      # Test if the tip was > 20% of the fare.
      tf.cast(
          tf.greater(tips, tf.multiply(taxi_fare, tf.constant(0.2))), tf.int64))

  return outputs

# TFX Trainer will call this function.
def trainer_fn(fn_args: TrainerFnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """
  # Number of nodes in the first layer of the DNN
  first_dnn_layer_size = 100
  num_dnn_layers = 4
  dnn_decay_factor = 0.7

  tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

  train_dataset = _input_fn(fn_args.train_files, tf_transform_output, 40)
  eval_dataset = _input_fn(fn_args.eval_files, tf_transform_output, 40)

  mirrored_strategy = tf.distribute.MirroredStrategy()
  with mirrored_strategy.scope():
    model = _build_keras_model(
        # Construct layers sizes with exponetial decay
        hidden_units=[
            max(2, int(first_dnn_layer_size * dnn_decay_factor**i))
            for i in range(num_dnn_layers)
        ])

  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  signatures = {
      'serving_default':
          _get_serve_tf_examples_fn(model,
                                    tf_transform_output).get_concrete_function(
                                        tf.TensorSpec(
                                            shape=[None],
                                            dtype=tf.string,
                                            name='examples')),
  }
  model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)

패키지 추가하기

새로운 주피터 노트북을 생성하고, TFX 컴포넌트와 필요한 패키지를 추가합니다.

In []:

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
from typing import Text

from kfp import onprem
import tensorflow_model_analysis as tfma

from tfx.components import CsvExampleGen
from tfx.components import Evaluator
from tfx.components import ExampleValidator
from tfx.components import InfraValidator
from tfx.components import Pusher
from tfx.components import ResolverNode
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Trainer
from tfx.components import Transform
from tfx.components.base import executor_spec
from tfx.components.trainer.executor import GenericExecutor
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.orchestration import pipeline
from tfx.orchestration.kubeflow import kubeflow_dag_runner
from tfx.proto import infra_validator_pb2
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.types import Channel
from tfx.types.standard_artifacts import Model
from tfx.types.standard_artifacts import ModelBlessing
from tfx.utils.dsl_utils import external_input

파이프라인 이름 정의하기

사용할 파이프라인 이름을 지정해 줍니다.

In[]:

_pipeline_name = 'chicago_taxi_pipeline_kubeflow_pvc'

파이프라인에서 사용할 PVC 정보를 지정해 줍니다.

In []:

_persistent_volume_claim = 'chicago-taxi-pvc'
_persistent_volume = 'chicago-taxi-pv'
_persistent_volume_mount = '/mnt'

사용할 경로들을 지정해 줍니다. _data_root 퍼시스턴스 볼륨에 저장된 학습 데이터의 경로이고, _pipeline_root 는 파이프라인에서 사용할 경로입니다.

In []:

# All input and output data are kept in the PV.
_input_base = os.path.join(_persistent_volume_mount, 'tfx')
_data_root = os.path.join(_input_base, 'data')

_output_base = os.path.join(_persistent_volume_mount, 'pipelines')
_tfx_root = os.path.join(_output_base, 'tfx')
_pipeline_root = os.path.join(_tfx_root, _pipeline_name)

TFX 컴포넌트에 추가할 사용자 정의 코드 파일의 경로를 지정합니다. Transform 컴포넌트와 Trainer 컴포넌트에서 사용하는 사용자 정의 코드입니다.

In []:

_module_file = os.path.join(_input_base, 'taxi_utils.py')

학습된 모델을 저장할 위치를 정의합니다. Pusher 컴포넌트에서 해당 위치로 모델을 저장합니다.

In []:

_serving_model_dir = os.path.join(_output_base, _pipeline_name, 'serving_model')

TFX 컴포넌트들을 이용하여 TFX 파이프라인 생성 코드를 작성합니다.

In []:

def _create_pipeline(pipeline_name: Text, pipeline_root: Text, data_root: Text,
                     module_file: Text, serving_model_dir: Text,
                     direct_num_workers: int) -> pipeline.Pipeline:
  """Implements the chicago taxi pipeline with TFX and Kubeflow Pipelines."""
  examples = external_input(data_root)

  # Brings data into the pipeline or otherwise joins/converts training data.
  example_gen = CsvExampleGen(input=examples)

  # Computes statistics over data for visualization and example validation.
  statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])

  # Generates schema based on statistics files.
  schema_gen = SchemaGen(
      statistics=statistics_gen.outputs['statistics'],
      infer_feature_shape=False)

  # Performs anomaly detection based on statistics and data schema.
  example_validator = ExampleValidator(
      statistics=statistics_gen.outputs['statistics'],
      schema=schema_gen.outputs['schema'])

  # Performs transformations and feature engineering in training and serving.
  transform = Transform(
      examples=example_gen.outputs['examples'],
      schema=schema_gen.outputs['schema'],
      module_file=module_file)

  # Uses user-provided Python function that implements a model using TF-Learn
  # to train a model on Google Cloud AI Platform.
  trainer = Trainer(
      module_file=module_file,
      custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
      examples=transform.outputs['transformed_examples'],
      transform_graph=transform.outputs['transform_graph'],
      schema=schema_gen.outputs['schema'],
      train_args=trainer_pb2.TrainArgs(num_steps=10000),
      eval_args=trainer_pb2.EvalArgs(num_steps=5000))
  

  # Uses TFMA to compute a evaluation statistics over features of a model and
  # perform quality validation of a candidate model (compared to a baseline).
  eval_config = tfma.EvalConfig(
    model_specs=[
        # This assumes a serving model with signature 'serving_default'. If
        # using estimator based EvalSavedModel, add signature_name: 'eval' and 
        # remove the label_key.
        tfma.ModelSpec(label_key='tips')
    ],
    metrics_specs=[
        tfma.MetricsSpec(
            # The metrics added here are in addition to those saved with the
            # model (assuming either a keras model or EvalSavedModel is used).
            # Any metrics added into the saved model (for example using
            # model.compile(..., metrics=[...]), etc) will be computed
            # automatically.
            metrics=[
                tfma.MetricConfig(class_name='ExampleCount')
            ],
            # To add validation thresholds for metrics saved with the model,
            # add them keyed by metric name to the thresholds map.
            thresholds = {
                'binary_accuracy': tfma.MetricThreshold(
                    value_threshold=tfma.GenericValueThreshold(
                        lower_bound={'value': 0.5}),
                    change_threshold=tfma.GenericChangeThreshold(
                       direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                       absolute={'value': -1e-10}))
            }
        )
    ],
    slicing_specs=[
        # An empty slice spec means the overall slice, i.e. the whole dataset.
        tfma.SlicingSpec(),
        # Data can be sliced along a feature column. In this case, data is
        # sliced along feature column trip_start_hour.
        tfma.SlicingSpec(feature_keys=['trip_start_hour'])
    ])

  # Get the latest blessed model for model validation.
  model_resolver = ResolverNode(
      instance_name='latest_blessed_model_resolver',
      resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
      model=Channel(type=Model),
      model_blessing=Channel(type=ModelBlessing))

  evaluator = Evaluator(
      examples=example_gen.outputs['examples'],
      model=trainer.outputs['model'],
      baseline_model=model_resolver.outputs['model'],
      # Change threshold will be ignored if there is no baseline (first run).
      eval_config=eval_config)

  # Performs infra validation of a candidate model to prevent unservable model
  # from being pushed. In order to use InfraValidator component, persistent
  # volume and its claim that the pipeline is using should be a ReadWriteMany
  # access mode.
  infra_validator = InfraValidator(
      model=trainer.outputs['model'],
      examples=example_gen.outputs['examples'],
      serving_spec=infra_validator_pb2.ServingSpec(
          tensorflow_serving=infra_validator_pb2.TensorFlowServing(
              tags=['latest']),
          kubernetes=infra_validator_pb2.KubernetesConfig()),
      request_spec=infra_validator_pb2.RequestSpec(
          tensorflow_serving=infra_validator_pb2.TensorFlowServingRequestSpec())
  )

  # Checks whether the model passed the validation steps and pushes the model
  # to  Google Cloud AI Platform if check passed.
  pusher = Pusher(
      model=trainer.outputs['model'],
      model_blessing=evaluator.outputs['blessing'],
      infra_blessing=infra_validator.outputs['blessing'],
      push_destination=pusher_pb2.PushDestination(
          filesystem=pusher_pb2.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  return pipeline.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=[
          example_gen,
          statistics_gen,
          schema_gen,
          example_validator,
          transform,
          trainer,
          model_resolver,
          evaluator,
          infra_validator,
          pusher,
      ],
      beam_pipeline_args=['--direct_num_workers=%d' % direct_num_workers],
  )

TFX 에서 사용할 메타데이터 저장소의 정보를 지정합니다. Kubeflow의 메타데이터를 사용하겠습니다.

In []:

from tfx.orchestration.kubeflow.proto import kubeflow_pb2

metadata_config = kubeflow_pb2.KubeflowMetadataConfig()
metadata_config.grpc_config.grpc_service_host.value = 'metadata-grpc-service'
metadata_config.grpc_config.grpc_service_port.value = '8080'

KubeflowDagRunner 를 사용하여, 작성한 TFX 파이프라인을 Kubeflow 파이프라인으로 생성합니다. 해당 셀이 실행되면, chicago_taxi_pipeline_kubeflow_pvc.tar.gz 라는 Kubeflow 파이프라인 패키지가 생성됩니다.

In []:

if __name__ == '__main__':
  # This pipeline automatically injects the Kubeflow TFX image if the
  # environment variable 'KUBEFLOW_TFX_IMAGE' is defined. Currently, the tfx
  # cli tool exports the environment variable to pass to the pipelines.
  tfx_image = os.environ.get('KUBEFLOW_TFX_IMAGE', None)

  runner_config = kubeflow_dag_runner.KubeflowDagRunnerConfig(
      kubeflow_metadata_config=metadata_config,
      # Specify custom docker image to use.
      tfx_image=tfx_image,
      pipeline_operator_funcs=(
          [
              onprem.mount_pvc(_persistent_volume_claim, _persistent_volume,
                               _persistent_volume_mount)
          ]))

  kubeflow_dag_runner.KubeflowDagRunner(config=runner_config).run(
      _create_pipeline(
          pipeline_name=_pipeline_name,
          pipeline_root=_pipeline_root,
          data_root=_data_root,
          module_file=_module_file,
          serving_model_dir=_serving_model_dir,
          # 0 means auto-detect based on the number of CPUs available during
          # execution time.
          direct_num_workers=0))

파이프라인 실행하기

PVC 생성하기

파이프라인에서 사용할 PVC를 생성합니다.

chicago-taxi-pvc.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: chicago-taxi-pvc
spec:
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: 500Mi
kubectl -n kubeflow apply -f chicago-taxi-pvc.yaml

PV에 파일 복사하기

파이프라인을 실행하기에 앞서, 필요한 파일을 퍼시스턴스 볼륨에 복사하도록 하겠습니다. 학습을 위한 데이터 파일과 사용자 정의 코드가 들어 있는 파이썬 파일을 복사합니다.

퍼시스턴스 볼륨에 파일을 업로드하는 방법은 다양하게 존재하기 때문에, 편하신 방법을 사용하시면 됩니다. 예제에서는 퍼시스턴스 볼륨에 파일을 업로드하기 위해서, “PHP File Manager” 를 사용하였습니다. 파일 매니저를 POD로 실행한 다음, 웹 브라우저를 이용해서 파일을 업로드하겠습니다.

먼저 파일 매니저 POD 매니페스트를 작성합니다. chicago-taxi-pvc 를 이용하여 볼륨을 마운트 합니다.

다음은 파일 매니저 POD의 매니페스트 입니다.

filemanager.yaml

apiVersion: v1
kind: Pod
metadata:
  name: filemanager
spec:
  containers:
  - image: smokserwis/filemanager
    imagePullPolicy: IfNotPresent
    name: filemanager
    ports:
    - containerPort: 80
      name: http
      protocol: TCP
    volumeMounts:
    - mountPath: /var/www/mount
      name: chicago-taxi
  volumes:
  - name: chicago-taxi
    persistentVolumeClaim:
      claimName: chicago-taxi-pvc

다음 명령어를 실행하여 파일 매니저 POD를 생성합니다.

kubectl -n kubeflow apply -f filemanager.yaml

파일 매니저에 접근하기 위하여 포트포워딩을 사용합니다.

kubectl -n kubeflow port-forward filemanager 8080:80

웹 브라우저를 이용하여 “http://localhost:8080” 에 접속하면 다음과 같은 로그인 화면을 볼 수 있습니다. 사용자명과 비밀번호의 기본값은 fm_admin / fm_admin 입니다.

https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/data/simple/data.csv 파일을 다운로드 받아서 /tfx/data/data.csv 경로로 파일을 업로드 합니다.

그리고 앞서 생성한 taxi_utils.py 파일을 /tfx/taxi_utils.py 경로로 업로드 합니다.

포트 포워딩을 종료하고, 파일 매니저를 삭제하겠습니다.

kubectl -n kubeflow delete pod filemanager

파이프라인 실행하기

KFP SDK를 이용하여, 생성한 파이프라인 패키지를 실행해 보겠습니다.

In []:

import kfp

run_result = kfp.Client(
    host=None  # replace with Kubeflow Pipelines endpoint if this notebook is run outside of the Kubeflow cluster.
).create_run_from_pipeline_package('chicago_taxi_pipeline_kubeflow_pvc.tar.gz', arguments={})

다음은 파이프라인의 화면입니다.