Skip to content

Commit 46d6ed4

Browse files
authored
feat(pubsub): support new forms of topic ingestion (#11537)
* feat(pubsub): support new forms of topic ingestion * add test for kafka topics * rename msk state, remove output only field
1 parent 0dd7d3d commit 46d6ed4

File tree

2 files changed

+314
-1
lines changed

2 files changed

+314
-1
lines changed

pubsub/topic.go

Lines changed: 245 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -592,6 +592,189 @@ func (i *IngestionDataSourceCloudStoragePubSubAvroFormat) isCloudStorageIngestio
592592
return true
593593
}
594594

595+
// EventHubsState denotes the possible states for ingestion from Event Hubs.
type EventHubsState int

// The constants below are typed as EventHubsState (the first entry carries the
// type and the remaining entries inherit it through the implicit repetition of
// the iota expression), so they cannot be silently mixed with the state
// constants of other ingestion sources.
const (
	// EventHubsStateUnspecified is the default value. This value is unused.
	EventHubsStateUnspecified EventHubsState = iota

	// EventHubsStateActive means the state is active.
	EventHubsStateActive

	// EventHubsStatePermissionDenied indicates a permission denied error was
	// encountered while consuming data from Event Hubs.
	// This can happen when `client_id` or `tenant_id` are invalid, or when the
	// right permissions haven't been granted.
	EventHubsStatePermissionDenied

	// EventHubsStatePublishPermissionDenied indicates a permission denied error
	// was encountered while publishing to the topic.
	EventHubsStatePublishPermissionDenied

	// EventHubsStateNamespaceNotFound indicates the provided Event Hubs namespace couldn't be found.
	EventHubsStateNamespaceNotFound

	// EventHubsStateNotFound indicates the provided Event Hub couldn't be found.
	EventHubsStateNotFound

	// EventHubsStateSubscriptionNotFound indicates the provided Event Hubs subscription couldn't be found.
	EventHubsStateSubscriptionNotFound

	// EventHubsStateResourceGroupNotFound indicates the provided Event Hubs resource group couldn't be found.
	EventHubsStateResourceGroupNotFound
)
627+
628+
// IngestionDataSourceAzureEventHubs are ingestion settings for Azure Event Hubs.
629+
type IngestionDataSourceAzureEventHubs struct {
630+
// State is an output-only field that indicates the state of the Event Hubs ingestion source.
631+
State EventHubsState
632+
633+
// ResourceGroup is the name of the resource group within the Azure subscription.
634+
ResourceGroup string
635+
636+
// Namespace is the name of the Event Hubs namespace.
637+
Namespace string
638+
639+
// EventHub is the name of the Event Hub.
640+
EventHub string
641+
642+
// ClientID is the client ID of the Azure application that is being used to authenticate Pub/Sub.
643+
ClientID string
644+
645+
// TenantID is the tenant ID of the Azure application that is being used to authenticate Pub/Sub.
646+
TenantID string
647+
648+
// SubscriptionID is the Azure subscription ID.
649+
SubscriptionID string
650+
651+
// GCPServiceAccount is the GCP service account to be used for Federated Identity
652+
// authentication.
653+
GCPServiceAccount string
654+
}
655+
656+
var _ IngestionDataSource = (*IngestionDataSourceAzureEventHubs)(nil)
657+
658+
// isIngestionDataSource is the unexported marker method that lets
// *IngestionDataSourceAzureEventHubs satisfy IngestionDataSource. It always
// returns true.
func (i *IngestionDataSourceAzureEventHubs) isIngestionDataSource() bool {
659+
return true
660+
}
661+
662+
// AmazonMSKState denotes the possible states for ingestion from Amazon MSK.
type AmazonMSKState int

