KFServing InferenceService 배포와 예측 – ONNX Model with ONNX Runtime

ONNX를 사용하는 InferenceService

학습이 완료된 ONNX 모델을 저장 한 경우, KFServing에서 제공하는 ONNX 서버를 사용하여 간단히 배포 할 수 있습니다.

모델 생성

ONNX 서버를 테스트하려면 먼저 ONNX 모델을 생성해야 합니다.

파이토치로 학습한 모델을 ONNX 모델로 저장하겠습니다. 파이토치는 동적 그래프를 사용하므로, ONNX 로 export 할때 신경망 계산을 실제로 한 번 실행해야 합니다. 이를 위해서 실제 이미지 데이터 대신, 차원이 동일한 더미 데이터를 사용할 수도 있습니다.

앞서 PyTorch 에서 저장한 모델을 읽어와서 ONNX 모델로 저장하는코드를 작성해 보겠습니다. 파이토치의 모델 저장 경로는 /mnt/pv/models/pytorch/cifar10/model.pt 입니다.

import argparse
import os

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.onnx as onnx


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x


if __name__ == "__main__":

    parser = argparse.ArgumentParser()
    parser.add_argument('--model_path', default='/mnt/pv/models/onnx/cifar10', type=str)
    args = parser.parse_args()

    model_path = args.model_path
    if not (os.path.isdir(model_path)):
        os.makedirs(model_path)

    model_file = os.path.join(model_path, 'model.onnx')

    net = Net()
    state_dict = torch.load("/mnt/pv/models/pytorch/cifar10/model.pt")
    net.load_state_dict(state_dict)
    net.eval()

    x = torch.empty(1, 3, 32, 32)
    torch.onnx.export(net, x, model_file)

생성 된 모델을 사용하여 ONNX 서버를 실행하고 예측을 수행 할 수 있습니다. 모델은 PV, S3 호환 가능 개체 저장소, Azure Blob 저장소 또는 Google Cloud Storage에 있을 수 있습니다.

모델 저장하기

쿠버네티스의 퍼시스턴스 볼륨에 모델을 저장해 보겠습니다. PVC 는 앞서 생성한 kfserving-models-pvc 을 사용하겠습니다. 모델을 학습시키기 위해서 쿠버네티스 잡(Job)을 사용하겠습니다. Job을 생성할 때 모델을 저장하기 위한 PVC를 마운트 해줍니다.

모델 코드 작성하기

cifa10 이미지를 분류하는 모델입니다. 모델을 저장할 위치를 --model_path 파라미터로 입력받게 하였습니다.

먼저 학습이 완료된 모델을 불러옵니다. 모델을 변환하기 전에 모델을 추론 모드로 바꾸기 위해서 torch_model.eval() 을 호출합니다. 모델을 변환하기 위해서는 torch.onnx.export() 함수를 호출합니다. 이 함수는 모델을 실행하여 어떤 연산자들이 출력값을 계산하는데 사용되었는지를 기록해줍니다. export 함수가 모델을 실행하기 때문에, 직접 텐서를 입력값으로 넘겨주어야 합니다. 이 텐서의 값은 알맞은 자료형과 모양이라면 더미 데이터를 사용해도 상관없습니다.

ipytorch_cifar10.py

import argparse
import os

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.onnx as onnx


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x


if __name__ == "__main__":

    parser = argparse.ArgumentParser()
    parser.add_argument('--model_path', default='/mnt/pv/models/onnx/cifar10', type=str)
    args = parser.parse_args()

    model_path = args.model_path
    if not (os.path.isdir(model_path)):
        os.makedirs(model_path)

    model_file = os.path.join(model_path, 'model.onnx')

    net = Net()
    state_dict = torch.load("/mnt/pv/models/pytorch/cifar10/model.pt")
    net.load_state_dict(state_dict)
    net.eval()

    x = torch.empty(1, 3, 32, 32)
    torch.onnx.export(net, x, model_file, input_names=['input1'], output_names=['output1'])

컨테이너 이미지를 만들기

컨테이너 이미지를 만들기 위한 Dockerfile 입니다. 파이썬을 기본 이미지로 사용하고, torchtorchvision 패키지를 추가로 설치합니다.

Dockerfile

FROM python:3.6-slim

RUN pip install torch torchvision

RUN mkdir -p /app
ADD onnx_cifar10.py /app/

쿠버네티스 잡 실행하기

컨테이너 이미지를 빌드하고, 컨테이너 이미지 레지스트리에 푸시 한 다음, 쿠버네티스 잡(Job)을 생성하겠습니다.

Job을 생성할 때는 모델을 저장하기 위해서 PVC를 마운트 해줍니다. 이 일련의 작업들은 직접 실행 할 수 있습니다. 하지만 좀 더 편하게 하기 위해서 앞서 배운 Kubeflow Fairing을 사용하겠습니다.

다음은 로컬 개발 환경에서 Fairing을 사용하여 컨테이너 이미지를 만들고, 쿠버네티스 잡을 실행하는 예제입니다.

fairing-local-docker.py

import uuid

from kubeflow import fairing
from kubeflow.fairing.kubernetes import utils as k8s_utils

CONTAINER_REGISTRY = 'kangwoo'

namespace = 'admin'
job_name = f'onnx-cifar10-job-{uuid.uuid4().hex[:4]}'

command = ["python", "onnx_cifar10.py", "--model_path", "/mnt/pv/models/onnx/cifar10"]
output_map = {
    "Dockerfile": "Dockerfile",
    "onnx_cifar10.py": "onnx_cifar10.py"
}

fairing.config.set_preprocessor('python', command=command, path_prefix="/app", output_map=output_map)

fairing.config.set_builder('docker', registry=CONTAINER_REGISTRY, image_name="onnx-cifar10",
                           dockerfile_path="Dockerfile")

fairing.config.set_deployer('job', namespace=namespace, job_name=job_name,
                            pod_spec_mutators=[
                                k8s_utils.mounting_pvc(pvc_name='kfserving-models-pvc', pvc_mount_path='/mnt/pv')],
                            cleanup=True, stream_log=True)

fairing.config.run()

fairing을 실행하면 쿠버네티스 잡이 생성되고, 학습이 완료된 모델이 지정한 경로에 저장됩니다.

PyTorch을 사용하는 InferenceService 로 예측 하기

InferenceService 생성

InferenceService 매니페스트를 작성합니다. predictor로 pytorch 을 사용합니다. storageUri 필드로 모델 저장 위치를 지정해 줍니다. pvc 의 이름이 kfserving-models-pvc 이고 저장 위치가 models/onnx/cifar10/ 이므로, pvc://kfserving-models-pvc/models/onnx/cifar10/model.onnx 라고 지정해 줍니다.

onnx.yaml

apiVersion: "serving.kubeflow.org/v1alpha2"
kind: "InferenceService"
metadata:
  name: "onnx-cifar10"
spec:
  default:
    predictor:
      onnx:
        storageUri: "pvc://kfserving-models-pvc/models/onnx/cifar10/model.onnx"

InferenceService 를 생성합니다.

다음은 admin 네임스페이스 InferenceService 를 생성하는 예제입니다.

kubectl -n admin apply -f onnx.yaml

생성한 InferenceService를 조회해 보겠습니다.

kubectl -n admin get inferenceservice

InferenceService 가 정상적으로 생성되면 다음과 같은 응답 결과를 확인할 수 있습니다.

NAME              URL                                                                  READY   DEFAULT TRAFFIC   CANARY TRAFFIC   AGE
onnx-cifar10      <http://onnx-cifar10.admin.example.com/v1/models/onnx-cifar10>         True    100                                56s

예측 실행하기

예측을 요청하기 위해서는 모델 서버에 접근해야 합니다. 모델 서버는 ingressgateway 를 통해서 접근할 수 있습니다. ingressgateway 는 모델 서버들을 구분하기 위해서 호스트 이름을 사용합니다. ingressgateway에 접근하 기 위한 주소는 앞서 정의한 CLUSTER_IP 를 사용하겠습니다.

예측을 요청할 데이터를 json 파일로 작성합니다.

데이터의 크기가 크기 때문에 git 에 있는 파일을 다운받아서 사용해주세요.

cifar10-input.json

{
  "inputs": {
    "input1": {
      "dims": [
        "1",
        "3",
        "32",
        "32"
      ],
      "dataType": 1,
      "rawData": "..."
    }
  }
}

다음은 admin 네임스페이스의 xgboost-iris InferenceService 에 예측을 요청하는 예제입니다.

MODEL_NAME=onnx-cifar10
SERVICE_HOSTNAME=$(kubectl -n admin get inferenceservice onnx-cifar10 -o jsonpath='{.status.url}' | cut -d "/" -f 3)

INPUT_PATH=@./cifar10-input.json
curl -v -H "Host: ${SERVICE_HOSTNAME}" http://$CLUSTER_IP/v1/models/$MODEL_NAME:predict -d $INPUT_PATH

정상적으로 실행되면 다음과 같은 응답 결과를 확인 할 수 있습니다.

