Kubeflow Pipelines – 경량 파이썬 컴포넌트

파이썬 코드를 이용한 경량 컴포넌트를 사용하기

경량 파이썬 컴포넌트는 생성한 코드를 컨테이너 이미지로 빌드 하지 않아도 됩니다. 그래서 빠르게 반복하여 코드를 변경하는 경우 유용하게 사용할 수 있습니다.

경량 파이썬 컴포넌트

경량 컴포넌트를 빌드하려면, 먼저 독립형 파이썬 함수를 정의해야 합니다. 그런 다음 kfp.components.func_to_container_op()를 호출하여 파이썬 함수를 파이프 라인에서 사용할 수 있는 컴포넌트로 변환해야 합니다

경량 컴포넌트를 만들기 위해서는 몇 가지 제약 사항이 있습니다.

  • 파이썬 함수의 기능은 독립적이어야 합니다.
    • 정의한 함수 외부에서 선언한 코드를 사용해서는 안됩니다.
    • import 는 함수 내부에 선언해야합니다.
  • import 는 기본 이미지에서 사용 가능한 패키지만 가져올 수 있습니다.
    • 사용할 패키지가 기본 이미지에 없는 경우에는, 해당 패키지가 포함된 이미지를 사용해야 합니다.
  • 파이썬 함수의 파리미터로 숫자를 사용하려면, 파라미터에 타입 힌트가 있어야 합니다. 지원되는 타입은 int, float, bool입니다. 모든 파라미터는 문자열로 전달됩니다.
  • 출력값을 여러개 사용하려면, 파이썬의 typing.NamedTuple 타입 힌트를 사용해야합니다.

kfp.components.func_to_container_op(func)

이 함수는 입력 받은 파이썬 함수를 컴포넌트로 변환합니다. 변환한 컴포넌트는 파이프라인에서 사용할 수 있습니다.

def func_to_container_op(func, output_component_file=None, base_image: str = None, extra_code='', packages_to_install: List[str] = None, modules_to_capture: List[str] = None, use_code_pickling=False):

전달 인자

다음은 func_to_container_op()에서 사용하는 주요 전달 인자입니다.

  • base_image : 컴포넌트에서 사용할 기본 컨테이너 이미지입니다. 경량 컴포넌트의 경우 이미지에는 파이썬 3.5 이상이 설치 되어 있어야합니다. 기본값은 tensorflow/tensorflow:1.13.2-py3 입니다. (선택)
  • output_component_file: 컴포넌트 정의를 로컬 파일에 작성합니다. 이 파일은 공유할 때 사용할 수 있습니다. (선택)
  • packages_to_install: 사용자의 함수를 실행하기 전에 설치할 파이썬 패키지 목록입니다. (선택)

경량 파이썬 컴포넌트 만들기

하나의 값을 출력하는 경량 파이썬 컴포넌트 만들기

먼저 컴포넌트로에서 사용할 파이썬 함수를 작성합니다. 입력된 두 개의 값을 더한 값을 반환하는 간단한 함수입니다.

def add(a: float, b: float) -> float:
    return a + b

func_to_container_op() 호출하여 파이썬 함수를 컴포넌트로 변환합니다. 별도의 컨테이너 이미지를 지정하지 않으면, 기본 이미지인 tensorflow/tensorflow:1.13.2-py3 를 사용합니다.

add_op = comp.func_to_container_op(add)

DSL을 사용하여 파이프 라인을 구성합니다.

KFP SDK 사용하여 코드에서 파이프 라인을 바로 실행하였습니다.

def lightweight_component_pipeline(a='10', b='20'):
    add_task = add_op(a, b)
    print_text(add_task.output)


if __name__ == '__main__':
    arguments = {'a': '1000', 'b': '4'}
    my_run = kfp.Client().create_run_from_pipeline_func(lightweight_component_pipeline, arguments=arguments, experiment_name='Basic Experiment')

다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.

lightweight_component.py

