Подорож у створення свого власного Kubernetes.

pic

Останні кілька місяців я працював над особистим проєктом — створенням Kubernetes самостійно.

Чому я вирішив це зробити?
Мені подобається програмування, DevOps, а найголовніше — поєднання цих двох напрямків. Останні два роки я працював над управлінням кількома кластерами Openshift.
Вважаю, що для того, щоб зрозуміти, як працює програма, необхідно вивчити, як вона побудована, і спробувати написати її самостійно.

У цій статті я поділюсь основними моментами та етапами, з якими я стикався під час розробки. Якщо ви хочете одразу перейти до коду, можете зайти на джерело коду.

Попередні вимоги:

  • Знання Golang
  • Знання принципів роботи Kubernetes (kube-api, etcd, kube-proxy, kubelet)

Важливе зауваження перед початком:

  • Проєкт «own-kubernetes» підтримує функціональність створення простору імен, створення та видалення подів за допомогою YAML, створення та видалення сервісів (clusterIP та NodePort), що пов'язані з цими подами (endpoints створюються під капотом)
  • Я намагався створити свій Kubernetes раніше і зробив його більше монолітним, аніж Kubernetes-подібним. Ви можете переглянути серію статей, яку я публікував на dev.to
  • Я розумію, що код міг бути більш загальним і відповідати принципам SOLID, але мені довелося йти на компроміси, щоб виконати завдання (врешті-решт я це робив у свій вільний час)
  • Це, звісно, спрощений код робочої оркестрації, подібної до Kubernetes. Я ніколи не працював у Kubernetes чи не вносив внесок у вихідний код (я намагався побудувати його, використовуючи свої знання Kubernetes та вивчаючи вихідний код Kubernetes, статті та налагоджуючи робочий кластер)

Структура проєкту

Проєкт складається з 4 «частин»:

  • kube-api: API сервер, написаний за допомогою go-restful, використовується для отримання HTTP запитів щодо ресурсів і взаємодії з etcd та його спостерігачами.
  • kubelet: бінарник Golang, що працює всередині вузла та керує операціями створення, оновлення та видалення контейнерів.
  • kubeproxy: частина kubelet, яка керує операціями створення та видалення сервісів і endpoints для зв'язку між подами.
  • own-kubectl: бінарник Golang для взаємодії з Kubernetes (як kubectl).

Процес запуску проєкту полягає в запуску віртуальної машини, запуску процесу kubelet, який запускає kube-api, etcd, а потім взаємодії з API.

Наступне відео демонструє робочий вузол з kubelet та створення 3 подів із сервісом node-port на ньому.
Ліва верхня панель призначена для SSH підключення до вузла та запуску kubelet.
Права панель використовується для потокової передачі журналів kubelet та kube-proxy (тут можна побачити створення контейнерів і сервісів).
Нижня ліва панель призначена для виконання команд own-kubectl.

Потік створення пода

Розглянемо потік створення пода від початку до кінця:

  1. own-kubectl надсилає YAML з деталями пода до API для створення.
  2. API отримує ці запити і зберігає нову специфікацію пода в ETCD.
  3. kubelet, що слухає дії пода, ініціюється.
  4. kubelet створює контейнер Pause за допомогою containerd (подробиці використання pause container можна знайти тут).
  5. kubelet створює нові контейнери з просторами імен IPC і Network за допомогою containerd.
  6. kubelet перевіряє наявні IP-адреси в CIDR пода і призначає veth до мережевого простору імен пода.
    7.
    kubelet надсилає запит назад до API, що створення завершено, з вказівкою статусу та додаткової інформації.

Тепер давайте подивимося на деякий код!
Дія "watcher-listener" є дуже поширеною в Kubernetes, і ми будемо часто з нею стикатися, тому давайте розглянемо, як вона реалізована.

У kube-api ми використовуємо event-stream HTTP та канали, щоб коли клієнт запитує спостереження за ресурсом за допомогою селектора полів, ми відкриваємо канал для спостереження в etcd і чекаємо на дію.
коли відбувається дія, ми надсилаємо її клієнту і чекаємо на наступну.

