Commit f167d66

Merge pull request #2081 from cemakd/gkemt
GKE-MT Support for PDCSI
2 parents f39cd24 + 03c842b

File tree: 448 files changed (+36980 -37 lines)

Some content is hidden: large commits hide part of the diff by default, so only a subset of the 448 changed files appears below.

cmd/gce-pd-csi-driver/main.go

Lines changed: 8 additions & 1 deletion
@@ -76,6 +76,7 @@ var (
 	enableStoragePoolsFlag = flag.Bool("enable-storage-pools", false, "If set to true, the CSI Driver will allow volumes to be provisioned in Storage Pools")
 	enableHdHAFlag = flag.Bool("allow-hdha-provisioning", false, "If set to true, will allow the driver to provision Hyperdisk-balanced High Availability disks")
 	enableDataCacheFlag = flag.Bool("enable-data-cache", false, "If set to true, the CSI Driver will allow volumes to be provisioned with Data Cache configuration")
+	enableMultitenancyFlag = flag.Bool("enable-multitenancy", false, "If set to true, the CSI Driver will support running on multitenant GKE clusters")
 	nodeName = flag.String("node-name", "", "The node this driver is running on")

 	multiZoneVolumeHandleDiskTypesFlag = flag.String("multi-zone-volume-handle-disk-types", "", "Comma separated list of allowed disk types that can use the multi-zone volumeHandle. Used only if --multi-zone-volume-handle-enable")
@@ -232,11 +233,17 @@ func handle() {
 	// Initialize requirements for the controller service
 	var controllerServer *driver.GCEControllerServer
 	if *runControllerService {
-		cloudProvider, err := gce.CreateCloudProvider(ctx, version, *cloudConfigFilePath, computeEndpoint, computeEnvironment, waitForAttachConfig, listInstancesConfig)
+		cloudProvider, err := gce.CreateCloudProvider(ctx, version, *cloudConfigFilePath, computeEndpoint, computeEnvironment, waitForAttachConfig, listInstancesConfig, *enableMultitenancyFlag)
 		if err != nil {
 			klog.Fatalf("Failed to get cloud provider: %v", err.Error())
 		}

+		if *enableMultitenancyFlag {
+			ctx, cancel := context.WithCancel(ctx)
+			defer cancel()
+			go cloudProvider.TenantInformer.Run(ctx.Done())
+		}
+
 		initialBackoffDuration := time.Duration(*errorBackoffInitialDurationMs) * time.Millisecond
 		maxBackoffDuration := time.Duration(*errorBackoffMaxDurationMs) * time.Millisecond
 		// TODO(2042): Move more of the constructor args into this struct
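Note: the block added above follows the standard client-go informer pattern: derive a cancellable context, then run the informer on a goroutine until the context is done. A minimal self-contained sketch of that pattern follows; the ConfigMap informer is only a stand-in, since the concrete type of cloudProvider.TenantInformer does not appear in this diff.

package main

import (
	"context"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// Run an informer on a goroutine until the context is cancelled,
	// mirroring `go cloudProvider.TenantInformer.Run(ctx.Done())` above.
	// The ConfigMap informer here is only a stand-in for illustration.
	factory := informers.NewSharedInformerFactory(client, 0)
	informer := factory.Core().V1().ConfigMaps().Informer()
	go informer.Run(ctx.Done())

	// ... the rest of driver startup would continue here.
}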

deploy/kubernetes/base/controller/controller.yaml

Lines changed: 1 addition & 0 deletions
@@ -145,6 +145,7 @@ spec:
             - "--supports-dynamic-iops-provisioning=hyperdisk-balanced,hyperdisk-extreme"
             - "--supports-dynamic-throughput-provisioning=hyperdisk-balanced,hyperdisk-throughput,hyperdisk-ml"
             - --enable-data-cache
+            - --enable-multitenancy
           command:
             - /gce-pd-csi-driver
           env:

deploy/kubernetes/base/node_linux/node.yaml

Lines changed: 1 addition & 0 deletions
@@ -47,6 +47,7 @@ spec:
             - "--endpoint=unix:/csi/csi.sock"
             - "--run-controller-service=false"
             - "--enable-data-cache"
+            - "--enable-multitenancy"
             - "--node-name=$(KUBE_NODE_NAME)"
           securityContext:
             privileged: true

deploy/kubernetes/images/stable-master/image.yaml

Lines changed: 4 additions & 3 deletions
@@ -44,8 +44,9 @@ metadata:
   name: imagetag-gcepd-driver
 imageTag:
   name: gke.gcr.io/gcp-compute-persistent-disk-csi-driver
-  # pdImagePlaceholder in test/k8s-integration/main.go is updated automatically with the newTag
-  newName: registry.k8s.io/cloud-provider-gcp/gcp-compute-persistent-disk-csi-driver
-  newTag: "v1.17.2"
+  # Don't change stable image without changing pdImagePlaceholder in
+  # test/k8s-integration/main.go
+  newName: us-central1-docker.pkg.dev/enginakdemir-gke-dev/csi-dev/gcp-compute-persistent-disk-csi-driver
+  newTag: "latest"
 ---

go.mod

Lines changed: 1 addition & 1 deletion
@@ -41,6 +41,7 @@ require (
 	k8s.io/mount-utils v0.33.1
 	k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738
 	sigs.k8s.io/boskos v0.0.0-20220711194915-6cb8a6fb2dd1
+	sigs.k8s.io/yaml v1.4.0
 )

 require (
@@ -115,7 +116,6 @@ require (
 	sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect
 	sigs.k8s.io/randfill v1.0.0 // indirect
 	sigs.k8s.io/structured-merge-diff/v4 v4.7.0 // indirect
-	sigs.k8s.io/yaml v1.4.0 // indirect
 )

 replace k8s.io/client-go => k8s.io/client-go v0.32.2
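Promoting sigs.k8s.io/yaml from an indirect to a direct dependency means driver code now imports the package itself. For reference, a minimal sketch of how that package is typically used; the tenantConfig struct here is hypothetical, and the PR's actual use of the package is not shown in this excerpt.

package main

import (
	"fmt"

	"sigs.k8s.io/yaml"
)

// tenantConfig is a hypothetical shape, for illustration only.
type tenantConfig struct {
	ProjectNumber string `json:"projectNumber"`
}

func main() {
	var c tenantConfig
	// sigs.k8s.io/yaml converts YAML to JSON before unmarshalling,
	// so the struct fields carry json tags rather than yaml tags.
	if err := yaml.Unmarshal([]byte(`projectNumber: "12345"`), &c); err != nil {
		panic(err)
	}
	fmt.Println(c.ProjectNumber)
}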

pkg/gce-cloud-provider/compute/fake-gce.go

Lines changed: 1 addition & 1 deletion
@@ -378,7 +378,7 @@ func (cloud *FakeCloudProvider) InsertInstance(instance *computev1.Instance, ins
 	return
 }

-func (cloud *FakeCloudProvider) GetInstanceOrError(ctx context.Context, instanceZone, instanceName string) (*computev1.Instance, error) {
+func (cloud *FakeCloudProvider) GetInstanceOrError(ctx context.Context, project, instanceZone, instanceName string) (*computev1.Instance, error) {
 	instance, ok := cloud.instances[instanceName]
 	if !ok {
 		return nil, notFoundError()

pkg/gce-cloud-provider/compute/gce-compute.go

Lines changed: 87 additions & 19 deletions
@@ -117,7 +117,7 @@ type GCECompute interface {
 	// Regional Disk Methods
 	GetReplicaZoneURI(project string, zone string) string
 	// Instance Methods
-	GetInstanceOrError(ctx context.Context, instanceZone, instanceName string) (*computev1.Instance, error)
+	GetInstanceOrError(ctx context.Context, project, instanceZone, instanceName string) (*computev1.Instance, error)
 	// Zone Methods
 	ListZones(ctx context.Context, region string) ([]string, error)
 	ListSnapshots(ctx context.Context, filter string) ([]*computev1.Snapshot, string, error)
@@ -160,40 +160,79 @@ func (cloud *CloudProvider) listDisksInternal(ctx context.Context, fields []goog
 	if err != nil {
 		return nil, "", err
 	}
-	items := []*computev1.Disk{}
+	disks := []*computev1.Disk{}
+
+	klog.Infof("Getting regional disks for project: %s", cloud.project)
+	rDisks, err := listRegionalDisksForProject(cloud.service, cloud.project, region, fields, filter)
+	if err != nil {
+		return nil, "", err
+	}
+	disks = append(disks, rDisks...)
+	// listing out regional disks in the region for each tenant project
+	for p, s := range cloud.tenantServiceMap {
+		klog.Infof("Getting regional disks for tenant project: %s", p)
+		rDisks, err := listRegionalDisksForProject(s, p, region, fields, filter)
+		if err != nil {
+			return nil, "", err
+		}
+		disks = append(disks, rDisks...)
+	}
+
+	klog.Infof("Getting zonal disks for project: %s", cloud.project)
+	zDisks, err := listZonalDisksForProject(cloud.service, cloud.project, zones, fields, filter)
+	if err != nil {
+		return nil, "", err
+	}
+	disks = append(disks, zDisks...)
+	// listing out zonal disks in all zones of the region for each tenant project
+	for p, s := range cloud.tenantServiceMap {
+		klog.Infof("Getting zonal disks for tenant project: %s", p)
+		zDisks, err := listZonalDisksForProject(s, p, zones, fields, filter)
+		if err != nil {
+			return nil, "", err
+		}
+		disks = append(disks, zDisks...)
+	}

-	// listing out regional disks in the region
-	rlCall := cloud.service.RegionDisks.List(cloud.project, region)
+	return disks, "", nil
+}
+
+func listRegionalDisksForProject(service *computev1.Service, project string, region string, fields []googleapi.Field, filter string) ([]*computev1.Disk, error) {
+	items := []*computev1.Disk{}
+	rlCall := service.RegionDisks.List(project, region)
 	rlCall.Fields(fields...)
 	rlCall.Filter(filter)
 	nextPageToken := "pageToken"
 	for nextPageToken != "" {
 		rDiskList, err := rlCall.Do()
 		if err != nil {
-			return nil, "", err
+			return nil, err
 		}
 		items = append(items, rDiskList.Items...)
 		nextPageToken = rDiskList.NextPageToken
 		rlCall.PageToken(nextPageToken)
 	}
+	return items, nil
+}

-	// listing out zonal disks in all zones of the region
+func listZonalDisksForProject(service *computev1.Service, project string, zones []string, fields []googleapi.Field, filter string) ([]*computev1.Disk, error) {
+	items := []*computev1.Disk{}
 	for _, zone := range zones {
-		lCall := cloud.service.Disks.List(cloud.project, zone)
+		lCall := service.Disks.List(project, zone)
 		lCall.Fields(fields...)
 		lCall.Filter(filter)
 		nextPageToken := "pageToken"
 		for nextPageToken != "" {
 			diskList, err := lCall.Do()
 			if err != nil {
-				return nil, "", err
+				return nil, err
 			}
 			items = append(items, diskList.Items...)
 			nextPageToken = diskList.NextPageToken
 			lCall.PageToken(nextPageToken)
 		}
 	}
-	return items, "", nil
+	return items, nil
 }

 // ListInstances lists instances based on maxEntries and pageToken for the project and region
@@ -209,9 +248,28 @@ func (cloud *CloudProvider) ListInstances(ctx context.Context, fields []googleap
 		return nil, "", err
 	}
 	items := []*computev1.Instance{}
+	instances, err := cloud.listInstancesForProject(cloud.service, cloud.project, zones, fields)
+	if err != nil {
+		return nil, "", err
+	}
+	items = append(items, instances...)
+
+	for p, s := range cloud.tenantServiceMap {
+		instances, err := cloud.listInstancesForProject(s, p, zones, fields)
+		if err != nil {
+			return nil, "", err
+		}
+		items = append(items, instances...)
+	}
+
+	return items, "", nil
+}
+
+func (cloud *CloudProvider) listInstancesForProject(service *computev1.Service, project string, zones []string, fields []googleapi.Field) ([]*computev1.Instance, error) {
+	items := []*computev1.Instance{}

 	for _, zone := range zones {
-		lCall := cloud.service.Instances.List(cloud.project, zone)
+		lCall := service.Instances.List(project, zone)
 		for _, filter := range cloud.listInstancesConfig.Filters {
 			lCall = lCall.Filter(filter)
 		}
@@ -220,15 +278,14 @@ func (cloud *CloudProvider) ListInstances(ctx context.Context, fields []googleap
 		for nextPageToken != "" {
 			instancesList, err := lCall.Do()
 			if err != nil {
-				return nil, "", err
+				return nil, err
 			}
 			items = append(items, instancesList.Items...)
 			nextPageToken = instancesList.NextPageToken
 			lCall.PageToken(nextPageToken)
 		}
 	}
-
-	return items, "", nil
+	return items, nil
 }

 // RepairUnderspecifiedVolumeKey will query the cloud provider and check each zone for the disk specified
@@ -857,7 +914,11 @@ func (cloud *CloudProvider) AttachDisk(ctx context.Context, project string, volK
 		ForceAttach: forceAttach,
 	}

-	op, err := cloud.service.Instances.AttachDisk(project, instanceZone, instanceName, attachedDiskV1).Context(ctx).ForceAttach(forceAttach).Do()
+	service := cloud.service
+	if _, ok := cloud.tenantServiceMap[project]; ok {
+		service = cloud.tenantServiceMap[project]
+	}
+	op, err := service.Instances.AttachDisk(project, instanceZone, instanceName, attachedDiskV1).Context(ctx).ForceAttach(forceAttach).Do()
 	if err != nil {
 		return fmt.Errorf("failed cloud service attach disk call: %w", err)
 	}
@@ -872,7 +933,11 @@ func (cloud *CloudProvider) AttachDisk(ctx context.Context, project string, volK

 func (cloud *CloudProvider) DetachDisk(ctx context.Context, project, deviceName, instanceZone, instanceName string) error {
 	klog.V(5).Infof("Detaching disk %v from %v", deviceName, instanceName)
-	op, err := cloud.service.Instances.DetachDisk(project, instanceZone, instanceName, deviceName).Context(ctx).Do()
+	service := cloud.service
+	if _, ok := cloud.tenantServiceMap[project]; ok {
+		service = cloud.tenantServiceMap[project]
+	}
+	op, err := service.Instances.DetachDisk(project, instanceZone, instanceName, deviceName).Context(ctx).Do()
 	if err != nil {
 		return err
 	}
@@ -1041,7 +1106,7 @@ func (cloud *CloudProvider) waitForAttachOnInstance(ctx context.Context, project
 	start := time.Now()
 	return wait.ExponentialBackoff(AttachDiskBackoff, func() (bool, error) {
 		klog.V(6).Infof("Polling instances.get for attach of disk %v to instance %v to complete for %v", volKey.Name, instanceName, time.Since(start))
-		instance, err := cloud.GetInstanceOrError(ctx, instanceZone, instanceName)
+		instance, err := cloud.GetInstanceOrError(ctx, project, instanceZone, instanceName)
 		if err != nil {
 			return false, fmt.Errorf("GetInstance failed to get instance: %w", err)
 		}
@@ -1145,10 +1210,13 @@ func opIsDone(op *computev1.Operation) (bool, error) {
 	return true, nil
 }

-func (cloud *CloudProvider) GetInstanceOrError(ctx context.Context, instanceZone, instanceName string) (*computev1.Instance, error) {
+func (cloud *CloudProvider) GetInstanceOrError(ctx context.Context, project, instanceZone, instanceName string) (*computev1.Instance, error) {
 	klog.V(5).Infof("Getting instance %v from zone %v", instanceName, instanceZone)
-	project := cloud.project
-	instance, err := cloud.service.Instances.Get(project, instanceZone, instanceName).Do()
+	service := cloud.service
+	if _, ok := cloud.tenantServiceMap[project]; ok {
+		service = cloud.tenantServiceMap[project]
+	}
+	instance, err := service.Instances.Get(project, instanceZone, instanceName).Do()
 	if err != nil {
 		return nil, err
 	}
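The same three-line lookup (use the default service unless the project appears in tenantServiceMap) is inlined in AttachDisk, DetachDisk, and GetInstanceOrError above. A small helper would capture it in one place; the sketch below is hypothetical and not part of this commit.

// serviceForProject is a hypothetical helper (not in this PR) showing the
// selection logic the three call sites above repeat: use the tenant
// project's dedicated compute service when one exists, otherwise fall
// back to the provider's default service.
func (cloud *CloudProvider) serviceForProject(project string) *computev1.Service {
	if s, ok := cloud.tenantServiceMap[project]; ok {
		return s
	}
	return cloud.service
}

A call site would then shrink to, for example, `op, err := cloud.serviceForProject(project).Instances.DetachDisk(project, instanceZone, instanceName, deviceName).Context(ctx).Do()`.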