import kfp
from kfp import dsl
from kfp import components
from kfp.components import func_to_container_op, InputPath, OutputPath


@func_to_container_op
def print_text(text_path: InputPath()):
    with open(text_path, 'r') as reader:
        for line in reader:
            print(line, end = '')


def add(a: float, b: float) -> float:
    return a + b


add_op = components.func_to_container_op(add)


def lightweight_component_pipeline(a='10', b='20'):
    add_task = add_op(a, b)
    print_text(add_task.output)


if __name__ == '__main__':
    arguments = {'a': '1000', 'b': '4'}
    my_run = kfp.Client().create_run_from_pipeline_func(lightweight_component_pipeline, arguments=arguments, experiment_name='Basic Experiment')

파이프 라인 실행 결과 확인하기

KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Sample Experiment” 이므로, Sample Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Multiply component pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.


여러개의 값을 출력하는 경량 파이썬 컴포넌트 만들기

먼저 컴포넌트로에서 사용할 파이썬 함수를 작성합니다. 입력된 두 개의 값을 더한 값과 곱한 값 두 개를 반환하는 함수입니다. 출력값이 여러개 이기 때문에 파이썬의 typing.NamedTuple 타입 힌트를 사용해야합니다.

def add_multiply_two_numbers(a: float, b: float) \\
        -> NamedTuple('Outputs', [('sum', float), ('product', float)]):
    return (a + b, a * b)

func_to_container_op() 함수를 호출하여 컴포넌트를 로드합니다. 별도의 컨테이너 이미지를 지정하지 않으면, 기본 이미지인 tensorflow/tensorflow:1.13.2-py3 를 사용합니다.

add_multiply_two_numbers_op = comp.func_to_container_op(add_multiply_two_numbers)

DSL을 사용하여 파이프 라인을 구성합니다.

KFP SDK 사용하여 코드에서 파이프 라인을 바로 실행하였습니다.

@dsl.pipeline(
    name='Multiple outputs pipeline',
    description='A pipeline to showcase multiple outputs.'
)
def multiple_outputs_pipeline(a='10', b='20'):
    add_multiply_task = add_multiply_two_numbers_op(a, b)
    print_op('sum={}, product={}'.format(add_multiply_task.outputs['sum'],
                                         add_multiply_task.outputs['product']))


if __name__ == '__main__':
    arguments = {'a': '3', 'b': '4'}
    my_run = kfp.Client().create_run_from_pipeline_func(multiple_outputs_pipeline,
                                                        arguments=arguments, experiment_name='Basic Experiment')

다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.

multiple_outputs.py

import kfp
from kfp import dsl
import kfp.components as comp
from typing import NamedTuple


def print_op(msg):
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )


def add_multiply_two_numbers(a: float, b: float) \\
        -> NamedTuple('Outputs', [('sum', float), ('product', float)]):
    return (a + b, a * b)


add_multiply_two_numbers_op = comp.func_to_container_op(add_multiply_two_numbers)


@dsl.pipeline(
    name='Multiple outputs pipeline',
    description='A pipeline to showcase multiple outputs.'
)
def multiple_outputs_pipeline(a='10', b='20'):
    add_multiply_task = add_multiply_two_numbers_op(a, b)
    print_op('sum={}, product={}'.format(add_multiply_task.outputs['sum'],
                                         add_multiply_task.outputs['product']))


if __name__ == '__main__':
    arguments = {'a': '3', 'b': '4'}
    my_run = kfp.Client().create_run_from_pipeline_func(multiple_outputs_pipeline,
                                                        arguments=arguments, experiment_name='Basic Experiment')

파이프 라인 실행 결과 확인하기

KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Sample Experiment” 이므로, Sample Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Multiply component pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.


경량 파이썬 컴포넌트에서 데이터 입력과 출력

