Skip to content

Commit 098f631

Browse files
feat(PubSub): Added samples for Exactly Once Delivery (GoogleCloudPlatform#1672)
* PubSub: Added samples for Exactly Once Delivery * Pubsub: Added tests for excatly once delivery samples * PubSub: Addressed PR comments
1 parent 6609166 commit 098f631

File tree

3 files changed

+178
-0
lines changed

3 files changed

+178
-0
lines changed
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
2+
/**
3+
* Copyright 2022 Google Inc.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
/**
19+
* For instructions on how to run the full sample:
20+
*
21+
* @see https://github.com/GoogleCloudPlatform/php-docs-samples/blob/master/pubsub/api/README.md
22+
*/
23+
24+
namespace Google\Cloud\Samples\PubSub;
25+
26+
# [START pubsub_create_subscription_with_exactly_once_delivery]
27+
use Google\Cloud\PubSub\PubSubClient;
28+
29+
/**
30+
* Creates a Pub/Sub subscription with `Exactly Once Delivery` enabled.
31+
*
32+
* @param string $projectId The Google project ID.
33+
* @param string $topicName The Pub/Sub topic name.
34+
* @param string $subscriptionName The Pub/Sub subscription name.
35+
*/
36+
function create_subscription_with_exactly_once_delivery(
37+
string $projectId,
38+
string $topicName,
39+
string $subscriptionName
40+
): void {
41+
$pubsub = new PubSubClient([
42+
'projectId' => $projectId,
43+
]);
44+
$topic = $pubsub->topic($topicName);
45+
$subscription = $topic->subscription($subscriptionName);
46+
$subscription->create([
47+
'enableExactlyOnceDelivery' => true
48+
]);
49+
50+
// Exactly Once Delivery status for the subscription
51+
$status = $subscription->info()['enableExactlyOnceDelivery'];
52+
53+
printf('Subscription created with exactly once delivery status: %s' . PHP_EOL, $status ? 'true' : 'false');
54+
}
55+
# [END pubsub_create_subscription_with_exactly_once_delivery]
56+
57+
// The following 2 lines are only needed to run the samples
58+
require_once __DIR__ . '/../../../testing/sample_helpers.php';
59+
\Google\Cloud\Samples\execute_sample(__FILE__, __NAMESPACE__, $argv);
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
2+
/**
3+
* Copyright 2022 Google LLC
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
/**
19+
* For instructions on how to run the full sample:
20+
*
21+
* @see https://github.com/GoogleCloudPlatform/php-docs-samples/blob/master/pubsub/api/README.md
22+
*/
23+
namespace Google\Cloud\Samples\PubSub;
24+
25+
# [START pubsub_subscriber_exactly_once]
26+
use Google\Cloud\PubSub\PubSubClient;
27+
28+
/**
29+
* Subscribe and pull messages from a subscription
30+
* with `Exactly Once Delivery` enabled.
31+
*
32+
* @param string $projectId
33+
* @param string $subscriptionId
34+
*/
35+
function subscribe_exactly_once_delivery(
36+
string $projectId,
37+
string $subscriptionId
38+
): void {
39+
$pubsub = new PubSubClient([
40+
'projectId' => $projectId,
41+
]);
42+
43+
$subscription = $pubsub->subscription($subscriptionId);
44+
$messages = $subscription->pull();
45+
46+
foreach ($messages as $message) {
47+
// When exactly once delivery is enabled on the subscription,
48+
// the message is guaranteed to not be delivered again if the ack succeeds.
49+
// Passing the `returnFailures` flag retries any temporary failures received
50+
// while acking the msg and also returns any permanently failed msgs.
51+
// Passing this flag on a subscription with exactly once delivery disabled
52+
// will always return an empty array.
53+
$failedMsgs = $subscription->acknowledge($message, ['returnFailures' => true]);
54+
55+
if (empty($failedMsgs)) {
56+
printf('Acknowledged message: %s' . PHP_EOL, $message->data());
57+
} else {
58+
// Either log or store the $failedMsgs to be retried later
59+
}
60+
}
61+
}
62+
# [END pubsub_subscriber_exactly_once]
63+
64+
// The following 2 lines are only needed to run the samples
65+
require_once __DIR__ . '/../../../testing/sample_helpers.php';
66+
\Google\Cloud\Samples\execute_sample(__FILE__, __NAMESPACE__, $argv);

pubsub/api/test/pubsubTest.php

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,13 @@ class PubSubTest extends TestCase
3131
use ExecuteCommandTrait;
3232
use EventuallyConsistentTestTrait;
3333

34+
private static $eodSubscriptionId;
35+
36+
public static function setUpBeforeClass(): void
37+
{
38+
self::$eodSubscriptionId = 'test-eod-subscription-' . rand();
39+
}
40+
3441
public function testSubscriptionPolicy()
3542
{
3643
$subscription = $this->requireEnv('GOOGLE_PUBSUB_SUBSCRIPTION');
@@ -225,6 +232,20 @@ public function testCreateAndDeleteSubscriptionWithFilter()
225232
), $output);
226233
}
227234

235+
public function testCreateSubscriptionWithExactlyOnceDelivery()
236+
{
237+
$topic = $this->requireEnv('GOOGLE_PUBSUB_TOPIC');
238+
$subscription = self::$eodSubscriptionId;
239+
240+
$output = $this->runFunctionSnippet('create_subscription_with_exactly_once_delivery', [
241+
self::$projectId,
242+
$topic,
243+
$subscription
244+
]);
245+
246+
$this->assertStringContainsString('Subscription created with exactly once delivery status: true', $output);
247+
}
248+
228249
public function testCreateAndDeletePushSubscription()
229250
{
230251
$topic = $this->requireEnv('GOOGLE_PUBSUB_TOPIC');
@@ -332,4 +353,36 @@ public function testPullMessagesBatchPublisher()
332353
shell_exec('kill -9 ' . $pid);
333354
putenv('IS_BATCH_DAEMON_RUNNING=');
334355
}
356+
357+
/**
358+
* @depends testCreateSubscriptionWithExactlyOnceDelivery
359+
*/
360+
public function testSubscribeExactlyOnceDelivery()
361+
{
362+
$topic = $this->requireEnv('GOOGLE_PUBSUB_TOPIC');
363+
$subscription = self::$eodSubscriptionId;
364+
365+
$output = $this->runFunctionSnippet('publish_message', [
366+
self::$projectId,
367+
$topic,
368+
'This is a test message',
369+
]);
370+
371+
$this->runEventuallyConsistentTest(function () use ($subscription) {
372+
$output = $this->runFunctionSnippet('subscribe_exactly_once_delivery', [
373+
self::$projectId,
374+
$subscription,
375+
]);
376+
377+
// delete the subscription
378+
$this->runFunctionSnippet('delete_subscription', [
379+
self::$projectId,
380+
$subscription,
381+
]);
382+
383+
// There should be at least one acked message
384+
// pulled from the subscription.
385+
$this->assertRegExp('/Acknowledged message:/', $output);
386+
});
387+
}
335388
}

0 commit comments

Comments
 (0)