Kubeflow Pipelines – 파이프라인 메트릭

Kubeflow Pipelines는 스칼라 메트릭을 저장하고 시각화해서 보여주는 기능을 제공하고 있습니다. 단순히 메트릭을 정해진 형식대로 로컬 파일에 저장하기만 하면 사용할 수 있습니다. 메트릭을 지정된 형식에 맞게 로컬 파일에 저장하기만 하면, 파이프 라인 에이전트가 로컬 파일을 런타임 메트릭으로 자동 업로드합니다. 업로드 된 메트릭은 Kubeflow Pipelines UI의 실행 페이지에서 조회해 볼 수 있습니다.

메트릭 파일 출력하기

메트릭을 사용하려면 프로그램이 /mlpipeline-metrics.json이라는 파일을 작성해야합니다.

예를 들면 다음과 같습니다.

 accuracy = 0.9712345
 metrics = {
	 'metrics': [{
	   'name': 'accuracy', 
     'numberValue':  float(accuracy), 
     'format': "PERCENTAGE",
   }]
 }
 with file_io.FileIO('/mlpipeline-metrics.json', 'w') as f:
	 json.dump(metrics, f)

메트릭 파일은 다음과 같은 규칙을 지켜야 합니다.

  • name 은 ^[a-z]([-a-z0-9]{0,62}[a-z0-9])?$ 패턴을 따라야합니다
  • numberValue 숫자 값이어야합니다.
  • format 은 PERCENTAGERAW 을 사용할 수 있고, 설정하지 않을 수 있습니다.

그리고 dsl.ContainerOp()의 output_artifact_paths 에 mlpipeline-metrics 를 추가해줘야 합니다.

dsl.ContainerOp(
    ...
    output_artifact_paths={'mlpipeline-metrics': '/mlpipeline-metrics.json'}
)

파이썬 함수를 이용하는 경량 컴포넌트의 경우 구문이 약간 다릅니다. 경량 컴포넌트에서 메트릭을 출력하려면, 다음과 같이 NamedTuple 을 사용해야 합니다.

def my_function() -> \\
        NamedTuple('output', [('mlpipeline_metrics', 'metrics')]):
    accuracy = 0.9712345
    metrics = {
        'metrics': [{
            'name': 'accuracy',
            'numberValue': float(accuracy),
            'format': "PERCENTAGE",
        }]
    }
    from collections import namedtuple

    output = namedtuple('output', ['mlpipeline_metrics'])
    return output(json.dumps(metrics))

컴포넌트에서 메트릭 파일 출력하기

파이프 라인을 구성하고 실행하기

파이프 라인 메트릭을 출력하는 파이프 라인을 만들어 보겠습니다.

텐서플로우 케라스를 사용한 mnist의 숫자 이미지 판별 모델의 accuracy 값과 loss 값을 메트릭으로 출력해 보겠습니다.

가. 애플리케이션 코드를 작성합니다.

모델 코드는 기존과 거의 동일합니다. 메트릭을 출력하기위해서, json 형태의 메트릭을 파일로 저장하는 코드를 추가합니다. name 은 소문자로 시작해야하며, 소문자와 숫자 그리고 ‘-‘를 사용할 수 있습니다. numberValue 는 숫자 값이어야 합니다. 이 규칙을 지키지 않으면, UI 화면에서 결과를 확인할 수 없습니다.

    metrics = {
        'metrics': [{
            'name': 'accuracy',
            'numberValue': float(accuracy),
            'format': "PERCENTAGE",
        }, {
            'name': 'loss',
            'numberValue': float(loss),
            'format': "RAW",
        }]
    }

    with file_io.FileIO('/mlpipeline-metrics.json', 'w') as f:
        json.dump(metrics, f)

다음은 애플리케이션의 전체 코드입니다.

mnist-simple.py

from __future__ import absolute_import, division, print_function, unicode_literals

import json

import tensorflow as tf
from tensorflow.python.lib.io import file_io


def train():
    print("TensorFlow version: ", tf.__version__)

    mnist = tf.keras.datasets.mnist

    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_train, x_test = x_train / 255.0, x_test / 255.0

    model = tf.keras.models.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10, activation='softmax')
    ])

    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

    print("Training...")
    training_history = model.fit(x_train, y_train, epochs=5, validation_split=0.2)

    results = model.evaluate(x_test, y_test, batch_size=128)
    print('test loss, test acc:', results)

    loss = results[0]
    accuracy = results[1]
    metrics = {
        'metrics': [{
            'name': 'accuracy',
            'numberValue': float(accuracy),
            'format': "PERCENTAGE",
        }, {
            'name': 'loss',
            'numberValue': float(loss),
            'format': "RAW",
        }]
    }

    with file_io.FileIO('/mlpipeline-metrics.json', 'w') as f:
        json.dump(metrics, f)


if __name__ == '__main__':
    train()

나. 애플리케이션 코드가 포함된 컨테이너 이미지를 생성하고, 컨테이너 이미지 레지스트리에 업로드 합니다.

Dockerfile을 생성합니다.

Dockerfile

FROM tensorflow/tensorflow:2.1.0-py3

RUN mkdir -p /app
ADD mnist-simple.py /app/
WORKDIR /app

CMD ["python", "/app/mnist-simple.py"]

컨테이너 이미지를 빌드하겠습니다.

docker build -t kangwoo/mnist-kfp-metrics:kfp.

빌드한 컨테이너 이미지를 컨테이너 이미지 레지스트리에 업로드 합니다.

docker push kangwoo/mnist-kfp-metrics:kfp

다. Kubeflow Pipelines DSL을 사용하여 컴포넌트를 작성합니다. 컴포넌트에서 사용하는 컨테이너 이미지를 정의합니다. 그리고 output_artifact_paths 파라미터를 사용하여, 메트릭 파일이 저장된 경로를 지정해 줍니다.

dsl.ContainerOp(
	name='mnist-kfp-metrics',
  image='kangwoo/mnist-kfp-metrics:kfp',
  output_artifact_paths={'mlpipeline-metrics': '/mlpipeline-metrics.json'}
)

라. Kubeflow Pipelines DSL을 사용하여 파이프 라인 함수를 작성합니다. 파이프 라인을 정의하고 사용하는 컴포넌트들을 추가합니다. Kubeflow Pipelines SDK 를 사용하여 파이프라인을 빌드 한 후, 업로드하고 실행합니다.

@dsl.pipeline(
    name='Pipeline Metrics',
    description='Export and visualize pipeline metrics'
)
def pipeline_metrics_pipeline():
    dsl.ContainerOp(
        name='mnist-kfp-metrics',
        image='kangwoo/mnist-kfp-metrics:kfp',
        output_artifact_paths={'mlpipeline-metrics': '/mlpipeline-metrics.json'}
    )


pipeline_package_path = 'pipeline_metrics_pipeline.zip'
kfp.compiler.Compiler().compile(pipeline_metrics_pipeline, pipeline_package_path)

client = kfp.Client()
my_experiment = client.create_experiment(name='Sample Experiment')
my_run = client.run_pipeline(my_experiment.id, 'pipeline_metrics_pipeline', pipeline_package_path)

경랑 컴포넌트에서 메트릭 파일 출력하기

파이프 라인을 구성하고 실행하기

파이썬 함수를 사용하는 경량 컴포넌트에서 메트릭을 출력하는 방법에 대해서 알아보겠습니다.

메트릭이 저장된 파일 경로를 파이프라인 시스템에 알려주기 위해서 파이썬 함수에서 NamedTuple 을 사용하였습니다. 이 부분을 제외하면 기존과 동일하기 때문에 설명은 생략하겠습니다.

코드의 단순화를 위해서 accuracy 값과 loss 값을 하드 코딩 하였습니다.

pipeline_metrics_fn_pipeline.py

from typing import NamedTuple

import kfp
from kfp.components import func_to_container_op


@func_to_container_op
def train() -> \\
        NamedTuple('output', [('mlpipeline_metrics', 'metrics')]):
    import json
    loss = 0.812345
    accuracy = 0.9712345
    metrics = {
        'metrics': [{
            'name': 'accuracy',
            'numberValue': float(accuracy),
            'format': "PERCENTAGE",
        }, {
            'name': 'loss',
            'numberValue': float(loss),
            'format': "RAW",
        }]
    }
    from collections import namedtuple

    output = namedtuple('output', ['mlpipeline_metrics'])
    return output(json.dumps(metrics))


def pipeline_metrics_fn_pipeline():
    train()


if __name__ == '__main__':
    arguments = {}
    my_run = kfp.Client().create_run_from_pipeline_func(pipeline_metrics_fn_pipeline, arguments=arguments,
                                                        experiment_name='Sample Experiment')

메트릭 보기

시각화된 메트릭을 보려면, Kubeflow Pipelines UI에서 Experiments 페이지를 엽니다. 실험 중 하나를 클릭하면, 실행 페이지가 열리고 메트릭이 실행 목록 테이블에 표시됩니다. 메트릭은 각 실행에 대한 열로 나타납니다.

다음은 실행에 대한 정확도를 보여주는 화면입니다.

해당 실행 단계의 “Run output” 탭에서도 메트릭을 확인 할 수 있습니다.

Kubeflow Pipelines – 경량 파이썬 컴포넌트

파이썬 코드를 이용한 경량 컴포넌트를 사용하기

경량 파이썬 컴포넌트는 생성한 코드를 컨테이너 이미지로 빌드 하지 않아도 됩니다. 그래서 빠르게 반복하여 코드를 변경하는 경우 유용하게 사용할 수 있습니다.

경량 파이썬 컴포넌트

경량 컴포넌트를 빌드하려면, 먼저 독립형 파이썬 함수를 정의해야 합니다. 그런 다음 kfp.components.func_to_container_op()를 호출하여 파이썬 함수를 파이프 라인에서 사용할 수 있는 컴포넌트로 변환해야 합니다

경량 컴포넌트를 만들기 위해서는 몇 가지 제약 사항이 있습니다.

  • 파이썬 함수의 기능은 독립적이어야 합니다.
    • 정의한 함수 외부에서 선언한 코드를 사용해서는 안됩니다.
    • import 는 함수 내부에 선언해야합니다.
  • import 는 기본 이미지에서 사용 가능한 패키지만 가져올 수 있습니다.
    • 사용할 패키지가 기본 이미지에 없는 경우에는, 해당 패키지가 포함된 이미지를 사용해야 합니다.
  • 파이썬 함수의 파리미터로 숫자를 사용하려면, 파라미터에 타입 힌트가 있어야 합니다. 지원되는 타입은 int, float, bool입니다. 모든 파라미터는 문자열로 전달됩니다.
  • 출력값을 여러개 사용하려면, 파이썬의 typing.NamedTuple 타입 힌트를 사용해야합니다.

kfp.components.func_to_container_op(func)

이 함수는 입력 받은 파이썬 함수를 컴포넌트로 변환합니다. 변환한 컴포넌트는 파이프라인에서 사용할 수 있습니다.

def func_to_container_op(func, output_component_file=None, base_image: str = None, extra_code='', packages_to_install: List[str] = None, modules_to_capture: List[str] = None, use_code_pickling=False):

전달 인자

다음은 func_to_container_op()에서 사용하는 주요 전달 인자입니다.

  • base_image : 컴포넌트에서 사용할 기본 컨테이너 이미지입니다. 경량 컴포넌트의 경우 이미지에는 파이썬 3.5 이상이 설치 되어 있어야합니다. 기본값은 tensorflow/tensorflow:1.13.2-py3 입니다. (선택)
  • output_component_file: 컴포넌트 정의를 로컬 파일에 작성합니다. 이 파일은 공유할 때 사용할 수 있습니다. (선택)
  • packages_to_install: 사용자의 함수를 실행하기 전에 설치할 파이썬 패키지 목록입니다. (선택)

경량 파이썬 컴포넌트 만들기

하나의 값을 출력하는 경량 파이썬 컴포넌트 만들기

먼저 컴포넌트로에서 사용할 파이썬 함수를 작성합니다. 입력된 두 개의 값을 더한 값을 반환하는 간단한 함수입니다.

def add(a: float, b: float) -> float:
    return a + b

func_to_container_op() 호출하여 파이썬 함수를 컴포넌트로 변환합니다. 별도의 컨테이너 이미지를 지정하지 않으면, 기본 이미지인 tensorflow/tensorflow:1.13.2-py3 를 사용합니다.

add_op = comp.func_to_container_op(add)

DSL을 사용하여 파이프 라인을 구성합니다.

KFP SDK 사용하여 코드에서 파이프 라인을 바로 실행하였습니다.

def lightweight_component_pipeline(a='10', b='20'):
    add_task = add_op(a, b)
    print_text(add_task.output)


if __name__ == '__main__':
    arguments = {'a': '1000', 'b': '4'}
    my_run = kfp.Client().create_run_from_pipeline_func(lightweight_component_pipeline, arguments=arguments, experiment_name='Basic Experiment')

다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.

lightweight_component.py

import kfp
from kfp import dsl
from kfp import components
from kfp.components import func_to_container_op, InputPath, OutputPath


@func_to_container_op
def print_text(text_path: InputPath()):
    with open(text_path, 'r') as reader:
        for line in reader:
            print(line, end = '')


def add(a: float, b: float) -> float:
    return a + b


add_op = components.func_to_container_op(add)


def lightweight_component_pipeline(a='10', b='20'):
    add_task = add_op(a, b)
    print_text(add_task.output)


if __name__ == '__main__':
    arguments = {'a': '1000', 'b': '4'}
    my_run = kfp.Client().create_run_from_pipeline_func(lightweight_component_pipeline, arguments=arguments, experiment_name='Basic Experiment')

파이프 라인 실행 결과 확인하기

KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Sample Experiment” 이므로, Sample Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Multiply component pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.


여러개의 값을 출력하는 경량 파이썬 컴포넌트 만들기

먼저 컴포넌트로에서 사용할 파이썬 함수를 작성합니다. 입력된 두 개의 값을 더한 값과 곱한 값 두 개를 반환하는 함수입니다. 출력값이 여러개 이기 때문에 파이썬의 typing.NamedTuple 타입 힌트를 사용해야합니다.

def add_multiply_two_numbers(a: float, b: float) \\
        -> NamedTuple('Outputs', [('sum', float), ('product', float)]):
    return (a + b, a * b)

func_to_container_op() 함수를 호출하여 컴포넌트를 로드합니다. 별도의 컨테이너 이미지를 지정하지 않으면, 기본 이미지인 tensorflow/tensorflow:1.13.2-py3 를 사용합니다.

add_multiply_two_numbers_op = comp.func_to_container_op(add_multiply_two_numbers)

DSL을 사용하여 파이프 라인을 구성합니다.

KFP SDK 사용하여 코드에서 파이프 라인을 바로 실행하였습니다.

@dsl.pipeline(
    name='Multiple outputs pipeline',
    description='A pipeline to showcase multiple outputs.'
)
def multiple_outputs_pipeline(a='10', b='20'):
    add_multiply_task = add_multiply_two_numbers_op(a, b)
    print_op('sum={}, product={}'.format(add_multiply_task.outputs['sum'],
                                         add_multiply_task.outputs['product']))


if __name__ == '__main__':
    arguments = {'a': '3', 'b': '4'}
    my_run = kfp.Client().create_run_from_pipeline_func(multiple_outputs_pipeline,
                                                        arguments=arguments, experiment_name='Basic Experiment')

다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.

multiple_outputs.py

import kfp
from kfp import dsl
import kfp.components as comp
from typing import NamedTuple


def print_op(msg):
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )


def add_multiply_two_numbers(a: float, b: float) \\
        -> NamedTuple('Outputs', [('sum', float), ('product', float)]):
    return (a + b, a * b)


add_multiply_two_numbers_op = comp.func_to_container_op(add_multiply_two_numbers)


@dsl.pipeline(
    name='Multiple outputs pipeline',
    description='A pipeline to showcase multiple outputs.'
)
def multiple_outputs_pipeline(a='10', b='20'):
    add_multiply_task = add_multiply_two_numbers_op(a, b)
    print_op('sum={}, product={}'.format(add_multiply_task.outputs['sum'],
                                         add_multiply_task.outputs['product']))


if __name__ == '__main__':
    arguments = {'a': '3', 'b': '4'}
    my_run = kfp.Client().create_run_from_pipeline_func(multiple_outputs_pipeline,
                                                        arguments=arguments, experiment_name='Basic Experiment')

파이프 라인 실행 결과 확인하기

KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Sample Experiment” 이므로, Sample Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Multiply component pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.


경량 파이썬 컴포넌트에서 데이터 입력과 출력

컴포넌트에는 입력 및 출력이 있습니다. 한 작업의 출력값을 다른 작업의 입력값에 전달하여 데이터 입력과 출력을 연결하여 컴포넌트의 작업간에 데이터를 생성하고 소비하는 것입니다. 파이썬 함수를 컴포넌트로 변환하여 사용할 때, 입력 및 출력 데이터를 생성하고 소비하는 방법에 대해서 알아 보겠습니다.

작은 데이터

작은 데이터는 프로그램의 명령 줄 인수로 쉽게 전달할 수있는 데이터입니다. 작은 데이터 크기는 몇 킬로바이트를 초과하지 않아야 합니다. 숫자나, 작은 문자열, 그리고 URL 같은 것을 예로 들 수 있습니다. 작은 리스트, 사전 및 JSON 구조는 사용해도 괜찮지만, 데이터의 크기가 크다면 파일 기반의 데이터 전달 방법을 사용하는 것이 좋습니다.

작은 출력 데이터들은 문자열로 직렬화 됩니다. 그리고 입력 데이터로 전달할때 역직렬화 됩니다. 예를 들어 일반적인 유형인 str, int, float, bool, list, dict 유형들은 내장된 직렬화/역직렬화 변환가 있습니다. 하지만 다른 유형의 데이터를 사용하다면, 수동으로 직렬화 해야합니다. 유형 어노테이션을 올바르게 지정하지 않으면, 자동으로 직렬화 되지 않고 문자열로 전달합니다.

큰 데이터

큰 데이터는 파일을 통해서 전달합니다. 출력 데이터를 파일로 쓰고, 다른 컴포넌트를 파일을 읽어서 입력 데이터로 사용합니다. 입력 및 출력 파일의 경로는 시스템에 의해 결정되며, 함수에는 문자열 형태로 전달됩니다.

파일을 통해서 데이터를 전달하려면, InputPathOutputPath 파라미터 어노테이션을 사용하며 됩니다.

InputPath 파라미터 어노테이션은 함수가 해당 입력 데이터를 파일로 소비하고 싶다고 시스템에 알려줍니다. 시스템은 데이터를 다운로드하여 로컬 파일에 쓴 다음, 해당 파일의 경로(path)를 함수에 전달합니다.

OutputPath 파라미터 어노테이션은 함수가 해당 출력 데이터를 파일로 생성하고 싶다고 시스템에 알려줍니다. 시스템은 함수가 데이터를 출력해야하는 파일의 경로(paht) 준비해서 전달해줍니다. 함수가 종료되면 시스템은 스토리지에 데이터를 업로드하여 다운 스트림 컴포넌트로 전달합니다.

InputPathOutputPath 에 타입을 지정하여 소비/생산 데이터의 유형을 지정할 수 있습니다. 유형은 파이썬 유형이거나, 임의의 문자열일 수 있습니다. OutputPath('TFModel')은 함수가 파일에 쓴 데이터의 유형이 TFModel 임을 나타냅니다. InputPath('TFModel') 은 함수가 파일에서 읽는 데이터의 유형이 TFModel 을 나타냅니다. 파이프라인 작성자가 입력과 출력을 연결하면, 시스템이 해당 유형이 일치하는 확인합니다.

일반적으로 함수가 컴포넌트로 변환될 때 입력 및 출력 이름은 파라미터 이름을 따르지만, _path_file 같은 접미사가 사용된 경우에는 이름에서 제거됩니다. 예를들어 파라미터 가 number_file_path: InputPath(int) 인 경우에는 입력은 number: int 로 바뀌게 됩니다. 이것은 인자 전달을 보다 자연스럽게 보이게 하기 위함입니다.

