Kafka Event System

Overview

This comprehensive guide covers the Kafka-based event system in the Ink platform, including event types, topic configuration, producer/consumer patterns, serialization strategies, error handling, and monitoring.

Target Audience: Backend developers and architects
Prerequisites: Understanding of event-driven architecture, Kafka basics
Estimated Time: 50-60 minutes

Prerequisites

  • Event-driven architecture concepts
  • Apache Kafka fundamentals
  • Spring Kafka knowledge
  • Understanding of asynchronous processing
  • Completion of the Service Layer Architecture guide

Kafka Architecture

Installation Steps

1. Kafka Dependencies

<!-- filepath: /Users/jetstart/dev/jetrev/ink/pom.xml -->
<!-- ...existing code... -->
<dependencies>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
</dependencies>
<!-- ...existing code... -->

2. Kafka Configuration

# filepath: /Users/jetstart/dev/jetrev/ink/src/main/resources/application.yml
# ...existing code...
spring:
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}

    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      retries: 3
      properties:
        max.in.flight.requests.per.connection: 5
        enable.idempotence: true
        compression.type: snappy

    consumer:
      group-id: ${KAFKA_CONSUMER_GROUP:ink-consumer-group}
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: false
      properties:
        spring.json.trusted.packages: com.jetrev.ink.events
        isolation.level: read_committed

    listener:
      ack-mode: manual
      concurrency: 3
      poll-timeout: 3000

# Custom Kafka configuration
kafka:
  topics:
    user-events: user-events
    order-events: order-events
    license-events: license-events
    notification-events: notification-events
  retry:
    max-attempts: 3
    backoff-interval: 1000
# ...existing code...
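
The topic names above are only configuration values; nothing here creates the topics themselves. If broker-side auto-creation is disabled, declaring `NewTopic` beans lets Spring Boot's auto-configured `KafkaAdmin` create them at startup. A minimal sketch, in a hypothetical companion class with illustrative partition and replication counts:

// filepath (hypothetical): src/main/java/com/jetrev/ink/config/KafkaTopicConfig.java
@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic userEventsTopic(@Value("${kafka.topics.user-events}") String name) {
        // Partition/replication counts are illustrative, not prescriptive.
        return TopicBuilder.name(name).partitions(3).replicas(1).build();
    }

    @Bean
    public NewTopic orderEventsTopic(@Value("${kafka.topics.order-events}") String name) {
        return TopicBuilder.name(name).partitions(3).replicas(1).build();
    }
}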

3. Kafka Configuration Class

// filepath: /Users/jetstart/dev/jetrev/ink/src/main/java/com/jetrev/ink/config/KafkaConfig.java
@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
        configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.jetrev.ink.events");

        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory.setCommonErrorHandler(new DefaultErrorHandler(
                new FixedBackOff(1000L, 3L)
        ));

        return factory;
    }
}
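
One gap worth noting: if a payload cannot be deserialized, the raw `JsonDeserializer` fails before the listener ever runs, and the record becomes a poison pill that the container re-polls forever. Spring Kafka's `ErrorHandlingDeserializer` wraps the delegate so the failure surfaces to the error handler instead. A sketch of the extra properties for `consumerFactory()`:

// Wrap the JSON deserializer so deserialization failures reach the error
// handler instead of looping endlessly on the same record.
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.jetrev.ink.events");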

Configuration

Event Models

// filepath: /Users/jetstart/dev/jetrev/ink/src/main/java/com/jetrev/ink/events/BaseEvent.java
@Data
@NoArgsConstructor
@AllArgsConstructor
@SuperBuilder
public abstract class BaseEvent {

    private String eventId;
    private String eventType;
    private Instant timestamp;
    private String source;
    private Map<String, Object> metadata;

    // Called explicitly when an event is constructed. Events are plain POJOs,
    // not JPA entities, so lifecycle annotations such as @PrePersist do not
    // apply here. Public so producers in other packages can invoke it.
    public void onCreate() {
        if (eventId == null) {
            eventId = UUID.randomUUID().toString();
        }
        if (timestamp == null) {
            timestamp = Instant.now();
        }
    }
}
// filepath: /Users/jetstart/dev/jetrev/ink/src/main/java/com/jetrev/ink/events/UserEvent.java
@Data
@EqualsAndHashCode(callSuper = true)
@NoArgsConstructor
@AllArgsConstructor
@SuperBuilder
public class UserEvent extends BaseEvent {
    private Long userId;
    private String username;
    private String email;
    private String action; // CREATED, UPDATED, DELETED, ACTIVATED, DEACTIVATED
    private Map<String, Object> changes;
}

@Data
@EqualsAndHashCode(callSuper = true)
@NoArgsConstructor
@AllArgsConstructor
@SuperBuilder
public class UserCreatedEvent extends UserEvent {
    private Set<String> roles;

    public UserCreatedEvent(User user) {
        super();
        setEventType("USER_CREATED");
        setUserId(user.getId());
        setUsername(user.getUsername());
        setEmail(user.getEmail());
        setAction("CREATED");
        setRoles(user.getRoles());
        setSource("user-service");
        onCreate();
    }
}

@Data
@EqualsAndHashCode(callSuper = true)
@NoArgsConstructor
@AllArgsConstructor
@SuperBuilder
public class UserUpdatedEvent extends UserEvent {
    private Map<String, Object> previousValues;
    private Map<String, Object> newValues;

    public UserUpdatedEvent(User user, Map<String, Object> changes) {
        super();
        setEventType("USER_UPDATED");
        setUserId(user.getId());
        setUsername(user.getUsername());
        setEmail(user.getEmail());
        setAction("UPDATED");
        setChanges(changes);
        setSource("user-service");
        onCreate();
    }
}
// filepath: /Users/jetstart/dev/jetrev/ink/src/main/java/com/jetrev/ink/events/OrderEvent.java
@Data
@EqualsAndHashCode(callSuper = true)
@NoArgsConstructor
@AllArgsConstructor
@SuperBuilder
public class OrderEvent extends BaseEvent {
    private Long orderId;
    private String orderNumber;
    private Long userId;
    private String status;
    private BigDecimal total;
    private String action;
}

@Data
@EqualsAndHashCode(callSuper = true)
@NoArgsConstructor
@AllArgsConstructor
@SuperBuilder
public class OrderCreatedEvent extends OrderEvent {
    private List<OrderItemDto> items;

    public OrderCreatedEvent(Order order) {
        super();
        setEventType("ORDER_CREATED");
        setOrderId(order.getId());
        setOrderNumber(order.getOrderNumber());
        setUserId(order.getUserId());
        setStatus(order.getStatus().name());
        setTotal(order.getTotal());
        setAction("CREATED");
        setSource("order-service");
        onCreate();
    }
}
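
Because every event carries an `Instant` timestamp, JSON serialization depends on the java.time module being registered. Spring Kafka's default `JsonSerializer` handles this, but if you build your own `ObjectMapper` (in tests, for instance), a quick round-trip sketch, assuming jackson-datatype-jsr310 is on the classpath:

@Test
void userEventSurvivesJsonRoundTrip() throws Exception {
    // JavaTimeModule comes from jackson-datatype-jsr310.
    ObjectMapper mapper = new ObjectMapper()
            .registerModule(new JavaTimeModule())
            .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);

    // @SuperBuilder exposes the inherited BaseEvent fields on the builder.
    UserEvent event = UserEvent.builder()
            .eventId(UUID.randomUUID().toString())
            .eventType("USER_UPDATED")
            .timestamp(Instant.now())
            .userId(42L)
            .action("UPDATED")
            .source("user-service")
            .build();

    String json = mapper.writeValueAsString(event);
    UserEvent roundTrip = mapper.readValue(json, UserEvent.class);

    assertThat(roundTrip.getUserId()).isEqualTo(42L);
    assertThat(roundTrip.getTimestamp()).isEqualTo(event.getTimestamp());
}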

Event Producer

// filepath: /Users/jetstart/dev/jetrev/ink/src/main/java/com/jetrev/ink/events/producer/EventProducer.java
@Service
@Slf4j
@RequiredArgsConstructor
public class EventProducer {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Value("${kafka.topics.user-events}")
    private String userEventsTopic;

    @Value("${kafka.topics.order-events}")
    private String orderEventsTopic;

    @Value("${kafka.topics.license-events}")
    private String licenseEventsTopic;

    public CompletableFuture<SendResult<String, Object>> publishEvent(
            String topic,
            String key,
            BaseEvent event) {

        log.info("Publishing event: {} to topic: {} with key: {}",
                event.getEventType(), topic, key);

        return kafkaTemplate.send(topic, key, event)
                .thenApply(result -> {
                    log.info("Event published successfully: {} to partition: {} with offset: {}",
                            event.getEventType(),
                            result.getRecordMetadata().partition(),
                            result.getRecordMetadata().offset());
                    return result;
                })
                .exceptionally(ex -> {
                    log.error("Failed to publish event: {} to topic: {}",
                            event.getEventType(), topic, ex);
                    throw new EventPublishException("Failed to publish event", ex);
                });
    }

    public CompletableFuture<SendResult<String, Object>> publishUserEvent(UserEvent event) {
        return publishEvent(userEventsTopic, event.getUserId().toString(), event);
    }

    public CompletableFuture<SendResult<String, Object>> publishOrderEvent(OrderEvent event) {
        return publishEvent(orderEventsTopic, event.getOrderId().toString(), event);
    }
}
// filepath: /Users/jetstart/dev/jetrev/ink/src/main/java/com/jetrev/ink/events/producer/UserEventProducer.java
@Service
@Slf4j
@RequiredArgsConstructor
public class UserEventProducer {

    private final EventProducer eventProducer;

    // Note: @Async requires @EnableAsync on a configuration class.
    @Async
    public void publishUserCreatedEvent(User user) {
        UserCreatedEvent event = new UserCreatedEvent(user);
        eventProducer.publishUserEvent(event)
                .thenAccept(result ->
                        log.info("User created event published for user: {}", user.getId()));
    }

    @Async
    public void publishUserUpdatedEvent(User user, Map<String, Object> changes) {
        UserUpdatedEvent event = new UserUpdatedEvent(user, changes);
        eventProducer.publishUserEvent(event)
                .thenAccept(result ->
                        log.info("User updated event published for user: {}", user.getId()));
    }

    @Async
    public void publishUserDeletedEvent(User user) {
        UserEvent event = UserEvent.builder()
                .eventType("USER_DELETED")
                .userId(user.getId())
                .username(user.getUsername())
                .email(user.getEmail())
                .action("DELETED")
                .source("user-service")
                .build();
        event.onCreate(); // assigns eventId and timestamp (public on BaseEvent)

        eventProducer.publishUserEvent(event)
                .thenAccept(result ->
                        log.info("User deleted event published for user: {}", user.getId()));
    }
}
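
One caller-side detail the producers above leave open: if an event is published inside a database transaction that later rolls back, consumers receive an event for data that never existed. A minimal sketch that defers publishing until after commit (UserRegistrationService and UserRepository are hypothetical names, for illustration only):

@Service
@RequiredArgsConstructor
public class UserRegistrationService { // hypothetical caller

    private final UserRepository userRepository; // assumed Spring Data repository
    private final UserEventProducer userEventProducer;

    @Transactional
    public User register(User user) {
        User saved = userRepository.save(user);

        // Defer the Kafka publish until the surrounding transaction commits,
        // so consumers never see an event for a rolled-back row.
        TransactionSynchronizationManager.registerSynchronization(
                new TransactionSynchronization() {
                    @Override
                    public void afterCommit() {
                        userEventProducer.publishUserCreatedEvent(saved);
                    }
                });
        return saved;
    }
}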

Usage Examples

Event Consumers

// filepath: /Users/jetstart/dev/jetrev/ink/src/main/java/com/jetrev/ink/events/consumer/UserEventConsumer.java
@Service
@Slf4j
@RequiredArgsConstructor
public class UserEventConsumer {

    private final EmailService emailService;
    private final AuditService auditService;

    @KafkaListener(
            topics = "${kafka.topics.user-events}",
            groupId = "${spring.kafka.consumer.group-id}",
            containerFactory = "kafkaListenerContainerFactory"
    )
    public void consumeUserEvent(
            @Payload UserEvent event,
            @Header(KafkaHeaders.RECEIVED_KEY) String key,
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset,
            Acknowledgment acknowledgment) {

        log.info("Received user event: {} from partition: {} with offset: {}",
                event.getEventType(), partition, offset);

        try {
            processUserEvent(event);
            acknowledgment.acknowledge();
            log.info("Successfully processed user event: {}", event.getEventId());
        } catch (Exception e) {
            log.error("Failed to process user event: {}", event.getEventId(), e);
            // Don't acknowledge - message will be reprocessed
            throw new EventProcessingException("Failed to process event", e);
        }
    }

    private void processUserEvent(UserEvent event) {
        // JsonSerializer adds type headers by default, so trusted subclasses
        // arrive as their concrete types and these downcasts are safe.
        switch (event.getEventType()) {
            case "USER_CREATED":
                handleUserCreated((UserCreatedEvent) event);
                break;
            case "USER_UPDATED":
                handleUserUpdated((UserUpdatedEvent) event);
                break;
            case "USER_DELETED":
                handleUserDeleted(event);
                break;
            default:
                log.warn("Unknown event type: {}", event.getEventType());
        }

        // Always audit
        auditService.logEvent(event);
    }

    private void handleUserCreated(UserCreatedEvent event) {
        log.info("Processing user created event for user: {}", event.getUserId());
        emailService.sendWelcomeEmail(event.getEmail(), event.getUsername());
    }

    private void handleUserUpdated(UserUpdatedEvent event) {
        log.info("Processing user updated event for user: {}", event.getUserId());

        if (event.getChanges().containsKey("email")) {
            emailService.sendEmailChangeNotification(
                    event.getEmail(),
                    (String) event.getChanges().get("email")
            );
        }
    }

    private void handleUserDeleted(UserEvent event) {
        log.info("Processing user deleted event for user: {}", event.getUserId());
        // Cleanup related data, send farewell email, etc.
    }
}
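
An alternative to the blocking retry in the container's error handler is Spring Kafka's non-blocking @RetryableTopic, which retries on automatically created retry topics and routes exhausted records to a dead-letter topic. A sketch that could sit alongside the listener above (requires spring-retry on the classpath; the group id and attempt counts are illustrative):

@RetryableTopic(
        attempts = "4", // 1 initial delivery + 3 retries; illustrative
        backoff = @Backoff(delay = 1000, multiplier = 2.0),
        topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(
        topics = "${kafka.topics.user-events}",
        groupId = "retryable-user-events",
        containerFactory = "kafkaListenerContainerFactory")
public void consumeWithRetryTopics(UserEvent event, Acknowledgment acknowledgment) {
    processUserEvent(event); // same dispatch as above
    acknowledgment.acknowledge();
}

@DltHandler
public void handleDlt(UserEvent event) {
    log.error("Event exhausted all retries: {}", event.getEventId());
}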

Batch Consumer

// filepath: /Users/jetstart/dev/jetrev/ink/src/main/java/com/jetrev/ink/events/consumer/AnalyticsEventConsumer.java
@Service
@Slf4j
@RequiredArgsConstructor
public class AnalyticsEventConsumer {

    private final AnalyticsService analyticsService;

    @KafkaListener(
            topics = {"${kafka.topics.user-events}", "${kafka.topics.order-events}"},
            groupId = "analytics-consumer-group",
            containerFactory = "batchKafkaListenerContainerFactory"
    )
    public void consumeEventsBatch(
            List<ConsumerRecord<String, BaseEvent>> records,
            Acknowledgment acknowledgment) {

        log.info("Received batch of {} events", records.size());

        try {
            List<BaseEvent> events = records.stream()
                    .map(ConsumerRecord::value)
                    .collect(Collectors.toList());

            analyticsService.processBatch(events);
            acknowledgment.acknowledge();

            log.info("Successfully processed batch of {} events", events.size());
        } catch (Exception e) {
            log.error("Failed to process event batch", e);
            throw new EventProcessingException("Batch processing failed", e);
        }
    }
}

// Batch consumer factory configuration
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object>
        batchKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    factory.setConcurrency(3);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

    return factory;
}

Dead Letter Queue Pattern

// filepath: /Users/jetstart/dev/jetrev/ink/src/main/java/com/jetrev/ink/events/consumer/DeadLetterQueueHandler.java
@Service
@Slf4j
@RequiredArgsConstructor
public class DeadLetterQueueHandler {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Value("${kafka.topics.dead-letter-queue:dlq-events}")
    private String dlqTopic;

    @KafkaListener(
            topics = "${kafka.topics.user-events}",
            groupId = "user-events-with-dlq",
            containerFactory = "kafkaListenerContainerFactory"
    )
    public void consumeWithDLQ(
            @Payload UserEvent event,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            Acknowledgment acknowledgment) {

        try {
            processEvent(event);
            acknowledgment.acknowledge();
        } catch (RecoverableException e) {
            log.warn("Recoverable error processing event: {}, will retry", event.getEventId());
            throw e; // Will be retried by error handler
        } catch (Exception e) {
            log.error("Unrecoverable error processing event: {}, sending to DLQ",
                    event.getEventId(), e);
            sendToDLQ(event, topic, e);
            acknowledgment.acknowledge(); // Acknowledge to prevent infinite retry
        }
    }

    private void sendToDLQ(BaseEvent event, String originalTopic, Exception error) {
        Map<String, Object> dlqMetadata = new HashMap<>();
        dlqMetadata.put("originalTopic", originalTopic);
        dlqMetadata.put("errorMessage", error.getMessage());
        dlqMetadata.put("errorClass", error.getClass().getName());
        dlqMetadata.put("timestamp", Instant.now().toString());

        event.setMetadata(dlqMetadata);

        kafkaTemplate.send(dlqTopic, event.getEventId(), event);
        log.info("Event sent to DLQ: {}", event.getEventId());
    }

    private void processEvent(UserEvent event) {
        // Event processing logic
    }
}
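
Messages parked on the DLQ still need an owner. A minimal monitoring-listener sketch (it could live in the same class; the group id is illustrative) that surfaces the failure metadata attached above:

@KafkaListener(
        topics = "${kafka.topics.dead-letter-queue:dlq-events}",
        groupId = "dlq-monitor",
        containerFactory = "kafkaListenerContainerFactory")
public void monitorDlq(@Payload BaseEvent event, Acknowledgment acknowledgment) {
    // Surface the diagnostics that sendToDLQ() stored in the event metadata.
    log.error("DLQ event {} (from topic {}): {}",
            event.getEventId(),
            event.getMetadata().get("originalTopic"),
            event.getMetadata().get("errorMessage"));
    // Alerting and manual-replay bookkeeping would hang off this point.
    acknowledgment.acknowledge();
}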

Verification

Event Testing

// filepath: /Users/jetstart/dev/jetrev/ink/src/test/java/com/jetrev/ink/events/EventIntegrationTest.java
@SpringBootTest
@EmbeddedKafka(
        partitions = 1,
        topics = {"user-events", "order-events"},
        bootstrapServersProperty = "spring.kafka.bootstrap-servers"
)
@TestPropertySource(properties = {
        "kafka.topics.user-events=user-events",
        "kafka.topics.order-events=order-events"
})
class EventIntegrationTest {

    @Autowired
    private UserEventProducer userEventProducer;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    void shouldPublishAndConsumeUserEvent() throws Exception {
        // Given
        User user = User.builder()
                .id(1L)
                .username("testuser")
                .email("test@example.com")
                .roles(Set.of("USER"))
                .build();

        Consumer<String, UserEvent> consumer = createConsumer();
        consumer.subscribe(Collections.singleton("user-events"));

        // When
        userEventProducer.publishUserCreatedEvent(user);

        // Then
        ConsumerRecords<String, UserEvent> records =
                consumer.poll(Duration.ofSeconds(10));

        assertThat(records.count()).isGreaterThan(0);

        for (ConsumerRecord<String, UserEvent> record : records) {
            assertThat(record.value().getUserId()).isEqualTo(1L);
            assertThat(record.value().getEventType()).isEqualTo("USER_CREATED");
        }

        consumer.close();
    }

    private Consumer<String, UserEvent> createConsumer() {
        Map<String, Object> props = new HashMap<>();
        // Point at the embedded broker rather than a hard-coded localhost:9092.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        // Read from the beginning so the record published above is visible.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, UserEvent.class.getName());

        return new KafkaConsumer<>(props);
    }
}
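
For faster feedback than the embedded-broker test, the producer can also be unit-tested with a mocked KafkaTemplate. A sketch (assumes Spring Kafka 3.x, where send() returns a CompletableFuture; the @Value field is injected by hand since there is no Spring context):

@ExtendWith(MockitoExtension.class)
class EventProducerUnitTest {

    @Mock
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Test
    void publishesUserEventKeyedByUserId() {
        EventProducer producer = new EventProducer(kafkaTemplate);
        // No Spring context here, so set the @Value field manually.
        ReflectionTestUtils.setField(producer, "userEventsTopic", "user-events");

        when(kafkaTemplate.send(anyString(), anyString(), any()))
                .thenReturn(new CompletableFuture<>()); // never completes; we only verify the call

        UserEvent event = UserEvent.builder()
                .userId(42L)
                .eventType("USER_DELETED")
                .build();

        producer.publishUserEvent(event);

        verify(kafkaTemplate).send("user-events", "42", event);
    }
}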

Troubleshooting

Common Kafka Issues

Issue: Message loss

# Solution: Enable idempotence and proper acks
spring:
  kafka:
    producer:
      acks: all
      retries: 3
      properties:
        enable.idempotence: true
        max.in.flight.requests.per.connection: 5

Issue: Duplicate message processing

// Solution: Implement idempotency in consumer
@Service
@Slf4j
public class IdempotentEventConsumer {

    // In-memory only: duplicates are detected within a single process lifetime.
    // For restarts and multi-instance deployments, back this with a persistent
    // store (see the sketch below).
    private final Set<String> processedEventIds = ConcurrentHashMap.newKeySet();

    @KafkaListener(topics = "user-events")
    public void consume(UserEvent event, Acknowledgment ack) {
        if (processedEventIds.contains(event.getEventId())) {
            log.warn("Duplicate event detected: {}", event.getEventId());
            ack.acknowledge();
            return;
        }

        try {
            processEvent(event);
            processedEventIds.add(event.getEventId());
            ack.acknowledge();
        } catch (Exception e) {
            log.error("Error processing event", e);
            // Don't acknowledge - will be retried
        }
    }

    private void processEvent(UserEvent event) {
        // Event processing logic
    }
}
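
A sketch of the persistent variant, assuming a processed_events table with a unique constraint on event_id (the table name and schema are illustrative):

@Service
@Slf4j
@RequiredArgsConstructor
public class PersistentIdempotentConsumer {

    private final JdbcTemplate jdbcTemplate;

    @KafkaListener(topics = "${kafka.topics.user-events}", groupId = "idempotent-group")
    @Transactional
    public void consume(UserEvent event, Acknowledgment ack) {
        try {
            // The unique constraint on event_id turns the duplicate check into
            // one atomic insert: duplicates fail here, first deliveries pass.
            jdbcTemplate.update(
                    "INSERT INTO processed_events (event_id, processed_at) VALUES (?, CURRENT_TIMESTAMP)",
                    event.getEventId());
        } catch (DuplicateKeyException e) {
            log.warn("Duplicate event skipped: {}", event.getEventId());
            ack.acknowledge();
            return;
        }

        // ...business processing goes here, inside the same transaction...
        ack.acknowledge();
    }
}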

Issue: Consumer lag

// Monitor consumer lag with metrics (requires @EnableScheduling on a
// configuration class; a fuller AdminClient sketch follows below)
@Service
@Slf4j
public class KafkaMetricsService {

    @Scheduled(fixedDelay = 60000)
    public void monitorConsumerLag() {
        // Use AdminClient to check consumer lag
        // Alert if lag exceeds threshold
    }
}
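
A concrete sketch of that lag check with the plain Kafka AdminClient (the group id, bootstrap address, and alert threshold are illustrative):

@Scheduled(fixedDelay = 60000)
public void monitorConsumerLag() throws Exception {
    try (AdminClient admin = AdminClient.create(Map.of(
            AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {

        // Offsets the group has committed so far
        Map<TopicPartition, OffsetAndMetadata> committed = admin
                .listConsumerGroupOffsets("ink-consumer-group")
                .partitionsToOffsetAndMetadata()
                .get();

        // Latest offsets for the same partitions
        Map<TopicPartition, OffsetSpec> latestSpec = committed.keySet().stream()
                .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latest =
                admin.listOffsets(latestSpec).all().get();

        // Lag = end of log minus committed position, per partition
        committed.forEach((tp, meta) -> {
            long lag = latest.get(tp).offset() - meta.offset();
            if (lag > 1000) { // alert threshold is illustrative
                log.warn("Consumer lag on {} is {}", tp, lag);
            }
        });
    }
}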

Best Practices

  1. Use Event Sourcing: Store all state changes as events
  2. Idempotent Consumers: Handle duplicate messages gracefully
  3. Schema Evolution: Use compatible serialization (Avro recommended)
  4. Partition Keys: Use meaningful keys for ordered processing
  5. Error Handling: Implement DLQ for failed messages
  6. Monitoring: Track consumer lag and processing metrics
  7. Async Publishing: Use async producers for better performance
  8. Batch Processing: Process multiple events together when possible
  9. Retry Logic: Implement exponential backoff for retries (see the sketch after this list)
  10. Testing: Use embedded Kafka for integration tests
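
Items 5 and 9 combine naturally in Spring Kafka's DefaultErrorHandler: exponential backoff between attempts, then publication to a dead-letter topic once retries are exhausted. A sketch to swap into kafkaListenerContainerFactory() in place of the FixedBackOff handler (intervals are illustrative):

@Bean
public DefaultErrorHandler retryingErrorHandler(KafkaTemplate<String, Object> template) {
    // Retry with exponential backoff: 1s, 2s, 4s, ... capped at 30s elapsed.
    ExponentialBackOff backOff = new ExponentialBackOff(1000L, 2.0);
    backOff.setMaxElapsedTime(30_000L);

    // After retries are exhausted, publish the record to <topic>.DLT.
    DefaultErrorHandler handler = new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(template), backOff);

    // Skip retries entirely for errors that can never succeed.
    handler.addNotRetryableExceptions(IllegalArgumentException.class);
    return handler;
}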

Performance Optimization

Producer Optimization

spring:
  kafka:
    producer:
      batch-size: 16384
      buffer-memory: 33554432
      compression-type: snappy
      properties:
        linger.ms: 10   # no dedicated Spring Boot property; set via properties

Consumer Optimization

spring:
  kafka:
    consumer:
      fetch-min-size: 1
      fetch-max-wait: 500
      max-poll-records: 500
    listener:
      concurrency: 5

Next Steps: Learn about Event-Driven Architecture patterns and best practices.