kubernetes 存储流程
PV 与 PVC
PVC (PersistentVolumeClaim),命名空间(namespace)级别的资源,由 用户 or StatefulSet 控制器(根据VolumeClaimTemplate) 创建。PVC 类似于 Pod,Pod 消耗 Node 资源,PVC 消耗 PV 资源。Pod 可以请求特定级别的资源(CPU 和内存),而 PVC 可以请求特定存储卷的大小及访问模式(Access Mode PV(PersistentVolume)是集群中的一块存储资源,可以是 NFS、iSCSI、Ceph、GlusterFS 等存储卷,PV 由集群管理员创建,然后由开发者使用 PVC 来申请 PV,PVC 是对 PV 的申请,类似于 Pod 对 Node 的申请。
静态创建存储卷
也就是我们手动创建一个pv和pvc,然后将pv和pvc绑定,然后pod使用pvc,这样就可以使用pv了。
创建一个 nfs 的 pv 以及 对应的 pvc
apiVersion: v1
kind: PersistentVolume
metadata:
name: nfs-pv
spec:
capacity:
storage: 10Gi
accessModes:
- ReadWriteOnce
persistentVolumeReclaimPolicy: Delete
nfs:
server: 192.168.203.110
path: /data/nfs
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: nfs-pvc
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 10Gi
查看 pvc
$ kubectl get pvc
NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE
nfs-pvc Bound nfs-pv 10Gi RWO 101s
创建一个 pod 使用 pvc
apiVersion: v1
kind: Pod
metadata:
name: test-nfs
spec:
containers:
- image: ubuntu:22.04
name: ubuntu
command:
- /bin/sh
- -c
- sleep 10000
volumeMounts:
- mountPath: /data
name: nfs-volume
volumes:
- name: nfs-volume
persistentVolumeClaim:
claimName: nfs-pvc
❯ kubectl exec -it test-nfs -- cat /data/nfs
192.168.203.110:/data/nfs
pvc pv 绑定流程
func (ctrl *PersistentVolumeController) syncUnboundClaim(ctx context.Context, claim *v1.PersistentVolumeClaim) error {
logger := klog.FromContext(ctx)
if claim.Spec.VolumeName == "" {
// 是不是延迟绑定 也就是 VolumeBindingMode 为 WaitForFirstConsumer
delayBinding, err := storagehelpers.IsDelayBindingMode(claim, ctrl.classLister)
if err != nil {
return err
}
// 通过 pvc 找到最合适的 pv
volume, err := ctrl.volumes.findBestMatchForClaim(claim, delayBinding)
if err != nil {
logger.V(2).Info("Synchronizing unbound PersistentVolumeClaim, Error finding PV for claim", "PVC", klog.KObj(claim), "err", err)
return fmt.Errorf("error finding PV for claim %q: %w", claimToClaimKey(claim), err)
}
if volume == nil {
//// No PV found for this claim
} else /* pv != nil */ {
claimKey := claimToClaimKey(claim)
logger.V(4).Info("Synchronizing unbound PersistentVolumeClaim, volume found", "PVC", klog.KObj(claim), "volumeName", volume.Name, "volumeStatus", getVolumeStatusForLogging(volume))
// 绑定 pv 和 pvc
// 这里会处理 pvc 的 spec.volumeName status 和 pv 的 status
if err = ctrl.bind(ctx, volume, claim); err != nil {
return err
}
return nil
}
} else /* pvc.Spec.VolumeName != nil */ {
/*
......
*/
}
}
// 选择
func FindMatchingVolume(
claim *v1.PersistentVolumeClaim,
volumes []*v1.PersistentVolume,
node *v1.Node,
excludedVolumes map[string]*v1.PersistentVolume,
delayBinding bool) (*v1.PersistentVolume, error) {
var smallestVolume *v1.PersistentVolume
var smallestVolumeQty resource.Quantity
requestedQty := claim.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
requestedClass := GetPersistentVolumeClaimClass(claim)
var selector labels.Selector
if claim.Spec.Selector != nil {
internalSelector, err := metav1.LabelSelectorAsSelector(claim.Spec.Selector)
if err != nil {
return nil, fmt.Errorf("error creating internal label selector for claim: %v: %v", claimToClaimKey(claim), err)
}
selector = internalSelector
}
// Go through all available volumes with two goals:
// - find a volume that is either pre-bound by user or dynamically
// provisioned for this claim. Because of this we need to loop through
// all volumes.
// - find the smallest matching one if there is no volume pre-bound to
// the claim.
for _, volume := range volumes {
if _, ok := excludedVolumes[volume.Name]; ok {
// Skip volumes in the excluded list
continue
}
if volume.Spec.ClaimRef != nil && !IsVolumeBoundToClaim(volume, claim) {
continue
}
volumeQty := volume.Spec.Capacity[v1.ResourceStorage]
if volumeQty.Cmp(requestedQty) < 0 {
continue
}
// filter out mismatching volumeModes
if CheckVolumeModeMismatches(&claim.Spec, &volume.Spec) {
continue
}
// check if PV's DeletionTimeStamp is set, if so, skip this volume.
if volume.ObjectMeta.DeletionTimestamp != nil {
continue
}
nodeAffinityValid := true
if node != nil {
// Scheduler path, check that the PV NodeAffinity
// is satisfied by the node
// CheckNodeAffinity is the most expensive call in this loop.
// We should check cheaper conditions first or consider optimizing this function.
err := CheckNodeAffinity(volume, node.Labels)
if err != nil {
nodeAffinityValid = false
}
}
if IsVolumeBoundToClaim(volume, claim) {
// If PV node affinity is invalid, return no match.
// This means the prebound PV (and therefore PVC)
// is not suitable for this node.
if !nodeAffinityValid {
return nil, nil
}
return volume, nil
}
if node == nil && delayBinding {
// PV controller does not bind this claim.
// Scheduler will handle binding unbound volumes
// Scheduler path will have node != nil
continue
}
// filter out:
// - volumes in non-available phase
// - volumes whose labels don't match the claim's selector, if specified
// - volumes in Class that is not requested
// - volumes whose NodeAffinity does not match the node
if volume.Status.Phase != v1.VolumeAvailable {
// We ignore volumes in non-available phase, because volumes that
// satisfies matching criteria will be updated to available, binding
// them now has high chance of encountering unnecessary failures
// due to API conflicts.
continue
} else if selector != nil && !selector.Matches(labels.Set(volume.Labels)) {
continue
}
if GetPersistentVolumeClass(volume) != requestedClass {
continue
}
if !nodeAffinityValid {
continue
}
if node != nil {
// Scheduler path
// Check that the access modes match
if !CheckAccessModes(claim, volume) {
continue
}
}
if smallestVolume == nil || smallestVolumeQty.Cmp(volumeQty) > 0 {
smallestVolume = volume
smallestVolumeQty = volumeQty
}
}
if smallestVolume != nil {
// Found a matching volume
return smallestVolume, nil
}
return nil, nil
}
kubelet 绑定
if err := os.MkdirAll(dir, 0750); err != nil {
return err
}
source := fmt.Sprintf("%s:%s", nfsMounter.server, nfsMounter.exportPath)
options := []string{}
if nfsMounter.readOnly {
options = append(options, "ro")
}
mountOptions := util.JoinMountOptions(nfsMounter.mountOptions, options)
err = nfsMounter.mounter.MountSensitiveWithoutSystemd(source, dir, "nfs", mountOptions, nil)
kubelet 就会在调用 sudo mount -t nfs ...
命令把 nfs 绑定到主机上 绑定的目录大概为 /var/lib/kubelet/pods/[POD-ID]/volumes/
StorageClass
StorageClass 是 Kubernetes 中用来定义存储卷的类型的资源对象,StorageClass 用来定义存储卷的类型,比如 NFS、iSCSI、Ceph、GlusterFS 等存储卷。StorageClass 是集群级别的资源,由集群管理员创建,用户可以使用 StorageClass 来动态创建 PV。
动态创建存储卷
动态创建存储卷相比静态创建存储卷,少了集群管理员的干预,流程如下图所示:
创建一个 StorageClass pvc pod
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: local-storage
provisioner: rancher.io/local-path
reclaimPolicy: Delete
volumeBindingMode: WaitForFirstConsumer
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: my-local-pvc
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 128Mi
storageClassName: local-storage
---
apiVersion: v1
kind: Pod
metadata:
name: test
spec:
containers:
- image: ubuntu:22.04
name: ubuntu
command:
- /bin/sh
- -c
- sleep 10000
volumeMounts:
- mountPath: /data
name: my-local-pvc
volumes:
- name: my-local-pvc
persistentVolumeClaim:
claimName: my-local-pvc
查看 pv
❯ kubectl get pv
NAME CAPACITY ACCESS MODES RECLAIM POLICY STATUS CLAIM STORAGECLASS REASON AGE
pvc-9d257d8a-29a8-4abf-a1e2-c7e4953fc0ca 128Mi RWO Delete Bound default/my-local-pvc local-storage 85s
StorageClass 创建 pv 流程
// 还是 syncUnboundClaim 中
// volume 为空,说明没有找到合适的 pv 那么去检查 如果 pvc 的 storageClassName 不为空,那么就会去找到对应的 storageClass
if volume == nil {
switch {
case delayBinding && !storagehelpers.IsDelayBindingProvisioning(claim):
// ......
case storagehelpers.GetPersistentVolumeClaimClass(claim) != "":
// 如果 pvc 的 storageClassName 不为空,那么就会去找到对应的 storageClass
if err = ctrl.provisionClaim(ctx, claim); err != nil {
return err
}
return nil
default:
}
return nil
}
func (ctrl *PersistentVolumeController) provisionClaim(ctx context.Context, claim *v1.PersistentVolumeClaim) error {
plugin, storageClass, err := ctrl.findProvisionablePlugin(claim)
ctrl.scheduleOperation(logger, opName, func() error {
var err error
if plugin == nil {
// 如果是外部的 provisioner 这里我们就安装了 rancher.io/local-path 这个插件
// 所以这里会调用 provisionClaimOperationExternal
_, err = ctrl.provisionClaimOperationExternal(ctx, claim, storageClass)
} else {
// 内部的 provisioner 直接处理
_, err = ctrl.provisionClaimOperation(ctx, claim, plugin, storageClass)
}
return err
})
return nil
}
// 如果是外部的 provisioner 会在 pvc 的 annotations 加入 volume.beta.kubernetes.io/storage-provisioner: rancher.io/local-path 和 volume.kubernetes.io/storage-provisioner: rancher.io/local-path
func (ctrl *PersistentVolumeController) setClaimProvisioner(ctx context.Context, claim *v1.PersistentVolumeClaim, provisionerName string) (*v1.PersistentVolumeClaim, error) {
if val, ok := claim.Annotations[storagehelpers.AnnStorageProvisioner]; ok && val == provisionerName {
// annotation is already set, nothing to do
return claim, nil
}
// The volume from method args can be pointing to watcher cache. We must not
// modify these, therefore create a copy.
claimClone := claim.DeepCopy()
// TODO: remove the beta storage provisioner anno after the deprecation period
logger := klog.FromContext(ctx)
metav1.SetMetaDataAnnotation(&claimClone.ObjectMeta, storagehelpers.AnnBetaStorageProvisioner, provisionerName)
metav1.SetMetaDataAnnotation(&claimClone.ObjectMeta, storagehelpers.AnnStorageProvisioner, provisionerName)
updateMigrationAnnotations(logger, ctrl.csiMigratedPluginManager, ctrl.translator, claimClone.Annotations, true)
newClaim, err := ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(ctx, claimClone, metav1.UpdateOptions{})
if err != nil {
return newClaim, err
}
_, err = ctrl.storeClaimUpdate(logger, newClaim)
if err != nil {
return newClaim, err
}
return newClaim, nil
}
kubernetes external provisioner
kubernetes external provisioner 是一个独立的进程,用来动态创建 PV,它通过监听 StorageClass 的事件,当 StorageClass 的 ReclaimPolicy 为 Retain 时,会创建 PV。
在这里我新建一个 关于 nfs 的 external provisioner
package main
import (
"context"
"fmt"
"path/filepath"
"github.com/golang/glog"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/sig-storage-lib-external-provisioner/v10/controller"
)
const provisionerName = "provisioner.test.com/nfs"
var _ controller.Provisioner = &nfsProvisioner{}
type nfsProvisioner struct {
client kubernetes.Interface
}
func (p *nfsProvisioner) Provision(ctx context.Context, options controller.ProvisionOptions) (*v1.PersistentVolume, controller.ProvisioningState, error) {
if options.PVC.Spec.Selector != nil {
return nil, controller.ProvisioningFinished, fmt.Errorf("claim Selector is not supported")
}
glog.V(4).Infof("nfs provisioner: VolumeOptions %v", options)
pv := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: options.PVName,
},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeReclaimPolicy: *options.StorageClass.ReclaimPolicy,
AccessModes: options.PVC.Spec.AccessModes,
MountOptions: options.StorageClass.MountOptions,
Capacity: v1.ResourceList{
v1.ResourceName(v1.ResourceStorage): options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)],
},
PersistentVolumeSource: v1.PersistentVolumeSource{
NFS: &v1.NFSVolumeSource{
Server: options.StorageClass.Parameters["server"],
Path: options.StorageClass.Parameters["path"],
ReadOnly: options.StorageClass.Parameters["readOnly"] == "true",
},
},
},
}
return pv, controller.ProvisioningFinished, nil
}
func (p *nfsProvisioner) Delete(ctx context.Context, volume *v1.PersistentVolume) error {
// 因为是 nfs 没有产生实际的资源,所以这里不需要删除
// 如果在 provisioner 中创建了资源,那么这里需要删除
// 一般是调用 csi 创建/删除资源
return nil
}
func main() {
l := log.FromContext(context.Background())
config, err := clientcmd.BuildConfigFromFlags("", filepath.Join(homedir.HomeDir(), ".kube", "config"))
if err != nil {
glog.Fatalf("Failed to create kubeconfig: %v", err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
glog.Fatalf("Failed to create client: %v", err)
}
clientNFSProvisioner := &nfsProvisioner{
client: clientset,
}
pc := controller.NewProvisionController(l,
clientset,
provisionerName,
clientNFSProvisioner,
controller.LeaderElection(true),
)
glog.Info("Starting provision controller")
pc.Run(context.Background())
}
创建一个 nfs 的 storageClass
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: my-nfs
provisioner: provisioner.test.com/nfs
reclaimPolicy: Delete
volumeBindingMode: Immediate
parameters:
server: "192.168.203.110"
path: /data/nfs
readOnly: "false"
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: my-nfs-pvc
spec:
storageClassName: my-nfs
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 1Gi
apiVersion: v1
kind: Pod
metadata:
name: test-nfs
spec:
containers:
- image: ubuntu:22.04
name: ubuntu
command:
- /bin/sh
- -c
- sleep 10000
volumeMounts:
- mountPath: /data
name: my-nfs-pvc
volumes:
- name: my-nfs-pvc
persistentVolumeClaim:
claimName: my-nfs-pvc
❯ kubectl exec -it test-nfs -- cat /data/nfs
192.168.203.110:/data/nfs
CSI 流程
持久化存储流程图如下:
Provisioner
当部署 csi-controller 时 ,会启动一个伴生容器,项目地址为 https://github.com/kubernetes-csi/external-provisioner
这个项目是一个 csi 的 provisioner
它会监控属于自己的pvc,当有新的pvc创建时,会调用 csi 的 createVolume 方法,创建一个 volume,然后创建一个 pv。当 pvc 删除时,会调用 csi 的 deleteVolume 方法,然后删除 volume 和 pv。
Attacher
external-attacher 也是 csi-controller 的伴生容器,项目地址为 https://github.com/kubernetes-csi/external-attacher
这个项目是一个 csi 的 attacher, 它会监控 AttachDetachController 资源,当有新的资源创建时,会调用 csi 的 controllerPublishVolume 方法,挂载 volume 到 node 上。当资源删除时,会调用 csi 的 controllerUnpublishVolume 方法,卸载 volume。
Snapshotter
external-snapshotter 也是 csi-controller 的伴生容器,项目地址为 https://github.com/kubernetes-csi/external-snapshotter
这个项目是一个 csi 的 snapshotter, 它会监控 VolumeSnapshot 资源,当有新的资源创建时,会调用 csi 的 createSnapshot 方法,创建一个快照。当资源删除时,会调用 csi 的 deleteSnapshot 方法,删除快照。
csi-node
csi-node 是一个 kubelet 的插件,所以它需要每个节点上都运行,当 pod 创建时,并且 VolumeAttachment 的 .spec.Attached 时,kubelet 会调用 csi 的 NodeStageVolume 函数,之后插件(csiAttacher)调用内部 in-tree CSI 插件(csiMountMgr)的 SetUp 函数,该函数内部会调用 csi 的 NodePublishVolume 函数,挂载 volume 到 pod 上。当 pod 删除时,kubelet 观察到包含 CSI 存储卷的 Pod 被删除,于是调用内部 in-tree CSI 插件(csiMountMgr)的 TearDown 函数,该函数内部会通过 unix domain socket 调用外部 CSI 插件的 NodeUnpublishVolume 函数。kubelet 调用内部 in-tree CSI 插件(csiAttacher)的 UnmountDevice 函数,该函数内部会通过 unix domain socket 调用外部 CSI 插件的 NodeUnstageVolume 函数。
csi-node-driver-registrar
这个是 csi-node 的伴生容器,项目地址为 https://github.com/kubernetes-csi/node-driver-registrar
,
它的主要作用是向 kubelet 注册 csi 插件,kubelet 会调用 csi 插件的 Probe 方法,如果返回成功,kubelet 会调用 csi 插件的 NodeGetInfo 方法,获取节点信息。
csi-livenessprobe
这个是 csi-node 的伴生容器,项目地址为 https://github.com/kubernetes-csi/livenessprobe
, 它的主要作用是给 kubernetes 的 livenessprobe 提供一个接口,用来检查 csi 插件是否正常运行。它在 /healthz
时,会调用 csi 的 Probe 方法,如果返回成功,返回 200,否则返回 500。