Skip to content

Commit e0dd1f5

Browse files
committed
address gemini comment
1 parent e4f97db commit e0dd1f5

2 files changed

Lines changed: 45 additions & 8 deletions

File tree

sdks/python/apache_beam/yaml/yaml_mapping.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -243,25 +243,29 @@ def __init__(self, fields, original_fields, input_schema):
243243
self.passthrough_fields = []
244244

245245
script = []
246-
for name, expr in fields.items():
246+
for i, (name, expr) in enumerate(fields.items()):
247247
if isinstance(expr, str) and expr in input_schema:
248248
self.passthrough_fields.append((name, expr))
249249
continue
250250

251251
if isinstance(expr, str):
252252
expr = {'expression': expr}
253253

254+
# We use numeric indexing (func_{i}) instead of reusing the output field
255+
# name to prevent syntax errors if output names contain spaces or hyphens.
256+
# We also use bracket notation for robustness against input field names
257+
# that aren't compliant dot-access identifiers.
254258
if 'expression' in expr:
255259
e = expr['expression']
256-
code = f"var func_{name} = (__row__) => {{ " + " ".join(
257-
[f"const {n} = __row__.{n};"
260+
code = f"var func_{i} = (__row__) => {{ " + " ".join(
261+
[f"const {n} = __row__['{n}'];"
258262
for n in original_fields if n in e]) + f" return ({e}); }}"
259263
script.append(code)
260-
self.field_funcs[name] = f"func_{name}"
264+
self.field_funcs[name] = f"func_{i}"
261265
elif 'callable' in expr:
262-
code = f"var func_{name} = {expr['callable']}"
266+
code = f"var func_{i} = {expr['callable']}"
263267
script.append(code)
264-
self.field_funcs[name] = f"func_{name}"
268+
self.field_funcs[name] = f"func_{i}"
265269
elif 'path' in expr and 'name' in expr:
266270
path = expr['path']
267271
func_name = expr['name']
@@ -320,8 +324,11 @@ def _get_javascript_udf_code(
320324
udf_code = FileSystems.open(path).read().decode()
321325
return udf_code, name
322326
elif expression:
327+
# We use bracket notation for robustness against field names that
328+
# aren't compliant dot-access identifiers.
323329
udf_code = f"var {function_name} = (__row__) => {{ " + " ".join([
324-
f"const {n} = __row__.{n};" for n in original_fields if n in expression
330+
f"const {n} = __row__['{n}'];"
331+
for n in original_fields if n in expression
325332
]) + f" return ({expression}); }}"
326333
return udf_code, function_name
327334
elif callable:

sdks/python/apache_beam/yaml/yaml_udf_test.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -397,7 +397,37 @@ def test_map_to_fields_js_date(self):
397397
expected_date = datetime.datetime(
398398
2026, 4, 17, 18, tzinfo=datetime.timezone.utc)
399399

400-
assert_that(result | as_rows(), equal_to([beam.Row(date=expected_date)]))
400+
@unittest.skipIf(MiniRacer is None, 'py_mini_racer not installed.')
401+
def test_map_to_fields_js_special_names(self):
402+
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
403+
pickle_library='cloudpickle', yaml_experimental_features=['javascript'
404+
])) as p:
405+
elements = p | beam.Create([beam.Row(label='test')])
406+
result = elements | YamlTransform(
407+
'''
408+
type: MapToFields
409+
config:
410+
language: javascript
411+
fields:
412+
'weird output-name':
413+
expression: "label + '-ok'"
414+
''')
415+
416+
# Verify that it yields a single row with the transformed output.
417+
# We use as_dict here because comparing typed Row with spaces can fail
418+
# downstream assertions if mapped back to a python tuple.
419+
def row_to_dict(r):
420+
return {
421+
k: getattr(r, k)
422+
for k in dir(r)
423+
if not k.startswith('_') and not callable(getattr(r, k))
424+
}
425+
426+
assert_that(
427+
result | beam.Map(lambda r: dict(r._asdict())),
428+
equal_to([{
429+
'weird output-name': 'test-ok'
430+
}]))
401431

402432

403433
if __name__ == '__main__':

0 commit comments

Comments
 (0)