Skip to content

Commit fd49db5

Browse files
authored
feat(pubsub): support payload wrapping for push subs (#8292)
* feat(pubsub): support payload wrapping for push subs * fix lint issues * fix lint issues
1 parent d3f60b3 commit fd49db5

File tree

5 files changed

+98
-18
lines changed

5 files changed

+98
-18
lines changed

.github/workflows/vet.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ golint ./... 2>&1 | (
5555
grep -vE " executeStreamingSql(Min|Rnd)Time" |
5656
grep -vE " executeSql(Min|Rnd)Time" |
5757
grep -vE "pubsub\/pstest\/fake\.go.+should have comment or be unexported" |
58+
grep -vE "pubsub\/subscription\.go.+ type name will be used as pubsub.PubsubWrapper by other packages" |
5859
grep -v "ClusterId" |
5960
grep -v "InstanceId" |
6061
grep -v "firestore.arrayUnion" |

pubsub/pstest/fake.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,11 @@ func (s *GServer) CreateSubscription(_ context.Context, ps *pb.Subscription) (*p
498498
}
499499
if ps.PushConfig == nil {
500500
ps.PushConfig = &pb.PushConfig{}
501+
} else if ps.PushConfig.Wrapper == nil {
502+
// Wrapper should default to PubsubWrapper.
503+
ps.PushConfig.Wrapper = &pb.PushConfig_PubsubWrapper_{
504+
PubsubWrapper: &pb.PushConfig_PubsubWrapper{},
505+
}
501506
}
502507
// Consider any table set to mean the config is active.
503508
// We don't convert nil config to empty like with PushConfig above

pubsub/pstest/fake_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1543,6 +1543,9 @@ func TestSubscriptionPushPull(t *testing.T) {
15431543
// Create a push subscription.
15441544
pc := &pb.PushConfig{
15451545
PushEndpoint: "some-endpoint",
1546+
Wrapper: &pb.PushConfig_PubsubWrapper_{
1547+
PubsubWrapper: &pb.PushConfig_PubsubWrapper{},
1548+
},
15461549
}
15471550
got := mustCreateSubscription(ctx, t, sclient, &pb.Subscription{
15481551
AckDeadlineSeconds: minAckDeadlineSecs,

pubsub/subscription.go

Lines changed: 67 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,10 @@ type PushConfig struct {
148148
// This field is optional and should be set only by users interested in
149149
// authenticated push.
150150
AuthenticationMethod AuthenticationMethod
151+
152+
// The format of the delivered message to the push endpoint is defined by
153+
// the chosen wrapper. When unset, `PubsubWrapper` is used.
154+
Wrapper Wrapper
151155
}
152156

153157
func (pc *PushConfig) toProto() *pb.PushConfig {
@@ -165,12 +169,19 @@ func (pc *PushConfig) toProto() *pb.PushConfig {
165169
default: // TODO: add others here when GAIC adds more definitions.
166170
}
167171
}
172+
if w := pc.Wrapper; w != nil {
173+
switch wt := w.(type) {
174+
case *PubsubWrapper:
175+
pbCfg.Wrapper = wt.toProto()
176+
case *NoWrapper:
177+
pbCfg.Wrapper = wt.toProto()
178+
default:
179+
}
180+
}
168181
return pbCfg
169182
}
170183

171-
// AuthenticationMethod is used by push points to verify the source of push requests.
172-
// This interface defines fields that are part of a closed alpha that may not be accessible
173-
// to all users.
184+
// AuthenticationMethod is used by push subscriptions to verify the source of push requests.
174185
type AuthenticationMethod interface {
175186
isAuthMethod() bool
176187
}
@@ -212,6 +223,49 @@ func (oidcToken *OIDCToken) toProto() *pb.PushConfig_OidcToken_ {
212223
}
213224
}
214225

226+
// Wrapper defines the format of message delivered to push endpoints.
227+
type Wrapper interface {
228+
isWrapper() bool
229+
}
230+
231+
// PubsubWrapper denotes sending the payload to the push endpoint in the form of the JSON
232+
// representation of a PubsubMessage
233+
// (https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#pubsubmessage).
234+
type PubsubWrapper struct{}
235+
236+
var _ Wrapper = (*PubsubWrapper)(nil)
237+
238+
func (p *PubsubWrapper) isWrapper() bool { return true }
239+
240+
func (p *PubsubWrapper) toProto() *pb.PushConfig_PubsubWrapper_ {
241+
if p == nil {
242+
return nil
243+
}
244+
return &pb.PushConfig_PubsubWrapper_{
245+
PubsubWrapper: &pb.PushConfig_PubsubWrapper{},
246+
}
247+
}
248+
249+
// NoWrapper denotes not wrapping the payload sent to the push endpoint.
250+
type NoWrapper struct {
251+
WriteMetadata bool
252+
}
253+
254+
var _ Wrapper = (*NoWrapper)(nil)
255+
256+
func (n *NoWrapper) isWrapper() bool { return true }
257+
258+
func (n *NoWrapper) toProto() *pb.PushConfig_NoWrapper_ {
259+
if n == nil {
260+
return nil
261+
}
262+
return &pb.PushConfig_NoWrapper_{
263+
NoWrapper: &pb.PushConfig_NoWrapper{
264+
WriteMetadata: n.WriteMetadata,
265+
},
266+
}
267+
}
268+
215269
// BigQueryConfigState denotes the possible states for a BigQuery Subscription.
216270
type BigQueryConfigState int
217271

@@ -648,6 +702,16 @@ func protoToPushConfig(pbPc *pb.PushConfig) *PushConfig {
648702
}
649703
}
650704
}
705+
if w := pbPc.Wrapper; w != nil {
706+
switch wt := w.(type) {
707+
case *pb.PushConfig_PubsubWrapper_:
708+
pc.Wrapper = &PubsubWrapper{}
709+
case *pb.PushConfig_NoWrapper_:
710+
pc.Wrapper = &NoWrapper{
711+
WriteMetadata: wt.NoWrapper.WriteMetadata,
712+
}
713+
}
714+
}
651715
return pc
652716
}
653717

