-
Notifications
You must be signed in to change notification settings - Fork 337
feat: Stage based fallback #4519
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
karuppayya
wants to merge
8
commits into
apache:main
Choose a base branch
from
karuppayya:COMET-4518
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
be016a0
Stage based fallback
karuppayya 15e6715
Add suites to yml
karuppayya c2b87f1
Fix tests
karuppayya 43d00ef
Address review comments
karuppayya 03fad26
Address review comments
karuppayya 13af676
Fix attempt
karuppayya 13d0160
Address review comments
karuppayya 1f21e24
Fix scalstyle check, revert the tansitions to 2(wfrom 5 which was don…
karuppayya File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
179 changes: 179 additions & 0 deletions
179
spark/src/main/scala/org/apache/comet/rules/RevertNativeForTransitionHeavyStages.scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,179 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.comet.rules | ||
|
|
||
| import org.apache.spark.internal.Logging | ||
| import org.apache.spark.sql.SparkSession | ||
| import org.apache.spark.sql.catalyst.rules.Rule | ||
| import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometExec, CometNativeColumnarToRowExec, CometSparkToColumnarExec} | ||
| import org.apache.spark.sql.execution.{ColumnarToRowExec, ColumnarToRowTransition, RowToColumnarExec, SparkPlan} | ||
| import org.apache.spark.sql.execution.adaptive.QueryStageExec | ||
| import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ShuffleExchangeLike} | ||
|
|
||
| import org.apache.comet.CometConf | ||
| import org.apache.comet.CometSparkSessionExtensions.withFallbackReason | ||
|
|
||
| /** | ||
| * Reverts a query stage to Spark row-based execution when it has too many columnar-to-row (C2R) | ||
| * transitions. Each C2R indicates Comet could not keep execution columnar and had to fall back. | ||
| * With columnar shuffle enabled, each C2R implies a corresponding R2C round-trip. | ||
| */ | ||
| case class RevertNativeForTransitionHeavyStages(session: SparkSession) | ||
| extends Rule[SparkPlan] | ||
| with Logging { | ||
|
|
||
| private def enabled = CometConf.COMET_EXEC_TRANSITION_REVERT_ENABLED.get() | ||
| private def maxTransitions = CometConf.COMET_EXEC_TRANSITION_REVERT_MAX_TRANSITIONS.get() | ||
|
|
||
| override def apply(plan: SparkPlan): SparkPlan = { | ||
| if (!enabled) return plan | ||
|
|
||
| if (session.sessionState.conf.adaptiveExecutionEnabled) { | ||
| applyForAQE(plan) | ||
| } else { | ||
| applyForNonAQE(plan) | ||
| } | ||
| } | ||
|
|
||
| private def applyForAQE(plan: SparkPlan): SparkPlan = { | ||
| plan match { | ||
| case _: BroadcastExchangeLike => plan | ||
| case exchange: ShuffleExchangeLike => | ||
| revertStageIfNeeded(exchange.child, exchange.supportsColumnar) | ||
| .map(reverted => exchange.withNewChildren(Seq(reverted))) | ||
| .getOrElse(plan) | ||
| case _ => | ||
| // Result stage: its output is collected as rows, so no consumer requires columnar input | ||
| // and the reverted stage needs no trailing R2C. | ||
| revertStageIfNeeded(plan, outputColumnar = false).getOrElse(plan) | ||
| } | ||
| } | ||
|
|
||
| private def applyForNonAQE(plan: SparkPlan): SparkPlan = { | ||
| val withRevertedStages = plan.transformUp { case exchange: ShuffleExchangeLike => | ||
| revertStageIfNeeded(exchange.child, exchange.supportsColumnar) | ||
| .map(reverted => exchange.withNewChildren(Seq(reverted))) | ||
| .getOrElse(exchange) | ||
| } | ||
| revertStageIfNeeded(withRevertedStages, outputColumnar = false) | ||
| .getOrElse(withRevertedStages) | ||
| } | ||
|
|
||
| /** | ||
| * Reverts the stage if C2R count exceeds threshold. Wraps in R2C if exchange needs columnar. | ||
| */ | ||
| private def revertStageIfNeeded( | ||
| stagePlan: SparkPlan, | ||
| outputColumnar: Boolean): Option[SparkPlan] = { | ||
| val transitionCount = countTransitions(stagePlan) | ||
| if (transitionCount <= maxTransitions) return None | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To make a user friendly response it would be better to use
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
|
|
||
| val reason = | ||
| s"Stage reverted: $transitionCount C2R transitions exceed threshold $maxTransitions" | ||
|
|
||
| val reverted = revertToSpark(stagePlan) | ||
| val result = if (outputColumnar && !reverted.supportsColumnar) { | ||
| RowToColumnarExec(withFallbackReason(reverted, reason)) | ||
| } else { | ||
| withFallbackReason(reverted, reason) | ||
| } | ||
| Some(result) | ||
| } | ||
|
|
||
| /** | ||
| * A node that marks the boundary between this stage and an adjacent one. | ||
| */ | ||
| private def isStageBoundary(plan: SparkPlan): Boolean = plan match { | ||
| case _: QueryStageExec | _: ShuffleExchangeLike | _: BroadcastExchangeLike => true | ||
| case _ => false | ||
| } | ||
|
|
||
| /** | ||
| * Like `transformDown`, never descends stage-boundary children. | ||
| */ | ||
| private def transformStageDown(plan: SparkPlan)( | ||
| rule: PartialFunction[SparkPlan, SparkPlan]): SparkPlan = { | ||
| val transformed = rule.applyOrElse(plan, identity[SparkPlan]) | ||
| val newChildren = transformed.children.map { child => | ||
| if (isStageBoundary(child)) child else transformStageDown(child)(rule) | ||
| } | ||
| if (newChildren == transformed.children) transformed | ||
| else transformed.withNewChildren(newChildren) | ||
| } | ||
|
|
||
| /** Like `transformUp`, never descends stage-boundary children. */ | ||
| private def transformStageUp(plan: SparkPlan)( | ||
| rule: PartialFunction[SparkPlan, SparkPlan]): SparkPlan = { | ||
| val newChildren = plan.children.map { child => | ||
| if (isStageBoundary(child)) child else transformStageUp(child)(rule) | ||
| } | ||
| val withNewChildren = | ||
| if (newChildren == plan.children) plan else plan.withNewChildren(newChildren) | ||
| rule.applyOrElse(withNewChildren, identity[SparkPlan]) | ||
| } | ||
|
|
||
| /** Counts C2R transitions within this stage, stopping at stage boundaries. */ | ||
| private[rules] def countTransitions(plan: SparkPlan): Int = { | ||
| var count = 0 | ||
| def visit(node: SparkPlan): Unit = node match { | ||
| case _ if isStageBoundary(node) => () | ||
| case _: ColumnarToRowTransition => | ||
| count += 1 | ||
| node.children.foreach(visit) | ||
| case _ => | ||
| node.children.foreach(visit) | ||
| } | ||
| visit(plan) | ||
| count | ||
| } | ||
|
|
||
| private[rules] def revertToSpark(plan: SparkPlan): SparkPlan = { | ||
| val stripped = transformStageDown(plan) { | ||
| case CometNativeColumnarToRowExec(child) => child | ||
| case CometColumnarToRowExec(child) => child | ||
| case ColumnarToRowExec(child) => child | ||
| case sparkToColumnar: CometSparkToColumnarExec => sparkToColumnar.child | ||
| case RowToColumnarExec(child) => child | ||
| } | ||
| val reverted = transformStageUp(stripped) { case cometExec: CometExec => | ||
| if (cometExec.originalPlan.children.size == cometExec.children.size) { | ||
| cometExec.originalPlan.withNewChildren(cometExec.children) | ||
| } else { | ||
| logWarning( | ||
| "Comet plan and original have different child count for " + | ||
| s"${cometExec.getClass.getSimpleName}, using originalPlan as-is.") | ||
| cometExec.originalPlan | ||
| } | ||
| } | ||
| insertTransitions(reverted) | ||
| } | ||
|
|
||
| private def insertTransitions(plan: SparkPlan): SparkPlan = { | ||
| // transformStageUp never descends into stage-boundary nodes (QueryStageExec, exchanges), so | ||
| // this only needs to bridge row nodes that still have a columnar child within the stage. | ||
| transformStageUp(plan) { | ||
| case node if !node.supportsColumnar => | ||
| val newChildren = node.children.map { child => | ||
| if (child.supportsColumnar) ColumnarToRowExec(child) else child | ||
|
parthchandra marked this conversation as resolved.
|
||
| } | ||
| if (newChildren != node.children) node.withNewChildren(newChildren) else node | ||
| } | ||
| } | ||
| } | ||
|
parthchandra marked this conversation as resolved.
|
||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is there no
case _ =>arm like in the AQE case.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This handles the final stage, since case match on
Exchnageabove(usingtransformUp) cannot catch it, which is the equivalent ofcase _(ResultStage) in AQE flow