-
Notifications
You must be signed in to change notification settings - Fork 4.6k
Fix WriteToPubSub to pass ordering_key to publish() method #37345
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
0e31f24
851d9a8
c14008b
1797741
75816e7
34d536d
4466389
1dd4772
9441a3c
35c2dae
aa16914
ef77013
d2ab2f2
559ff38
2f424a2
627f9da
e167661
506efa3
f0505e6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -117,6 +117,7 @@ def expand(self, pbegin): | |
| # this is not implemented yet on the Java side: | ||
| # ('with_attributes', bool), | ||
| ('timestamp_attribute', typing.Optional[str]), | ||
| ('publish_with_ordering_key', bool), | ||
| ]) | ||
|
|
||
|
|
||
|
|
@@ -135,6 +136,7 @@ def __init__( | |
| with_attributes=False, | ||
| id_label=None, | ||
| timestamp_attribute=None, | ||
| publish_with_ordering_key=False, | ||
| expansion_service=None): | ||
| """Initializes ``WriteToPubSub``. | ||
|
|
||
|
|
@@ -150,18 +152,23 @@ def __init__( | |
| in a ReadFromPubSub PTransform to deduplicate messages. | ||
| timestamp_attribute: If set, will set an attribute for each Cloud Pub/Sub | ||
| message with the given name and the message's publish time as the value. | ||
| publish_with_ordering_key: If True, enables ordering key support when | ||
| publishing messages. The ordering key must be set on each | ||
| PubsubMessage via the ``ordering_key`` attribute. | ||
| """ | ||
| self.params = WriteToPubsubSchema( | ||
| topic=topic, | ||
| id_label=id_label, | ||
| # with_attributes=with_attributes, | ||
| timestamp_attribute=timestamp_attribute) | ||
| timestamp_attribute=timestamp_attribute, | ||
| publish_with_ordering_key=publish_with_ordering_key) | ||
| self.expansion_service = expansion_service | ||
| self.with_attributes = with_attributes | ||
|
|
||
| def expand(self, pvalue): | ||
| if self.with_attributes: | ||
| pcoll = pvalue | 'ToProto' >> Map(pubsub.WriteToPubSub.to_proto_str) | ||
| pcoll = pvalue | 'ToProto' >> Map( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Re:
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes,
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sounds good |
||
| pubsub.WriteToPubSub.message_to_proto_str) | ||
| else: | ||
| pcoll = pvalue | 'ToProto' >> Map( | ||
| lambda x: pubsub.PubsubMessage(x, {})._to_proto_str()) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,8 +31,9 @@ | |
| """ | ||
|
|
||
| # pytype: skip-file | ||
|
|
||
| import logging | ||
| import re | ||
| import time | ||
| from typing import Any | ||
| from typing import NamedTuple | ||
| from typing import Optional | ||
|
|
@@ -430,7 +431,16 @@ def bytes_to_proto_str(element: Union[bytes, str]) -> bytes: | |
| def expand(self, pcoll): | ||
| # Store pipeline options for use in DoFn | ||
| self.pipeline_options = pcoll.pipeline.options if pcoll.pipeline else None | ||
|
|
||
| # Warn Dataflow users to use the XLang path for ordering key support, | ||
| # since _PubSubWriteDoFn._flush() is not used by Dataflow's implementation. | ||
| runner = self.pipeline_options.get_all_options().get( | ||
| 'runner', '') if self.pipeline_options else '' | ||
| if 'Dataflow' in str(runner): | ||
| logging.warning( | ||
| 'WriteToPubSub ordering_key support is not available on Dataflow ' | ||
| 'via this transform. Use the XLang WriteToPubSub path instead: ' | ||
| 'apache_beam.io.external.gcp.pubsub.WriteToPubSub with ' | ||
| 'publish_with_ordering_key=True.') | ||
| if self.with_attributes: | ||
| pcoll = pcoll | 'ToProtobufX' >> ParDo( | ||
| _AddMetricsAndMap( | ||
|
|
@@ -597,7 +607,7 @@ def __init__(self, transform): | |
| output_labels_supported = False | ||
|
|
||
| # Log debug information for troubleshooting | ||
| import logging | ||
|
|
||
| runner_info = getattr( | ||
| pipeline_options, 'runner', | ||
| 'None') if pipeline_options else 'No options' | ||
|
|
@@ -628,7 +638,10 @@ def __init__(self, transform): | |
|
|
||
| def setup(self): | ||
| from google.cloud import pubsub | ||
| self._pub_client = pubsub.PublisherClient() | ||
| self._pub_client = pubsub.PublisherClient( | ||
| publisher_options=pubsub.types.PublisherOptions( | ||
| enable_message_ordering=True, | ||
| )) | ||
| self._topic = self._pub_client.topic_path( | ||
| self.project, self.short_topic_name) | ||
|
|
||
|
|
@@ -647,21 +660,29 @@ def _flush(self): | |
| if not self._buffer: | ||
| return | ||
|
|
||
| import time | ||
|
|
||
| # The elements in buffer are serialized protobuf bytes from the previous | ||
| # transforms. We need to deserialize them to extract data and attributes. | ||
| futures = [] | ||
| for elem in self._buffer: | ||
| # Deserialize the protobuf to get the original PubsubMessage | ||
| pubsub_msg = PubsubMessage._from_proto_str(elem) | ||
|
|
||
| # Publish with the correct data and attributes | ||
| # Publish with the correct data, attributes, and ordering_key | ||
| if self.with_attributes and pubsub_msg.attributes: | ||
| future = self._pub_client.publish( | ||
| self._topic, pubsub_msg.data, **pubsub_msg.attributes) | ||
| self._topic, | ||
| pubsub_msg.data, | ||
| ordering_key=pubsub_msg.ordering_key | ||
| if pubsub_msg.ordering_key else '', | ||
| **pubsub_msg.attributes) | ||
| else: | ||
| future = self._pub_client.publish(self._topic, pubsub_msg.data) | ||
| if pubsub_msg.ordering_key: | ||
| future = self._pub_client.publish( | ||
| self._topic, | ||
| pubsub_msg.data, | ||
| ordering_key=pubsub_msg.ordering_key) | ||
| else: | ||
| future = self._pub_client.publish(self._topic, pubsub_msg.data) | ||
|
Comment on lines
671
to
+685
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The logic for publishing messages can be simplified to improve readability and reduce code duplication. You can determine the attributes up front, for example: attributes = {}
if self.with_attributes and pubsub_msg.attributes:
attributes = pubsub_msg.attributes
if pubsub_msg.ordering_key:
future = self._pub_client.publish(
self._topic,
pubsub_msg.data,
ordering_key=pubsub_msg.ordering_key,
**attributes)
else:
future = self._pub_client.publish(
self._topic, pubsub_msg.data, **attributes)
Comment on lines
671
to
+685
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The logic for publishing messages can be simplified to reduce code duplication and improve readability. Instead of separate code paths for messages with and without attributes, you can build a dictionary of keyword arguments for the publish_kwargs = {}
if self.with_attributes and pubsub_msg.attributes:
publish_kwargs.update(pubsub_msg.attributes)
if pubsub_msg.ordering_key:
publish_kwargs['ordering_key'] = pubsub_msg.ordering_key
future = self._pub_client.publish(
self._topic, pubsub_msg.data, **publish_kwargs) |
||
|
|
||
| futures.append(future) | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
|
|
@@ -26,8 +26,6 @@ | |||
| import uuid | ||||
|
|
||||
| import pytest | ||||
| from hamcrest.core.core.allof import all_of | ||||
|
|
||||
| from apache_beam.io.gcp import pubsub_it_pipeline | ||||
| from apache_beam.io.gcp.pubsub import PubsubMessage | ||||
| from apache_beam.io.gcp.pubsub import WriteToPubSub | ||||
|
|
@@ -36,6 +34,7 @@ | |||
| from apache_beam.testing import test_utils | ||||
| from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher | ||||
| from apache_beam.testing.test_pipeline import TestPipeline | ||||
| from hamcrest.core.core.allof import all_of | ||||
|
|
||||
| INPUT_TOPIC = 'psit_topic_input' | ||||
| OUTPUT_TOPIC = 'psit_topic_output' | ||||
|
|
@@ -305,6 +304,96 @@ def test_batch_write_with_attributes(self): | |||
| """Test WriteToPubSub in batch mode with attributes.""" | ||||
| self._test_batch_write(with_attributes=True) | ||||
|
|
||||
| @pytest.mark.it_postcommit | ||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this will work only in direct runner, but not in dataflow runner - correct?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. also wondering, if we can have a warning for Dataflow that that advises users to use xlang version if they wish to use ordering key. IIRC we have something like that in Java SDK?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct - the
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
We need to configure the test to run in direct runner suite, but not in dataflow suite.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
We need to show the error at job submission, so at construction time. I think if we put the warning inside `_PubSubWriteDoFn`, then the warning-emitting code will be serialized and become part of the pipeline definition, but won't get executed.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if somehow the pipeline options to determine the runner are not available, we can access them like so: beam/sdks/python/apache_beam/coders/coders.py Line 1012 in 4da3e55
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. also, to trigger a particular postcommit suite, you can trivially modify a corresponding trigger file, such as https://github.com/apache/beam/blob/master/.github/trigger_files/beam_PostCommit_Python.json
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could you please trigger this test suite and let me know once this change is ready for review? thanks! |
||||
| def test_batch_write_with_ordering_key(self): | ||||
| """Test WriteToPubSub in batch mode with ordering keys. | ||||
|
|
||||
| Dataflow's Native Pub/Sub Sink does not support ordering_key | ||||
| (see https://github.com/apache/beam/issues/36201), so this test | ||||
| only applies to runners using Beam's Python WriteToPubSub Sink. | ||||
| Dataflow users should use the XLang WriteToPubSub path instead | ||||
| (apache_beam.io.external.gcp.pubsub.WriteToPubSub with | ||||
| publish_with_ordering_key=True). | ||||
| """ | ||||
| if self.runner_name == 'TestDataflowRunner': | ||||
| self.skipTest( | ||||
| 'Dataflow Native PubSub Sink does not support ordering_key ' | ||||
| '(see https://github.com/apache/beam/issues/36201). ' | ||||
| 'Use apache_beam.io.external.gcp.pubsub.WriteToPubSub ' | ||||
| 'with publish_with_ordering_key=True instead.') | ||||
|
|
||||
| from apache_beam.options.pipeline_options import PipelineOptions | ||||
| from apache_beam.options.pipeline_options import StandardOptions | ||||
| from apache_beam.transforms import Create | ||||
| from google.pubsub_v1.types import Subscription | ||||
|
|
||||
| ordering_topic = self.pub_client.create_topic( | ||||
| name=self.pub_client.topic_path( | ||||
| self.project, 'psit_topic_ordering' + self.uuid)) | ||||
| ordering_sub = self.sub_client.create_subscription( | ||||
| request=Subscription( | ||||
| name=self.sub_client.subscription_path( | ||||
| self.project, 'psit_sub_ordering' + self.uuid), | ||||
| topic=ordering_topic.name, | ||||
| enable_message_ordering=True, | ||||
| )) | ||||
| time.sleep(10) | ||||
|
|
||||
| try: | ||||
| test_messages = [ | ||||
| PubsubMessage( | ||||
| b'order_data001', {'attr': 'value1'}, ordering_key='key1'), | ||||
| PubsubMessage( | ||||
| b'order_data002', {'attr': 'value2'}, ordering_key='key1'), | ||||
| PubsubMessage( | ||||
| b'order_data003', {'attr': 'value3'}, ordering_key='key2'), | ||||
| ] | ||||
|
|
||||
| pipeline_options = PipelineOptions() | ||||
| pipeline_options.view_as(StandardOptions).streaming = False | ||||
|
|
||||
| with TestPipeline(options=pipeline_options) as p: | ||||
| messages = p | 'CreateMessages' >> Create(test_messages) | ||||
| _ = messages | 'WriteToPubSub' >> WriteToPubSub( | ||||
| ordering_topic.name, with_attributes=True) | ||||
|
|
||||
| time.sleep(10) | ||||
|
|
||||
| # Retry pulling to handle PubSub delivery delays | ||||
| received_messages = [] | ||||
| deadline = time.time() + 60 # wait up to 60 seconds | ||||
| while time.time() < deadline: | ||||
| response = self.sub_client.pull( | ||||
| request={ | ||||
| 'subscription': ordering_sub.name, | ||||
| 'max_messages': 10, | ||||
| }) | ||||
| received_messages.extend(response.received_messages) | ||||
| if len(received_messages) >= len(test_messages): | ||||
| break | ||||
| time.sleep(5) | ||||
|
|
||||
| self.assertEqual(len(received_messages), len(test_messages)) | ||||
|
|
||||
| received_map = { | ||||
| msg.message.data: msg.message | ||||
| for msg in received_messages | ||||
| } | ||||
| self.assertEqual(received_map[b'order_data001'].ordering_key, 'key1') | ||||
| self.assertEqual(received_map[b'order_data002'].ordering_key, 'key1') | ||||
| self.assertEqual(received_map[b'order_data003'].ordering_key, 'key2') | ||||
|
|
||||
| ack_ids = [msg.ack_id for msg in received_messages] | ||||
| self.sub_client.acknowledge( | ||||
| request={ | ||||
| 'subscription': ordering_sub.name, | ||||
| 'ack_ids': ack_ids, | ||||
| }) | ||||
| finally: | ||||
| self.sub_client.delete_subscription( | ||||
| request={'subscription': ordering_sub.name}) | ||||
| self.pub_client.delete_topic(request={'topic': ordering_topic.name}) | ||||
|
|
||||
|
|
||||
| if __name__ == '__main__': | ||||
| logging.getLogger().setLevel(logging.DEBUG) | ||||
|
|
||||
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -1098,6 +1098,71 @@ def test_write_to_pubsub_with_attributes_no_overwrite(self, unused_mock): | |||||||||||||
| Lineage.query(p.result.metrics(), Lineage.SINK), | ||||||||||||||
| set(["pubsub:topic:fakeprj.a_topic"])) | ||||||||||||||
|
|
||||||||||||||
| def test_write_messages_with_ordering_key(self, mock_pubsub): | ||||||||||||||
| """Test WriteToPubSub with ordering_key in messages.""" | ||||||||||||||
| data = b'data' | ||||||||||||||
| ordering_key = 'order-123' | ||||||||||||||
| attributes = {'key': 'value'} | ||||||||||||||
| payloads = [PubsubMessage(data, attributes, ordering_key=ordering_key)] | ||||||||||||||
|
|
||||||||||||||
| options = PipelineOptions([]) | ||||||||||||||
| options.view_as(StandardOptions).streaming = True | ||||||||||||||
| with TestPipeline(options=options) as p: | ||||||||||||||
| _ = ( | ||||||||||||||
| p | ||||||||||||||
| | Create(payloads) | ||||||||||||||
| | WriteToPubSub( | ||||||||||||||
| 'projects/fakeprj/topics/a_topic', with_attributes=True)) | ||||||||||||||
|
|
||||||||||||||
| # Verify that publish was called with ordering_key | ||||||||||||||
| mock_pubsub.return_value.publish.assert_called() | ||||||||||||||
| call_args = mock_pubsub.return_value.publish.call_args | ||||||||||||||
|
|
||||||||||||||
| # Check that ordering_key was passed as a keyword argument | ||||||||||||||
| self.assertIn('ordering_key', call_args.kwargs) | ||||||||||||||
| self.assertEqual(call_args.kwargs['ordering_key'], ordering_key) | ||||||||||||||
|
|
||||||||||||||
| def test_write_messages_with_ordering_key_no_attributes(self, mock_pubsub): | ||||||||||||||
| """Test WriteToPubSub with ordering_key but no attributes.""" | ||||||||||||||
| data = b'data' | ||||||||||||||
| ordering_key = 'order-456' | ||||||||||||||
| payloads = [PubsubMessage(data, None, ordering_key=ordering_key)] | ||||||||||||||
|
|
||||||||||||||
| options = PipelineOptions([]) | ||||||||||||||
| options.view_as(StandardOptions).streaming = True | ||||||||||||||
| with TestPipeline(options=options) as p: | ||||||||||||||
| _ = ( | ||||||||||||||
| p | ||||||||||||||
| | Create(payloads) | ||||||||||||||
| | WriteToPubSub( | ||||||||||||||
| 'projects/fakeprj/topics/a_topic', with_attributes=True)) | ||||||||||||||
|
|
||||||||||||||
| # Verify that publish was called with ordering_key | ||||||||||||||
| mock_pubsub.return_value.publish.assert_called() | ||||||||||||||
| call_args = mock_pubsub.return_value.publish.call_args | ||||||||||||||
|
|
||||||||||||||
| # Check that ordering_key was passed | ||||||||||||||
| self.assertIn('ordering_key', call_args.kwargs) | ||||||||||||||
| self.assertEqual(call_args.kwargs['ordering_key'], ordering_key) | ||||||||||||||
|
|
||||||||||||||
| def test_write_messages_without_ordering_key(self, mock_pubsub): | ||||||||||||||
| """Test WriteToPubSub without ordering_key (backward compatibility).""" | ||||||||||||||
| data = b'data' | ||||||||||||||
| attributes = {'key': 'value'} | ||||||||||||||
| payloads = [PubsubMessage(data, attributes)] # No ordering_key | ||||||||||||||
|
|
||||||||||||||
| options = PipelineOptions([]) | ||||||||||||||
| options.view_as(StandardOptions).streaming = True | ||||||||||||||
| with TestPipeline(options=options) as p: | ||||||||||||||
| _ = ( | ||||||||||||||
| p | ||||||||||||||
| | Create(payloads) | ||||||||||||||
| | WriteToPubSub( | ||||||||||||||
| 'projects/fakeprj/topics/a_topic', with_attributes=True)) | ||||||||||||||
|
|
||||||||||||||
| # Verify that publish was called | ||||||||||||||
| mock_pubsub.return_value.publish.assert_called() | ||||||||||||||
|
Comment on lines
+1163
to
+1164
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To make this test more robust, you should also verify that
Suggested change
|
||||||||||||||
|
|
||||||||||||||
|
|
||||||||||||||
| if __name__ == '__main__': | ||||||||||||||
| logging.getLogger().setLevel(logging.INFO) | ||||||||||||||
|
|
||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for my education, why was this necessary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Found it:
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if
`Write.this.getPubsubRootUrl` somehow returned null, would `newClient` still initialize with the default (as per pipeline options)?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Both
Both `PubsubJsonClient` and `PubsubGrpcClient` use `MoreObjects.firstNonNull(rootUrlOverride, options.getPubsubRootUrl())` — so if `Write.this.getPubsubRootUrl()` returns null, it falls back to `options.getPubsubRootUrl()`, which has `@Default.String("https://pubsub.googleapis.com")`. So yes, the default is preserved correctly in the null case. No change needed here unless you'd prefer an explicit null guard for clarity?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okay, thanks for checking, no changes needed