Data Validation with Scala

Feature Validation with the Hopsworks Feature Store

In this notebook we introduce feature validation operations with the Hopsworks Feature Store and its client API, hsfs.

Background

Motivation

Data ingested into the Feature Store forms the basis for the data fed as input to algorithms that develop machine learning models. The Feature Store is a place where curated feature data is stored, so it is important that this data is validated against different rules to ensure it adheres to business requirements.

For example, ingested features might be expected to never be empty or to lie within a certain range; a feature age, say, should always be a non-negative number.

The Hopsworks Feature Store provides users with an API to create Expectations on ingested feature data by utilizing the open source Deequ library (https://github.com/awslabs/deequ). Feature validation is part of the HSFS Java/Scala and Python API for working with Feature Groups. Users work with the following abstractions:

  • Rules: A set of validation rules applied on a Spark/PySpark dataframe that is inserted into a Feature Group.
  • Expectations: A set of rules that is applied on a set of features as provided by the user. Expectations are created at the feature store level and can be attached to multiple feature groups.
  • Validations: The results of expectations against the ingested dataframe are assigned a ValidationTime and are persisted within the Feature Store. Users can then retrieve validation results by validation time and by commit time for time-travel enabled feature groups.

Feature Validation is disabled by default, by having the validation_type feature group attribute set to NONE. The allowed validation types are:
  • STRICT: Data validation is performed and the feature group is updated only if the validation status is “Success”
  • WARNING: Data validation is performed and the feature group is updated only if the validation status is “Warning” or lower
  • ALL: Data validation is performed and the feature group is updated only if the validation status is “Failure” or lower
  • NONE: Data validation is not performed on the feature group
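In outline, the workflow demonstrated in the rest of this notebook looks as follows. The sketch is shown as comments because the imports and the feature store connection are only set up further down; names such as my_expectation, my_fg and df are placeholders.

// 1. Create an expectation from built-in rules and save it at the feature store level:
//      val exp = fs.createExpectation()
//                  .rules(Seq(Rule.createRule(RuleName.HAS_MIN).min(0).level(Level.WARNING).build()))
//                  .name("my_expectation")
//                  .features(Seq("my_feature"))
//                  .build()
//      exp.save()
// 2. Attach the expectation to a feature group and choose a validation type:
//      val fg = fs.createFeatureGroup().name("my_fg")
//                 .expectations(Seq(exp)).validationType(ValidationType.STRICT) /* ... */ .build()
// 3. Every dataframe saved to or inserted into the feature group is then validated first:
//      fg.insert(df)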

Examples

Create a time-travel enabled feature group and bulk insert a sample dataset

For this demo we will use a small sample of the widely used Agrawal Generator dataset, which contains hypothetical data of people applying for a loan. Rakesh Agrawal, Tomasz Imielinski, and Arun Swami, "Database Mining: A Performance Perspective", IEEE Transactions on Knowledge and Data Engineering, 5(6), December 1993.

For simplicity of demo purposes we split the Agrawal dataset into 3 feature groups and demonstrate feature validation on the economy_fg feature group:
  • economy_fg with customer id, salary, loan, value of house, age of house, commission and type of car features;

Importing necessary libraries

import com.logicalclocks.hsfs._
import com.logicalclocks.hsfs.engine._
import com.logicalclocks.hsfs.metadata.validation._
import com.logicalclocks.hsfs.metadata.Expectation
import scala.collection.JavaConversions._
import collection.JavaConverters._

import org.apache.spark.sql.{ DataFrame, Row }
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.types._

import java.sql.Date
import java.sql.Timestamp

val connection = HopsworksConnection.builder().build();
val fs = connection.getFeatureStore();
Starting Spark application
ID: 7 | YARN Application ID: application_1612535100309_0043 | Kind: spark | State: idle
SparkSession available as 'spark'.
import com.logicalclocks.hsfs._
import com.logicalclocks.hsfs.engine._
import com.logicalclocks.hsfs.metadata.validation._
import com.logicalclocks.hsfs.metadata.Expectation
import scala.collection.JavaConversions._
import collection.JavaConverters._
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.types._
import java.sql.Date
import java.sql.Timestamp
connection: com.logicalclocks.hsfs.HopsworksConnection = com.logicalclocks.hsfs.HopsworksConnection@409f5003
fs: com.logicalclocks.hsfs.FeatureStore = FeatureStore{id=98, name='demo_fs_meb10000_featurestore', projectId=150, featureGroupApi=com.logicalclocks.hsfs.metadata.FeatureGroupApi@3c222139}
val economyFgSchema = 
 scala.collection.immutable.List(
  StructField("id", IntegerType, true),
  StructField("salary", FloatType, true),
  StructField("commission", FloatType, true),
  StructField("car", StringType, true), 
  StructField("hvalue", FloatType, true),      
  StructField("hyears", IntegerType, true),     
  StructField("loan", FloatType, true),
  StructField("year", IntegerType, true)          
)
economyFgSchema: List[org.apache.spark.sql.types.StructField] = List(StructField(id,IntegerType,true), StructField(salary,FloatType,true), StructField(commission,FloatType,true), StructField(car,StringType,true), StructField(hvalue,FloatType,true), StructField(hyears,IntegerType,true), StructField(loan,FloatType,true), StructField(year,IntegerType,true))

Create Spark dataframes for each feature group

val economyBulkInsertData = Seq(
    Row(1, 110499.73f, 0.0f,  "car15",  235000.0f, 30, 354724.18f, 2020),
    Row(2, 140893.77f, 0.0f,  "car20",  135000.0f, 2, 395015.33f, 2020),
    Row(3, 119159.65f, 0.0f,  "car1", 145000.0f, 22, 122025.08f, 2020),
    Row(4, 20000.0f, 52593.63f, "car9", 185000.0f, 30, 99629.62f, 2020)
)

val economyBulkInsertDf = spark.createDataFrame(
    spark.sparkContext.parallelize(economyBulkInsertData),
    StructType(economyFgSchema)
)
economyBulkInsertData: Seq[org.apache.spark.sql.Row] = List([1,110499.73,0.0,car15,235000.0,30,354724.2,2020], [2,140893.77,0.0,car20,135000.0,2,395015.34,2020], [3,119159.65,0.0,car1,145000.0,22,122025.08,2020], [4,20000.0,52593.63,car9,185000.0,30,99629.62,2020])
economyBulkInsertDf: org.apache.spark.sql.DataFrame = [id: int, salary: float ... 6 more fields]
economyBulkInsertDf.show()
+---+---------+----------+-----+--------+------+---------+----+
| id|   salary|commission|  car|  hvalue|hyears|     loan|year|
+---+---------+----------+-----+--------+------+---------+----+
|  1|110499.73|       0.0|car15|235000.0|    30| 354724.2|2020|
|  2|140893.77|       0.0|car20|135000.0|     2|395015.34|2020|
|  3|119159.65|       0.0| car1|145000.0|    22|122025.08|2020|
|  4|  20000.0|  52593.63| car9|185000.0|    30| 99629.62|2020|
+---+---------+----------+-----+--------+------+---------+----+

Data Validation

The next sections show how to create feature store expectations, attach them to feature groups, and apply them to dataframes being appended to the feature group.

Discover data validation rules supported in Hopsworks

Hopsworks ships with a set of data validation rules. These rules are immutable, uniquely identified by name, and available across all feature stores. They are used to create feature store expectations, which can then be attached to feature groups.

// Get all available rule definitions
val rules = connection.getRules()
rules: Seq[com.logicalclocks.hsfs.metadata.RuleDefinition] = Buffer(RuleDefinition(name=HAS_UNIQUENESS, predicate=VALUE, valueType=Fractional, description=), RuleDefinition(name=HAS_DISTINCTNESS, predicate=VALUE, valueType=Fractional, description=), RuleDefinition(name=HAS_CORRELATION, predicate=LEGAL_VALUES, valueType=Fractional, description=), RuleDefinition(name=HAS_APPROX_QUANTILE, predicate=VALUE, valueType=Fractional, description=), RuleDefinition(name=HAS_APPROX_COUNT_DISTINCT, predicate=VALUE, valueType=Fractional, description=), RuleDefinition(name=IS_LESS_THAN_OR_EQUAL_TO, predicate=LEGAL_VALUES, valueType=Fractional, description=), RuleDefinition(name=HAS_ENTROPY, predicate=VALUE, valueType=Fractional, description=), RuleDefinition(name=HAS_MIN, predicate=VALUE, valueType=Fra...
// Get a rule definition by name
val ruleMax = connection.getRule(RuleName.HAS_MAX)
ruleMax: com.logicalclocks.hsfs.metadata.RuleDefinition = RuleDefinition(name=HAS_MAX, predicate=VALUE, valueType=Fractional, description=A rule that asserts on the max of the feature)
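To scan the whole rule catalogue at a glance you can print just the rule names. A minimal sketch, assuming a Lombok-style getName() getter matching the RuleDefinition fields shown in the output above:

// Print the name of every rule definition available in this installation
// (getName() is assumed from the RuleDefinition fields printed above)
rules.foreach(r => println(r.getName))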

Create Expectations based on Hopsworks rules

Expectations are created at the feature store level. Multiple expectations can be created per feature store.

An expectation is composed of one or more rules and can refer to one or more features. An expectation is put to use by attaching it to a feature group, as shown in the next sections.

// Create an expectation for the "salary" and "commissio" features so that their min value is "10" and their max is "100"
val expectationSales = (fs.createExpectation()
                          .rules(Seq(
                                 Rule.createRule(RuleName.HAS_MIN).min(0).level(Level.WARNING).build(), //Set rule by name
                                 Rule.createRule(ruleMax).max(1000000).level(Level.ERROR).build())) //Set rule by passing the RuleDefinition metadata
                         .name("sales")
                         .description("min and max sales limits")
                         .features(Seq("salary", "commission"))
                         .build())
expectationSales.save()

// Create an expectation for the "year" feature so that its min value is between 2018-2019 and its max value is equal to 2021
val expectationYear = (fs.createExpectation()
                         .rules(Seq(
                                 Rule.createRule(RuleName.HAS_MIN).min(2018).level(Level.WARNING).build(),
                                 Rule.createRule(RuleName.HAS_MAX).max(2021).level(Level.ERROR).build()))
                         .name("year")
                         .description("min and max limits")
                         .features(Seq("year"))
                         .build())
expectationYear.save()
expectationSales: com.logicalclocks.hsfs.metadata.Expectation = Expectation(name=sales, description=min and max sales limits, features=[salary, commission], rules=[Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@52bc8329, name=HAS_MIN, level=WARNING, min=0.0, max=null, pattern=null, acceptedType=null, legalValues=null), Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@2e84fc2d, name=HAS_MAX, level=ERROR, min=null, max=1000000.0, pattern=null, acceptedType=null, legalValues=null)], featureStore=FeatureStore{id=98, name='demo_fs_meb10000_featurestore', projectId=150, featureGroupApi=com.logicalclocks.hsfs.metadata.FeatureGroupApi@3c222139}, expectationsEngine=com.logicalclocks.hsfs.engine.ExpectationsEngine@4188c5c1)
expectationYear: com.logicalclocks.hsfs.metadata.Expectation = Expectation(name=year, description=min and max limits, features=[year], rules=[Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@5dd73ee4, name=HAS_MIN, level=WARNING, min=2018.0, max=null, pattern=null, acceptedType=null, legalValues=null), Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@3d7c2e0, name=HAS_MAX, level=ERROR, min=null, max=2021.0, pattern=null, acceptedType=null, legalValues=null)], featureStore=FeatureStore{id=98, name='demo_fs_meb10000_featurestore', projectId=150, featureGroupApi=com.logicalclocks.hsfs.metadata.FeatureGroupApi@3c222139}, expectationsEngine=com.logicalclocks.hsfs.engine.ExpectationsEngine@3940336c)

Discover Feature Store Expectations

Using the Scala API you can easily find out which expectations are available in this feature store.

// Retrieve all Feature Store expectations
val fsExpectations = fs.getExpectations()
fsExpectations: Seq[com.logicalclocks.hsfs.metadata.Expectation] = Buffer(Expectation(name=sales, description=min and max sales limits, features=[salary, commission], rules=[Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@73440814, name=HAS_MIN, level=WARNING, min=0.0, max=null, pattern=null, acceptedType=null, legalValues=null), Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@68b15078, name=HAS_MAX, level=ERROR, min=null, max=1000000.0, pattern=null, acceptedType=null, legalValues=null)], featureStore=null, expectationsEngine=com.logicalclocks.hsfs.engine.ExpectationsEngine@64bcfca4), Expectation(name=year, description=min and max limits, features=[year], rules=[Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@7d545a36, name=HAS_M...
// Retrieve a Feature Store expectation by name
val yearExp = fs.getExpectation("year")
yearExp: com.logicalclocks.hsfs.metadata.Expectation = Expectation(name=year, description=min and max limits, features=[year], rules=[Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@130559a2, name=HAS_MIN, level=WARNING, min=2018.0, max=null, pattern=null, acceptedType=null, legalValues=null), Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@4bdf1a6, name=HAS_MAX, level=ERROR, min=null, max=2021.0, pattern=null, acceptedType=null, legalValues=null)], featureStore=null, expectationsEngine=com.logicalclocks.hsfs.engine.ExpectationsEngine@2fbd7c66)

Create feature group with expectations and validation type

Feature store expectations can be attached to and detached from feature groups, which enables ingestion pipelines to validate incoming data against expectations. Expectations can be set when creating a feature group. Later in the notebook we describe the possible validation type values and what they mean for feature group ingestion. For the moment, we initialize the validation type to STRICT.

val economyFg = (fs.createFeatureGroup()
                .name("economy_fg48")
                .description("Hudi Household Economy Feature Group")
                .expectations(Seq(expectationSales, expectationYear))
                .validationType(ValidationType.STRICT)
                .version(1)
                .primaryKeys(Seq("id"))
                .partitionKeys(Seq("year"))
                .timeTravelFormat(TimeTravelFormat.HUDI)
                .build())
economyFg: com.logicalclocks.hsfs.FeatureGroup = com.logicalclocks.hsfs.FeatureGroup@1f3c778c

Bulk insert data into the feature group

Since we have not yet saved any data into the newly created feature group, we will use Apache Hudi terminology and Bulk Insert data. In HSFS this is just a matter of calling the save method.

Data will be validated prior to being persisted into the Feature Store.

economyFg.save(economyBulkInsertDf)

Attach expectations to Feature Groups

Expectations can be attached to and detached from feature groups even after the latter are created. If an expectation is attached to a feature group, it is used when inserted data is validated. An expectation can be attached to multiple feature groups, as long as the expectation's features exist in each feature group.

// Detach expectation by using its name or the metadata object, example shows the latter
economyFg.detachExpectation(expectationYear)
// Attach expectation by using its name or the metadata object, example shows the former
economyFg.attachExpectation("year")
res19: com.logicalclocks.hsfs.metadata.Expectation = Expectation(name=year, description=min and max limits, features=[year], rules=[Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@6b767f68, name=HAS_MIN, level=WARNING, min=2018.0, max=null, pattern=null, acceptedType=null, legalValues=null), Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@dd6947c, name=HAS_MAX, level=ERROR, min=null, max=2021.0, pattern=null, acceptedType=null, legalValues=null)], featureStore=null, expectationsEngine=com.logicalclocks.hsfs.engine.ExpectationsEngine@8221927)

Validations

You can also validate a dataframe without having to insert its data into a feature group.

economyFg.validate(economyBulkInsertDf)
res20: com.logicalclocks.hsfs.metadata.FeatureGroupValidation = FeatureGroupValidation(validationId=19, validationTime=1612891168467, commitTime=null, expectationResults=[ExpectationResult(status=Success, expectation=Expectation(name=year, description=min and max limits, features=[year], rules=[Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@39a295cf, name=HAS_MIN, level=WARNING, min=2018.0, max=null, pattern=null, acceptedType=null, legalValues=null), Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@36fdf3b6, name=HAS_MAX, level=ERROR, min=null, max=2021.0, pattern=null, acceptedType=null, legalValues=null)], featureStore=null, expectationsEngine=com.logicalclocks.hsfs.engine.ExpectationsEngine@444ead48), results=[ValidationResult(status=Success, me...

You can retrieve all the validations of a feature group

// Get all validations of the feature group
val validations = economyFg.getValidations()
validations: java.util.List[com.logicalclocks.hsfs.metadata.FeatureGroupValidation] = [FeatureGroupValidation(validationId=18, validationTime=1612891077406, commitTime=1612891080000, expectationResults=[ExpectationResult(status=Success, expectation=Expectation(name=year, description=min and max limits, features=[year], rules=[Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@6c213ee3, name=HAS_MIN, level=WARNING, min=2018.0, max=null, pattern=null, acceptedType=null, legalValues=null), Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@3f20aaab, name=HAS_MAX, level=ERROR, min=null, max=2021.0, pattern=null, acceptedType=null, legalValues=null)], featureStore=null, expectationsEngine=com.logicalclocks.hsfs.engine.ExpectationsEngine@db7cd28), results=[Vali...

… or retrieve a validation by validation or commit time.

Validation time is the timestamp when the validation started.

Commit time is the time the data was persisted in the time-travel enabled feature group.

val validationTime = validations(0).getValidationTime()
validationTime: Long = 1612891077406

The commit time is associated with the validation time when the data is committed to the time-travel enabled feature group.

val commitTime = validations(0).getCommitTime()
commitTime: Long = 1612891080000

Get validation by Feature Group Commit Time

// Get a validation by commitTime
val validation = economyFg.getValidation(commitTime, DataValidationEngine.ValidationTimeType.COMMIT_TIME)
validation: com.logicalclocks.hsfs.metadata.FeatureGroupValidation = FeatureGroupValidation(validationId=18, validationTime=1612891077406, commitTime=1612891080000, expectationResults=[ExpectationResult(status=Success, expectation=Expectation(name=year, description=min and max limits, features=[year], rules=[Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@26d38c44, name=HAS_MIN, level=WARNING, min=2018.0, max=null, pattern=null, acceptedType=null, legalValues=null), Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@4ca150b, name=HAS_MAX, level=ERROR, min=null, max=2021.0, pattern=null, acceptedType=null, legalValues=null)], featureStore=null, expectationsEngine=com.logicalclocks.hsfs.engine.ExpectationsEngine@196348ed), results=[ValidationResult(statu...

Get validation by Feature Group Validation Time

// Get a validation by validationTime
val validation = economyFg.getValidation(validationTime, com.logicalclocks.hsfs.engine.DataValidationEngine.ValidationTimeType.VALIDATION_TIME)
validation: com.logicalclocks.hsfs.metadata.FeatureGroupValidation = FeatureGroupValidation(validationId=18, validationTime=1612891077406, commitTime=1612891080000, expectationResults=[ExpectationResult(status=Success, expectation=Expectation(name=year, description=min and max limits, features=[year], rules=[Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@72764f97, name=HAS_MIN, level=WARNING, min=2018.0, max=null, pattern=null, acceptedType=null, legalValues=null), Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@5c1d4d62, name=HAS_MAX, level=ERROR, min=null, max=2021.0, pattern=null, acceptedType=null, legalValues=null)], featureStore=null, expectationsEngine=com.logicalclocks.hsfs.engine.ExpectationsEngine@15e6c418), results=[ValidationResult(stat...

Get the status of a validation

validation.getStatus()
res40: com.logicalclocks.hsfs.metadata.ExpectationResult.Status = Success
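Besides the overall status, a validation carries per-expectation and per-rule results. The sketch below drills into them, assuming Lombok-style getters matching the field names visible in the printed outputs above (getExpectationResults, getExpectation, getResults, getFeature, getValue); the JavaConversions imported at the top make the returned Java lists iterable.

// Walk through each expectation result and its per-rule outcomes
// (getter names assumed from the fields printed in the outputs above)
for (expRes <- validation.getExpectationResults()) {
  println(s"expectation=${expRes.getExpectation().getName()} status=${expRes.getStatus()}")
  for (res <- expRes.getResults()) {
    println(s"  feature=${res.getFeature()} value=${res.getValue()} -> ${res.getStatus()}")
  }
}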

Upsert new invalid data into a Feature Group

Now we will try to upsert some invalid data (the year feature does not meet the maximum expectation). An error is returned to the client along with the failed expectation.

Generate Sample Upsert Data

val economyUpsertData = Seq(
    Row(1, 120499.73f, 0.0f, "car17", 205000.0f, 30, 564724.18f, 2022),    //update
    Row(2, 160893.77f, 0.0f, "car10", 179000.0f, 2, 455015.33f, 2020),     //update
    Row(5, 93956.32f, 0.0f, "car15",  135000.0f, 1, 458679.82f, 2020),     //insert
    Row(6, 41365.43f, 52809.15f, "car7", 135000.0f, 19, 216839.71f, 2020), //insert
    Row(7, 94805.61f, 0.0f, "car17", 135000.0f, 23, 233216.07f, 2022)      //insert
)

val economyUpsertDf = spark.createDataFrame(
  spark.sparkContext.parallelize(economyUpsertData),
  StructType(economyFgSchema)
)

economyUpsertDf.show(5)
economyUpsertData: Seq[org.apache.spark.sql.Row] = List([1,120499.73,0.0,car17,205000.0,30,564724.2,2022], [2,160893.77,0.0,car10,179000.0,2,455015.34,2020], [5,93956.32,0.0,car15,135000.0,1,458679.8,2020], [6,41365.43,52809.15,car7,135000.0,19,216839.7,2020], [7,94805.61,0.0,car17,135000.0,23,233216.06,2022])
economyUpsertDf: org.apache.spark.sql.DataFrame = [id: int, salary: float ... 6 more fields]
+---+---------+----------+-----+--------+------+---------+----+
| id|   salary|commission|  car|  hvalue|hyears|     loan|year|
+---+---------+----------+-----+--------+------+---------+----+
|  1|120499.73|       0.0|car17|205000.0|    30| 564724.2|2022|
|  2|160893.77|       0.0|car10|179000.0|     2|455015.34|2020|
|  5| 93956.32|       0.0|car15|135000.0|     1| 458679.8|2020|
|  6| 41365.43|  52809.15| car7|135000.0|    19| 216839.7|2020|
|  7| 94805.61|       0.0|car17|135000.0|    23|233216.06|2022|
+---+---------+----------+-----+--------+------+---------+----+
// Insert call will fail as invalid data (year feature) is about to be ingested. Error shows the expectation that was not met
economyFg.insert(economyUpsertDf)
An error was encountered:
java.io.IOException: Error: 417{"type":"restApiJsonResponse","errorCode":270149,"errorMsg":"Feature group validation checks did not pass, will not persist validation results.","usrMsg":"Results: [ExpectationResult{status=Failure, results=[ValidationResult{status=Success, message='Success', value='2020.0', feature='year', rule=Rule{name=HAS_MIN, level=WARNING, min=2018.0, max=null, pattern='null', acceptedType=null, legalValues=null}}, ValidationResult{status=Failure, message='Value: 2022.0 does not meet the constraint requirement! HAS_MAX', value='2022.0', feature='year', rule=Rule{name=HAS_MAX, level=ERROR, min=null, max=2021.0, pattern='null', acceptedType=null, legalValues=null}}], expectation=Expectation{name='year', features=[year], rules=[Rule{name=HAS_MIN, level=WARNING, min=2018.0, max=null, pattern='null', acceptedType=null, legalValues=null}, Rule{name=HAS_MAX, level=ERROR, min=null, max=2021.0, pattern='null', acceptedType=null, legalValues=null}]}}, ExpectationResult{status=Failure, results=[ValidationResult{status=Success, message='Success', value='41365.4296875', feature='salary', rule=Rule{name=HAS_MIN, level=WARNING, min=0.0, max=null, pattern='null', acceptedType=null, legalValues=null}}, ValidationResult{status=Success, message='Success', value='0.0', feature='commission', rule=Rule{name=HAS_MIN, level=WARNING, min=0.0, max=null, pattern='null', acceptedType=null, legalValues=null}}, ValidationResult{status=Success, message='Success', value='160893.765625', feature='salary', rule=Rule{name=HAS_MAX, level=ERROR, min=null, max=1000000.0, pattern='null', acceptedType=null, legalValues=null}}, ValidationResult{status=Success, message='Success', value='52809.1484375', feature='commission', rule=Rule{name=HAS_MAX, level=ERROR, min=null, max=1000000.0, pattern='null', acceptedType=null, legalValues=null}}], expectation=Expectation{name='sales', features=[salary, commission], rules=[Rule{name=HAS_MIN, level=WARNING, min=0.0, max=null, pattern='null', acceptedType=null, legalValues=null}, Rule{name=HAS_MAX, level=ERROR, min=null, max=1000000.0, pattern='null', acceptedType=null, legalValues=null}]}}]"}
  at com.logicalclocks.hsfs.metadata.AuthorizationHandler.handleResponse(AuthorizationHandler.java:45)
  at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:223)
  at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:191)
  at com.logicalclocks.hsfs.metadata.HopsworksInternalClient.handleRequest(HopsworksInternalClient.java:148)
  at com.logicalclocks.hsfs.metadata.HopsworksClient.handleRequest(HopsworksClient.java:151)
  at com.logicalclocks.hsfs.metadata.FeatureGroupValidationsApi.put(FeatureGroupValidationsApi.java:135)
  at com.logicalclocks.hsfs.metadata.FeatureGroupValidationsApi.put(FeatureGroupValidationsApi.java:106)
  at com.logicalclocks.hsfs.engine.DataValidationEngine.validate(DataValidationEngine.java:64)
  at com.logicalclocks.hsfs.FeatureGroup.validate(FeatureGroup.java:394)
  at com.logicalclocks.hsfs.engine.FeatureGroupEngine.saveDataframe(FeatureGroupEngine.java:134)
  at com.logicalclocks.hsfs.FeatureGroup.insert(FeatureGroup.java:294)
  at com.logicalclocks.hsfs.FeatureGroup.insert(FeatureGroup.java:256)
  at com.logicalclocks.hsfs.FeatureGroup.insert(FeatureGroup.java:238)
  ... 62 elided
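Judging by the stack trace above, validate() itself raises the same error on a STRICT feature group when an ERROR-level rule fails, so a defensive ingestion pattern is to dry-run the validation inside a Try and only insert when it succeeds. A minimal sketch:

// Dry-run the expectations first and only insert when validation succeeds
// (assumes validate() throws on a STRICT feature group, as the stack trace above suggests)
import scala.util.{Try, Success, Failure}

Try(economyFg.validate(economyUpsertDf)) match {
  case Success(result) =>
    println(s"Validation status: ${result.getStatus()}")
    economyFg.insert(economyUpsertDf)
  case Failure(ex) =>
    println(s"Validation failed, skipping insert: ${ex.getMessage}")
}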

Validation type

The validation type determines the validation behavior. Available types are:
  • STRICT: Data validation is performed and data is ingested into the feature group only if the validation status is “Success”
  • WARNING: Data validation is performed and data is ingested into the feature group only if the validation status is “Warning” or “Success”
  • ALL: Data validation is performed and data is ingested into the feature group regardless of the validation status
  • NONE: Data validation is not performed on the feature group

The validation type can easily be changed for a feature group:

// The previous economy_upsert_df contains invalid data but we still want to persist the data, so we set the validation type from STRICT to ALL
economyFg.updateValidationType(ValidationType.ALL)
// We try to insert the invalid df again
economyFg.insert(economyUpsertDf)
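With the validation type set to ALL the upsert goes through, and the expectation results are persisted alongside the data. Reusing the getValidations() call from earlier, you can confirm that the latest validation was recorded; its status should be Failure, since year=2022 still exceeds the maximum of 2021 (maxBy works on the returned Java list thanks to the imported JavaConversions).

// Confirm that the failed validation was persisted together with the ingested data
val latestValidation = economyFg.getValidations().maxBy(_.getValidationTime())
println(latestValidation.getStatus()) // expected: Failure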