InputPathOutputPath 사용하기

write_numbers() 함수는 OutputPath 어노테이션을 사용하여, 시스템에게 출력 데이터를 파일로 쓴다는 것을 알려줍니다. 파일을 저장할 경로인 numbers_path 를 시스템에게 전달받아서 생성한 숫자를 파일로 씁니다.

@func_to_container_op
def write_numbers(numbers_path: OutputPath(str), start: int = 1, count: int = 10):
    with open(numbers_path, 'w') as writer:
        for i in range(start, start + count):
            writer.write(str(i) + '\\n')

sum_multiply_numbers() 함수는 InputPath 어노테이션을 사용하여, 시스템으로 부터 입력 데이터를 파일로 전달 받습니다. 입력 데이터가 저장된 파일의 경로인 numbers_path 를 전달받아서, 데이터를 읽어옵니다. 입력 데이터는 숫자가 저장되어 있습니다. 읽어온 숫자를 모두 더하고, 곱한 후에 OutputPath 어노테이션을 사용하여 파일로 저장합니다.

@func_to_container_op
def sum_multiply_numbers(
        numbers_path: InputPath(str),
        sum_path: OutputPath(str),
        product_path: OutputPath(str)):

    sum = 0
    product = 1
    with open(numbers_path, 'r') as reader:
        for line in reader:
            sum = sum + int(line)
            product = product * int(line)
    with open(sum_path, 'w') as writer:
        writer.write(str(sum))
    with open(product_path, 'w') as writer:
        writer.write(str(product))

print_text() 함수는 InputPath 어노테이션을 사용하여, 시스템으로 부터 입력 데이터를 파일로 전달 받아서 화면에 출력합니다.

@func_to_container_op
def print_text(text_path: InputPath()):
    with open(text_path, 'r') as reader:
        for line in reader:
            print(line, end = '')

컴포넌트들을 사용하여, 파이프라인을 구성합니다. 앞서 얘기 했듯이, 함수가 컴포넌트로 변환될 때 입력 및 출력 이름은 파라미터 이름을 따르지만, _path_file 같은 접미사가 사용된 경우에는 이름에서 제거됩니다. sum_multiply_numbers() 함수에서 출력 파라미터 이름은 sum_pathproduct_path 였지만 출력에서 값을 가지고 오기 위해 사용한 이름은 sumproduct 입니다. 즉 _path 접미사가 이름에서 제거되었습니다.

def python_input_output_pipeline(count='10'):
    numbers_task = write_numbers(count=count)
    sum_multiply_task = sum_multiply_numbers(numbers_task.output)

    print_text(sum_multiply_task.outputs['sum'])
    print_text(sum_multiply_task.outputs['product'])

다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.

python_input_output_pipeline.py

import kfp
from kfp.components import func_to_container_op, InputPath, OutputPath


@func_to_container_op
def write_numbers(numbers_path: OutputPath(str), start: int = 1, count: int = 10):
    with open(numbers_path, 'w') as writer:
        for i in range(start, start + count):
            writer.write(str(i) + '\\n')


@func_to_container_op
def print_text(text_path: InputPath()):
    with open(text_path, 'r') as reader:
        for line in reader:
            print(line, end = '')


@func_to_container_op
def sum_multiply_numbers(
        numbers_path: InputPath(str),
        sum_path: OutputPath(str),
        product_path: OutputPath(str)):

    sum = 0
    product = 1
    with open(numbers_path, 'r') as reader:
        for line in reader:
            sum = sum + int(line)
            product = product * int(line)
    with open(sum_path, 'w') as writer:
        writer.write(str(sum))
    with open(product_path, 'w') as writer:
        writer.write(str(product))


def python_input_output_pipeline(count='10'):
    numbers_task = write_numbers(count=count)
    sum_multiply_task = sum_multiply_numbers(numbers_task.output)

    print_text(sum_multiply_task.outputs['sum'])
    print_text(sum_multiply_task.outputs['product'])


~~i~~f __name__ == '__main__':
    arguments = {'count': '10'}
    my_run = kfp.Client().create_run_from_pipeline_func(python_input_output_pipeline,
                                                        arguments=arguments, experiment_name='Basic Experiment')

파이프 라인 실행 결과 확인하기

KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Sample Experiment” 이므로, Sample Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “python_input_output_pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.

Kubeflow Pipelines – 재사용 가능한 컴포넌트

다양한 파이프 라인에서 재사용할 수 있는 컴포넌트에 대해서 알아 보겠습니다. 그리고 재사용 가능한 컴포넌트를 만들기 위한 컴포넌트 프로그램을 작성하는 방법과 컴포넌트를 정의하는 파일에 대해서 알아 보도록 하겠습니다.

재사용 가능한 컴포넌트 이해하기

재사용 가능한 컴포넌트를 만들면, 다양한 파이프라인에서 쉽게 가져다 사용할 수 있습니다. 재사용 가능한 컴포넌트를 만들기 위해서는 먼저 컴포넌트에서 사용할 프로그램을 작성하고 컨테이너화 합니다. 그리고 Kubeflow Pipelines 시스템에서 사용하는 컴포넌트 스펙을 정의하는 파일을 YAML 형식으로 작성하면 됩니다.

재사용 컴포넌트 만드는 단계

다음은 재사용 컴포넌트를 만드는 단계를 요약 한 것입니다.

  • 컴포넌트에서 사용할 프로그램을 작성하십시오. 프로그램은 파일이나 명령 행 인수를 사용하여 다른 컴퍼넌트와 데이터를 주고 받을 수 있습니다.
  • 프로그램을 컨테이너화 하십시오.
  • Kubeflow Pipelines 시스템의 컴포넌트를 설명하는 컴포넌트 스펙을 YAML 형식으로 작성하십시오.
  • Kubeflow Pipelines SDK 를 사용하여 파이프라인에서 사용할 컴포넌트를 로드하고, 파이프라인을 실행하십시오.

컴포넌트 프로그램

컴포넌트에서 사용할 프로그램을 작성해야 합니다. 합니다. 프로그램은 다른 컴포넌트로부터 데이터를 받기 위해서, 파일이나 명령행 인수를 사용해야 합니다.

컴포넌트 컨테이너화

컴포넌트를 만들기 위해서는 작성한 프로그램을 컨테이너 이미지로 패키지해야 합니다. 컨테이너화 된 컴포넌트는 파이프라인에서 독립적으로 실행됩니다. 컴포넌트는 동일한 프로세스에서 실행되지 않기 때문에, 메모리를 사용하여 컴포넌트간에 데이터를 직접 공유 할 수 없습니다. 그래서, 데이터를 네트워크를 통해 이동할 수 있도록 전달하는 데이터를 직렬화 해야합니다. 그러면, 다운스트림 컴포넌트에서 데이터를 역직렬화 해서 사용할 수 있습니다.

컴포넌트 스펙

Kubeflow Pipelines 시스템은 컴포넌트의 데이터 모델을 정의하기 위해서 YAML 형식의 파일을 사용합니다.

다음은 컴포넌트 정의의 주요 부분입니다.

  • Metadata : 이름, 설명등의 메타데이터를 정의합니다.
  • Interface : 입력과 출력을 위한 값들의 이름, 유형, 기본값등을 정의합니다.
  • Implementation : 제공된 컴포넌트 입력 값들을 가지고 컴포넌트를 실행하는 방법을 정의합니다. 컴포넌트의 실행이 완료되면 출력 값을 얻는 방법도 정의해야합니다.

다음은 컨퍼넌트 스펙을 정의한 YAML 파일입니다.

name: Keras - Train classifier
description: Trains classifier using Keras sequential model
inputs:
  - {name: training_set_features_path, type: {GcsPath: {data_type: TSV}}, description: 'Local or GCS path to the training set features table.'}
  - {name: training_set_labels_path, type: {GcsPath: {data_type: TSV}}, description: 'Local or GCS path to the training set labels (each label is a class index from 0 to num-classes - 1).'}
  - {name: output_model_uri, type: {GcsPath: {data_type: Keras model}}, description: 'Local or GCS path specifying where to save the trained model. The model (topology + weights + optimizer state) is saved in HDF5 format and can be loaded back by calling keras.models.load_model'} #Remove GcsUri and move to outputs once artifact passing support is checked in.
  - {name: model_config, type: {GcsPath: {data_type: Keras model config json}}, description: 'JSON string containing the serialized model structure. Can be obtained by calling model.to_json() on a Keras model.'}
  - {name: number_of_classes, type: Integer, description: 'Number of classifier classes.'}
  - {name: number_of_epochs, type: Integer, default: '100', description: 'Number of epochs to train the model. An epoch is an iteration over the entire `x` and `y` data provided.'}
  - {name: batch_size, type: Integer, default: '32', description: 'Number of samples per gradient update.'}
outputs:
  - {name: output_model_uri, type: {GcsPath: {data_type: Keras model}}, description: 'GCS path where the trained model has been saved. The model (topology + weights + optimizer state) is saved in HDF5 format and can be loaded back by calling keras.models.load_model'} #Remove GcsUri and make it a proper output once artifact passing support is checked in.
implementation:
  container:
    image: gcr.io/ml-pipeline/sample/keras/train_classifier
    command: [python3, /pipelines/component/src/train.py]
    args: [
      --training-set-features-path, {inputValue: training_set_features_path},
      --training-set-labels-path, {inputValue: training_set_labels_path},
      --output-model-path, {inputValue: output_model_uri},
      --model-config-json, {inputValue: model_config},
      --num-classes, {inputValue: number_of_classes},
      --num-epochs, {inputValue: number_of_epochs},
      --batch-size, {inputValue: batch_size},

      --output-model-path-file, {outputPath: output_model_uri},
    ]

컴포넌트 사용하기

Kubeflow Pipelines SDK를 이용하여 컴포넌트를 로드하면, 파이프 라인에서 컴포넌트를 사용할 수 있습니다.

#Load the component
train_op = comp.load_component(url='<https://raw.githubusercontent.com/Ark-kun/pipelines/Added-sample-component/components/sample/keras/train_classifier/component.yaml>')

#Use the component as part of the pipeline
def pipeline():
    train_task = train_op(
        training_set_features_path=os.path.join(testdata_root, 'training_set_features.tsv'),
        training_set_labels_path=os.path.join(testdata_root, 'training_set_labels.tsv'),
        output_model_uri=os.path.join(temp_dir_name, 'outputs/output_model/data'),
        model_config=Path(testdata_root).joinpath('model_config.json').read_text(),
        number_of_classes=2,
        number_of_epochs=10,
        batch_size=32,
    )

컴포넌트간에 데이터 전달하기

컴포넌트의 개념은 함수의 개념과 매우 유사합니다. 모든 컴포넌트는 입력 및 출력을 가질 수 있습니다. 컴포넌트 코드는 입력으로 전달 된 데이터를 가져 와서 출력용 데이터를 생성합니다. 파이프 라인은 실행된 컴포넌트의 출력을, 다른 컴포넌트의 입력으로 데이터를 전달하여 컴포넌트의 데이터를 서로 공유합니다. 이는 함수가 다른 함수를 호출하고 그 결과를 전달하는 방법과 매우 유사합니다. 파이프 라인 시스템은 실제 데이터 전달을 처리하는 반면 컴포넌트는 입력 데이터를 소비하고 출력 데이터를 생성합니다.

컨테이너화 된 프로그램들간의 데이터 전달

컴포넌트를 작성할 때 컴포넌트가 업스트림 및 다운 스트림 컴포넌트와 통신하는 방법에 대해 생각해야합니다. 즉, 입력 데이터를 소비하고 출력 데이터를 생성하는 방법입니다.

데이터 생성

데이터를 출력하려면 컴포넌트의 프로그램이 출력 데이터를 특정 위치에 기록하여, 시스템이 다른 컴포넌트에게 데이터를 전달할 수 있도록 해야합니다. 즉 프로그램은 데이터를 저장하는 경로를 하드 코딩해서는 안됩니다. 프로그램은 출력 데이터 경로를 명령행 인수로 받아서 처리해야합니다.

외부 시스템에서 데이터 생성

일부 시나리오에서는 컴포넌트의 출력 데이터를 외부 서비스에 저장할 수도 있습니다. 이 경우에 컴포넌트의 프로그램은 데이터를 외부 서비스에 저장한 후, 해당 데이터의 위치 식별자 정보를 출력해야합니다. 그래야 다른 컴포넌트에서 데이터를 가져올 수 있기 때문입니다. 이 경우에는 데이터가 Kubeflow Pipilines 시스템에 보관되지 않기 때문에, 파이프 라인 시스템은 데이터에 대한 일관 성 및 재현성을 보장 할 수 없습니다.

데이터 소비

명령행 프로그램이 데이터를 소비하는 방법은 일반적으로 두 가지가 있습니다.

  • 데이터가 작을 경우에는 보통 명령행 인수로 바로 전달합니다. program.py --param 100
  • 데이터 크거나, 이진 데이터일 경우에는 파일로 저장한 후, 파일 경로를 명령행 인수로 전달합니다. 파일로 저장한 데이터를 다른 컴포넌트에게 전달하려면, Kubeflow Pipelines 시스템이 이 정보를 알고 있어야 합니다.

재사용 가능한 컴포넌트 만들기

컴포넌트 파일 구성

다음은 Kubeflow Pipielins에서 권장하는, 컴포넌트 파일의 구성입니다. 반드시 이러한 방식으로 파일을 구성할 필요는 없습니다. 하지만 표준을 정해 놓으면, 이미지 작성, 테스트에 동일한 스크립트를 재사용할 수 있는 장정이 있습니다.