pubsub/subscription_test.go

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ func TestListTopicSubscriptions(t *testing.T) {
154154

155155
const defaultRetentionDuration = 168 * time.Hour
156156

157-
func TestUpdateSubscription(t *testing.T) {
157+
func TestSubscriptionConfig(t *testing.T) {
158158
ctx := context.Background()
159159
client, srv := newFake(t)
160160
defer client.Close()
@@ -191,13 +191,14 @@ func TestUpdateSubscription(t *testing.T) {
191191
ServiceAccountEmail: "[email protected]",
192192
Audience: "client-12345",
193193
},
194+
Wrapper: &PubsubWrapper{},
194195
},
195196
EnableExactlyOnceDelivery: false,
196197
State: SubscriptionStateActive,
197198
}
198199
opt := cmpopts.IgnoreUnexported(SubscriptionConfig{})
199-
if !testutil.Equal(cfg, want, opt) {
200-
t.Fatalf("\ngot %+v\nwant %+v", cfg, want)
200+
if diff := testutil.Diff(cfg, want, opt); diff != "" {
201+
t.Fatalf("compare subscription config mismatch, -got, +want\n%s", diff)
201202
}
202203

203204
got, err := sub.Update(ctx, SubscriptionConfigToUpdate{
@@ -206,10 +207,13 @@ func TestUpdateSubscription(t *testing.T) {
206207
Labels: map[string]string{"label": "value"},
207208
ExpirationPolicy: 72 * time.Hour,
208209
PushConfig: &PushConfig{
209-
Endpoint: "https://example.com/push",
210+
Endpoint: "https://example2.com/push",
210211
AuthenticationMethod: &OIDCToken{
211-
ServiceAccountEmail: "[email protected]",
212-
Audience: "client-12345",
212+
ServiceAccountEmail: "[email protected]",
213+
Audience: "client-98765",
214+
},
215+
Wrapper: &NoWrapper{
216+
WriteMetadata: true,
213217
},
214218
},
215219
EnableExactlyOnceDelivery: true,
@@ -225,17 +229,20 @@ func TestUpdateSubscription(t *testing.T) {
225229
Labels: map[string]string{"label": "value"},
226230
ExpirationPolicy: 72 * time.Hour,
227231
PushConfig: PushConfig{
228-
Endpoint: "https://example.com/push",
232+
Endpoint: "https://example2.com/push",
229233
AuthenticationMethod: &OIDCToken{
230-
ServiceAccountEmail: "[email protected]",
231-
Audience: "client-12345",
234+
ServiceAccountEmail: "[email protected]",
235+
Audience: "client-98765",
236+
},
237+
Wrapper: &NoWrapper{
238+
WriteMetadata: true,
232239
},
233240
},
234241
EnableExactlyOnceDelivery: true,
235242
State: SubscriptionStateActive,
236243
}
237-
if !testutil.Equal(got, want, opt) {
238-
t.Fatalf("\ngot %+v\nwant %+v", got, want)
244+
if diff := testutil.Diff(got, want, opt); diff != "" {
245+
t.Fatalf("compare subscription config mismatch, -got, +want\n%s", diff)
239246
}
240247

241248
got, err = sub.Update(ctx, SubscriptionConfigToUpdate{
@@ -247,8 +254,8 @@ func TestUpdateSubscription(t *testing.T) {
247254
}
248255
want.RetentionDuration = 2 * time.Hour
249256
want.Labels = nil
250-
if !testutil.Equal(got, want, opt) {
251-
t.Fatalf("\ngot %+v\nwant %+v", got, want)
257+
if diff := testutil.Diff(got, want, opt); diff != "" {
258+
t.Fatalf("compare subscription config mismatch, -got, +want\n%s", diff)
252259
}
253260

254261
_, err = sub.Update(ctx, SubscriptionConfigToUpdate{})
@@ -264,8 +271,8 @@ func TestUpdateSubscription(t *testing.T) {
264271
t.Fatal(err)
265272
}
266273
want.ExpirationPolicy = time.Duration(0)
267-
if !testutil.Equal(got, want, opt) {
268-
t.Fatalf("\ngot %+v\nwant %+v", got, want)
274+
if diff := testutil.Diff(got, want, opt); diff != "" {
275+
t.Fatalf("compare subscription config mismatch, -got, +want\n%s", diff)
269276
}
270277
}
271278

0 commit comments

Comments
 (0)