diff --git a/sdks/python/apache_beam/runners/portability/local_job_service.py b/sdks/python/apache_beam/runners/portability/local_job_service.py index 68e8d6922f20..9d85e4d1e664 100644 --- a/sdks/python/apache_beam/runners/portability/local_job_service.py +++ b/sdks/python/apache_beam/runners/portability/local_job_service.py @@ -309,7 +309,6 @@ def _run_job(self): message_text=traceback.format_exc())) _LOGGER.exception('Error running pipeline.') self.set_state(beam_job_api_pb2.JobState.FAILED) - raise def _invoke_runner(self): self.set_state(beam_job_api_pb2.JobState.RUNNING) diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py index 43ca6ca3c38c..94a467d5a249 100644 --- a/sdks/python/apache_beam/runners/portability/portable_runner.py +++ b/sdks/python/apache_beam/runners/portability/portable_runner.py @@ -528,14 +528,17 @@ def wait_until_finish(self, duration=None): the execution. If None or zero, will wait until the pipeline finishes. :return: The result of the pipeline, i.e. PipelineResult. """ + last_error_text = None + def read_messages() -> None: + nonlocal last_error_text previous_state = -1 for message in self._message_stream: if message.HasField('message_response'): - logging.log( - MESSAGE_LOG_LEVELS[message.message_response.importance], - "%s", - message.message_response.message_text) + mr = message.message_response + logging.log(MESSAGE_LOG_LEVELS[mr.importance], "%s", mr.message_text) + if mr.importance == beam_job_api_pb2.JobMessage.JOB_MESSAGE_ERROR: + last_error_text = mr.message_text else: current_state = message.state_response.state if current_state != previous_state: @@ -566,6 +569,9 @@ def read_messages() -> None: if self._runtime_exception: raise self._runtime_exception + from apache_beam.runners.runner import PipelineState + if self._state == PipelineState.FAILED: + raise RuntimeError(last_error_text or "Pipeline failed.") return self._state