Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion sdks/python/apache_beam/runners/worker/sdk_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1454,7 +1454,14 @@ def wait(self, timeout=None):

def get(self, timeout=None):
# type: (Optional[float]) -> T
return self._func(*(arg.get(timeout) for arg in self._args))
# List comprehension, not generator: *(gen) causes CPython to build the
# argument tuple incrementally via _PyTuple_Resize, which asserts
# Py_REFCNT(v)==1. A GC cycle between yields can increment that refcount,
# raising SystemError (Objects/tupleobject.c:927). See
# https://github.com/python/cpython/issues/127058 (fixed in 3.14.0a3+:
# https://github.com/python/cpython/commit/5a23994). *[list] allocates the
# tuple once at its final size, avoiding the resize entirely.
return self._func(*[arg.get(timeout) for arg in self._args])

def set(self, value):
# type: (T) -> _Future[T]
Expand Down
32 changes: 32 additions & 0 deletions sdks/python/apache_beam/runners/worker/sdk_worker_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,38 @@ def testShortIdAssignment(self):
% case.info)


class DeferredCallTest(unittest.TestCase):
"""Tests for _DeferredCall.get()."""
def test_get_single_arg(self):
f = sdk_worker._Future().set(42)
call = sdk_worker._DeferredCall(lambda x: x, f)
self.assertEqual(call.get(), 42)

def test_get_multiple_args(self):
futures = [sdk_worker._Future().set(i) for i in range(5)]
call = sdk_worker._DeferredCall(lambda *args: sum(args), *futures)
self.assertEqual(call.get(), sum(range(5)))

def test_get_non_future_args_are_wrapped(self):
# __init__ wraps non-Future values in _Future().set(v); get() must work.
call = sdk_worker._DeferredCall(lambda x, y: x * y, 3, 7)
self.assertEqual(call.get(), 21)

def test_get_mixed_future_and_value_args(self):
a = sdk_worker._Future().set(10)
call = sdk_worker._DeferredCall(lambda x, y: x + y, a, 5)
self.assertEqual(call.get(), 15)

def test_get_zero_args(self):
call = sdk_worker._DeferredCall(lambda: 99)
self.assertEqual(call.get(), 99)

def test_get_preserves_return_value_type(self):
f = sdk_worker._Future().set({'key': 'val'})
call = sdk_worker._DeferredCall(lambda d: d, f)
self.assertEqual(call.get(), {'key': 'val'})


def monitoringInfoMetadata(info):
return {
descriptor.name: value
Expand Down
Loading