Skip to content

KAFKA-19465: feat(connect): add value.type config to InsertHeader SMT#22560

Open
wilmerdooley wants to merge 1 commit into
apache:trunkfrom
wilmerdooley:oss/kafka-19465
Open

KAFKA-19465: feat(connect): add value.type config to InsertHeader SMT#22560
wilmerdooley wants to merge 1 commit into
apache:trunkfrom
wilmerdooley:oss/kafka-19465

Conversation

@wilmerdooley

Copy link
Copy Markdown

Delete this text and replace it with a detailed description of your change. The
PR title and body will become the squashed commit message.

If you would like to tag individuals, add some commentary, upload images, or
include other supplemental information that should not be part of the eventual
commit message, please use a separate comment.

If applicable, please include a summary of the testing strategy (including
rationale) for the proposed change. Unit and/or integration tests are expected
for any behavior change and system tests should be considered for larger
changes.

This PR adds a new value.type configuration option to the InsertHeader SMT in connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java. The option lets users specify the Connect Schema type for the literal value being inserted as a header, supporting int8, int16, int32, int64, float32, float64, boolean, string, and bytes. When value.type is omitted, the existing behavior of using Values.parseString is preserved, so the change is backward compatible.

This addresses the underlying need behind KAFKA-10428 by allowing InsertHeader to produce schemas other than Schema.STRING_SCHEMA (most notably Schema.BYTES_SCHEMA), which is required for a ByteArray header.converter. Invalid value.type values are rejected at configure time with a ConfigException.

Testing strategy

Unit tests in connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java cover the new behavior. insertionWithExplicitTypes exercises each supported type end to end through apply, asserting that the produced header carries the expected Schema and the expected value (with assertArrayEquals for the bytes case). A separate configRejectsInvalidValueType test verifies that an unknown type is rejected with ConfigException. The existing tests continue to pass because the default path (no value.type set) is unchanged.

JIRA: https://issues.apache.org/jira/browse/KAFKA-19465

Signed-off-by: wilmerdooley <wilmerdooley1@gmail.com>
@github-actions github-actions Bot added triage PRs from the community connect labels Jun 13, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

connect triage PRs from the community

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant