Skip to content

Commit 107ea0f

Browse files
jdpedriebshaffer
authored andcommitted
feat: add pubsub batch publishing sample (GoogleCloudPlatform#984)
1 parent 1137aa0 commit 107ea0f

File tree

4 files changed

+108
-0
lines changed

4 files changed

+108
-0
lines changed

pubsub/api/composer.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
"src/get_topic_policy.php",
1616
"src/list_subscriptions.php",
1717
"src/list_topics.php",
18+
"src/publish_message_batch.php",
1819
"src/publish_message.php",
1920
"src/pull_messages.php",
2021
"src/set_subscription_policy.php",

pubsub/api/pubsub.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
->addArgument('message', InputArgument::OPTIONAL, 'A message to publish to the topic')
7676
->addOption('create', null, InputOption::VALUE_NONE, 'Create the topic. ')
7777
->addOption('delete', null, InputOption::VALUE_NONE, 'Delete the topic. ')
78+
->addOption('batch', null, InputOption::VALUE_NONE, 'Use the batch publisher.')
7879
->setCode(function ($input, $output) {
7980
$projectId = $input->getArgument('project');
8081
$topicName = $input->getArgument('topic');
@@ -84,6 +85,8 @@
8485
create_topic($projectId, $topicName);
8586
} elseif ($input->getOption('delete')) {
8687
delete_topic($projectId, $topicName);
88+
} elseif ($input->getOption('batch') && $message = $input->getArgument('message')) {
89+
publish_message_batch($projectId, $topicName, $message);
8790
} elseif ($message = $input->getArgument('message')) {
8891
publish_message($projectId, $topicName, $message);
8992
} else {
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
2+
/**
3+
* Copyright 2019 Google LLC
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
/**
19+
* For instructions on how to run the full sample:
20+
*
21+
* @see https://github.com/GoogleCloudPlatform/php-docs-samples/blob/master/pubsub/api/README.md
22+
*/
23+
24+
namespace Google\Cloud\Samples\PubSub;
25+
26+
# [START pubsub_publisher_batch_settings]
27+
use Google\Cloud\PubSub\PubSubClient;
28+
29+
/**
30+
* Publishes a message for a Pub/Sub topic.
31+
*
32+
* The publisher should be used in conjunction with the `google-cloud-batch`
33+
* daemon, which should be running in the background.
34+
*
35+
* To start the daemon, from your project root call `vendor/bin/google-cloud-batch daemon`.
36+
*
37+
* @param string $projectId The Google project ID.
38+
* @param string $topicName The Pub/Sub topic name.
39+
* @param string $message The message to publish.
40+
*/
41+
function publish_message_batch($projectId, $topicName, $message)
42+
{
43+
// Check if the batch daemon is running.
44+
if (getenv('IS_BATCH_DAEMON_RUNNING') !== 'true') {
45+
trigger_error(
46+
'The batch daemon is not running. Call ' .
47+
'`vendor/bin/google-cloud-batch daemon` from ' .
48+
'your project root to start the daemon.',
49+
E_USER_NOTICE
50+
);
51+
}
52+
53+
$batchOptions = [
54+
'batchSize' => 100, // Max messages for each batch.
55+
'callPeriod' => 0.01, // Max time in seconds between each batch publish.
56+
];
57+
58+
$pubsub = new PubSubClient([
59+
'projectId' => $projectId,
60+
]);
61+
$topic = $pubsub->topic($topicName);
62+
$publisher = $topic->batchPublisher([
63+
'batchOptions' => $batchOptions
64+
]);
65+
66+
for ($i = 0; $i < 10; $i++) {
67+
$publisher->publish(['data' => $message]);
68+
}
69+
70+
print('Messages enqueued for publication.' . PHP_EOL);
71+
}
72+
# [END pubsub_publisher_batch_settings]

pubsub/api/test/pubsubTest.php

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,4 +252,36 @@ public function testPullMessages()
252252
$this->assertRegExp('/This is a test message/', $output);
253253
});
254254
}
255+
256+
public function testPullMessagesBatchPublisher()
257+
{
258+
$topic = $this->requireEnv('GOOGLE_PUBSUB_TOPIC');
259+
$subscription = $this->requireEnv('GOOGLE_PUBSUB_SUBSCRIPTION');
260+
$messageData = uniqid('message-');
261+
262+
$pid = shell_exec(
263+
'php ' . __DIR__ . '/../vendor/bin/google-cloud-batch daemon > /dev/null 2>&1 & echo $!'
264+
);
265+
putenv('IS_BATCH_DAEMON_RUNNING=true');
266+
267+
$output = $this->runCommand('topic', [
268+
'project' => self::$projectId,
269+
'topic' => $topic,
270+
'message' => $messageData,
271+
'--batch' => true
272+
]);
273+
274+
$this->assertRegExp('/Messages enqueued for publication/', $output);
275+
276+
$this->runEventuallyConsistentTest(function () use ($subscription, $messageData) {
277+
$output = $this->runCommand('subscription', [
278+
'subscription' => $subscription,
279+
'project' => self::$projectId,
280+
]);
281+
$this->assertContains($messageData, $output);
282+
});
283+
284+
shell_exec('kill -9 ' . $pid);
285+
putenv('IS_BATCH_DAEMON_RUNNING=');
286+
}
255287
}

0 commit comments

Comments
 (0)