components/<component group>/<component name>/

    src/*            #Component source code files
    tests/*          #Unit tests
    run_tests.sh     #Small script that runs the tests
    README.md        #Documentation. Move to docs/ if multiple files needed

    Dockerfile       #Dockerfile to build the component container image
    build_image.sh   #Small script that runs docker build and docker push

    component.yaml   #Component definition in YAML format

프로그램 코드 작성

두 개의 입력 데이터와 하나의 출력 데이터가 있는 프로그램을 작성해 보겠습니다. 두 개의 입력 데이터는 명령행 인수로 바로 전달하는 작은 데이터와, 파일 경로를 명령행 인수로 전달하는 큰 데이테를 사용합니다. 이 예제는 파이썬 3 으로 작성되었습니다.

program.py

#!/usr/bin/env python3
import argparse
from pathlib import Path

def do_work(input1_file, output1_file, param1):
  for x in range(param1):
    line = next(input1_file)
    if not line:
      break
    _ = output1_file.write(line)

# Defining and parsing the command-line arguments
parser = argparse.ArgumentParser(description='Program description')
parser.add_argument('--input-path', type=str, help='Path of the local file containing the Input data.')
parser.add_argument('--param', type=int, default=100, help='Parameter.')
parser.add_argument('--output-path', type=str, help='Path of the local file where the Output data should be written.')
args = parser.parse_args()

Path(args.output1_path).parent.mkdir(parents=True, exist_ok=True)

with open(args.input1_path, 'r') as input1_file:
    with open(args.output1_path, 'w') as output1_file:
        do_work(input1_file, output1_file, args.param1)

이 프로그램의 명령행 호출은 다음과 같습니다

python3 program.py --input1-path <local file path to Input 1 data> \\
                   --param1 <value of Param1 input> \\
                   --output1-path <local file path for the Output 1 data>

Dockerfile을 작성하고, 프로그램을 컨테이너화하기

컨테이너 이미지를 만들기 위해서 Dockerfile 을 생성합니다. 파이썬 코드를 사용하고 있으므로, 베이스 이미지를 파이썬으로 하였습니다.

Dockerfile

FROM python:3.6

COPY ./src /pipelines/component/src

Dockerfile을 기반으로 컨테이너 이미지를 빌드하고, 해당 이미지를 컨테이너 이미지 레지스트리로 푸시하기 위해서 build_image.sh  스크립트를 작성하였습니다.

build_image.sh

#!/bin/bash -e
image_name=kangwoo/kfp-component # Specify the image name here
image_tag=latest
full_image_name=${image_name}:${image_tag}

cd "$(dirname "$0")"
docker build -t "${full_image_name}" .
docker push "$full_image_name"

# Output the strict image name (which contains the sha256 image digest)
docker inspect --format="{{index .RepoDigests 0}}" "${full_image_name}"

스크립트를 실행 가능하게 만듭니다.

chmod +x build_image.sh

build_image.sh  스크립트를 실행하면, 컨테이너 이미지가 빌드되고, 지정한 컨테이너 이미지 레시트리로 이미지가 푸시됩니다.

Sending build context to Docker daemon  7.168kB
Step 1/2 : FROM python:3.6
 ---> 1daf62e8cab5
Step 2/2 : COPY ./src /pipelines/component/src
 ---> Using cache
 ---> 2bc266c5c9d8
Successfully built 2bc266c5c9d8
Successfully tagged kangwoo/kfp-component:latest
The push refers to repository [docker.io/kangwoo/kfp-component]
cce013c10a7c: Preparing 
cce013c10a7c: Layer already exists 
... 
latest: digest: sha256:30ea205b7cb1253a36f82f1ec99f0eec87cadd95938317ee3c802f2b78cec368 size: 2424
kangwoo/kfp-component@sha256:30ea205b7cb1253a36f82f1ec99f0eec87cadd95938317ee3c802f2b78cec368

컴포넌트 정의 파일 작성

To create a component from your containerized program you need to write component specification in YAML format that describes the component for the Kubeflow Pipelines system.

컨테이너화 된 프로그램을 이용해서 컴포넌트로 만들려면, Kubeflow Pipelines 시스템에서 사용하는 YAML 형식의 컴포넌트 스펙을 작성해야합니다.

component.yaml파일을 생성하고, 컴포넌트의 구현(implementation) 섹션에서 사용할 컨테이너 이미지를 지정합니다. 그리고 명령(command) 섹션에서 컨테이에 포함된 프로그램을 실행하기 위해 명령을 지정합니다.

implementation:
  container:
    image: kangwoo/kfp-component:latest
    command: [
      python3, /pipelines/component/src/program.py,
      --input-path,  {inputPath:  input_1},
      --param,       {inputValue: parameter_1},
      --output-path, {outputPath: output_1},
    ]

command 섹션에는 앵글 괄호로 표현되는 플레이스홀더가 포함되어 있습니다. 플레이스홀더는 프로그램이 실행 되기 전에 특정 값 또는 경로로 대체됩니다. component.yaml 에서는 매핑 구문을 사용하여 플레이스홀더를 지정할 수 있습니다.

사용 가능한 플레이스홀더는 세 가지가 있습니다.

  • {inputValue: Some input name} : 이 플레이스홀더는 지정한 입력을 인수의 값으로 대체됩니다. 작은 데이터에 유용합니다.
  • {inputPath: Some input name} : 이 플레이스홀더는 입력 데이터를 컴포넌트로 전달하기 위해서 자동 생성된 로컬 파일의 경로로 대체됩니다. 즉, 파이프라인 시스템이 입력 인수 데이터를 파일로 쓰고, 해당 데이터 파일의 경로를 컴포넌트 프로그램에 전달하게 되는 것입니다.
  • {outputPath: Some output name}: 이 플레이스홀더는 프로그램이 출력 데이터를 저장해야 하는 자동 생성된 로컬 파일 경로로 대체됩니다.

명령행에 플레이스 홀더를 배치하는 것 외에도, 입력(inputs) 및 출력(outputs) 섹션에 해당 입력 및 출력 스펙을 추가해야합니다. 입력/출력 스펙에는 이름, 유형, 설명 및 기본값이 포함되어 있습니다. 이중에서 이름(name)은 반드시 포함되어야합니다. 입력/출력에 사용하는 이름은 자유로운 형식의 문자열이지만 YAML 문법을 따라야 합니다.

inputs:
- {name: input_1, type: String, description: 'Data for Input 1'}
- {name: parameter_1, type: Integer, default: '1', description: 'Parameter 1 description'}
outputs:
- {name: output_1, description: 'Output 1 data'}

컴포넌트의 이름과 설명 같은 메타 데이터를 추가합니다.

name: Multiply component
description: Multiplication.

component.yaml

name: Multiply component
description: Multiplication.
inputs:
- {name: input_1, type: String, description: 'Data for Input 1'}
- {name: parameter_1, type: Integer, default: '1', description: 'Parameter 1 description'}
outputs:
- {name: output_1, description: 'Output 1 data'}
implementation:
  container:
    image: kangwoo/kfp-component:latest
    command: [
      python3, /pipelines/component/src/program.py,
      --input-path,  {inputPath:  input_1},
      --param,       {inputValue: parameter_1},
      --output-path, {outputPath: output_1},
    ]

Kubeflow Pipelines SDK를 사용하여 파이프 라인에서 컴포넌트 사용하기

컴포넌트를 로드하고, 이를 사용하여 파이프 라인을 구성하는 방법을 알아보겠습니다. 예제 파이프라인은 3단계로 이루어져 있습니다.

1단계 number_op()는 1-10까지의 숫자를 출력합니다. 2단계인 multiply_op()는 출력된 숫자를 입력으로 받아서 곱하기 연산을 하고 그 결과를 출력합니다. 그리고 3단계인 print_op()는 출력된 곱하기 결과를 화면에 출력합니다.

2단계인 multiply_op()에서 직접 만든 재사용 컹포넌트를 사용하겠습니다.

1단계에 사용하는 number_op()는 1-10까지의 숫자를 출력합니다.

def number_op():
    return dsl.ContainerOp(
        name='Generate numbers',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "print(\\'1\\\\n2\\\\n3\\\\n4\\\\n5\\\\n6\\\\n7\\\\n8\\\\n9\\\\n10\\')" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )

3단계에서 사용하는 print_op()는 입력된 결과를 화면에 출력합니다

def print_op(msg):
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )

load_component_from_file 호출하여 컴포넌트를 로드합니다. 만약 파일이 아니라 URL을 사용한다면 load_component_from_url을 대신 사용할 수 있습니다. 컴포넌트를 로드하기 위해서는 component.yaml 파일에 접근할 수 있으면 됩니다. 그리고 파이프 라인이 실행되는 쿠버네티스 클러스터는 컴포넌트에 정의된 컨테이너 이미지에 접근할 수 있어야합니다.

component_root = './multiply'
multiply_op = kfp.components.load_component_from_file(os.path.join(component_root, 'component.yaml'))
# multiply_op = kfp.components.load_component_from_url('https://....../component.yaml')

DSL을 사용하여 파이프 라인을 구성합니다. load_component_from_file을 호출하여 로드한 multiply_op 컴포넌트를 사용합니다.

KFP SDK 사용하여 코드에서 파이프 라인을 컴파일하고, 바로 실행하였습니다.

@dsl.pipeline(
    name='My multiply component pipeline',
    description='A pipeline with my component.'
)
def multiply_pipeline():
    numbers = number_op()
    multiply_task = multiply_op(
        input_1=numbers.output,
        parameter_1='6',
    )
    print_op(multiply_task.outputs['output_1'])

multiply_op()의 출력은 multiply_task.outputs [ ‘output_1’] 형식으로 사용할 수 있습니다.

다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.

multiply_pipeline.py

import os
import kfp
from kfp import dsl



def number_op():
    return dsl.ContainerOp(
        name='Generate numbers',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "print(\\'1\\\\n2\\\\n3\\\\n4\\\\n5\\\\n6\\\\n7\\\\n8\\\\n9\\\\n10\\')" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )


def print_op(msg):
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )


component_root = './multiply'
multiply_op = kfp.components.load_component_from_file(os.path.join(component_root, 'component.yaml'))
# multiply_op = kfp.components.load_component_from_url('https://....../component.yaml')

@dsl.pipeline(
    name='My multiply component pipeline',
    description='A pipeline with my component.'
)
def multiply_pipeline():
    numbers = number_op()
    multiply_task = multiply_op(
        input_1=numbers.output,
        parameter_1='6',
    )
    print_op(multiply_task.outputs['output_1'])


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(multiply_pipeline, __file__ + '.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Basic Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Multiply component pipeline', __file__ + '.zip')

파이프 라인 실행 결과 확인하기

KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Sample Experiment” 이므로, Sample Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Multiply component pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.

Kubeflow Pipelines – DSL 이해하기 #2

Retry를 사용하는 파이프 라인 만들기

파이프 라인을 구성하고 실행하기

Retry를 사용하는 파이프 라인을 만들어 보겠습니다. Retry를 사용하면 작업이 실패로 끝났을 때, 재시도 할 수 있도록 할 수 있습니다. 재시도 횟수는 사용자가 설정할 수 있습니다.

입력받은 시스템 종료 코드들 중에서 하나를 랜덤으로 선택해서 반환합니다.

def flip_coin_op():
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 '
                  'else \\'tails\\'; print(result)" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )

DSL을 사용하여 파이프 라인을 구성합니다. set_retry() 사용해서, 해당 작업이 실패했을 경우 재시작 하도록 설정하였습니다.

KFP SDK 사용하여 코드에서 파이프 라인을 컴파일하고, 바로 실행하였습니다.

@dsl.pipeline(
    name='Retry random failures',
    description='A pipeline with retry.'
)
def retry_pipeline():
    random_exit_op('0,1,2,3').set_retry(10)

다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.

retry.py

import kfp
from kfp import dsl


def random_exit_op(exit_codes):
    return dsl.ContainerOp(
        name='random_failure',
        image='python:alpine3.6',
        command=['python', '-c'],
        arguments=['import random; import sys; exit_code = int(random.choice(sys.argv[1].split(","))); print(exit_code); sys.exit(exit_code)', exit_codes]
    )


@dsl.pipeline(
    name='Retry random failures',
    description='A pipeline with retry.'
)
def retry_pipeline():
    random_exit_op('0,1,2,3').set_retry(10)


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(retry_pipeline, __file__ + '.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Basic Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Retry pipeline', __file__ + '.zip')

파이프 라인 실행 결과 확인하기

KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Basic Experiment” 이므로, Basic Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Retry pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.


쿠버네티스 리소스를 사용하는 파이프 라인 만들기

쿠버네티스 리소스를 사용하는 파이프 라인을 만들어 보겠습니다. 사용자는 쿠버네티스의 모든 리소스를 사용할 수 있습니다.

kfp.dsl.ResourceOp

이 클래스는 쿠버네티스 리소스를 사용하는 단계를 나타냅니다. Argo의 리소스 템플릿을 구현하고 있습니다.

classkfp.dsl.ResourceOp(k8s_resource=None, action: str = 'create', merge_strategy: str = None, success_condition: str = None, failure_condition: str = None, attribute_outputs: Dict[str, str] = None, **kwargs)

사용자는 이 기능을 이용하여 쿠버네티스 리소스의 일부 작업(get, create, apply, delete, replace, place)을 수행할 수 있습니다. 그리고 해당 작업을 수행하는 단계의 성공 또는 실패 했는지를 조건을 통해서 설정할 수 있습니다.

전달 인자

다음은 VolumeOp에서 사용하는 주요 전달 인자입니다.

  • k8s_resource: 쿠버네티시 리소스를 정의한 것입니다. (필수)
  • action: 수행할 작업의 종류입니다. 기본값은 create 입니다.
  • merge_strategy: 수행할 작업의 종류가 patch 일 때 사용할 병합 전략입니다. (선택)
  • success_condition: 단계의 성공을 판별하는 조건입니다.(선택)
  • failure_condition: 단계의 실패를 판별하는 조건입니다. (선택)
  • attribute_outputs: [kfp.dsl.ContainerOp](<https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.dsl.html#kfp.dsl.ContainerOp>)file_outputs 과 비슷합니다 . 출력 파리미터 이름을 쿠버네티스 객체의 JSON 경로에 매핑합니다.

출력

ResourceOps는 출력 파라미터를 생성 할 수 있습니다. 사용하는 쿠버네티스 리소스의 필드 값을 출력 할 수 있습니다. 예를 들면 다음과 같습니다.

job = kubernetes_client.V1Job(...)

rop = kfp.dsl.ResourceOp(
    name="create-job",
    k8s_resource=job,
    action="create",
    attribute_outputs={"name": "{.metadata.name}"}
)

기본적으로 ResourceOps는 리소스 이름과 리소스 사양을 출력합니다.

파이프 라인을 구성하고 실행하기

생성할 리소스의 매니페스트를 정의 하였습니다.

_job_manifest = """
{
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {
        "generateName": "kfp"
    },
    "spec": {
        "template": {
            "metadata": {
                "name": "resource-pipeline"
            },
            "spec": {
                "containers": [{
                    "name": "mnist",
                    "image": "kangwoo/mnist-simple:job",
                    "command": ["python", "/app/mnist-simple.py"]
                }],
                "restartPolicy": "Never"
            }
        }   
    }
}
"""

DSL을 사용하여 파이프 라인을 구성합니다. dsl.ResourceOp() 사용해서, 쿠버네티스 리소소스를 생성하였습니다.

KFP SDK 사용하여 코드에서 파이프 라인을 컴파일하고, 바로 실행하였습니다.

@dsl.pipeline(
    name='Kubernetes Resource',
    description='A pipeline with resource.'
)
def resource_pipeline():
    op = dsl.ResourceOp(
        name='resource-job',
        k8s_resource=json.loads(_job_manifest),
        action='create'
    )

다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.

resource.py

import kfp
from kfp import dsl
import json


_job_manifest = """
{
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {
        "generateName": "kfp"
    },
    "spec": {
        "template": {
            "metadata": {
                "name": "resource-pipeline"
            },
            "spec": {
                "containers": [{
                    "name": "mnist",
                    "image": "kangwoo/mnist-simple:job",
                    "command": ["python", "/app/mnist-simple.py"]
                }],
                "restartPolicy": "Never"
            }
        }   
    }
}
"""

@dsl.pipeline(
    name='Kubernetes Resource',
    description='A pipeline with resource.'
)
def resource_pipeline():
    op = dsl.ResourceOp(
        name='resource-job',
        k8s_resource=json.loads(_job_manifest),
        action='create'
    )


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(resource_pipeline, __file__ + '.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Basic Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Resource pipeline', __file__ + '.zip')

파이프 라인 실행 결과 확인하기

KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Basic Experiment” 이므로, Basic Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Resource pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.


퍼시스턴스 볼륨를 사용하는 파이프 라인 만들기

쿠버네티스 퍼시스턴스 볼륨을 사용하는 파이프 라인을 만들어 보겠습니다. dsl.VolumeOp() 이용하여 퍼시스턴스 볼륨을 쉽게 생성할 수 있습니다.

dsl.VolumeOp

VolumeOp 클래스는 ResourceOp 에서 확장되었습니다. 이 클래스를 사용하면 퍼시스턴스 볼륨을 쉽게 생성할 수 있습니다.

전달 인자

다음은 VolumeOp에서 사용하는 주요 전달 인자입니다.

  • resource_name: 생성할 리소스에 사용할 이름입니다. 이 문자열 앞에 워크플로우 이름이 붙습니다. (필수)
  • size: 요청할 볼륨의 크기입니다. (필수)
  • storage_class: 사용할 스토리지 클래스입니다. (선택)
  • modes: 퍼시스턴스 볼륨의 접근 모드( accessModes) 입니다.기본 값을  VOLUME_MODE_RWM 입니다.
    • VOLUME_MODE_RWO["ReadWriteOnce"]
    • VOLUME_MODE_RWM["ReadWriteMany"]
    • VOLUME_MODE_ROM["ReadOnlyMany"]

출력

쿠버네티스 리소스의 이름과 사양 이외에도 바인딩된 퍼시스턴스 볼륨의 스토리지 크기를 step.outputs [“size”] 로 출력합니다. 하지만 스토리지 제공자가 WaitForFirstConsumer 바인딩 모드를 지원하는 경우, 비어 있을 수 있습니다. 이 값은 비어 있지 않으면, 항상 요청된 크기보다 크거나 같습니다.

파이프 라인을 구성하고 실행하기

dsl.VolumeOp() 이용하여 퍼시스턴스 볼륨을 생성할 수 있습니다.

vop = dsl.VolumeOp(
        name="pipeline-volume",
        resource_name="pipeline-pvc",
        modes=dsl.VOLUME_MODE_RWO,
        size="100Mi"
    )

DSL을 사용하여 파이프 라인을 구성합니다. dsl.dsl.ContainerOp() 의 pvolumes 파라미터를 이용하여 볼륨을 마운트 할 수 있습니다.

step1 에서는 dsl.VolumeOp()으로 생성한 볼륨을 마운트하였고, step2에서는 step1에 마운트된 볼륨을 그대로 다시 마운트 하였습니다.

KFP SDK 사용하여 코드에서 파이프 라인을 컴파일하고, 바로 실행하였습니다.

@dsl.pipeline(
    name='Volume pipeline',
    description='A pipeline with volume.'
)
def volume_pipeline():
    vop = dsl.VolumeOp(
        name="pipeline-volume",
        resource_name="pipeline-pvc",
        modes=dsl.VOLUME_MODE_RWO,
        size="100Mi"
    )

    step1 = dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 '
                   'else \\'tails\\'; print(result)" | tee /data/output'],
        pvolumes={"/data": vop.volume}
    )

    step2 = dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['cat', '/data/output'],
        pvolumes={"/data": step1.pvolume}
    )

다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.

volume.py

import kfp
from kfp import dsl


@dsl.pipeline(
    name='Volume pipeline',
    description='A pipeline with volume.'
)
def volume_pipeline():
    vop = dsl.VolumeOp(
        name="pipeline-volume",
        resource_name="pipeline-pvc",
        modes=dsl.VOLUME_MODE_RWO,
        size="100Mi"
    )

    step1 = dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 '
                   'else \\'tails\\'; print(result)" | tee /data/output'],
        pvolumes={"/data": vop.volume}
    )

    step2 = dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['cat', '/data/output'],
        pvolumes={"/data": step1.pvolume}
    )


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(volume_pipeline, __file__ + '.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Basic Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Volume pipeline', __file__ + '.zip')

파이프 라인 실행 결과 확인하기

KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Basic Experiment” 이므로, Basic Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Volume pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.


파이프라인에 환경 변수 사용하기

Kubeflow piepelins 에서 환경 변수를 설정하고 사용하는 방법에 대해서 알아 보겠습니다. 컴포넌트에서 환경 변수를 설정하려면, dsl.ContainerOp()의 add_env_variable() 메소드를 사용하면 됩니다. kubernetes.client.models 패키지에 있는 V1EnvVar 인스턴스를 생성한 후, add_env_variable() 메소드를 이용하여 환경 변수를 컴포넌트에 추가할 수 있습니다.

파이프 라인을 구성하고 실행하기

생성할 리소스의 매니페스트를 정의 하였습니다.

import kfp.dsl as dsl
from kubernetes.client.models import V1EnvVar

@dsl.pipeline(
  name='Env example',
  description='A pipline showing how to use environment variables'
)
def environment_pipeline():
  env_var = V1EnvVar(name='example_env', value='env_variable')
 
  container_op = logg_env_function_op().add_env_variable(env_var)

더 많은 환경 변수를 컴포넌트에 전달하려면 add_env_variable () 더 추가하면 됩니다.

컴포넌트에 추가한 환경 변수를 출력하기 위하여 echo를 사용하였습니다.

def print_env_op():
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['sh', '-c', 'echo $example_env'],
    )

다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.

enviroment_variables.py

import kfp
from kfp import dsl

import kfp.dsl as dsl
from kubernetes.client.models import V1EnvVar


def print_env_op():
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['sh', '-c', 'echo $example_env'],
    )


@dsl.pipeline(
  name='Env example',
  description='A pipline showing how to use environment variables'
)
def environment_pipeline():
    env_var = V1EnvVar(name='example_env', value='env_variable')

    print_env_op().add_env_variable(env_var)


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(environment_pipeline, __file__ + '.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Sample Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Environment pipeline', __file__ + '.zip')

파이프 라인 실행 결과 확인하기

KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Sample Experiment” 이므로, Sample Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Environment pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.


ContainerOp에 사이드카 추가하기

파이프 라인을 구성하고 실행하기

dsl.Sidecar()를 이용하여 사이드카를 생성합니다.

	echo = dsl.Sidecar(
        name="echo",
        image="hashicorp/http-echo:latest",
        args=['-text="hello world"'],
    )

dsl.ContainerOp()의 sidcars 파라미터를 이용하여 생성한 사이드카를 추가합니다.

	op1 = dsl.ContainerOp(
        name="download",
        image="busybox:latest",
        command=["sh", "-c"],
        arguments=[
            "sleep %s; wget localhost:5678 -O /tmp/results.txt" % sleep_sec
        ],  # sleep for X sec and call the sidecar and save results to output
        sidecars=[echo],
        file_outputs={"downloaded": "/tmp/results.txt"},
    )

다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.

sidecar.py

import kfp
import kfp.dsl as dsl

@dsl.pipeline(
    name="pipeline_with_sidecar", 
    description="A pipeline that demonstrates how to add a sidecar to an operation."
)
def pipeline_with_sidecar(sleep_sec: int = 30):

    # sidecar with sevice that reply "hello world" to any GET request
    echo = dsl.Sidecar(
        name="echo",
        image="hashicorp/http-echo:latest",
        args=['-text="hello world"'],
    )

    # container op with sidecar
    op1 = dsl.ContainerOp(
        name="download",
        image="busybox:latest",
        command=["sh", "-c"],
        arguments=[
            "sleep %s; wget localhost:5678 -O /tmp/results.txt" % sleep_sec
        ],  # sleep for X sec and call the sidecar and save results to output
        sidecars=[echo],
        file_outputs={"downloaded": "/tmp/results.txt"},
    )

    op2 = dsl.ContainerOp(
        name="echo",
        image="library/bash",
        command=["sh", "-c"],
        arguments=["echo %s" % op1.output],  # print out content of op1 output
    )

if __name__ == '__main__':
    kfp.compiler.Compiler().compile(pipeline_with_sidecar, __file__ + '.yaml')

파이프 라인 실행 결과 확인하기

KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Sample Experiment” 이므로, Sample Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Sidecar pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.



파이프 라인의 기본 Artifact 저장소를 변경하기

KFP 는 파이프 라인에서 사용하는 아티팩트들은 내부에 설치된 minio에 저장하고 있습니다. 아티팩트의 기본 저장소를 바꾸려면 argo workflow의 workflow-controller configmap을 수정하면 됩니다.

다음 명령어를 실행하면 아티팩트 저장소의 설정 정보를 수정할 수 있습니다.

kubectl -n kubeflow edit configmap workflow-controller-configmap

다음은 아티팩트 저장소의 설정 정보입니다.

...
data:
  config: |
    {
    artifactRepository:
    {
        s3: {
            bucket: mlpipeline,
            keyPrefix: artifacts,
            endpoint: minio-service.kubeflow:9000,
            insecure: true,
            accessKeySecret: {
                name: mlpipeline-minio-artifact,
                key: accesskey
            },
            secretKeySecret: {
                name: mlpipeline-minio-artifact,
                key: secretkey
            }
        }
    }
    }
...

자세한 사항은 https://github.com/argoproj/argo/blob/master/docs/configure-artifact-repository.md 를 참고 하실 수 있습니다.

Kubeflow Pipelines – DSL 이해하기 #1

순차적 처리를 하는 파이프 라인 만들기

파이프 라인을 구성하고 실행하기

순차적으로 작업을 하는 파이프 라인을 만들어 보겠습니다. 동전을 던지는 단계와 그 결과를 출력하는 단계로 구성 되어 있습니다.

flip_coin_op() 는 동전을 던지는 단계입니다. 파이썬 코드로 작성되었습니다. 랜덤 함수를 사용해서 0이면 앞면(heads), 1이면 뒷면(tails) 이라고 출력합니다. 그리고 출력 결과를 /tmp/output 파일에 저장합니다. 결과가 저장된 /tmp/output 파일을 다음 단계에서 사용하기 위해 file_outputs 으로 정의합니다.

def flip_coin_op():
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 '
                  'else \\'tails\\'; print(result)" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )

print_op(msg) 라는 입력받은 메시지를 화면으로 출력하는 단계입니다. 단순히 입력 받은 메시지를 echo 명령어를 사용하여 화면에 출력합니다.

def print_op(msg):
    """Print a message."""
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )

DSL을 사용하여 파이프 라인을 구성합니다. flip_coin_op()가 먼저 실행되고, 그 다음 print_op()가 실행됩니다. 그리고 flip_coin_op()의 출력 결과를 print_op()의 입력 파라미터로 사용하고 있습니다.

KFP SDK 사용하여 코드에서 파이프 라인을 컴파일하고, 바로 실행하였습니다.

@dsl.pipeline(
    name='Sequential pipeline',
    description='A pipeline with two sequential steps.'
)
def sequential_pipeline():
    """A pipeline with two sequential steps."""

    flip = flip_coin_op()
    print_op(flip.output)


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(sequential_pipeline, __file__ + '.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Basic Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Sequential pipeline', __file__ + '.zip')

다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.

sequential.py

import kfp
from kfp import dsl


def flip_coin_op():
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 '
                  'else \\'tails\\'; print(result)" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )


def print_op(msg):
    """Print a message."""
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )


@dsl.pipeline(
    name='Sequential pipeline',
    description='A pipeline with two sequential steps.'
)
def sequential_pipeline():
    """A pipeline with two sequential steps."""

    flip = flip_coin_op()
    print_op(flip.output)


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(sequential_pipeline, __file__ + '.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Basic Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Sequential pipeline', __file__ + '.zip')

파이프 라인 실행 결과 확인하기

KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Basic Experiment” 이므로, Basic Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Sequential pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.

Graph 탭에서 flip-coin 단계를 선택하고, Input/Output 탭을 클릭하면 출력 결과를 볼 수 있습니다.


조건에 의해서 분기를 하는 파이프 라인 만들기

파이프 라인을 구성하고 실행하기

조건에 의해서 분기를 파이프 라인을 만들어 보겠습니다. 동전을 던지는 단계와 그 결과에 따러 “승리” 또는 “패배” 를 출력하는 단계로 구성 되어 있습니다.

flip_coin_op() 는 동전을 던지는 단계입니다. 이전 예제 코드와 동일합니다.

def flip_coin_op():
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 '
                  'else \\'tails\\'; print(result)" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )

print_op(msg) 라는 입력받은 메시지를 화면으로 출력하는 단계입니다. 이전 예제 코드와 동일합니다.

def print_op(msg):
    """Print a message."""
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )

DSL을 사용하여 파이프 라인을 구성합니다. flip_coin_op()가 먼저 실행되고, 그 결과에 따라서 print_op()를 사용해서 “승리” 또는 “패배”를 출력하게 됩니다. 동전이 앞면인지 뒷면이지 판단하기 위해서 dsl.Condition() 을 사용하였습니다. 앞면이면 ‘YOUT WIN’을 출력하고, 뒷면이면 ‘YOU LOSE’를 출력합니다.

KFP SDK 사용하여 코드에서 파이프 라인을 컴파일하고, 바로 실행하였습니다.

@dsl.pipeline(
    name='Conditional execution pipeline',
    description='Shows how to use dsl.Condition().'
)
def condition_pipeline():
    flip = flip_coin_op()
    with dsl.Condition(flip.output == 'heads'):
        print_op('YOUT WIN')

    with dsl.Condition(flip.output == 'tails'):
        print_op('YOU LOSE')


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(condition_pipeline, __file__ + '.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Basic Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Condition pipeline', __file__ + '.zip')

다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.

condition.py

import kfp
from kfp import dsl


def flip_coin_op():
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 '
                  'else \\'tails\\'; print(result)" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )


def print_op(msg):
    """Print a message."""
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )


@dsl.pipeline(
    name='Conditional execution pipeline',
    description='Shows how to use dsl.Condition().'
)
def condition_pipeline():
    flip = flip_coin_op()
    with dsl.Condition(flip.output == 'heads'):
        print_op('YOUT WIN')

    with dsl.Condition(flip.output == 'tails'):
        print_op('YOU LOSE')


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(condition_pipeline, __file__ + '.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Basic Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Condition pipeline', __file__ + '.zip')

파이프 라인 실행 결과 확인하기

KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Basic Experiment” 이므로, Basic Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Condition pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.


병렬 루프를 사용하는 파이프 라인 만들기

파이프 라인을 구성하고 실행하기

정적 항목 세트에 대한 병렬 루프를 사용하는 파이프 라인을 만들어 보겠습니다. 동전을 여러번 던지고, 그 결과를 json 형식으로 저장 단계와 그 결과들을 병렬 루프를 통해서 출력하는 단계로 구성 되어 있습니다.

flip_coins_op() 는 동전을 여러번 던지는 단계입니다. 동전을 던진 결과 값을 json 형식으로 저정합니다.

def flip_coins_op():
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import json; import sys; import random; '
                   'json.dump([(\\'heads\\' if random.randint(0,1) == 1 else \\'tails\\') for i in range(10)], '
                   'open(\\'/tmp/output.json\\', \\'w\\'))"'],
        file_outputs={'output': '/tmp/output.json'}
    )

print_op(msg) 라는 입력받은 메시지를 화면으로 출력하는 단계입니다. 이전 예제 코드와 동일합니다.

def print_op(msg):
    """Print a message."""
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )

DSL을 사용하여 파이프 라인을 구성합니다. flip_coins_op()가 먼저 실행되고, 그 결과가 json 형식으로 반환됩니다. json에 포함된 아이템의 개수 만큼 print_op() 를 실행합니다.

KFP SDK 사용하여 코드에서 파이프 라인을 컴파일하고, 바로 실행하였습니다.

@dsl.pipeline(
    name='Loop pipeline',
    description='A pipeline with parallel loop'
)
def loop_pipeline():
    flips = flip_coins_op()
    with dsl.ParallelFor(flips.output) as item:
        print_op(item)

다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.

loop.py

import kfp
from kfp import dsl


def flip_coins_op():
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import json; import sys; import random; '
                   'json.dump([(\\'heads\\' if random.randint(0,1) == 1 else \\'tails\\') for i in range(10)], '
                   'open(\\'/tmp/output.json\\', \\'w\\'))"'],
        file_outputs={'output': '/tmp/output.json'}
    )



def print_op(msg):
    """Print a message."""
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )


@dsl.pipeline(
    name='Loop pipeline',
    description='A pipeline with parallel loop'
)
def loop_pipeline():
    flips = flip_coins_op()
    with dsl.ParallelFor(flips.output) as item:
        print_op(item)


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(loop_pipeline, __file__ + '.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Basic Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Loop pipeline', __file__ + '.zip')

파이프 라인 실행 결과 확인하기

KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Basic Experiment” 이므로, Basic Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Loop pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.


Exit Handler를 사용하는 파이프 라인 만들기

파이프 라인을 구성하고 실행하기

Exit Handler 사용하는 파이프 라인을 만들어 보겠습니다. Exit Handler를 사용하면 작업 그룹이 종료 될 때 지정한 작업을 실행시킬 수 있습니다. 작업 그룹의 성공 여부와는 상관 없이 무조건 지정한 작업이 실행됩니다.

동전을 여러번 던지고, 그 결과를 json 형식으로 저장 단계와 그 결과들을 병렬 루프를 통해서 출력하는 단계로 구성 되어 있습니다.

flip_coin_op() 는 동전을 던지는 단계입니다. 이전 예제 코드와 동일합니다.

def flip_coin_op():
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 '
                  'else \\'tails\\'; print(result)" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )

print_op(msg) 라는 입력받은 메시지를 화면으로 출력하는 단계입니다. 이전 예제 코드와 동일합니다.

def print_op(msg):
    """Print a message."""
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )

DSL을 사용하여 파이프 라인을 구성합니다. dsl.ExitHandler()를 사용해서, 작업 그룹이 종료될때 실행할 작업을 지정할 수 있습니다. 작업 그룹은 flip_coin_op() 과 print_op(flip.output) 으로 구성되어 있습니다. 작업 그룹에 속한 작업들이 종료되면 dsl.ExitHandler(exit_op)에 지정한 exit_op 작업이 실행됩니다.

KFP SDK 사용하여 코드에서 파이프 라인을 컴파일하고, 바로 실행하였습니다.

@dsl.pipeline(
    name='Exit handler',
    description=' The exit handler will run after the pipeline finishes (successfully or not)'
)
def sequential_pipeline():
    exit_op = print_op('Exit')

    with dsl.ExitHandler(exit_op):
        flip = flip_coin_op()
        print_op(flip.output)

다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.

exit_handler.py

import kfp
from kfp import dsl


def flip_coin_op():
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 '
                  'else \\'tails\\'; print(result)" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )


def print_op(msg):
    """Print a message."""
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )


@dsl.pipeline(
    name='Exit handler',
    description=' The exit handler will run after the pipeline finishes (successfully or not)'
)
def sequential_pipeline():
    exit_op = print_op('Exit')

    with dsl.ExitHandler(exit_op):
        flip = flip_coin_op()
        print_op(flip.output)


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(sequential_pipeline, __file__ + '.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Basic Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Exit Handler', __file__ + '.zip')

파이프 라인 실행 결과 확인하기

KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Basic Experiment” 이므로, Basic Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Exit Handler” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.


파이프 라인 파라미터 사용하기

파이프 라인을 구성하고 실행하기

파라미터를 입력 받아서, 파이프 라인을 실행할 수 있는 파이프 라인을 만들어 보겠습니다.

파이프 라인을 실행할 때 앞면(heads) 또는 뒷면(tails)을 파라미터로 입력받아서, 동전을 던진 결과와 같으면 승링를 다르면 패배를 출력하게 하였습니다.

flip_coin_op() 는 동전을 던지는 단계입니다. 이전 예제 코드와 동일합니다.

def flip_coin_op():
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 '
                  'else \\'tails\\'; print(result)" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )

print_op(msg) 라는 입력받은 메시지를 화면으로 출력하는 단계입니다. 이전 예제 코드와 동일합니다.

def print_op(msg):
    """Print a message."""
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )

DSL을 사용하여 파이프 라인을 구성합니다. 파이프 라인 함수에 predict 파라미터를 입력 받을 수 있도록 추가하였습니다. 파이프 라인 함수가 호출되면 각 파라미터는 PipelineParam 객체가 됩니다.

@dsl.pipeline(
    name='Pipeline parameters',
    description='Pipeline parameters'
)
def condition_pipeline(
        predict : str = 'heads'):
    flip = flip_coin_op()
    with dsl.Condition(flip.output == predict):
        print_op('YOU WIN')

    with dsl.Condition(flip.output != predict):
        print_op('YOU LOSE')

KFP SDK 사용하여 코드에서 파이프 라인을 컴파일하고, 바로 실행하였습니다. 파이프 라인 실행 시 파라미터를 넘기기 위해서, params 파라미터를 사용하였습니다.

if __name__ == '__main__':
    kfp.compiler.Compiler().compile(condition_pipeline, __file__ + '.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Basic Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Pipeline parameters', __file__ + '.zip', params={'predict' : 'tails'})

파라미터는 Kubeflow Pipelines UI를 통해서 실행할 때도 입력할 수 있습니다.

다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.

parameters.py

import kfp
from kfp import dsl
from kfp.dsl import PipelineParam


def flip_coin_op():
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 '
                  'else \\'tails\\'; print(result)" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )


def print_op(msg):
    """Print a message."""
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )


@dsl.pipeline(
    name='Pipeline parameters',
    description='Pipeline parameters'
)
def condition_pipeline(
        predict : str = 'heads'):
    flip = flip_coin_op()
    with dsl.Condition(flip.output == predict):
        print_op('YOU WIN')

    with dsl.Condition(flip.output != predict):
        print_op('YOU LOSE')


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(condition_pipeline, __file__ + '.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Basic Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Pipeline parameters', __file__ + '.zip', params={'predict' : 'tails'})

파이프 라인 실행 결과 확인하기

KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Basic Experiment” 이므로, Basic Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Pipeline parameters” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.


Kubeflow Pipelines – SDK를 사용해서 파이프라인 만들기

Kubeflow Pipelines SDK를 사용하여 파이프라인과 컴포넌트를 구성하고, 빌드하는 방법에 대해서 알아보겠습니다. 그리고 SDK를 사용하여 파이프라인을 실행하는 방법과, Kubeflow Pipelines UI를 사용하여 파이프라인을 실행하는 방법에 대해서 알아보겠습니다.


Pipelines SDK 소개

Kubeflow Pipelines SDK는 머신 러닝 워크 플로우를 정의하고, 실행시킬 수 있는 파이썬 패키지 세트를 제공합니다. 파이프 라인은 워크 플로우의 단계를 구성하는 컴포넌트들과, 각 컴포넌트들이 서로 상호 작용하는 방식을 정의해 놓은 것입니다.

Kubeflow Pipelines SDK는 파이프 라인을 컴파일하고 실행하는 등의 여러 상호 작용 기능을 제공하고 있습니다. 그리고 파이프 라인의 구성 요소인 컴포넌트를 만들고 로드 하는 등의 기능도 제공하고 있습니다. 컴포넌트에서 사용할 컨테이너 이미지를 빌드 하는 기능도 제공하고 있습니다. 다만 Kubeflow Pipelines SDK에 포함되어 있는 컨테이너 빌더 기능은 Google Cloud Platform (GCP) 환경에서만 원활하게 사용할 수 있습니다.

SDK 패키지

Kubeflow Pipelines SDK에는 다음과 같은 패키지가 포함되어 있습니다.

  • kfp.compiler : 파이프 라인을 컴파일 할 수 있는 기능을 제공하고 있습니다. 이 패키지의 주요 사용 메소드는 다음과 같습니다
    • kfp.compiler.Compiler.compile : Python DSL 코드를 Kubeflow Pipelines 서비스가 처리 할 수 있는 단일 정적 구성 (YAML 형식)으로 컴파일합니다. Kubeflow Pipelines 서비스는 정적 구성을 실행을 위해 Kubernetes 리소스 세트로 변환합니다. (현재는 컴파일하면 Argo Workflows 형태로 변환합니다.)
  • kfp.component : 파이프 라인 컴포넌트와 상호 작용하기 위한 기능을 제공하고 있습니다. 이 패키지의 주요 사용 메소드는 다음과 같습니다.
    • kfp.components.func_to_container_op : Python 함수를 파이프 라인 컴포넌트로 변환하고 팩토리 함수를 리턴합니다. 그런 다음 팩토리 함수를 호출하여 컨테이너에서 원래 함수를 실행하는 파이프 라인 태스크 (ContainerOp)의 인스턴스를 구성 할 수 있습니다.
    • kfp.components.load_component_from_file : 파일에서 파이프 라인 컴포넌트를 로드하고 팩토리 함수를 리턴합니다. 그런 다음 팩토리 함수를 호출하여 컴포넌트 컨테이너 이미지를 실행하는 파이프 라인 태스크 (ContainerOp)의 인스턴스를 구성 할 수 있습니다.
    • kfp.components.load_component_from_url : URL에서 파이프 라인 컴포넌트를 로드하고 팩토리 함수를 리턴합니다. 그런 다음 팩토리 함수를 호출하여 컴포넌트 컨테이너 이미지를 실행하는 파이프 라인 태스크 (ContainerOp)의 인스턴스를 구성 할 수 있습니다.
  • kfp.containers : 컴포넌트 컨테이너 이미지를 빌드하는 기능을 제공하고 있습니다. 이 패키지의 주요 사용 메소드는 다음과 같습니다
    • build_image_from_working_dir : 파이썬 작업 디렉토리를 사용하여 새 컨테이너 이미지를 빌드하고 푸시합니다. Python 컨테이너 이미지를 기본 이미지로 사용하는 Dockerfile을 생성하고, requirements.txt 파일 있는 경우 패키지를 설치하고 대상 Python 파일을 컨테이너 이미지에 복사합니다. 작업 디렉토리의 루트에 사용자 정의 Dockerfile을 만들어서 대체 할 수 있습니다. (현재는 Google Cloud Platform (GCP) 환경에서만 사용할 수 있습니다.)
  • kfp.dsl : 파이프 라인 및 컴포넌트를 정의하고 상호 작용하는 데 사용할 수있는 DSL (Domain-Specific Language)이 포함되어 있습니다. 이 패키지의 주요 사용 메소드는 다음과 같습니다.
    • kfp.dsl.ContainerOp : 컨테이너 이미지로 구현 된 파이프 라인 작업을 나타냅니다.
    • kfp.dsl.PipelineParam 한 파이프 라인 컴포넌트에서 다른 파이프 라인 컴포넌트로 전달할 수있는 파이프 라인 파라미터를 나타냅니다.
    • kfp.dsl.component : 파이프 라인 컴포넌트를 반환하는 DSL 함수의 데코레이터입니다. (ContainerOp).
    • kfp.dsl.pipeline : 파이프 라인을 반환하는 Python 함수의 데코레이터입니다.
    • kfp.dsl.python_component: 파이프 라인 컴포넌트 메타 데이터를 함수 객체에 추가하는 Python 함수의 데코레이터입니다.
    • kfp.dsl.types:  Kubeflow Pipelines SDK에서 사용하는 타입들이 정의되어 있습니다. 타입에는 String, Integer, Float 및 Bool과 같은 기본 타입과 GCPProjectID 및 GCRPath와 같은 도메인 별 타입이 있습니다. DSL 정적 유형 검사에 대해서는 안내서를 참조하실 수 있습니다.
    • kfp.dsl.ResourceOp : 쿠버네티스 리소스를 직접 조작할 수 작업을 나타냅니다.(creategetapply 등 ).
    • kfp.dsl.VolumeOp : 쿠버네티스 PersistentVolumeClaim 을 생성하는 파이프 라인 작업 을 나타냅니다.
    • kfp.dsl.VolumeSnapshotOp : 새로운 볼륨 스냅 샷을 생성하는 파이프 라인 작업을 나타냅니다.
    • kfp.dsl.PipelineVolume : 파이프 라인의 단계간에 데이터를 전달하기 위해 사용하는 볼륨을 나타냅니다.
  • kfp.Client : Kubeflow Pipelines API 용 Python 클라이언트 라이브러리가 포함되어 있습니다. 이 패키지의 주요 사용 메소드는 다음과 같습니다.
    • kfp.Client.create_experiment : 파이프 라인 experiment 을 만들고, experiment  개체를 반환합니다.
    • kfp.Client.run_pipeline 파이프 라인을 실행(run)하고 실행(run) 개체를 반환합니다.
  • KFP extension modules : Kubeflow Pipelines에서 사용할 수 있는 특정 플랫폼에 대한 기능을 가지고 있습니다. 온 프레미스, Google Cloud Platform (GCP), Amazon Web Services (AWS) 및 Microsoft Azure에 대한 유틸리티 기능을 제공하고 있습니다.

KFP CLI tool

KFP CLI 도구를 사용하면 커맨드 라인에서 직접 Kubeflow Pipelines SDK의 일부분을 사용할 수 있습니다. KFP CLI 도구는 다음과 같은 명령을 제공합니다.

  • kfp diagnose_me : 지정된 파라미터로 환경 진단을 실행합니다
    • --json : 명령 실행 결과를 JSON으로 반환하도록 합니다. 별도로 설정하지 않으면, 결과는 사람이 읽을 수 있는 형식으로 반환됩니다.
    • --namespace TEXT : 대상 쿠버네티스 네임스페이스를 지정합니다. 별도로 설정하지 않으면, 모든 네임스페이스를 대상으로 합니다.
  • kfp pipeline <COMMAND> : 파이프 라인을 관리하는 데 도움이 되는 명령을 제공합니다.
    • get : Kubeflow Pipelines 클러스터의 Kubeflow 파이프 라인에 대한 상세한 정보를 조회합니다.
    • list : Kubeflow Pipelines 클러스터에 업로드 된 파이프 라인 목록을 조회 합니다.
    • upload : Kubeflow Pipelines 클러스터에 파이프 라인을 업로드합니다.
  • kfp run <COMMAND> 파이프 라인 실행을 관리하는 데 도움이 되는 명령을 제공합니다.
    • get : 파이프 라인 실행의 상세한 정보를 조회합니다.
    • list : 최근 실행한 파이프 라인 실행 목록을 조회 합니다.
    • submit – 파이프 라인을 실행 시킵니다.

파이프 라인과 컴포넌트 만들기

SDK를 사용하여 파이프 라인과 컴포넌트를 만드는 방법에 대해서 알아 보도록 하겠습니다.

컴포넌트 만드는 방법

파이프라인은 컴포넌트로 구성되어 있습니다. 그래서 파이프라인을 만들기 위해서는 사용할 컴포넌트를 먼저 만들어야합니다. 이미 만들어 놓은 컴포넌트가 있으면 재사용할 수도 있습니다.

컴포넌트를 만드는 단계는 다음과 같습니다.

가 . 컴포넌트 프로그램 작성 : 컴포넌트에서 사용할 프로그램을 작성해야 합니다. 프로그램은 다른 컴포넌트로부터 데이터를 받기 위해서, 파일이나 명령행 인수를 사용해야 합니다.

나. 컴포넌트 컨테이너화 : 작성한 프로그램을 컨테이너 이미지로 만들어야 합니다.

다. 컴포넌트 스펙 작성 : 컴포넌트의 데이터 모델을 정의하기 위해서 YAML 형식의 파일을 작성해야 합니다. 재사용 가능한 컴포넌트를 만들때는 스펙을 작성하는 것이 좋지만, 생략 가능합니다. 컴포넌트 스펙 파일이 있는 경우에는 스펙 파일을 로드해서 컴포넌트를 생성할 수 있습니다. 자세한 내용은 “재사용 가능한 컴포넌트”를 참고 하시기 바랍니다.

파이프라인 만드는 방법

컴포넌트를 이용해 파이프라인을 만들 수 있습니다. 파이프라인을 만들기 위해서 파이프라인 파이썬 코드를 작성해야 합니다.

파이프라인을 만드는 단계는 다음과 같습니다.

가. Kubeflow Pipelines DSL을 사용하여 파이프라인 함수와 컴포넌트 함수를 작성합니다.

나. 파이프 라인을 컴파일 하여 압축 된 YAML 파일을 생성합니다.

파이프 라인을 컴파일 하기 위한 방법은 두 가지가 있습니다.

  • kfp.compiler.Compiler.compile 메소드를 사용하는 방법 kfp.compiler.Compiler().compile(my_pipeline, 'my_pipeline.zip')
  • 커맨드 라인에서 dsl-compile 커맨드를 사용하는 방법 dsl-compile --py [path/to/python/file] --output my_pipeline.zip

다. 파이프 라인을 업로드하고, 실행합니다.

파이프 라인을 실행하는 방법은 두 가지가 있습니다.

  • Kubeflow Pipelines SDK 를 사용하는 방법
client = kfp.Client()
my_experiment = client.create_experiment(name='demo')
my_run = client.run_pipeline(my_experiment.id, 'my-pipeline', 
  'my_pipeline.zip')
  • Kubeflow Pipelines UI를 사용하는 방법

컴포넌트 만들기

프로그램이 포함된 컨테이너 이미지를 사용하여 컴포넌트를 생성하는 방법에 대해서 알아 보도록 하겠습니다. 생성한 컴포넌트는 파이프라인을 작성하는데 사용됩니다.

가. 프로그램 코드를 작성합니다.

mnist-simple.py

from __future__ import absolute_import, division, print_function, unicode_literals

import tensorflow as tf
import numpy as np


def train():
    print("TensorFlow version: ", tf.__version__)

    mnist = tf.keras.datasets.mnist

    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_train, x_test = x_train / 255.0, x_test / 255.0

    # Reserve 10,000 samples for validation
    x_val = x_train[-10000:]
    y_val = y_train[-10000:]
    x_train = x_train[:-10000]
    y_train = y_train[:-10000]

    model = tf.keras.models.Sequential([
      tf.keras.layers.Flatten(input_shape=(28, 28)),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dropout(0.2),
      tf.keras.layers.Dense(10, activation='softmax')
    ])

    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

    print("Training...")
    training_history = model.fit(x_train, y_train, epochs=5, validation_data=(x_val, y_val))

    print("Average test loss: ", np.average(training_history.history['loss']))


if __name__ == '__main__':
    train()

나. 프로그램 코드가 포함된 컨테이너 이미지를 생성하고, 컨테이너 이미지 레지스트리에 업로드 합니다.

Dockerfile을 생성합니다.

Dockerfile

FROM tensorflow/tensorflow:2.1.0-py3

RUN mkdir -p /app
ADD mnist-simple.py /app/

컨테이너 이미지를 빌드하겠습니다.

docker build -t kangwoo/mnist-simple:kfp .

빌드한 컨테이너 이미지를 컨테이너 이미지 레지스트리에 업로드 합니다.

docker push kangwoo/mnist-simple:kfp

SDK를 사용하여 파이프라인을 만들고 실행하기

생성한 컴포넌트를 이용해 파이프라인을 만들어 보겠습니다. 파이프라인을 만들기 위해서 파이프라인 파이썬 코드를 작성해야 합니다.

가. Kubeflow Pipelines DSL을 사용하여 컴포넌트 함수를 작성합니다. image 파라미터에 컴포넌트에서 사용하는 컨테이너 이미지를 정의합니다. 컴포넌트 함수는 kfp.dsl.ContainerOp를 리턴해야합니다. 선택적으로 kfp.dsl.component 라는 데코레이터를 사용하여 DSL 컴파일러에서 정적 타입 검사를 활성화 할 수 있습니다. 데코레이터를 사용하려면 @kfp.dsl.component 어노테이션을 컴포넌트 함수에 추가 하면 됩니다.

@kfp.dsl.component
def train_component_op():
    return kfp.dsl.ContainerOp(
        name='mnist-train',
        image='kangwoo/kfp-mnist:kfp'
    )

나. Kubeflow Pipelines DSL을 사용하여 파이프 라인 함수를 작성합니다. 파이프 라인을 정의하는 함수에 사용할 컴포넌트들을 추가합니다. 파이프 라인 함수에서 파이프 라인을 빌드하기 위해서 kfp.dsl.pipeline 데코레이터를 사용합니다. 데코레이터를 사용하려면 @kfp.dsl.pipeline 어노테이션을 파이프 라인 함수에 추가 하면 됩니다.

@dsl.pipeline(
    name='My pipeline',
    description='My machine learning pipeline'
)
def my_pipeline():
    train_task = train_component_op()

다. 파이프 라인을 컴파일하여 압축 된 YAML 파일을 생성하겠습니다. YAML 파일에는 파이프 라인 실행을 위한 쿠버네티스 리소스들이 정의되어 있습니다. kfp.compiler.Compiler.compile 메소드를 사용하는 컴파일 하겠습니다.

kfp.compiler.Compiler().compile(my_pipeline, 'my_pipeline.zip')

라. 파이프 라인을 업로드하고, 실행합니다. Kubeflow Pipelines SDK 를 사용하여 파이프라인을 업로드하고 실행하겠습니다.

client = kfp.Client()
my_experiment = client.create_experiment(name='Basic Experiment')
my_run = client.run_pipeline(my_experiment.id, 'my_pipeline', 
  'my_pipeline.zip')

다음은 파이프라인 전체 코드 입니다.

import kfp
from kfp import dsl


@kfp.dsl.component
def train_component_op():
    return kfp.dsl.ContainerOp(
        name='mnist-train',
        image='kangwoo/kfp-mnist:kfp'
    )


@dsl.pipeline(
    name='My pipeline',
    description='My machine learning pipeline'
)
def my_pipeline():
    train_task = train_component_op()


if __name__ == '__main__':
    # Compile
    pipeline_package_path = 'my_pipeline.zip'
    kfp.compiler.Compiler().compile(my_pipeline, pipeline_package_path)

    # Run
    client = kfp.Client()
    my_experiment = client.create_experiment(name='Basic Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'my_pipeline', pipeline_package_path)

파이프라인 코드를 실행합니다.

python my_pipeline.py

파이프라인 코드가 실행되면, 파이프라인 패키지가 컴파일 됩니다. 그리고 컴파일된 패키지를 Kubeflow Pipelines 에 전달하여 실행 시킵니다. 실행 결과는 Kubeflow Pipelines UI를 통해서 확인할 수 있습니다.

파이프라인 실행 결과 확인 하기

Kubeflow 파이프라인 UI에서 실행 결과를 확인하려면, 다음 절차대로 진행하시면 됩니다.

  1. Kubeflow 대시보드 화면의 왼쪽 메뉴에서 Pipelines를 클릭합니다.
  2. Kubeflow Pipelines UI 화면의 왼쪽 메뉴에서 Experiments 클릭하여, 현재 파이프 라인 실험 목록을 조회합니다.
  3. 보려는 실험(Experiment)의 ‘실험 이름’을 클릭합니다.
  4. 보려는 실행(Run)의 “실행 이름”을 클릭합니다.
  5. Graph 탭에서 보려는 파이프 라인 컴포넌트를 나타내는 단계를 클릭합니다. 단계 세부 사항이 Artifacts  탭을 표시하며 슬라이드 됩니다.

Kubeflow – Kubeflow Pipelines 이해하기

Kubeflow Pipelines 살펴보기

실 세계에서 머신 러닝 워크 플로우를 만들고 배포하는 것은 무척이나 어렵고 힘든 일입니다.

Kubeflow Pipelines는 머신 러닝 워크 플로우를 만들고 배포하기 위한 컨테이너 기반의 플랫폼으로서, 사용자가 편리하게 사용할 수 있고, 쉽게 확장이 가능합니다.

Kubeflow Pipelines는 머신 러닝 워크 플로우를 생성하기 위해서 파이프 라인을 정의합니다. 파이프 라인에는 사용하는 컴포넌트들과 작업 처리 규칙이 정의되어 있습니다. Kubeflow Pipelines는 파이프라인 뿐만 아니라 컴포넌트도 쉽게 재사용할 수 있도록 만들어져 있습니다. 그래서 만들어진 컴포넌트나 파이프라인 있다면, 레고를 조립하는 것처럼 쉽게 워크 플로우를 구성할 수 있습니다.

Kubeflow Pipelines는 Kubeflow의 핵심 구성 요소로 포함되어 있습니다. 그래서 별도의 설치 없이 사용할 수 있습니다. 물론 Kubeflow 없이 독립적으로 설치해서 사용할 수도 있습니다.

아쉽게도 Kubeflow Pipelines 는 아직 멀티 테넌시를 지원하지 않습니다.

Kubeflow Pipelines 목표

Kubeflow Pipelines 가 추구하는 목표는 다음과 같습니다.

  • 엔드 투 엔드 오케스트레이션 : 머신 러닝 파이프 라인의 오케스트레이션을 지원하고 단순화 시킵니다
  • 손쉬운 실험 : 수많은 아이디어와 기술을 쉽게 시도 할 수 있고, 다양한 시험/실험을 관리 할 수 ​​있도록 합니다.
  • 손쉬운 재사용 : 구성 요소 및 파이프 라인을 재사용하여, 매번 재 구축 할 필요 없이 엔드 투 엔드 솔루션을 신속하게 생성 할 수 있도록 합니다.

Kubeflow Pipelines 개념

Kubeflow Pipelines 에서 사용하는 개념에 대해서 알아보겠습니다.

Pipeline

Pipeline은 머신 러닝 워크 플로우에서 사용하는 컴포넌트들과, 해당 컴토넌트들 간의 작업 처리 규칙을 그래프 형태로 정의한 것입니다. Pipeline에는 파이프 라인을 실행하는 데 필요한 입력 매개 변수와 각 컴포넌트의 입력 및 출력에 대한 정의가 포함되어 있습니다.

Pipeline을 실행하면 시스템이 워크 플로우의 단계에 해당하는 하나 이상의 쿠버네티스 포드를 시작합니다. 포드는 컨테이너를 시작하고 컨테이너는 정의된 프로그램을 실행합니다.

파이프 라인을 개발 한 후 Kubeflow Pipelines UI에서 파이프 라인을 업로드하고 실행 할 수 있습니다.

Component

Pipeline의 컴포넌트는 하나의 단계를 수행하는 코드가 모여있는 컨테이너 이미지입니다. 이 컴포넌트들은 파이프 라인의 실행 단계에서 각자의 담당 역할을 수행하게 됩니다. 예를 들어 데이터 전처리, 데이터 변환, 모델 학습 등이 있습니다. 컴포넌트는 입력 및 출력에 대한 정의를 포함하고 있습니다.

Graph

그래프는 Kubeflow Pipelines UI에서 파이프 라인의 런타임 실행을 나타내는 그림입니다. 그래프는 파이프 라인의 실행된 단계나 실행중인 단계를 나타냅니다. 화살표는 각 단계로 표시되는 파이프 라인 컴포넌트 간의 상/하위 관계를 나타냅니다.

파이프 라인이 실행되면 그래프를 볼 수 있습니다. 그래프 안의 각 노드는 파이프 라인의 단계를 나타냅니다.

각 노드의 오른쪽 상단에는 상태, 실행 중, 성공, 실패 또는 건너뜀 상태를 나타내는 아이콘이 있습니다. 조건절이 있을 경우에는 노드를 건너 뛸 수 있습니다.

Experiment

Experiment는 파이프 라인을 실행 할 수 있는 작업 공간입니다. experiment 사용하여 파이프 라인의 실행(run)을 논리적 그룹으로 묶을 수 있습니다. Experiments에는 임의의 실행(run)뿐만 아니라 반복 실행(recurring run)도 포함될 수 있습니다.

Run and Recurring Run

Run은 파이프 라인을 한번 실행 하는 것을 의미합니다. Run은 사용자가 시도하는 실행에 대한 정보를 저장하고 있기 때문에, 재현이 가능합니다. Kubeflow Pipelines UI의 세부 정보 페이지를 보면, 실행 진행률을 볼 수 있습니다. 여기에서 실행의 각 단계에 대한 런타임 그래프, 출력 결과 및 로그를 확인 할 수 있습니다.

Recurring run 은 파이프 라인의 반복 실행을 의미합니다. 반복 실행을 위한 설정에는 파이프 라인에서 사용하는 파라미터와 실행 트리거를 위한 파라미터가 포함되어 있습니다.

모든 Experiment 내에서 반복 실행을 시작할 수 있습니다. 반복 실행이 설정되어 있으면, 주기적으로 파이프 라인을 실행하게 됩니다. Kubeflow Pipelines UI에서 반복 실행을 활성화/비활성화 할 수 있습니다. 동시에 실행되는 최대 실행 개수를 제한하기 위해서, 최대 동시 실행 개수를 지정할 수도 있습니다. 최대 동시 실행 개수는, 파이프 라인의 실행 시간이 오래 걸리면서, 자주 실행되게 트리거 되는 경우에 도움이 될 수 있습니다.

Run Trigger

실행 트리거는 주어진 실행 조건에 의해서 새로운 실행을 시작하기 위하여 시스템에 알리는 역할을 합니다. 다음과 같은 유형의 실행 트리거를 사용할 수 있습니다.

  • Periodic: 주기적인 간격 기반의 실행을 위한 트리거입니다. (예 : 3 시간마다 또는 45 분마다).
  • Cron : 실행 예약을 위한 cron 표현식을 사용하는 트리거입니다.

Step

Step은 파이프 라인의 컴포넌트 중 하나를 실행한다는 것을 의미합니다. 복잡한 파이프 라인에서 컴포넌트는 반복적으로 여러 번 실행될 수 있습니다. 그리고 if 같은 조건문을 사용하여, 조건부로 실행될 수 있습니다.

Output Artifact

Output Artifact 는 파이프 라인 컴포넌트에서 생성한 출력물입니다. 이 출력물은 Kubeflow Pipelines UI를 사용하여 이해하기 쉽게 시각화로 렌더링 할 수 있습니다.

파이프 라인 컴포넌트에 아티팩트를 포함시켜서 성능 평가 같은 일을 할 수 있으며, 의사 결정을 위한 자료로 사용할 수도 있습니다. 그리고 파이프 라인의 다양한 컴포넌트들이 어떻게 작동하는 이해하는데 많은 도움을 줄 수도 있습니다. 아티팩트는 일반적인 텍스트 뿐만 아니라, 시각화를 위한 데이터까지 다양하게 존재합니다.

Kubeflow Pipelines 구성 요소

Kubeflow Pipelines은 다음과 같은 구성 요소로 이루어져 있습니다.

  • 실험, 작업 및 실행을 관리하고 추적하기 위한 사용자 인터페이스 (UI)
  • 파이프라인을 관리하는 파이프라인 서비스
  • 파이프라인과 컴포넌트를 정의하고 제어하기 위한 SDK.
  • 머신 러닝 워크 플로우 실행을 위한 컨트롤러.

Kubeflow Pipelines UI(User interface)

Kubeflow Pipelines UI 는 현재 실행 중인 파이프 라인 목록, 파이프 라인 실행 기록, 데이터 아티팩트 목록, 개별 파이프 라인 실행에 대한 디버깅 정보, 개별 파이프 라인 실행에 대한 실행 상태를 표시합니다.

https://www.kubeflow.org/docs/images/pipelines-ui.png

Kubeflow Pipelines UI 에서 다음과 같은 작업을 수행 할 수 있습니다.

  • 압축 파일로 만들어진 파이프 라인을 업로드 할 수 있습니다. 업로드 된 파이프 라인은 다른 사람들과 공유 할 수 있습니다.
  • 파이프 라인의 실행을 그룹화하는 “Experiment“을 생성할 수 있습니다.
  • Experiment 내에서 파이프라인을 실행할 수 있습니다.
  • 파이프 라인 실행의 구성, 그래프 및 출력을 확인할 수 있습니다
  • 반복 실행을 작성하여 실행을 예약할 수 있습니다.

Python SDK

Kubeflow Pipelines SDK는 머신 러닝 워크 플로우를 정의하고, 실행시킬 수 있는 파이썬 패키지 세트입니다.

다음은 SDK의 주요 패키지 입니다.

  • kfp.compiler : 파이프 라인을 컴파일 할 수 있는 기능을 제공하고 있습니다.
  • kfp.component : 파이프 라인 컴포넌트와 상호 작용하기 위한 기능을 제공하고 있습니다.
  • kfp.containers : 컴포넌트 컨테이너 이미지를 빌드하는 기능을 제공하고 있습니다.
  • kfp.Client : Kubeflow Pipelines API 용 Python 클라이언트 라이브러리가 포함되어 있습니다.

Pipeline Service

파이프라인 서비스는 파이프 라인을 생성하고 실행하는 등의 관리 역할을 하고 있습니다. 그리고 Experiment, Run 같은 파이프라인 메타데이터를 메타데이터 저장소에 저장하는 역할도 하고 있습니다.

또한 REST API도 제공하고 있습니다. REST API는 셸 스크립트 또는 다른 시스템에 통합하려는 경우 유용하게 사용할 수 있습니다.

Pipelines 데이터 저장소

Kubeflow Pipelines에는 머신 러닝 파이프 라인에 관련된 데이터 관리하기 위해서 다음과 같은 두 개의 저장소를 가지고 있습니다.

  • Metadata : Experiment, Run 등 Kubeflow Pipelines는 파이프 라인 메타 데이터를 MySQL 데이터베이스에 저장합니다.
  • Artifacts : 파이프라인 패키지, 메트릭, 뷰 등 아티팩트를 Minio 서버에 저장합니다.

Kubeflow Pipelines는 쿠버네티스의 퍼시스턴스 볼륨(PV)을 사용하여 MySQL 데이터베이스와 Minio 서버의 데이터를 저장합니다.

Orchestration Controllers

오케스트레이션 컨트롤러는 머신 러닝 워크 플로우 실행 다시 말해서, 파이프 라인을 완료하는데 필요한 컨테이너들을 실행시키는 역할을 하고 있습니다. 컨테이너들은 쿠버네티스의 포드 형태로 실행됩니다. 현재 Kubeflow Pipelines 에서는 Argo Workflow 를 워크플로우 컨트롤러로 사용하고 있습니다.

Kubeflow – Katib : Metrics Collector

Metrics Collector 알아보기

앞서 하이퍼 파라미터 튜닝에서 사용했던 메트릭 수집기는 기본 수집기인 StdOut 메트릭 수집기였습니다. 이번에는 StdOud 메트릭 수집기에 필터를 적용하는 방법과 TensorFlowEvent, File 그리고 Custom 메트릭 수집기에 대해서 알아보겠습니다.

StdOud 메트릭 수집기에 필터 적용하기

StdOut 메트릭 수집기에 필터를 적용하는 방법에 대해서 알아보겠습니다. 기존 예제에서는 StdOut 으로 출력되는 메트릭을 수집하기 위해서 {{MetricsName}}={{MetricsValue}} 형태로 출력을 하였습니다. 필터를 사용하면 메트릭을 나타내는 형식을 지정할 수 있기 때문에, 모델 학습시 출력되는 기본적인 로그를 그대로 사용할 수 있습니다.

예를 든다면, mnist-simple.py 를 실행하면 다음과 같은 로그가 출력됩니다.

Epoch 1/5
50000/50000 [==============================] - 2s 46us/sample - loss: 0.3268 - accuracy: 0.9055 - val_loss: 0.1509 - val_accuracy: 0.9574
Epoch 2/5
50000/50000 [==============================] - 2s 42us/sample - loss: 0.1581 - accuracy: 0.9534 - val_loss: 0.1115 - val_accuracy: 0.9684
Epoch 3/5
50000/50000 [==============================] - 2s 40us/sample - loss: 0.1166 - accuracy: 0.9642 - val_loss: 0.1017 - val_accuracy: 0.9708
Epoch 4/5
50000/50000 [==============================] - 2s 40us/sample - loss: 0.0959 - accuracy: 0.9707 - val_loss: 0.0836 - val_accuracy: 0.9756
Epoch 5/5
50000/50000 [==============================] - 2s 42us/sample - loss: 0.0808 - accuracy: 0.9747 - val_loss: 0.0774 - val_accuracy: 0.9773

로그를 보면, 메트릭이 “accuracy: 0.9055 “, “val_accuracy: 0.9574” 이런 형식으로 출력되는 것을 확인 할 수 있습니다. 필터에 {{MetricsName}}:{{MetricsValue}} 형식을 추가해서 기본 로그에서 메트릭을 추출하도록 하겠습니다. 형식은 go 언어의 정규표현식을 사용할 수 있습니다.

다음은 {{MetricsName}}:{{MetricsValue}} 형식을 필터로 사용하는 metricsCollectorSpec 입니다.

metricsCollectorSpec:
    collector:
      kind: StdOut
    source:
      filter:
        metricsFormat:
          - "([\\\\w|-]+)\\\\s*:\\\\s*((-?\\\\d+)(\\\\.\\\\d+)?)"

모델 코드 만들기

텐서플로우 케라스로 작성한 mnist 숫자를 판별하는 모델입니다. Katib를 위한 별도의 로그는 출력하지 않습니다.

mnist-simple.py

from __future__ import absolute_import, division, print_function, unicode_literals

import argparse
import tensorflow as tf
import numpy as np

def train():
    print("TensorFlow version: ", tf.__version__)

    parser = argparse.ArgumentParser()
    parser.add_argument('--learning_rate', default=0.01, type=float)
    parser.add_argument('--dropout', default=0.2, type=float)
    args = parser.parse_args()

    mnist = tf.keras.datasets.mnist

    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_train, x_test = x_train / 255.0, x_test / 255.0

    # Reserve 10,000 samples for validation
    x_val = x_train[-10000:]
    y_val = y_train[-10000:]
    x_train = x_train[:-10000]
    y_train = y_train[:-10000]

    model = tf.keras.models.Sequential([
      tf.keras.layers.Flatten(input_shape=(28, 28)),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dropout(args.dropout),
      tf.keras.layers.Dense(10, activation='softmax')
    ])

    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=args.learning_rate),
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

    print("Training...")
    training_history = model.fit(x_train, y_train, epochs=5, validation_data=(x_val, y_val))

    print("Average test loss: ", np.average(training_history.history['loss']))


if __name__ == '__main__':
    train()

모델 컨테이너 이미지 만들기

모델 학습용 컨테이너 이미지를 만들기 위해서 Dockerfile을 생성하겠습니다.

다음은 텐서플로우 2.1을 기반 이미지로 해서, 모델 파일을 추가하는 Dockerfile 입니다.

Dockerfile

FROM tensorflow/tensorflow:2.1.0-py3

RUN mkdir -p /app
ADD mnist-simple.py /app/

다음 명령어로 “kangwoo/mnist-simple:katib” 라는 이름으로 컨테이너 이미지를 빌드할 수 있습니다.

docker build -t kangwoo/mnist-simple:katib.

빌드한 컨테이너 이미지를 컨테이너 이미지 레지스트리에 푸시합니다.

docker push kangwoo/mnist-simple:katib

Experiment 생성하기

Experiment라는 사용자 리소스를 정의합니다. metricsCollectorSpec 필드에 filter가 추가되어 있습니다.

random-stdout-filter-example.yaml

apiVersion: "kubeflow.org/v1alpha3"
kind: Experiment
metadata:
  namespace: admin
  name: random-stdout-filter-example
spec:
  metricsCollectorSpec:
    collector:
      kind: StdOut
    source:
      filter:
        metricsFormat:
          - "([\\\\w|-]+)\\\\s*:\\\\s*((-?\\\\d+)(\\\\.\\\\d+)?)"
  parallelTrialCount: 1
  maxTrialCount: 12
  maxFailedTrialCount: 3
  objective:
    type: maximize
    goal: 0.99
    objectiveMetricName: val_accuracy
    additionalMetricNames:
      - accuracy
  algorithm:
    algorithmName: random
  parameters:
    - name: --learning_rate
      parameterType: double
      feasibleSpace:
        min: "0.01"
        max: "0.2"
    - name: --dropout
      parameterType: double
      feasibleSpace:
        min: "0.1"
        max: "0.5"
  trialTemplate:
    goTemplate:
        rawTemplate: |-
          apiVersion: batch/v1
          kind: Job
          metadata:
            name: {{.Trial}}
            namespace: {{.NameSpace}}
          spec:
            template:
              spec:
                containers:
                - name: {{.Trial}}
                  image: kangwoo/mnist-simple:katib
                  imagePullPolicy: Always
                  command:
                  - "python3"
                  - "/app/mnist-simple.py"
                  {{- with .HyperParameters}}
                  {{- range .}}
                  - "{{.Name}}={{.Value}}"
                  {{- end}}
                  {{- end}}
                restartPolicy: Never

정의한 Experiment 사용자 리소스를 쿠버네티스 클러스터에 생성합니다.

kubectl apply -f random-stdout-filter-example.yaml

Experiment 결과 보기

Katib UI를 통해서 다음과 같은 결과를 확인할 수 있습니다.


TensorFlowEvent 메트릭 수집기 사용하기

TensorFlowEvent 메트릭 수집기를 사용해 보겠습니다. TensorFlowEvent 메트릭 수집기는 텐서플로우에서 생성하는 이벤트를 추출해서 메트릭을 수집합니다. 그래서 기존의 텐서플로우 코드를 사용할 때 유용합니다. 다만 혀재는 텐서플로우 1 버전만을 지원하기 때문에, 텐서플로우 2 버전에 사용하기에는 약간의 문제가 있습니다.

다음은 TensorFlowEvent 메트릭 수집기를 사용하는 metricsCollectorSpec 입니다. fileSystemPath 필드를 사용해서 이벤트가 저장되어 있는 경로를 지정해 주어야합니다.

metricsCollectorSpec:
    collector:
      kind: TensorFlowEvent
    source:
      fileSystemPath:
        path: /train
        kind: Directory

모델 코드 만들기

텐서플로우 1 버전으로 작성한 mnist 숫자를 판별하는 모델입니다. tf.summary를 사용하여 이벤트를 출력하고 있습니다.

mnist-with-summaries.py

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import os
import sys

import tensorflow as tf

from tensorflow.examples.tutorials.mnist import input_data

FLAGS = None


def train():
  # Import data
  mnist = input_data.read_data_sets(FLAGS.data_dir,
                                    fake_data=FLAGS.fake_data)

  sess = tf.InteractiveSession()
  # Create a multilayer model.

  # Input placeholders
  with tf.name_scope('input'):
    x = tf.placeholder(tf.float32, [None, 784], name='x-input')
    y_ = tf.placeholder(tf.int64, [None], name='y-input')

  with tf.name_scope('input_reshape'):
    image_shaped_input = tf.reshape(x, [-1, 28, 28, 1])
    tf.summary.image('input', image_shaped_input, 10)

  # We can't initialize these variables to 0 - the network will get stuck.
  def weight_variable(shape):
    """Create a weight variable with appropriate initialization."""
    initial = tf.truncated_normal(shape, stddev=0.1)
    return tf.Variable(initial)

  def bias_variable(shape):
    """Create a bias variable with appropriate initialization."""
    initial = tf.constant(0.1, shape=shape)
    return tf.Variable(initial)

  def variable_summaries(var):
    """Attach a lot of summaries to a Tensor (for TensorBoard visualization)."""
    with tf.name_scope('summaries'):
      mean = tf.reduce_mean(var)
      tf.summary.scalar('mean', mean)
      with tf.name_scope('stddev'):
        stddev = tf.sqrt(tf.reduce_mean(tf.square(var - mean)))
      tf.summary.scalar('stddev', stddev)
      tf.summary.scalar('max', tf.reduce_max(var))
      tf.summary.scalar('min', tf.reduce_min(var))
      tf.summary.histogram('histogram', var)

  def nn_layer(input_tensor, input_dim, output_dim, layer_name, act=tf.nn.relu):
    """Reusable code for making a simple neural net layer.
    It does a matrix multiply, bias add, and then uses ReLU to nonlinearize.
    It also sets up name scoping so that the resultant graph is easy to read,
    and adds a number of summary ops.
    """
    # Adding a name scope ensures logical grouping of the layers in the graph.
    with tf.name_scope(layer_name):
      # This Variable will hold the state of the weights for the layer
      with tf.name_scope('weights'):
        weights = weight_variable([input_dim, output_dim])
        variable_summaries(weights)
      with tf.name_scope('biases'):
        biases = bias_variable([output_dim])
        variable_summaries(biases)
      with tf.name_scope('Wx_plus_b'):
        preactivate = tf.matmul(input_tensor, weights) + biases
        tf.summary.histogram('pre_activations', preactivate)
      activations = act(preactivate, name='activation')
      tf.summary.histogram('activations', activations)
      return activations

  hidden1 = nn_layer(x, 784, 500, 'layer1')

  with tf.name_scope('dropout'):
    keep_prob = tf.placeholder(tf.float32)
    tf.summary.scalar('dropout_keep_probability', keep_prob)
    dropped = tf.nn.dropout(hidden1, keep_prob)

  # Do not apply softmax activation yet, see below.
  y = nn_layer(dropped, 500, 10, 'layer2', act=tf.identity)

  with tf.name_scope('cross_entropy'):
    # The raw formulation of cross-entropy,
    #
    # tf.reduce_mean(-tf.reduce_sum(y_ * tf.log(tf.softmax(y)),
    #                               reduction_indices=[1]))
    #
    # can be numerically unstable.
    #
    # So here we use tf.losses.sparse_softmax_cross_entropy on the
    # raw logit outputs of the nn_layer above, and then average across
    # the batch.
    with tf.name_scope('total'):
      cross_entropy = tf.losses.sparse_softmax_cross_entropy(
          labels=y_, logits=y)
  tf.summary.scalar('cross_entropy', cross_entropy)

  with tf.name_scope('train'):
    train_step = tf.train.AdamOptimizer(FLAGS.learning_rate).minimize(
        cross_entropy)

  with tf.name_scope('accuracy'):
    with tf.name_scope('correct_prediction'):
      correct_prediction = tf.equal(tf.argmax(y, 1), y_)
    with tf.name_scope('accuracy'):
      accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
  tf.summary.scalar('accuracy', accuracy)

  # Merge all the summaries and write them out to
  # /tmp/tensorflow/mnist/logs/mnist_with_summaries (by default)
  merged = tf.summary.merge_all()
  train_writer = tf.summary.FileWriter(FLAGS.log_dir + '/train', sess.graph)
  test_writer = tf.summary.FileWriter(FLAGS.log_dir + '/test')
  tf.global_variables_initializer().run()

  # Train the model, and also write summaries.
  # Every 10th step, measure test-set accuracy, and write test summaries
  # All other steps, run train_step on training data, & add training summaries

  def feed_dict(train):     # pylint: disable=redefined-outer-name
    """Make a TensorFlow feed_dict: maps data onto Tensor placeholders."""
    if train or FLAGS.fake_data:
      xs, ys = mnist.train.next_batch(FLAGS.batch_size, fake_data=FLAGS.fake_data)
      k = FLAGS.dropout
    else:
      xs, ys = mnist.test.images, mnist.test.labels
      k = 1.0
    return {x: xs, y_: ys, keep_prob: k}

  for i in range(FLAGS.max_steps):
    if i % 10 == 0:  # Record summaries and test-set accuracy
      summary, acc = sess.run([merged, accuracy], feed_dict=feed_dict(False))
      test_writer.add_summary(summary, i)
      print('Accuracy at step %s: %s' % (i, acc))
    else:  # Record train set summaries, and train
      if i % 100 == 99:  # Record execution stats
        run_options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE)
        run_metadata = tf.RunMetadata()
        summary, _ = sess.run([merged, train_step],
                              feed_dict=feed_dict(True),
                              options=run_options,
                              run_metadata=run_metadata)
        train_writer.add_run_metadata(run_metadata, 'step%03d' % i)
        train_writer.add_summary(summary, i)
        print('Adding run metadata for', i)
      else:  # Record a summary
        summary, _ = sess.run([merged, train_step], feed_dict=feed_dict(True))
        train_writer.add_summary(summary, i)
  train_writer.close()
  test_writer.close()


def main(_):
  if tf.gfile.Exists(FLAGS.log_dir):
    tf.gfile.DeleteRecursively(FLAGS.log_dir)
  tf.gfile.MakeDirs(FLAGS.log_dir)
  train()


if __name__ == '__main__':
  parser = argparse.ArgumentParser()
  parser.add_argument('--fake_data', nargs='?', const=True, type=bool,
                      default=False,
                      help='If true, uses fake data for unit testing.')
  parser.add_argument('--max_steps', type=int, default=1000,
                      help='Number of steps to run trainer.')
  parser.add_argument('--learning_rate', type=float, default=0.001,
                      help='Initial learning rate')
  parser.add_argument('--batch_size', type=int, default=100,
                      help='Training batch size')
  parser.add_argument('--dropout', type=float, default=0.9,
                      help='Keep probability for training dropout.')
  parser.add_argument(
      '--data_dir',
      type=str,
      default=os.path.join(os.getenv('TEST_TMPDIR', '/tmp'),
                           'tensorflow/mnist/input_data'),
      help='Directory for storing input data')
  parser.add_argument(
      '--log_dir',
      type=str,
      default=os.path.join(os.getenv('TEST_TMPDIR', '/tmp'),
                           'tensorflow/mnist/logs/mnist_with_summaries'),
      help='Summaries log directory')
  FLAGS, unparsed = parser.parse_known_args()
  tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)

모델 컨테이너 이미지 만들기

모델 학습용 컨테이너 이미지를 만들기 위해서 Dockerfile을 생성하겠습니다.

다음은 텐서플로우 1.11을 기반 이미지로 해서, 모델 파일을 추가하는 Dockerfile 입니다.

Dockerfile

FROM tensorflow/tensorflow:1.11.0

RUN mkdir -p /app
ADD mnist-with-summaries.py /app/

다음 명령어로 “kangwoo/mnist-simple:katib” 라는 이름으로 컨테이너 이미지를 빌드할 수 있습니다.

docker build -t kangwoo/mnist-with-summaries:katib .

빌드한 컨테이너 이미지를 컨테이너 이미지 레지스트리에 푸시합니다.

docker push kangwoo/mnist-with-summaries:katib

Experiment 생성하기

Experiment라는 사용자 리소스를 정의합니다. metricsCollectorSpec 필드에 filter가 추가되어 있습니다.

random-tf-event-example.yaml

apiVersion: "kubeflow.org/v1alpha3"
kind: Experiment
metadata:
  namespace: admin
  name: random-tf-event-example
spec:
  metricsCollectorSpec:
    source:
      fileSystemPath:
        path: /train
        kind: Directory
    collector:
      kind: TensorFlowEvent
  parallelTrialCount: 1
  maxTrialCount: 12
  maxFailedTrialCount: 3
  objective:
    type: maximize
    goal: 0.99
    objectiveMetricName: accuracy_1
  algorithm:
    algorithmName: random
  parameters:
    - name: --learning_rate
      parameterType: double
      feasibleSpace:
        min: "0.01"
        max: "0.05"
    - name: --batch_size
      parameterType: int
      feasibleSpace:
        min: "100"
        max: "200"
  trialTemplate:
    goTemplate:
        rawTemplate: |-
          apiVersion: "kubeflow.org/v1"
          kind: TFJob
          metadata:
            name: {{.Trial}}
            namespace: {{.NameSpace}}
          spec:
           tfReplicaSpecs:
            Worker:
              replicas: 1
              restartPolicy: OnFailure
              template:
                spec:
                  containers:
                    - name: tensorflow
                      image: kangwoo/mnist-with-summaries:katib
                      imagePullPolicy: Always
                      command:
                        - "python"
                        - "/app/mnist-with-summaries.py"
                        - "--log_dir=/train/metrics"
                        {{- with .HyperParameters}}
                        {{- range .}}
                        - "{{.Name}}={{.Value}}"
                        {{- end}}
                        {{- end}}

정의한 Experiment 사용자 리소스를 쿠버네티스 클러스터에 생성합니다.

kubectl apply -f random-tf-event-example.yaml

Experiment 결과 보기

Katib UI를 통해서 다음과 같은 결과를 확인할 수 있습니다.


File 메트릭 수집기 사용하기

File 메트릭 수집기를 사용해 보겠습니다. File 메트릭 수집기는 파일로 출력되는 로그를 추출해서 메트릭을 수집합니다. File 메트릭 수집기도 필터를 사용하여 메트릭 형식을 지정할 수 있습니다. 메트릭 형식을 지정하지 않으면, 기본 형식인 “([\w|-]+)\s*=\s*((-?\d+)(\.\d+)?)” 즉 {{MetricsName}}={{MetricsValue}} 을 사용합니다.

다음은 File 메트릭 수집기를 사용하는 metricsCollectorSpec 입니다. fileSystemPath 필드를 사용해서 로그가 저장되어 있는 파일 경로를 지정해 주어야 합니다. 파일 경로를 지정하지 않으면 기본 경로인 “/var/log/katib/metrics.log”을 사용합니다.

metricsCollectorSpec:
    source:
      filter:
        metricsFormat:
        - "([\\\\w|-]+)\\\\s*=\\\\s*((-?\\\\d+)(\\\\.\\\\d+)?)"
      fileSystemPath:
        path: "/var/log/katib/mnist.log"
        kind: File
    collector:
      kind: File

모델 코드 만들기

텐서플로우 케라스로 작성한 mnist 숫자를 판별하는 모델입니다. logging 패키지를 사용하여 파일로 로그를 출력하고 있습니다.

mnist-with-log.py

from __future__ import absolute_import, division, print_function, unicode_literals

import tensorflow as tf
import argparse
import numpy as np
from datetime import datetime, timezone

import logging

logging.basicConfig(filename='/var/log/katib/mnist.log', level=logging.DEBUG)


def train():
    print("TensorFlow version: ", tf.__version__)

    parser = argparse.ArgumentParser()
    parser.add_argument('--learning_rate', default=0.01, type=float)
    parser.add_argument('--dropout', default=0.2, type=float)
    args = parser.parse_args()

    mnist = tf.keras.datasets.mnist

    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_train, x_test = x_train / 255.0, x_test / 255.0

    # Reserve 10,000 samples for validation
    x_val = x_train[-10000:]
    y_val = y_train[-10000:]
    x_train = x_train[:-10000]
    y_train = y_train[:-10000]

    model = tf.keras.models.Sequential([
      tf.keras.layers.Flatten(input_shape=(28, 28)),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dropout(args.dropout),
      tf.keras.layers.Dense(10, activation='softmax')
    ])

    model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=args.learning_rate),
                  loss='sparse_categorical_crossentropy',
                  metrics=['acc'])

    print("Training...")
    katib_metric_log_callback = KatibMetricLog()
    training_history = model.fit(x_train, y_train, batch_size=64, epochs=10,
                                 validation_data=(x_val, y_val),
                                 callbacks=[katib_metric_log_callback])
    print("Average test loss: ", np.average(training_history.history['loss']))


class KatibMetricLog(tf.keras.callbacks.Callback):
    def on_epoch_end(self, epoch, logs=None):
        # RFC 3339
        local_time = datetime.now(timezone.utc).astimezone().isoformat()
        logging.info("\\n{} accuracy={:.4f} loss={:.4f} Validation-accuracy={:.4f} Validation-loss={:.4f}"
                     .format(local_time, logs['acc'], logs['loss'], logs['val_acc'], logs['val_loss']))


if __name__ == '__main__':
    train()

모델 컨테이너 이미지 만들기

모델 학습용 컨테이너 이미지를 만들기 위해서 Dockerfile을 생성하겠습니다.

다음은 텐서플로우 2.1을 기반 이미지로 해서, 모델 파일을 추가하는 Dockerfile 입니다.

Dockerfile

FROM tensorflow/tensorflow:2.1.0-py3

RUN mkdir -p /app
ADD mnist-with-log.py /app/

다음 명령어로 “kangwoo/mnist-with-log:katib” 라는 이름으로 컨테이너 이미지를 빌드할 수 있습니다.

docker build -t kangwoo/mnist-with-log:katib.

빌드한 컨테이너 이미지를 컨테이너 이미지 레지스트리에 푸시합니다.

docker push kangwoo/mnist-with-log:katib

Experiment 생성하기

Experiment라는 사용자 리소스를 정의합니다. metricsCollectorSpec 필드에 filter가 추가되어 있습니다.

random-tf-event-example.yaml

apiVersion: "kubeflow.org/v1alpha3"
kind: Experiment
metadata:
  namespace: admin
  name: random-file-example
spec:
  metricsCollectorSpec:
    source:
      fileSystemPath:
        path: "/var/log/katib/mnist.log"
        kind: File
    collector:
      kind: File
  parallelTrialCount: 1
  maxTrialCount: 12
  maxFailedTrialCount: 3
  objective:
    type: maximize
    goal: 0.99
    objectiveMetricName: Validation-accuracy
    additionalMetricNames:
      - accuracy
  algorithm:
    algorithmName: random
  parameters:
    - name: --learning_rate
      parameterType: double
      feasibleSpace:
        min: "0.01"
        max: "0.2"
    - name: --dropout
      parameterType: double
      feasibleSpace:
        min: "0.1"
        max: "0.5"
  trialTemplate:
    goTemplate:
        rawTemplate: |-
          apiVersion: batch/v1
          kind: Job
          metadata:
            name: {{.Trial}}
            namespace: {{.NameSpace}}
          spec:
            template:
              spec:
                containers:
                - name: {{.Trial}}
                  image: kangwoo/mnist-with-log:katib
                  imagePullPolicy: Always
                  command:
                  - "python3"
                  - "/app/mnist-with-log.py"
                  {{- with .HyperParameters}}
                  {{- range .}}
                  - "{{.Name}}={{.Value}}"
                  {{- end}}
                  {{- end}}
                restartPolicy: Never

정의한 Experiment 사용자 리소스를 쿠버네티스 클러스터에 생성합니다.

kubectl apply -f random-tf-event-example.yaml

Experiment 결과 보기

Katib UI를 통해서 다음과 같은 결과를 확인할 수 있습니다.

https://s3-us-west-2.amazonaws.com/secure.notion-static.com/2d0f4811-7203-4ef7-bfd2-8dd2123f35d3/Untitled.png

Kubeflow – Katib 하이퍼 파라미터 튜닝

하이퍼 파라미터 및 하이퍼 파라미터 튜닝

하이퍼 파라미터는 모델 학습 프로세스를 제어하는 ​​변수로서, 학습을 수행하기 위해 사전에 설정해야 하는 값들입니다. 예를 든다면 Learning rate, Batch Size, Regularization Strength 등이 있습니다.

하이퍼 파라미터 값은 학습되지 않습니다. 즉, 가중치 같은 학습 매개 변수와는 달리, 모델 학습 프로세스에서 하이퍼 파라미터 값을 조정하지 않습니다. 그래서 휴리스틱한 방법이나 경험 법칙에 의해서 결정하는 경우가 많습니다.

하이퍼 파라미터 튜닝은 최적의 하이퍼 파라미터 값을 탐색하여, 모델의 예측 정확도를 최대화하는 프로세스입니다. 만일 Katib 같은 자동화된 하이퍼 파라미터 튜닝 시스템이 없다면, 최적의 값을 찾기 위해 하이퍼 파라미터를 수동으로 조정하여, 많은 학습 작업을 사림이 직접 실행해야할것

자동화된 하이퍼 파라미터 튜닝 시스템은 대상의 목표 값을 이루기 위한 최적의 변수 값을 찾기 위해서 노력합니다. 일반적으로 모델의 정확성(accuracy)을 대상으로 사용합니다.

예를 들어 Katib의 다음 그래프는 다양한 하이퍼 파라미터 값의 조합 (learning_rate, dropout)에 따른 정확도를 보여 줍니다.

Katib는 Experiment이라 부르는 하이퍼 파라미터 튜닝 작업을 실행합니다. 실행된 Experiment는 Trial 이라고 부르는 학습 작업을 여러번 실행합니다.


random 알고리즘과 job을 이용한 하이퍼 파라미터 튜닝

하이퍼 파라미터 튜닝에 사용할 학습 모델 컨테이너 이미지를 만들어 보겠습니다.

모델 코드 작성하기

mnist 숫자를 판별하는 모델을 텐서플로우 케라스로 작성해 보겠습니다.

  1. 하이퍼 파라메터를 입력 받기 위해서 argparse 라이브러를 이용하였습니다. learning_rate와 dropout 값을 입력할 수 있습니다. parser = argparse.ArgumentParser() parser.add_argument('--learning_rate', default=0.01, type=float) parser.add_argument('--dropout', default=0.2, type=float) args = parser.parse_args()
  2. 케라스의 콜백을 이용해서, 매 에폭(epoch)마다 accuracy, loss, Validation-accuracy 그리고 Validation-loss를 StdOut 으로 출력하도록 하였습니다. Katib의 StdOutCollector를 사용해서 메트릭을 수집할 것이기 때문에, StdOut으로 {{MetricsName}}={{MetricsValue}} 형태로 메트릭을 StdOut 으로 출력하면 됩니다. 그리고 라인의 맨 앞부분에 RFC-3339 형식의 시간을 출력하면, 메트릭의 시간도 같이 수집이 됩니다. katib_metric_log_callback = KatibMetricLog() training_history = model.fit(x_train, y_train, batch_size=64, epochs=10, validation_data=(x_val, y_val), callbacks=[katib_metric_log_callback]) … class KatibMetricLog(tf.keras.callbacks.Callback): def on_epoch_end(self, epoch, logs=None): # RFC 3339 local_time = datetime.now(timezone.utc).astimezone().isoformat() print(“\nEpoch {}”.format(epoch+1)) print(“{} accuracy={:.4f}”.format(local_time, logs[‘acc’])) print(“{} loss={:.4f}”.format(local_time, logs[‘loss’])) print(“{} Validation-accuracy={:.4f}”.format(local_time, logs[‘val_acc’])) print(“{} Validation-loss={:.4f}”.format(local_time, logs[‘val_loss’]))

다음 코드는 텐서플로우 케라스로 작성한 mnist 숫자를 판별하는 모델입니다.

katib-mnist-random-job.py

from __future__ import absolute_import, division, print_function, unicode_literals

import tensorflow as tf
import numpy as np
import argparse
from datetime import datetime, timezone

def train():
    print("TensorFlow version: ", tf.__version__)

    parser = argparse.ArgumentParser()
    parser.add_argument('--learning_rate', default=0.01, type=float)
    parser.add_argument('--dropout', default=0.2, type=float)
    args = parser.parse_args()

    mnist = tf.keras.datasets.mnist

    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_train, x_test = x_train / 255.0, x_test / 255.0

    # Reserve 10,000 samples for validation
    x_val = x_train[-10000:]
    y_val = y_train[-10000:]
    x_train = x_train[:-10000]
    y_train = y_train[:-10000]

    model = tf.keras.models.Sequential([
      tf.keras.layers.Flatten(input_shape=(28, 28)),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dropout(args.dropout),
      tf.keras.layers.Dense(10, activation='softmax')
    ])

    model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=args.learning_rate),
                  loss='sparse_categorical_crossentropy',
                  metrics=['acc'])

    print("Training...")

    katib_metric_log_callback = KatibMetricLog()
    training_history = model.fit(x_train, y_train, batch_size=64, epochs=10,
                                 validation_data=(x_val, y_val),
                                 callbacks=[katib_metric_log_callback])

    print("\\ntraining_history:", training_history.history)

    # Evaluate the model on the test data using `evaluate`
    print('\\n# Evaluate on test data')
    results = model.evaluate(x_test, y_test, batch_size=128)
    print('test loss, test acc:', results)


class KatibMetricLog(tf.keras.callbacks.Callback):
    def on_epoch_end(self, epoch, logs=None):
        # RFC 3339
        local_time = datetime.now(timezone.utc).astimezone().isoformat()
        print("\\nEpoch {}".format(epoch+1))
        print("{} accuracy={:.4f}".format(local_time, logs['acc']))
        print("{} loss={:.4f}".format(local_time, logs['loss']))
        print("{} Validation-accuracy={:.4f}".format(local_time, logs['val_acc']))
        print("{} Validation-loss={:.4f}".format(local_time, logs['val_loss']))


if __name__ == '__main__':
    train()

모델 컨테이너 이미지 만들기

모델 학습용 컨테이너 이미지를 만들기 위해서 Dockerfile을 생성하겠습니다.

다음은 텐서플로우 2.1을 기반 이미지로 해서, 모델 파일을 추가하는 Dockerfile 입니다.

Dockerfile

FROM tensorflow/tensorflow:2.1.0-py3

RUN mkdir -p /app
ADD katib-mnist-random-job.py /app/

ENTRYPOINT ["python", "/app/katib-mnist-random-job.py"]

다음 명령어로 “kangwoo/katib-mnist-job:0.0.1” 라는 이름으로 컨테이너 이미지를 빌드할 수 있습니다.

docker build -t kangwoo/katib-mnist-job:0.0.1 .

빌드한 컨테이너 이미지를 컨테이너 이미지 레지스트리에 푸시합니다.

docker push kangwoo/katib-mnist-job:0.0.1

Experiment 생성하기

Katib를 사용하여 하이퍼 파라미터를 자동으로 튜닝하려면 Experiment라는 사용자 리소스를 정의해야합니다. Experiment에는 다음과 같은 내용이 포함되어 있습니다.

  • Objective: 최적화하려는 측정 항목.
  • Search algorithm: 최적의 하이퍼 파라미터를 찾는 데 사용하는는 알고리즘.
  • Configuration about parallelism: 병렬 처리에 대한 구성.
  • Search space: 탐색해야 하는 모든 하이퍼 파라미터의 이름 및 분포 (개별 값 또는 연속 값).
  • Trial Template: Trial을 정의하는 데 사용되는 템플릿.
  • Metrics Collection: 메트릭 수집 방법에 대한 정의

병렬 처리에 대한 구성 : 병렬 처리에 대한 설정할 수 있습니다.

  • parallelTrialCount : 병렬로 처리 할 수 있는 Trial 개수입니다.
  • maxTrialCount : Trial이 실행되는 최대 개수입니다.
  • maxFailedTrialCount : 최대 Trial 실패 개수를 넘으면 experiment은 실패하게 됩니다.
  parallelTrialCount: 1
  maxTrialCount: 12
  maxFailedTrialCount: 3

목표 : 최적화하려는 측정 항목을 설정할 수 있습니다.

“Validation-accuracy” 라는 이름의 메트릭의 최대값이 0.99에 도달하는 것을 목표로 합니다. 그리고 추가로 “accuracy” 라는 이름의 메트릭도 같이 수집합니다.

type은 maximize 나 minimize 를 사용할 수 있습니다.

  objective:
    type: maximize
    goal: 0.99
    objectiveMetricName: Validation-accuracy
    additionalMetricNames:
      - accuracy

검색 알고리즘 : 최적의 하이퍼 파라미터를 찾는 데 사용할 알고리즘을 설정할 수 있습니다.

하이퍼 파라미터 튜닝 알고리즘은 “random”을 사용합니다. 알고리즘 이름은 grid, random, hyperband, bayesianoptimization 을 사용할 수 있습니다.

  algorithm:
    algorithmName: random

탐색 공간 : 탐색해야하는 모든 하이퍼 파라미터의 이름과 범위(개별 값 또는 연속 값)에 대해 설정할 수 있습니다.

모델 학습에서 사용할 하이퍼 파라미터 목록입니다. learning_rate와, dropout을 파라미터로 정의합니다.

parameterType은 int, double, categorical 을 사용할 수 있습니다. 파라미터 값은 feasibleSpace, list를 사용할 수 있습니다.

  parameters:
    - name: --learning_rate
      parameterType: double
      feasibleSpace:
        min: "0.01"
        max: "0.2"
    - name: --dropout
      parameterType: double
      feasibleSpace:
        min: "0.1"
        max: "0.5"

Trial Template: Trial에서 생성할 Worker Job을 정의할 수 있습니다.

쿠버네티스의 Job을 생성해서 모델 학습 작업을 합니다.

  trialTemplate:
    goTemplate:
        rawTemplate: |-
          apiVersion: batch/v1
          kind: Job
          metadata:
            name: {{.Trial}}
            namespace: {{.NameSpace}}
          spec:
            template:
              spec:
                containers:
                - name: {{.Trial}}
                  image: kangwoo/katib-mnist-job:0.0.1
                  command:
                  - "python3"
                  - "/app/katib-mnist-random-job.py"
                  {{- with .HyperParameters}}
                  {{- range .}}
                  - "{{.Name}}={{.Value}}"
                  {{- end}}
                  {{- end}}
                restartPolicy: Never
  1. 메트릭 수집 방법에 대해서 정의합니다. 별도로 정의하지 않을 경우 StdOut 메트릭 수집기가 사용됩니다. metricsCollectorSpec: collector: kind: StdOut

다음은 admin에라는 네임스페이스에 생성할 Experiment 매니페스트입니다.

random-job-example.yaml

apiVersion: "kubeflow.org/v1alpha3"
kind: Experiment
metadata:
  namespace: admin
  name: random-job-example
spec:
  parallelTrialCount: 1
  maxTrialCount: 12
  maxFailedTrialCount: 3
  objective:
    type: maximize
    goal: 0.99
    objectiveMetricName: Validation-accuracy
    additionalMetricNames:
      - accuracy
  algorithm:
    algorithmName: random
  parameters:
    - name: --learning_rate
      parameterType: double
      feasibleSpace:
        min: "0.01"
        max: "0.2"
    - name: --dropout
      parameterType: double
      feasibleSpace:
        min: "0.1"
        max: "0.5"
  trialTemplate:
    goTemplate:
        rawTemplate: |-
          apiVersion: batch/v1
          kind: Job
          metadata:
            name: {{.Trial}}
            namespace: {{.NameSpace}}
          spec:
            template:
              spec:
                containers:
                - name: {{.Trial}}
                  image: kangwoo/katib-mnist-job:0.0.1
                  command:
                  - "python3"
                  - "/app/katib-mnist-random-job.py"
                  {{- with .HyperParameters}}
                  {{- range .}}
                  - "{{.Name}}={{.Value}}"
                  {{- end}}
                  {{- end}}
                restartPolicy: Never

Experiment 은 katib UI 화면이나, kubectl을 사용해서 생성할 수 있습니다.

Katib UI 화면에서 Experiment 생성하기

Kubeflow 대시보드의 왼쪽 메뉴에서 Katib를 클릭합니다.

“Hyperparameter Tuning”을 클릭합니다.

YAML File 탭에서 작성한 Experiment 매니페스트를 입력하고, 맨 아래에 있는 DEPLOY 버튼을 클릭하면 Experiment가 생성됩니다.

kubectl을 사용해서 Experiment 생성하기

kubectl을 사용해서 Experiment를 생성할 수 있습니다.

Experiment 매니페스트를 random-job-example.yaml 파일로 저정한 후, 다음 명령어를 사용하면, Experiment 를 생성할 수 있습니다.

kubectl apply -f random-job-example.yaml

Experiment 결과 보기

Katib UI 화면에서 Experiment 결과 보기

Katib UI 화면의 좌측 상단에 있는 메뉴를 선택한 후, HP > Monitor 를 선택하면, Experiment Monitor 화면으로 이동할 수 있습니다.

다음은 Experiment Monitor 화면입니다. 생성한 Experiment 목록을 확인할 수 있습니다.

Experiment 이름을 클릭하면 Trial의 측정 결과가 표시됩니다.

Trial 이름을 클릭하면 측정 항목에 대한 세부 정보가 표시됩니다.

kubectl을 사용해서 Experiment 결과 보기

kubectl 사용해서 결과를 조회 할 수 있습니다.

다음 명령을 사용하면, Trial 측정 결과를 조회 할 수 있습니다. (JSON을 구문 분석 때문에, jq를 설치해야 합니다)

kubectl -n admin get trials -l experiment=random-job-example -o json | jq ".items[] | {assignments: .spec.parameterAssignments, observation: .status.observation}"

{
  "assignments": [
    {
      "name": "--learning_rate",
      "value": "0.08177734351368438"
    },
    {
      "name": "--dropout",
      "value": "0.4439382425122721"
    }
  ],
  "observation": {
    "metrics": [
      {
        "name": "Validation-accuracy",
        "value": 0.9712
      }
    ]
  }
}
{
  "assignments": [
    {
      "name": "--learning_rate",
      "value": "0.13167199355992532"
    },
    {
      "name": "--dropout",
      "value": "0.36691549333903695"
    }
  ],
  "observation": {
    "metrics": [
      {
        "name": "Validation-accuracy",
        "value": 0.9752
      }
    ]
  }
}
...

grid 알고리즘을 이용한 하이퍼 파라미터 튜닝

grid 알고리즘을 사용하여 하이퍼 파리미터 튜닝을 해 보겠습니다. 모델 컨테이너 이미지는 radnom 하이퍼 파라미터 튜닝에서 사용한 kangwoo/mnist:katib 을 그대로 사용하겠습니다.

grid 알고리즘을 사용하려면 algorithmName 필드에 grid 라고 설정하면 됩니다.

algorithm:
    algorithmName: grid

주의해할 점은 categorical 타입의 파라미터는 지원하지 않습니다. 그래서 다음과 같은 파라미터는 사용할 수 없습니다

parameters:
    # Grid doesn't support categorical, refer to <https://chocolate.readthedocs.io/api/sample.html#chocolate.Grid>
    - name: --optimizer
      parameterType: categorical
      feasibleSpace:
        list:
        - sgd
        - adam
        - ftrl

그리고 double 타입의 파라미터를 사용할 때는 step을 정의해줘야 합니다. 값을 얼마만큼의 간격으로 증가시킬지를 지정하는 것입니다. int 타입의 파라미터인 경우에도 setup 값을 정의할 수 있습니다. int 타입인 경우 별도로 정의하지 않으면 기본값인 1일 사용합니다.

parameters:
    - name: --learning_rate
      parameterType: double
      feasibleSpace:
        min: "0.01"
        max: "0.2"
        step: "0.01"

Experiment 생성하기

Experiment라는 사용자 리소스를 정의합니다. metricsCollectorSpec 필드에 filter가 추가되어 있습니다.

grid-stdout-example.yaml

apiVersion: "kubeflow.org/v1alpha3"
kind: Experiment
metadata:
  namespace: admin
  name: grid-stdout-example
spec:
  parallelTrialCount: 1
  maxTrialCount: 12
  maxFailedTrialCount: 3
  objective:
    type: maximize
    goal: 0.99
    objectiveMetricName: Validation-accuracy
    additionalMetricNames:
      - accuracy
  algorithm:
    algorithmName: grid
  parameters:
    - name: --learning_rate
      parameterType: double
      feasibleSpace:
        min: "0.01"
        max: "0.2"
        step: "0.01"
    - name: --dropout
      parameterType: double
      feasibleSpace:
        min: "0.1"
        max: "0.5"
        step: "0.05"
  trialTemplate:
    goTemplate:
        rawTemplate: |-
          apiVersion: batch/v1
          kind: Job
          metadata:
            name: {{.Trial}}
            namespace: {{.NameSpace}}
          spec:
            template:
              spec:
                containers:
                - name: {{.Trial}}
                  image: kangwoo/mnist:katib
                  command:
                  - "python3"
                  - "/app/mnist.py"
                  {{- with .HyperParameters}}
                  {{- range .}}
                  - "{{.Name}}={{.Value}}"
                  {{- end}}
                  {{- end}}
                restartPolicy: Never

정의한 Experiment 사용자 리소스를 쿠버네티스 클러스터에 생성합니다.

kubectl apply -f grid-stdout-example.yaml

Experiment 결과 보기

Katib UI를 통해서 다음과 같은 결과를 확인할 수 있습니다.


bayesianoptimization 알고리즘을 이용한 하이퍼 파라미터 튜닝

bayesianoptimization 알고리즘을 사용하여 하이퍼 파리미터 튜닝을 해 보겠습니다. 모델 컨테이너 이미지는 radnom 하이퍼 파라미터 튜닝에서 사용한 kangwoo/mnist:katib 을 그대로 사용하겠습니다.

bayesianoptimization 알고리즘을 사용하려면 algorithmName 필드에 bayesianoptimization 라고 설정하면 됩니다. 그리고 algorithmSettings 필드를 사용해서 알고리즘을 설정할 수 있습니다.

algorithm:
    algorithmName: bayesianoptimization
    algorithmSettings:
      - name: "random_state"
        value: "10"

Experiment 생성하기

Experiment라는 사용자 리소스를 정의합니다. metricsCollectorSpec 필드에 filter가 추가되어 있습니다.

bayesianoptimization-stdout-example.yaml

apiVersion: "kubeflow.org/v1alpha3"
kind: Experiment
metadata:
  namespace: admin
  name: bayesianoptimization-stdout-example
spec:
  parallelTrialCount: 1
  maxTrialCount: 12
  maxFailedTrialCount: 3
  objective:
    type: maximize
    goal: 0.99
    objectiveMetricName: Validation-accuracy
    additionalMetricNames:
      - accuracy
  algorithm:
    algorithmName: bayesianoptimization
    algorithmSettings:
      - name: "random_state"
        value: "10"
  parameters:
    - name: --learning_rate
      parameterType: double
      feasibleSpace:
        min: "0.01"
        max: "0.2"
    - name: --dropout
      parameterType: double
      feasibleSpace:
        min: "0.1"
        max: "0.5"
  trialTemplate:
    goTemplate:
        rawTemplate: |-
          apiVersion: batch/v1
          kind: Job
          metadata:
            name: {{.Trial}}
            namespace: {{.NameSpace}}
          spec:
            template:
              spec:
                containers:
                - name: {{.Trial}}
                  image: kangwoo/mnist:katib
                  command:
                  - "python3"
                  - "/app/mnist.py"
                  {{- with .HyperParameters}}
                  {{- range .}}
                  - "{{.Name}}={{.Value}}"
                  {{- end}}
                  {{- end}}
                restartPolicy: Never

정의한 Experiment 사용자 리소스를 쿠버네티스 클러스터에 생성합니다.

kubectl apply -f bayesianoptimization-stdout-example.yaml

Experiment 결과 보기

Katib UI를 통해서 다음과 같은 결과를 확인할 수 있습니다.


hyperband 알고리즘을 이용한 하이퍼 파라미터 튜닝

hyperband 알고리즘을 사용하여 하이퍼 파리미터 튜닝을 해 보겠습니다. 모델 컨테이너 이미지는 radnom 하이퍼 파라미터 튜닝에서 사용한 kangwoo/mnist:katib 을 그대로 사용하겠습니다.

r_l and resource_name must be set.

r_l must be a positive float number.

if "eta" in setting_dict:
            eta = int(float(setting_dict["eta"]))
            if eta <= 0:
                eta = 3
        else:
            eta = 3


smax = int(math.log(rl)/math.log(eta))
        max_parallel = int(math.ceil(eta**smax))
        if request.experiment.spec.parallel_trial_count < max_parallel:
            return self._set_validate_context_error(context,
                                                    "parallelTrialCount must be not less than %d." % max_parallel)

parallel_trial_count 는 eta**log(rl)/log(eta) 값보다 커야 합니다.

hyperband 알고리즘을 사용하려면 algorithmName 필드에 hyperband 라고 설정하면 됩니다. 그리고 algorithmSettings 필드를 사용해서 알고리즘을 설정할 수 있습니다.

algorithm:
    algorithmName: hyperband
    algorithmSettings:
      - name: "resource_name"
        value: "--num-epochs"
      - name: "eta"
        value: "3"
      - name: "r_l"
        value: "9"

Experiment 생성하기

Experiment라는 사용자 리소스를 정의합니다. metricsCollectorSpec 필드에 filter가 추가되어 있습니다.

hyperband-stdout-example.yaml

apiVersion: "kubeflow.org/v1alpha3"
kind: Experiment
metadata:
  namespace: admin
  name: hyperband-stdout-example
spec:
  parallelTrialCount: 9
  maxTrialCount: 9
  maxFailedTrialCount: 9
  objective:
    type: maximize
    goal: 0.99
    objectiveMetricName: Validation-accuracy
    additionalMetricNames:
      - accuracy
  algorithm:
    algorithmName: hyperband
    algorithmSettings:
      - name: "resource_name"
        value: "--epochs"
      - name: "eta"
        value: "3"
      - name: "r_l"
        value: "9"
  parameters:
    - name: --learning_rate
      parameterType: double
      feasibleSpace:
        min: "0.01"
        max: "0.2"
    - name: --dropout
      parameterType: double
      feasibleSpace:
        min: "0.1"
        max: "0.5"
    - name: --epochs
      parameterType: int
      feasibleSpace:
        min: "10"
        max: "10"
  trialTemplate:
    goTemplate:
        rawTemplate: |-
          apiVersion: batch/v1
          kind: Job
          metadata:
            name: {{.Trial}}
            namespace: {{.NameSpace}}
          spec:
            template:
              spec:
                containers:
                - name: {{.Trial}}
                  image: kangwoo/mnist:katib
                  command:
                  - "python3"
                  - "/app/mnist.py"
                  {{- with .HyperParameters}}
                  {{- range .}}
                  - "{{.Name}}={{.Value}}"
                  {{- end}}
                  {{- end}}
                restartPolicy: Never

정의한 Experiment 사용자 리소스를 쿠버네티스 클러스터에 생성합니다.

kubectl apply -f hyperband-stdout-example.yaml

Experiment 결과 보기

Katib UI를 통해서 다음과 같은 결과를 확인할 수 있습니다.


tpe 알고리즘을 이용한 하이퍼 파라미터 튜닝

tpe 알고리즘을 사용하여 하이퍼 파리미터 튜닝을 해 보겠습니다. 모델 컨테이너 이미지는 radnom 하이퍼 파라미터 튜닝에서 사용한 kangwoo/mnist:katib 을 그대로 사용하겠습니다.

tpe 알고리즘을 사용하려면 algorithmName 필드에 tpe 라고 설정하면 됩니다.

algorithm:
    algorithmName: tpe

Experiment 생성하기

Experiment라는 사용자 리소스를 정의합니다. metricsCollectorSpec 필드에 filter가 추가되어 있습니다.

tpe-stdout-example.yaml

apiVersion: "kubeflow.org/v1alpha3"
kind: Experiment
metadata:
  namespace: admin
  name: tpe-stdout-example
spec:
  parallelTrialCount: 1
  maxTrialCount: 12
  maxFailedTrialCount: 3
  objective:
    type: maximize
    goal: 0.99
    objectiveMetricName: Validation-accuracy
    additionalMetricNames:
      - accuracy
  algorithm:
    algorithmName: tpe
  parameters:
    - name: --learning_rate
      parameterType: double
      feasibleSpace:
        min: "0.01"
        max: "0.2"
    - name: --dropout
      parameterType: double
      feasibleSpace:
        min: "0.1"
        max: "0.5"
  trialTemplate:
    goTemplate:
        rawTemplate: |-
          apiVersion: batch/v1
          kind: Job
          metadata:
            name: {{.Trial}}
            namespace: {{.NameSpace}}
          spec:
            template:
              spec:
                containers:
                - name: {{.Trial}}
                  image: kangwoo/mnist:katib
                  command:
                  - "python3"
                  - "/app/mnist.py"
                  {{- with .HyperParameters}}
                  {{- range .}}
                  - "{{.Name}}={{.Value}}"
                  {{- end}}
                  {{- end}}
                restartPolicy: Never

정의한 Experiment 사용자 리소스를 쿠버네티스 클러스터에 생성합니다.

kubectl apply -f tpe-stdout-example.yaml

Experiment 결과 보기

Katib UI를 통해서 다음과 같은 결과를 확인할 수 있습니다.

이전 : Kubeflow – Katib 소개

다음 : Kubeflow – Katib : Metrics Collector

Kubeflow – Katib 소개

Katib 살펴보기

Katib는 Kubeflow 컴포넌트로서, 하이퍼 파라미터(Hyperparameter) 튜닝 및 신경망 아키텍처 탐색(Neural Architecture Search)을 위한 쿠버네티스 기반의 시스템입니다. Katib는 TensorFlow, PyTorch, Apache MXNet, XGBoost 등 다양한 머신러닝 프레임워크를 지원합니다.

Kubeflow의 컴포넌트인 Katib 시스템에 대해서 이해하고, 하이퍼 파라미터 튜닝하는 방법에 대해서 알아 보도록 하겠습니다. 예제에 사용한 Katib 버전은 0.8 입니다.

Katib의 개념

Katib에는 실험(Experiment), 제안(Suggestion), 시도(Trial) 및 작업(Job) 이라는 개념이 있습니다.

Experiment

Experiment 란 목표로 하는 대상 값을 찾기 위해서, 하이퍼 파라미터 값들을 찾는 일련의 탐색 작업을 의미합니다. Experiment에는 다음과 같은 구성 요소가 포함되어 있습니다

  • 목표 (Objective) : 하이퍼 파라미터 튜닝 작업 통해서, 이루고자 하는 목표를 정의해야 합니다. 예를 든다면, 모델의 정확성(accuracy)의 최대값을 0.91 로 목표로 한다고 정의할 수 있습니다
  • 탐색 범위 (Search Space) : 하이퍼 파라미터 튜닝 작업시 사용해야 할 모든 하이퍼 파라미터 값과 하이퍼 파라미터의 제약 조건을 정의해야 합니다. 예를 든다면, Learning rate는 0.1부터 0.5까지의 값을 사용하고, optimizer는 sgd와 adam을 사용한다고 정의할 수 있습니다.
  • 탐색 알고리즘 (Search Algorithm) : 하이퍼 파라미터 튜닝 작업시 사용할 알고리즘을 정의해야 합니다. Random Search, Grid Search, Bayesian Optimization 등 다용한 알고리즘을 사용할 수 있습니다.

Katib를 사용해서 최적의 하이퍼 파라미터를 찾으려면, Experiment 라는 사용자 리소스를 생성하면 됩니다.

Suggestion

Katib는 각 Experiment 사용자 리소스 마다 하나의 Suggestion 사용자 리소스를 생성합니다. Suggestion 은 탐색 알고리즘이 제안한 하이퍼 파라미터 값들의 세트를 가지고 있습니다. Katib 는 제안된 하이퍼 파라미터 값들을 세트별로 평가하기 위한 Trial을 작성합니다.

Trial

Trial은 제안된 하이퍼 파리미터 값들을 평가하기 위한 하나의 작업을 의미하는 사용자 리소스입니다. 제안된 매개 변수 값들을 Woker Job 인스턴스 할당해서 실행합니다.

Experiment 는 여러 번의 Trial을 수행합니다. Experiment 는 목표나 설정한 최대 시도 횟수에 도달 할 때까지 Trial 을 계속 실행합니다.

Worker Job

Worker Job은 Trial을 평가하고 목표 값을 계산하는 프로세스를 의미합니다. 제안된 하이퍼 파라미터 값들을 넘겨 받아서 실제로 모델을 학습하게 됩니다.

다음은 사용 가능한 Worker Job의 유형입니다.

  • Kubernetes Job
  • Kubeflow TFJob (분산 처리 지원)
  • Kubeflow PyTorchJob (분산 처리 지원)

Metrics Collector

하이퍼 파라미터 튜닝 작업 통해서, 목표로 하는 대상 값을 찾기 위해서는 대상 값을 수집하고 저장해야 합니다. Katib에서는 이러한 메트릭들을 저장하기 위해서 Metrics Collector를 사용합니다.

Job, TFJob, PytorchJob 등과 같은 실제 모델 학습을 진행하는 포드가 실행 될 때, 학습에 관련된 결과 값들을 수집하기 위해서 Metrics Collector가 포함된 Collector 컨테이너를 사이드카로 포드에 주입합니다. Collector 컨테이너는 메트릭 소스의 구문을 분석하여, Worker 컨테이너의 메트릭을 수집하고 Katib-manager의 katib-db 와 같은 영구 저장소에 메트릭을 저장합니다.

Katib에서 지원하고 있는 Metrics Collector는 다음과 같습니다.

  • StdOut : 운영 체제의 기본 출력인 StdOut으로 출력되는 메트릭을 수집합니다. 별도의 수집기를 정의하지 않으면 StdOut가 사용됩니다.
  • File : 지정한 파일을 이용해서 메트릭을 수집합니다. source 필드에 경로를 지정해야합니다.
  • TensorFlowEvent : 지정한 디렉토리에 저장된 tf.Event 를 이용해서 메트릭을 수집합니다. 현재는 텐서플로우 1 버전만 지원합니다. source 필드에 경로를 지정해야합니다.
  • Custom : 사용자가 정의한 메트릭 수집기를 사용합니다.
  • None : Katib의 메트릭 수집기를 사용하지 않을 때 사용합니다.

탐색 알고리즘

Katib에서 제공하는 탐색 알고리즘은 다음과 같습니다.

Hyperparameter Tuning

  • Grid Search (grid) : 그리드 탐색은 하이퍼 파라미터 최적화를 수행하는 전통적인 방법 중 하나로서, 하이퍼 파라미터 공간에서 수동으로 지정한 하위 집합을 모두 조합해서 전부 탐색하는 것을 말합니다. 이러한 작업은 학습 세트에 대한 교차 검증(cross-validation)이나 보류(held-out) 된 검증 세트에 대한 평가에 따라 진행됩니다. 균등한 공간의 시작점들로부터 시작해서, 이 점들의 목적 함수 값(objective functions)을 계산하여 최적의 조합을 선택하게 됩니다. 그리드 탐색은 모든 가능성에 대해 탐색을 수행하기 때문에, 중간 규모의 문제에 대해서도 탐색 프로세스를 매우 길게 만듭니다. 그래서 그리드 탐색은 만들어낼 수 있는 파라미터들의 탐색 조합이 적은 경우에만 유용하게 사용할 수 있습니다.
  • Random Search (random) : 무작위 탐색은 그리드 탐색의 대안으로서, 조합할 수 있는 파라미터의 수가 많을 때 사용하면 좋습니다. 무작위 탐색은 무작위로 파라미터를 선택하여 조합을 만들어냅니다. 하이퍼 파라미터 공간에서 수동으로 하위 집합을 지정할 필요가 없기 때문에 간단하게 적용 할 수 있습니다. 그렇기 때문에 무작위 탐색은 모든 가능성에 대한 탐색이 불가능할 때 사용하기 좋은 알고리즘입니다. Katib는 hyperopt 라는 최적화 프레임워크를 사용해서 무작위 탐색 알고리즘을 지원합니다.
  • Tree of Parzen Estimators (tpe) : Katib 는 hyperopt 를 사용해서 Tree of Parzen Estimators (TPE) 알고리즘을 지원합니다 . 이 방법은 “정방향 및 역방향 그라디언트 기반” 탐색을 제공합니다.
  • Hyperband (hyperbadn): 하이퍼밴드는 반복 알고리즘을 조정하는 비교적 새로운 방법으로서, 최적화 탐색 속도에 중점을 두었습니다. 리소스 할당을 최적화하여 평가 할 수 있는 조합의 수를 최대화 합니다. 그래서 빠르게 목적에 도달해서 해서 조기 중지(early stopping)에 이르게 하고 있습니다.
  • Bayesian Optimization (skopt-bayesian-optimization) : ‘베이지안 최적화’방법은 가우시안 프로세스 회귀를 사용하여 탐색 공간을 모델링합니다. 이 기법은 탐색 공간의 모든 지점에서 손실 함수의 추정치와 해당 추정치의 불확실성을 계산합니다. 즉, 현재 모델을 기반으로 유망한 하이퍼 파라미터 구성을 반복적으로 평가해서, 최적의 위치에 대한 정보를 나타내는 관측치를 수집하는 등의 확률적 추정 결과를 바탕으로 최적의 값을 찾습니다. 이 방법은 탐색 공간의 차원 수가 적은 경우에 적합합니다. 이 방법은 예상 손실과 불확실성을 모두 모델링하므로 탐색 알고리즘이 몇 단계로 수렴되므로 , 매개 변수 구성 평가를 완료하는 데 시간이 오래 걸릴 경우 사용하면 좋습니다. Katib는  Scikit-Optimize (skopt) 라는 라이브러리를 사용해서 베이지안 탐색을 지원합니다.

Neural Architecture Search

Katib 구성 요소

Katib는 다음과 같은 구성 요소로 이루어져 있습니다.

  • katib-ui : 하이퍼 파라미터 튜닝을 실행하고 관리하기 위한 사용자 인터페이스 (UI).
  • katib-controller : Katib 사용자 리소스를 제어하기 위한 쿠버네티스 컨트롤러.
  • katib-db-manager: DB 인터페이스인 Katib의 GRPC API 서버.
  • katib-mysql : Katib의 데이터를 저장하기 위한 MySql 데이터베이스.

Katib UI 접속하기

Katib 사용자 인터페이스를 사용하면, Experiment 을 제출하고 결과를 조회 해 볼 수 수 있습니다.

다음은 Kubeflow 에 있는 Katib UI 화면입니다.

Kubeflow 대시보드 화면의 왼쪽 메뉴에서 Katib를 클릭하면 접속할 수 있습니다.

다음 : Kubeflow – Katib 하이퍼 파라미터 튜닝