
Invalid json input record crashes HoodieDeltaStreamer #15942

@hudi-bot

Description


With a HoodieDeltaStreamer configured to decode JSON records, a single non-JSON record fails the whole job, and there is no way to skip that record or move past it. ClickHouse's KafkaEngine handles this with an optional "skip invalid records" mode, and something similar would be helpful here.

It's not clear to me whether --commit-on-errors is meant to cover this case, but in practice it does not appear to.

On failure, the following exception is thrown and the process exits:

{{23/07/03 14:49:23 INFO DAGScheduler: ShuffleMapStage 2 (mapToPair at HoodieJavaRDD.java:135) failed in 2144.592 s due to Job aborted due to stage failure: Task 14 in stage 2.0 failed 1 times, most recent failure: Lost task 14.0 in stage 2.0 (TID 16) (executor driver): org.apache.hudi.exception.HoodieIOException: Unrecognized token 'testvalue': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')}}
{{ at [Source: (String)"testvalue"; line: 1, column: 10]}}
{{        at org.apache.hudi.avro.MercifulJsonConverter.convert(MercifulJsonConverter.java:96)}}
{{        at org.apache.hudi.utilities.sources.helpers.AvroConvertor.fromJson(AvroConvertor.java:87)}}
{{        at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1070)}}
{{        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)}}
{{        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)}}
{{        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)}}
{{        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)}}
{{        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)}}
{{        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)}}
{{        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)}}
{{        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)}}
{{        at org.apache.spark.scheduler.Task.run(Task.scala:136)}}
{{        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)}}
{{        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)}}
{{        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)}}
{{        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)}}
{{        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)}}
{{        at java.base/java.lang.Thread.run(Thread.java:829)}}
{{Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'testvalue': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')}}
{{ at [Source: (String)"testvalue"; line: 1, column: 10]}}
{{        at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2391)}}
{{        at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:745)}}
{{        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2961)}}
{{        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2939)}}
{{        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._matchToken(ReaderBasedJsonParser.java:2713)}}
{{        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._matchTrue(ReaderBasedJsonParser.java:2667)}}
{{        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:767)}}
{{        at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4761)}}
{{        at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4667)}}
{{        at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3629)}}
{{        at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3597)}}
{{        at org.apache.hudi.avro.MercifulJsonConverter.convert(MercifulJsonConverter.java:93)}}
{{        ... 17 more}}
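The trace shows the exception escaping AvroConvertor.fromJson (via MercifulJsonConverter.convert) inside a Spark shuffle task, so one bad record aborts the stage. Below is a minimal sketch of the kind of "skip invalid records" mode being requested, written in plain Java with no Hudi or Spark dependencies. Every name in it (SkipInvalidRecords, convertAll, maxInvalid, toyConvert) is hypothetical and not an existing Hudi option or API, and toyConvert is only a crude stand-in for the real JSON-to-Avro conversion, not a JSON parser.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical "skip invalid records" wrapper; NOT an existing Hudi API.
// Converts each raw record; records whose conversion throws are collected
// instead of failing the whole batch, up to a configurable limit.
class SkipInvalidRecords {

  // Result of converting a batch: converted records plus the raw inputs that failed.
  static final class Result<T> {
    final List<T> valid = new ArrayList<>();
    final List<String> invalid = new ArrayList<>();
  }

  // maxInvalid = number of bad records to tolerate; 0 reproduces the
  // fail-on-first-bad-record behaviour described in the issue.
  static <T> Result<T> convertAll(List<String> records,
                                  Function<String, T> converter,
                                  int maxInvalid) {
    Result<T> result = new Result<>();
    for (String raw : records) {
      try {
        result.valid.add(converter.apply(raw));
      } catch (RuntimeException e) {
        // Keep the raw input so it can be logged or inspected later.
        result.invalid.add(raw);
        if (result.invalid.size() > maxInvalid) {
          // Tolerance exceeded: fail loudly, chaining the original cause.
          throw new IllegalStateException(
              "More than " + maxInvalid + " invalid record(s); last: " + raw, e);
        }
      }
    }
    return result;
  }

  // Toy stand-in for MercifulJsonConverter.convert (assumption): accepts only
  // strings that start with '{' and end with '}'. It does not parse JSON.
  static String toyConvert(String raw) {
    String s = raw.trim();
    if (!s.startsWith("{") || !s.endsWith("}")) {
      throw new IllegalArgumentException("Unrecognized token '" + raw + "'");
    }
    return s;
  }

  public static void main(String[] args) {
    List<String> batch = List.of("{\"id\": 1}", "testvalue", "{\"id\": 2}");
    Result<String> r = convertAll(batch, SkipInvalidRecords::toyConvert, 10);
    // Prints: 2 valid, skipped [testvalue]
    System.out.println(r.valid.size() + " valid, skipped " + r.invalid);
  }
}
```

With maxInvalid set to 0 the wrapper fails on the first bad record, as the current path does; a positive value gives a bounded, opt-in tolerance, and the skipped raw records are kept rather than silently dropped.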

