@@ -53,6 +53,7 @@ public static class Configuration {
private String topic;
private @Nullable String idAttribute;
private @Nullable String timestampAttribute;
private boolean publishWithOrderingKey = false;

public void setTopic(String topic) {
this.topic = topic;
@@ -65,6 +66,10 @@ public void setIdLabel(@Nullable String idAttribute) {
public void setTimestampAttribute(@Nullable String timestampAttribute) {
this.timestampAttribute = timestampAttribute;
}

public void setPublishWithOrderingKey(Boolean publishWithOrderingKey) {
this.publishWithOrderingKey = publishWithOrderingKey != null && publishWithOrderingKey;
}
}

public static class WriteBuilder
@@ -85,6 +90,9 @@ public PTransform<PCollection<byte[]>, PDone> buildExternal(Configuration config
if (config.timestampAttribute != null) {
writeBuilder.setTimestampAttribute(config.timestampAttribute);
}
if (config.publishWithOrderingKey) {
writeBuilder.setPublishWithOrderingKey(true);
}
writeBuilder.setDynamicDestinations(false);
return writeBuilder.build();
}
@@ -1727,7 +1727,10 @@ public void startBundle(StartBundleContext c) throws IOException {
this.pubsubClient =
getPubsubClientFactory()
.newClient(
getTimestampAttribute(), null, c.getPipelineOptions().as(PubsubOptions.class));
getTimestampAttribute(),
null,
c.getPipelineOptions().as(PubsubOptions.class),
Write.this.getPubsubRootUrl());
Contributor:

for my education, why was this necessary?

Contributor:

Found it:

PubsubIO.java: fixed a pre-existing bug where PubsubBoundedWriter.startBundle ignored the transform-level pubsubRootUrl, always falling back to pipeline options.

Contributor (@tvalentyn, Mar 26, 2026):

if Write.this.getPubsubRootUrl() somehow returned null, would newClient still initialize with the default (as per pipeline options)?

Author (@nikitagrover19, Mar 28, 2026):

Both PubsubJsonClient and PubsubGrpcClient use MoreObjects.firstNonNull(rootUrlOverride, options.getPubsubRootUrl()), so if Write.this.getPubsubRootUrl() returns null, it falls back to options.getPubsubRootUrl(), which has @Default.String("https://pubsub.googleapis.com"). So the default is preserved correctly in the null case. No change needed here unless you'd prefer an explicit null guard for clarity?

Contributor:

okay, thanks for checking, no changes needed
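The fallback chain discussed in this thread can be sketched in Python (illustrative only; the real logic lives in the Java clients via Guava's MoreObjects.firstNonNull, and the helper names below are not Beam code):

```python
DEFAULT_ROOT_URL = "https://pubsub.googleapis.com"  # @Default.String on PubsubOptions

def first_non_null(*values):
    # Mimics the MoreObjects.firstNonNull chain used by the Pub/Sub clients.
    for value in values:
        if value is not None:
            return value
    raise ValueError("all candidate values were null")

def resolve_root_url(transform_override, options_root_url=DEFAULT_ROOT_URL):
    # Transform-level override wins; otherwise fall back to pipeline options.
    return first_non_null(transform_override, options_root_url)

print(resolve_root_url(None))                     # https://pubsub.googleapis.com
print(resolve_root_url("http://localhost:8085"))  # emulator-style override
```

With a null (None) override, the pipeline-options default survives, which is why no explicit guard is needed at the call site.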

}

@ProcessElement
sdks/python/apache_beam/io/external/gcp/pubsub.py (9 additions, 2 deletions)

@@ -117,6 +117,7 @@ def expand(self, pbegin):
# this is not implemented yet on the Java side:
# ('with_attributes', bool),
('timestamp_attribute', typing.Optional[str]),
('publish_with_ordering_key', bool),
])


@@ -135,6 +136,7 @@ def __init__(
with_attributes=False,
id_label=None,
timestamp_attribute=None,
publish_with_ordering_key=False,
expansion_service=None):
"""Initializes ``WriteToPubSub``.

@@ -150,18 +152,23 @@
in a ReadFromPubSub PTransform to deduplicate messages.
timestamp_attribute: If set, will set an attribute for each Cloud Pub/Sub
message with the given name and the message's publish time as the value.
publish_with_ordering_key: If True, enables ordering key support when
publishing messages. The ordering key must be set on each
PubsubMessage via the ``ordering_key`` attribute.
"""
self.params = WriteToPubsubSchema(
topic=topic,
id_label=id_label,
# with_attributes=with_attributes,
timestamp_attribute=timestamp_attribute)
timestamp_attribute=timestamp_attribute,
publish_with_ordering_key=publish_with_ordering_key)
self.expansion_service = expansion_service
self.with_attributes = with_attributes

def expand(self, pvalue):
if self.with_attributes:
pcoll = pvalue | 'ToProto' >> Map(pubsub.WriteToPubSub.to_proto_str)
pcoll = pvalue | 'ToProto' >> Map(
Contributor:

Re: Map(pubsub.WriteToPubSub.to_proto_str) - was this a typo that was meant to be Map(pubsub.WriteToPubSub.message_to_proto_str)? The latter seems to have some type checking, which probably wouldn't hurt.

Author:

Yes, message_to_proto_str is the right choice - it has proper type checking. The original to_proto_str reference was a pre-existing bug (the method doesn't exist on WriteToPubSub), which is why with_attributes=True was broken before this PR. Will switch to message_to_proto_str.

Contributor:

sounds good

pubsub.WriteToPubSub.message_to_proto_str)
else:
pcoll = pvalue | 'ToProto' >> Map(
lambda x: pubsub.PubsubMessage(x, {})._to_proto_str())
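The value of the type-checked variant discussed above can be sketched as follows - a fail-fast conversion versus a bare attribute call. The PubsubMessage class here is a minimal stand-in, not Beam's actual class, and the error message is illustrative:

```python
class PubsubMessage:
    # Minimal stand-in for apache_beam.io.gcp.pubsub.PubsubMessage.
    def __init__(self, data, attributes=None):
        self.data = data
        self.attributes = attributes or {}

    def _to_proto_str(self):
        return repr((self.data, sorted(self.attributes.items()))).encode()

def message_to_proto_str(element):
    # Reject wrong element types up front with a clear error, instead of an
    # opaque AttributeError deep inside the pipeline.
    if not isinstance(element, PubsubMessage):
        raise TypeError("Expected PubsubMessage, got %r" % type(element))
    return element._to_proto_str()

print(message_to_proto_str(PubsubMessage(b"x", {"a": "1"})))
try:
    message_to_proto_str(b"raw bytes")  # wrong type: rejected immediately
except TypeError as e:
    print("rejected:", e)
```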
sdks/python/apache_beam/io/gcp/pubsub.py (30 additions, 9 deletions)

@@ -31,8 +31,9 @@
"""

# pytype: skip-file

import logging
import re
import time
from typing import Any
from typing import NamedTuple
from typing import Optional
@@ -430,7 +431,16 @@ def bytes_to_proto_str(element: Union[bytes, str]) -> bytes:
def expand(self, pcoll):
# Store pipeline options for use in DoFn
self.pipeline_options = pcoll.pipeline.options if pcoll.pipeline else None

# Warn Dataflow users to use the XLang path for ordering key support,
# since _PubSubWriteDoFn._flush() is not used by Dataflow's implementation.
runner = self.pipeline_options.get_all_options().get(
'runner', '') if self.pipeline_options else ''
if 'Dataflow' in str(runner):
logging.warning(
'WriteToPubSub ordering_key support is not available on Dataflow '
'via this transform. Use the XLang WriteToPubSub path instead: '
'apache_beam.io.external.gcp.pubsub.WriteToPubSub with '
'publish_with_ordering_key=True.')
if self.with_attributes:
pcoll = pcoll | 'ToProtobufX' >> ParDo(
_AddMetricsAndMap(
@@ -597,7 +607,7 @@ def __init__(self, transform):
output_labels_supported = False

# Log debug information for troubleshooting
import logging

runner_info = getattr(
pipeline_options, 'runner',
'None') if pipeline_options else 'No options'
@@ -628,7 +638,10 @@

def setup(self):
from google.cloud import pubsub
self._pub_client = pubsub.PublisherClient()
self._pub_client = pubsub.PublisherClient(
publisher_options=pubsub.types.PublisherOptions(
enable_message_ordering=True,
))
self._topic = self._pub_client.topic_path(
self.project, self.short_topic_name)

@@ -647,21 +660,29 @@ def _flush(self):
if not self._buffer:
return

import time

# The elements in buffer are serialized protobuf bytes from the previous
# transforms. We need to deserialize them to extract data and attributes.
futures = []
for elem in self._buffer:
# Deserialize the protobuf to get the original PubsubMessage
pubsub_msg = PubsubMessage._from_proto_str(elem)

# Publish with the correct data and attributes
# Publish with the correct data, attributes, and ordering_key
if self.with_attributes and pubsub_msg.attributes:
future = self._pub_client.publish(
self._topic, pubsub_msg.data, **pubsub_msg.attributes)
self._topic,
pubsub_msg.data,
ordering_key=pubsub_msg.ordering_key
if pubsub_msg.ordering_key else '',
**pubsub_msg.attributes)
else:
future = self._pub_client.publish(self._topic, pubsub_msg.data)
if pubsub_msg.ordering_key:
future = self._pub_client.publish(
self._topic,
pubsub_msg.data,
ordering_key=pubsub_msg.ordering_key)
else:
future = self._pub_client.publish(self._topic, pubsub_msg.data)
Comment on lines 671 to +685

Contributor (medium):

The logic for publishing messages can be simplified to improve readability and reduce code duplication. You can determine the attributes and ordering_key to use and then have a single call to self._pub_client.publish. This makes the code easier to maintain.

      attributes = {}
      if self.with_attributes and pubsub_msg.attributes:
        attributes = pubsub_msg.attributes

      if pubsub_msg.ordering_key:
        future = self._pub_client.publish(
            self._topic,
            pubsub_msg.data,
            ordering_key=pubsub_msg.ordering_key,
            **attributes)
      else:
        future = self._pub_client.publish(
            self._topic, pubsub_msg.data, **attributes)

Comment on lines 671 to +685

Contributor (medium):

The logic for publishing messages can be simplified to reduce code duplication and improve readability. Instead of separate code paths for messages with and without attributes, you can build a dictionary of keyword arguments for the publish call. This makes the code cleaner and easier to maintain.

      publish_kwargs = {}
      if self.with_attributes and pubsub_msg.attributes:
        publish_kwargs.update(pubsub_msg.attributes)

      if pubsub_msg.ordering_key:
        publish_kwargs['ordering_key'] = pubsub_msg.ordering_key

      future = self._pub_client.publish(
          self._topic, pubsub_msg.data, **publish_kwargs)
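Both suggestions converge on the same shape. A self-contained sketch of the kwargs-based refactor, using a stand-in publisher so it runs without google-cloud-pubsub (FakePublisher and FakeMessage are illustrative, not real library classes):

```python
class FakeMessage:
    def __init__(self, data, attributes=None, ordering_key=''):
        self.data = data
        self.attributes = attributes or {}
        self.ordering_key = ordering_key

class FakePublisher:
    def publish(self, topic, data, **kwargs):
        # Record exactly what would be handed to Pub/Sub.
        return {'topic': topic, 'data': data, **kwargs}

def publish_message(client, topic, msg, with_attributes):
    # Single publish call: attributes and ordering_key folded into kwargs.
    publish_kwargs = {}
    if with_attributes and msg.attributes:
        publish_kwargs.update(msg.attributes)
    if msg.ordering_key:
        publish_kwargs['ordering_key'] = msg.ordering_key
    return client.publish(topic, msg.data, **publish_kwargs)

client = FakePublisher()
print(publish_message(client, 't', FakeMessage(b'x', {'a': '1'}, 'k1'), True))
# {'topic': 't', 'data': b'x', 'a': '1', 'ordering_key': 'k1'}
print(publish_message(client, 't', FakeMessage(b'y'), True))
# {'topic': 't', 'data': b'y'}
```

Note that with this shape, ordering_key is simply absent when the message has none, matching the backward-compatibility expectation in the unit tests.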


futures.append(future)

sdks/python/apache_beam/io/gcp/pubsub_integration_test.py (91 additions, 2 deletions)

@@ -26,8 +26,6 @@
import uuid

import pytest
from hamcrest.core.core.allof import all_of

from apache_beam.io.gcp import pubsub_it_pipeline
from apache_beam.io.gcp.pubsub import PubsubMessage
from apache_beam.io.gcp.pubsub import WriteToPubSub
@@ -36,6 +34,7 @@
from apache_beam.testing import test_utils
from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
from apache_beam.testing.test_pipeline import TestPipeline
from hamcrest.core.core.allof import all_of

INPUT_TOPIC = 'psit_topic_input'
OUTPUT_TOPIC = 'psit_topic_output'
@@ -305,6 +304,96 @@ def test_batch_write_with_attributes(self):
"""Test WriteToPubSub in batch mode with attributes."""
self._test_batch_write(with_attributes=True)

@pytest.mark.it_postcommit
Contributor:

this will work only in direct runner, but not in dataflow runner - correct?

Contributor:

also wondering if we can have a warning for Dataflow that advises users to use the xlang version if they wish to use ordering key. IIRC we have something like that in Java SDK?

Author:

Correct - the _flush fix in _PubSubWriteDoFn only applies to Direct Runner since Dataflow overrides this with its own implementation. Will add a comment to the integration test clarifying this scope. For the warning - I'll add a logging.warning in _PubSubWriteDoFn when ordering_key is set on any message, advising Dataflow users to use the WriteToPubSub XLang path instead. Does that approach sound right, or would you prefer it at pipeline construction time (e.g., in WriteToPubSub.expand())?

Contributor:

> Will add a comment to the integration test clarifying this scope.

We need to configure the test to run in direct runner suite, but not in dataflow suite.

    if self.runner_name == 'TestDataflowRunner':
      self.skipTest('<Reasons>')

Contributor:

> would you prefer it at pipeline construction time

We need to show the error at job submission, so at construction time

I think if we put the warning inside _PubSubWriteDoFn, then the warning-emitting code will be serialized and become part of the pipeline definition, but won't get executed.

Contributor:

if somehow the pipeline options to determine the runner are not available, we can access them like so:

if opts is None or not opts.is_compat_version_prior_to("2.68.0"):
during construction time.

Contributor:

also, to trigger a particular postcommit suite, you can trivially modify a corresponding trigger file, such as https://github.com/apache/beam/blob/master/.github/trigger_files/beam_PostCommit_Python.json

Contributor:

could you please trigger this test suite and let me know once this change is ready for review? thanks!

def test_batch_write_with_ordering_key(self):
"""Test WriteToPubSub in batch mode with ordering keys.

Dataflow's Native Pub/Sub Sink does not support ordering_key
(see https://github.com/apache/beam/issues/36201), so this test
only applies to runners using Beam's Python WriteToPubSub Sink.
Dataflow users should use the XLang WriteToPubSub path instead
(apache_beam.io.external.gcp.pubsub.WriteToPubSub with
publish_with_ordering_key=True).
"""
if self.runner_name == 'TestDataflowRunner':
self.skipTest(
'Dataflow Native PubSub Sink does not support ordering_key '
'(see https://github.com/apache/beam/issues/36201). '
'Use apache_beam.io.external.gcp.pubsub.WriteToPubSub '
'with publish_with_ordering_key=True instead.')

from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.transforms import Create
from google.pubsub_v1.types import Subscription

ordering_topic = self.pub_client.create_topic(
name=self.pub_client.topic_path(
self.project, 'psit_topic_ordering' + self.uuid))
ordering_sub = self.sub_client.create_subscription(
request=Subscription(
name=self.sub_client.subscription_path(
self.project, 'psit_sub_ordering' + self.uuid),
topic=ordering_topic.name,
enable_message_ordering=True,
))
time.sleep(10)

try:
test_messages = [
PubsubMessage(
b'order_data001', {'attr': 'value1'}, ordering_key='key1'),
PubsubMessage(
b'order_data002', {'attr': 'value2'}, ordering_key='key1'),
PubsubMessage(
b'order_data003', {'attr': 'value3'}, ordering_key='key2'),
]

pipeline_options = PipelineOptions()
pipeline_options.view_as(StandardOptions).streaming = False

with TestPipeline(options=pipeline_options) as p:
messages = p | 'CreateMessages' >> Create(test_messages)
_ = messages | 'WriteToPubSub' >> WriteToPubSub(
ordering_topic.name, with_attributes=True)

time.sleep(10)

# Retry pulling to handle PubSub delivery delays
received_messages = []
deadline = time.time() + 60 # wait up to 60 seconds
while time.time() < deadline:
response = self.sub_client.pull(
request={
'subscription': ordering_sub.name,
'max_messages': 10,
})
received_messages.extend(response.received_messages)
if len(received_messages) >= len(test_messages):
break
time.sleep(5)

self.assertEqual(len(received_messages), len(test_messages))

received_map = {
msg.message.data: msg.message
for msg in received_messages
}
self.assertEqual(received_map[b'order_data001'].ordering_key, 'key1')
self.assertEqual(received_map[b'order_data002'].ordering_key, 'key1')
self.assertEqual(received_map[b'order_data003'].ordering_key, 'key2')

ack_ids = [msg.ack_id for msg in received_messages]
self.sub_client.acknowledge(
request={
'subscription': ordering_sub.name,
'ack_ids': ack_ids,
})
finally:
self.sub_client.delete_subscription(
request={'subscription': ordering_sub.name})
self.pub_client.delete_topic(request={'topic': ordering_topic.name})
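The retry-pull loop in the test above is an instance of a general poll-until-deadline pattern. A deterministic sketch of just that pattern (the helper name and injected clock/sleep parameters are illustrative, not part of the test code):

```python
import time

def poll_until(fetch, target_count, timeout=60, interval=5,
               clock=time.time, sleep=time.sleep):
    # Keep fetching until we have at least `target_count` items or the
    # deadline passes; mirrors the retry-pull loop in the integration test.
    received = []
    deadline = clock() + timeout
    while clock() < deadline:
        received.extend(fetch())
        if len(received) >= target_count:
            break
        sleep(interval)
    return received

# Deterministic usage with an injected clock and sleep (no real waiting):
batches = iter([['m1'], [], ['m2', 'm3']])
t = [0]
got = poll_until(lambda: next(batches, []), 3, timeout=60, interval=5,
                 clock=lambda: t[0],
                 sleep=lambda s: t.__setitem__(0, t[0] + s))
print(got)  # ['m1', 'm2', 'm3']
```

Injecting the clock keeps the pattern unit-testable; the integration test uses the real time module because Pub/Sub delivery delays are genuinely nondeterministic.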


if __name__ == '__main__':
logging.getLogger().setLevel(logging.DEBUG)
sdks/python/apache_beam/io/gcp/pubsub_test.py (65 additions, 0 deletions)

@@ -1098,6 +1098,71 @@ def test_write_to_pubsub_with_attributes_no_overwrite(self, unused_mock):
Lineage.query(p.result.metrics(), Lineage.SINK),
set(["pubsub:topic:fakeprj.a_topic"]))

def test_write_messages_with_ordering_key(self, mock_pubsub):
"""Test WriteToPubSub with ordering_key in messages."""
data = b'data'
ordering_key = 'order-123'
attributes = {'key': 'value'}
payloads = [PubsubMessage(data, attributes, ordering_key=ordering_key)]

options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
with TestPipeline(options=options) as p:
_ = (
p
| Create(payloads)
| WriteToPubSub(
'projects/fakeprj/topics/a_topic', with_attributes=True))

# Verify that publish was called with ordering_key
mock_pubsub.return_value.publish.assert_called()
call_args = mock_pubsub.return_value.publish.call_args

# Check that ordering_key was passed as a keyword argument
self.assertIn('ordering_key', call_args.kwargs)
self.assertEqual(call_args.kwargs['ordering_key'], ordering_key)

def test_write_messages_with_ordering_key_no_attributes(self, mock_pubsub):
"""Test WriteToPubSub with ordering_key but no attributes."""
data = b'data'
ordering_key = 'order-456'
payloads = [PubsubMessage(data, None, ordering_key=ordering_key)]

options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
with TestPipeline(options=options) as p:
_ = (
p
| Create(payloads)
| WriteToPubSub(
'projects/fakeprj/topics/a_topic', with_attributes=True))

# Verify that publish was called with ordering_key
mock_pubsub.return_value.publish.assert_called()
call_args = mock_pubsub.return_value.publish.call_args

# Check that ordering_key was passed
self.assertIn('ordering_key', call_args.kwargs)
self.assertEqual(call_args.kwargs['ordering_key'], ordering_key)

def test_write_messages_without_ordering_key(self, mock_pubsub):
"""Test WriteToPubSub without ordering_key (backward compatibility)."""
data = b'data'
attributes = {'key': 'value'}
payloads = [PubsubMessage(data, attributes)] # No ordering_key

options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
with TestPipeline(options=options) as p:
_ = (
p
| Create(payloads)
| WriteToPubSub(
'projects/fakeprj/topics/a_topic', with_attributes=True))

# Verify that publish was called
mock_pubsub.return_value.publish.assert_called()
Comment on lines +1163 to +1164

Contributor (medium):

To make this test more robust, you should also verify that ordering_key is not passed to the publish method when no ordering key is provided in the PubsubMessage. You can do this by checking the call_args of the mock.

Suggested change:

    # Verify that publish was called
    mock_pubsub.return_value.publish.assert_called()
    call_args = mock_pubsub.return_value.publish.call_args
    self.assertNotIn('ordering_key', call_args.kwargs)



if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)