Skip to content

Commit 3a962f7

Browse files
authored
feat: add support for parallel kind processing with threads (#840)
* refactor: pass executor to _load_tasks_parallel This will make a subsequent commit that adds support for a ThreadPoolExecutor cleaner. * fix: make schema code thread-safe with locking Voluptuous contains thread-unsafe code; work around this by locking before calling into it. * feat: add support for parallel kind processing with threads Even with 3.14 free-threaded python, this is still a bit slower than multiprocessing on Linux, but it will allow us to start experimenting with it more, and may allow users on macOS and Windows to immediately see a speed-up. * feat: add multithreading tests
1 parent 9a27f0b commit 3a962f7

File tree

5 files changed

+121
-41
lines changed

5 files changed

+121
-41
lines changed

src/taskgraph/generator.py

Lines changed: 46 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from concurrent.futures import (
1212
FIRST_COMPLETED,
1313
ProcessPoolExecutor,
14+
ThreadPoolExecutor,
1415
wait,
1516
)
1617
from dataclasses import dataclass
@@ -312,15 +313,13 @@ def _load_tasks_serial(self, kinds, kind_graph, parameters):
312313

313314
return all_tasks
314315

315-
def _load_tasks_parallel(self, kinds, kind_graph, parameters):
316+
def _load_tasks_parallel(self, kinds, kind_graph, parameters, executor):
316317
all_tasks = {}
317318
futures_to_kind = {}
318319
futures = set()
319320
edges = set(kind_graph.edges)
320321

321-
with ProcessPoolExecutor(
322-
mp_context=multiprocessing.get_context("fork")
323-
) as executor:
322+
with executor:
324323

325324
def submit_ready_kinds():
326325
"""Create the next batch of tasks for kinds without dependencies."""
@@ -434,29 +433,50 @@ def _run(self):
434433
yield "kind_graph", kind_graph
435434

436435
logger.info("Generating full task set")
437-
# The short version of the below is: we only support parallel kind
438-
# processing on Linux.
436+
437+
# The next block deals with enabling parallel kind processing, which
438+
# currently has different support on different platforms. In summary:
439+
# * Parallel kind processing is supported and enabled by default on
440+
# Linux. We use multiple processes by default, but experimental
441+
# support for multiple threads can be enabled instead.
442+
# * On other platforms, we have experimental support for parallel
443+
# kind processing with multiple threads.
439444
#
440-
# Current parallel generation relies on multiprocessing, and more
441-
# specifically: the "fork" multiprocessing method. This is not supported
442-
# at all on Windows (it uses "spawn"). Forking is supported on macOS,
443-
# but no longer works reliably in all cases, and our usage of it here
444-
# causes crashes. See https://github.com/python/cpython/issues/77906
445-
# and http://sealiesoftware.com/blog/archive/2017/6/5/Objective-C_and_fork_in_macOS_1013.html
446-
# for more details on that.
447-
# Other methods of multiprocessing (both "spawn" and "forkserver")
448-
# do not work for our use case, because they cause global variables
449-
# to be reinitialized, which are sometimes modified earlier in graph
450-
# generation. These issues can theoretically be worked around by
451-
# eliminating all reliance on globals as part of task generation, but
452-
# is far from a small amount of work in users like Gecko/Firefox.
453-
# In the long term, the better path forward is likely to be switching
454-
# to threading with a free-threaded python to achieve similar parallel
455-
# processing.
456-
if platform.system() != "Linux" or os.environ.get("TASKGRAPH_SERIAL"):
457-
all_tasks = self._load_tasks_serial(kinds, kind_graph, parameters)
458-
else:
459-
all_tasks = self._load_tasks_parallel(kinds, kind_graph, parameters)
445+
# On all platforms, serial kind processing can be enabled by setting
446+
# TASKGRAPH_SERIAL in the environment.
447+
#
448+
# On all platforms, multiple threads can be enabled by setting
449+
# TASKGRAPH_USE_THREADS in the environment. Taskgraph must be running
450+
# from a free-threaded Python build to see any performance benefits.
451+
#
452+
# In the long term, the goal is to enable parallel kind processing for
453+
# all platforms by default using threads, and remove support for multiple
454+
# processes altogether.
455+
def load_tasks():
456+
if platform.system() == "Linux":
457+
if os.environ.get("TASKGRAPH_SERIAL"):
458+
return self._load_tasks_serial(kinds, kind_graph, parameters)
459+
elif os.environ.get("TASKGRAPH_USE_THREADS"):
460+
executor = ThreadPoolExecutor(max_workers=os.process_cpu_count())
461+
else:
462+
executor = ProcessPoolExecutor(
463+
mp_context=multiprocessing.get_context("fork")
464+
)
465+
return self._load_tasks_parallel(
466+
kinds, kind_graph, parameters, executor
467+
)
468+
else:
469+
if os.environ.get("TASKGRAPH_SERIAL") or not os.environ.get(
470+
"TASKGRAPH_USE_THREADS"
471+
):
472+
return self._load_tasks_serial(kinds, kind_graph, parameters)
473+
else:
474+
executor = ThreadPoolExecutor(max_workers=os.process_cpu_count())
475+
return self._load_tasks_parallel(
476+
kinds, kind_graph, parameters, executor
477+
)
478+
479+
all_tasks = load_tasks()
460480

461481
full_task_set = TaskGraph(all_tasks, Graph(frozenset(all_tasks), frozenset()))
462482
yield self.verify("full_task_set", full_task_set, graph_config, parameters)

src/taskgraph/util/schema.py

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import pprint
66
import re
7+
import threading
78
from collections.abc import Mapping
89
from functools import reduce
910
from typing import Any, Literal, Optional, Union
@@ -14,6 +15,8 @@
1415
import taskgraph
1516
from taskgraph.util.keyed_by import evaluate_keyed_by, iter_dot_path
1617

18+
_schema_creation_lock = threading.RLock()
19+
1720
# Common type definitions that are used across multiple schemas
1821
TaskPriority = Literal[
1922
"highest", "very-high", "high", "medium", "low", "very-low", "lowest"
@@ -253,28 +256,36 @@ class LegacySchema(voluptuous.Schema):
253256
"""
254257
Operates identically to voluptuous.Schema, but applying some taskgraph-specific checks
255258
in the process.
259+
260+
voluptuous.Schema's `_compile` method is thread-unsafe. Any usage (whether direct or
261+
indirect) of it must be protected by a lock.
256262
"""
257263

258264
def __init__(self, *args, check=True, **kwargs):
259-
super().__init__(*args, **kwargs)
265+
with _schema_creation_lock:
266+
# this constructor may call `_compile`
267+
super().__init__(*args, **kwargs)
260268

261-
self.check = check
262-
if not taskgraph.fast and self.check:
263-
check_schema(self)
269+
self.check = check
270+
if not taskgraph.fast and self.check:
271+
check_schema(self)
264272

265273
def extend(self, *args, **kwargs):
266-
schema = super().extend(*args, **kwargs)
274+
with _schema_creation_lock:
275+
# `extend` may create a new Schema object, which may call `_compile`
276+
schema = super().extend(*args, **kwargs)
267277

268-
if self.check:
269-
check_schema(schema)
270-
# We want twice extend schema to be checked too.
271-
schema.__class__ = LegacySchema
272-
return schema
278+
if self.check:
279+
check_schema(schema)
280+
# We want twice extend schema to be checked too.
281+
schema.__class__ = LegacySchema
282+
return schema
273283

274284
def _compile(self, schema):
275-
if taskgraph.fast:
276-
return
277-
return super()._compile(schema)
285+
with _schema_creation_lock:
286+
if taskgraph.fast:
287+
return
288+
return super()._compile(schema)
278289

279290
def __getitem__(self, item):
280291
return self.schema[item] # type: ignore

taskcluster/config.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ treeherder:
77
'check': 'Checks and lints'
88
'doc': 'Documentation tasks'
99
'unit': 'Unit test tasks'
10+
'unit-multithread': 'Unit test tasks with multithreading enabled'
1011
'integration': 'Integration test tasks'
1112

1213
index:

taskcluster/kinds/test/linux.yml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,26 @@ unit:
5050
command: >-
5151
uv run coverage run --data-file /builds/worker/artifacts/coverage --context=py{matrix[python]} -m pytest -vv
5252
53+
unit-multithread:
54+
description: "Run unit tests with py{matrix[python]} on Linux with multithreading enabled"
55+
matrix:
56+
set-name: "unit-multithread-py{matrix[python]}"
57+
substitution-fields: [description, run.command, treeherder, worker, attributes]
58+
python: ["314t"]
59+
worker:
60+
docker-image: {in-tree: python}
61+
env:
62+
TASKGRAPH_USE_THREADS: "1"
63+
artifacts:
64+
- type: file
65+
path: "/builds/worker/artifacts/coverage"
66+
name: "public/coverage.py{matrix[python]}"
67+
treeherder:
68+
symbol: unit-multithread(py{matrix[python]})
69+
run:
70+
command: >-
71+
uv run coverage run --data-file /builds/worker/artifacts/coverage --context=py{matrix[python]} -m pytest -vv
72+
5373
integration:
5474
description: "Run unit tests with py{matrix[python]} on Linux with resolution {matrix[resolution]}"
5575
attributes:

test/test_generator.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
44

55

6+
import os
67
import platform
78
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
89

@@ -14,9 +15,13 @@
1415
from taskgraph.loader.default import loader as default_loader
1516

1617
linuxonly = pytest.mark.skipif(
17-
platform.system() != "Linux",
18+
platform.system() != "Linux" or os.environ.get("TASKGRAPH_USE_THREADS"),
1819
reason="requires Linux and 'fork' multiprocessing support",
1920
)
21+
threadsonly = pytest.mark.skipif(
22+
not os.environ.get("TASKGRAPH_USE_THREADS"),
23+
reason="requires multithreading to be enabled",
24+
)
2025

2126

2227
class FakePPE(ProcessPoolExecutor):
@@ -27,8 +32,16 @@ def submit(self, kind_load_tasks, *args):
2732
return super().submit(kind_load_tasks, *args)
2833

2934

35+
class FakeTPE(ThreadPoolExecutor):
36+
loaded_kinds = []
37+
38+
def submit(self, kind_load_tasks, *args):
39+
self.loaded_kinds.append(kind_load_tasks.__self__.name)
40+
return super().submit(kind_load_tasks, *args)
41+
42+
3043
@linuxonly
31-
def test_kind_ordering(mocker, maketgg):
44+
def test_kind_ordering_multiprocess(mocker, maketgg):
3245
"When task kinds depend on each other, they are loaded in postorder"
3346
mocked_ppe = mocker.patch.object(generator, "ProcessPoolExecutor", new=FakePPE)
3447
tgg = maketgg(
@@ -42,6 +55,21 @@ def test_kind_ordering(mocker, maketgg):
4255
assert mocked_ppe.loaded_kinds == ["_fake1", "_fake2", "_fake3"]
4356

4457

58+
@threadsonly
59+
def test_kind_ordering_multithread(mocker, maketgg):
60+
"When task kinds depend on each other, they are loaded in postorder"
61+
mocked_tpe = mocker.patch.object(generator, "ThreadPoolExecutor", new=FakeTPE)
62+
tgg = maketgg(
63+
kinds=[
64+
("_fake3", {"kind-dependencies": ["_fake2", "_fake1"]}),
65+
("_fake2", {"kind-dependencies": ["_fake1"]}),
66+
("_fake1", {"kind-dependencies": []}),
67+
]
68+
)
69+
tgg._run_until("full_task_set")
70+
assert mocked_tpe.loaded_kinds == ["_fake1", "_fake2", "_fake3"]
71+
72+
4573
def test_full_task_set(maketgg):
4674
"The full_task_set property has all tasks"
4775
tgg = maketgg()

0 commit comments

Comments
 (0)