Data Validation with Scala
Feature Validation with the Hopsworks Feature Store
In this notebook we introduce feature validation operations with the Hopsworks Feature Store and its client API, hsfs.
Background
Motivation
Data ingested into the Feature Store forms the basis for the data fed as input to algorithms that develop machine learning models. The Feature Store is a place where curated feature data is stored, so it is important that this data is validated against different rules to ensure it adheres to business requirements.
For example, ingested features might be expected to never be empty or to lie within a certain range; a feature age, for instance, should always be a non-negative number.
The Hopsworks Feature Store provides users with an API to create Expectations on ingested feature data by utilizing the Deequ (https://github.com/awslabs/deequ) open source library. Feature validation is part of the HSFS Java/Scala and Python API for working with Feature Groups. Users work with the following abstractions (a short sketch tying them together follows this list):
- Rules: A set of validation rules applied on a Spark/PySpark dataframe that is inserted into a Feature Group.
- Expectations: A set of rules that is applied on a set of features as provided by the user. Expectations are created at the feature store level and can be attached to multiple feature groups.
- Validations: The results of expectations against the ingested dataframe are assigned a ValidationTime and are persisted within the Feature Store. Users can then retrieve validation results by validation time and, for time-travel enabled feature groups, by commit time.
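As a preview of how these abstractions fit together, here is a minimal sketch that reuses the API calls demonstrated later in this notebook; the expectation name, feature name and the feature store / feature group handles (fs, myFg) are illustrative placeholders:
// Sketch only: the names and the handles fs / myFg are illustrative placeholders
val exp = (fs.createExpectation()
  .rules(Seq(Rule.createRule(RuleName.HAS_MIN).min(0).level(Level.WARNING).build())) // one or more rules
  .name("salary_min")
  .description("salary must be non-negative")
  .features(Seq("salary"))                    // one or more features
  .build())
exp.save()                                    // expectations live at the feature store level
myFg.attachExpectation("salary_min")          // attach the expectation to a feature group
val pastValidations = myFg.getValidations()   // validation results are persisted per ingestion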
Feature Validation is disabled by default, with the validation_type feature group attribute set to NONE. The allowed validation types are (a sketch of setting this attribute follows the list):
- STRICT: Data validation is performed and the feature group is updated only if the validation status is “Success”
- WARNING: Data validation is performed and the feature group is updated only if the validation status is “Warning” or lower
- ALL: Data validation is performed and the feature group is updated only if the validation status is “Failure” or lower
- NONE: Data validation is not performed on the feature group
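For reference, here is a minimal sketch of setting the validation type when creating a feature group and changing it afterwards; it reuses the calls demonstrated later in this notebook, and the feature group name and the feature store handle fs are illustrative:
// Sketch only: "some_fg" and the handle fs are illustrative; the calls mirror those used later in this notebook
val fg = (fs.createFeatureGroup()
  .name("some_fg")
  .primaryKeys(Seq("id"))
  .validationType(ValidationType.STRICT)   // fail the ingestion unless validation succeeds
  .build())
// Change the behavior later, e.g. to ingest data regardless of validation results
fg.updateValidationType(ValidationType.ALL)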
Examples
Create time travel enabled feature group and Bulk Insert Sample Dataset
For this demo we will use a small sample of the Agarwal Generator, a widely used dataset. It contains hypothetical data of people applying for a loan. Rakesh Agrawal, Tomasz Imielinski, and Arun Swami, "Database Mining: A Performance Perspective", IEEE Transactions on Knowledge and Data Engineering, 5(6), December 1993.
For simplicity, we split the Agarwal dataset into 3 feature groups and demonstrate feature validation on the economy_fg feature group:
economy_fg
with customer id, salary, loan, value of house, age of house, commission, and type of car features.
Importing necessary libraries
import com.logicalclocks.hsfs._
import com.logicalclocks.hsfs.engine._
import com.logicalclocks.hsfs.metadata.validation._
import com.logicalclocks.hsfs.metadata.Expectation
import scala.collection.JavaConversions._
import collection.JavaConverters._
import org.apache.spark.sql.{ DataFrame, Row }
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.types._
import java.sql.Date
import java.sql.Timestamp
val connection = HopsworksConnection.builder().build();
val fs = connection.getFeatureStore();
Starting Spark application
ID | YARN Application ID | Kind | State | Spark UI | Driver log |
---|---|---|---|---|---|
7 | application_1612535100309_0043 | spark | idle | Link | Link |
SparkSession available as 'spark'.
import com.logicalclocks.hsfs._
import com.logicalclocks.hsfs.engine._
import com.logicalclocks.hsfs.metadata.validation._
import com.logicalclocks.hsfs.metadata.Expectation
import scala.collection.JavaConversions._
import collection.JavaConverters._
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.types._
import java.sql.Date
import java.sql.Timestamp
connection: com.logicalclocks.hsfs.HopsworksConnection = com.logicalclocks.hsfs.HopsworksConnection@409f5003
fs: com.logicalclocks.hsfs.FeatureStore = FeatureStore{id=98, name='demo_fs_meb10000_featurestore', projectId=150, featureGroupApi=com.logicalclocks.hsfs.metadata.FeatureGroupApi@3c222139}
val economyFgSchema =
scala.collection.immutable.List(
StructField("id", IntegerType, true),
StructField("salary", FloatType, true),
StructField("commission", FloatType, true),
StructField("car", StringType, true),
StructField("hvalue", FloatType, true),
StructField("hyears", IntegerType, true),
StructField("loan", FloatType, true),
StructField("year", IntegerType, true)
)
economyFgSchema: List[org.apache.spark.sql.types.StructField] = List(StructField(id,IntegerType,true), StructField(salary,FloatType,true), StructField(commission,FloatType,true), StructField(car,StringType,true), StructField(hvalue,FloatType,true), StructField(hyears,IntegerType,true), StructField(loan,FloatType,true), StructField(year,IntegerType,true))
Create Spark dataframes for each feature group
val economyBulkInsertData = Seq(
Row(1, 110499.73f, 0.0f, "car15", 235000.0f, 30, 354724.18f, 2020),
Row(2, 140893.77f, 0.0f, "car20", 135000.0f, 2, 395015.33f, 2020),
Row(3, 119159.65f, 0.0f, "car1", 145000.0f, 22, 122025.08f, 2020),
Row(4, 20000.0f, 52593.63f, "car9", 185000.0f, 30, 99629.62f, 2020)
)
val economyBulkInsertDf = spark.createDataFrame(
spark.sparkContext.parallelize(economyBulkInsertData),
StructType(economyFgSchema)
)
economyBulkInsertData: Seq[org.apache.spark.sql.Row] = List([1,110499.73,0.0,car15,235000.0,30,354724.2,2020], [2,140893.77,0.0,car20,135000.0,2,395015.34,2020], [3,119159.65,0.0,car1,145000.0,22,122025.08,2020], [4,20000.0,52593.63,car9,185000.0,30,99629.62,2020])
economyBulkInsertDf: org.apache.spark.sql.DataFrame = [id: int, salary: float ... 6 more fields]
economyBulkInsertDf.show()
+---+---------+----------+-----+--------+------+---------+----+
| id| salary|commission| car| hvalue|hyears| loan|year|
+---+---------+----------+-----+--------+------+---------+----+
| 1|110499.73| 0.0|car15|235000.0| 30| 354724.2|2020|
| 2|140893.77| 0.0|car20|135000.0| 2|395015.34|2020|
| 3|119159.65| 0.0| car1|145000.0| 22|122025.08|2020|
| 4| 20000.0| 52593.63| car9|185000.0| 30| 99629.62|2020|
+---+---------+----------+-----+--------+------+---------+----+
Data Validation
The next sections show you how to create feature store expectations, attach them to feature groups, and apply them to dataframes being appended to the feature group.
Discover data validation rules supported in Hopsworks
Hopsworks ships with a set of data validation rules. These rules are immutable, uniquely identified by name, and available across all feature stores. They are used to create feature store expectations, which can then be attached to feature groups.
// Get all available rule definitions
val rules = connection.getRules()
rules: Seq[com.logicalclocks.hsfs.metadata.RuleDefinition] = Buffer(RuleDefinition(name=HAS_UNIQUENESS, predicate=VALUE, valueType=Fractional, description=), RuleDefinition(name=HAS_DISTINCTNESS, predicate=VALUE, valueType=Fractional, description=), RuleDefinition(name=HAS_CORRELATION, predicate=LEGAL_VALUES, valueType=Fractional, description=), RuleDefinition(name=HAS_APPROX_QUANTILE, predicate=VALUE, valueType=Fractional, description=), RuleDefinition(name=HAS_APPROX_COUNT_DISTINCT, predicate=VALUE, valueType=Fractional, description=), RuleDefinition(name=IS_LESS_THAN_OR_EQUAL_TO, predicate=LEGAL_VALUES, valueType=Fractional, description=), RuleDefinition(name=HAS_ENTROPY, predicate=VALUE, valueType=Fractional, description=), RuleDefinition(name=HAS_MIN, predicate=VALUE, valueType=Fra...
// Get a rule definition by name
val ruleMax = connection.getRule(RuleName.HAS_MAX)
ruleMax: com.logicalclocks.hsfs.metadata.RuleDefinition = RuleDefinition(name=HAS_MAX, predicate=VALUE, valueType=Fractional, description=A rule that asserts on the max of the feature)
Create Expectations based on Hopsworks rules
Expectations are created at the feature store level. Multiple expectations can be created per feature store.
An expectation is composed of one or more rules and can refer to one or more features. An expectation is used by attaching it to a feature group, as shown in the next sections.
// Create an expectation for the "salary" and "commissio" features so that their min value is "10" and their max is "100"
val expectationSales = (fs.createExpectation()
.rules(Seq(
Rule.createRule(RuleName.HAS_MIN).min(0).level(Level.WARNING).build(), //Set rule by name
Rule.createRule(ruleMax).max(1000000).level(Level.ERROR).build())) //Set rule by passing the RuleDefinition metadata
.name("sales")
.description("min and max sales limits")
.features(Seq("salary", "commission"))
.build())
expectationSales.save()
// Create an expectation for the "year" feature so that its min value is between 2018-2019 and its max value is equal to 2021
val expectationYear = (fs.createExpectation()
.rules(Seq(
Rule.createRule(RuleName.HAS_MIN).min(2018).level(Level.WARNING).build(),
Rule.createRule(RuleName.HAS_MAX).max(2021).level(Level.ERROR).build()))
.name("year")
.description("min and max limits")
.features(Seq("year"))
.build())
expectationYear.save()
expectationSales: com.logicalclocks.hsfs.metadata.Expectation = Expectation(name=sales, description=min and max sales limits, features=[salary, commission], rules=[Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@52bc8329, name=HAS_MIN, level=WARNING, min=0.0, max=null, pattern=null, acceptedType=null, legalValues=null), Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@2e84fc2d, name=HAS_MAX, level=ERROR, min=null, max=1000000.0, pattern=null, acceptedType=null, legalValues=null)], featureStore=FeatureStore{id=98, name='demo_fs_meb10000_featurestore', projectId=150, featureGroupApi=com.logicalclocks.hsfs.metadata.FeatureGroupApi@3c222139}, expectationsEngine=com.logicalclocks.hsfs.engine.ExpectationsEngine@4188c5c1)
expectationYear: com.logicalclocks.hsfs.metadata.Expectation = Expectation(name=year, description=min and max limits, features=[year], rules=[Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@5dd73ee4, name=HAS_MIN, level=WARNING, min=2018.0, max=null, pattern=null, acceptedType=null, legalValues=null), Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@3d7c2e0, name=HAS_MAX, level=ERROR, min=null, max=2021.0, pattern=null, acceptedType=null, legalValues=null)], featureStore=FeatureStore{id=98, name='demo_fs_meb10000_featurestore', projectId=150, featureGroupApi=com.logicalclocks.hsfs.metadata.FeatureGroupApi@3c222139}, expectationsEngine=com.logicalclocks.hsfs.engine.ExpectationsEngine@3940336c)
Discover Feature Store Expectations
Using the HSFS API you can easily find out which expectations are available in this feature store.
// Retrieve all Feature Store expectations
val fsExpectations = fs.getExpectations()
fsExpectations: Seq[com.logicalclocks.hsfs.metadata.Expectation] = Buffer(Expectation(name=sales, description=min and max sales limits, features=[salary, commission], rules=[Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@73440814, name=HAS_MIN, level=WARNING, min=0.0, max=null, pattern=null, acceptedType=null, legalValues=null), Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@68b15078, name=HAS_MAX, level=ERROR, min=null, max=1000000.0, pattern=null, acceptedType=null, legalValues=null)], featureStore=null, expectationsEngine=com.logicalclocks.hsfs.engine.ExpectationsEngine@64bcfca4), Expectation(name=year, description=min and max limits, features=[year], rules=[Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@7d545a36, name=HAS_M...
// Retrieve a Feature Store expectation by name
val yearExp = fs.getExpectation("year")
yearExp: com.logicalclocks.hsfs.metadata.Expectation = Expectation(name=year, description=min and max limits, features=[year], rules=[Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@130559a2, name=HAS_MIN, level=WARNING, min=2018.0, max=null, pattern=null, acceptedType=null, legalValues=null), Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@4bdf1a6, name=HAS_MAX, level=ERROR, min=null, max=2021.0, pattern=null, acceptedType=null, legalValues=null)], featureStore=null, expectationsEngine=com.logicalclocks.hsfs.engine.ExpectationsEngine@2fbd7c66)
Create feature group with expectations and validation type
Feature store expectations can be attached to and detached from feature groups. That enables ingestion pipelines to validate incoming data against expectations. Expectations can be set when creating a feature group. Later in the notebook we describe the possible validation type values and what they mean for feature group ingestion. For the moment, we initialize the validation type to STRICT.
val economyFg = (fs.createFeatureGroup()
.name("economy_fg48")
.description("Hudi Household Economy Feature Group")
.expectations(Seq(expectationSales, expectationYear))
.validationType(ValidationType.STRICT)
.version(1)
.primaryKeys(Seq("id"))
.partitionKeys(Seq("year"))
.timeTravelFormat(TimeTravelFormat.HUDI)
.build())
economyFg: com.logicalclocks.hsfs.FeatureGroup = com.logicalclocks.hsfs.FeatureGroup@1f3c778c
Bulk insert data into the feature group
Since we have not yet saved any data into the newly created feature group, we will use Apache Hudi terminology and Bulk Insert the data. In HSFS this is simply a call to the save method.
Data will be validated prior to being persisted into the Feature Store.
economyFg.save(economyBulkInsertDf)
Attach expectations to Feature Groups
Expectations can be attached to and detached from feature groups even after the latter are created. If an expectation is attached to a feature group, it will be used when inserted data is validated. An expectation can be attached to multiple feature groups, as long as the expectation’s features exist in those feature groups.
// Detach expectation by using its name or the metadata object, example shows the latter
economyFg.detachExpectation(expectationYear)
// Attach expectation by using its name or the metadata object, example shows the former
economyFg.attachExpectation("year")
res19: com.logicalclocks.hsfs.metadata.Expectation = Expectation(name=year, description=min and max limits, features=[year], rules=[Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@6b767f68, name=HAS_MIN, level=WARNING, min=2018.0, max=null, pattern=null, acceptedType=null, legalValues=null), Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@dd6947c, name=HAS_MAX, level=ERROR, min=null, max=2021.0, pattern=null, acceptedType=null, legalValues=null)], featureStore=null, expectationsEngine=com.logicalclocks.hsfs.engine.ExpectationsEngine@8221927)
Validations
You can also validate the dataframe without having to insert the data into a feature group
economyFg.validate(economyBulkInsertDf)
res20: com.logicalclocks.hsfs.metadata.FeatureGroupValidation = FeatureGroupValidation(validationId=19, validationTime=1612891168467, commitTime=null, expectationResults=[ExpectationResult(status=Success, expectation=Expectation(name=year, description=min and max limits, features=[year], rules=[Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@39a295cf, name=HAS_MIN, level=WARNING, min=2018.0, max=null, pattern=null, acceptedType=null, legalValues=null), Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@36fdf3b6, name=HAS_MAX, level=ERROR, min=null, max=2021.0, pattern=null, acceptedType=null, legalValues=null)], featureStore=null, expectationsEngine=com.logicalclocks.hsfs.engine.ExpectationsEngine@444ead48), results=[ValidationResult(status=Success, me...
You can also retrieve all the validations of a feature group
// Get all validations of the feature group
val validations = economyFg.getValidations()
validations: java.util.List[com.logicalclocks.hsfs.metadata.FeatureGroupValidation] = [FeatureGroupValidation(validationId=18, validationTime=1612891077406, commitTime=1612891080000, expectationResults=[ExpectationResult(status=Success, expectation=Expectation(name=year, description=min and max limits, features=[year], rules=[Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@6c213ee3, name=HAS_MIN, level=WARNING, min=2018.0, max=null, pattern=null, acceptedType=null, legalValues=null), Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@3f20aaab, name=HAS_MAX, level=ERROR, min=null, max=2021.0, pattern=null, acceptedType=null, legalValues=null)], featureStore=null, expectationsEngine=com.logicalclocks.hsfs.engine.ExpectationsEngine@db7cd28), results=[Vali...
… or retrieve a validation by validation or commit time.
Validation time is the timestamp when the validation started.
Commit time is the time the data was persisted in the time-travel enabled feature group.
val validationTime = validations(0).getValidationTime()
validationTime: Long = 1612891077406
The commit time is associated with the validation time when the data is committed to the time-travel enabled feature group.
val commitTime = validations(0).getCommitTime()
commitTime: Long = 1612891080000
Get validation by Feature Group Commit Time
// Get a validation by commitTime
val validation = economyFg.getValidation(commitTime, DataValidationEngine.ValidationTimeType.COMMIT_TIME)
validation: com.logicalclocks.hsfs.metadata.FeatureGroupValidation = FeatureGroupValidation(validationId=18, validationTime=1612891077406, commitTime=1612891080000, expectationResults=[ExpectationResult(status=Success, expectation=Expectation(name=year, description=min and max limits, features=[year], rules=[Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@26d38c44, name=HAS_MIN, level=WARNING, min=2018.0, max=null, pattern=null, acceptedType=null, legalValues=null), Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@4ca150b, name=HAS_MAX, level=ERROR, min=null, max=2021.0, pattern=null, acceptedType=null, legalValues=null)], featureStore=null, expectationsEngine=com.logicalclocks.hsfs.engine.ExpectationsEngine@196348ed), results=[ValidationResult(statu...
Get validation by Feature Group Validation Time
// Get a validation by validationTime
val validation = economyFg.getValidation(validationTime, com.logicalclocks.hsfs.engine.DataValidationEngine.ValidationTimeType.VALIDATION_TIME)
validation: com.logicalclocks.hsfs.metadata.FeatureGroupValidation = FeatureGroupValidation(validationId=18, validationTime=1612891077406, commitTime=1612891080000, expectationResults=[ExpectationResult(status=Success, expectation=Expectation(name=year, description=min and max limits, features=[year], rules=[Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@72764f97, name=HAS_MIN, level=WARNING, min=2018.0, max=null, pattern=null, acceptedType=null, legalValues=null), Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@5c1d4d62, name=HAS_MAX, level=ERROR, min=null, max=2021.0, pattern=null, acceptedType=null, legalValues=null)], featureStore=null, expectationsEngine=com.logicalclocks.hsfs.engine.ExpectationsEngine@15e6c418), results=[ValidationResult(stat...
Get the status of a validation
validation.getStatus()
res40: com.logicalclocks.hsfs.metadata.ExpectationResult.Status = Success
Upsert new invalid data into a Feature Group
Now we will try to upsert some invalid data (the year feature does not meet the maximum expectation). An error is returned to the client along with the failed expectation.
Generate Sample Upserts Data
val economyUpsertData = Seq(
Row(1, 120499.73f, 0.0f, "car17", 205000.0f, 30, 564724.18f, 2022), //update
Row(2, 160893.77f, 0.0f, "car10", 179000.0f, 2, 455015.33f, 2020), //update
Row(5, 93956.32f, 0.0f, "car15", 135000.0f, 1, 458679.82f, 2020), //insert
Row(6, 41365.43f, 52809.15f, "car7", 135000.0f, 19, 216839.71f, 2020), //insert
Row(7, 94805.61f, 0.0f, "car17", 135000.0f, 23, 233216.07f, 2022) //insert
)
val economyUpsertDf = spark.createDataFrame(
spark.sparkContext.parallelize(economyUpsertData),
StructType(economyFgSchema)
)
economyUpsertDf.show(5)
economyUpsertData: Seq[org.apache.spark.sql.Row] = List([1,120499.73,0.0,car17,205000.0,30,564724.2,2022], [2,160893.77,0.0,car10,179000.0,2,455015.34,2020], [5,93956.32,0.0,car15,135000.0,1,458679.8,2020], [6,41365.43,52809.15,car7,135000.0,19,216839.7,2020], [7,94805.61,0.0,car17,135000.0,23,233216.06,2022])
economyUpsertDf: org.apache.spark.sql.DataFrame = [id: int, salary: float ... 6 more fields]
+---+---------+----------+-----+--------+------+---------+----+
| id| salary|commission| car| hvalue|hyears| loan|year|
+---+---------+----------+-----+--------+------+---------+----+
| 1|120499.73| 0.0|car17|205000.0| 30| 564724.2|2022|
| 2|160893.77| 0.0|car10|179000.0| 2|455015.34|2020|
| 5| 93956.32| 0.0|car15|135000.0| 1| 458679.8|2020|
| 6| 41365.43| 52809.15| car7|135000.0| 19| 216839.7|2020|
| 7| 94805.61| 0.0|car17|135000.0| 23|233216.06|2022|
+---+---------+----------+-----+--------+------+---------+----+
// Insert call will fail as invalid data (year feature) is about to be ingested. Error shows the expectation that was not met
economyFg.insert(economyUpsertDf)
An error was encountered:
java.io.IOException: Error: 417{"type":"restApiJsonResponse","errorCode":270149,"errorMsg":"Feature group validation checks did not pass, will not persist validation results.","usrMsg":"Results: [ExpectationResult{status=Failure, results=[ValidationResult{status=Success, message='Success', value='2020.0', feature='year', rule=Rule{name=HAS_MIN, level=WARNING, min=2018.0, max=null, pattern='null', acceptedType=null, legalValues=null}}, ValidationResult{status=Failure, message='Value: 2022.0 does not meet the constraint requirement! HAS_MAX', value='2022.0', feature='year', rule=Rule{name=HAS_MAX, level=ERROR, min=null, max=2021.0, pattern='null', acceptedType=null, legalValues=null}}], expectation=Expectation{name='year', features=[year], rules=[Rule{name=HAS_MIN, level=WARNING, min=2018.0, max=null, pattern='null', acceptedType=null, legalValues=null}, Rule{name=HAS_MAX, level=ERROR, min=null, max=2021.0, pattern='null', acceptedType=null, legalValues=null}]}}, ExpectationResult{status=Failure, results=[ValidationResult{status=Success, message='Success', value='41365.4296875', feature='salary', rule=Rule{name=HAS_MIN, level=WARNING, min=0.0, max=null, pattern='null', acceptedType=null, legalValues=null}}, ValidationResult{status=Success, message='Success', value='0.0', feature='commission', rule=Rule{name=HAS_MIN, level=WARNING, min=0.0, max=null, pattern='null', acceptedType=null, legalValues=null}}, ValidationResult{status=Success, message='Success', value='160893.765625', feature='salary', rule=Rule{name=HAS_MAX, level=ERROR, min=null, max=1000000.0, pattern='null', acceptedType=null, legalValues=null}}, ValidationResult{status=Success, message='Success', value='52809.1484375', feature='commission', rule=Rule{name=HAS_MAX, level=ERROR, min=null, max=1000000.0, pattern='null', acceptedType=null, legalValues=null}}], expectation=Expectation{name='sales', features=[salary, commission], rules=[Rule{name=HAS_MIN, level=WARNING, min=0.0, max=null, pattern='null', acceptedType=null, legalValues=null}, Rule{name=HAS_MAX, level=ERROR, min=null, max=1000000.0, pattern='null', acceptedType=null, legalValues=null}]}}]"}
at com.logicalclocks.hsfs.metadata.AuthorizationHandler.handleResponse(AuthorizationHandler.java:45)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:223)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:191)
at com.logicalclocks.hsfs.metadata.HopsworksInternalClient.handleRequest(HopsworksInternalClient.java:148)
at com.logicalclocks.hsfs.metadata.HopsworksClient.handleRequest(HopsworksClient.java:151)
at com.logicalclocks.hsfs.metadata.FeatureGroupValidationsApi.put(FeatureGroupValidationsApi.java:135)
at com.logicalclocks.hsfs.metadata.FeatureGroupValidationsApi.put(FeatureGroupValidationsApi.java:106)
at com.logicalclocks.hsfs.engine.DataValidationEngine.validate(DataValidationEngine.java:64)
at com.logicalclocks.hsfs.FeatureGroup.validate(FeatureGroup.java:394)
at com.logicalclocks.hsfs.engine.FeatureGroupEngine.saveDataframe(FeatureGroupEngine.java:134)
at com.logicalclocks.hsfs.FeatureGroup.insert(FeatureGroup.java:294)
at com.logicalclocks.hsfs.FeatureGroup.insert(FeatureGroup.java:256)
at com.logicalclocks.hsfs.FeatureGroup.insert(FeatureGroup.java:238)
... 62 elided
Validation type
The validation type determines the validation behavior. Available types are:
- STRICT: Data validation is performed and data is ingested into the feature group only if the validation status is “Success”
- WARNING: Data validation is performed and data is ingested into the feature group only if the validation status is “Warning” or lower
- ALL: Data validation is performed and data is ingested into the feature group regardless of the validation status
- NONE: Data validation is not performed on the feature group
The validation type can easily be changed for a feature group
// The previous economy_upsert_df contains invalid data but we still want to persist the data, so we set the validation type from STRICT to ALL
economyFg.updateValidationType(ValidationType.ALL)
// We try to insert the invalid df again
economyFg.insert(economyUpsertDf)