*   Trying ::1...
* TCP_NODELAY set
* Connected to localhost (::1) port 8080 (#0)
> POST /v1/models/onnx-cifar10:predict HTTP/1.1
> Host: onnx-cifar10.admin.example.com
> User-Agent: curl/7.64.1
> Content-Type: application/json
> Accept: application/json
> Content-Length: 16533
> Expect: 100-continue
> 
< HTTP/1.1 100 Continue
* We are completely uploaded and fine
< HTTP/1.1 200 OK
< content-length: 145
< content-type: application/json
< date: Sat, 04 Apr 2020 18:19:07 GMT
< server: istio-envoy
< x-envoy-upstream-service-time: 10068
< x-ms-request-id: 3f6c43cf-586a-416e-a55a-d0b3ac35057e
< 
* Connection #0 to host localhost left intact
{"outputs":{"output1":{"dims":["1","10"],"dataType":1,"rawData":"ejmIv2h5BsAdRVQ/PswYQCSLZL/Midc/rBSTP2QqU79gRCi/8NJQPQ==","dataLocation":"DEFAULT"}}}

KFServing InferenceService 배포와 예측 – PyTorch

PyTorch를 사용하는 InferenceService

학습이 완료된 PyTorch 모델을 저장 한 경우, KFServing에서 제공하는 PyTorch 서버를 사용하여 간단히 배포 할 수 있습니다.

전제 조건

  • state_dict() 사용하여 모델을 저장해야 합니다.

모델 생성

PyTorch 서버를 테스트하려면 먼저 파이썬을 사용하여 간단한 PyTorch 모델을 생성해야 합니다.

추론을 위해 모델을 저장할 때, 학습이 완료된 모델의 학습 매개 변수만 저장하면 됩니다. torch.save()함수를 사용하여 모델의 state_dict 를 저장하면 나중에 모델을 복원 할 때 유연성이 가장 높아 지므로 모델 저장에 권장되는 방법입니다. 현재 KFServing은 PyTorch가 추론을 위해 권장하는 모델 저장 방법인 state_dict() 메소드를 사용하여 저장된 PyTorch 모델을 지원합니다.

PyTorch의 KFServing 인터페이스는 사용자가 PyTorch 모델과 동일한 위치에 model_class_file을 업로드 할 것으로 예상하고 선택적 model_class_name을 런타임 입력으로 전달하도록 허용합니다. 모델 클래스 이름을 지정하지 않으면 ‘PyTorchModel’을 기본 클래스 이름으로 사용합니다. 다른 인터페이스를 사용하여 저장된 PyTorch 모델을 지원하기 위해, 이 인터페이스가 발전함에 따라 현재 인터페이스가 변경 될 수 있습니다.

PyTorch의 데이터셋 중의 하나인 cifar10 데이터를 분류하는 모델을 작성해 보겠습니다.

cifar10.py

import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x


if __name__ == "__main__":

    transform = transforms.Compose(
        [transforms.ToTensor(),
         transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

    trainset = torchvision.datasets.CIFAR10(root='./data', train=True,
                                            download=True, transform=transform)
    trainloader = torch.utils.data.DataLoader(trainset, batch_size=4,
                                              shuffle=True, num_workers=2)

    testset = torchvision.datasets.CIFAR10(root='./data', train=False,
                                           download=True, transform=transform)
    testloader = torch.utils.data.DataLoader(testset, batch_size=4,
                                             shuffle=False, num_workers=2)

    classes = ('plane', 'car', 'bird', 'cat',
               'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

    net = Net()

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

    for epoch in range(2):  # loop over the dataset multiple times

        running_loss = 0.0
        for i, data in enumerate(trainloader, 0):
            # get the inputs; data is a list of [inputs, labels]
            inputs, labels = data

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = net(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # print statistics
            running_loss += loss.item()
            if i % 2000 == 1999:    # print every 2000 mini-batches
                print('[%d, %5d] loss: %.3f' %
                      (epoch + 1, i + 1, running_loss / 2000))
                running_loss = 0.0

    print('Finished Training')

    # Save model
    torch.save(net.state_dict(), "model.pt")

생성 된 모델을 사용하여 PyTorch 서버를 실행하고 예측을 수행 할 수 있습니다. 모델은 PV, S3 호환 가능 개체 저장소, Azure Blob 저장소 또는 Google Cloud Storage에 있을 수 있습니다.

모델 저장하기

쿠버네티스의 퍼시스턴스 볼륨에 모델을 저장해 보겠습니다. PVC 는 앞서 생성한 kfserving-models-pvc 을 사용하겠습니다. 모델을 학습시키기 위해서 쿠버네티스 잡(Job)을 사용하겠습니다. Job을 생성할 때 모델을 저장하기 위한 PVC를 마운트 해줍니다.

모델 코드 작성하기

cifa10 이미지를 분류하는 모델입니다. 모델을 저장할 위치를 --model_path 파라미터로 입력받게 하였습니다. state_dict() 메소드를 사용하여 모델을 저장하게 되면, 학습된 파라미터만 저장이 됩니다. 그래서 모델이 정의된 파이썬 파일이 별도로 필요합니다. 모델이 정의된 파이썬 파일도 같이 저장소에 복사해 줍니다.

ipytorch_cifar10.py

import argparse
import os
import shutil

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x


if __name__ == "__main__":

    parser = argparse.ArgumentParser()
    parser.add_argument('--model_path', default='/mnt/pv/models/pytorch/iris', type=str)
    args = parser.parse_args()

    model_path = args.model_path
    if not (os.path.isdir(model_path)):
        os.makedirs(model_path)

    model_file = os.path.join(model_path, 'model.pt')

    transform = transforms.Compose(
        [transforms.ToTensor(),
         transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

    trainset = torchvision.datasets.CIFAR10(root='./data', train=True,
                                            download=True, transform=transform)
    trainloader = torch.utils.data.DataLoader(trainset, batch_size=4,
                                              shuffle=True, num_workers=2)

    testset = torchvision.datasets.CIFAR10(root='./data', train=False,
                                           download=True, transform=transform)
    testloader = torch.utils.data.DataLoader(testset, batch_size=4,
                                             shuffle=False, num_workers=2)

    classes = ('plane', 'car', 'bird', 'cat',
               'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

    net = Net()
    if torch.cuda.is_available():
        print('Use GPU')
        net = net.cuda()

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

    for epoch in range(2):  # loop over the dataset multiple times

        running_loss = 0.0
        for i, data in enumerate(trainloader, 0):
            # get the inputs; data is a list of [inputs, labels]
            inputs, labels = data

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = net(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # print statistics
            running_loss += loss.item()
            if i % 2000 == 1999:  # print every 2000 mini-batches
                print('[%d, %5d] loss: %.3f' %
                      (epoch + 1, i + 1, running_loss / 2000))
                running_loss = 0.0

    print('Finished Training')

    # Save model
    torch.save(net.state_dict(), model_file)

    shutil.copy(os.path.abspath(__file__), os.path.join(model_path, __file__))

컨테이너 이미지를 만들기

컨테이너 이미지를 만들기 위한 Dockerfile 입니다. 파이썬을 기본 이미지로 사용하고, torchtorchvision 패키지를 추가로 설치합니다.

Dockerfile

FROM python:3.6-slim

RUN pip install torch torchvision

RUN mkdir -p /app
ADD pytorch_cifar10.py /app/

쿠버네티스 잡 실행하기

컨테이너 이미지를 빌드하고, 컨테이너 이미지 레지스트리에 푸시 한 다음, 쿠버네티스 잡(Job)을 생성하겠습니다.

Job을 생성할 때는 모델을 저장하기 위해서 PVC를 마운트 해줍니다. 이 일련의 작업들은 직접 실행 할 수 있습니다. 하지만 좀 더 편하게 하기 위해서 앞서 배운 Kubeflow Fairing을 사용하겠습니다.

다음은 로컬 개발 환경에서 Fairing을 사용하여 컨테이너 이미지를 만들고, 쿠버네티스 잡을 실행하는 예제입니다.

fairing-local-docker.py

import uuid
from kubeflow import fairing
from kubeflow.fairing.kubernetes import utils as k8s_utils

CONTAINER_REGISTRY = 'kangwoo'

namespace = 'admin'
job_name = f'sklean-iris-job-{uuid.uuid4().hex[:4]}'

command=["python", "pytorch_cifar10.py", "--model_path", "/mnt/pv/models/pytorch/cifar10"]
output_map = {
    "Dockerfile": "Dockerfile",
    "pytorch_cifar10.py": "pytorch_cifar10.py"
}

fairing.config.set_preprocessor('python', command=command, path_prefix="/app", output_map=output_map)

fairing.config.set_builder('docker', registry=CONTAINER_REGISTRY, image_name="pytorch-cifar10", dockerfile_path="Dockerfile")

fairing.config.set_deployer('job', namespace=namespace, job_name=job_name,
                            pod_spec_mutators=[k8s_utils.mounting_pvc(pvc_name='kfserving-models-pvc', pvc_mount_path='/mnt/pv')],
                            cleanup=True, stream_log=True)

fairing.config.run()

fairing을 실행하면 쿠버네티스 잡이 생성되고, 학습이 완료된 모델이 지정한 경로에 저장됩니다.

PyTorch을 사용하는 InferenceService 로 예측 하기

InferenceService 생성

InferenceService 매니페스트를 작성합니다. predictor로 pytorch 을 사용합니다. storageUri 필드로 모델 저장 위치를 지정해 줍니다. pvc 의 이름이 kfserving-models-pvc 이고 저장 위치가 models/pytorch/cifar10/ 이므로, pvc://kfserving-models-pvc/models/pytorch/cifar10/ 라고 지정해 줍니다.

앞서 학습 할때 사용한 모델 클래스의 이름이 Net 입니다. modelClassName 에 모델 클래스의 이름인 Net 를 지정해 줍니다. 만약 모델 클래스 이름을 별도로 지정해 주지 않으면, 기본값인 PyTorchModel 을 모델 클래스 이름으로 사용합니다.

pytorch.yaml

apiVersion: "serving.kubeflow.org/v1alpha2"
kind: "InferenceService"
metadata:
  name: "pytorch-cifar10"
spec:
  default:
    predictor:
      pytorch:
        storageUri: "pvc://kfserving-models-pvc/models/pytorch/cifar10/"
        modelClassName: "Net"

만약 GPU 리소스를 사용하려고 한다면, resource 필드에 GPU 를 할당해 주면됩니다.

pytorch_gpu.yaml

apiVersion: "serving.kubeflow.org/v1alpha2"
kind: "InferenceService"
metadata:
  name: "pytorch-cifar10-gpu"
spec:
  default:
    predictor:
      pytorch:
        storageUri: "pvc://kfserving-models-pvc/models/pytorch/cifar10/"
        modelClassName: "Net"
        resources:
          limits:
            cpu: 100m
            memory: 1Gi
            nvidia.com/gpu: "1"

InferenceService 를 생성합니다.

다음은 admin 네임스페이스 InferenceService 를 생성하는 예제입니다.

kubectl -n admin apply -f pytorch.yaml

생성한 InferenceService를 조회해 보겠습니다.

kubectl -n admin get inferenceservice

InferenceService 가 정상적으로 생성되면 다음과 같은 응답 결과를 확인할 수 있습니다.

NAME              URL                                                                  READY   DEFAULT TRAFFIC   CANARY TRAFFIC   AGE
pytorch-cifar10   <http://pytorch-cifar10.admin.example.com/v1/models/pytorch-cifar10>   True    100                                59s

예측 실행하기

예측을 요청하기 위해서는 모델 서버에 접근해야 합니다. 모델 서버는 ingressgateway 를 통해서 접근할 수 있습니다. ingressgateway 는 모델 서버들을 구분하기 위해서 호스트 이름을 사용합니다. ingressgateway에 접근하 기 위한 주소는 앞서 정의한 CLUSTER_IP 를 사용하겠습니다.

예측을 요청할 데이터를 json 파일로 작성합니다.

데이터의 크기가 크기 때문에 git 에 있는 파일을 다운받아서 사용해주세요.

cifar10-input.json

{
   "instances":[
      [
         [
            [
               0.23921573162078857,
               0.24705886840820312,
               0.29411768913269043,
               0.301960825920105,
               0.2549020051956177,
               0.22352945804595947,
               0.2705882787704468,
               0.24705886840820312,
               0.23921573162078857,
               0.24705886840820312,
               0.2627451419830322,
               0.2549020051956177,
               0.2627451419830322,
               0.301960825920105,
               0.32549023628234863,
               0.3333333730697632,
               0.30980396270751953,
               0.2705882787704468,
               0.2549020051956177,
               0.2549020051956177,
               0.22352945804595947,
               0.16862750053405762,
               0.17647063732147217,
               0.16078436374664307,
               0.16862750053405762,
               0.12156867980957031,
               0.09803926944732666,
               0.10588240623474121,
               0.12156867980957031,
               0.07450985908508301,
               -0.011764705181121826,
               -0.09019607305526733
            ]
....
         ]
      ]
   ]
}

다음은 admin 네임스페이스의 xgboost-iris InferenceService 에 예측을 요청하는 예제입니다.

MODEL_NAME=pytorch-cifar10
SERVICE_HOSTNAME=$(kubectl -n admin get inferenceservice pytorch-cifar10 -o jsonpath='{.status.url}' | cut -d "/" -f 3)

INPUT_PATH=@./cifar10-input.json
curl -v -H "Host: ${SERVICE_HOSTNAME}" http://$CLUSTER_IP/v1/models/$MODEL_NAME:predict -d $INPUT_PATH

정상적으로 실행되면 다음과 같은 응답 결과를 확인 할 수 있습니다.

*   Trying ::1...
* TCP_NODELAY set
* Connected to localhost (::1) port 8080 (#0)
> POST /v1/models/pytorch-cifar10:predict HTTP/1.1
> Host: pytorch-cifar10.admin.example.com
> User-Agent: curl/7.64.1
> Accept: */*
> Content-Length: 110681
> Content-Type: application/x-www-form-urlencoded
> Expect: 100-continue
> 
< HTTP/1.1 100 Continue
* We are completely uploaded and fine
< HTTP/1.1 200 OK
< content-length: 224
< content-type: text/html; charset=UTF-8
< date: Sat, 29 Mar 2020 13:36:23 GMT
< server: istio-envoy
< x-envoy-upstream-service-time: 8857
< 
* Connection #0 to host localhost left intact
{"predictions": [[-1.0642542839050293, -2.1011602878570557, 0.829179584980011, 2.3874664306640625, -0.8927481174468994, 1.6838929653167725, 1.1490685939788818, -0.8248656988143921, -0.6572933197021484, 0.05098217725753784]]}

KFServing InferenceService 배포와 예측 – XGBoost

XGBoost를 사용하는 InferenceService

학습이 완료된 XGBoost 모델을 저장 한 경우, KFServing에서 제공하는 XGBoost 서버를 사용하여 간단히 배포 할 수 있습니다.

전제 조건

  • 모델 피클의 이름은 model.bst 여야 합니다.
  • xgboost v0.82 버전을 사용해야 합니다.

모델 생성

XGBoost 서버를 테스트하려면 먼저 파이썬을 사용하여 간단한 XGBoost 모델을 생성해야 합니다.

Scikit-learn의 기본적인 데이터셋 중의 하나인 아이리스 꽃 데이터를 사용하여, 아이리스 꽃을 분류하는 모델을 작성해 보겠습니다. 모델 피클의 이름 model.bst 이어야 합니다.

import xgboost as xgb
from sklearn.datasets import load_iris

iris = load_iris()
X = iris['data']
y = iris['target']
dtrain = xgb.DMatrix(X, label=y)
param = {'max_depth': 6,
         'eta': 0.1,
         'silent': 1,
         'nthread': 4,
         'num_class': 10,
         'objective': 'multi:softmax'
         }
xgb_model = xgb.train(params=param, dtrain=dtrain)
xgb_model.save_model('model.bst')

생성 된 모델을 사용하여 XGBoost 서버를 실행하고 예측을 수행 할 수 있습니다. 모델은 PV, S3 호환 가능 개체 저장소, Azure Blob 저장소 또는 Google Cloud Storage에 있을 수 있습니다.

모델 저장하기

쿠버네티스의 퍼시스턴스 볼륨에 모델을 저장해 보겠습니다. PVC 는 앞서 생성한 kfserving-models-pvc 을 사용하겠습니다. 모델을 학습시키기 위해서 쿠버네티스 잡(Job)을 사용하겠습니다. Job을 생성할 때 모델을 저장하기 위한 PVC를 마운트 해줍니다.

모델 코드 작성하기

아이리스 꽃을 분류하는 간단한 모델입니다. 모델을 저장할 위치를 --model_path 파라미터로 입력받게 하였습니다.

iris.py

import argparse
import os

from joblib import dump
from sklearn import datasets
from sklearn import svm


def train():
    parser = argparse.ArgumentParser()
    parser.add_argument('--model_path', default='/mnt/pv/models/sklearn/iris', type=str)
    args = parser.parse_args()

    if not (os.path.isdir(args.model_path)):
        os.makedirs(args.model_path)

    model_file = os.path.join(args.model_path, 'model.joblib')

    clf = svm.SVC(gamma='scale')
    iris = datasets.load_iris()
    X, y = iris.data, iris.target
    clf.fit(X, y)
    dump(clf, model_file)


if __name__ == '__main__':
    train()

컨테이너 이미지를 만들기

컨테이너 이미지를 만들기 위한 Dockerfile 입니다. 파이썬을 기본 이미지로 사용하고, xgboostscikit-learn 패키지를 추가로 설치합니다.

Dockerfile

FROM python:3.6-slim

RUN pip install xgboost==0.82 scikit-learn

RUN mkdir -p /app
ADD xgboost_iris.py.py /app/

쿠버네티스 잡 실행하기

컨테이너 이미지를 빌드하고, 컨테이너 이미지 레지스트리에 푸시 한 다음, 쿠버네티스 잡(Job)을 생성하겠습니다.

Job을 생성할 때는 모델을 저장하기 위해서 PVC를 마운트 해줍니다. 이 일련의 작업들은 직접 실행 할 수 있습니다. 하지만 좀 더 편하게 하기 위해서 앞서 배운 Kubeflow Fairing을 사용하겠습니다.

다음은 로컬 개발 환경에서 Fairing을 사용하여 컨테이너 이미지를 만들고, 쿠버네티스 잡을 실행하는 예제입니다.

fairing-local-docker.py

import uuid
from kubeflow import fairing
from kubeflow.fairing.kubernetes import utils as k8s_utils

CONTAINER_REGISTRY = 'kangwoo'

namespace = 'admin'
job_name = f'xgboost-iris-job-{uuid.uuid4().hex[:4]}'

command=["python", "iris.py", "--model_path", "/mnt/pv/models/xgboost/iris"]
output_map = {
    "Dockerfile": "Dockerfile",
    "xgboost_iris.py": "xgboost_iris.py"
}

fairing.config.set_preprocessor('python', command=command, path_prefix="/app", output_map=output_map)

fairing.config.set_builder('docker', registry=CONTAINER_REGISTRY, image_name="xgboost-iris", dockerfile_path="Dockerfile")

fairing.config.set_deployer('job', namespace=namespace, job_name=job_name,
                            pod_spec_mutators=[k8s_utils.mounting_pvc(pvc_name='seldon-models-pvc', pvc_mount_path='/mnt/pv')],
                            cleanup=True, stream_log=True)

fairing.config.run()

fairing을 실행하면 쿠버네티스 잡이 생성되고, 학습이 완료된 모델이 지정한 경로에 저장됩니다.

XGBoost을 사용하는 InferenceService 로 예측 하기

InferenceService 생성

InferenceService 매니페스트를 작성합니다. predictor로 xgboost 을 사용합니다. storageUri 필드로 모델 저장 위치를 지정해 줍니다. pvc 의 이름이 kfserving-models-pvc 이고 저장 위치가 models/xgboost/iris 이므로, pvc://kfserving-models-pvc/models/xgboost/iris 라고 지정해 줍니다.

xgboost.yaml

apiVersion: "serving.kubeflow.org/v1alpha2"
kind: "InferenceService"
metadata:
  name: "xgboost-iris"
spec:
  default:
    predictor:
      xgboost:
        storageUri: "pvc://kfserving-models-pvc/models/xgboost/iris"

InferenceService 를 생성합니다.

다음은 admin 네임스페이스 InferenceService 를 생성하는 예제입니다.

kubectl -n admin apply -f xgboost.yaml

생성한 InferenceService를 조회해 보겠습니다.

kubectl -n admin get inferenceservice

InferenceService 가 정상적으로 생성되면 다음과 같은 응답 결과를 확인할 수 있습니다.

NAME           URL                                                            READY   DEFAULT TRAFFIC   CANARY TRAFFIC   AGE
xgboost-iris   <http://xgboost-iris.admin.example.com/v1/models/xgboost-iris>   True    100                                58s

예측 실행하기

예측을 요청하기 위해서는 모델 서버에 접근해야 합니다. 모델 서버는 ingressgateway 를 통해서 접근할 수 있습니다. ingressgateway 는 모델 서버들을 구분하기 위해서 호스트 이름을 사용합니다. ingressgateway에 접근하 기 위한 주소는 앞서 정의한 CLUSTER_IP 를 사용하겠습니다.

예측을 요청할 데이터를 json 파일로 작성합니다.

iris-input.json

{
  "instances": [
    [6.8,  2.8,  4.8,  1.4],
    [6.0,  3.4,  4.5,  1.6]
  ]
}

다음은 admin 네임스페이스의 xgboost-iris InferenceService 에 예측을 요청하는 예제입니다.

MODEL_NAME=xgboost-iris
SERVICE_HOSTNAME=$(kubectl -n admin get inferenceservice xgboost-iris -o jsonpath='{.status.url}' | cut -d "/" -f 3)

INPUT_PATH=@./iris-input.json
curl -v -H "Host: ${SERVICE_HOSTNAME}" http://$CLUSTER_IP/v1/models/$MODEL_NAME:predict -d $INPUT_PATH

정상적으로 실행되면 다음과 같은 응답 결과를 확인 할 수 있습니다.

*   Trying ::1...
* TCP_NODELAY set
* Connected to localhost (::1) port 8080 (#0)
> POST /v1/models/xgboost-iris:predict HTTP/1.1
> Host: xgboost-iris.admin.example.com
> User-Agent: curl/7.64.1
> Accept: */*
> Content-Length: 76
> Content-Type: application/x-www-form-urlencoded
> 
* upload completely sent off: 76 out of 76 bytes
< HTTP/1.1 200 OK
< content-length: 27
< content-type: text/html; charset=UTF-8
< date: Sat, 28 Mar 2020 18:54:46 GMT
< server: istio-envoy
< x-envoy-upstream-service-time: 10165
< 
* Connection #0 to host localhost left intact
{"predictions": [1.0, 1.0]}

KFServing InferenceService 배포와 예측 – Scikit-Learn

Scikit-Learn를 사용하는 InferenceService

학습이 완료된 scikit-learn모델을 피클로 저장 한 경우, KFServing에서 제공하는 sklearn 서버를 사용하여 간단히 배포 할 수 있습니다.

전제 조건

  • 모델 피클은 joblib을 사용하여 저장해야 합니다. 그리고 파일명은 model.joblib 이어야 합니다.
  • 현재 sklearn 0.20.3 버전을 사용합니다. 피클 모델은 이 버전과 호환되어야 합니다.

모델 생성

SKLearn 서버를 테스트하려면 먼저 파이썬을 사용하여 간단한 scikit-learn 모델을 생성해야합니다.

Scikit-learn의 기본적인 데이터셋 중의 하나인 아이리스 꽃 데이터를 사용하여, 아이리스 꽃을 분류하는 모델을 작성해 보겠습니다. 모델 피클은 joblib을 사용하여 저장해야 하고, 파일명은 model.joblib 이어야 합니다.

from joblib import dump
from sklearn import datasets
from sklearn import svm

clf = svm.SVC(gamma='scale')
iris = datasets.load_iris()
X, y = iris.data, iris.target
clf.fit(X, y)
dump(clf, 'model.joblib')

생성 된 모델을 사용하여 scikit-learn 서버를 실행하고 예측을 수행 할 수 있습니다. 모델은 PV, S3 호환 가능 개체 저장소, Azure Blob 저장소 또는 Google Cloud Storage에 있을 수 있습니다.

모델 저장하기

쿠버네티스의 퍼시스턴스 볼륨에 모델을 저장해 보겠습니다. PVC 는 앞서 생성한 kfserving-models-pvc 을 사용하겠습니다. 모델을 학습시키기 위해서 쿠버네티스 잡(Job)을 사용하겠습니다. Job을 생성할 때 모델을 저장하기 위한 PVC를 마운트 해줍니다.

모델 코드 작성하기

아이리스 꽃을 분류하는 간단한 모델입니다. 모델을 저장할 위치를 --model_path 파라미터로 입력받게 하였습니다.

iris.py

import argparse
import os

from joblib import dump
from sklearn import datasets
from sklearn import svm


def train():
    parser = argparse.ArgumentParser()
    parser.add_argument('--model_path', default='/mnt/pv/models/sklearn/iris', type=str)
    args = parser.parse_args()

    if not (os.path.isdir(args.model_path)):
        os.makedirs(args.model_path)

    model_file = os.path.join(args.model_path, 'model.joblib')

    clf = svm.SVC(gamma='scale')
    iris = datasets.load_iris()
    X, y = iris.data, iris.target
    clf.fit(X, y)
    dump(clf, model_file)


if __name__ == '__main__':
    train()

컨테이너 이미지를 만들기

컨테이너 이미지를 만들기 위한 Dockerfile 입니다. 파이썬을 기본 이미지로 사용하고, scikit-learn 패키지를 추가로 설치합니다.

Dockerfile

FROM python:3.6-slim

RUN pip install scikit-learn==0.20.3 joblib

RUN mkdir -p /app
ADD iris.py /app/

쿠버네티스 잡 실행하기

컨테이너 이미지를 빌드하고, 컨테이너 이미지 레지스트리에 푸시 한 다음, 쿠버네티스 잡(Job)을 생성하겠습니다.

Job을 생성할 때는 모델을 저장하기 위해서 PVC를 마운트 해줍니다. 이 일련의 작업들은 직접 실행 할 수 있습니다. 하지만 좀 더 편하게 하기 위해서 앞서 배운 Kubeflow Fairing을 사용하겠습니다.

다음은 로컬 개발 환경에서 Fairing을 사용하여 컨테이너 이미지를 만들고, 쿠버네티스 잡을 실행하는 예제입니다.

fairing-local-docker.py

import uuid
from kubeflow import fairing
from kubeflow.fairing.kubernetes import utils as k8s_utils

CONTAINER_REGISTRY = 'kangwoo'

namespace = 'admin'
job_name = f'sklean-iris-job-{uuid.uuid4().hex[:4]}'

command=["python", "iris.py", "--model_path", "/mnt/pv/models/sklearn/iris"]
output_map = {
    "Dockerfile": "Dockerfile",
    "iris.py": "iris.py"
}

fairing.config.set_preprocessor('python', command=command, path_prefix="/app", output_map=output_map)

fairing.config.set_builder('docker', registry=CONTAINER_REGISTRY, image_name="sklean-iris", dockerfile_path="Dockerfile")

fairing.config.set_deployer('job', namespace=namespace, job_name=job_name,
                            pod_spec_mutators=[k8s_utils.mounting_pvc(pvc_name='kfserving-models-pvc', pvc_mount_path='/mnt/pv')],
                            cleanup=False, stream_log=True)

fairing.config.run()

fairing을 실행하면 쿠버네티스 잡이 생성되고, 학습이 완료된 모델이 지정한 경로에 저장됩니다.

SKLearn을 사용하는 InferenceService 로 예측 하기

InferenceService 생성

InferenceService 매니페스트를 작성합니다. predictor로 sklearn 을 사용합니다. storageUri 필드로 모델 저장 위치를 지정해 줍니다. pvc 의 이름이 kfserving-models-pvc 이고 저장 위치가 models/sklearn/iris 이므로, pvc://kfserving-models-pvc/models/sklearn/iris 라고 지정해 줍니다.

sklearn.yaml

apiVersion: "serving.kubeflow.org/v1alpha2"
kind: "InferenceService"
metadata:
  name: "sklearn-iris"
spec:
  default:
    predictor:
      sklearn:
        storageUri: "pvc://kfserving-models-pvc/models/sklearn/iris"

InferenceService 를 생성합니다.

다음은 admin 네임스페이스 InferenceService 를 생성하는 예제입니다.

kubectl -n admin apply -f sklearn.yaml

생성한 InferenceService를 조회해 보겠습니다.

kubectl -n admin get inferenceservice

InferenceService 가 정상적으로 생성되면 다음과 같은 응답 결과를 확인할 수 있습니다.

NAME           URL                                                            READY   DEFAULT TRAFFIC   CANARY TRAFFIC   AGE
sklearn-iris   <http://sklearn-iris.admin.example.com/v1/models/sklearn-iris>   True    100                                19s

예측 실행하기

예측을 요청하기 위해서는 모델 서버에 접근해야 합니다. 모델 서버는 ingressgateway 를 통해서 접근할 수 있습니다. ingressgateway 는 모델 서버들을 구분하기 위해서 호스트 이름을 사용합니다. ingressgateway에 접근하 기 위한 주소는 앞서 정의한 CLUSTER_IP 를 사용하겠습니다.

예측을 요청할 데이터를 json 파일로 작성합니다.

iris-input.json

{
  "instances": [
    [6.8,  2.8,  4.8,  1.4],
    [6.0,  3.4,  4.5,  1.6]
  ]
}

다음은 admin 네임스페이스의 sklearn-iris InferenceService 에 예측을 요청하는 예제입니다.

MODEL_NAME=sklearn-iris
SERVICE_HOSTNAME=$(kubectl -n admin get inferenceservice sklearn-iris -o jsonpath='{.status.url}' | cut -d "/" -f 3)

INPUT_PATH=@./iris-input.json
curl -v -H "Host: ${SERVICE_HOSTNAME}" http://$CLUSTER_IP/v1/models/$MODEL_NAME:predict -d $INPUT_PATH

정상적으로 실행되면 다음과 같은 응답 결과를 확인 할 수 있습니다.

*   Trying ::1...
* TCP_NODELAY set
* Connected to localhost (::1) port 8080 (#0)
> POST /v1/models/sklearn-iris:predict HTTP/1.1
> Host: sklearn-iris.admin.example.com
> User-Agent: curl/7.64.1
> Accept: */*
> Content-Length: 76
> Content-Type: application/x-www-form-urlencoded
> 
* upload completely sent off: 76 out of 76 bytes
< HTTP/1.1 200 OK
< content-length: 23
< content-type: text/html; charset=UTF-8
< date: Sat, 29 Mar 2020 15:20:23 GMT
< server: istio-envoy
< x-envoy-upstream-service-time: 9032
< 
* Connection #0 to host localhost left intact
{"predictions": [1, 1]}

KFServing InferenceService 배포와 예측

InferenceService를 사용하여 모델 서버를 제공하려면, 사용할 네임스페이스가 다음과 같은지 확인해야합니다.

  • [serving.kubeflow.org/inferenceservice=enabled](<http://serving.kubeflow.org/inferenceservice=enabled>) 레이블이 네임스페이스 추가 되어 있어야 합니다.
  • 쿠버네티스 클러스터의 Istio IngressGateway에 접근할 수 있어야 합니다.

레이블 추가

Kubeflow의 대시보드나 프로필 컨트롤러(Profile Controller)를 사용하여, 사용자 네임스페이스를 만드는 경우에는 KFServing에서 모델을 배포할 수 있도록 serving.kubeflow.org/inferenceservice: enabled 레이블이 자동으로 추가됩니다. 만약 네임스페이스를 직접 생성하는 경우에는 해당 네임스페이스에 serving.kubeflow.org/inferenceservice: enabled 레이블을 추가해야만 합니다.

다음은 my-namespace 라는 네임스페이스에 레이블을 추가하는 예제입니다.

kubectl label namespace my-namespace serving.kubeflow.org/inferenceservice=enabled

Istio IngressGateway에 접근하기

InferenceService 가 정상적으로 생성되면, istio의 ingressgateway 를 모델 서버에 접속할 수 있습니다. KFServing에서 사용하는 ingressgateway의 이름을 알려면, config-istio 라는 ConfigMap을 조회하면 됩니다.

다음은 knative-serving 네임스페이스에 있는 config-istio 을 조회하는 예제입니다.

kubectl -n knative-serving get cm config-istio -o yaml

정상적으로 조회 되면 다음과 같은 결과를 얻을 수 있습니다.

apiVersion: v1
data:
  gateway.knative-serving.knative-ingress-gateway: kfserving-ingressgateway.istio-system.svc.cluster.local
  local-gateway.knative-serving.cluster-local-gateway: cluster-local-gateway.istio-system.svc.cluster.local
  local-gateway.mesh: mesh
  reconcileExternalGateway: "false"
kind: ConfigMap
metadata:
  ...
  name: config-istio
  namespace: knative-serving

data 섹션의 gateway.knative-serving.knative-ingress-gateway 필드가 현재 KFServing에서 사용하는 ingressgateway 를 설정하는 부분입니다. 위의 예제에서는 kfserving-ingressgateway를 사용하고 있습니다.

kfserving-ingressgateway를 조회해 보겠습니다.

다음은 istio-system 네임스페이스에 있는 kfserving-ingressgateway을 조회하는 예제입니다.

kubectl -n istio-system get service kfserving-ingressgateway 

KFServing이 설치된 쿠버네티스 클러스터에 따라 결과가 다르게 나옵니다. 응답 결과에 따른 크게 세가지 방법으로 접근 할 수 있습니다.

  • LoadBalancer 를 통해서 접근하기
  • NodePort를 통해서 접근하기
  • kubectl port-forward를 통해서 접근하기

LoadBalancer

쿠버네티스 클러스터가 LoadBalancer 를 지원하면 다음과 같은 결과를 얻을 수 있습니다. 서비스의 타입이 LoadBalancer 이고, EXTERNAL-IP 에 IP가 할당되어 있습니다. 이럴 경우에는 EXTERNAL-IP 를 통해서 ingressgateway에 접근할 수 있습니다.

NAME                       TYPE           CLUSTER-IP      EXTERNAL-IP   PORT(S)                                                                                                                                                                                   AGE
kfserving-ingressgateway   LoadBalancer   10.101.141.37   10.201.121.4  15020:30543/TCP,80:32380/TCP,443:32390/TCP,31400:32400/TCP,15011:30263/TCP,8060:32119/TCP,853:32180/TCP,15029:32156/TCP,15030:30674/TCP,15031:30230/TCP,15032:32563/TCP,15443:30995/TCP   2d23h

앞으로 만들 예제에서 사용하기 위해서 ingressgateway 의 접근 주소를 다음과 같이 정의하겠습니다. EXTERNAL-IP 주소를 사용합니다.

CLUSTER_IP=10.201.121.4

NodePort

쿠버네티스 클러스터가 LoadBalancer 를 지원하지 않거나, 서비스의 타입이 NodePort 인 경우 EXTERNAL-IP 의 값이 비어 있습니다. 이럴 경우에는 클러스터의 노드 IP 와 NodePort를 통해서 접근할 수 있습니다.

NAME                       TYPE           CLUSTER-IP      EXTERNAL-IP   PORT(S)                                                                                                                                                                                   AGE
kfserving-ingressgateway   LoadBalancer   10.101.141.37   <pending>     15020:30543/TCP,80:32380/TCP,443:32390/TCP,31400:32400/TCP,15011:30263/TCP,8060:32119/TCP,853:32180/TCP,15029:32156/TCP,15030:30674/TCP,15031:30230/TCP,15032:32563/TCP,15443:30995/TCP   2d23h

노드 IP는 노드를 조회하면 알 수 있습니다.

다음은 노드를 조회 하는 예제입니다.

kubectl get node -o wide

정상적으로 조회되면 다음과 같은 응답 결과가 나옵니다.

NAME     STATUS   ROLES    AGE   VERSION    INTERNAL-IP     EXTERNAL-IP   OS-IMAGE             KERNEL-VERSION      CONTAINER-RUNTIME
mortar   Ready    master   13d   v1.15.10   192.168.21.38   <none>        Ubuntu 18.04.3 LTS   4.15.0-91-generic   docker://18.9.9

노드가 한 개가 아닌 경우에는 여러개의 노드 정보가 출력됩니다. 해당 노드들 중에서 아무 노드의 INTERNAL-IP 를 사용하면 됩니다.

앞으로 만들 예제에서 사용하기 위해서 ingressgateway 의 접근 주소를 다음과 같이 정의하겠습니다. 노드의 IP 와 80 PORT(80:32380/TCP)의 노드 포트인 32380을 포트로 사용합니다.

CLUSTER_IP=192.168.21.38:32380

port-forward

외부에서 쿠버네티스 클러스터의 서비스에 접근할 수 없는 경우, kubectl 의 port-forward를 사용할 수 있습니다. 접근 하려는 외부 시스템에서 다음 명령어 실행하면 로컬 포트를 경유 해서 쿠버네티스 서비스에 접근할 수 있습니다.

kubectl -n istio-system port-forward svc/kfserving-ingressgateway 8080:80

포트 포워딩이 정상적으로 실행되면, 로컬포트로 ingressgateay 서비스로 접근할 수 있습니다. http://localhost:8080 처럼 선언한 로컬 포트의 주소로 접근하면, 쿠버네티스 ingressgateway 의 80 포트로 포워딩 됩니다.

앞으로 만들 예제에서 사용하기 위해서 ingressgateway 의 접근 주소를 다음과 같이 정의하겠습니다.

CLUSTER_IP=localhost:8080

PVC 생성하기

InferenceService 에 사용할 모델은 PVC에 저장하겠습니다. 만약 클라우드 스토리지와 같은 다른 저장소를 사용하려면, “클라우드 저장소를 이용하여 InfeerneceService 배포와 예측”을 참조하시기 바랍니다.

kfserving-models-pvc라는 PVC 매니페스트를 작성합니다.

kfserving-models-pvc.yaml

kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: kfserving-models-pvc
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi

다음 명령어를 실행하여, admin 네임스페이스에 kfserving-models-pvc라는 PVC를 생성하겠습니다.

kubectl -n admin apply kfserving-models-pvc.yaml

KFServing 설정

KFServing 에서 사용하는 여러 설정 정보들은 inferenceservice-config 라는 쿠버네티스 ConfigMap에 정의되어 있습니다.

이 설정 정보에는 다음과 같은 내용이 정의되어 있습니다.

  • credentials : S3나 GCS를 사용할 때 참조할 값들.
  • explainers : explainer를 실행할 때 사용할 컨테이너의 이미지 정보.
  • ingress : KFServing에서 사용할 Istio ingress 정보.
  • predictors : predictor를 실행할 때 사용할 컨테이너 이미지의 정보.

다음 명령어를 실행하면, 설정 정보를 조회할 수 있습니다.

kubectl -n kubeflow get cm inferenceservice-config -o yaml

정상적으로 실행되면 다음과 같은 응답 결과를 확인 할 수 있습니다.

apiVersion: v1
data:
  credentials: |-
    {
       "gcs": {
           "gcsCredentialFileName": "gcloud-application-credentials.json"
       },
       "s3": {
           "s3AccessKeyIDName": "awsAccessKeyID",
           "s3SecretAccessKeyName": "awsSecretAccessKey"
       }
    }
  explainers: |-
    {
        "alibi": {
            "image" : "gcr.io/kfserving/alibi-explainer",
            "defaultImageVersion": "0.2.2",
            "allowedImageVersions": [
               "0.2.2"
            ]
        }
    }
  ingress: |-
    {
        "ingressGateway" : "knative-ingress-gateway.knative-serving",
        "ingressService" : "kfserving-ingressgateway.istio-system.svc.cluster.local"
    }
  logger: |-
    {
        "image" : "gcr.io/kfserving/logger:0.2.2",
        "memoryRequest": "100Mi",
        "memoryLimit": "1Gi",
        "cpuRequest": "100m",
        "cpuLimit": "1"
    }
  predictors: |-
    {
        "tensorflow": {
            "image": "tensorflow/serving",
            "defaultImageVersion": "1.14.0",
            "defaultGpuImageVersion": "1.14.0-gpu",
            "allowedImageVersions": [
               "1.11.0",
               "1.11.0-gpu",
               "1.12.0",
               "1.12.0-gpu",
               "1.13.0",
               "1.13.0-gpu",
               "1.14.0",
               "1.14.0-gpu"
            ]
        },
        "onnx": {
            "image": "mcr.microsoft.com/onnxruntime/server",
            "defaultImageVersion": "v0.5.1",
            "allowedImageVersions": [
               "v0.5.1"
            ]
        },
        "sklearn": {
            "image": "gcr.io/kfserving/sklearnserver",
            "defaultImageVersion": "0.2.2",
            "allowedImageVersions": [
               "0.2.2"
            ]
        },
        "xgboost": {
            "image": "gcr.io/kfserving/xgbserver",
            "defaultImageVersion": "0.2.2",
            "allowedImageVersions": [
               "0.2.2"
            ]
        },
        "pytorch": {
            "image": "gcr.io/kfserving/pytorchserver",
            "defaultImageVersion": "0.2.2",
            "allowedImageVersions": [
               "0.2.2"
            ]
        },
        "tensorrt": {
            "image": "nvcr.io/nvidia/tensorrtserver",
            "defaultImageVersion": "19.05-py3",
            "allowedImageVersions": [
               "19.05-py3"
            ]
        }
    }
  storageInitializer: |-
    {
        "image" : "gcr.io/kfserving/storage-initializer:0.2.2",
        "memoryRequest": "100Mi",
        "memoryLimit": "1Gi",
        "cpuRequest": "100m",
        "cpuLimit": "1"
    }
  transformers: |-
    {
    }
kind: ConfigMap
metadata:
...
  name: inferenceservice-config
  namespace: kubeflow

Kubeflow – KFServing 설치

KFServing는 Kubeflow의 구성 요소로 포함되어 있습니다. 별도로 설치가 필요 없이 사용할 수 있습니다. 물론 Kubeflow 없이 독립적으로 설치해서 사용할 수도 있습니다.

전제 조건

KFServing을 사용하려면, 쿠버네티스 클러스터에 Knative Serving 및 Istio가 설치되어 있어야 합니다. Knative는 Istio Ingress Gateway를 사용하여 요청을 Knative 서비스로 라우팅합니다. Kubeflow 및 KFServing 팀이 테스트 한 정확한 버전을 사용하려면 개발자 안내서의 전제 조건을 참조하십시오

Knative를 빠르게 실행하거나 서비스 메시가 필요하지 않은 경우, 서비스 메시(sidecar injection 없이 Istio를 설치하는 것이 좋습니다.

현재는 Knative Serving 만 필요합니다. cluster-local-gateway 는 클러스터 내부 트래픽을 위한 통로로 사용합니다. cluster-local-gateway를 설치하려면 여기의 지침을 따르십시오

KFServing 웹훅 인증서를 제공합니다.

KFServing 설치

Kubeflow와 함께 KFServing 설치

KFServing 은 Kubeflow를 설치할때 기본적으로 설치됩니다. Kubeflow 매니페스트에 KFServing을 설치하는 부분이 포함되어 있습니다. Kubeflow와 함께 설치되는 KFServing의 경우 KFServing 컨트롤러는 kubeflow  네임스페이스에 배포됩니다. Kubeflow의 쿠버네티스 최소 요주 버전이 1.14이므로 개체 선택기(object selector)를 지원하지 않을 수 있습니다. 그래서 Kubeflow 설치시 ENABLE_WEBHOOK_NAMESPACE_SELECTOR 가 기본적으로 활성화 되어 있어야합니다.

Kubeflow의 대시보드나 프로필 컨트롤러(Profile Controller)를 사용하여, 사용자 네임스페이스를 만드는 경우에는 KFServing에서 모델을 배포할 수 있도록 serving.kubeflow.org/inferenceservice: enabled 레이블이 자동으로 추가됩니다. 만약 네임스페이스를 직접 생성하는 경우에는 해당 네임스페이스에 serving.kubeflow.org/inferenceservice: enabled 레이블을 추가해야만, KFServing의  InferenceService 를 사용할 수 있습니다.

독립형 KFServing 설치

쿠버네티스 클러스터에 KFServing을 독립적으로 설치하면, 우선 위의 전제 조건을 충족시켜야 합니다. 전제 조건이 충족되면 다음 명령어를 사용하여 KFServing을 설치할 수 있습니다. 다음 명령어는 GitHub 리포지토리의 yaml 파일을 사용하여 KFServing 0.3.0을 설치합니다.

TAG=v0.3.0
CONFIG_URI=https://raw.githubusercontent.com/kubeflow/kfserving/master/install/$TAG/kfserving.yaml

kubectl apply -f ${CONFIG_URI}

KFServing을 독립형으로 설치했을 경우에는 KFServing 컨트롤러는 kfserving-system 네임스페이스에 배포됩니다.

KFServing은 pod mutator와 mutating admission webhooks 을 사용하여 KFServing의 스토리지 이니셜라이저(storage initializer) 컴포넌트를 주입합니다. 기본적으론 네임스페이스에 control-plane 레이블이 지정되어 있지 않으면, 해당 네임스페이스의 포드들은 pod mutator를 통과합니다. 그렇기 때문에 KFServing의 pod mutator의 웹훅이 필요 없는 포드가 실행될때 문제가 발생할 수 있습니다.

쿠버네티스 1.14 사용자의 경우 serving.kubeflow.org/inferenceservice: enabled 레이블이 추가된 네임스페이스의 포드에 ENABLE_WEBHOOK_NAMESPACE_SELECTOR 환경변수를 추가하여, KFServing pod mutator를 통과하도록 하는게 좋습니다.

env:
- name: ENABLE_WEBHOOK_NAMESPACE_SELECTOR
  value: enabled

쿠버네티스 1.15+ 사용자의 경우 KFServing InferenceService 포드만 pod mutator 를 통과 할 수 있도록 객체 선택기(object selector)를 켜는 것이 좋습니다.

kubectl patch mutatingwebhookconfiguration inferenceservice.serving.kubeflow.org --patch '{"webhooks":[{"name": "inferenceservice.kfserving-webhook-server.pod-mutator","objectSelector":{"matchExpressions":[{"key":"serving.kubeflow.org/inferenceservice", "operator": "Exists"}]}}]}'

Kubeflow – KFServing 개요

KFServing은 쿠버네티스에서 서버리스 추론을 가능하게 하며, TensorFlow, XGBoost, scikit-learn, PyTorch 및 ONNX와 같은 일반적인 머신러닝 프레임워크를 위한 고성능의 추상화 인터페이스를 제공합니다. 그래서 프로덕션에서 다양한 프레임워크의 모델을 서빙하기에 적합합니다.

KFServing을 이용하여 모델을 서빙하려면, InferenceService 라는 쿠버네티스 사용자 리소스를 생성하면 됩니다.

KFServing의 장점은 다음과 같습니다.

  • 다양한 머신 러닝 프레임워크 제공하기 위한, 쿠버네티스 사용자 리소스의 추상화가 잘 되어 있습니다. 그래서 쉽고 간편한게 추론 서비스를 생성할 수 있습니다.
  • 자동 확장, 네트워킹, 상태 확인 및 서버 구성의 복잡성을 캡슐화하여 GPU 자동 확장 및 카나리아 롤아웃과 같은 최첨단 서비스 기능을 머신러닝 배포에 사용할 수 있습니다.
  • 기본적으로 예측, 전처리, 후처리 및 설명 기능을 제공하여 프로덕션 머신러닝 추론 서버에 대해 간단하고 플러그 가능하며 완벽한 제품을 만들 수 있습니다.

KFServing는 Kubeflow의 함께 설치됩니다. 그래서 별도의 설치 없이 사용할 수 있습니다. 물론 Kubeflow 없이 독립적으로 설치해서 사용할 수도 있습니다.

모델 서버

KFServing 은 다음과 같은 머신러닝 프레임워크를 지원하는 모델 서버를 제공하고 있습니다.

  • Tensorflow
  • NVIDIA Triton Inference Server
  • PyTorch
  • XGBoost
  • scikit-learn
  • ONNX

이러한 머신러닝 프레임워크를 사용하여 모델을 저장한 경우에는 Google 버킷, S3 버킷, Azure 또는 minio에 저장된 모델의 위치만 있으면 쉽게 추론 서비스를 생성할 수 있습니다.

다음은 scikit-learn과 tensorflow의 매니페스트 예제입니다.

scikit-learn

apiVersion: "serving.kubeflow.org/v1alpha2"
kind: "InferenceService"
metadata:
  name: "sklearn-iris"
spec:
  default:
    predictor:
      sklearn:
        storageUri: "gs://kfserving-samples/models/sklearn/iris"

pytorch

apiVersion: "serving.kubeflow.org/v1alpha2"
kind: "InferenceService"
metadata:
  name: "pytorch-cifar10"
spec:
  default:
    predictor:
      pytorch:
        storageUri: "gs://kfserving-samples/models/pytorch/cifar10/"
        modelClassName: "Net"

tensorflow

apiVersion: "serving.kubeflow.org/v1alpha2"
kind: "InferenceService"
metadata:
  name: "flowers-sample"
spec:
  default:
    predictor:
      tensorflow:
        storageUri: "gs://kfserving-samples/models/tensorflow/flowers"

storageUri는 학습한 모델의 저장 경로입니다.

모델 저장소

storageUri에서 사용할 수 있는, 지원하는 스토리는 다음과 같습니다.

  • Google Cloud Storage : 접두사가 “gs://” 로 시작합니다.
    • 기본적으로 사용자 인증에 GOOGLE_APPLICATION_CREDENTIALS 환경 변수를 사용합니다.
    • GOOGLE_APPLICATION_CREDENTIALS 가 제공되지 않으면, 익명 클라이언트가 아티팩트를 다운로드합니다.
  • S3 Compatible Object Storage : 접두사가 “s3://” 로 시작합니다.
    • 기본적으로 사용자 인증을 위해 S3_ENDPOINT, AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEY 환경 변수를 사용합니다.
  • Azure Blob Storage : “https://{$STORAGE_ACCOUNT_NAME}.blob.core.windows.net/{$CONTAINER}/{$PATH}”
    • 기본적으로 익명 클라이언트를 사용하여 아티팩트를 다운로드합니다.
  • 로컬 파일 시스템 : 접두사가 없거나 접두사가 “file://” 로 시작합니다.
    • 절대 경로: /absolute/path or file:///absolute/path
    • 상대 경로 : relative/path or file://relative/path
    • 로컬 파일 시스템의 경우 접두사 없이 상대 경로를 사용하는 것이 좋습니다.
  • Persistent Volume Claim (PVC) : 접두사가 “pvc://” 로 시작합니다.
    • 경로 형태는 “pvc://{$pvcname}/[path]” 입니다.
    • pvcname은 모델을 저장하고 있는 PVC의 이름입니다.
    • [path]“는 PVC의 모델에 대한 상대 경로입니다.
    • For e.g. pvc://mypvcname/model/path/on/pvc

KFServing 스택

다음은 KFServing 스택을 나타내는 그림입니다.

출처 : kfsrving

KFServing 은 쿠버네티스 위에서 동작합니다. 그리고 istio와 Knative를 사용하고 있습니다.

Istio

서비스를 연결(Connect), 보안(secure), 제어(control) 그리고 관찰(observe) 하기 위한 서비스 메쉬(Service Mesh) 플랫폼입니다.

  • Connect : 서비스 간의 트래픽 및 API 호출 흐름을 지능적으로 제어하고 다양한 테스트를 수행하며 Red/Black 배포를 제공합니다.
  • Secure : 관리 인증, 권한 부여 및 서비스 사이의 통신 암호화를 통해 서비스를 자동으로 보호합니다.
  • Control : 정책을 적용하고 시행해서 자원이 소비자에게 공정하게 분배되도록 합니다.
  • Observe : 모든 서비스의 트레이싱, 모니터링 및 로깅을 관찰하여 발생하는 상황을 확인합니다.

Knative

Knative는 선언적인 컨테이너 기반 애플리케이션을 구축하는데 필수적인 미들웨어 구성 요소 세트를 제공합니다 Knative는 서빙(Serving) 컴포넌트과 이벤트(Eventing) 컴포넌트로 구성되어 있습니다.

Knative Eventing은 클라우드 네이티브 개발에 대한 일반적인 요구를 해결하도록 설계된 시스템이며 바인딩 가능한 이벤트 소스 및 이벤트 소비자를 가능하게하는 구성 요소를 제공합니다.

Knative Serving은 애플리케이션이나 함수들을 서버리스 컨테이너를 사용하여 배포와 서빙 할 수 있는 기능을 지원합니다. Knative Serving은 사용 하기 쉽고, 여러 고급 시나리오를 지원하도록 확장할 수 있습니다.

Knative Serving 프로젝트는 다음을 가능하게 하는 미들웨어 기본 요소를 제공합니다.

  • 서버리스 컨테이너의 빠른 배포
  • 자동 스케일링
  • Istio 구성 요소의 라우팅 및 네트워크 프로그래밍
  • 배포 된 코드 및 구성의 특정 시점 스냅 샷

KFServing

KFServing은 모델의 호스팅 측면을 관리합니다. KFServing 은 추론을 서비스하기위 해서, 쿠버네티스의 사용자 리소스인 InferenceService 를 제공하고 있습니다. InferenceService 를 생성하게 되면, 모델 서버가 실행되어 추론 요청을 처리할 수 있습니다.

  • InferenceService : 모델의 생명주기를 관리합니다
  • Configuration : 모델 배포 기록을 관리합니다. 기본(Default) 및 카나리아(Canary)의 두 가지 구성이 존재합니다.
  • Revision : 모델 버전의 스냅샷 입니다. 설정 정보와 이미지 정보를 가지고 있습니다.
  • Route : 네트워크 트래픽을 관리 하는 엔드 포인트 입니다.


KFServing Data Plane

InferenceService 의 데이터 플레인은 Predictor, Explainer, Transformer 로 구성되어 있습니다. 이중에서 실제 예측을 수행하는 모델 서버인 Predictor가 핵심 컴포넌트 입니다. 그리고 모델을 안전하게 변경할 수 있도록, Default”와 “Canary” 엔드포인트를 가지고 있습니다.

다음은 InferenceService의 데이터 플레인을 그래프로 나타낸것입니다.

출처 : kfserving

구조

Endpoint: “Default”와 “Canary” 엔드포인트를 가집니다. 이 엔드포인트 덕분에 사용자는 카나리아 배포 전략을 사용해서 모델을 안전한게 변경 할 수 있습니다. 구 버전의 모델 서버와 새 버전의 모델 서버들을 구성하고 일부 트래픽을 새 버전으로 분산하여 오류 여부를 판단합니다. 분산 후 결과에 따라 새 버전이 운영 환경을 대체할 수도 있고, 다시 구 버전으로 돌아갈 수도 있습니다.

Component: 각 엔드 포인트는 “예측자(predictor)”, “설명자(explainer)”및 “변환기(transformer)”와 같은 여러 컴포넌트로 구성됩니다. 꼭 필요한 컴포넌트는 시스템의 핵심인 “예측자(predictor)”입니다. KFServing은 Outlier Detection과 같은 사용 사례를 지원하기 위해 지원되는 컴포넌트의 수를 늘릴 계획을 가지고 있습니다.

  • Predictor: Predictor는 InferenceService의 핵심 컴포넌트입니다. 네트워크 엔드포인트에서 사용 가능하게하는 모델 및 모델 서버입니다.
  • Explainer: Explainer는 선택 가능한 컴포넌트로서 모델이 어떻게 예측을 했는지에 대한 설명을 제공합니다. 사용자는 자신들이 가지고 있는 자체 설명 컨테이너를 정의할 수 있습니다. 일반적인 사용 사례의 경우 KFServing은 Alibi와 같은 기본 Explainer를 제공합니다.
  • Transformer: Transformer는 예측 및 설명 워크 플로우 전에 사전 및 사후 처리 단계를 정의 할 수 있는 컴포넌트 입니다. Explainer 과 마찬가지로 관련 환경 변수로 구성됩니다. 일반적인 사용 사례의 경우 KFServing은 Feast와 같은 기본 Transformer를 제공합니다.

Data Plane (V1)

KFServing에는 제공하는 모든 모델 서버에는 표준화 된 API를 지원합니다.

데이터 플레인 프로토콜의 V1 에서는 다음과 같은 HTTP/REST API를 제공하고 있습니다.

APIVERBPATH
ListGET/v1/models
ReadGET/v1/models/
PredictPOST/v1/models/:predict
ExplainPOST/v1/models/:explain

Predict

모든 InferenceServices는 Tensorflow V1 HTTP API (https://www.tensorflow.org/tfx/serving/api_rest#predict_api)와 호환되는 API를 사용합니다.

URL

POST <http://host>:port/v1/models/${MODEL_NAME}:predict

Request format

예측 API의 요청 본문은 다음과 같은 형식의 JSON 객체여야 합니다.

{
  "instances": <value>|<(nested)list>|<list-of-objects>
}

예측을 요청할 데이터의 값은 JSON 객체의 instances 필드에 입력합니다.

{
  // List of 3 scalar tensors.
  "instances": [ "foo", "bar", "baz" ]
}

{
  // List of 2 tensors each of [1, 2] shape
  "instances": [ [[1, 2]], [[3, 4]] ]
}

Response format

예측을 요청하면, 예측 결과를 응답합니다. 응답 본문에는 JSON 객체가 포함되어 있습니다. 행 형식의 요청에는 다음과 같은 형식의 응답이 있습니다.

{
  "predictions": <value>|<(nested)list>|<list-of-objects>
}

Explain

Explainer와 함께 배치 된 모든 InferenceService는 표준화 된 설명 API를 지원합니다. 이 인터페이스는 “: explain”동사가 추가 된 Tensorflow V1 HTTP API와 동일합니다.

URL

POST <http://host>:port/v1/models/${MODEL_NAME}:explain

Request format

예측 API의 요청 본문은 다음과 같은 형식의 JSON 객체여야 합니다.

{
  "instances": <value>|<(nested)list>|<list-of-objects>
}

예측을 요청할 데이터의 값은 JSON 객체의 instances 필드에 입력합니다.

{
  // List of 3 scalar tensors.
  "instances": [ "foo", "bar", "baz" ]
}

{
  // List of 2 tensors each of [1, 2] shape
  "instances": [ [[1, 2]], [[3, 4]] ]
}

Response format

예측을 요청하면, 예측 결과를 응답합니다. 응답 본문에는 JSON 객체가 포함되어 있습니다. 행 형식의 요청에는 다음과 같은 형식의 응답이 있습니다.

{
  "predictions": <value>|<(nested)list>|<list-of-objects>,
	"explanations": <value>|<(nested)list>|<list-of-objects>
}

Data Plane (V2)

데이터 플레인 프로토콜의 두 번째 버전은 V1 데이터 플레인 프로토콜에서 발견 된 여러 가지 문제를 해결하기 위해서 만들어지고 있습니다. 여기에는 수많은 모델 프레임워크의 일반화와 서버 성능 문제등을 포함하고 있습니다.

Predict

V2 프로토콜은 HTTP/REST 및 GRPC API를 모두 제안하고 있습니다. 자세한 내용은 전체 제안서를 참조하십시오.

Kubeflow Pipelines – 경량 파이썬 컴포넌트에서의 시각화 사용하기

경량 컴포넌트에서 시각화를 사용할 경우 구문이 약간 다릅니다.

텐서플로우의 mnist 이미지 식별 모델 학습을 이용하여 파이프라인 메트릭과 출력 뷰어의 한 종류인 텐서보드를 사용해 보도록 하겠습니다.

파이썬 함수를 시각화를 할 경우 typing.NamedTuple 타입 힌트를 사용하여 메타데이터와 메트릭을 선언하여야합니다.

def train(tb_log_dir: str) -> NamedTuple('Outputs', [('mlpipeline_ui_metadata', 'ui_metadata'),
                                                     ('mlpipeline_metrics', 'metrics')]):

그리고 함수의 마지막 부분에서 메타데이터와 메트릭의 값을 JSON 형식으로 변환하여 반환해야 합니다.

    from collections import namedtuple
    outputs = namedtuple('Outputs', ['mlpipeline_ui_metadata', 'mlpipeline_metrics'])
    return outputs(json.dumps(metadata), json.dumps(metrics))

다음은 경량 컴포넌트에서 시각화를 사용할 전체 코드 입니다.

tensorboard_pipeline.py

from typing import NamedTuple

import kfp
from kubernetes.client.models import V1EnvVar


def train(tb_log_dir: str) -> NamedTuple('Outputs', [('mlpipeline_ui_metadata', 'ui_metadata'),
                                                     ('mlpipeline_metrics', 'metrics')]):
    import tensorflow as tf
    import json

    print("TensorFlow version: ", tf.__version__)

    mnist = tf.keras.datasets.mnist

    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_train, x_test = x_train / 255.0, x_test / 255.0

    model = tf.keras.models.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10, activation='softmax')
    ])

    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

    model.summary()

    callbacks = [tf.keras.callbacks.TensorBoard(log_dir=tb_log_dir)]

    print("Training...")
    model.fit(x_train, y_train, epochs=5, validation_split=0.2, callbacks=callbacks)

    score = model.evaluate(x_test, y_test, batch_size=128)
    print('Test accuracy: ', score[1])

    metadata = {
        'outputs': [{
            'type': 'tensorboard',
            'source': tb_log_dir,
        }]
    }

    loss = score[0]
    accuracy = score[1]
    metrics = {
        'metrics': [{
            'name': 'accuracy',
            'numberValue': float(accuracy),
            'format': "PERCENTAGE",
        }, {
            'name': 'loss',
            'numberValue': float(loss),
            'format': "RAW",
        }]
    }

    from collections import namedtuple
    outputs = namedtuple('Outputs', ['mlpipeline_ui_metadata', 'mlpipeline_metrics'])
    return outputs(json.dumps(metadata), json.dumps(metrics))


train_op = kfp.components.func_to_container_op(train, base_image='tensorflow/tensorflow:2.1.0-py3')


def lightweight_tensorboard_pipeline(tb_log_dir):
    s3_endpoint = 'minio-service.kubeflow.svc.cluster.local:9000'
    minio_endpoint = "http://" + s3_endpoint
    minio_username = "minio"
    minio_key = "minio123"
    minio_region = "us-east-1"

    train_op(tb_log_dir).add_env_variable(V1EnvVar(name='S3_ENDPOINT', value=s3_endpoint)) \\
        .add_env_variable(V1EnvVar(name='AWS_ENDPOINT_URL', value=minio_endpoint)) \\
        .add_env_variable(V1EnvVar(name='AWS_ACCESS_KEY_ID', value=minio_username)) \\
        .add_env_variable(V1EnvVar(name='AWS_SECRET_ACCESS_KEY', value=minio_key)) \\
        .add_env_variable(V1EnvVar(name='AWS_REGION', value=minio_region)) \\
        .add_env_variable(V1EnvVar(name='S3_USE_HTTPS', value='0')) \\
        .add_env_variable(V1EnvVar(name='S3_VERIFY_SSL', value='0'))


if __name__ == '__main__':
    arguments = {'tb_log_dir': 's3://tensorboard/lightweight'}
    my_run = kfp.Client().create_run_from_pipeline_func(lightweight_tensorboard_pipeline,
                                                        arguments=arguments,
                                                        experiment_name='Sample Experiment')

다음은 Kubeflow 파이프 라인 UI의 Run Output 화면입니다.

Kubeflow Pipelines – 파이프라인에서 외부 저장소를 이용하기

파이프라인 컴포넌트에서 PVC 사용하기

파이프라인 컴포넌트에서 모델을 학습하고 PV에 저장하려면, PVC를 사용하면 됩니다. PV이 마운트 되면, 내부 파일시스템처럼 접근하여 데이터를 저장하고 읽을 수 있습니다.

PVC 를 생성하기

PV를 사용하기 위하여, 쿠버네티스 리소스인 PVC 를 생성해 보겠습니다.

다음은 100Mi의 용량을 가지는 PersistentVolumeClaim 매니페스트입니다.

kfp-pvc.yaml

kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: kfp-pvc
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 100Mi

kubectl 을 사용하여 kubeflow 네임스페이스 PVC 를 생성하겠습니다.

kubectl -n kubeflow apply -f kfp-pvc.yaml

텐서플로우 mnist 모델을 학습하고 S3에 저장하기

mnist 모델을 S3에 저장하겠습니다. 코드는 앞서 사용한 Tensorflow 코드와 동일하기 때문에 자세한 설명은 생략하겠습니다.

tensorflow_mnist.py

from __future__ import absolute_import, division, print_function, unicode_literals

import argparse
import os

import tensorflow as tf


def train():
    print("TensorFlow version: ", tf.__version__)

    parser = argparse.ArgumentParser()
    parser.add_argument('--model_path', default='./model', type=str)
    args = parser.parse_args()

    version = 1
    export_path = os.path.join(args.model_path, str(version))

    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    x_train, x_test = x_train / 255.0, x_test / 255.0

    model = tf.keras.models.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10, activation='softmax')
    ])

    model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=0.01),
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

    print("Training...")
    training_history = model.fit(x_train, y_train, batch_size=64, epochs=10,
                                 validation_split=0.2)

    print('\\nEvaluate on test data')
    results = model.evaluate(x_test, y_test, batch_size=128)
    print('test loss, test acc:', results)

    model.save(export_path)
    print('"Saved model to {}'.format(export_path))


if __name__ == '__main__':
    train()

컨테이너 이미지를 만들기

컨테이너 이미지를 만들기 위한 Dockerfile 입니다. 텐서플로우를 기본 이미지로 사용합니다.

Dockerfile

FROM tensorflow/tensorflow:2.1.0-py3

RUN mkdir -p /app
ADD tensorflow_mnist.py /app/

컨테이너 이미지를 빌드하겠습니다.

docker build -t kangwoo/kfp-mnist-storage:0.0.1 .

빌드한 컨테이너 이미지를 컨테이너 이미지 레지스트리에 업로드 합니다.

docker push kangwoo/kfp-mnist-storage:0.0.1

컴포넌트 작성

Kubeflow Pipelines DSL을 사용하여 컴포넌트를 작성합니다. 컴포넌트에서 사용하는 컨테이너 이미지를 정의합니다. 텐서플로우가 실행되는 컴포트넌트에서 PV에 접근할 수 있도록, PVC를 마운트 합니다.

pvc_name = "kfp-pvc"
    volume_name = 'pipeline'
    volume_mount_path = '/mnt/pipeline'

    dsl.ContainerOp(
        name='mnist_pvc',
        image='kangwoo/kfp-mnist-storage:0.0.1',
        arguments=['--model', '/mnt/pipeline/kfp/mnist/model']
    ).add_volume(V1Volume(name=volume_name, persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(claim_name=pvc_name))) \\
        .add_volume_mount(V1VolumeMount(mount_path=volume_mount_path, name=volume_name))

파이프라인 작성

Kubeflow Pipelines DSL을 사용하여 파이프 라인 함수를 작성합니다. 파이프 라인을 정의하고 사용하는 컴포넌트들을 추가합니다. Kubeflow Pipelines SDK 를 사용하여 파이프라인을 빌드 한 후, 업로드하고 실행합니다.

import kfp
from kfp import dsl

from kubernetes.client.models import V1PersistentVolumeClaimVolumeSource, \\
    V1Volume, V1VolumeMount


def pipeline_pvc():
    pvc_name = "kfp-pvc"
    volume_name = 'pipeline'
    volume_mount_path = '/mnt/pipeline'

    dsl.ContainerOp(
        name='mnist_pvc',
        image='kangwoo/kfp-mnist-storage:0.0.1',
        arguments=['--model', '/mnt/pipeline/kfp/mnist/model']
    ).add_volume(V1Volume(name=volume_name, persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(claim_name=pvc_name))) \\
        .add_volume_mount(V1VolumeMount(mount_path=volume_mount_path, name=volume_name))


if __name__ == '__main__':
    my_run = kfp.Client().create_run_from_pipeline_func(pipeline_pvc, arguments={},
                                                        experiment_name='Sample Experiment')

파이프라인을 실행하면 학습된 모델이 지정한 위치에 저장됩니다.

onprem.mount_pvc() 사용하기

앞 예제에서는 PVC를 직접 마운트해 주었습니다. kfp 에서 제공하는 onprem.mount_pvc() 메소드를 사용하면 보다 간단히 사용할 수 있습니다.

다음은 onprem.mount_pvc() 를 사용하는 컴포넌트 예제입니다.

import kfp
from kfp import dsl
from kfp import onprem


def pipeline_pmount_pvc():
    pvc_name = "kfp-pvc"
    volume_name = 'pipeline'
    volume_mount_path = '/mnt/pipeline'

    dsl.ContainerOp(
        name='mnist_mount_pvc',
        image='kangwoo/kfp-mnist-storage:0.0.1',
        arguments=['--model', '/mnt/pipeline/kfp/mnist/model']
    ).apply(onprem.mount_pvc(pvc_name, volume_name=volume_name, volume_mount_path=volume_mount_path))


if __name__ == '__main__':
    my_run = kfp.Client().create_run_from_pipeline_func(pipeline_pmount_pvc, arguments={},
                                                        experiment_name='Sample Experiment')

파이프라인 컴포넌트에서 S3 사용하기

파이프라인 컴포넌트에서 모델을 학습하고 S3에 저장하려면, S3에 접속할 수 있는 정보가 필요합니다. 환경 변수를 이용하여 접속 정보를 컴포넌트에 전달할 수 있습니다.

S3 Secret 을 생성하기

S3에 접속하여 데이터를 저장하거나 가져오려면 접속 정보가 필요합니다. 중요한 정보인 AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEY 는 쿠버네티스트 Secret 리소스에 저장하겠습니다.

다음은 Kubeflow에서 제공하는 minio를 s3 저장소로 사용하는 Secret 매니페스트 입니다. data 섹션에 있는 AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEY필드의 값은 BASE64 로 인코딩 된 값을 사용해야합니다.

export AWS_ACCESS_KEY_ID=minio
export AWS_SECRET_ACCESS_KEY=minio123

kubectl -n kubeflow create secret generic kfp-aws-secret \\
    --from-literal=AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} \\
    --from-literal=AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}

생성한 kfp-aws-secret 을 조회해 보겠습니다.

kubectl -n kubeflow get secret kfp-aws-secret

정상적으로 조회되면, 다음과 같은 결과를 확인할 수 있습니다.

apiVersion: v1
kind: Secret
metadata:
  name: kfp-aws-secret
type: Opaque
data:
  AWS_ACCESS_KEY_ID: bWluaW8=
  AWS_SECRET_ACCESS_KEY: bWluaW8xMjM=

텐서플로우 mnist 모델을 학습하고 S3에 저장하기

mnist 모델을 S3에 저장하겠습니다. 코드와 컨테이너 이미지는 앞서 사용한 PVC 저장 코드와 동일하기 때문에, “파이프라인 작성” 단계로 건너 뛰어도 됩니다.

tensorflow_mnist.py

from __future__ import absolute_import, division, print_function, unicode_literals

import argparse
import os

import tensorflow as tf


def train():
    print("TensorFlow version: ", tf.__version__)

    parser = argparse.ArgumentParser()
    parser.add_argument('--model_path', default='./model', type=str)
    args = parser.parse_args()

    version = 1
    export_path = os.path.join(args.model_path, str(version))

    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    x_train, x_test = x_train / 255.0, x_test / 255.0

    model = tf.keras.models.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10, activation='softmax')
    ])

    model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=0.01),
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

    print("Training...")
    training_history = model.fit(x_train, y_train, batch_size=64, epochs=10,
                                 validation_split=0.2)

    print('\\nEvaluate on test data')
    results = model.evaluate(x_test, y_test, batch_size=128)
    print('test loss, test acc:', results)

    model.save(export_path)
    print('"Saved model to {}'.format(export_path))


if __name__ == '__main__':
    train()

컨테이너 이미지를 만들기

컨테이너 이미지를 만들기 위한 Dockerfile 입니다. 텐서플로우를 기본 이미지로 사용합니다.

Dockerfile

FROM tensorflow/tensorflow:2.1.0-py3

RUN mkdir -p /app
ADD tensorflow_mnist.py /app/

컨테이너 이미지를 빌드하겠습니다.

docker build -t kangwoo/kfp-mnist-storage:0.0.1 .

빌드한 컨테이너 이미지를 컨테이너 이미지 레지스트리에 업로드 합니다.

docker push kangwoo/kfp-mnist-storage:0.0.1

컴포넌트 작성

Kubeflow Pipelines DSL을 사용하여 컴포넌트를 작성합니다. 컴포넌트에서 사용하는 컨테이너 이미지를 정의합니다. 텐서플로우가 실행되는 컴포트넌트에서 s3에 접근할 수 있도록, s3 설정 정보를 환경 변수로 넘겨 주었습니다.

		secret_name = "kfp-aws-secret"

    s3_endpoint = 'minio-service.kubeflow.svc.cluster.local:9000'
    minio_endpoint = "http://" + s3_endpoint
    minio_region = "us-east-1"

    dsl.ContainerOp(
        name='tensorboard',
        image='kangwoo/kfp-mnist-s3:0.0.1',
        arguments=['--model', 's3://tensorflow/kfp/mnist/model']
    ).add_env_variable(V1EnvVar(name='S3_ENDPOINT', value=s3_endpoint)) \\
        .add_env_variable(V1EnvVar(name='AWS_ENDPOINT_URL', value=minio_endpoint)) \\
        .add_env_variable(V1EnvVar(name='AWS_ACCESS_KEY_ID',
                                   value_from=V1EnvVarSource(
                                       secret_key_ref=V1SecretKeySelector(name=secret_name, key='AWS_ACCESS_KEY_ID')))) \\
        .add_env_variable(V1EnvVar(name='AWS_SECRET_ACCESS_KEY',
                                   value_from=V1EnvVarSource(secret_key_ref=V1SecretKeySelector(name=secret_name,
                                                                                                key='AWS_SECRET_ACCESS_KEY')))) \\
        .add_env_variable(V1EnvVar(name='AWS_REGION', value=minio_region)) \\
        .add_env_variable(V1EnvVar(name='S3_USE_HTTPS', value='0')) \\
        .add_env_variable(V1EnvVar(name='S3_VERIFY_SSL', value='0'))

파이프라인 작성

Kubeflow Pipelines DSL을 사용하여 파이프 라인 함수를 작성합니다. 파이프 라인을 정의하고 사용하는 컴포넌트들을 추가합니다. Kubeflow Pipelines SDK 를 사용하여 파이프라인을 빌드 한 후, 업로드하고 실행합니다.

import kfp
from kfp import dsl

from kubernetes.client.models import V1EnvVar, V1EnvVarSource, V1SecretKeySelector


def pipeline_s3():
    secret_name = "kfp-aws-secret"

    s3_endpoint = 'minio-service.kubeflow.svc.cluster.local:9000'
    minio_endpoint = "http://" + s3_endpoint
    minio_region = "us-east-1"

    dsl.ContainerOp(
        name='mnist-s3',
        image='kangwoo/kfp-mnist-storage:0.0.1',
        arguments=['--model', 's3://tensorflow/kfp/mnist/model']
    ).add_env_variable(V1EnvVar(name='S3_ENDPOINT', value=s3_endpoint)) \\
        .add_env_variable(V1EnvVar(name='AWS_ENDPOINT_URL', value=minio_endpoint)) \\
        .add_env_variable(V1EnvVar(name='AWS_ACCESS_KEY_ID',
                                   value_from=V1EnvVarSource(
                                       secret_key_ref=V1SecretKeySelector(name=secret_name, key='AWS_ACCESS_KEY_ID')))) \\
        .add_env_variable(V1EnvVar(name='AWS_SECRET_ACCESS_KEY',
                                   value_from=V1EnvVarSource(secret_key_ref=V1SecretKeySelector(name=secret_name,
                                                                                                key='AWS_SECRET_ACCESS_KEY')))) \\
        .add_env_variable(V1EnvVar(name='AWS_REGION', value=minio_region)) \\
        .add_env_variable(V1EnvVar(name='S3_USE_HTTPS', value='0')) \\
        .add_env_variable(V1EnvVar(name='S3_VERIFY_SSL', value='0'))


if __name__ == '__main__':
    my_run = kfp.Client().create_run_from_pipeline_func(pipeline_s3, arguments={},
                                                        experiment_name='Sample Experiment')

파이프라인을 실행하면 학습된 모델이 지정한 위치에 저장됩니다.

aws.use_aws_secret() 사용하기

앞 예제에서는 환경 변수에 직접 값들을 설정해 주었습니다. kfp 에서 제공하는 aws.use_aws_secret() 메소드를 사용하면 보다 간단히 인증 정보를 설정할 수 있습니다. 하지만 현재 버전에서는 AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEY 값 만을 변경할 수 있기 때문에 aws 에서 제공하는 s3만 사용할 수 있습니다. 다시 말해서 minio는 사용할 수 없습니다.

다음은 aws.use_aws_secret() 를 사용하는 컴포넌트 예제입니다.

import kfp
from kfp import aws
from kfp import dsl


def pipeline_use_aws_secret():
    secret_name = "kfp-aws-secret"

    dsl.ContainerOp(
        name='mnist_use_aws_secret',
        image='kangwoo/kfp-mnist-storage:0.0.1',
        arguments=['--model', 's3://tensorflow/kfp/mnist/model']
    ).apply(aws.use_aws_secret(secret_name,
                               aws_access_key_id_name='AWS_ACCESS_KEY_ID',
                               aws_secret_access_key_name='AWS_SECRET_ACCESS_KEY'))


if __name__ == '__main__':
    my_run = kfp.Client().create_run_from_pipeline_func(pipeline_use_aws_secret, arguments={},
                                                        experiment_name='Sample Experiment')

파이프라인 컴포넌트에서 GCS 사용하기

파이프라인 컴포넌트에서 모델을 학습하고 GCS에 저장하려면, GCS에 접속할 수 있는 정보가 필요합니다. 환경 변수를 이용하여 접속 정보를 컴포넌트에 전달할 수 있습니다.

GCS Secret 을 생성하고 서비스 계정에 추가하기

컴포넌트에서 GCS에 접속하여 데이터를 저장하거나 가져오려면 접속 정보가 필요합니다.

구글 클라우드의 JSON 서비스 계정 키를 다운로드 합니다. 이 서비스 계정은 사용할 GCS에 접근할 권한이 부여되어 있어야합니다.

다음은 서비스 계정 키를 gcp-sa-credentials.json 파일로 다운로드 하는 명령어 입니다. [SA-NAME] 에는 서비스 계정의 아이디를, [PROJECT-ID] 에는 프로젝트 아이디를 입력하면 됩니다.

gcloud iam service-accounts keys create gcp-sa-credentials.json \\
  --iam-account [SA-NAME]@[PROJECT-ID].iam.gserviceaccount.com

다운로드 받은 구글 클라우드의 서비스 계정 키를, 쿠버네티스 Secret 에 등록합니다.

다음은 kubeflow 네임스페이스 Secret 을 생성하는 명령어 입니다.

kubectl -n kubeflow create secret generic kfp-gcp-sa \\
  --from-file=gcloud-application-credentials.json=gcp-sa-credentials.json

텐서플로우 mnist 모델을 학습하고 GCS에 저장하기

mnist 모델을 GCS에 저장하겠습니다. 코드와 컨테이너 이미지는 앞서 사용한 S3 저장 코드와 동일하기 때문에, “파이프라인 작성” 단계로 건너 뛰어도 됩니다.

tensorflow_mnist.py

from __future__ import absolute_import, division, print_function, unicode_literals

import argparse
import os

import tensorflow as tf


def train():
    print("TensorFlow version: ", tf.__version__)

    parser = argparse.ArgumentParser()
    parser.add_argument('--model_path', default='./model', type=str)
    args = parser.parse_args()

    version = 1
    export_path = os.path.join(args.model_path, str(version))

    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    x_train, x_test = x_train / 255.0, x_test / 255.0

    model = tf.keras.models.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10, activation='softmax')
    ])

    model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=0.01),
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

    print("Training...")
    training_history = model.fit(x_train, y_train, batch_size=64, epochs=10,
                                 validation_split=0.2)

    print('\\nEvaluate on test data')
    results = model.evaluate(x_test, y_test, batch_size=128)
    print('test loss, test acc:', results)

    model.save(export_path)
    print('"Saved model to {}'.format(export_path))


if __name__ == '__main__':
    train()

컨테이너 이미지를 만들기

컨테이너 이미지를 만들기 위한 Dockerfile 입니다. 텐서플로우를 기본 이미지로 사용합니다.

Dockerfile

FROM tensorflow/tensorflow:2.1.0-py3

RUN mkdir -p /app
ADD tensorflow_mnist.py /app/

컨테이너 이미지를 빌드하겠습니다.

docker build -t kangwoo/kfp-mnist-storage:0.0.1 .

빌드한 컨테이너 이미지를 컨테이너 이미지 레지스트리에 업로드 합니다.

docker push kangwoo/kfp-mnist-storage:0.0.1

컴포넌트 작성

Kubeflow Pipelines DSL을 사용하여 컴포넌트를 작성합니다. 컴포넌트에서 사용하는 컨테이너 이미지를 정의합니다. 텐서플로우가 실행되는 컴포트넌트에서 gcs에 접근할 수 있도록, gcs 설정 정보를 환경 변수로 넘겨 주었습니다. kfp-gcp-sa 를 볼륨 마운트 한 다음, GOOGLE_APPLICATION_CREDENTIALS 라는 환경 변수에 인증 정보가 들어 있는 json 파일의 위치를 설정해 줍니다.

		GCSCredentialFileName = "gcloud-application-credentials.json"
    GCSCredentialVolumeName = "user-gcp-sa"
    GCSCredentialVolumeMountPath = "/var/secrets/"
    GCSCredentialEnvKey = "GOOGLE_APPLICATION_CREDENTIALS"
    GCSCredentialFilePath = os.path.join(GCSCredentialVolumeMountPath, GCSCredentialFileName)

    secret_name = 'kfp-gcp-sa'

    dsl.ContainerOp(
        name='mnist-gcs',
        image='kangwoo/kfp-mnist-storage:0.0.1',
        arguments=['--model', 'gcs://kfp-bucket/kfp/mnist/model']
    ).add_volume(V1Volume(name=GCSCredentialVolumeName, secret=V1SecretVolumeSource(secret_name=secret_name))) \\
        .add_volume_mount(V1VolumeMount(name=GCSCredentialVolumeName, mount_path=GCSCredentialVolumeMountPath)) \\
        .add_env_variable(V1EnvVar(name=GCSCredentialEnvKey, value=GCSCredentialFilePath))

파이프라인 작성

Kubeflow Pipelines DSL을 사용하여 파이프 라인 함수를 작성합니다. 파이프 라인을 정의하고 사용하는 컴포넌트들을 추가합니다. Kubeflow Pipelines SDK 를 사용하여 파이프라인을 빌드 한 후, 업로드하고 실행합니다.

import os

import kfp
from kfp import dsl
from kubernetes.client.models import V1EnvVar, V1VolumeMount, V1Volume, \\
    V1SecretVolumeSource


def pipeline_gcs():
    GCSCredentialFileName = "gcloud-application-credentials.json"
    GCSCredentialVolumeName = "user-gcp-sa"
    GCSCredentialVolumeMountPath = "/var/secrets/"
    GCSCredentialEnvKey = "GOOGLE_APPLICATION_CREDENTIALS"
    GCSCredentialFilePath = os.path.join(GCSCredentialVolumeMountPath, GCSCredentialFileName)

    secret_name = 'kfp-gcp-sa'

    dsl.ContainerOp(
        name='mnist-gcs',
        image='kangwoo/kfp-mnist-storage:0.0.1',
        arguments=['--model', 'gs://kfp-bucket/kfp/mnist/model']
    ).add_volume(V1Volume(name=GCSCredentialVolumeName, secret=V1SecretVolumeSource(secret_name=secret_name))) \\
        .add_volume_mount(V1VolumeMount(name=GCSCredentialVolumeName, mount_path=GCSCredentialVolumeMountPath)) \\
        .add_env_variable(V1EnvVar(name=GCSCredentialEnvKey, value=GCSCredentialFilePath))


if __name__ == '__main__':
    my_run = kfp.Client().create_run_from_pipeline_func(pipeline_gcs, arguments={},
                                                        experiment_name='Sample Experiment')

gcp.use_gcp_secret() 사용하기

앞 예제에서는 환경 변수에 직접 값들을 설정해 주었습니다. kfp 에서 제공하는 gcp.use_gcp_secret() 메소드를 사용하면 보다 간단히 인증 정보를 설정할 수 있습니다.

다음은 gcp.use_gcp_secret() 를 사용하는 컴포넌트 예제입니다.

import kfp
from kfp import dsl
from kfp import gcp


def pipeline_use_gcp_secret():
    secret_name = 'kfp-gcp-sa'
    secret_file_path_in_volume = '/gcloud-application-credentials.json'

    dsl.ContainerOp(
        name='mnist_use_gcp_secret',
        image='kangwoo/kfp-mnist-storage:0.0.1',
        arguments=['--model', 'gs://kfp-bucket/kfp/mnist/model']
    ).apply(gcp.use_gcp_secret(secret_name, secret_file_path_in_volume=secret_file_path_in_volume))


if __name__ == '__main__':
    my_run = kfp.Client().create_run_from_pipeline_func(pipeline_use_gcp_secret, arguments={},
                                                        experiment_name='Sample Experiment')

한꺼번에 외부 저장소 추가하기

파이프라인의 단계가 많아지면 일일히 외부 저장소를 추가해 주는 작업이 번거로울 수 있습니다. 그럴 경우에는 다음 예제처럼 반복문을 이용해서 저장소를 추가해 줄 수 있습니다.

ingest = dsl.ContainerOp(...)
transformation = dsl.ContainerOp(...)
train = dsl.ContainerOp(...)

steps = [ingest, transformation, train]
for step in steps:
	step.apply(oprem.mount_pvc(pvc_name, 'local-storage', '/mnt'))

Kubeflow Pipelines – 파이프 라인 UI에서 결과 시각화

소개

Kubeflow Pipelines UI는 기본적으로 여러 유형의 시각화를 제공하고 있습니다. 그래서 성능 평가 및 비교 데이터를 보여주기 위한 방법으로 사용할 수 있습니다. 이 시각화 기능은 출력 뷰어를 사용하고 있습니다. 컴포넌트의 결과물을 출력 뷰어를 이용하여 시각화 할 수 있는 것입니다. 출력 뷰어를 사용하려면, 파이프 라인 컴포넌트의 애플리케이션에서 JSON 파일을 로컬 파일 시스템에 저장하면 됩니다.

시각화된 출력 결과는 Kubeflow 파이프 라인 UI를 통해서 확인 할 수 있습니다. Artifacts 페이지와 , Run output 페이지에서 시각화된 출력 결과를 조회 할 수 있습니다.

Artifacts

Artifacts 탭에는 선택한 파이프 라인 단계의 시각화가 표시됩니다.

Kubeflow 파이프 라인 UI에서 Artifacts 탭을 열려면, 다음 절차대로 진행하시면 됩니다.

  1. Experiments 클릭하여 현재 파이프 라인 실험 목록을 조회합니다.
  2. 보려는 실험의 ‘실험 이름’을 클릭하십시오.
  3. 보려는 실행의 “실행 이름”을 클릭하십시오.
  4. Graph 탭에서 보려는 파이프 라인 컴포넌트를 나타내는 단계를 클릭하십시오. 세부 사항이 Artifacts  탭을 표시하며 슬라이드 됩니다.

Run output

Run output 탭에는 선택한 실행의 모든 ​​단계에 대한 시각화가 표시됩니다.

Kubeflow 파이프 라인 UI에서 Run output 탭을 열려면, 다음 절차대로 진행하시면 됩니다.

  1. Experiments 클릭하여 현재 파이프 라인 실험 목록을 조회합니다.
  2. 보려는 실험의 ‘실험 이름’을 클릭하십시오.
  3. 보려는 실행의 “실행 이름”을 클릭하십시오.
  4. Run output 탭을 클릭하십시오.

출력 뷰어를 위한 메타데이터 저장하기

파이프 라인 컴포넌트의 결과물을 출력 뷰어를 통해서 시각화 할 수 있습니다. 출력 뷰어를 통해서 결과물을 시각화 하려면, JSON 형태의 메타 데이터 파일을 로컬 저장소에 작성해야 합니다. 파일 이름은 /mlpipeline-ui-metadata.json 을 사용합니다.

JSON은 outputs 배열을 가지고 있습니다. 각 outputs 항목은 출력 뷰어의 메타 데이터를 설명합니다. JSON 구조는 다음과 같습니다.

{
  "version": 1,
  "outputs": [
    {
      "type": "confusion_matrix",
      "format": "csv",
      "source": "my-dir/my-matrix.csv",
      "schema": [
        {"name": "target", "type": "CATEGORY"},
        {"name": "predicted", "type": "CATEGORY"},
        {"name": "count", "type": "NUMBER"},
      ],
      "labels": "vocab"
    },
    {
      ...
    }
  ]
}

출력 뷰어의 메타 데이터에는 생성할 뷰어와 사용할 데이터에 대한 정의가 포함되어 있습니다.

Kubeflow Pipilines 시스템은 컴포넌트가 컨테이너 파일 시스템에 저장한 메타 데이터 파일을 읽어와서 Kubeflow Pipelines UI에서 지정한 뷰어를 생성합니다. Kubeflow Pipelines UI는 데이터를 메모리에 로드하여 렌더링합니다.

컴포넌트가 해당 메타데이터 파일을 컨테이너 파일 시스템에 쓰는 경우 Kubeflow Pipelines 시스템은 파일을 자동으로 Artifact 저장소에 저장합니다. Kubeflow Pipelines UI는 이 파일을 사용하여 지정된 뷰어를 생성합니다. 메타 데이터에는 Kubeflow Pipelines UI 가 아티팩트 데이터를 가져오기 위한 위치가 정의되어 있습니다. Kubeflow Pipelines UI는 데이터를 메모리에 가져와서 렌더링합니다.

주의 : Kubeflow Pipilines 0.3.0 버전에서 지원하는 아티팩터 데이터의 사용 가능한 위치는 Google Cloud Storage, Amazon S3, http https 입니다. 이슈 데이터를 정상적으로 가져오기 위해서는 Kubeflow Pipelines UI 에서 해당 아티팩터 데이터 위치로 접근할 수 있어야합니다. 그래서 설치 환경에 따라서 별도의 접근 권한을 Kubeflow Pipelines UI 에 부여해줘야 할 수도 있습니다.

다음 표는 outputs 배열에서 지정할 수 있는 메타 데이터 필드입니다. 각 output 항목에는 type이 있어야합니다. type 값에 따라 사용하는 필드 값은 약간 다를 수 있습니다.나중에 페이지의 출력 뷰어 목록에 설명 된대로 다른 필드가 필요할 수도 있습니다.

outputs

필드 이름설명
format아티팩트 데이터의 형식입니다. 기본값은 csv입니다. (현재 사용 가능한 유일한 형식은 csv입니다.)
header아티팩터 데이터의 헤더로 사용될 문자열 목록입니다. 예를 들어, 테이블에서 이 문자열은 첫 번째 행에서 사용됩니다.
labels이슈 열 또는 행의 레이블로 사용되는 문자열 목록입니다.
predicted_col예측 열의 이름입니다.
schema아티팩트 데이터의 스키마를 지정하는 {type, name} 객체의 목록입니다.
source데이터의 전체 경로입니다. 사용 가능한 위치에는 http, https, Amazon S3 및 Google Cloud Storage가 포함됩니다. 경로에는 와일드 카드 ‘*’가 포함될 수 있으며,이 경우 Kubeflow Pipelines UI는 일치하는 소스 파일의 데이터를 연결합니다. source는 인라인 문자열 데이터도 포함 할 수 있습니다. storage 이 inline일 때 경로 대신 문자열 데이터를 포함합니다.
storagestorage 가 inline 인 경우 source값이 위치 대신 인라인 데이터로 사용 됩니다. 이것은 텐서 보드를 제외한 모든 유형의 출력에 적용됩니다.
target_col대상 열의 이름입니다.
type데이터를 시각화하는 데 사용되는 뷰어의 이름입니다. 아래 목록은 사용 가능한 유형을 보여줍니다.

사용 가능한 출력 뷰어

Kubeflow Pipelines 에서 제공하고 있는 출력 뷰어는 다음과 같습니다.

  • Confusion matrix
  • Markdown
  • ROC curve
  • Table
  • TensorBoard
  • Web app

사용 가능한 뷰어 유형과 각 유형에 필요한 메타 데이터 필드에 대해 알아보도록 하겠습니다.

출력 뷰어 사용하기

사용 가능한 뷰어 유형과 각 유형에 필요한 메타 데이터 필드에 대해 알아보도록 하겠습니다.

Confusion matrix

분류결과표(Confusion Matrix)는 대상의 원래 클래스와 모델이 예측한 클래스가 일치 하는 경우의 개수를 세어서, 그 결과를 표나 나타낸 것입니다. 정답 클래스는 행(row)으로 예측한 클래스는 열(column)로 나타냅니다.

타입: confusion_matrix

메타 데이터 필수 필드 :

  • format
  • labels
  • schema
  • source

메타 데이터 선택 필드 :

  • storage

뷰어는 다음 위치에서 분류 결과 데이터를 읽을 수 있습니다

  • source 필드에 포함 된 분류 결과 형식의 문자열입니다. storage 필드의 값은 inline 이어야 합니다.
  • source 필드에 지정한 경로의 원격 파일에 분류 결과를 읽어옵니다.. storage 필드는 비어 있거나 inline을 제외한 모든 값을 포함 할 수 있습니다.

confusion_matrix 뷰어는 데이터 구문을 분석하기 위해 schema에 정의된 값들을 사용합니다. labels은 x 및 y 축에 나타낼 클래스 이름입니다.

Example:

	metadata = {
    'outputs' : [
    # Confustion matrix that is hardcoded inline
    {
      'type': 'confusion_matrix',
      'format': 'csv',
      'schema': [
        {'name': 'target', 'type': 'CATEGORY'},
        {'name': 'predicted', 'type': 'CATEGORY'},
        {'name': 'count', 'type': 'NUMBER'},
      ],
      'source': <CONFUSION_MATRIX_CSV_INLINE>
			'storage': 'inline',
      'labels': list(map(str, vocab)),
    },
    # Confustion matrix that is read from a file
		{
      'type': 'confusion_matrix',
      'format': 'csv',
      'schema': [
        {'name': 'target', 'type': 'CATEGORY'},
        {'name': 'predicted', 'type': 'CATEGORY'},
        {'name': 'count', 'type': 'NUMBER'},
      ],
      'source': <CONFUSION_MATRIX_CSV_FILE>,
      'labels': list(map(str, vocab)),
    }]
  }
  with open('/mlpipeline-ui-metadata.json', 'w') as f:
    json.dump(metadata, f)

파이프 라인을 구성하고 실행하기

분류결과표(Confusion Matrix)는 출력하는 파이프 라인을 만들어 보겠습니다.

가. 프로그램 코드를 작성합니다.

컴포넌트를 만들기 위하여, sklearn을 사용한 간단한 분류결과표 프로그램 코드를 작성합니다. confusion_matrix() 메소드를 사용하여 분류결과표를 만들고, csv 형태의 파일로 저장하기 위해서 데이터를 가공합니다. 예제에서는 데이터를 외부 저장소에 저장하지 않고 inline 으로 사용하기 위해서, 저장한 csv 데이터를 다시 문자열로 읽어온 후, 메타데이터의 source 의 필드에 저장합니다. 만약 외부 저장소를 사용하고 싶으면, storage 필드를 삭제하고, source 필드에 저장할 곳의 위치를 지정하면 됩니다.

다음은 프로그램의 전체 코드입니다.

src/confusion_matrix.py

import json
import os

import pandas as pd
from sklearn.metrics import confusion_matrix

y_target = [2, 0, 2, 2, 0, 1]
y_pred = [0, 0, 2, 2, 0, 2]

vocab = [0, 1, 2]

cm = confusion_matrix(y_target, y_pred, labels=vocab)

data = []
for target_index, target_row in enumerate(cm):
    for predicted_index, count in enumerate(target_row):
        data.append((vocab[target_index], vocab[predicted_index], count))

df_cm = pd.DataFrame(data, columns=['target', 'predicted', 'count'])

output = '.'
cm_file = os.path.join(output, 'confusion_matrix.csv')
with open(cm_file, 'w') as f:
    df_cm.to_csv(f, columns=['target', 'predicted', 'count'], header=False, index=False)

lines = ''
with open(cm_file, 'r') as f:
    lines = f.read()

metadata = {
    'outputs': [{
            'type': 'confusion_matrix',
            'format': 'csv',
            'schema': [
                {'name': 'target', 'type': 'CATEGORY'},
                {'name': 'predicted', 'type': 'CATEGORY'},
                {'name': 'count', 'type': 'NUMBER'},
            ],
            'source': lines,
            'storage': 'inline',
            'labels': list(map(str, vocab)),
        }]
}

with open('/mlpipeline-ui-metadata.json', 'w') as f:
    json.dump(metadata, f)

나. 프로그램 코드가 포함된 컨테이너 이미지를 생성하고, 컨테이너 이미지 레지스트리에 업로드 합니다.

Dockerfile을 생성합니다.

Dockerfile

FROM python:3.6.10-slim

RUN pip install sklearn pandas

COPY ./src /app
WORKDIR /app

	CMD ["python", "/app/confusion_matrix.py"]

컨테이너 이미지를 빌드하겠습니다.

docker build -t kangwoo/kfp-confusion-matrix:0.0.1 .

빌드한 컨테이너 이미지를 컨테이너 이미지 레지스트리에 업로드 합니다.

docker push kangwoo/kfp-confusion-matrix:0.0.1

다. Kubeflow Pipelines DSL을 사용하여 컴포넌트를 작성합니다. 컴포넌트에서 사용하는 컨테이너 이미지를 정의합니다. 그리고 output_artifact_paths 파라미터를 사용하여, 메트릭 파일이 저장된 경로를 지정해 줍니다.

dsl.ContainerOp(
    name='confusion-matrix',
    image='kangwoo/kfp-confusion-matrix:0.0.1',
    output_artifact_paths={'mlpipeline-ui-metadata': '/mlpipeline-ui-metadata.json'}
  )

라. Kubeflow Pipelines DSL을 사용하여 파이프 라인 함수를 작성합니다. 파이프 라인을 정의하고 사용하는 컴포넌트들을 추가합니다. Kubeflow Pipelines SDK 를 사용하여 파이프라인을 빌드 한 후, 업로드하고 실행합니다.

import kfp
from kfp import dsl

def confusion_matrix_pipeline():
  dsl.ContainerOp(
    name='confusion-matrix',
    image='kangwoo/kfp-confusion-matrix:0.0.1',
    output_artifact_paths={'mlpipeline-ui-metadata': '/mlpipeline-ui-metadata.json'}
  )


if __name__ == '__main__':
    arguments = {}
    my_run = kfp.Client().create_run_from_pipeline_func(confusion_matrix_pipeline,
                                                        arguments=arguments,
                                                        experiment_name='Sample Experiment')

다음은 Kubeflow 파이프 라인 UI의 confusion_matrix 화면입니다.


Markdown

마크 다운 뷰어는 Kubeflow 파이프 라인 UI에서 마크 다운 문자열을 렌더링합니다.

타입 : markdown

메타 데이터 필수 필드 :

  • source

메타 데이터 선택 필드 :

  • storage :

뷰어는 다음 위치에서 마크 다운 데이터를 읽을 수 있습니다

  • source 필드에 포함 된 마크 다운 형식 문자열입니다. storage 필드의 값은 inline 이어야 합니다.
  • source 필드에 지정한 경로에서 원격 파일의 마크 다운 코드. storage 필드는 비어 있거나 inline을 제외한 모든 값을 포함 할 수 있습니다.

Example:

	metadata = {
    'outputs' : [
    # Markdown that is hardcoded inline
    {
      'storage': 'inline',
      'source': '# Inline Markdown\\n[A link](<https://www.kubeflow.org/>)',
      'type': 'markdown',
    },
    # Markdown that is read from a file
    {
      'source': 'gs:///your_bucket/your_markdown_file',
      'type': 'markdown',
    }]
  }
  with open('/mlpipeline-ui-metadata.json', 'w') as f:
    json.dump(metadata, f)

파이프 라인을 구성하고 실행하기

마크 다운을 출력하는 파이프 라인을 만들어 보겠습니다.

가. 프로그램 코드를 작성합니다.

컴포넌트를 만들기 위하여, 마크 다운을 저장하는 프로그램 코드를 작성합니다. 예제에서는 데이터를 외부 저장소에 저장하지 않고 inline 으로 사용하기 위해서, 저장한 csv 데이터를 다시 문자열로 읽어온 후, 메타데이터의 source 의 필드에 저장합니다. 만약 외부 저장소를 사용하고 싶으면, storage 필드를 삭제하고, source 필드에 저장할 곳의 위치를 지정하면 됩니다.

다음은 프로그램의 전체 코드입니다.

src/markdown.py

import json

metadata = {
    'outputs': [{
            'storage': 'inline',
            'source': '# Inline Markdown\\n[A link](<https://www.kubeflow.org/>)',
            'type': 'markdown',
        }]
}

with open('/mlpipeline-ui-metadata.json', 'w') as f:
    json.dump(metadata, f)

나. 프로그램 코드가 포함된 컨테이너 이미지를 생성하고, 컨테이너 이미지 레지스트리에 업로드 합니다.

Dockerfile을 생성합니다.

Dockerfile

FROM python:3.6.10-slim

COPY ./src /app
WORKDIR /app

CMD ["python", "/app/markdown.py"]

컨테이너 이미지를 빌드하겠습니다.

docker build -t kangwoo/kfp-markdown:0.0.1 .

빌드한 컨테이너 이미지를 컨테이너 이미지 레지스트리에 업로드 합니다.

docker push kangwoo/kfp-markdown:0.0.1

다. Kubeflow Pipelines DSL을 사용하여 컴포넌트를 작성합니다. 컴포넌트에서 사용하는 컨테이너 이미지를 정의합니다. 그리고 output_artifact_paths 파라미터를 사용하여, 메트릭 파일이 저장된 경로를 지정해 줍니다.

dsl.ContainerOp(
    name='markdown',
    image='kangwoo/kfp-markdown:0.0.1',
    output_artifact_paths={'mlpipeline-ui-metadata': '/mlpipeline-ui-metadata.json'}
  )

라. Kubeflow Pipelines DSL을 사용하여 파이프 라인 함수를 작성합니다. 파이프 라인을 정의하고 사용하는 컴포넌트들을 추가합니다. Kubeflow Pipelines SDK 를 사용하여 파이프라인을 빌드 한 후, 업로드하고 실행합니다.

import kfp
from kfp import dsl


def markdown_pipeline():
  dsl.ContainerOp(
    name='markdown',
    image='kangwoo/kfp-markdown:0.0.1',
    output_artifact_paths={'mlpipeline-ui-metadata': '/mlpipeline-ui-metadata.json'}
  )


if __name__ == '__main__':
    arguments = {}
    my_run = kfp.Client().create_run_from_pipeline_func(markdown_pipeline,
                                                        arguments=arguments,
                                                        experiment_name='Sample Experiment')

다음은 Kubeflow 파이프 라인 UI의 markdown 화면입니다.


ROC curve

ROC(Receiver Operator Characteristic) 곡선은 클래스 판별 기준값의 변화에 따른 위양성률(fall-out)과 재현율(recall)의 변화를 시각화한 것이다. 위양성율(fall-out)은 실제 양성 클래스에 속하지 않는 표본 중에 양성 클래스에 속한다고 출력한 표본의 비율을 뜻합니다. 위양성율은 FPR(false positive rate)라고도 합니다. 재현율(recall)은 실제 양성 클래스에 속한 표본 중에 양성 클래스에 속한다고 출력한 표본의 비율을 뜻합니다. 재현율은 TPR(true positive rate)라고도 합니다. 위양성률(fall-out)과 재현율(recall)은 일반적으로 양의 상관 관계가 있습니다.

타입 : roc

메타 데이터 필수 필드 :

  • format
  • schema
  • source

메타 데이터 선택 필드 :

  • storage

뷰어는 다음 위치에서 데이터를 읽을 수 있습니다

  • source 필드에 포함 된 ROC 곡선 형식의 문자열입니다. storage 필드의 값은 inline 이어야 합니다.
  • source 필드에 지정한 경로의 원격 파일에 분류 결과를 읽어옵니다. storage 필드는 비어 있거나 inline을 제외한 모든 값을 포함 할 수 있습니다.

Example:

	metadata = {
    'outputs': [
    # Roc that is hardcoded inline
    {
      'type': 'roc',
      'format': 'csv',
      'schema': [
        {'name': 'fpr', 'type': 'NUMBER'},
        {'name': 'tpr', 'type': 'NUMBER'},
        {'name': 'thresholds', 'type': 'NUMBER'},
      ],
      'source': <ROC_CSV_INLINE>
			'storage': 'inline',      
    },
    # Roc that is read from a file
		{
      'type': 'roc',
      'format': 'csv',
      'schema': [
        {'name': 'fpr', 'type': 'NUMBER'},
        {'name': 'tpr', 'type': 'NUMBER'},
        {'name': 'thresholds', 'type': 'NUMBER'},
      ],
      'source': <ROC_CSV_FILE>
    }]
  }
  with open('/mlpipeline-ui-metadata.json', 'w') as f:
    json.dump(metadata, f)

파이프 라인을 구성하고 실행하기

ROC(Receiver Operator Characteristic) 곡선을 출력하는 파이프 라인을 만들어 보겠습니다.

가. 프로그램 코드를 작성합니다.

컴포넌트를 만들기 위하여, sklearn을 이용하여 ROC 곡선을 생성하는 프로그램 코드를 작성합니다. roc_curve() 메소드를 사용하여 ROC 곡선 만들고, csv 형태의 파일로 저장하기 위해서 데이터를 가공합니다. 예제에서는 데이터를 외부 저장소에 저장하지 않고 inline 으로 사용하기 위해서, 저장한 csv 데이터를 다시 문자열로 읽어온 후, 메타데이터의 source 의 필드에 저장합니다. 만약 외부 저장소를 사용하고 싶으면, storage 필드를 삭제하고, source 필드에 저장할 곳의 위치를 지정하면 됩니다.

다음은 프로그램의 전체 코드입니다.

src/roc.py

import json
import os

import pandas as pd
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_curve

X, y = make_classification(n_samples=1000, weights=[0.95, 0.05], random_state=5)

model = LogisticRegression().fit(X, y)
y_hat = model.predict(X)

fpr, tpr, thresholds = roc_curve(y, model.decision_function(X))

output = '.'
df_roc = pd.DataFrame({'fpr': fpr, 'tpr': tpr, 'thresholds': thresholds})
roc_file = os.path.join(output, 'roc.csv')
with open(roc_file, 'w') as f:
    df_roc.to_csv(f, columns=['fpr', 'tpr', 'thresholds'], header=False, index=False)

lines = ''
with open(roc_file, 'r') as f:
    lines = f.read()

metadata = {
    'outputs': [{
        'type': 'roc',
        'format': 'csv',
        'schema': [
            {'name': 'fpr', 'type': 'NUMBER'},
            {'name': 'tpr', 'type': 'NUMBER'},
            {'name': 'thresholds', 'type': 'NUMBER'},
        ],
        'source': lines,
        'storage': 'inline',
    }]
}
with open('/mlpipeline-ui-metadata.json', 'w') as f:
    json.dump(metadata, f)

나. 프로그램 코드가 포함된 컨테이너 이미지를 생성하고, 컨테이너 이미지 레지스트리에 업로드 합니다.

Dockerfile을 생성합니다.

Dockerfile

FROM python:3.6.10-slim

RUN pip install sklearn pandas

COPY ./src /app
WORKDIR /app

CMD ["python", "/app/roc.py"]

컨테이너 이미지를 빌드하겠습니다.

docker build -t kangwoo/kfp-roc:0.0.1 .

빌드한 컨테이너 이미지를 컨테이너 이미지 레지스트리에 업로드 합니다.

docker push kangwoo/kfp-roc:0.0.1

다. Kubeflow Pipelines DSL을 사용하여 컴포넌트를 작성합니다. 컴포넌트에서 사용하는 컨테이너 이미지를 정의합니다. 그리고 output_artifact_paths 파라미터를 사용하여, 메트릭 파일이 저장된 경로를 지정해 줍니다.

dsl.ContainerOp(
    name='roc',
    image='kangwoo/kfp-roc:0.0.1',
    output_artifact_paths={'mlpipeline-ui-metadata': '/mlpipeline-ui-metadata.json'}
  )

라. Kubeflow Pipelines DSL을 사용하여 파이프 라인 함수를 작성합니다. 파이프 라인을 정의하고 사용하는 컴포넌트들을 추가합니다. Kubeflow Pipelines SDK 를 사용하여 파이프라인을 빌드 한 후, 업로드하고 실행합니다.

import kfp
from kfp import dsl


def roc_pipeline():
    dsl.ContainerOp(
        name='roc',
        image='kangwoo/kfp-roc:0.0.1',
        output_artifact_paths={'mlpipeline-ui-metadata': '/mlpipeline-ui-metadata.json'}
    )


if __name__ == '__main__':
    arguments = {}
    my_run = kfp.Client().create_run_from_pipeline_func(roc_pipeline,
                                                        arguments=arguments,
                                                        experiment_name='Sample Experiment')

다음은 Kubeflow 파이프 라인 UI의 roc 화면입니다.

ROC 곡선을 볼 때, 커서를 ROC 곡선 위로 가져 가면 커서의 가장 가까운 fpr 및 tpr 값에 사용 된 thresholds 값을 볼 수 있습니다.


Table

table 뷰어는 지정된 소스 경로의 데이터에서 HTML 테이블을 작성합니다. 여기서 헤더 필드는 테이블의 첫 번째 행에 표시 될 값을 지정합니다. 테이블은 페이징 기능을 지원합니다.

타입 : table

메타 데이터 필수 필드 :

  • format
  • header
  • source

메타 데이터 선택 필드 :

  • storage

뷰어는 다음 위치에서 데이터를 읽을 수 있습니다

  • source 필드에 포함 된 데이터 문자열입니다. storage 필드의 값은 inline 이어야 합니다.
  • source 필드에 지정한 경로의 원격 파일에 분류 결과를 읽어옵니다. storage 필드는 비어 있거나 inline을 제외한 모든 값을 포함 할 수 있습니다.

Example:

	metadata = {
    'outputs' : [
    # Table that is hardcoded inline
    {
      'type': 'table',
      'format': 'csv',
      'header': [x['name'] for x in schema],
      'source': <TABLE_CSV_INLINE>
			'storage': 'inline',
    },
    # Table that is read from a file
		{
      'type': 'table',
      'format': 'csv',
      'header': [x['name'] for x in schema],
      'source': <TABLE_CSV_FILE>
    }]
  }
  with open('/mlpipeline-ui-metadata.json', 'w') as f:
    json.dump(metadata, f)

파이프 라인을 구성하고 실행하기

테이블을 출력하는 파이프 라인을 만들어 보겠습니다.

가. 프로그램 코드를 작성합니다.

컴포넌트를 만들기 위하여, 테이블을 생성하는 프로그램 코드를 작성합니다. 테이블롤 출력할 데이터를 만든 후, csv 형태의 파일로 저장하기 위해서 데이터를 가공합니다. 예제에서는 데이터를 외부 저장소에 저장하지 않고 inline 으로 사용하기 위해서, 저장한 csv 데이터를 다시 문자열로 읽어온 후, 메타데이터의 source 의 필드에 저장합니다. 만약 외부 저장소를 사용하고 싶으면, storage 필드를 삭제하고, source 필드에 저장할 곳의 위치를 지정하면 됩니다.

다음은 프로그램의 전체 코드입니다.

src/table.py

import json
import os

import pandas as pd
from sklearn.metrics import classification_report

y_true = [1, 0, 1, 1, 0, 1]
y_pred = [0, 1, 1, 0, 0, 1]

target_names = ['class 0', 'class 1']
report = classification_report(y_true, y_pred, target_names=target_names, output_dict=True)
print(report)

df_report = pd.DataFrame(report).transpose()

output = '.'
table_file = os.path.join(output, 'table.csv')
with open(table_file, 'w') as f:
    df_report.to_csv(f, header=False)

lines = ''
with open(table_file, 'r') as f:
    lines = f.read()

metadata = {
    'outputs': [{
        'type': 'table',
        'format': 'csv',
        'header': [''] + [x for x in df_report],
        'source': lines,
        'storage': 'inline',
    }]
}

with open('/mlpipeline-ui-metadata.json', 'w') as f:
    json.dump(metadata, f)

나. 프로그램 코드가 포함된 컨테이너 이미지를 생성하고, 컨테이너 이미지 레지스트리에 업로드 합니다.

Dockerfile을 생성합니다.

Dockerfile

FROM python:3.6.10-slim

RUN pip install sklearn pandas

COPY ./src /app
WORKDIR /app

CMD ["python", "/app/table.py"]

컨테이너 이미지를 빌드하겠습니다.

docker build -t kangwoo/kfp-table:0.0.1 .

빌드한 컨테이너 이미지를 컨테이너 이미지 레지스트리에 업로드 합니다.

docker push kangwoo/kfp-table:0.0.1

다. Kubeflow Pipelines DSL을 사용하여 컴포넌트를 작성합니다. 컴포넌트에서 사용하는 컨테이너 이미지를 정의합니다. 그리고 output_artifact_paths 파라미터를 사용하여, 메트릭 파일이 저장된 경로를 지정해 줍니다.

dsl.ContainerOp(
    name='table',
    image='kangwoo/kfp-table:0.0.1',
    output_artifact_paths={'mlpipeline-ui-metadata': '/mlpipeline-ui-metadata.json'}
  )

라. Kubeflow Pipelines DSL을 사용하여 파이프 라인 함수를 작성합니다. 파이프 라인을 정의하고 사용하는 컴포넌트들을 추가합니다. Kubeflow Pipelines SDK 를 사용하여 파이프라인을 빌드 한 후, 업로드하고 실행합니다.

import kfp
from kfp import dsl


def table_pipeline():
  dsl.ContainerOp(
    name='table',
    image='kangwoo/kfp-table:0.0.1',
    output_artifact_paths={'mlpipeline-ui-metadata': '/mlpipeline-ui-metadata.json'}
  )


if __name__ == '__main__':
    arguments = {}
    my_run = kfp.Client().create_run_from_pipeline_func(table_pipeline,
                                                        arguments=arguments,
                                                        experiment_name='Sample Experiment')

다음은 Kubeflow 파이프 라인 UI의 table 화면입니다.

TensorBoard

tensorboard 뷰어는 Start Tensorboard 버튼을 출력 페이지에 추가합니다.

타입 : tensorboard

메타 데이터 필수 필드 :

  • source

출력 페이지에서 다음을 수행 할 수 있습니다.

  • Start Tensorboard 을 클릭하면, Kubeflow 클러스터에 Tensorboard 인스턴스가 시작됩니다. Tensorboard 포드가 실행되면, 버튼 텍스트가Open Tensorboard 로 전환됩니다.
  • Open Tensorboard 를 클릭하면 source 필드에 지정한 logdir 데이터를 읽어오는 TensorBoard 화면에 접속 할 수 있습니다..
  • Delete Tensorboard 을 클릭하면, Tensorboard 인스턴스가 종료됩니다.

Kubeflow Pipelines UI는 TensorBoard 인스턴스를 완전히 관리하지 않습니다. Start Tensorboard  버튼은 편리한 기능이므로 파이프 라인 실행을 볼 때 워크 플로우를 중단 할 필요가 없습니다. Kubernetes 관리 도구를 사용하여 TensorBoard 포드를 재활용하거나 삭제해야합니다.

Example:

	metadata = {
    'outputs' : [{
      'type': 'tensorboard',
      'source': <TENSORBOARD_PATH>,
    }]
  }
  with open('/mlpipeline-ui-metadata.json', 'w') as f:
    json.dump(metadata, f)

외부 저장소 설정 하기

tensorboard 뷰어는 텐서보드를 실행할 수 있는 버튼을 제공하고 있습니다. 버튼을 클릭하면 텐서보드가 실행이 됩니다. 이때 실행되는 텐서보드는 텐서플로우의 로그가 저장된 위치에 접근할 수 있어야합니다. 그래서 텐서보드가 지원하는 형태의 외부 저장소가 필요합니다.

예제에서는 kubeflow와 함께 설치된 minio를 S3 호환 저장소로 사용하겠습니다. 만약 별도의 저장소를 사용하고 있다면, 외부 저장소 설정 하기는 넘어가도 됩니다.

가. minio에 사용할 버킷을 생성하겠습니다.

버킷을 생성하기 위해서 minio에 접속해야합니다.

다음 명령어를 실행하면, minio의 서비스 정보를 조회할 수 있습니다.

kubectl -n kubeflow get service minio-service

정상적으로 실행되면, 다음과 같은 결과를 확인 할 수 있습니다.

NAME            TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)    AGE
minio-service   ClusterIP   10.103.56.47   <none>        9000/TCP   15d

로컬 환경에서 minio에 접속하기 위해서 port-forward 를 실행합니다.

kubectl -n kubeflow port-forward svc/minio-service 9000:9000

포트 포워딩이 실행되면, 다음과 같은 결과를 확인 할 수 있습니다.

Forwarding from 127.0.0.1:9000 -> 9000
Forwarding from [::1]:9000 -> 9000

웹브라이저를 실행시켜서 http://localhost:9000/ 으로 접속합니다.

정상적으로 접속되면, 로그인 화면을 확인 할 수 있습니다.

AccessKey 와 Secret Key 를 입력합니다. 기본 값을 minio / minio123 입니다.

로그인 되면 다음과 같은 화면을 볼 수 있습니다.

화면 오른쪽 하단의 + 버튼을 누른 후, “Create bucket”을 눌러서 버킷을 생성합니다.

예제서 사용할 버컷의 이름은 tensorboard 입니다.

나. 텐서보드가 s3에 접근할 수 있도록 인증 정보를 설정하겠습니다.

s3에 접근 하기위한 AccessKey와 SecretKey 정보를 쿠버네티스의 Secret 리소스로 저장하겠습니다.

다음 명령어를 실행하면 시크릿 리소스가 생성됩니다.

export AWS_ACCESS_KEY_ID=minio
export AWS_SECRET_ACCESS_KEY=minio123

kubectl -n kubeflow create secret generic ml-pipeline-aws-secret \\
    --from-literal=accesskey=$AWS_ACCESS_KEY_ID \\
    --from-literal=secretkey=$AWS_SECRET_ACCESS_KEY

텐서보드가 실행될때 s3에 접근할 수 있도록, s3 설정 정보를 쿠버네티스 ConfigMap 리소스로 저장하겠습니다. 중요한 정보인 AWS_ACCESS_KEY_ID 와 AWS_SECRET_ACCESS_KEY 는 앞서 생성한 Secret 리소스에서 가져오게 되어있습니다.

viewer-tensorboard-template-configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: ml-pipeline-ui-viewer-template
data:
  viewer-tensorboard-template.json: |
    {
        "spec": {
            "containers": [
                {
                    "env": [
                        {
                            "name": "AWS_ACCESS_KEY_ID",
                            "valueFrom": {
                                "secretKeyRef": {
                                    "name": "ml-pipeline-aws-secret",
                                    "key": "accesskey"
                                }
                            }
                        },
                        {
                            "name": "AWS_SECRET_ACCESS_KEY",
                            "valueFrom": {
                                "secretKeyRef": {
                                    "name": "ml-pipeline-aws-secret",
                                    "key": "secretkey"
                                }
                            }
                        },
                        {
                            "name": "S3_ENDPOINT",
                            "value": "minio-service.kubeflow.svc.cluster.local:9000"
                        },
                        {
                            "name": "AWS_ENDPOINT_URL",
                            "value": "<http://minio-service.kubeflow.svc.cluster.local:9000>"
                        },
                        {
                            "name": "AWS_REGION",
                            "value": "us-east-1"
                        },
                        {
                            "name": "S3_USE_HTTPS",
                            "value": "0"
                        },
                        {
                            "name": "S3_VERIFY_SSL",
                            "value": "0"
                        }
                    ]
                }
            ]
        }
    }

다음 명령어를 실행하여, ml-pipeline-ui-viewer-template ConfigMap을 생성합니다.

kubectl -n kubeflow create -f viewer-tensorboard-template-configmap.yaml

다. 이제 생성한 ConfigMap을 사용할 수 있도록, ml-pipeline-ui 의 설정 정보를 변경하도록 하겠습니다.

다음 명령어를 실행하여 ml-pipeline-ui 디플로이먼트를 수정합니다.

kubectl -n kubeflow edit deployment ml-pipeline-ui

생성한 ConfigMap 을 볼륨으로 마운트 해 줍니다.

...
        volumeMounts:
        - mountPath: /etc/config
          name: config-volume
...
     volumes:
      - configMap:
          defaultMode: 420
          name: ml-pipeline-ui-viewer-template
        name: config-volume

마운트한 볼륨에 있는 viewer-tensorboard-template.json 파일을 VIEWER_TENSORBOARD_POD_TEMPLATE_SPEC_PATH 라는 환경 변수를 지정해 줍니다.

      containers:
      - env:
        - name: VIEWER_TENSORBOARD_POD_TEMPLATE_SPEC_PATH
          value: /etc/config/viewer-tensorboard-template.json

다음은 변경한 ml-pipeline-ui 디플로이먼트 매니페스트의 일부분 입니다.

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
...
  name: ml-pipeline-ui
  namespace: kubeflow
...
spec:
...
  template:
...
    spec:
      containers:
      - env:
        - name: VIEWER_TENSORBOARD_POD_TEMPLATE_SPEC_PATH
          value: /etc/config/viewer-tensorboard-template.json
...
        image: gcr.io/ml-pipeline/frontend:0.3.0
        imagePullPolicy: IfNotPresent
        name: ml-pipeline-ui
...
        volumeMounts:
        - mountPath: /etc/config
          name: config-volume
...
      dnsPolicy: ClusterFirst
...
      volumes:
      - configMap:
          defaultMode: 420
          name: ml-pipeline-ui-viewer-template
        name: config-volume
...

변경한 내용을 저장하면, 새로운 설정이 적용된 포드가 실행됩니다.

파이프 라인을 구성하고 실행하기

텐서보드 뷰어를 사용하는 파이프 라인을 만들어 보겠습니다.

가. 프로그램 코드를 작성합니다.

컴포넌트를 만들기 위하여, 텐서플로우 로그 데이터를 저장하는 프로그램 코드를 작성합니다. 텐서플로우 케라스를 이용하여 mnist 이미지를 식별하는 모델을 생성합니다. 그리고 tf.keras.callbacks.TensorBoard 를 사용하여 학습할때 로그를 남깁니다. 그리고 Kubeflow Pipelines 에서 텐서보드 뷰어를 사용할 수 있도록 /mlpipeline-ui-metadata.json 파일에 메타데이터를 저장합니다.

다음은 프로그램의 전체 코드입니다.

src/tensorflow_mnist.py

from __future__ import absolute_import, division, print_function, unicode_literals

import argparse
import json

import tensorflow as tf


def train():
    parser = argparse.ArgumentParser()
    parser.add_argument('--tb_log_dir', default='./data/logs', type=str)
    args = parser.parse_args()

    tb_log_dir = args.tb_log_dir

    print("TensorFlow version: ", tf.__version__)

    mnist = tf.keras.datasets.mnist

    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_train, x_test = x_train / 255.0, x_test / 255.0

    model = tf.keras.models.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10, activation='softmax')
    ])

    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

    model.summary()

    callbacks = [tf.keras.callbacks.TensorBoard(log_dir=tb_log_dir)]

    print("Training...")
    model.fit(x_train, y_train, epochs=5, validation_split=0.2, callbacks=callbacks)

    score = model.evaluate(x_test, y_test, batch_size=128)
    print('Test accuracy: ', score[1])

    metadata = {
        'outputs': [{
            'type': 'tensorboard',
            'source': tb_log_dir,
        }]
    }
    with open('/mlpipeline-ui-metadata.json', 'w') as f:
        json.dump(metadata, f)


if __name__ == '__main__':
    train()

나. 프로그램 코드가 포함된 컨테이너 이미지를 생성하고, 컨테이너 이미지 레지스트리에 업로드 합니다.

Dockerfile을 생성합니다.

Dockerfile

FROM tensorflow/tensorflow:2.1.0-py3

COPY ./src /app
WORKDIR /app


ENTRYPOINT ["python", "/app/tensorflow_mnist.py"]

컨테이너 이미지를 빌드하겠습니다.

docker build -t kangwoo/kfp-tensorboard:0.0.1 .

빌드한 컨테이너 이미지를 컨테이너 이미지 레지스트리에 업로드 합니다.

docker push kangwoo/kfp-tensorboard:0.0.1

다. Kubeflow Pipelines DSL을 사용하여 컴포넌트를 작성합니다. 컴포넌트에서 사용하는 컨테이너 이미지를 정의합니다. 그리고 output_artifact_paths 파라미터를 사용하여, 메트릭 파일이 저장된 경로를 지정해 줍니다.

텐서플로우가 실행되는 컴포트넌트에서 s3에 접근할 수 있도록, s3 설정 정보를 환경 변수로 넘겨 주었습니다.

s3_endpoint = 'minio-service.kubeflow.svc.cluster.local:9000'
    minio_endpoint = "http://" + s3_endpoint
    minio_username = "minio"
    minio_key = "minio123"
    minio_region = "us-east-1"

    dsl.ContainerOp(
        name='tensorboard',
        image='kangwoo/kfp-tensorboard:0.0.1',
        arguments=['--tb_log_dir', tb_log_dir],
        output_artifact_paths={'mlpipeline-ui-metadata': '/mlpipeline-ui-metadata.json'}
    ).add_env_variable(V1EnvVar(name='S3_ENDPOINT', value=s3_endpoint)) \\
        .add_env_variable(V1EnvVar(name='AWS_ENDPOINT_URL', value=minio_endpoint)) \\
        .add_env_variable(V1EnvVar(name='AWS_ACCESS_KEY_ID', value=minio_username)) \\
        .add_env_variable(V1EnvVar(name='AWS_SECRET_ACCESS_KEY', value=minio_key)) \\
        .add_env_variable(V1EnvVar(name='AWS_REGION', value=minio_region)) \\
        .add_env_variable(V1EnvVar(name='S3_USE_HTTPS', value='0')) \\
        .add_env_variable(V1EnvVar(name='S3_VERIFY_SSL', value='0'))

라. Kubeflow Pipelines DSL을 사용하여 파이프 라인 함수를 작성합니다. 파이프 라인을 정의하고 사용하는 컴포넌트들을 추가합니다. Kubeflow Pipelines SDK 를 사용하여 파이프라인을 빌드 한 후, 업로드하고 실행합니다.

import kfp
from kfp import dsl

from kubernetes.client.models import V1EnvVar


def tensorboard_pipeline(tb_log_dir):
    s3_endpoint = 'minio-service.kubeflow.svc.cluster.local:9000'
    minio_endpoint = "http://" + s3_endpoint
    minio_username = "minio"
    minio_key = "minio123"
    minio_region = "us-east-1"

    dsl.ContainerOp(
        name='tensorboard',
        image='kangwoo/kfp-tensorboard:0.0.1',
        arguments=['--tb_log_dir', tb_log_dir],
        output_artifact_paths={'mlpipeline-ui-metadata': '/mlpipeline-ui-metadata.json'}
    ).add_env_variable(V1EnvVar(name='S3_ENDPOINT', value=s3_endpoint)) \\
        .add_env_variable(V1EnvVar(name='AWS_ENDPOINT_URL', value=minio_endpoint)) \\
        .add_env_variable(V1EnvVar(name='AWS_ACCESS_KEY_ID', value=minio_username)) \\
        .add_env_variable(V1EnvVar(name='AWS_SECRET_ACCESS_KEY', value=minio_key)) \\
        .add_env_variable(V1EnvVar(name='AWS_REGION', value=minio_region)) \\
        .add_env_variable(V1EnvVar(name='S3_USE_HTTPS', value='0')) \\
        .add_env_variable(V1EnvVar(name='S3_VERIFY_SSL', value='0'))


if __name__ == '__main__':
    arguments = {'tb_log_dir': 's3://tensorboard/mnist'}
    my_run = kfp.Client().create_run_from_pipeline_func(tensorboard_pipeline,
                                                        arguments=arguments,
                                                        experiment_name='Sample Experiment')

다음은 Kubeflow 파이프 라인 UI의 tensorboard 화면입니다.

구동하면 싶은 텐서보드의 버전을 선택할 수 있습니다. “Start Tensorboard”를 클릭하면 텐서보드가 실행됩니다.

텐서보드가 실행되면 버튼이 “Open Tensorboard”로 바뀝니다. “Open Tensorboard” 버튼을 클릭하면 텐서보드 화면으로 접속할 수 있습니다.

다음은 텐서보드 접속 화면입니다.


Web app

web-app 뷰어는 사용자 정의 출력을 렌더링 할 수있는 유연성을 제공합니다.

타입 : web-app

메타 데이터 필수 필드 :

  • source

메타 데이터 선택 필드 :

  • storage

뷰어는 다음 위치에서 데이터를 읽을 수 있습니다

  • source 필드에 포함 된 데이터 문자열입니다. storage 필드의 값은 inline 이어야 합니다.
  • source 필드에 지정한 경로의 원격 파일에 분류 결과를 읽어옵니다. storage 필드는 비어 있거나 inline을 제외한 모든 값을 포함 할 수 있습니다.

컴포넌트가 생성하는 HTML 파일을 지정할 수 있습니다. Kubeflow Pipelines UI는 해당 HTML을 출력 페이지에서 렌더링합니다. HTML 파일은 파일 시스템의 다른 파일에 대한 참조가 없어야 합니다. HTML 파일에는 웹 파일에 대한 절대 참조가 포함될 수 있습니다. web-app 내에서 실행되는 콘텐츠는 iframe에서 샌드박스 처리되며 Kubeflow Pipelines UI와 통신 할 수 없습니다.

Example:

static_html_path = os.path.join(output_dir, _OUTPUT_HTML_FILE)
  file_io.write_string_to_file(static_html_path, rendered_template)

  metadata = {
    'outputs' : [
    # Web app that is hardcoded inline
    {
      'type': 'web-app',
      'storage': 'inline',
      'source': <STATIC_HTML_INLINE>,
    },
    # Web app  that is read from a file
		{
      '
      'type': 'web-app',
      'source': <STATIC_HTML_PATH>,
    }]
  }
  with file_io.FileIO('/mlpipeline-ui-metadata.json', 'w') as f:
    json.dump(metadata, f)

파이프 라인을 구성하고 실행하기

web-app을 출력하는 파이프 라인을 만들어 보겠습니다.

가. 프로그램 코드를 작성합니다.

컴포넌트를 만들기 위하여, HTML 생성하는 프로그램 코드를 작성합니다. 예제에서는 데이터를 외부 저장소에 저장하지 않고 inline 으로 사용하기 위해서, 저장한 csv 데이터를 다시 문자열로 읽어온 후, 메타데이터의 source 의 필드에 저장합니다. 만약 외부 저장소를 사용하고 싶으면, storage 필드를 삭제하고, source 필드에 저장할 곳의 위치를 지정하면 됩니다.

다음은 프로그램의 전체 코드입니다.

src/webapp.py

import json

metadata = {
    'outputs': [{
        'type': 'web-app',
        'storage': 'inline',
        'source': '<p><strong>Kubeflow pipelines</strong> are reusable end-to-end ML workflows built using the Kubeflow Pipelines SDK.</p>',
    }]
}

with open('/mlpipeline-ui-metadata.json', 'w') as f:
    json.dump(metadata, f)

나. 프로그램 코드가 포함된 컨테이너 이미지를 생성하고, 컨테이너 이미지 레지스트리에 업로드 합니다.

Dockerfile을 생성합니다.

Dockerfile

FROM python:3.6.10-slim

COPY ./src /app
WORKDIR /app

CMD ["python", "/app/webapp.py"]

컨테이너 이미지를 빌드하겠습니다.

docker build -t kangwoo/kfp-webapp:0.0.1 .

빌드한 컨테이너 이미지를 컨테이너 이미지 레지스트리에 업로드 합니다.

docker push kangwoo/kfp-webapp:0.0.1

다. Kubeflow Pipelines DSL을 사용하여 컴포넌트를 작성합니다. 컴포넌트에서 사용하는 컨테이너 이미지를 정의합니다. 그리고 output_artifact_paths 파라미터를 사용하여, 메트릭 파일이 저장된 경로를 지정해 줍니다.

  dsl.ContainerOp(
    name='webapp',
    image='kangwoo/kfp-webapp:0.0.1',
    output_artifact_paths={'mlpipeline-ui-metadata': '/mlpipeline-ui-metadata.json'}
  )

라. Kubeflow Pipelines DSL을 사용하여 파이프 라인 함수를 작성합니다. 파이프 라인을 정의하고 사용하는 컴포넌트들을 추가합니다. Kubeflow Pipelines SDK 를 사용하여 파이프라인을 빌드 한 후, 업로드하고 실행합니다.

import kfp
from kfp import dsl


def webapp_pipeline():
  dsl.ContainerOp(
    name='webapp',
    image='kangwoo/kfp-webapp:0.0.1',
    output_artifact_paths={'mlpipeline-ui-metadata': '/mlpipeline-ui-metadata.json'}
  )


if __name__ == '__main__':
    arguments = {}
    my_run = kfp.Client().create_run_from_pipeline_func(webapp_pipeline,
                                                        arguments=arguments,
                                                        experiment_name='Sample Experiment')

다음은 Kubeflow 파이프 라인 UI의 web-app 화면입니다.