Kubernetes Operator开发入门(Go)

Kubernetes Operator 是扩展 K8s 能力的标准方式——用代码把运维知识编码成自动化控制器。本文用 kubebuilder 从零创建一个简单的 Operator,覆盖 CRD 定义、Reconcile 循环、Status 更新等核心概念。

Operator 模式简述

Operator 的核心思想:用自定义资源(CR)描述期望状态,用控制器(Controller)驱动实际状态向期望状态收敛。

用户创建/修改 CR (Custom Resource)
        │
        ▼
Controller 监听到变化,触发 Reconcile
        │
        ▼
Reconcile 对比期望状态与实际状态
        │
        ├── 不一致 → 执行操作(创建/更新/删除子资源)
        └── 一致   → 无操作
        │
        ▼
更新 CR 的 Status(汇报实际状态)

这就是 K8s 的声明式 API 模式。内置的 Deployment、StatefulSet 本质上也是这个模式。

CRD 概念

CRD(Custom Resource Definition)让你定义自己的资源类型。比如我们要做一个简单的 WebApp Operator——用户只需要声明应用名称、镜像和副本数,Operator 自动创建 Deployment 和 Service。

目标 CR 长这样:

apiVersion: app.example.com/v1
kind: WebApp
metadata:
  name: my-app
spec:
  image: nginx:1.25
  replicas: 3
  port: 80

kubebuilder 脚手架

# 安装 kubebuilder
curl -L -o kubebuilder https://go.kubebuilder.io/dl/latest/$(go env GOOS)/$(go env GOARCH)
chmod +x kubebuilder && mv kubebuilder /usr/local/bin/

# 初始化项目
mkdir webapp-operator && cd webapp-operator
kubebuilder init --domain example.com --repo github.com/example/webapp-operator

# 创建 API(CRD + Controller)
kubebuilder create api --group app --version v1 --kind WebApp

选择 "y" 同时创建 Resource 和 Controller。

定义 CRD 类型

编辑 api/v1/webapp_types.go

package v1

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// WebAppSpec 定义 WebApp 的期望状态
type WebAppSpec struct {
    // 容器镜像
    // +kubebuilder:validation:Required
    Image string `json:"image"`

    // 副本数
    // +kubebuilder:validation:Minimum=1
    // +kubebuilder:validation:Maximum=10
    // +kubebuilder:default=1
    Replicas int32 `json:"replicas,omitempty"`

    // 服务端口
    // +kubebuilder:validation:Minimum=1
    // +kubebuilder:validation:Maximum=65535
    // +kubebuilder:default=80
    Port int32 `json:"port,omitempty"`
}

// WebAppStatus 定义 WebApp 的实际状态
type WebAppStatus struct {
    // 当前可用副本数
    AvailableReplicas int32 `json:"availableReplicas,omitempty"`

    // 状态: Pending / Running / Failed
    // +kubebuilder:validation:Enum=Pending;Running;Failed
    Phase string `json:"phase,omitempty"`

    // 最后更新时间
    LastUpdateTime *metav1.Time `json:"lastUpdateTime,omitempty"`
}

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:name="Image",type=string,JSONPath=`.spec.image`
// +kubebuilder:printcolumn:name="Replicas",type=integer,JSONPath=`.spec.replicas`
// +kubebuilder:printcolumn:name="Available",type=integer,JSONPath=`.status.availableReplicas`
// +kubebuilder:printcolumn:name="Phase",type=string,JSONPath=`.status.phase`
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"

// WebApp is the Schema for the webapps API
type WebApp struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec   WebAppSpec   `json:"spec,omitempty"`
    Status WebAppStatus `json:"status,omitempty"`
}

// +kubebuilder:object:root=true
type WebAppList struct {
    metav1.TypeMeta `json:",inline"`
    metav1.ListMeta `json:"metadata,omitempty"`
    Items           []WebApp `json:"items"`
}

func init() {
    SchemeBuilder.Register(&WebApp{}, &WebAppList{})
}

注释中的 +kubebuilder 标记非常重要——它们会被代码生成器读取,生成 CRD 的验证规则、打印列等。

实现 Reconcile 循环

编辑 internal/controller/webapp_controller.go

package controller

import (
    "context"
    "fmt"
    "time"

    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/nuntime"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/apimachinery/pkg/util/intstr"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
    "sigs.k8s.io/controller-runtime/pkg/log"

    appv1 "github.com/example/webapp-operator/api/v1"
)

type WebAppReconciler struct {
    client.Client
    Scheme *runtime.Scheme
}

// +kubebuilder:rbac:groups=app.example.com,resources=webapps,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=app.example.com,resources=webapps/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete

func (r *WebAppReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    logger := log.FromContext(ctx)

    // 1. 获取 WebApp 资源
    webapp := &appv1.WebApp{}
    if err := r.Get(ctx, req.NamespacedName, webapp); err != nil {
        if errors.IsNotFound(err) {
            logger.Info("WebApp resource not found, probably deleted")
            return ctrl.Result{}, nil
        }
        return ctrl.Result{}, err
    }

    // 2. 确保 Deployment 存在
    deploy := &appsv1.Deployment{}
    deployName := types.NamespacedName{
        Name:      webapp.Name + "-deploy",
        Namespace: webapp.Namespace,
    }

    err := r.Get(ctx, deployName, deploy)
    if err != nil && errors.IsNotFound(err) {
        // 创建 Deployment
        deploy = r.buildDeployment(webapp)
        if err := controllerutil.SetControllerReference(webapp, deploy, r.Scheme); err != nil {
            return ctrl.Result{}, err
        }
        logger.Info("Creating Deployment", "name", deploy.Name)
        if err := r.Create(ctx, deploy); err != nil {
            return ctrl.Result{}, err
        }
    } else if err != nil {
        return ctrl.Result{}, err
    } else {
        // 更新已有 Deployment
        needUpdate := false
        if *deploy.Spec.Replicas != webapp.Spec.Replicas {
            deploy.Spec.Replicas = &webapp.Spec.Replicas
            needUpdate = true
        }
        container := &deploy.Spec.Template.Spec.Containers[0]
        if container.Image != webapp.Spec.Image {
            container.Image = webapp.Spec.Image
            needUpdate = true
        }
        if needUpdate {
            logger.Info("Updating Deployment", "name", deploy.Name)
            if err := r.Update(ctx, deploy); err != nil {
                return ctrl.Result{}, err
            }
        }
    }

    // 3. 确保 Service 存在
    svc := &corev1.Service{}
    svcName := types.NamespacedName{
        Name:      webapp.Name + "-svc",
        Namespace: webapp.Namespace,
    }
    if err := r.Get(ctx, svcName, svc); err != nil && errors.IsNotFound(err) {
        svc = r.buildService(webapp)
        if err := controllerutil.SetControllerReference(webapp, svc, r.Scheme); err != nil {
            return ctrl.Result{}, err
        }
        logger.Info("Creating Service", "name", svc.Name)
        if err := r.Create(ctx, svc); err != nil {
            return ctrl.Result{}, err
        }
    }

    // 4. 更新 Status
    r.Get(ctx, deployName, deploy)
    now := metav1.NewTime(time.Now())
    webapp.Status.AvailableReplicas = deploy.Status.AvailableReplicas
    if deploy.Status.AvailableReplicas == webapp.Spec.Replicas {
        webapp.Status.Phase = "Running"
    } else {
        webapp.Status.Phase = "Pending"
    }
    webapp.Status.LastUpdateTime = &now
    if err := r.Status().Update(ctx, webapp); err != nil {
        return ctrl.Result{}, err
    }

    return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
}

func (r *WebAppReconciler) buildDeployment(webapp *appv1.WebApp) *appsv1.Deployment {
    labels := map[string]string{"app": webapp.Name}
    replicas := webapp.Spec.Replicas

    return &appsv1.Deployment{
        ObjectMeta: metav1.ObjectMeta{
            Name:      webapp.Name + "-deploy",
            Namespace: webapp.Namespace,
        },
        Spec: appsv1.DeploymentSpec{
            Replicas: &replicas,
            Selector: &metav1.LabelSelector{
                MatchLabels: labels,
            },
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{Labels: labels},
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{{
                        Name:  "app",
                        Image: webapp.Spec.Image,
                        Ports: []corev1.ContainerPort{{
                            ContainerPort: webapp.Spec.Port,
                        }},
                    }},
                },
            },
        },
    }
}

func (r *WebAppReconciler) buildService(webapp *appv1.WebApp) *corev1.Service {
    return &corev1.Service{
        ObjectMeta: metav1.ObjectMeta{
            Name:      fmt.Sprintf("%s-svc", webapp.Name),
            Namespace: webapp.Namespace,
        },
        Spec: corev1.ServiceSpec{
            Selector: map[string]string{"app": webapp.Name},
            Ports: []corev1.ServicePort{{
                Port:       webapp.Spec.Port,
                TargetPort: intstr.FromInt(int(webapp.Spec.Port)),
            }},
            Type: corev1.ServiceTypeClusterIP,
        },
    }
}

func (r *WebAppReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&appv1.WebApp{}).
        Owns(&appsv1.Deployment{}).
        Owns(&corev1.Service{}).
        Complete(r)
}

几个核心要点:

Reconcile 循环:每次 CR 变更或关联资源变更都会触发 Reconcile。核心逻辑就是"对比 → 操作 → 更新状态"。

SetControllerReference:建立 owner reference,确保 WebApp 被删除时,子资源(Deployment、Service)自动被 GC 回收。

Status 子资源更新:Status 用独立的 r.Status().Update() 更新,不能和 Spec 一起用普通 Update() 改。

RBAC 注释+kubebuilder:rbac 注释声明 controller 需要的权限,make manifests 会自动生成 ClusterRole。

生成与部署

# 生成 CRD manifests、RBAC、webhook 配置
make manifests

# 安装 CRD 到集群
make install

# 本地运行 controller(开发调试用)
make run

# 构建镜像并部署到集群
make docker-build docker-push IMG=myregistry/webapp-operator:v0.1
make deploy IMG=myregistry/webapp-operator:v0.1

测试

创建一个 WebApp 资源:

# config/samples/app_v1_webapp.yaml
apiVersion: app.example.com/v1
kind: WebApp
metadata:
  name: demo
  namespace: default
spec:
  image: nginx:1.25
  replicas: 2
  port: 80
kubectl apply -f config/samples/app_v1_webapp.yaml

# 查看创建的资源
kubectl get webapp
kubectl get deploy
kubectl get svc

修改副本数测试 Reconcile:

kubectl patch webapp demo --type merge -p '{"spec":{"replicas":3}}'

# 观察 Deployment 副本数是否自动调整
kubectl get deploy -w

总结

Operator 开发的核心在于把运维逻辑编码到 Reconcile 循环中。kubebuilder 提供了完善的脚手架,自动处理了事件监听、队列管理、leader election 等基础设施。实际项目中还需要考虑错误重试策略、Finalizer(优雅删除)、Webhook(准入验证)等,这些留到后续再展开。