From 60bd6b4be4d68065026024522e769805e62161f0 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Mon, 13 Oct 2025 15:45:24 -0400 Subject: [PATCH 1/3] Softens the GBEK determinism requirement --- sdks/python/apache_beam/transforms/util.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 5af9d904895a..9ae52274cd03 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -546,13 +546,16 @@ def expand(self, pcoll): pcoll.element_type).tuple_types) kv_type_hint = typehints.KV[key_type, value_type] if kv_type_hint and kv_type_hint != typehints.Any: - coder = coders.registry.get_coder(kv_type_hint).as_deterministic_coder( - f'GroupByEncryptedKey {self.label}' - 'The key coder is not deterministic. This may result in incorrect ' - 'pipeline output. This can be fixed by adding a type hint to the ' - 'operation preceding the GroupByKey step, and for custom key ' - 'classes, by writing a deterministic custom Coder. Please see the ' - 'documentation for more details.') + coder = coders.registry.get_coder(kv_type_hint) + try: + coder = coder.as_deterministic_coder() + except ValueError: + logging.warning(f'GroupByEncryptedKey {self.label}: ' + 'The key coder is not deterministic. This may result in incorrect ' + 'pipeline output. This can be fixed by adding a type hint to the ' + 'operation preceding the GroupByKey step, and for custom key ' + 'classes, by writing a deterministic custom Coder. Please see the ' + 'documentation for more details.') if not coder.is_kv_coder(): raise ValueError( 'Input elements to the transform %s with stateful DoFn must be ' From 0d63da14162dac0d30098f44e84254f391ac7b01 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Mon, 13 Oct 2025 16:00:41 -0400 Subject: [PATCH 2/3] fmt --- sdks/python/apache_beam/transforms/util.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 9ae52274cd03..5f78d7a8c945 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -550,7 +550,8 @@ def expand(self, pcoll): try: coder = coder.as_deterministic_coder() except ValueError: - logging.warning(f'GroupByEncryptedKey {self.label}: ' + logging.warning( + f'GroupByEncryptedKey {self.label}: ' 'The key coder is not deterministic. This may result in incorrect ' 'pipeline output. This can be fixed by adding a type hint to the ' 'operation preceding the GroupByKey step, and for custom key ' From d0bd8c4f35b5c73d170b5c40f9dc15e6e9e60738 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Mon, 13 Oct 2025 16:59:02 -0400 Subject: [PATCH 3/3] Fix lint/test --- sdks/python/apache_beam/io/gcp/bigquery_tools.py | 2 +- sdks/python/apache_beam/transforms/util.py | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index 889d3f1e96e3..d2fa7627a800 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -75,7 +75,7 @@ from google.api_core.exceptions import ClientError, GoogleAPICallError from google.api_core.client_info import ClientInfo from google.cloud import bigquery as gcp_bigquery -except ImportError: +except Exception: gcp_bigquery = None pass diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 5f78d7a8c945..182d6faa2271 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -548,15 +548,16 @@ def expand(self, pcoll): if kv_type_hint and kv_type_hint != typehints.Any: coder = coders.registry.get_coder(kv_type_hint) try: - coder = coder.as_deterministic_coder() + coder = coder.as_deterministic_coder(self.label) except ValueError: logging.warning( - f'GroupByEncryptedKey {self.label}: ' + 'GroupByEncryptedKey %s: ' 'The key coder is not deterministic. This may result in incorrect ' 'pipeline output. This can be fixed by adding a type hint to the ' 'operation preceding the GroupByKey step, and for custom key ' 'classes, by writing a deterministic custom Coder. Please see the ' - 'documentation for more details.') + 'documentation for more details.', + self.label) if not coder.is_kv_coder(): raise ValueError( 'Input elements to the transform %s with stateful DoFn must be '