// The constants below are typed as AmazonMSKState (the first entry carries the
// type and the remaining entries inherit it through the implicit repetition of
// the iota expression), so they cannot be silently mixed with the state
// constants of other ingestion sources.
const (
	// AmazonMSKStateUnspecified is the default value. This value is unused.
	AmazonMSKStateUnspecified AmazonMSKState = iota

	// AmazonMSKActive indicates the MSK topic is active.
	AmazonMSKActive

	// AmazonMSKPermissionDenied indicates a permission denied error was
	// encountered while consuming data from Amazon MSK.
	AmazonMSKPermissionDenied

	// AmazonMSKPublishPermissionDenied indicates a permission denied error was
	// encountered while publishing to the topic.
	AmazonMSKPublishPermissionDenied

	// AmazonMSKClusterNotFound indicates the provided MSK cluster wasn't found.
	AmazonMSKClusterNotFound

	// AmazonMSKTopicNotFound indicates the provided topic wasn't found.
	AmazonMSKTopicNotFound
)
684+
685+
// IngestionDataSourceAmazonMSK are ingestion settings for Amazon MSK.
686+
type IngestionDataSourceAmazonMSK struct {
687+
// State is an output-only field that indicates the state of the Amazon
688+
// MSK ingestion source.
689+
State AmazonMSKState
690+
691+
// ClusterARN is the Amazon Resource Name (ARN) that uniquely identifies the
692+
// cluster.
693+
ClusterARN string
694+
695+
// Topic is the name of the topic in the Amazon MSK cluster that Pub/Sub
696+
// will import from.
697+
Topic string
698+
699+
// AWSRoleARN is the AWS role ARN to be used for Federated Identity authentication
700+
// with Amazon MSK. Check the Pub/Sub docs for how to set up this role and
701+
// the required permissions that need to be attached to it.
702+
AWSRoleARN string
703+
704+
// GCPServiceAccount is the GCP service account to be used for Federated Identity
705+
// authentication with Amazon MSK (via an `AssumeRoleWithWebIdentity` call
706+
// for the provided role). The `aws_role_arn` must be set up with
707+
// `accounts.google.com:sub` equal to this service account number.
708+
GCPServiceAccount string
709+
}
710+
711+
var _ IngestionDataSource = (*IngestionDataSourceAmazonMSK)(nil)
712+
713+
// isIngestionDataSource is the unexported marker method that lets
// *IngestionDataSourceAmazonMSK satisfy IngestionDataSource. It always
// returns true.
func (i *IngestionDataSourceAmazonMSK) isIngestionDataSource() bool {
714+
return true
715+
}
716+
717+
// ConfluentCloudState denotes the possible states for ingestion from Confluent Cloud.
type ConfluentCloudState int

