Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,11 @@ private boolean isIgnored() {
transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_ID, messageId);
}

final int bodySize = record.serializedValueSize();
if (bodySize >= 0) {
transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_BODY_SIZE, bodySize);
}

final @Nullable Integer retryCount = retryCount(record);
if (retryCount != null) {
transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT, retryCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import io.sentry.kafka.SentryKafkaProducerInterceptor
import io.sentry.test.initForTest
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.Optional
import kotlin.test.AfterTest
import kotlin.test.BeforeTest
import kotlin.test.Test
Expand All @@ -22,6 +23,7 @@ import kotlin.test.assertTrue
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.header.internals.RecordHeaders
import org.apache.kafka.common.record.TimestampType
import org.mockito.kotlin.any
import org.mockito.kotlin.mock
import org.mockito.kotlin.never
Expand Down Expand Up @@ -72,10 +74,21 @@ class SentryKafkaRecordInterceptorTest {
private fun createRecord(
topic: String = "my-topic",
headers: RecordHeaders = RecordHeaders(),
serializedValueSize: Int = -1,
): ConsumerRecord<String, String> {
val record = ConsumerRecord<String, String>(topic, 0, 0L, "key", "value")
headers.forEach { record.headers().add(it) }
return record
return ConsumerRecord(
topic,
0,
0L,
System.currentTimeMillis(),
TimestampType.CREATE_TIME,
3,
serializedValueSize,
"key",
"value",
headers,
Optional.empty(),
)
}

private fun createRecordWithHeaders(
Expand Down Expand Up @@ -164,6 +177,26 @@ class SentryKafkaRecordInterceptorTest {
)
}

@Test
fun `sets body size from serializedValueSize`() {
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
val record = createRecord(serializedValueSize = 42)

interceptor.intercept(record, consumer)

assertEquals(42, transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_BODY_SIZE))
}

@Test
fun `does not set body size when serializedValueSize is negative`() {
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
val record = createRecord(serializedValueSize = -1)

interceptor.intercept(record, consumer)

assertNull(transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_BODY_SIZE))
}

@Test
fun `sets retry count from delivery attempt header`() {
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
Expand Down
Loading