Commit 813303b
TebaleloSbenedeki and David Benedeki authored

Rename error handling to error handler:113 (#114)

* Fixes #113 - Refactored ErrorHandling to ErrorHandler and ErrorHandlingFilteringErrorRows to ErrorHandlerFilteringErrorRows
* Fixes #113 - Rolled back some changes
* Fixes #113 - Refactored ErrorHandlingFilteringErrorRows to ErrorHandlerFilteringErrorRows
* Fixes #113 - Refactored ErrorHandlingIgnoringErrors to ErrorHandlerIgnoringErrors
* Fixes #113 - Refactored ErrorMessageArray to ErrorHandlerErrorMessageIntoArray
* Fixes #113 - Refactored ErrorMessageArray Object to ErrorHandlerErrorMessageIntoArray
* Fixes #113 - Renamed errorHandling package to errorHandler
* Fixes #113 - Renamed DataFrameErrorHandlingImplicit to DataFrameErrorHandlerImplicit
* Fixes #113
* Fixes #113 - fixed some documentation errors
* Update spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandler/DataFrameErrorHandlerImplicitTest.scala (Co-authored-by: David Benedeki <[email protected]>)
* Update spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandler/implementations/ErrorHandlerFilteringErrorRowsTest.scala (Co-authored-by: David Benedeki <[email protected]>)
* Fixes #113 - fixed some documentation typo and added more information on ErrorHandler library in the README.md

Co-authored-by: David Benedeki <[email protected]>

1 parent 576985f, commit 813303b

29 files changed: +194 additions, -171 deletions

README.md (12 additions, 4 deletions)

@@ -104,7 +104,7 @@ _Json Utils_ provides methods for working with Json, both on input and output.
 
 _ColumnImplicits_ provide implicit methods for transforming Spark Columns
 
-1. Transforms the column into a booleaan column, checking if values are negative or positive infinity
+1. Transforms the column into a boolean column, checking if values are negative or positive infinity
 
 ```scala
 column.isInfinite()
@@ -424,10 +424,18 @@ path even of nested fields. It also evaluates arrays and maps where the array in
 def nul_coll(dataType: DataType): Column
 ```
 
-## Error Handling
+## Error Handler
 
-A `trait` and a set of supporting classes and other traits to enable errrors channeling between libraries and
-application during Spark data processing.
+A `trait` and a set of supporting classes and other traits to enable errors channeling between libraries and
+application during Spark data processing.
+
+1. It has an [implicit dataFrame](https://github.com/AbsaOSS/spark-commons/blob/113-Rename-ErrorHandling-to-ErrorHandler/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandler/DataFrameErrorHandlerImplicit.scala) for easier usage of the methods provided by the error handler trait.
+
+2. It provides four basic implementations
+* [ErrorHandlerErrorMessageIntoArray](https://github.com/AbsaOSS/spark-commons/blob/113-Rename-ErrorHandling-to-ErrorHandler/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandler/implementations/ErrorHandlerErrorMessageIntoArray.scala) - An implementation of error handler trait that collects errors into columns of struct based on [za.co.absa.spark.commons.errorhandler.ErrorMessage ErrorMessage] case class.
+* [ErrorHandlerFilteringErrorRows](https://github.com/AbsaOSS/spark-commons/blob/113-Rename-ErrorHandling-to-ErrorHandler/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandler/implementations/ErrorHandlerFilteringErrorRows.scala) - An implementation of error handler that implements the functionality of filtering rows that have some error (any of the error columns is not NULL).
+* [ErrorHandlerIgnoringErrors](https://github.com/AbsaOSS/spark-commons/blob/113-Rename-ErrorHandling-to-ErrorHandler/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandler/implementations/ErrorHandlerIgnoringErrors.scala) - An implementation of error handler trait that ignores the errors detected during the dataFrame error aggregation
+* [ErrorHandlerThrowingException](https://github.com/AbsaOSS/spark-commons/blob/113-Rename-ErrorHandling-to-ErrorHandler/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandler/implementations/ErrorHandlerThrowingException.scala) - An implementation of error handler trait that throws an exception on error detected.
 
 ## Spark Commons Test
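To make the renamed README section concrete, here is a hedged sketch of how an application might wire in one of the four implementations. It relies only on method signatures visible in this commit; treating `ErrorHandlerFilteringErrorRows` as a ready-to-use value, and the literal values passed for `ErrType`, `ErrCode` and `ErrMsg`, are assumptions.

```scala
import org.apache.spark.sql.DataFrame
import za.co.absa.spark.commons.errorhandler.ErrorHandler
import za.co.absa.spark.commons.errorhandler.DataFrameErrorHandlerImplicit._
import za.co.absa.spark.commons.errorhandler.implementations.ErrorHandlerFilteringErrorRows

// The application chooses the concrete handler; library code only sees the trait.
// ErrorHandlerFilteringErrorRows drops rows that carry at least one error.
// (Whether it is usable as a plain value here is an assumption.)
implicit val handler: ErrorHandler = ErrorHandlerFilteringErrorRows

def validateAge(df: DataFrame): DataFrame = {
  // Decomposed createErrorAsColumn overload from the trait:
  // (errType, errCode, errMessage, errSourceColName, additionalInfo = None).
  // The concrete values below are illustrative, not from the source.
  val badAge = df.createErrorAsColumn("validation", 1L, "age must be non-negative", Some("age"))
  df.applyErrorColumnsToDataFrame(badAge)
}
```

How a detected error actually manifests (filtered row, thrown exception, appended struct column, or nothing) is decided solely by the implicit handler, which is the point of the abstraction.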

spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/DataFrameErrorHandlingImplicit.scala renamed to spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandler/DataFrameErrorHandlerImplicit.scala (25 additions, 25 deletions)

@@ -14,17 +14,17 @@
 * limitations under the License.
 */
 
-package za.co.absa.spark.commons.errorhandling
+package za.co.absa.spark.commons.errorhandler
 
 import org.apache.spark.sql.{Column, DataFrame}
-import za.co.absa.spark.commons.errorhandling.types.{AdditionalInfo, ErrCode, ErrMsg, ErrSourceColName, ErrType, ErrorColumn, ErrorWhen}
+import za.co.absa.spark.commons.errorhandler.types.{AdditionalInfo, ErrCode, ErrMsg, ErrSourceColName, ErrType, ErrorColumn, ErrorWhen}
 import scala.language.implicitConversions
 
 /**
- * Class implement the functionality of implicit ErrorHandling trait to DataFrame. This implementation take ErrorHandling trait as an
- * implicit which will allow easier usage for ErrorHandling trait.
+ * Class implement the functionality of implicit ErrorHandler trait to DataFrame. This implementation take ErrorHandler trait as an
+ * implicit which will allow easier usage for ErrorHandler trait.
 */
-object DataFrameErrorHandlingImplicit {
+object DataFrameErrorHandlerImplicit {
 /**
 * This method implicitly convert an errorColumn to a normal Column
 *
@@ -35,33 +35,33 @@ object DataFrameErrorHandlingImplicit {
 errorColumn.column
 }
 
-implicit class DataFrameEnhancedWithErrorHandling(val dataFrame: DataFrame) extends AnyVal {
+implicit class DataFrameEnhancedWithErrorHandler(val dataFrame: DataFrame) extends AnyVal {
 
 /**
 * Applies the earlier collected [[types.ErrorColumn ErrorColumns]] to the provided [[org.apache.spark.sql.DataFrame spark.DataFrame]].
 *
-* @param errCols - a list of [[types.ErrorColumn]] returned by previous calls of [[ErrorHandling!.createErrorAsColumn(errorMessageSubmit:za\.co\.absa\.spark\.commons\.errorhandling\.ErrorMessageSubmit)* createErrorAsColumn]]
+* @param errCols - a list of [[types.ErrorColumn]] returned by previous calls of [[ErrorHandler!.createErrorAsColumn(errorMessageSubmit:za\.co\.absa\.spark\.commons\.errorhandler\.ErrorMessageSubmit)* createErrorAsColumn]]
 * @return - the original data frame with the error detection applied
-* @group Error Handling
+* @group Error Handler
 * @since 0.6.0
 */
-def applyErrorColumnsToDataFrame(errCols: ErrorColumn*)(implicit errorHandling: ErrorHandling): DataFrame = {
-errorHandling.applyErrorColumnsToDataFrame(dataFrame)(errCols: _*)
+def applyErrorColumnsToDataFrame(errCols: ErrorColumn*)(implicit errorHandler: ErrorHandler): DataFrame = {
+errorHandler.applyErrorColumnsToDataFrame(dataFrame)(errCols: _*)
 }
 
 /**
 * The idea of this function is: "Put the error specified to the provided dataframe if the condition is true on the row."
-* The error is transformed to a column using the [[ErrorHandling.transformErrorSubmitToColumn]] method and applied to the data frame
-* if the "when" condition is true using the [[ErrorHandling.doApplyErrorColumnsToDataFrame]] method.
+* The error is transformed to a column using the [[ErrorHandler.transformErrorSubmitToColumn]] method and applied to the data frame
+* if the "when" condition is true using the [[ErrorHandler.doApplyErrorColumnsToDataFrame]] method.
 *
 * @param when - the condition that defines the error occurred on the row
 * @param errorMessageSubmit - the detected error specification
 * @return - the original [[org.apache.spark.sql.DataFrame spark.DataFrame]] with the error detection applied
-* @group Error Handling
+* @group Error Handler
 * @since 0.6.0
 */
-def putError(when: Column)(errorMessageSubmit: ErrorMessageSubmit)(implicit errorHandling: ErrorHandling): DataFrame = {
-errorHandling.putError(dataFrame)(when)(errorMessageSubmit)
+def putError(when: Column)(errorMessageSubmit: ErrorMessageSubmit)(implicit errorHandler: ErrorHandler): DataFrame = {
+errorHandler.putError(dataFrame)(when)(errorMessageSubmit)
 }
 
 /**
@@ -71,11 +71,11 @@ object DataFrameErrorHandlingImplicit {
 *
 * @param errorsWhen - the list of condition-error pairs, the condition are grouped by the field of the error submissions
 * @return - the original data frame with the error detection applied
-* @group Error Handling
+* @group Error Handler
 * @since 0.6.0
 */
-def putErrorsWithGrouping(errorsWhen: Seq[ErrorWhen])(implicit errorHandling: ErrorHandling): DataFrame = {
-errorHandling.putErrorsWithGrouping(dataFrame)(errorsWhen)
+def putErrorsWithGrouping(errorsWhen: Seq[ErrorWhen])(implicit errorHandler: ErrorHandler): DataFrame = {
+errorHandler.putErrorsWithGrouping(dataFrame)(errorsWhen)
 }
 
 /**
@@ -85,15 +85,15 @@ object DataFrameErrorHandlingImplicit {
 *
 * @param errorMessageSubmit - the error specification
 * @return - [[types.ErrorColumn]] expression containing the error specification
-* @group Error Handling
+* @group Error Handler
 * @since 0.6.0
 */
-def createErrorAsColumn(errorMessageSubmit: ErrorMessageSubmit)(implicit errorHandling: ErrorHandling): ErrorColumn = {
-errorHandling.createErrorAsColumn(errorMessageSubmit)
+def createErrorAsColumn(errorMessageSubmit: ErrorMessageSubmit)(implicit errorHandler: ErrorHandler): ErrorColumn = {
+errorHandler.createErrorAsColumn(errorMessageSubmit)
 }
 
 /**
-* Same as the other [[ErrorHandling!.createErrorAsColumn(errorMessageSubmit:za\.co\.absa\.spark\.commons\.errorhandling\.ErrorMessageSubmit)* createErrorAsColumn(errorMessageSubmit: ErrorMessageSubmit)]], only providing the error specification
+* Same as the other [[ErrorHandler!.createErrorAsColumn(errorMessageSubmit:za\.co\.absa\.spark\.commons\.errorhandler\.ErrorMessageSubmit)* createErrorAsColumn(errorMessageSubmit: ErrorMessageSubmit)]], only providing the error specification
 * in decomposed state, not in the [[ErrorMessageSubmit]] trait form.
 *
 * @param errType - word description of the type of the error
@@ -102,12 +102,12 @@ object DataFrameErrorHandlingImplicit {
 * @param errSourceColName - the name of the column the error happened at
 * @param additionalInfo - any optional additional info in JSON format
 * @return - [[types.ErrorColumn]] expression containing the error specification
-* @group Error Handling
+* @group Error Handler
 * @since 0.6.0
 */
 def createErrorAsColumn(errType: ErrType, errCode: ErrCode, errMessage: ErrMsg, errSourceColName: Option[ErrSourceColName], additionalInfo: AdditionalInfo = None)
-(implicit errorHandling: ErrorHandling): ErrorColumn = {
-errorHandling.createErrorAsColumn(errType, errCode, errMessage, errSourceColName, additionalInfo)
+(implicit errorHandler: ErrorHandler): ErrorColumn = {
+errorHandler.createErrorAsColumn(errType, errCode, errMessage, errSourceColName, additionalInfo)
 }
 }
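The implicit class in this file exists so that library code can call error-handling methods directly on a DataFrame while staying agnostic of the concrete handler. A hedged sketch of a library-side check using only the `putError` signature shown in this commit; the function name is illustrative, and the caller must supply the `ErrorMessageSubmit` because its concrete builders (e.g. `ErrorMessageSubmitOnColumn`) are only referenced, not defined, here:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import za.co.absa.spark.commons.errorhandler.{ErrorHandler, ErrorMessageSubmit}
import za.co.absa.spark.commons.errorhandler.DataFrameErrorHandlerImplicit._

// Flags rows where `colName` is NULL; what "flagging" means (filter, throw,
// collect into a column...) is decided by the caller's implicit ErrorHandler.
def flagNulls(df: DataFrame, colName: String, error: ErrorMessageSubmit)
             (implicit handler: ErrorHandler): DataFrame =
  df.putError(col(colName).isNull)(error)
```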

spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/ErrorHandling.scala renamed to spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandler/ErrorHandler.scala (20 additions, 16 deletions)

@@ -14,31 +14,31 @@
 * limitations under the License.
 */
 
-package za.co.absa.spark.commons.errorhandling
+package za.co.absa.spark.commons.errorhandler
 
 import org.apache.spark.sql.catalyst.expressions.{CaseWhen, Expression}
 import org.apache.spark.sql.functions.when
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.{Column, DataFrame}
-import za.co.absa.spark.commons.errorhandling.implementations.submits.{ErrorMessageSubmitOnColumn, ErrorMessageSubmitWithoutColumn}
-import za.co.absa.spark.commons.errorhandling.types._
+import za.co.absa.spark.commons.errorhandler.implementations.submits.{ErrorMessageSubmitOnColumn, ErrorMessageSubmitWithoutColumn}
+import za.co.absa.spark.commons.errorhandler.types._
 
 /**
- * The basic class of error handling component. Every library that wants to use the component during Spark data
+ * The basic class of error handler component. Every library that wants to use the component during Spark data
 * processing should utilize this trait and its methods. The methods serve to record the errors and attach them to the
 * [[org.apache.spark.sql.DataFrame spark.DataFrame]]. The trait should be an input parameter for such library, perhaps as an implicit.
-* On the other side the end application provides concrete [[ErrorHandling]] implementation, that does the actual error
+* On the other side the end application provides concrete [[ErrorHandler]] implementation, that does the actual error
 * handling by the application desire.
 * For easy to use and as examples, a few general implementations are provided in the implementations sub-folder.
 * Also for common, repeated implementations the folder `partials` offer some traits.
 */
-trait ErrorHandling {
+trait ErrorHandler {
 /**
 * First of the few methods that needs to be coded in the trait implementation
 * The purpose of this method is to convert the error specification into a [[org.apache.spark.sql.Column spark.Column]] expression
 * @param errorMessageSubmit - the error specification
 * @return - the error specification transformed into a column expression
-* @group Error Handling
+* @group Error Handler
 * @since 0.6.0
 */
 protected def transformErrorSubmitToColumn(errorMessageSubmit: ErrorMessageSubmit): Column
@@ -65,7 +65,7 @@ trait ErrorHandling {
 * @param when - the condition that defines the error occurred on the row
 * @param errorMessageSubmit - the detected error specification
 * @return - the original [[org.apache.spark.sql.DataFrame spark.DataFrame]] with the error detection applied
-* @group Error Handling
+* @group Error Handler
 * @since 0.6.0
 */
 def putError(dataFrame: DataFrame)(when: Column)(errorMessageSubmit: ErrorMessageSubmit): DataFrame = {
@@ -79,7 +79,7 @@ trait ErrorHandling {
 * @param dataFrame - the [[org.apache.spark.sql.DataFrame spark.DataFrame]] to operate on
 * @param errorsWhen - the list of condition-error pairs, the condition are grouped by the field of the error submissions
 * @return - the original data frame with the error detection applied
-* @group Error Handling
+* @group Error Handler
 * @since 0.6.0
 */
 def putErrorsWithGrouping(dataFrame: DataFrame)(errorsWhen: Seq[ErrorWhen]): DataFrame = {
@@ -104,23 +104,24 @@ trait ErrorHandling {
 * The returned [[types.ErrorColumn]] should then be used in [[applyErrorColumnsToDataFrame]].
 * @param errorMessageSubmit - the error specification
 * @return - [[types.ErrorColumn]] expression containing the error specification
-* @group Error Handling
+* @group Error Handler
 * @since 0.6.0
 */
 def createErrorAsColumn(errorMessageSubmit: ErrorMessageSubmit): ErrorColumn = {
 ErrorColumn(transformErrorSubmitToColumn(errorMessageSubmit))
 }
 
 /**
-* Same as the other [[ErrorHandling!.createErrorAsColumn(errorMessageSubmit:za\.co\.absa\.spark\.commons\.errorhandling\.ErrorMessageSubmit)* createErrorAsColumn(errorMessageSubmit: ErrorMessageSubmit)]], only providing the error specification
+* Same as the other [[ErrorHandler!.createErrorAsColumn(errorMessageSubmit:za\.co\.absa\.spark\.commons\.errorhandler\.ErrorMessageSubmit)* createErrorAsColumn(errorMessageSubmit: ErrorMessageSubmit)]], only providing the error specification
 * in decomposed state, not in the [[ErrorMessageSubmit]] trait form.
+*
 * @param errType - word description of the type of the error
 * @param errCode - number designation of the type of the error
 * @param errMessage - human friendly description of the error
 * @param errSourceColName - the name of the column the error happened at
 * @param additionalInfo - any optional additional info in JSON format
 * @return - [[types.ErrorColumn]] expression containing the error specification
-* @group Error Handling
+* @group Error Handler
 * @since 0.6.0
 */
 def createErrorAsColumn(errType: ErrType, errCode: ErrCode, errMessage: ErrMsg, errSourceColName: Option[ErrSourceColName], additionalInfo: AdditionalInfo = None): ErrorColumn = {
@@ -133,26 +134,29 @@ trait ErrorHandling {
 /**
 * Applies the earlier collected [[types.ErrorColumn ErrorColumns]] to the provided [[org.apache.spark.sql.DataFrame spark.DataFrame]].
 * See [[doApplyErrorColumnsToDataFrame]] for detailed functional explanation.
+*
 * @param dataFrame - the [[org.apache.spark.sql.DataFrame spark.DataFrame]] to operate on
-* @param errCols - a list of [[types.ErrorColumn]] returned by previous calls of [[ErrorHandling!.createErrorAsColumn(errorMessageSubmit:za\.co\.absa\.spark\.commons\.errorhandling\.ErrorMessageSubmit)* createErrorAsColumn]]
+* @param errCols - a list of [[types.ErrorColumn]] returned by previous calls of [[ErrorHandler!.createErrorAsColumn(errorMessageSubmit:za\.co\.absa\.spark\.commons\.errorhandler\.ErrorMessageSubmit)* createErrorAsColumn]]
 * @return - the original data frame with the error detection applied
-* @group Error Handling
+* @group Error Handler
 * @since 0.6.0
 */
 def applyErrorColumnsToDataFrame(dataFrame: DataFrame)(errCols: ErrorColumn*): DataFrame = {
 doApplyErrorColumnsToDataFrame(dataFrame, errCols.map(_.column): _*)
 }
 
 /**
-* Provides the library some information about how the actual implementation of [[ErrorHandling]] is structured.
+* Provides the library some information about how the actual implementation of [[ErrorHandler]] is structured.
 * This function provides the information on the structure of single error column
+*
 * @return - the DataType of the column returned from `createErrorAsColumn` function
 */
 def errorColumnType: DataType
 
 /**
-* Provides the library some information about how the actual implementation of [[ErrorHandling]] is structured.
+* Provides the library some information about how the actual implementation of [[ErrorHandler]] is structured.
 * This function describes what is the type of the column attached (if it didn't exists before) to the [[org.apache.spark.sql.DataFrame DataFrame]]
+*
 * @return - the DataType of the column containing the error info that is attached to the [[org.apache.spark.sql.DataFrame DataFrame]].
 */
 def dataFrameColumnType: Option[DataType]
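An application implements the renamed trait by supplying at least `transformErrorSubmitToColumn`, `doApplyErrorColumnsToDataFrame`, `errorColumnType` and `dataFrameColumnType`; that abstract-member set is inferred from this diff and may be incomplete, so the following is a sketch, not a drop-in implementation. The handler name, the `errors` column name, and the placeholder serialization are all hypothetical:

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{concat_ws, lit}
import org.apache.spark.sql.types.{DataType, StringType}
import za.co.absa.spark.commons.errorhandler.{ErrorHandler, ErrorMessageSubmit}

// Hypothetical handler that renders every error as a string and gathers
// them into a single semicolon-separated "errors" column.
object StringifyingErrorHandler extends ErrorHandler {
  protected def transformErrorSubmitToColumn(errorMessageSubmit: ErrorMessageSubmit): Column =
    lit("error") // real code would serialize the submit's type/code/message fields

  // Signature taken from the call doApplyErrorColumnsToDataFrame(dataFrame, ...);
  // its exact visibility in the trait is an assumption.
  protected def doApplyErrorColumnsToDataFrame(dataFrame: DataFrame, errCols: Column*): DataFrame =
    dataFrame.withColumn("errors", concat_ws("; ", errCols: _*))

  def errorColumnType: DataType = StringType
  def dataFrameColumnType: Option[DataType] = Some(StringType)
}
```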

spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/ErrorMessage.scala renamed to spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandler/ErrorMessage.scala (2 additions, 2 deletions)

@@ -14,9 +14,9 @@
 * limitations under the License.
 */
 
-package za.co.absa.spark.commons.errorhandling
+package za.co.absa.spark.commons.errorhandler
 
-import za.co.absa.spark.commons.errorhandling.types._
+import za.co.absa.spark.commons.errorhandler.types._
 
 /**
 * Case class to represent an error message

spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/ErrorMessageSubmit.scala renamed to spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandler/ErrorMessageSubmit.scala (2 additions, 2 deletions)

@@ -14,9 +14,9 @@
 * limitations under the License.
 */
 
-package za.co.absa.spark.commons.errorhandling
+package za.co.absa.spark.commons.errorhandler
 
-import za.co.absa.spark.commons.errorhandling.types._
+import za.co.absa.spark.commons.errorhandler.types._
 
 /**
 * Trait collecting error definition in a format usable during Spark data processing
