Обработка Ошибок
Обработка ошибок
Ошибка на консьюмере
Пусть наш код выбрасывает ошибку при обработке сообщения:
try
{
while (true)
{
var cr = consumer.Consume(cts.Token);
if (cr.Message.Key == "jsmith")
{
throw new Exception("I cannot handle jsmith!");
}
Console.WriteLine($"Consumed event from topic {topic}: key = {cr.Message.Key,-10} value = {cr.Message.Value}");
}
}
catch (OperationCanceledException)
{
// Ctrl-C was pressed.
}
finally
{
consumer.Close();
}
Если мы запустим этот код увидим следующий вывод:
Debug: [thrd:main]: GroupCoordinator/1: Fetch committed offsets for 2/2 partition(s)
Debug: [thrd:main]: Partition purchases [1] start fetching at offset 195 (leader epoch 0)
Debug: [thrd:main]: Partition purchases [0] start fetching at offset 95 (leader epoch 0)
Consumed event from topic purchases: key = eabara value = gift card
Consumed event from topic purchases: key = jbernard value = alarm clock
Consumed event from topic purchases: key = awalther value = t-shirts
Consumed event from topic purchases: key = htanaka value = alarm clock
Unhandled exception: I cannot handle jsmith!
Debug: [thrd:main]: GroupCoordinator/1: Committing offsets for 2 partition(s) with generation-id 93 in join-state steady: cgrp auto commit timer
Если мы запустим этот код еще раз, оставшиеся сообщение будут обработаны, но jsmith будет пропущен. Так происходит, потому что по умолчанию консьюмер постоянно пушит в кластер оффсеты закомиченных сообщений, даже если эти сообщения еще не были обработаны, а только получены.
Именно в случае кода выше, оффсеты отправляются в методе consumer.Close() блока finally.
Главные настройки отвечающие за управление оффсетами на консьюмере:
enable.auto.commit _— если
true(по умолчанию), периодически записывает оффсет последнего переданного приложению сообщения. Записанный оффсет будет использован при перезапуске для продолжения обработки с этого места.auto.commit.interval.ms _— Как часто в миллисекундах оффсеты будут записываться в хранилище оффсетов
enable.auto.offset.store_ _ — если
true(по умолчанию), клиент автоматически сохранит значение offset+1 от оффсета сообщения прямо перед отправлением сообщения в приложение. Оффсет хранится в памяти и будет использован во время следующего вызова commit() (без явного указания оффсетов) или в следующий авто-коммит. Если false иenable.auto.commit=true, приложение должно самостоятельно вызывать rd_kafka_offset_store() для сохранения оффсета для последующего авто-коммита.
Для обработки таких ошибок можно выбрать следующие стратегии:
- Залогировать и отбросить сообщение. Похоже на вариант куда сверху, но try-catch блок будет внутри обработчика. Легко реализовать, но очевидный недостаток - это потеря сообщения.
- Dead letter топик. Этим способом можно разбираться с нетранзиентными сообщениями (теми, которые всегда вызывают ошибки). Эти сообщения отправляются в dead letter топик и становятся доступны для последующей обработки. Плюс здесь в том, что мы не теряем подобные сообщения и можем выстроить логику их восстановления или очистки. Недостаток в том, что наш консьюмер становится продюсером и отправка в dead letter должна быть спроектирована так, чтобы не допускать "взрыва" числа таких сообщений.
- Retry топик и retry консьюмер. В добавок к dead letter топику мы можем организовать обработку транзиентных сообщений. Для этого будем отправлять такие сообщение в отдельный retry топик и сделаем отдельный retry консьюмер, который исходя из некоторых условий: число попыток обработки, время, внешние изменения; может принимать решение об обработке: перенаправить обратно в основой топик, отложить ещё или отбросить. Недостатками такого способа является усложнение логики обработки, невозможность использования если важен порядок
Недоступность консьюмера
Клиент периодически отсылает heartbeat сигналы на брокер для проверки доступности
Частота их отправки определяется параметром heartbeat.interval.ms. Если брокер не получил ни одного heartbeat за session.timeout.ms, то консьюмер считается недоступным и будет произведена ребалансировка.
[!warning]
Отправки heartbeat недостаточно для избежания ребалансировки. Это так же зависит от параметраmax.poll.interval.ms. это максимальная задержка между вызовами poll консьюмера, после которого он считается недоступным
Aliases:
caution,attention