Skip to content

Commit a7a019d

Browse files
committed
[Spark Runner] Hoist Spark 3 structured-streaming to shared base, adopt Flink-style version overrides
Move runners/spark/3/src/.../structuredstreaming/ (the only sources Spark 3 shipped) into the shared runners/spark/src/, and replace the existing copySourceBase block in runners/spark/spark_runner.gradle with the per-version source-overrides layout used by runners/flink/flink_runner.gradle: the lowest spark_major builds straight from the shared base; higher majors get a Copy task with DuplicatesStrategy.INCLUDE that merges shared + previous majors + ./src so per-version files override. Pure refactor; Spark 3's compiled output is unchanged. Prepares the tree for the Spark 4 runner (#36841 / #38212), which lands as a small overrides layer on top.
1 parent f300e59 commit a7a019d

76 files changed

Lines changed: 96 additions & 25 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

gradle.properties

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,5 +40,7 @@ docker_image_default_repo_prefix=beam_
4040

4141
# supported flink versions
4242
flink_versions=1.17,1.18,1.19,1.20,2.0
43+
# supported spark versions
44+
spark_versions=3
4345
# supported python versions
4446
python_versions=3.10,3.11,3.12,3.13,3.14

runners/spark/3/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919
def basePath = '..'
2020
/* All properties required for loading the Spark build script */
2121
project.ext {
22+
spark_major = '3'
2223
// Spark 3 version as defined in BeamModulePlugin
2324
spark_version = spark3_version
2425
spark_scala_version = '2.12'
25-
copySourceBase = false // disabled to use Spark 3 as primary dev version
2626
archives_base_name = 'beam-runners-spark-3'
2727
}
2828

runners/spark/spark_runner.gradle