func (namespace *Namespace) watcher(req *restful.Request, resp *restful.Response, etcdKey string) {  
 fieldSelector := req.QueryParameter("fieldSelector")  
 namespaceQuery := req.PathParameter("namespace")  

 resp.Header().Set("Access-Control-Allow-Origin", "*")  
 resp.Header().Set("Content-Type", "text/event-stream")  
 resp.Header().Set("Cache-Control", "no-cache")  
 resp.Header().Set("Connection", "keep-alive")  

 watchChan, closeChanFunc, err := etcdServiceAppNamespace.GetWatchChannel(fmt.Sprintf("%s/%s", etcdKey, namespaceQuery))  
 if err != nil {  
 err = resp.WriteErrorString(http.StatusBadRequest, err.Error())  
 if err != nil {  
 fmt.Printf("error while sending error: %v", err)  
 }  

 return  
 }  
 defer closeChanFunc()  
 defer resp.CloseNotify()  

 log.Println("Client pod watcher started")  

 for {  
 select {  
 case watchResp := <-watchChan:  
 if watchResp.Err() != nil {  
 fmt.Printf("error while sending error: %v", err)  
 }  

 for _, event := range watchResp.Events {  
 log.Printf("watch: %s executed on %s with value %s\n", event.Type, string(event.Kv.Key), string(event.Kv.Value))  

 if fieldSelector == "" {  
 fmt.Fprintf(resp, "Type: %s Value: %s\n", event.Type, string(event.Kv.Value))  
 } else {  
 splitedFieldSelector := strings.Split(fieldSelector, "=")  
 resGJSON := gjson.Get(string(event.Kv.Value), splitedFieldSelector[0])  

 if resGJSON.Exists() && resGJSON.Value() == splitedFieldSelector[1] {  
 fmt.Fprintf(resp, "Type: %s Value: %s\n", event.Type, string(event.Kv.Value))  
 }  
 }  

 resp.Flush()  
 }  

 case <-req.Request.Context().Done():  
 log.Println("Connection closed")  
 return  
 }  
 }  
}

У kubelet, після того як він запускає системні маніфести (etcd та kube-api), він стартує прослуховувач (того, що ми називаємо клієнтом на стороні kube-api).
Ця функція ініціює запит на спостереження (watch) до kube-api на вузлі, на якому працює pod. Так само, як і у kube-api, ми використовуємо канали для очікування дії.
Коли відбувається дія, ми отримуємо spec pod, парсимо його в структуру, з якою можемо працювати, і в залежності від дії створюємо або видаляємо pod.

func ListenForPod(kubeAPIEndpoint string, hostname string, podCIDR string, podBridgeName string) error {  
 log.Printf("почав спостереження за pod через kube API")  

 resp, err := http.Get(fmt.Sprintf(  
 "%s/pods/?watch=true&fieldSelector=%s",  
 kubeAPIEndpoint,  
 url.QueryEscape(fmt.Sprintf("spec.nodeName=%s", hostname)),  
 ),  
 )  
 if err != nil {  
 return fmt.Errorf("помилка при надсиланні запиту: %v", err)  
 }  
 defer resp.Body.Close()  

 if resp.StatusCode != http.StatusOK {  
 body, err := io.ReadAll(resp.Body)  
 if err != nil {  
 return fmt.Errorf("помилка при зчитуванні тіла відповіді: %v", err)  
 }  

 return fmt.Errorf("запит не вдався з кодом статусу: %d %s", resp.StatusCode, string(body))  
 }  

 reader := bufio.NewReader(resp.Body)  

 for {  
 line, err := reader.ReadString('\n')  
 if err != nil {  
 log.Printf("помилка при парсингу відповіді: %v", err)  

 continue  
 }  

 line = strings.TrimSpace(line)  

 if len(line) == 0 {  
 continue  
 }  

 typeEvent, value, err := utils.GetTypeAndValueFromEvent(line)  
 if err != nil {  
 log.Printf("помилка при отриманні типу та значення події: %v", err)  

 continue  
 }  

 log.Printf("подія pod для pods: %s %s", typeEvent, value)  

 var pod kubeapi_rest.Pod  
 err = yaml.Unmarshal([]byte(value), &pod)  
 if err != nil {  
 log.Printf("помилка при парсингу pod з події: %v", err)  

 continue  
 }  

 if typeEvent == "PUT" {  
 if pod.Status.Phase == podRunningPhase && strings.Contains(line, kubeapi_rest.LastAppliedConfigurationAnnotationKey) {  
 equal, err := compareLastAppliedToCurrentPod(pod)  
 if err != nil {  
 log.Printf("помилка при порівнянні останнього застосованого: %v", err)  

 continue  
 }  

 if equal {  
 log.Printf("Pod не змінився з останнього застосованого анотації")  

 continue  
 }  

 log.Printf("Pod змінився, починається створення")  
 }  

 if pod.Status.Phase == podPendingPhase {  
 go createPod(pod, podCIDR, podBridgeName, kubeAPIEndpoint)  
 } else if pod.Status.Phase == podTerminatingPhase {  
 go deletePod(pod, kubeAPIEndpoint)  
 }  
 }  
 }  
}