컴포넌트에는 입력 및 출력이 있습니다. 한 작업의 출력값을 다른 작업의 입력값에 전달하여 데이터 입력과 출력을 연결하여 컴포넌트의 작업간에 데이터를 생성하고 소비하는 것입니다. 파이썬 함수를 컴포넌트로 변환하여 사용할 때, 입력 및 출력 데이터를 생성하고 소비하는 방법에 대해서 알아 보겠습니다.

작은 데이터

작은 데이터는 프로그램의 명령 줄 인수로 쉽게 전달할 수있는 데이터입니다. 작은 데이터 크기는 몇 킬로바이트를 초과하지 않아야 합니다. 숫자나, 작은 문자열, 그리고 URL 같은 것을 예로 들 수 있습니다. 작은 리스트, 사전 및 JSON 구조는 사용해도 괜찮지만, 데이터의 크기가 크다면 파일 기반의 데이터 전달 방법을 사용하는 것이 좋습니다.

작은 출력 데이터들은 문자열로 직렬화 됩니다. 그리고 입력 데이터로 전달할때 역직렬화 됩니다. 예를 들어 일반적인 유형인 str, int, float, bool, list, dict 유형들은 내장된 직렬화/역직렬화 변환가 있습니다. 하지만 다른 유형의 데이터를 사용하다면, 수동으로 직렬화 해야합니다. 유형 어노테이션을 올바르게 지정하지 않으면, 자동으로 직렬화 되지 않고 문자열로 전달합니다.

큰 데이터

큰 데이터는 파일을 통해서 전달합니다. 출력 데이터를 파일로 쓰고, 다른 컴포넌트를 파일을 읽어서 입력 데이터로 사용합니다. 입력 및 출력 파일의 경로는 시스템에 의해 결정되며, 함수에는 문자열 형태로 전달됩니다.

파일을 통해서 데이터를 전달하려면, InputPathOutputPath 파라미터 어노테이션을 사용하며 됩니다.

InputPath 파라미터 어노테이션은 함수가 해당 입력 데이터를 파일로 소비하고 싶다고 시스템에 알려줍니다. 시스템은 데이터를 다운로드하여 로컬 파일에 쓴 다음, 해당 파일의 경로(path)를 함수에 전달합니다.

OutputPath 파라미터 어노테이션은 함수가 해당 출력 데이터를 파일로 생성하고 싶다고 시스템에 알려줍니다. 시스템은 함수가 데이터를 출력해야하는 파일의 경로(paht) 준비해서 전달해줍니다. 함수가 종료되면 시스템은 스토리지에 데이터를 업로드하여 다운 스트림 컴포넌트로 전달합니다.

InputPathOutputPath 에 타입을 지정하여 소비/생산 데이터의 유형을 지정할 수 있습니다. 유형은 파이썬 유형이거나, 임의의 문자열일 수 있습니다. OutputPath('TFModel')은 함수가 파일에 쓴 데이터의 유형이 TFModel 임을 나타냅니다. InputPath('TFModel') 은 함수가 파일에서 읽는 데이터의 유형이 TFModel 을 나타냅니다. 파이프라인 작성자가 입력과 출력을 연결하면, 시스템이 해당 유형이 일치하는 확인합니다.

일반적으로 함수가 컴포넌트로 변환될 때 입력 및 출력 이름은 파라미터 이름을 따르지만, _path_file 같은 접미사가 사용된 경우에는 이름에서 제거됩니다. 예를들어 파라미터 가 number_file_path: InputPath(int) 인 경우에는 입력은 number: int 로 바뀌게 됩니다. 이것은 인자 전달을 보다 자연스럽게 보이게 하기 위함입니다.

InputPathOutputPath 사용하기

write_numbers() 함수는 OutputPath 어노테이션을 사용하여, 시스템에게 출력 데이터를 파일로 쓴다는 것을 알려줍니다. 파일을 저장할 경로인 numbers_path 를 시스템에게 전달받아서 생성한 숫자를 파일로 씁니다.

@func_to_container_op
def write_numbers(numbers_path: OutputPath(str), start: int = 1, count: int = 10):
    with open(numbers_path, 'w') as writer:
        for i in range(start, start + count):
            writer.write(str(i) + '\\n')

