순차적 처리를 하는 파이프 라인 만들기
파이프 라인을 구성하고 실행하기
순차적으로 작업을 하는 파이프 라인을 만들어 보겠습니다. 동전을 던지는 단계와 그 결과를 출력하는 단계로 구성 되어 있습니다.
flip_coin_op() 는 동전을 던지는 단계입니다. 파이썬 코드로 작성되었습니다. 랜덤 함수를 사용해서 0이면 앞면(heads), 1이면 뒷면(tails) 이라고 출력합니다. 그리고 출력 결과를 /tmp/output 파일에 저장합니다. 결과가 저장된 /tmp/output 파일을 다음 단계에서 사용하기 위해 file_outputs 으로 정의합니다.
def flip_coin_op(): return dsl.ContainerOp( name='Flip coin', image='python:alpine3.6', command=['sh', '-c'], arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 ' 'else \\'tails\\'; print(result)" | tee /tmp/output'], file_outputs={'output': '/tmp/output'} )
print_op(msg) 라는 입력받은 메시지를 화면으로 출력하는 단계입니다. 단순히 입력 받은 메시지를 echo 명령어를 사용하여 화면에 출력합니다.
def print_op(msg): """Print a message.""" return dsl.ContainerOp( name='Print', image='alpine:3.6', command=['echo', msg], )
DSL을 사용하여 파이프 라인을 구성합니다. flip_coin_op()가 먼저 실행되고, 그 다음 print_op()가 실행됩니다. 그리고 flip_coin_op()의 출력 결과를 print_op()의 입력 파라미터로 사용하고 있습니다.
KFP SDK 사용하여 코드에서 파이프 라인을 컴파일하고, 바로 실행하였습니다.
@dsl.pipeline( name='Sequential pipeline', description='A pipeline with two sequential steps.' ) def sequential_pipeline(): """A pipeline with two sequential steps.""" flip = flip_coin_op() print_op(flip.output) if __name__ == '__main__': kfp.compiler.Compiler().compile(sequential_pipeline, __file__ + '.zip') client = kfp.Client() my_experiment = client.create_experiment(name='Basic Experiment') my_run = client.run_pipeline(my_experiment.id, 'Sequential pipeline', __file__ + '.zip')
다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.
import kfp from kfp import dsl def flip_coin_op(): return dsl.ContainerOp( name='Flip coin', image='python:alpine3.6', command=['sh', '-c'], arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 ' 'else \\'tails\\'; print(result)" | tee /tmp/output'], file_outputs={'output': '/tmp/output'} ) def print_op(msg): """Print a message.""" return dsl.ContainerOp( name='Print', image='alpine:3.6', command=['echo', msg], ) @dsl.pipeline( name='Sequential pipeline', description='A pipeline with two sequential steps.' ) def sequential_pipeline(): """A pipeline with two sequential steps.""" flip = flip_coin_op() print_op(flip.output) if __name__ == '__main__': kfp.compiler.Compiler().compile(sequential_pipeline, __file__ + '.zip') client = kfp.Client() my_experiment = client.create_experiment(name='Basic Experiment') my_run = client.run_pipeline(my_experiment.id, 'Sequential pipeline', __file__ + '.zip')
파이프 라인 실행 결과 확인하기
KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Basic Experiment” 이므로, Basic Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Sequential pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.
Graph 탭에서 flip-coin 단계를 선택하고, Input/Output 탭을 클릭하면 출력 결과를 볼 수 있습니다.

조건에 의해서 분기를 하는 파이프 라인 만들기
파이프 라인을 구성하고 실행하기
조건에 의해서 분기를 파이프 라인을 만들어 보겠습니다. 동전을 던지는 단계와 그 결과에 따러 “승리” 또는 “패배” 를 출력하는 단계로 구성 되어 있습니다.
flip_coin_op() 는 동전을 던지는 단계입니다. 이전 예제 코드와 동일합니다.
def flip_coin_op(): return dsl.ContainerOp( name='Flip coin', image='python:alpine3.6', command=['sh', '-c'], arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 ' 'else \\'tails\\'; print(result)" | tee /tmp/output'], file_outputs={'output': '/tmp/output'} )
print_op(msg) 라는 입력받은 메시지를 화면으로 출력하는 단계입니다. 이전 예제 코드와 동일합니다.
def print_op(msg): """Print a message.""" return dsl.ContainerOp( name='Print', image='alpine:3.6', command=['echo', msg], )
DSL을 사용하여 파이프 라인을 구성합니다. flip_coin_op()가 먼저 실행되고, 그 결과에 따라서 print_op()를 사용해서 “승리” 또는 “패배”를 출력하게 됩니다. 동전이 앞면인지 뒷면이지 판단하기 위해서 dsl.Condition() 을 사용하였습니다. 앞면이면 ‘YOUT WIN’을 출력하고, 뒷면이면 ‘YOU LOSE’를 출력합니다.
KFP SDK 사용하여 코드에서 파이프 라인을 컴파일하고, 바로 실행하였습니다.
@dsl.pipeline( name='Conditional execution pipeline', description='Shows how to use dsl.Condition().' ) def condition_pipeline(): flip = flip_coin_op() with dsl.Condition(flip.output == 'heads'): print_op('YOUT WIN') with dsl.Condition(flip.output == 'tails'): print_op('YOU LOSE') if __name__ == '__main__': kfp.compiler.Compiler().compile(condition_pipeline, __file__ + '.zip') client = kfp.Client() my_experiment = client.create_experiment(name='Basic Experiment') my_run = client.run_pipeline(my_experiment.id, 'Condition pipeline', __file__ + '.zip')
다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.
import kfp from kfp import dsl def flip_coin_op(): return dsl.ContainerOp( name='Flip coin', image='python:alpine3.6', command=['sh', '-c'], arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 ' 'else \\'tails\\'; print(result)" | tee /tmp/output'], file_outputs={'output': '/tmp/output'} ) def print_op(msg): """Print a message.""" return dsl.ContainerOp( name='Print', image='alpine:3.6', command=['echo', msg], ) @dsl.pipeline( name='Conditional execution pipeline', description='Shows how to use dsl.Condition().' ) def condition_pipeline(): flip = flip_coin_op() with dsl.Condition(flip.output == 'heads'): print_op('YOUT WIN') with dsl.Condition(flip.output == 'tails'): print_op('YOU LOSE') if __name__ == '__main__': kfp.compiler.Compiler().compile(condition_pipeline, __file__ + '.zip') client = kfp.Client() my_experiment = client.create_experiment(name='Basic Experiment') my_run = client.run_pipeline(my_experiment.id, 'Condition pipeline', __file__ + '.zip')
파이프 라인 실행 결과 확인하기
KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Basic Experiment” 이므로, Basic Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Condition pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.

병렬 루프를 사용하는 파이프 라인 만들기
파이프 라인을 구성하고 실행하기
정적 항목 세트에 대한 병렬 루프를 사용하는 파이프 라인을 만들어 보겠습니다. 동전을 여러번 던지고, 그 결과를 json 형식으로 저장 단계와 그 결과들을 병렬 루프를 통해서 출력하는 단계로 구성 되어 있습니다.
flip_coins_op() 는 동전을 여러번 던지는 단계입니다. 동전을 던진 결과 값을 json 형식으로 저정합니다.
def flip_coins_op(): return dsl.ContainerOp( name='Flip coin', image='python:alpine3.6', command=['sh', '-c'], arguments=['python -c "import json; import sys; import random; ' 'json.dump([(\\'heads\\' if random.randint(0,1) == 1 else \\'tails\\') for i in range(10)], ' 'open(\\'/tmp/output.json\\', \\'w\\'))"'], file_outputs={'output': '/tmp/output.json'} )
print_op(msg) 라는 입력받은 메시지를 화면으로 출력하는 단계입니다. 이전 예제 코드와 동일합니다.
def print_op(msg): """Print a message.""" return dsl.ContainerOp( name='Print', image='alpine:3.6', command=['echo', msg], )
DSL을 사용하여 파이프 라인을 구성합니다. flip_coins_op()가 먼저 실행되고, 그 결과가 json 형식으로 반환됩니다. json에 포함된 아이템의 개수 만큼 print_op() 를 실행합니다.
KFP SDK 사용하여 코드에서 파이프 라인을 컴파일하고, 바로 실행하였습니다.
@dsl.pipeline( name='Loop pipeline', description='A pipeline with parallel loop' ) def loop_pipeline(): flips = flip_coins_op() with dsl.ParallelFor(flips.output) as item: print_op(item)
다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.
import kfp from kfp import dsl def flip_coins_op(): return dsl.ContainerOp( name='Flip coin', image='python:alpine3.6', command=['sh', '-c'], arguments=['python -c "import json; import sys; import random; ' 'json.dump([(\\'heads\\' if random.randint(0,1) == 1 else \\'tails\\') for i in range(10)], ' 'open(\\'/tmp/output.json\\', \\'w\\'))"'], file_outputs={'output': '/tmp/output.json'} ) def print_op(msg): """Print a message.""" return dsl.ContainerOp( name='Print', image='alpine:3.6', command=['echo', msg], ) @dsl.pipeline( name='Loop pipeline', description='A pipeline with parallel loop' ) def loop_pipeline(): flips = flip_coins_op() with dsl.ParallelFor(flips.output) as item: print_op(item) if __name__ == '__main__': kfp.compiler.Compiler().compile(loop_pipeline, __file__ + '.zip') client = kfp.Client() my_experiment = client.create_experiment(name='Basic Experiment') my_run = client.run_pipeline(my_experiment.id, 'Loop pipeline', __file__ + '.zip')
파이프 라인 실행 결과 확인하기
KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Basic Experiment” 이므로, Basic Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Loop pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.

Exit Handler를 사용하는 파이프 라인 만들기
파이프 라인을 구성하고 실행하기
Exit Handler 사용하는 파이프 라인을 만들어 보겠습니다. Exit Handler를 사용하면 작업 그룹이 종료 될 때 지정한 작업을 실행시킬 수 있습니다. 작업 그룹의 성공 여부와는 상관 없이 무조건 지정한 작업이 실행됩니다.
동전을 여러번 던지고, 그 결과를 json 형식으로 저장 단계와 그 결과들을 병렬 루프를 통해서 출력하는 단계로 구성 되어 있습니다.
flip_coin_op() 는 동전을 던지는 단계입니다. 이전 예제 코드와 동일합니다.
def flip_coin_op(): return dsl.ContainerOp( name='Flip coin', image='python:alpine3.6', command=['sh', '-c'], arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 ' 'else \\'tails\\'; print(result)" | tee /tmp/output'], file_outputs={'output': '/tmp/output'} )
print_op(msg) 라는 입력받은 메시지를 화면으로 출력하는 단계입니다. 이전 예제 코드와 동일합니다.
def print_op(msg): """Print a message.""" return dsl.ContainerOp( name='Print', image='alpine:3.6', command=['echo', msg], )
DSL을 사용하여 파이프 라인을 구성합니다. dsl.ExitHandler()를 사용해서, 작업 그룹이 종료될때 실행할 작업을 지정할 수 있습니다. 작업 그룹은 flip_coin_op() 과 print_op(flip.output) 으로 구성되어 있습니다. 작업 그룹에 속한 작업들이 종료되면 dsl.ExitHandler(exit_op)에 지정한 exit_op 작업이 실행됩니다.
KFP SDK 사용하여 코드에서 파이프 라인을 컴파일하고, 바로 실행하였습니다.
@dsl.pipeline( name='Exit handler', description=' The exit handler will run after the pipeline finishes (successfully or not)' ) def sequential_pipeline(): exit_op = print_op('Exit') with dsl.ExitHandler(exit_op): flip = flip_coin_op() print_op(flip.output)
다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.
exit_handler.py
import kfp from kfp import dsl def flip_coin_op(): return dsl.ContainerOp( name='Flip coin', image='python:alpine3.6', command=['sh', '-c'], arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 ' 'else \\'tails\\'; print(result)" | tee /tmp/output'], file_outputs={'output': '/tmp/output'} ) def print_op(msg): """Print a message.""" return dsl.ContainerOp( name='Print', image='alpine:3.6', command=['echo', msg], ) @dsl.pipeline( name='Exit handler', description=' The exit handler will run after the pipeline finishes (successfully or not)' ) def sequential_pipeline(): exit_op = print_op('Exit') with dsl.ExitHandler(exit_op): flip = flip_coin_op() print_op(flip.output) if __name__ == '__main__': kfp.compiler.Compiler().compile(sequential_pipeline, __file__ + '.zip') client = kfp.Client() my_experiment = client.create_experiment(name='Basic Experiment') my_run = client.run_pipeline(my_experiment.id, 'Exit Handler', __file__ + '.zip')
파이프 라인 실행 결과 확인하기
KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Basic Experiment” 이므로, Basic Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Exit Handler” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.

파이프 라인 파라미터 사용하기
파이프 라인을 구성하고 실행하기
파라미터를 입력 받아서, 파이프 라인을 실행할 수 있는 파이프 라인을 만들어 보겠습니다.
파이프 라인을 실행할 때 앞면(heads) 또는 뒷면(tails)을 파라미터로 입력받아서, 동전을 던진 결과와 같으면 승링를 다르면 패배를 출력하게 하였습니다.
flip_coin_op() 는 동전을 던지는 단계입니다. 이전 예제 코드와 동일합니다.
def flip_coin_op(): return dsl.ContainerOp( name='Flip coin', image='python:alpine3.6', command=['sh', '-c'], arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 ' 'else \\'tails\\'; print(result)" | tee /tmp/output'], file_outputs={'output': '/tmp/output'} )
print_op(msg) 라는 입력받은 메시지를 화면으로 출력하는 단계입니다. 이전 예제 코드와 동일합니다.
def print_op(msg): """Print a message.""" return dsl.ContainerOp( name='Print', image='alpine:3.6', command=['echo', msg], )
DSL을 사용하여 파이프 라인을 구성합니다. 파이프 라인 함수에 predict 파라미터를 입력 받을 수 있도록 추가하였습니다. 파이프 라인 함수가 호출되면 각 파라미터는 PipelineParam 객체가 됩니다.
@dsl.pipeline( name='Pipeline parameters', description='Pipeline parameters' ) def condition_pipeline( predict : str = 'heads'): flip = flip_coin_op() with dsl.Condition(flip.output == predict): print_op('YOU WIN') with dsl.Condition(flip.output != predict): print_op('YOU LOSE')
KFP SDK 사용하여 코드에서 파이프 라인을 컴파일하고, 바로 실행하였습니다. 파이프 라인 실행 시 파라미터를 넘기기 위해서, params 파라미터를 사용하였습니다.
if __name__ == '__main__': kfp.compiler.Compiler().compile(condition_pipeline, __file__ + '.zip') client = kfp.Client() my_experiment = client.create_experiment(name='Basic Experiment') my_run = client.run_pipeline(my_experiment.id, 'Pipeline parameters', __file__ + '.zip', params={'predict' : 'tails'})
파라미터는 Kubeflow Pipelines UI를 통해서 실행할 때도 입력할 수 있습니다.

다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.
import kfp from kfp import dsl from kfp.dsl import PipelineParam def flip_coin_op(): return dsl.ContainerOp( name='Flip coin', image='python:alpine3.6', command=['sh', '-c'], arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 ' 'else \\'tails\\'; print(result)" | tee /tmp/output'], file_outputs={'output': '/tmp/output'} ) def print_op(msg): """Print a message.""" return dsl.ContainerOp( name='Print', image='alpine:3.6', command=['echo', msg], ) @dsl.pipeline( name='Pipeline parameters', description='Pipeline parameters' ) def condition_pipeline( predict : str = 'heads'): flip = flip_coin_op() with dsl.Condition(flip.output == predict): print_op('YOU WIN') with dsl.Condition(flip.output != predict): print_op('YOU LOSE') if __name__ == '__main__': kfp.compiler.Compiler().compile(condition_pipeline, __file__ + '.zip') client = kfp.Client() my_experiment = client.create_experiment(name='Basic Experiment') my_run = client.run_pipeline(my_experiment.id, 'Pipeline parameters', __file__ + '.zip', params={'predict' : 'tails'})
파이프 라인 실행 결과 확인하기
KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Basic Experiment” 이므로, Basic Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Pipeline parameters” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.
