Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 95 additions & 1 deletion k3k-kubelet/controller/syncer/persistentvolumeclaims.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import (

"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/component-helpers/storage/volume"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"

Expand All @@ -22,6 +24,7 @@ import (
const (
pvcControllerName = "pvc-syncer-controller"
pvcFinalizerName = "pvc.k3k.io/finalizer"
pseudoPVLabel = "pod.k3k.io/pseudoPV"
)

type PVCReconciler struct {
Expand Down Expand Up @@ -105,6 +108,12 @@ func (r *PVCReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r
if err := r.HostClient.Delete(ctx, syncedPVC); err != nil && !apierrors.IsNotFound(err) {
return reconcile.Result{}, err
}

// delete the synced virtual PV
if err := r.VirtualClient.Delete(ctx, newPersistentVolume(&virtPVC)); err != nil && !apierrors.IsNotFound(err) {
return reconcile.Result{}, err
}

// remove the finalizer after cleaning up the synced pvc
if controllerutil.RemoveFinalizer(&virtPVC, pvcFinalizerName) {
if err := r.VirtualClient.Update(ctx, &virtPVC); err != nil {
Expand All @@ -127,7 +136,13 @@ func (r *PVCReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r

// note that we dont need to update the PVC on the host cluster, only syncing the PVC to allow being
// handled by the host cluster.
return reconcile.Result{}, ctrlruntimeclient.IgnoreAlreadyExists(r.HostClient.Create(ctx, syncedPVC))
if err := r.HostClient.Create(ctx, syncedPVC); err != nil && !apierrors.IsAlreadyExists(err) {
return reconcile.Result{}, err
}

// Creating a virtual PV to bound the existing PVC in the virtual cluster - needed for scheduling of
// the consumer pods
return reconcile.Result{}, r.createVirtualPersistentVolume(ctx, virtPVC)
}

func (r *PVCReconciler) pvc(obj *v1.PersistentVolumeClaim) *v1.PersistentVolumeClaim {
Expand All @@ -136,3 +151,82 @@ func (r *PVCReconciler) pvc(obj *v1.PersistentVolumeClaim) *v1.PersistentVolumeC

return hostPVC
}

func (r *PVCReconciler) createVirtualPersistentVolume(ctx context.Context, pvc v1.PersistentVolumeClaim) error {
log := ctrl.LoggerFrom(ctx)
log.V(1).Info("Creating virtual PersistentVolume")

pv := newPersistentVolume(&pvc)

if err := r.VirtualClient.Create(ctx, pv); err != nil && !apierrors.IsAlreadyExists(err) {
return err
}

orig := pv.DeepCopy()
pv.Status = v1.PersistentVolumeStatus{
Phase: v1.VolumeBound,
}

if err := r.VirtualClient.Status().Patch(ctx, pv, ctrlruntimeclient.MergeFrom(orig)); err != nil {
return err
}

log.V(1).Info("Patch the status of PersistentVolumeClaim to Bound")

pvcPatch := pvc.DeepCopy()
if pvcPatch.Annotations == nil {
pvcPatch.Annotations = make(map[string]string)
}

pvcPatch.Annotations[volume.AnnBoundByController] = "yes"
pvcPatch.Annotations[volume.AnnBindCompleted] = "yes"
pvcPatch.Status.Phase = v1.ClaimBound
pvcPatch.Status.AccessModes = pvcPatch.Spec.AccessModes

return r.VirtualClient.Status().Update(ctx, pvcPatch)
}

func newPersistentVolume(obj *v1.PersistentVolumeClaim) *v1.PersistentVolume {
var storageClass string

if obj.Spec.StorageClassName != nil {
storageClass = *obj.Spec.StorageClassName
}

return &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: obj.Name,
Labels: map[string]string{
pseudoPVLabel: "true",
},
Annotations: map[string]string{
volume.AnnBoundByController: "true",
volume.AnnDynamicallyProvisioned: "k3k-kubelet",
},
},
TypeMeta: metav1.TypeMeta{
Kind: "PersistentVolume",
APIVersion: "v1",
},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
FlexVolume: &v1.FlexPersistentVolumeSource{
Driver: "pseudopv",
},
},
StorageClassName: storageClass,
VolumeMode: obj.Spec.VolumeMode,
PersistentVolumeReclaimPolicy: v1.PersistentVolumeReclaimDelete,
AccessModes: obj.Spec.AccessModes,
Capacity: obj.Spec.Resources.Requests,
ClaimRef: &v1.ObjectReference{
APIVersion: obj.APIVersion,
UID: obj.UID,
ResourceVersion: obj.ResourceVersion,
Kind: obj.Kind,
Namespace: obj.Namespace,
Name: obj.Name,
},
},
}
}
8 changes: 7 additions & 1 deletion k3k-kubelet/controller/syncer/persistentvolumeclaims_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ var PVCTests = func() {
Expect(err).NotTo(HaveOccurred())
})

It("creates a pvc on the host cluster", func() {
It("creates a pvc on the host cluster and virtual pv in virtual cluster", func() {
ctx := context.Background()

pvc := &v1.PersistentVolumeClaim{
Expand Down Expand Up @@ -100,5 +100,11 @@ var PVCTests = func() {
Expect(*hostPVC.Spec.StorageClassName).To(Equal("test-sc"))

GinkgoWriter.Printf("labels: %v\n", hostPVC.Labels)

var virtualPV v1.PersistentVolume
key := client.ObjectKey{Name: pvc.Name}

err = virtTestEnv.k8sClient.Get(ctx, key, &virtualPV)
Expect(err).NotTo(HaveOccurred())
})
}
215 changes: 0 additions & 215 deletions k3k-kubelet/controller/syncer/pod.go

This file was deleted.

6 changes: 0 additions & 6 deletions k3k-kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,12 +458,6 @@ func addControllers(ctx context.Context, hostMgr, virtualMgr manager.Manager, c
return errors.New("failed to add pvc syncer controller: " + err.Error())
}

logger.Info("adding pod pvc controller")

if err := syncer.AddPodPVCController(ctx, virtualMgr, hostMgr, c.ClusterName, c.ClusterNamespace); err != nil {
return errors.New("failed to add pod pvc controller: " + err.Error())
}

logger.Info("adding priorityclass controller")

if err := syncer.AddPriorityClassSyncer(ctx, virtualMgr, hostMgr, c.ClusterName, c.ClusterNamespace); err != nil {
Expand Down
Loading