Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/io/gcp/bigquery_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
from google.api_core.exceptions import ClientError, GoogleAPICallError
from google.api_core.client_info import ClientInfo
from google.cloud import bigquery as gcp_bigquery
except ImportError:
except Exception:
gcp_bigquery = None
pass

Expand Down
19 changes: 12 additions & 7 deletions sdks/python/apache_beam/transforms/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -546,13 +546,18 @@ def expand(self, pcoll):
pcoll.element_type).tuple_types)
kv_type_hint = typehints.KV[key_type, value_type]
if kv_type_hint and kv_type_hint != typehints.Any:
coder = coders.registry.get_coder(kv_type_hint).as_deterministic_coder(
f'GroupByEncryptedKey {self.label}'
'The key coder is not deterministic. This may result in incorrect '
'pipeline output. This can be fixed by adding a type hint to the '
'operation preceding the GroupByKey step, and for custom key '
'classes, by writing a deterministic custom Coder. Please see the '
'documentation for more details.')
coder = coders.registry.get_coder(kv_type_hint)
try:
coder = coder.as_deterministic_coder(self.label)
except ValueError:
logging.warning(
'GroupByEncryptedKey %s: '
'The key coder is not deterministic. This may result in incorrect '
'pipeline output. This can be fixed by adding a type hint to the '
'operation preceding the GroupByKey step, and for custom key '
'classes, by writing a deterministic custom Coder. Please see the '
'documentation for more details.',
self.label)
if not coder.is_kv_coder():
raise ValueError(
'Input elements to the transform %s with stateful DoFn must be '
Expand Down
Loading