@@ -354,7 +354,7 @@ def _replace_if_needed(self, original_transform_node):
354354 if replacement_transform is original_transform_node .transform :
355355 return
356356 replacement_transform .side_inputs = tuple (
357- original_transform_node .transform . side_inputs )
357+ getattr ( original_transform_node .transform , ' side_inputs' , ()) )
358358
359359 replacement_transform_node = AppliedPTransform (
360360 original_transform_node .parent ,
@@ -1027,7 +1027,9 @@ def visit_transform(self, transform_node):
10271027 # type: (AppliedPTransform) -> None
10281028 if not transform_node .transform :
10291029 return
1030- if transform_node .transform .runner_api_requires_keyed_input ():
1030+ if hasattr (
1031+ transform_node .transform , 'runner_api_requires_keyed_input'
1032+ ) and transform_node .transform .runner_api_requires_keyed_input ():
10311033 pcoll = transform_node .inputs [0 ]
10321034 pcoll .element_type = typehints .coerce_to_kv_type (
10331035 pcoll .element_type , transform_node .full_label )
@@ -1046,7 +1048,7 @@ def visit_transform(self, transform_node):
10461048 == output .element_type .tuple_types [0 ]):
10471049 output .requires_deterministic_key_coder = (
10481050 deterministic_key_coders and transform_node .full_label )
1049- for side_input in transform_node .transform . side_inputs :
1051+ for side_input in getattr ( transform_node .transform , ' side_inputs' , []) :
10501052 if side_input .requires_keyed_input ():
10511053 side_input .pvalue .element_type = typehints .coerce_to_kv_type (
10521054 side_input .pvalue .element_type ,
@@ -1240,10 +1242,10 @@ def __init__(
12401242 # once environment is a first-class citizen in Beam graph and we have
12411243 # access to actual environment, not just an id.
12421244 self .resource_hints = dict (
1243- transform .get_resource_hints ()) if transform else {
1244- } # type: Dict[str, bytes]
1245+ transform .get_resource_hints ()) if transform and hasattr (
1246+ transform , 'get_resource_hints' ) else { } # type: Dict[str, bytes]
12451247
1246- if transform :
1248+ if transform and hasattr ( transform , 'annotations' ) :
12471249 annotations = {
12481250 ** (annotations or {}), ** encode_annotations (transform .annotations ())
12491251 }
@@ -1399,8 +1401,11 @@ def named_inputs(self):
13991401 assert not self .main_inputs and not self .side_inputs
14001402 return {}
14011403 else :
1402- named_inputs = self .transform ._named_inputs (
1403- self .main_inputs , self .side_inputs )
1404+ if hasattr (self .transform , '_named_inputs' ):
1405+ named_inputs = self .transform ._named_inputs (
1406+ self .main_inputs , self .side_inputs )
1407+ else :
1408+ named_inputs = {}
14041409 if not self .parts :
14051410 for name , pc_out in self .outputs .items ():
14061411 if pc_out .producer is not self and pc_out not in named_inputs .values (
@@ -1414,7 +1419,10 @@ def named_outputs(self):
14141419 assert not self .outputs
14151420 return {}
14161421 else :
1417- return self .transform ._named_outputs (self .outputs )
1422+ if hasattr (self .transform , '_named_outputs' ):
1423+ return self .transform ._named_outputs (self .outputs )
1424+ else :
1425+ return {}
14181426
14191427 def to_runner_api (self , context ):
14201428 # type: (PipelineContext) -> beam_runner_api_pb2.PTransform
@@ -1441,7 +1449,9 @@ def transform_to_runner_api(
14411449 context ,
14421450 has_parts = bool (self .parts ),
14431451 named_inputs = self .named_inputs ())
1444- return transform .to_runner_api (context , has_parts = bool (self .parts ))
1452+ elif hasattr (transform , 'to_runner_api' ):
1453+ return transform .to_runner_api (context , has_parts = bool (self .parts ))
1454+ return None
14451455
14461456 # Iterate over inputs and outputs by sorted key order, so that ids are
14471457 # consistently generated for multiple runs of the same pipeline.
@@ -1527,7 +1537,8 @@ def from_runner_api(
15271537 environment_id = None ,
15281538 annotations = proto .annotations )
15291539
1530- if result .transform and result .transform .side_inputs :
1540+ if result .transform and hasattr (
1541+ result .transform , 'side_inputs' ) and result .transform .side_inputs :
15311542 for si , pcoll in zip (result .transform .side_inputs , side_inputs ):
15321543 si .pvalue = pcoll
15331544 result .side_inputs = tuple (result .transform .side_inputs )
0 commit comments