Skip to content

DefaultConsumerErrorHandler

import io.micrometer.core.instrument.MeterRegistry
import org.slf4j.LoggerFactory
import org.springframework.integration.annotation.ServiceActivator
import org.springframework.messaging.support.ErrorMessage
import org.springframework.stereotype.Component

// Name of the meter-registry counter incremented for each handled error message;
// the same string is also used as the prefix of the accompanying log line.
private const val UNHANDLED_EXCEPTION = "unhandled-exception"

/**
 * Handler for messages delivered to the `errorChannel`.
 *
 * For every [ErrorMessage] it logs the payload of the message that failed together with the
 * causing exception, and increments the [UNHANDLED_EXCEPTION] counter in the [MeterRegistry].
 *
 * @property meterRegistry registry in which the [UNHANDLED_EXCEPTION] counter is looked up.
 */
@Component
class DefaultConsumerErrorHandler(private val meterRegistry: MeterRegistry) {
    private val logger = LoggerFactory.getLogger(DefaultConsumerErrorHandler::class.java)

    /**
     * Logs the failed message and its exception, then records the failure in the counter.
     *
     * The handler must never throw itself: a missing original message or a payload that is
     * not a byte array is rendered as text instead of being force-cast.
     *
     * @param message error message whose payload is the exception and whose
     * `originalMessage` (possibly absent) is the message that failed.
     */
    @ServiceActivator(inputChannel = "errorChannel")
    fun handleError(message: ErrorMessage) {
        val failure = message.payload
        logger.error("Failed Kafka message: ${describeOriginalPayload(message)}", failure)
        meterRegistry.counter(UNHANDLED_EXCEPTION).increment()
        logger.error("$UNHANDLED_EXCEPTION -  ${failure.message}", failure)
    }

    /** Renders the failed message's payload for logging without assuming its type or presence. */
    private fun describeOriginalPayload(message: ErrorMessage): String =
        when (val payload = message.originalMessage?.payload) {
            // No original message attached to the error (or it carries no payload).
            null -> "<unavailable>"
            // Same decoding the handler always applied to byte-array payloads.
            is ByteArray -> String(payload)
            else -> payload.toString()
        }
}