Skip to content

Commit 62df216

Browse files
authored
Merge pull request #36472 from apache/fix-flink-cogbk
Fix LoadTests Python CoGBK Flink Batch job
2 parents d4438b6 + 590ece2 commit 62df216

6 files changed

Lines changed: 39 additions & 24 deletions

File tree

.github/workflows/beam_LoadTests_Python_CoGBK_Flink_Batch.yml

Lines changed: 10 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -89,9 +89,9 @@ jobs:
8989
test-type: load
9090
test-language: python
9191
argument-file-paths: |
92-
${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Single_Key.txt
93-
${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Multiple_Keys.txt
94-
${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_10kB.txt
92+
${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Single_Key.txt
93+
${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Multiple_Keys.txt
94+
${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_10kB.txt
9595
- name: Start Flink with parallelism 5
9696
env:
9797
FLINK_NUM_WORKERS: 5
@@ -108,28 +108,31 @@ jobs:
108108
gradle-command: :sdks:python:apache_beam:testing:load_tests:run
109109
arguments: |
110110
--info \
111+
-PpythonVersion=3.9 \
111112
-PloadTest.mainClass=apache_beam.testing.load_tests.co_group_by_key_test \
112-
-Prunner=FlinkRunner \
113+
-Prunner=PortableRunner \
113114
'-PloadTest.args=${{ env.beam_LoadTests_Python_CoGBK_Flink_Batch_test_arguments_1 }} --job_name=load-tests-python-flink-batch-cogbk-1-${{ steps.datetime.outputs.datetime }}' \
114115
- name: run CoGBK 2GB of 100B records with multiple keys
115116
uses: ./.github/actions/gradle-command-self-hosted-action
116117
with:
117118
gradle-command: :sdks:python:apache_beam:testing:load_tests:run
118119
arguments: |
119120
--info \
121+
-PpythonVersion=3.9 \
120122
-PloadTest.mainClass=apache_beam.testing.load_tests.co_group_by_key_test \
121-
-Prunner=FlinkRunner \
123+
-Prunner=PortableRunner \
122124
'-PloadTest.args=${{ env.beam_LoadTests_Python_CoGBK_Flink_Batch_test_arguments_2 }} --job_name=load-tests-python-flink-batch-cogbk-2-${{ steps.datetime.outputs.datetime }}' \
123125
- name: run CoGBK reiterate 4 times 10kB values
124126
uses: ./.github/actions/gradle-command-self-hosted-action
125127
with:
126128
gradle-command: :sdks:python:apache_beam:testing:load_tests:run
127129
arguments: |
128130
--info \
131+
-PpythonVersion=3.9 \
129132
-PloadTest.mainClass=apache_beam.testing.load_tests.co_group_by_key_test \
130-
-Prunner=FlinkRunner \
133+
-Prunner=PortableRunner \
131134
'-PloadTest.args=${{ env.beam_LoadTests_Python_CoGBK_Flink_Batch_test_arguments_2 }} --job_name=load-tests-python-flink-batch-cogbk-3-${{ steps.datetime.outputs.datetime }}' \
132135
- name: Teardown Flink
133136
if: always()
134137
run: |
135-
${{ github.workspace }}/.test-infra/dataproc/flink_cluster.sh delete
138+
${{ github.workspace }}/.test-infra/dataproc/flink_cluster.sh delete

.github/workflows/beam_Publish_Docker_Snapshots.yml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -83,7 +83,7 @@ jobs:
8383
arguments: |
8484
-Pdocker-repository-root=gcr.io/apache-beam-testing/beam_portability \
8585
-Pdocker-tag-list=${{ github.sha }}${LATEST_TAG}
86-
- name: run Publish Docker Snapshots script for Flink
86+
- name: run Publish Docker Snapshots script for Flink 1.17
8787
uses: ./.github/actions/gradle-command-self-hosted-action
8888
with:
8989
gradle-command: :runners:flink:1.17:job-server-container:dockerPush

.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Multiple_Keys.txt renamed to .github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Multiple_Keys.txt

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -14,15 +14,15 @@
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
1616

17-
--temp_location=gs://temp-storage-for-perf-tests/loadtests
1817
--publish_to_big_query=true
1918
--metrics_dataset=load_test
2019
--metrics_table=python_flink_batch_cogbk_2
2120
--influx_measurement=python_batch_cogbk_2
22-
--input_options=''{\\"num_records\\":20000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":5,\\"hot_key_fraction\\":1}''
23-
--co_input_options=''{\\"num_records\\":2000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":5,\\"hot_key_fraction\\":1}''
21+
--input_options=''{\\"num_records\\":200000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":5,\\"hot_key_fraction\\":1}''
22+
--co_input_options=''{\\"num_records\\":20000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":5,\\"hot_key_fraction\\":1}''
2423
--iterations=1
2524
--parallelism=5
26-
--endpoint=localhost:8099
25+
--runner=PortableRunner
26+
--job_endpoint=localhost:8099
2727
--environment_type=DOCKER
28-
--environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_go_sdk:latest
28+
--environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_python3.9_sdk:latest

.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Single_Key.txt renamed to .github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Single_Key.txt

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -14,15 +14,15 @@
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
1616

17-
--temp_location=gs://temp-storage-for-perf-tests/loadtests
1817
--publish_to_big_query=true
1918
--metrics_dataset=load_test
2019
--metrics_table=python_flink_batch_cogbk_1
2120
--influx_measurement=python_batch_cogbk_1
22-
--input_options=''{\\"num_records\\":20000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":1,\\"hot_key_fraction\\":1}''
23-
--co_input_options=''{\\"num_records\\":2000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":1000,\\"hot_key_fraction\\":1}''
21+
--input_options=''{\\"num_records\\":200000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":1,\\"hot_key_fraction\\":1}''
22+
--co_input_options=''{\\"num_records\\":20000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":100,\\"hot_key_fraction\\":1}''
2423
--iterations=1
2524
--parallelism=5
26-
--endpoint=localhost:8099
25+
--runner=PortableRunner
26+
--job_endpoint=localhost:8099
2727
--environment_type=DOCKER
28-
--environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_go_sdk:latest
28+
--environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_python3.9_sdk:latest

.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_10kB.txt renamed to .github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_10kB.txt

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -14,15 +14,15 @@
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
1616

17-
--temp_location=gs://temp-storage-for-perf-tests/loadtests
1817
--publish_to_big_query=true
1918
--metrics_dataset=load_test
2019
--metrics_table=python_flink_batch_cogbk_3
2120
--influx_measurement=python_batch_cogbk_3
22-
--input_options=''{\\"num_records\\":20000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":200000,\\"hot_key_fraction\\":1}''
23-
--co_input_options=''{\\"num_records\\":2000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":1000,\\"hot_key_fraction\\":1}''
21+
--input_options=''{\\"num_records\\":200000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":200000,\\"hot_key_fraction\\":1}''
22+
--co_input_options=''{\\"num_records\\":20000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":1000,\\"hot_key_fraction\\":1}''
2423
--iterations=4
2524
--parallelism=5
26-
--endpoint=localhost:8099
25+
--runner=PortableRunner
26+
--job_endpoint=localhost:8099
2727
--environment_type=DOCKER
28-
--environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_go_sdk:latest
28+
--environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_python3.9_sdk:latest

sdks/python/apache_beam/runners/portability/prism_runner.py

Lines changed: 13 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,7 @@
2828
import logging
2929
import os
3030
import platform
31+
import re
3132
import shutil
3233
import stat
3334
import subprocess
@@ -121,7 +122,18 @@ def filter(self, record):
121122
try:
122123
message = record.getMessage()
123124
json_record = json.loads(message)
124-
record.levelno = getattr(logging, json_record["level"])
125+
level_str = json_record["level"]
126+
# Example level with offset: 'ERROR+2'
127+
if "+" in level_str or "-" in level_str:
128+
match = re.match(r"([A-Z]+)([+-]\d+)", level_str)
129+
if match:
130+
base, offset = match.groups()
131+
base_level = getattr(logging, base, logging.INFO)
132+
record.levelno = base_level + int(offset)
133+
else:
134+
record.levelno = getattr(logging, level_str, logging.INFO)
135+
else:
136+
record.levelno = getattr(logging, level_str, logging.INFO)
125137
record.levelname = logging.getLevelName(record.levelno)
126138
if "source" in json_record:
127139
record.funcName = json_record["source"]["function"]

0 commit comments

Comments (0)