// The constants below are typed as ConfluentCloudState and numbered by iota
// (0 through 6, matching the values previously written out by hand), so they
// cannot be silently mixed with the state constants of other ingestion
// sources.
const (
	// ConfluentCloudStateUnspecified is the default value. This value is unused.
	ConfluentCloudStateUnspecified ConfluentCloudState = iota

	// ConfluentCloudActive indicates the state is active.
	ConfluentCloudActive

	// ConfluentCloudPermissionDenied indicates a permission denied error was
	// encountered while consuming data from Confluent Cloud.
	ConfluentCloudPermissionDenied

	// ConfluentCloudPublishPermissionDenied indicates a permission denied error
	// was encountered while publishing to the topic.
	ConfluentCloudPublishPermissionDenied

	// ConfluentCloudUnreachableBootstrapServer indicates the provided bootstrap
	// server address is unreachable.
	ConfluentCloudUnreachableBootstrapServer

	// ConfluentCloudClusterNotFound indicates the provided cluster wasn't found.
	ConfluentCloudClusterNotFound

	// ConfluentCloudTopicNotFound indicates the provided topic wasn't found.
	ConfluentCloudTopicNotFound
)
745+
746+
// IngestionDataSourceConfluentCloud are ingestion settings for Confluent Cloud.
747+
type IngestionDataSourceConfluentCloud struct {
748+
// State is an output-only field that indicates the state of the
749+
// Confluent Cloud ingestion source.
750+
State ConfluentCloudState
751+
752+
// BootstrapServer is the address of the bootstrap server. The format is url:port.
753+
BootstrapServer string
754+
755+
// ClusterID is the ID of the cluster.
756+
ClusterID string
757+
758+
// Topic is the name of the topic in the Confluent Cloud cluster that
759+
// Pub/Sub will import from.
760+
Topic string
761+
762+
// IdentityPoolID is the ID of the identity pool to be used for Federated Identity
763+
// authentication with Confluent Cloud. See
764+
// https://docs.confluent.io/cloud/current/security/authenticate/workload-identities/identity-providers/oauth/identity-pools.html#add-oauth-identity-pools.
765+
IdentityPoolID string
766+
767+
// GCPServiceAccount is the GCP service account to be used for Federated Identity
768+
// authentication with `identity_pool_id`.
769+
GCPServiceAccount string
770+
}
771+
772+
var _ IngestionDataSource = (*IngestionDataSourceConfluentCloud)(nil)
773+
774+
// isIngestionDataSource is the unexported marker method that lets
// *IngestionDataSourceConfluentCloud satisfy IngestionDataSource. It always
// returns true.
func (i *IngestionDataSourceConfluentCloud) isIngestionDataSource() bool {
775+
return true
776+
}
777+
595778
func protoToIngestionDataSourceSettings(pbs *pb.IngestionDataSourceSettings) *IngestionDataSourceSettings {
596779
if pbs == nil {
597780
return nil
@@ -625,6 +808,34 @@ func protoToIngestionDataSourceSettings(pbs *pb.IngestionDataSourceSettings) *In
625808
MinimumObjectCreateTime: cs.GetMinimumObjectCreateTime().AsTime(),
626809
MatchGlob: cs.GetMatchGlob(),
627810
}
811+
} else if e := pbs.GetAzureEventHubs(); e != nil {
812+
s.Source = &IngestionDataSourceAzureEventHubs{
813+
State: EventHubsState(e.GetState()),
814+
ResourceGroup: e.GetResourceGroup(),
815+
Namespace: e.GetNamespace(),
816+
EventHub: e.GetEventHub(),
817+
ClientID: e.GetClientId(),
818+
TenantID: e.GetTenantId(),
819+
SubscriptionID: e.GetSubscriptionId(),
820+
GCPServiceAccount: e.GetGcpServiceAccount(),
821+
}
822+
} else if m := pbs.GetAwsMsk(); m != nil {
823+
s.Source = &IngestionDataSourceAmazonMSK{
824+
State: AmazonMSKState(m.GetState()),
825+
ClusterARN: m.GetClusterArn(),
826+
Topic: m.GetTopic(),
827+
AWSRoleARN: m.GetAwsRoleArn(),
828+
GCPServiceAccount: m.GetGcpServiceAccount(),
829+
}
830+
} else if c := pbs.GetConfluentCloud(); c != nil {
831+
s.Source = &IngestionDataSourceConfluentCloud{
832+
State: ConfluentCloudState(c.GetState()),
833+
BootstrapServer: c.GetBootstrapServer(),
834+
ClusterID: c.GetClusterId(),
835+
Topic: c.GetTopic(),
836+
IdentityPoolID: c.GetIdentityPoolId(),
837+
GCPServiceAccount: c.GetGcpServiceAccount(),
838+
}
628839
}
629840

630841
if pbs.PlatformLogsSettings != nil {
@@ -681,7 +892,6 @@ func (i *IngestionDataSourceSettings) toProto() *pb.IngestionDataSourceSettings
681892
case *IngestionDataSourceCloudStorageAvroFormat:
682893
pbs.Source = &pb.IngestionDataSourceSettings_CloudStorage_{
683894
CloudStorage: &pb.IngestionDataSourceSettings_CloudStorage{
684-
State: pb.IngestionDataSourceSettings_CloudStorage_State(cs.State),
685895
Bucket: cs.Bucket,
686896
InputFormat: &pb.IngestionDataSourceSettings_CloudStorage_AvroFormat_{
687897
AvroFormat: &pb.IngestionDataSourceSettings_CloudStorage_AvroFormat{},
@@ -704,6 +914,40 @@ func (i *IngestionDataSourceSettings) toProto() *pb.IngestionDataSourceSettings
704914
}
705915
}
706916
}
917+
if e, ok := out.(*IngestionDataSourceAzureEventHubs); ok {
918+
pbs.Source = &pb.IngestionDataSourceSettings_AzureEventHubs_{
919+
AzureEventHubs: &pb.IngestionDataSourceSettings_AzureEventHubs{
920+
ResourceGroup: e.ResourceGroup,
921+
Namespace: e.Namespace,
922+
EventHub: e.EventHub,
923+
ClientId: e.ClientID,
924+
TenantId: e.TenantID,
925+
SubscriptionId: e.SubscriptionID,
926+
GcpServiceAccount: e.GCPServiceAccount,
927+
},
928+
}
929+
}
930+
if m, ok := out.(*IngestionDataSourceAmazonMSK); ok {
931+
pbs.Source = &pb.IngestionDataSourceSettings_AwsMsk_{
932+
AwsMsk: &pb.IngestionDataSourceSettings_AwsMsk{
933+
ClusterArn: m.ClusterARN,
934+
Topic: m.Topic,
935+
AwsRoleArn: m.AWSRoleARN,
936+
GcpServiceAccount: m.GCPServiceAccount,
937+
},
938+
}
939+
}
940+
if c, ok := out.(*IngestionDataSourceConfluentCloud); ok {
941+
pbs.Source = &pb.IngestionDataSourceSettings_ConfluentCloud_{
942+
ConfluentCloud: &pb.IngestionDataSourceSettings_ConfluentCloud{
943+
BootstrapServer: c.BootstrapServer,
944+
ClusterId: c.ClusterID,
945+
Topic: c.Topic,
946+
IdentityPoolId: c.IdentityPoolID,
947+
GcpServiceAccount: c.GCPServiceAccount,
948+
},
949+
}
950+
}
707951
}
708952
return pbs
709953
}

