Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
[NSE-955] implement lpad/rpad (#964)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhouyuan authored Jun 14, 2022
1 parent e9dfc2d commit 81a1a9e
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,15 @@ case class ColumnarSortMergeJoinExec(
// build check for condition
val conditionExpr: Expression = condition.orNull
if (conditionExpr != null) {
ColumnarExpressionConverter.replaceWithColumnarExpression(conditionExpr)
val columnarConditionExpr =
ColumnarExpressionConverter.replaceWithColumnarExpression(conditionExpr)
val supportCodegen =
columnarConditionExpr.asInstanceOf[ColumnarExpression].supportColumnarCodegen(null)
// Columnar SMJ only has codegen version of implementation.
if (!supportCodegen) {
throw new UnsupportedOperationException(
"Condition expression is not fully supporting codegen!")
}
}
// build check types
for (attr <- left.output) {
Expand All @@ -372,12 +380,24 @@ case class ColumnarSortMergeJoinExec(
// build check for expr
if (leftKeys != null) {
for (expr <- leftKeys) {
ColumnarExpressionConverter.replaceWithColumnarExpression(expr)
val columnarExpr = ColumnarExpressionConverter.replaceWithColumnarExpression(expr)
val supportCodegen =
columnarExpr.asInstanceOf[ColumnarExpression].supportColumnarCodegen(null)
if (!supportCodegen) {
throw new UnsupportedOperationException(
"Condition expression is not fully supporting codegen!")
}
}
}
if (rightKeys != null) {
for (expr <- rightKeys) {
ColumnarExpressionConverter.replaceWithColumnarExpression(expr)
val columnarExpr = ColumnarExpressionConverter.replaceWithColumnarExpression(expr)
val supportCodegen =
columnarExpr.asInstanceOf[ColumnarExpression].supportColumnarCodegen(null)
if (!supportCodegen) {
throw new UnsupportedOperationException(
"Condition expression is not fully supporting codegen!")
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,26 @@ object ColumnarExpressionConverter extends Logging {
convertBoundRefToAttrRef = convertBoundRefToAttrRef),
expr
)
case slpad: StringLPad =>
ColumnarTernaryOperator.create(
replaceWithColumnarExpression(slpad.str, attributeSeq,
convertBoundRefToAttrRef = convertBoundRefToAttrRef),
replaceWithColumnarExpression(slpad.len, attributeSeq,
convertBoundRefToAttrRef = convertBoundRefToAttrRef),
replaceWithColumnarExpression(slpad.pad, attributeSeq,
convertBoundRefToAttrRef = convertBoundRefToAttrRef),
expr
)
case srpad: StringRPad =>
ColumnarTernaryOperator.create(
replaceWithColumnarExpression(srpad.str, attributeSeq,
convertBoundRefToAttrRef = convertBoundRefToAttrRef),
replaceWithColumnarExpression(srpad.len, attributeSeq,
convertBoundRefToAttrRef = convertBoundRefToAttrRef),
replaceWithColumnarExpression(srpad.pad, attributeSeq,
convertBoundRefToAttrRef = convertBoundRefToAttrRef),
expr
)
case sr: StringReplace =>
check_if_no_calculation = false
logInfo(s"${expr.getClass} ${expr} is supported, no_cal is $check_if_no_calculation.")
Expand Down Expand Up @@ -541,6 +561,10 @@ object ColumnarExpressionConverter extends Logging {
containsSubquery(sr.replaceExpr)
case conv: Conv =>
conv.children.map(containsSubquery).exists(_ == true)
case lpad: StringLPad =>
lpad.children.map(containsSubquery).exists(_ == true)
case rpad: StringRPad =>
rpad.children.map(containsSubquery).exists(_ == true)
case expr: ScalaUDF if (expr.udfName match {
case Some(name) =>
ColumnarUDF.isSupportedUDF(name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ class ColumnarIn(value: Expression, list: Seq[Expression], original: Expression)
throw new UnsupportedOperationException(
s"${value.dataType} is not supported in ColumnarIn.")
}
if (list.map(_.isInstanceOf[Literal]).exists(_ == false)) {
throw new UnsupportedOperationException(
"Only Literal Type is supported for the input list!"
)
}
}

override def doColumnarCodeGen(args: java.lang.Object): (TreeNode, ArrowType) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,76 @@ class ColumnarRegExpExtract(subject: Expression, regexp: Expression, idx: Expres
}
}

/**
 * Columnar counterpart of [[StringLPad]].
 *
 * The operand checks in `buildCheck` run as part of construction, so creating an
 * instance with unsupported operands throws immediately.
 *
 * @param str      expression producing the string to pad; must be of `StringType`
 * @param len      expression producing the target length
 * @param pad      expression producing the padding string; must be a `Literal`
 * @param original the expression this columnar node was created from
 *                 (not referenced inside this class)
 */
class ColumnarStringLPad(str: Expression, len: Expression, pad: Expression,
    original: Expression)
  extends StringLPad(str, len, pad) with ColumnarExpression {

  buildCheck

  /**
   * Validates the operands of this expression.
   *
   * @throws RuntimeException if `str` is not of `StringType`
   * @throws UnsupportedOperationException if `pad` is not a `Literal`
   */
  def buildCheck: Unit = {
    if (str.dataType != StringType) {
      throw new RuntimeException("Only string type is expected!")
    }

    if (!pad.isInstanceOf[Literal]) {
      // Name the actual argument and operator so the failure can be traced to lpad.
      throw new UnsupportedOperationException(
        "Only literal pad is supported in ColumnarStringLPad by now!")
    }
  }

  /** Always reports `false`, i.e. this expression does not advertise columnar codegen support. */
  override def supportColumnarCodegen(args: java.lang.Object): Boolean = {
    false
  }

  /**
   * Generates the nodes of the three operands and combines them into a function
   * node named "lpad" whose result type is Arrow Utf8.
   *
   * @param args passed through unchanged to each operand's `doColumnarCodeGen`
   * @return the function node together with its Arrow result type
   */
  override def doColumnarCodeGen(args: Object): (TreeNode, ArrowType) = {
    // Every operand is expected to be a ColumnarExpression already; the casts fail otherwise.
    val (strNode, _): (TreeNode, ArrowType) =
      str.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
    val (lenNode, _): (TreeNode, ArrowType) =
      len.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
    val (padNode, _): (TreeNode, ArrowType) =
      pad.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
    val resultType = new ArrowType.Utf8()
    val funcNode = TreeBuilder.makeFunction("lpad",
      Lists.newArrayList(strNode, lenNode, padNode), resultType)
    (funcNode, resultType)
  }
}

/**
 * Columnar counterpart of [[StringRPad]].
 *
 * The operand checks in `buildCheck` run as part of construction, so creating an
 * instance with unsupported operands throws immediately.
 *
 * @param str      expression producing the string to pad; must be of `StringType`
 * @param len      expression producing the target length
 * @param pad      expression producing the padding string; must be a `Literal`
 * @param original the expression this columnar node was created from
 *                 (not referenced inside this class)
 */
class ColumnarStringRPad(str: Expression, len: Expression, pad: Expression,
    original: Expression)
  extends StringRPad(str, len, pad) with ColumnarExpression {

  buildCheck

  /**
   * Validates the operands of this expression.
   *
   * @throws RuntimeException if `str` is not of `StringType`
   * @throws UnsupportedOperationException if `pad` is not a `Literal`
   */
  def buildCheck: Unit = {
    if (str.dataType != StringType) {
      throw new RuntimeException("Only string type is expected!")
    }

    if (!pad.isInstanceOf[Literal]) {
      // Name the actual argument and operator so the failure can be traced to rpad.
      throw new UnsupportedOperationException(
        "Only literal pad is supported in ColumnarStringRPad by now!")
    }
  }

  /** Always reports `false`, i.e. this expression does not advertise columnar codegen support. */
  override def supportColumnarCodegen(args: java.lang.Object): Boolean = {
    false
  }

  /**
   * Generates the nodes of the three operands and combines them into a function
   * node named "rpad" whose result type is Arrow Utf8.
   *
   * @param args passed through unchanged to each operand's `doColumnarCodeGen`
   * @return the function node together with its Arrow result type
   */
  override def doColumnarCodeGen(args: Object): (TreeNode, ArrowType) = {
    // Every operand is expected to be a ColumnarExpression already; the casts fail otherwise.
    val (strNode, _): (TreeNode, ArrowType) =
      str.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
    val (lenNode, _): (TreeNode, ArrowType) =
      len.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
    val (padNode, _): (TreeNode, ArrowType) =
      pad.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
    val resultType = new ArrowType.Utf8()
    val funcNode = TreeBuilder.makeFunction("rpad",
      Lists.newArrayList(strNode, lenNode, padNode), resultType)
    (funcNode, resultType)
  }
}

class ColumnarSubstringIndex(strExpr: Expression, delimExpr: Expression,
countExpr: Expression, original: Expression)
extends SubstringIndex(strExpr, delimExpr, countExpr) with ColumnarExpression {
Expand Down Expand Up @@ -310,6 +380,10 @@ object ColumnarTernaryOperator {
new ColumnarStringLocate(src, arg1, arg2, sl)
case re: RegExpExtract =>
new ColumnarRegExpExtract(src, arg1, arg2, re)
case slpad: StringLPad =>
new ColumnarStringLPad(src, arg1, arg2, slpad)
case slpad: StringRPad =>
new ColumnarStringRPad(src, arg1, arg2, slpad)
case substrIndex: SubstringIndex =>
new ColumnarSubstringIndex(src, arg1, arg2, substrIndex)
case _: StringReplace =>
Expand Down

0 comments on commit 81a1a9e

Please sign in to comment.