Skip to content

Commit 9076b1a

Browse files
authored
Add labels to unlabeled transforms (#36381)
* Add labels to unlabeled transforms
* yapf
* context + a few more
* correct a few lines
* Undo bad edit
1 parent 1a6ec3a commit 9076b1a

3 files changed

Lines changed: 70 additions & 17 deletions

File tree

sdks/python/apache_beam/transforms/core.py

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2352,11 +2352,15 @@ def expand(pcoll):
23522352
else:
23532353
return pcoll
23542354

2355+
# Map(lambda) produces a label formatted like this, but it cannot be
2356+
# changed without breaking update compat. Here, we pin to the transform
2357+
# name used in the 2.68 release to avoid breaking changes when the line
2358+
# number changes. Context: https://github.com/apache/beam/pull/36381
23552359
input_count_view = pcoll | 'CountTotal' >> (
2356-
MaybeWindow() | Map(lambda _: 1)
2360+
MaybeWindow() | "Map(<lambda at core.py:2346>)" >> Map(lambda _: 1)
23572361
| CombineGlobally(sum).as_singleton_view())
23582362
bad_count_pcoll = result[self._dead_letter_tag] | 'CountBad' >> (
2359-
MaybeWindow() | Map(lambda _: 1)
2363+
MaybeWindow() | "Map(<lambda at core.py:2349>)" >> Map(lambda _: 1)
23602364
| CombineGlobally(sum).without_defaults())
23612365

23622366
def check_threshold(bad, total, threshold, window=DoFn.WindowParam):
@@ -3538,9 +3542,14 @@ def default_label(self):
35383542

35393543
def expand(self, pcoll):
35403544
input_type = pcoll.element_type or typing.Any
3545+
# Map(lambda) produces a label formatted like this, but it cannot be
3546+
# changed without breaking update compat. Here, we pin to the transform
3547+
# name used in the 2.68 release to avoid breaking changes when the line
3548+
# number changes. Context: https://github.com/apache/beam/pull/36381
35413549
return (
35423550
pcoll
3543-
| Map(lambda x: (self._key_func()(x), x)).with_output_types(
3551+
| "Map(<lambda at core.py:3503>)" >>
3552+
Map(lambda x: (self._key_func()(x), x)).with_output_types(
35443553
typehints.Tuple[self._key_type_hint(input_type), input_type])
35453554
| GroupByKey())
35463555

@@ -3595,14 +3604,19 @@ def expand(self, pcoll):
35953604
key_type_hint = self._grouping.force_tuple_keys(True)._key_type_hint(
35963605
pcoll.element_type)
35973606

3607+
# Map(lambda) produces a label formatted like this, but it cannot be
3608+
# changed without breaking update compat. Here, we pin to the transform
3609+
# name used in the 2.68 release to avoid breaking changes when the line
3610+
# number changes. Context: https://github.com/apache/beam/pull/36381
35983611
return (
35993612
pcoll
3600-
| Map(lambda x: (key_func(x), value_func(x))).with_output_types(
3613+
| "Map(<lambda at core.py:3560>)" >>
3614+
Map(lambda x: (key_func(x), value_func(x))).with_output_types(
36013615
typehints.Tuple[key_type_hint, typing.Any])
36023616
| CombinePerKey(
36033617
TupleCombineFn(
36043618
*[combine_fn for _, combine_fn, __ in self._aggregations]))
3605-
| MapTuple(
3619+
| "MapTuple(<lambda at core.py:3565>)" >> MapTuple(
36063620
lambda key, value: _dynamic_named_tuple('Result', result_fields)
36073621
(*(key + value))))
36083622

@@ -3618,7 +3632,7 @@ class Select(PTransform):
36183632
36193633
is the same as
36203634
3621-
pcoll | beam.Map(lambda x: beam.Row(a=x.a, b=foo(x)))
3635+
pcoll | 'label' >> beam.Map(lambda x: beam.Row(a=x.a, b=foo(x)))
36223636
"""
36233637
def __init__(
36243638
self,
@@ -3640,8 +3654,13 @@ def default_label(self):
36403654
return 'ToRows(%s)' % ', '.join(name for name, _ in self._fields)
36413655

36423656
def expand(self, pcoll):
3657+
# Map(lambda) produces a label formatted like this, but it cannot be
3658+
# changed without breaking update compat. Here, we pin to the transform
3659+
# name used in the 2.68 release to avoid breaking changes when the line
3660+
# number changes. Context: https://github.com/apache/beam/pull/36381
36433661
return (
3644-
_MaybePValueWithErrors(pcoll, self._exception_handling_args) | Map(
3662+
_MaybePValueWithErrors(pcoll, self._exception_handling_args)
3663+
| "Map(<lambda at core.py:3605>)" >> Map(
36453664
lambda x: pvalue.Row(
36463665
**{
36473666
name: expr(x)
@@ -4128,10 +4147,15 @@ def expand(self, pcoll):
41284147
else:
41294148
return pcoll
41304149

4150+
# Map(lambda) produces a label formatted like this, but it cannot be
4151+
# changed without breaking update compat. Here, we pin to the transform
4152+
# name used in the 2.68 release to avoid breaking changes when the line
4153+
# number changes. Context: https://github.com/apache/beam/pull/36381
41314154
return (
41324155
pbegin
41334156
| Impulse()
4134-
| FlatMap(lambda _: serialized_values).with_output_types(bytes)
4157+
| "FlatMap(<lambda at core.py:4094>)" >>
4158+
FlatMap(lambda _: serialized_values).with_output_types(bytes)
41354159
| MaybeReshuffle().with_output_types(bytes)
41364160
| Map(self._coder.decode).with_output_types(self.get_output_type()))
41374161

sdks/python/apache_beam/transforms/external_java.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,12 @@ def run_pipeline(pipeline_options, expansion_service, wait_until_finish=True):
145145
ImplicitSchemaPayloadBuilder({'data': 'middle'}),
146146
expansion_service)
147147
| beam.ExternalTransform(TEST_COUNT_URN, None, expansion_service)
148-
| beam.Map(lambda kv: '%s: %s' % kv))
148+
# Map(lambda) produces a label formatted like this, but it cannot be
149+
# changed without breaking update compat. Here, we pin to the transform
150+
# name used in the 2.68 release to avoid breaking changes when the line
151+
# number changes. Context: https://github.com/apache/beam/pull/36381
152+
| "Map(<lambda at external_java.py:148>)" >>
153+
beam.Map(lambda kv: '%s: %s' % kv))
149154

150155
assert_that(res, equal_to(['a: 3', 'b: 1', 'c: 2']))
151156

sdks/python/apache_beam/transforms/util.py

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1441,13 +1441,18 @@ def WithKeys(pcoll, k, *args, **kwargs):
14411441
if all(isinstance(arg, AsSideInput)
14421442
for arg in args) and all(isinstance(kwarg, AsSideInput)
14431443
for kwarg in kwargs.values()):
1444-
return pcoll | Map(
1444+
# Map(lambda) produces a label formatted like this, but it cannot be
1445+
# changed without breaking update compat. Here, we pin to the transform
1446+
# name used in the 2.68 release to avoid breaking changes when the line
1447+
# number changes. Context: https://github.com/apache/beam/pull/36381
1448+
return pcoll | "Map(<lambda at util.py:1189>)" >> Map(
14451449
lambda v, *args, **kwargs: (k(v, *args, **kwargs), v),
14461450
*args,
14471451
**kwargs)
1448-
return pcoll | Map(lambda v: (k(v, *args, **kwargs), v))
1449-
return pcoll | Map(lambda v: (k(v), v))
1450-
return pcoll | Map(lambda v: (k, v))
1452+
return pcoll | "Map(<lambda at util.py:1192>)" >> Map(
1453+
lambda v: (k(v, *args, **kwargs), v))
1454+
return pcoll | "Map(<lambda at util.py:1193>)" >> Map(lambda v: (k(v), v))
1455+
return pcoll | "Map(<lambda at util.py:1194>)" >> Map(lambda v: (k, v))
14511456

14521457

14531458
@typehints.with_input_types(tuple[K, V])
@@ -1527,7 +1532,11 @@ def __init__(
15271532

15281533
def expand(self, pcoll):
15291534
key_type, value_type = pcoll.element_type.tuple_types
1530-
sharded_pcoll = pcoll | Map(
1535+
# Map(lambda) produces a label formatted like this, but it cannot be
1536+
# changed without breaking update compat. Here, we pin to the transform
1537+
# name used in the 2.68 release to avoid breaking changes when the line
1538+
# number changes. Context: https://github.com/apache/beam/pull/36381
1539+
sharded_pcoll = pcoll | "Map(<lambda at util.py:1275>)" >> Map(
15311540
lambda key_value: (
15321541
ShardedKey(
15331542
key_value[0],
@@ -2032,7 +2041,12 @@ def replace_all(pcoll, regex, replacement):
20322041
replacement: the string to be substituted for each match.
20332042
"""
20342043
regex = Regex._regex_compile(regex)
2035-
return pcoll | Map(lambda elem: regex.sub(replacement, elem))
2044+
# Map(lambda) produces a label formatted like this, but it cannot be
2045+
# changed without breaking update compat. Here, we pin to the transform
2046+
# name used in the 2.68 release to avoid breaking changes when the line
2047+
# number changes. Context: https://github.com/apache/beam/pull/36381
2048+
return pcoll | "Map(<lambda at util.py:1779>)" >> Map(
2049+
lambda elem: regex.sub(replacement, elem))
20362050

20372051
@staticmethod
20382052
@typehints.with_input_types(str)
@@ -2048,7 +2062,12 @@ def replace_first(pcoll, regex, replacement):
20482062
replacement: the string to be substituted for each match.
20492063
"""
20502064
regex = Regex._regex_compile(regex)
2051-
return pcoll | Map(lambda elem: regex.sub(replacement, elem, 1))
2065+
# Map(lambda) produces a label formatted like this, but it cannot be
2066+
# changed without breaking update compat. Here, we pin to the transform
2067+
# name used in the 2.68 release to avoid breaking changes when the line
2068+
# number changes. Context: https://github.com/apache/beam/pull/36381
2069+
return pcoll | "Map(<lambda at util.py:1795>)" >> Map(
2070+
lambda elem: regex.sub(replacement, elem, 1))
20522071

20532072
@staticmethod
20542073
@typehints.with_input_types(str)
@@ -2139,4 +2158,9 @@ def expand(self, pcoll):
21392158
| f"WaitOn{ix}" >> (beam.FlatMap(lambda x: ()) | GroupByKey()))
21402159
for (ix, side) in enumerate(self._to_be_waited_on)
21412160
]
2142-
return pcoll | beam.Map(lambda x, *unused_sides: x, *sides)
2161+
# Map(lambda) produces a label formatted like this, but it cannot be
2162+
# changed without breaking update compat. Here, we pin to the transform
2163+
# name used in the 2.68 release to avoid breaking changes when the line
2164+
# number changes. Context: https://github.com/apache/beam/pull/36381
2165+
return pcoll | "Map(<lambda at util.py:1886>)" >> beam.Map(
2166+
lambda x, *unused_sides: x, *sides)

0 commit comments

Comments
 (0)