pubsub/topic_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,75 @@ func TestTopic_IngestionCloudStorage(t *testing.T) {
230230
}
231231
}
232232

233+
func TestTopic_Ingestion(t *testing.T) {
234+
c, srv := newFake(t)
235+
defer c.Close()
236+
defer srv.Close()
237+
id := "test-topic-3p-ingestion"
238+
gcpSA := "[email protected]"
239+
azureIngestion := &IngestionDataSourceAzureEventHubs{
240+
ResourceGroup: "fake-resource-group",
241+
Namespace: "fake-namespace",
242+
EventHub: "fake-event-hub",
243+
ClientID: "11111111-1111-1111-1111-111111111111",
244+
TenantID: "22222222-2222-2222-2222-222222222222",
245+
SubscriptionID: "33333333-3333-3333-3333-333333333333",
246+
GCPServiceAccount: gcpSA,
247+
}
248+
mskIngestion := &IngestionDataSourceAmazonMSK{
249+
ClusterARN: "arn:aws:kafka:us-east-1:111111111111:cluster/fake-cluster-name/11111111-1111-1",
250+
Topic: "fake-msk-topic-name",
251+
AWSRoleARN: "arn:aws:iam::111111111111:role/fake-role-name",
252+
GCPServiceAccount: gcpSA,
253+
}
254+
confluentCloud := &IngestionDataSourceConfluentCloud{
255+
BootstrapServer: "fake-bootstrap-server-id.us-south1.gcp.confluent.cloud:9092",
256+
ClusterID: "fake-cluster-id",
257+
Topic: "fake-confluent-topic-name",
258+
IdentityPoolID: "fake-pool-id",
259+
GCPServiceAccount: gcpSA,
260+
}
261+
want := TopicConfig{
262+
IngestionDataSourceSettings: &IngestionDataSourceSettings{
263+
Source: azureIngestion,
264+
},
265+
}
266+
topic := mustCreateTopicWithConfig(t, c, id, &want)
267+
got, err := topic.Config(context.Background())
268+
if err != nil {
269+
t.Fatalf("error getting topic config: %v", err)
270+
}
271+
want.State = TopicStateActive
272+
opt := cmpopts.IgnoreUnexported(TopicConfig{})
273+
if !testutil.Equal(got, want, opt) {
274+
t.Errorf("got %v, want %v", got, want)
275+
}
276+
277+
// Update ingestion settings to use MSK
278+
ctx := context.Background()
279+
settings := &IngestionDataSourceSettings{
280+
Source: mskIngestion,
281+
}
282+
config2, err := topic.Update(ctx, TopicConfigToUpdate{IngestionDataSourceSettings: settings})
283+
if err != nil {
284+
t.Fatal(err)
285+
}
286+
if !testutil.Equal(config2.IngestionDataSourceSettings, settings, opt) {
287+
t.Errorf("\ngot %+v\nwant %+v", config2.IngestionDataSourceSettings, settings)
288+
}
289+
290+
settings = &IngestionDataSourceSettings{
291+
Source: confluentCloud,
292+
}
293+
config3, err := topic.Update(ctx, TopicConfigToUpdate{IngestionDataSourceSettings: settings})
294+
if err != nil {
295+
t.Fatal(err)
296+
}
297+
if !testutil.Equal(config3.IngestionDataSourceSettings, settings, opt) {
298+
t.Errorf("\ngot %+v\nwant %+v", config3.IngestionDataSourceSettings, settings)
299+
}
300+
}
301+
233302
func TestListTopics(t *testing.T) {
234303
ctx := context.Background()
235304
c, srv := newFake(t)

0 commit comments

Comments
 (0)