Repository and Causal Consistency

Автор раздела: Ivan Zakrevsky

Контекст

Версия агрегата часто используется для организации "Optimistic Offline Lock". С этой целью она обычно инкрементируется однократно на одну транзакцию, даже если при этом было создано несколько Domain Events (при их in-process обработке).

Иначе дело обстоит в Event Sourced Aggregate, где версия инкрементируется на каждое Доменное Событие, поскольку она определяет положение этого События в потоке/журнале Событий. Взаимосвязанные События для обеспечения атомарности в таком случае помечаются идентификатором запроса в виде CorrelationId.

Проблема заключается в том, что если мы хотим сделать Domain Events публичными и отправить их в шину, либо сформировать на их основе Integration Events, то существует риск нарушения очередности их доставки.

Более подробно эта тема раскрывается в заметке "О гонке сообщений в условиях конкурирующих подписчиков".

Причин нарушения очередности доставки может быть несколько. Например, Конкурирующие Подписчики. В таком случае проблема решается обычно партиционированием каналов шины, используя идентификатор агрегата для вычисления партиции, таким образом направляя все сообщения одного агрегата к единственному обработчику, устраняя петлю в топологии маршрута сообщений, а значит, устраняя условия для возникновения гонки сообщений.

Однако, не все шины поддерживают партиционирование каналов, либо эта поддержка оставляет желать лучшего. Кроме того, в интервью "Modeling Uncertainty with Reactive DDD" by Vaughn Vernon reviewed by Thomas Betts, Vaughn Vernon утверждает, что это не спасает.

В книге "Reactive Messaging Patterns with the Actor Model: Applications and Integration in Scala and Akka" Vaughn Vernon объясняет почему: петля может быть образована не только конкурирующими подписчиками, но и самой топологией маршрутов сообщений. Грубо говоря, если для одного сообщения у нас маршрут A->C а для другого A-B->C, то гонка уже не исключена.

В этой же книге Vaughn Vernon отсылает за решением к статьям:

Рассмотрим классический пример. Глава родительского комитета школьного класса удалил классного руководителя из группы рассылки (E1), разослал всем оставшимся сообщение о сборе денег на подарок классному руководителю (E2), и вернул классного руководителя назад (E3).

Вот теперь давайте представим, что произойдет, если событие E1 где-то задержалось в шине, и было обработано после E2.

Решение сводится к организации Causal Consistency посредством векторных часов, используя версию агрегата в качестве их значения. Каждое сообщение снабжается списком своих Causal Dependencies.

И здесь обнажается проблема, т.к. при инкрементировании версии агрегата единожды на транзакцию, возникает риск образования более одного публичного Доменного События с одной и той же версией Агрегата. А это означает, что версию Агрегата не получится использовать для восстановления очередности Событий.

Есть еще одна проблема. Если за один инкремент версии агрегат издал несколько событий, то при обновлении ReadModel по событиям её версия разъедется с версией агрегата, и будет утрачена возможность отслеживать, например, с frontend, докатились ли изменения до ReadModel. Но это так же означает, что доменное событие должно производиться на абсолютно каждое изменение состояния агрегата.

Варианты решений

Версионирование состояния Bounded Context

Для указания последовательности События можно использовать порядковый номер механизма доставки, например, автоинкрементальный первичный ключ таблицы Outbox внутри Bounded Context.

Недостатком такого решения является существенное понижение уровня параллелизма вплоть до Sequential Consistency.

Другим недостатком такого решения является трудоемкость миграции с in-process обработки Доменных Событий на out-of-process.

Отдельная версия на Событие

Поскольку Доменное Событие является фактом изменения состояния Агрегата, логично предположить, что образование каждого нового События должно инкрементировать версию Агрегата, как это общепринято в Event Sourced Agregate.

Несущественным недостатком такого решения является усложнение реализации оптимистической блокировки, поскольку инкрементация теперь происходит вне SQL-запроса. Критерий выборки обновляемой строки теперь будет вычисляться как математическая разница версии Агрегата и количества Доменных Событий в нем.

Несколько сложнее дело обстоит с объединением (пакетированием) SQL-запросов, сформированных не из состояния агрегата, а из доменных событий, т.к. версия агрегата сдвигается с каждым запросом. Но этот вопрос тоже несущественный, и легко решается вынесением оптимистической блокировки в отдельный (либо в объединенный, при отсутствии изменения вложенных сущностей) SQL-запрос.

К достоинствам такого решения можно отнести простоту миграции с in-process обработки Доменных Событий на out-of-process, поскольку интерфейс событий остается неизменным.

Вывод

Вариант с инкрементацией версии Агрегата на каждое Доменное Событие выглядит более приемлемым решением.