У функції createPod ми викликаємо CreatePodContainers і UpdatePodStatus.
Назви функцій є досить зрозумілими.
Я заглиблюсь у функцію CreatePodContainers.
Вона викликає кілька функцій, в які я не хочу занурюватися занадто глибоко (ви можете побачити їх у вихідному коді).

У наведеній нижче функції ми створюємо контейнер-пауза, отримуємо його PID і призначаємо його новоствореним контейнерам. Таким чином, ми досягаємо спільного простору імен мережі між контейнерами в одному pod.
Після цього ми налаштовуємо мережу для pod: призначаємо пару veth і ceth для простору імен мережі pod, підключаємо її до віртуального моста на вузлі та призначаємо IP-адресу pod.

func CreatePodContainers(pod kubeapi_rest.Pod, podCIDR string, podBridgeName string) (*kubeapi_rest.Pod, error) {  
 if pod.Metadata.UID == "" {  
 pod.Metadata.UID = uuid.NewString()  
 }  

 pauseContainerPID, err := createPauseContainer(  
 pod.Metadata.UID,  
 pod.Metadata.Name,  
 pod.Metadata.Namespace,  
 pod.Spec.HostNetwork,  
 )  
 if err != nil {  
 return nil, fmt.Errorf("не вдалося створити контейнер-паузу, %v", err)  
 }  

 for _, container := range pod.Spec.Containers {  
 containerStatusName := fmt.Sprintf("own_k8s_%s_%s_%s_%s", container.Name, pod.Metadata.Name, pod.Metadata.Namespace, pod.Metadata.UID)  

 containerID, err := kube_containerd.CreateContainer(  
 &container,  
 kube_containerd.CreateContainerSpec{  
 LogLocation: fmt.Sprintf(defaultPodLoggingLocation, containerStatusName),  
 ResolvConfLocation: fmt.Sprintf(defaultPodResolvConfLocation, pod.Metadata.UID),  
 HostnameLocation: fmt.Sprintf(defaultPodContainerHostnameLocation, pod.Metadata.UID, containerStatusName),  
 EtcHostsLocation: fmt.Sprintf(defaultPodContainerEtcdHostsLocation, pod.Metadata.UID),  
 HostNetwork: pod.Spec.HostNetwork,  
 NetworkNamespacePath: fmt.Sprintf(defaultNetNamespacePath, pauseContainerPID),  
 IPCNamespacePath: fmt.Sprintf(defaultIPCNamespacePath, pauseContainerPID),  
 },  
 )  
 if err != nil {  
 return nil, fmt.Errorf("не вдалося створити та запустити контейнер %v", err)  
 }  

 pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, kubeapi_rest.ContainerStatus{  
 ContainerID: containerID,  
 Image: container.Image,  
 Name: containerStatusName,  
 })  

 log.Printf("контейнер %s створений і запущений", containerStatusName)  
 }  

 ip, err := kubelet_net.ConfigurePodNetwork(  
 pod.Metadata.UID,  
 podBridgeName,  
 podCIDR,  
 fmt.Sprintf(defaultNetNamespacePath, pauseContainerPID),  
 pod.Spec.HostNetwork,  
 )  
 if err != nil {  
 return nil, fmt.Errorf("не вдалося налаштувати мережу pod %v", err)  
 }  

 pod.Status.PodIP = ip  
 pod.Status.Phase = podRunningPhase  

 return &pod, nil  
}

І вуаля! У нас є працюючий pod!
Рекомендую ознайомитись з деталями у вихідному коді в секції pkg/kubelet.

Потік створення сервісу

Давайте розглянемо процес створення сервісу від початку до кінця:

1.
own-kubectl надсилає YAML з деталями сервісу до API для створення.
2. API отримує ці запити і зберігає нову специфікацію сервісу в ETCD.
3. kube-proxy, який прослуховує (Event Listener) дії з сервісами, спрацьовує.
4. kube-proxy перевіряє доступні IP-адреси в CIDR для ClusterIP.
5. kube-proxy створює новий ланцюг IPTables для ClusterIP.
6. Якщо сервіс є NodePort, kube-proxy створює новий ланцюг IPTables для NodePort.
7. kube-proxy створює Endpoints для відповідного селектора сервісу до міток pod.

Я не буду вдаватися в деталі щодо watcher-listener, оскільки це практично те саме, що і для pod. Замість цього ми детально розглянемо створення ланцюга iptables для сервісів.

Коли ми створюємо ClusterIP сервіс, насправді ми створюємо набір правил iptables на вузлі, які перехоплюють пакети і вказують, куди їх направляти.
У ланцюгу ClusterIP ми створюємо правило, яке пропускає пакети через ланцюг, що відповідає наступним правилам:

  • Протокол TCP
  • Адреса призначення: clusterIP
  • Порт призначення: порт сервісу

Сервіс також додає правило маскараду до сервісу (воно повинно бути першим правилом), щоб маскувати кожен пакет, що потрапляє в ланцюг, за винятком тих, чий джерело належить до PodCIDR.

Все вищеописане відбувається в наступному коді:

func NewClusterIPService(clusterIP string, podCIDR string, namespace string, serviceName string, servicePort int, portName string) error {  
 id := chainHashPrefix(serviceName, namespace, portName)  
 serviceNameChain := fmt.Sprintf("%s-%s", clusterIPServicePrefix, id)  

 // iptables -t nat -N KUBE-SVC-id  
 if err := newIPTablesChain(serviceNameChain); err != nil {  
 return err  
 }  

 // iptables -t nat -I KUBE-SERVICES 1 -d ipAddr/32 -p tcp -m tcp --dport servicePort -m comment -j KUBE-SVC-id --comment "namesapce/serviceName cluster IP"  
 if err := insertNewIPTablesRule(  
 netTable,  
 fmt.Sprintf("-d %s/32 -p tcp -m tcp --dport %d -j %s", clusterIP, servicePort, serviceNameChain),  
 kubeServicesChain,  
 1,  
 fmt.Sprintf("%s/%s:%s-clusterIP", namespace, serviceName, portName),  
 ); err != nil {  
 return err  
 }  

 // iptables -t nat -A KUBE-SVC-id ! -s podCIDR/16 -d clusterIP/32 -p tcp -m tcp --dport servicePort -m comment --comment "namespace/serviceName cluster IP" -j KUBE-MARK-MASQ  
 if err := appendNewIPTablesRule(  
 netTable,  
 fmt.Sprintf("! -s %s -d %s/32 -p tcp -m tcp --dport %d -j %s", podCIDR, clusterIP, servicePort, kubeServicesMark),  
 serviceNameChain,  
 fmt.Sprintf("%s/%s:%s-clusterIP", namespace, serviceName, portName),  
 ); err != nil {  
 return err  
 }  

 return nil  
}

Створення сервісу NodePort дуже схоже.
Ми перевіряємо доступні порти в діапазоні та створюємо новий ланцюг з набором правил, які маскують пакети та направляють їх до раніше створеного ланцюга clusterIP сервісу:

func NewNodePortService(namespace string, serviceName string, port int, portName string) error {  
 id := chainHashPrefix(serviceName, namespace, portName)  
 serviceNameChain := fmt.Sprintf("%s-%s", nodePortServiceExtPrefix, id)  

 // iptables -t nat -N KUBE-EXT-id  
 if err := newIPTablesChain(serviceNameChain); err != nil {  
 return err  
 }  

 // iptables -A KUBE-NODEPORT -p tcp -m tcp --dport nodePort -m comment --comment "namespaces/podName" -j KUBE-EXT-id  
 if err := appendNewIPTablesRule(  
 netTable,  
 fmt.Sprintf("-p tcp -m tcp --dport %d -j %s", port, serviceNameChain),  
 nodePortServiceChain,  
 fmt.Sprintf("%s/%s-service:%s", namespace, serviceName, portName),  
 ); err != nil {  
 return err  
 }  

 // iptables -A KUBE-EXT-id -m comment --comment "masquerade traffic for namespace/podName external destinations" -j KUBE-MARK-MASQ  
 if err := appendNewIPTablesRule(  
 netTable,  
 fmt.Sprintf("-j %s", kubeServicesMark),  
 serviceNameChain,  
 fmt.Sprintf("masquerade-traffic-for-%s/%s-external-destinations", namespace, serviceName),  
 ); err != nil {  
 return err  
 }  

 // iptables -A KUBE-EXT-id -j KUBE-SVC-id  
 if err := appendNewIPTablesRule(  
 netTable,  
 fmt.Sprintf("-j %s", fmt.Sprintf("%s-%s", clusterIPServicePrefix, chainHashPrefix(serviceName, namespace, portName))),  
 serviceNameChain,  
 fmt.Sprintf("%s/%s-service:%s", namespace, serviceName, portName),  
 ); err != nil {  
 return err  
 }  

 return nil  
}

Тепер у нас є набір ланцюгів IPTables, які направляють пакети до відповідного сервісу.
Після цього ми створюємо відповідні endpoints, які будуть "вказувати" на pod.

Знову ж таки, раджу вам ознайомитись з деталями у вихідному коді в розділі pkg/kube-proxy.

Потік створення Endpoint

Давайте розглянемо потік створення сервісу від початку до кінця:

  1. kube-proxy відправляє запит на створення endpoint після того, як новий сервіс створено.
  2. API отримує ці запити і зберігає нову специфікацію endpoint у ETCD.
  3. kube-proxy, що слухає дії endpoint, спрацьовує.
  4. kube-proxy створює набір ланцюгів IPTables, які направляють трафік до відповідних pod.

Kube-proxy, крім того, що слухає нові сервіси, також слухає створення endpoint (та сама реалізація, як і у kubelet).
Ця функція викликається, коли відбувається нова подія створення endpoint, вона проходить по адресах та портах endpoint і для кожного з них створює ланцюг IPTables із ймовірністю для ефекту "round robin load balancing".

func createEndpointIPTable(kubeAPIEndpoint string, endpoint kubeapi_rest.Endpoint) {  
 log.Printf("створення правил iptable з використанням endpoint")  

 service, err := getRelatableService(kubeAPIEndpoint, endpoint.Metadata.Name, endpoint.Metadata.Namespace)  
 if err != nil {  
 log.Printf("помилка при отриманні зв'язаної служби: %v", err)  

 return  
 }  

 for _, subset := range endpoint.Subsets {  
 for index, address := range subset.Addresses {  
 for _, port := range subset.Ports {  
 if !clusterip.CheckIfClusterIPServiceEndpointExists(service.Metadata.Namespace, address.TargetRef.Name, port.Name) {  
 log.Printf("len: %d", len(subset.Addresses))  
 log.Printf("index: %d", index)  
 log.Printf("ймовірність: %f", float32(len(subset.Addresses)-index)/float32(len(subset.Addresses)))  
 err := iptables.CreateEndpointChain(  
 service.Metadata.Namespace,  
 service.Metadata.Name,  
 address.TargetRef.Name,  
 port.Name,  
 address.IP,  
 port.Port,  
 )  
 if err != nil {  
 log.Printf("помилка при створенні endpoint: %v", err)  

 return  
 }  

 err = clusterip.AddEndpointToClusterIP(  
 service.Metadata.Namespace,  
 service.Metadata.Name,  
 address.TargetRef.Name,  
 port.Name,  
 address.IP,  
 port.Port,  
 float32(len(subset.Addresses)-index)/float32(len(subset.Addresses)),  
 )  
 if err != nil {  
 log.Printf("помилка при додаванні pod до clusterIP: %v", err)  

 return  
 }  
 }  
 }  
 }  
 }  
}

У першій функції CreateEndpointChain, ми створюємо новий ланцюг IPTables, який маскує пакет з IP-адресою джерела PodCIDR і потім виконує DNAT для IP-адреси та порту призначення pod:

func CreateEndpointChain(namespace string, serviceName string, podName string, portName string, podIP string, podPort int) error {  
 serviceEndpointChain := fmt.Sprintf("%s-%s", serviceEndpointPrefix, chainHashPrefix(podName, namespace, portName))  

 // iptables -t nat -N KUBE-SEP-id  
 if err := newIPTablesChain(serviceEndpointChain); err != nil {  
 return err  
 }  

 // -A KUBE-SEP-id -s podIP/32 -m comment --comment "namespace/serviceName" -j KUBE-MARK-MASQ  
 if err := appendNewIPTablesRule(  
 netTable,  
 fmt.Sprintf("-s %s/32 -j %s", podIP, kubeServicesMark),  
 serviceEndpointChain,  
 fmt.Sprintf("%s/%s:%s-clusterIP", namespace, serviceName, portName),  
 ); err != nil {  
 return err  
 }  

 // -A KUBE-SEP-id -p tcp -m comment --comment "namespace/serviceName" -m tcp -j DNAT --to-destination podIP:podPort  
 if err := appendNewIPTablesRule(  
 netTable,  
 fmt.Sprintf("-p tcp -m tcp -j DNAT --to-destination %s:%d", podIP, podPort),  
 serviceEndpointChain,  
 fmt.Sprintf("%s/%s:%s-clusterIP", namespace, serviceName, portName),  
 ); err != nil {  
 return err  
 }  

 return nil  
}

Вищезгаданий ланцюг додається до ланцюга, який ми створили в попередньому розділі (ланцюг сервісу ClusterIP).
Додається модуль статистики з ймовірністю, яку ми згадували раніше, щоб досягти ефекту балансування навантаження.

func AddEndpointToServiceChain(namespace string, serviceName string, podName string, portName string, podIP string, podPort int, probability float32) error {  
 serviceNameChain := fmt.Sprintf("%s-%s", clusterIPServicePrefix, chainHashPrefix(serviceName, namespace, portName))  
 serviceEndpointChain := fmt.Sprintf("%s-%s", serviceEndpointPrefix, chainHashPrefix(podName, namespace, portName))  

 // iptables -A KUBE-SVC-id -m comment --comment "namespace/serviceName->podIP:podPort" -m statistic --mode random --probability probability -j KUBE-SEP-id  
 rule := ""  
 if probability != 0 {  
 rule = fmt.Sprintf("-m statistic --mode random --probability %f -j %s", probability, serviceEndpointChain)  
 } else {  
 rule = fmt.Sprintf("-j %s", serviceEndpointChain)  
 }  
 if err := insertNewIPTablesRule(  
 netTable,  
 rule,  
 serviceNameChain,  
 2,  
 fmt.Sprintf("%s/%s->%s:%d", namespace, serviceName, podIP, podPort),  
 ); err != nil {  
 return err  
 }  

 return nil  
}

І тепер у нас є новий робочий сервіс NodePort! Ми можемо отримати до нього доступ з нашої локальної машини, яка буде балансувати запити до pod через селектор сервісу!
Застереження: при запуску kube-proxy додаються додаткові ланцюги IPTable до таблиці nat, щоб все "танцювало" разом.

Тепер, що станеться, якщо новий pod буде створено або зупинено? Ми не хочемо передавати трафік до нього.
Отже, ми отримуємо ще одного прослуховувача (Listener) у kube-proxy, який слухає дії з pod та співвідносить їх з існуючими точками доступу (endpoints). Коли існуючий pod виходить з ладу або видаляється, адреса в цій точці доступу також видаляється. Точно так же, коли створюється новий pod, додається нова адреса.

Рекомендую вам детальніше ознайомитися з вихідним кодом у секції pkg/kube-proxy.

Я лише торкнувся поверхні реалізації деяких функціональностей у своєму Kubernetes. Я намагався зробити цю статтю якнайпростішою і не заглиблюватися в деталі кожного аспекту коду.

Ця реалізація навчила мене багато про те, як працює Kubernetes "під капотом", а також про деякі нові аспекти програмування.

Якщо у вас є питання, не соромтеся залишити коментар 🙂

Перекладено з: A journey of writing my own Kubernetes

Leave a Reply

Your email address will not be published. Required fields are marked *