Lines changed: 93 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -89,38 +89,107 @@ def hadoopVersions = [
8989

9090
hadoopVersions.each { kv -> configurations.create("hadoopVersion$kv.key") }
9191

92-
def sourceBase = "${project.projectDir}/../src"
93-
def sourceBaseCopy = "${project.buildDir}/sourcebase/src"
94-
95-
def useCopiedSourceSet = { scope, type, trigger ->
96-
def taskName = "copy${scope.capitalize()}${type.capitalize()}"
97-
trigger.dependsOn tasks.register(taskName, Copy) {
98-
from "$sourceBase/$scope/$type"
99-
into "$sourceBaseCopy/$scope/$type"
100-
duplicatesStrategy DuplicatesStrategy.INCLUDE
92+
/*
93+
* Per-version source overrides (mirrors runners/flink/flink_runner.gradle).
94+
*
95+
* Layout:
96+
* runners/spark/src/ -- shared base (lowest supported version uses these directly)
97+
* runners/spark/<major>/src/ -- version-specific overrides (later overrides win)
98+
*
99+
* The lowest supported `spark_major` builds straight from the shared base.
100+
* Higher versions copy <shared> + <previous majors> + <current> into a single
101+
* source-overrides directory using DuplicatesStrategy.INCLUDE so the current
102+
* version's files override earlier ones.
103+
*/
104+
def base_path = ".."
105+
106+
def overrides = { versions, type, group = 'java' ->
107+
// order matters: later entries override earlier ones during the Copy
108+
["${base_path}/src/${type}/${group}"] +
109+
versions.collect { "${base_path}/${it}/src/${type}/${group}" } +
110+
["./src/${type}/${group}"]
111+
}
112+
113+
def all_versions = spark_versions.split(",")
114+
def previous_versions = all_versions.findAll { it < spark_major }
115+
116+
def main_source_overrides = overrides(previous_versions, "main")
117+
def test_source_overrides = overrides(previous_versions, "test")
118+
def main_resources_overrides = overrides(previous_versions, "main", "resources")
119+
def test_resources_overrides = overrides(previous_versions, "test", "resources")
120+
121+
def sourceOverridesBase = project.layout.buildDirectory.dir('source-overrides/src').get()
122+
123+
def copySourceOverrides = tasks.register('copySourceOverrides', Copy) { copyTask ->
124+
copyTask.from main_source_overrides
125+
copyTask.into "${sourceOverridesBase}/main/java"
126+
copyTask.duplicatesStrategy DuplicatesStrategy.INCLUDE
127+
if (project.ext.has('excluded_files') && project.ext.excluded_files.containsKey('main')) {
128+
project.ext.excluded_files.main.each { f -> copyTask.exclude "**/${f}" }
101129
}
102-
// append copied sources to srcDirs
103-
sourceSets."$scope"."$type".srcDirs "$sourceBaseCopy/$scope/$type"
104130
}
105131

106-
if (copySourceBase) {
107-
// Copy source base into build directory.
108-
// While this is not necessary, having multiple source sets referencing the same shared base will typically confuse an IDE and harm developer experience.
109-
// The copySourceBase flag can be swapped without any implications and allows to pick a main version that is actively worked on.
110-
useCopiedSourceSet("main", "java", compileJava)
111-
useCopiedSourceSet("main", "resources", processResources)
112-
useCopiedSourceSet("test", "java", compileTestJava)
113-
useCopiedSourceSet("test", "resources", processTestResources)
132+
def copyResourcesOverrides = tasks.register('copyResourcesOverrides', Copy) {
133+
it.from main_resources_overrides
134+
it.into "${sourceOverridesBase}/main/resources"
135+
it.duplicatesStrategy DuplicatesStrategy.INCLUDE
136+
}
137+
138+
def copyTestSourceOverrides = tasks.register('copyTestSourceOverrides', Copy) { copyTask ->
139+
copyTask.from test_source_overrides
140+
copyTask.into "${sourceOverridesBase}/test/java"
141+
copyTask.duplicatesStrategy DuplicatesStrategy.INCLUDE
142+
if (project.ext.has('excluded_files') && project.ext.excluded_files.containsKey('test')) {
143+
project.ext.excluded_files.test.each { f -> copyTask.exclude "**/${f}" }
144+
}
145+
}
146+
147+
def copyTestResourcesOverrides = tasks.register('copyTestResourcesOverrides', Copy) {
148+
it.from test_resources_overrides
149+
it.into "${sourceOverridesBase}/test/resources"
150+
it.duplicatesStrategy DuplicatesStrategy.INCLUDE
151+
}
152+
153+
def use_override = (spark_major != all_versions.first())
154+
def sourceBase = "${project.projectDir}/../src"
155+
156+
if (use_override) {
157+
compileJava.dependsOn copySourceOverrides
158+
processResources.dependsOn copyResourcesOverrides
159+
compileTestJava.dependsOn copyTestSourceOverrides
160+
processTestResources.dependsOn copyTestResourcesOverrides
161+
162+
def sourcesJar = project.tasks.findByName('sourcesJar')
163+
if (sourcesJar != null) {
164+
sourcesJar.dependsOn copySourceOverrides
165+
sourcesJar.dependsOn copyResourcesOverrides
166+
}
167+
def testSourcesJar = project.tasks.findByName('testSourcesJar')
168+
if (testSourcesJar != null) {
169+
testSourcesJar.dependsOn copyTestSourceOverrides
170+
testSourcesJar.dependsOn copyTestResourcesOverrides
171+
}
172+
// Pin srcDirs explicitly so each higher version sees only its merged overrides tree.
173+
sourceSets {
174+
main {
175+
java { srcDirs = ["${sourceOverridesBase}/main/java"] }
176+
resources { srcDirs = ["${sourceOverridesBase}/main/resources"] }
177+
}
178+
test {
179+
java { srcDirs = ["${sourceOverridesBase}/test/java"] }
180+
resources { srcDirs = ["${sourceOverridesBase}/test/resources"] }
181+
}
182+
}
114183
} else {
115-
// append shared base sources to srcDirs
184+
// Lowest supported Spark version: build straight from the shared base, no copy step. srcDirs is assigned (not appended), so this project's own ./src is not used.
116185
sourceSets {
117186
main {
118-
java.srcDirs "${sourceBase}/main/java"
119-
resources.srcDirs "${sourceBase}/main/resources"
187+
java { srcDirs = ["${sourceBase}/main/java"] }
188+
resources { srcDirs = ["${sourceBase}/main/resources"] }
120189
}
121190
test {
122-
java.srcDirs "${sourceBase}/test/java"
123-
resources.srcDirs "${sourceBase}/test/resources"
191+
java { srcDirs = ["${sourceBase}/test/java"] }
192+
resources { srcDirs = ["${sourceBase}/test/resources"] }
124193
}
125194
}
126195
}

runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java renamed to runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java

File renamed without changes.

runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineResult.java renamed to runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineResult.java

File renamed without changes.

runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java renamed to runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java

File renamed without changes.

runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunnerRegistrar.java renamed to runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunnerRegistrar.java

File renamed without changes.

runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/examples/WordCount.java renamed to runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/examples/WordCount.java

File renamed without changes.

runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/io/BoundedDatasetFactory.java renamed to runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/io/BoundedDatasetFactory.java

File renamed without changes.

runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/io/package-info.java renamed to runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/io/package-info.java

File renamed without changes.

0 commit comments

Comments
 (0)