sum_multiply_numbers() 함수는 InputPath 어노테이션을 사용하여, 시스템으로 부터 입력 데이터를 파일로 전달 받습니다. 입력 데이터가 저장된 파일의 경로인 numbers_path 를 전달받아서, 데이터를 읽어옵니다. 입력 데이터는 숫자가 저장되어 있습니다. 읽어온 숫자를 모두 더하고, 곱한 후에 OutputPath 어노테이션을 사용하여 파일로 저장합니다.

@func_to_container_op
def sum_multiply_numbers(
        numbers_path: InputPath(str),
        sum_path: OutputPath(str),
        product_path: OutputPath(str)):

    sum = 0
    product = 1
    with open(numbers_path, 'r') as reader:
        for line in reader:
            sum = sum + int(line)
            product = product * int(line)
    with open(sum_path, 'w') as writer:
        writer.write(str(sum))
    with open(product_path, 'w') as writer:
        writer.write(str(product))

print_text() 함수는 InputPath 어노테이션을 사용하여, 시스템으로 부터 입력 데이터를 파일로 전달 받아서 화면에 출력합니다.

@func_to_container_op
def print_text(text_path: InputPath()):
    with open(text_path, 'r') as reader:
        for line in reader:
            print(line, end = '')

컴포넌트들을 사용하여, 파이프라인을 구성합니다. 앞서 얘기 했듯이, 함수가 컴포넌트로 변환될 때 입력 및 출력 이름은 파라미터 이름을 따르지만, _path_file 같은 접미사가 사용된 경우에는 이름에서 제거됩니다. sum_multiply_numbers() 함수에서 출력 파라미터 이름은 sum_pathproduct_path 였지만 출력에서 값을 가지고 오기 위해 사용한 이름은 sumproduct 입니다. 즉 _path 접미사가 이름에서 제거되었습니다.

def python_input_output_pipeline(count='10'):
    numbers_task = write_numbers(count=count)
    sum_multiply_task = sum_multiply_numbers(numbers_task.output)

    print_text(sum_multiply_task.outputs['sum'])
    print_text(sum_multiply_task.outputs['product'])

다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.

python_input_output_pipeline.py

import kfp
from kfp.components import func_to_container_op, InputPath, OutputPath


@func_to_container_op
def write_numbers(numbers_path: OutputPath(str), start: int = 1, count: int = 10):
    with open(numbers_path, 'w') as writer:
        for i in range(start, start + count):
            writer.write(str(i) + '\\n')


@func_to_container_op
def print_text(text_path: InputPath()):
    with open(text_path, 'r') as reader:
        for line in reader:
            print(line, end = '')


@func_to_container_op
def sum_multiply_numbers(
        numbers_path: InputPath(str),
        sum_path: OutputPath(str),
        product_path: OutputPath(str)):

    sum = 0
    product = 1
    with open(numbers_path, 'r') as reader:
        for line in reader:
            sum = sum + int(line)
            product = product * int(line)
    with open(sum_path, 'w') as writer:
        writer.write(str(sum))
    with open(product_path, 'w') as writer:
        writer.write(str(product))


def python_input_output_pipeline(count='10'):
    numbers_task = write_numbers(count=count)
    sum_multiply_task = sum_multiply_numbers(numbers_task.output)

    print_text(sum_multiply_task.outputs['sum'])
    print_text(sum_multiply_task.outputs['product'])


~~i~~f __name__ == '__main__':
    arguments = {'count': '10'}
    my_run = kfp.Client().create_run_from_pipeline_func(python_input_output_pipeline,
                                                        arguments=arguments, experiment_name='Basic Experiment')

파이프 라인 실행 결과 확인하기

KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Sample Experiment” 이므로, Sample Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “python_input_output_pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.

댓글 남기기

이메일은 공개되지 않습니다. 필수 입력창은 * 로 표시되어 있습니다