Event Sourcing para aplicaciones escalables

3
16156

Event Sourcing para aplicaciones escalables (no CQRS).

Índice de contenidos


1. Introducción

En este post se presenta una pequeña introducción sobre el patrón de arquitectura Event Sourcing y se expondrá un ejemplo
para ver cómo implementarlo.


2. Entorno

El tutorial está escrito usando el siguiente entorno:

  • Hardware: Portátil MacBook Pro 15′ (2,3 GHz Intel Core i7, 16GB DDR3).
  • Sistema Operativo: Mac OS High Sierra 10.13.3
  • IntelliJ IDEA 2017.3.5
  • Docker version 18.05.0-ce-rc1, build 33f00ce
  • docker-compose version 1.21.0, build 5920eb0
  • Java version 1.8.0_161


3. ¿Qué es Event Sourcing Pattern?

Event sourcing es un patrón de arquitectura (no una arquitectura en sí misma)
que se encarga de capturar todos los cambios que se pueden producir en nuestra aplicación como una secuencia de eventos.

Este patrón de arquitectura no es algo nuevo, de hecho Martin Fowler publicó un post en 2005 hablando sobre cómo
modelar los cambios en un sistema a través de eventos. En los últimos años ha tomado mayor relevancia
debido principalmente a la necesidad de escalabilidad que se les exigen a aplicaciones modernas y dado que
encaja bastante bien con la naturaleza asíncrona que presentan estas aplicaciones.


4. ¿Por qué Event Sourcing (ES)?

Hasta hace unos años era habitual que las aplicaciones presentaran tiempos de respuestas bajos, caídas frecuentes, etc.
Hoy en día los sistemas deben dar servicio 24/7 y proporcionar siempre unos buenos tiempos de respuesta;
ésto sólo se puede conseguir dejando de lado los diseños tradicionales (como las aplicaciones basadas en ACID).
Todo sistema que presente la necesidad de soportar una gran carga de usuarios o procesos debe adoptar características como:
responsividad, resiliencia, comunicación asíncrona y escalabilidad. En otras palabras, deben ser sistemas reactivos.
Event Sourcing encaja de forma natural en un sistema de estas características:

  • A diferencia de los tradicionales CRUD, cuya escalabilidad está muy limitada
    debido a que toda operación se debe realizar de forma atómica y consistente (ACID), con Event Sourcing solo se almacenan
    eventos lo que propicia un modelo óptimo para una arquitectura basada en eventos, lo que conlleva escalabilidad y
    responsividad.
  • Al tener almacenado cada uno de los eventos que han ocurrido en el sistema se obtiene información de negocio
    muy valiosa ya que en todo momento se conoce cómo el dominio ha llegado a su estado actual.
  • Auditoria sin esfuerzo al tener en todo momento un tracking del dominio.


5. Ejemplo de implementación

Como ejemplo se modelará un sistema basado en Event Sourcing junto con una arquitectura conducida por eventos (event-driven).
Se han creado dos pequeñas aplicaciones que modelan lo que podría ser un módulo de un sistema bancario
y que se comunican entre sí en base a eventos. Por razones de simplicidad no se han validado aspectos obvios, como que la persona que hace la retirada
de saldo sea la propietaria de la cuenta.
Para ello se ha dividido la aplicación en dos funcionalidades de dominio:

  • Un módulo de operaciones, encargado de hacer depósitos o retiradas de saldo de una cuenta.
  • Un módulo de cuentas que se encarga de la tramitación y validación de dichas operaciones.

5.1 Stack

Para el ejemplo se ha usado el siguiente stack:

  • Docker para desplegar cada una de las aplicaciones de forma independiente.
  • Wildfly 11 es el servidor en el cual correrán las aplicaciones dentro de los contenedores Docker.
  • Java EE 8; JAX-RS para los endpoints REST y CDI como contenedor de beans y eventos internos.
  • Kafka para bus de eventos entre las aplicaciones. Podría usarse cualquier otro bus de mensajes como RabbitMQ.

5.2 Flujo

Lo principal es entender el flujo de eventos entre las aplicaciones.
Cuando la aplicación arranca, automáticamente se registrarán servicios de escucha en el bus de eventos (kafka listeners),
que se encargan de recibir eventos de la cola y comunicar su llegada a cada aplicación.

Para interactuar con la aplicación se han habilitado diferentes endpoints REST.
Cada petición generará el siguiente flujo:

  • 1. Persistencia de cada evento. Los eventos ocurren en el pasado y nada puede cambiar que haya ocurrido,
    por lo que deben ser almacenados (de hecho el nombre de los eventos debe escribirse en pasado). Por ejemplo:
    ejecutar una acción de retirada de saldo y que posteriormente se detecte que no hay suficiente saldo en la cuenta
    debe quedar reflejado a través de eventos.
  • 2. Publicación de cada evento al bus de mensajería. El evento persistido debe ser publicado para que el sistema pueda entrar en
    un estado consistente, evitando todo lo posible la consistencia eventual (más detalles posteriormente).

5.3 Implementación

Como ejemplo se modelará el flujo completo de cómo la aplicación gestionaría un depósito de saldo en una cuenta bancaria.

Creación de una cuenta

curl -i -XPOST -d '{"ownerName":"Sergio","ownerSurnames":"Verde Caballero"}' \ 
-H 'Content-Type:application/json' \ 
http://localhost:8080/bank-account-event-sourcing/resources/accounts

HTTP/1.1 201 Created
Connection: keep-alive
X-Powered-By: Undertow/1
Server: WildFly/11
Location: http://localhost:8080/bank-account-event-sourcing/resources/operations/4ee78bab-b979-4a32-af1a-de92be94bc1b
Content-Length: 0
Date: Sun, 20 May 2018 07:20:01 GMT

Tras este paso previo se añade saldo a la cuenta:

curl -i -XPOST -d '{"account":"1178b586-2faf-4549-bc9c-b5c21d506121","quantity":50.0}' \ 
-H 'Content-Type:application/json' \
http://localhost:8080/bank-account-event-sourcing/resources/operations/deposit

5.3.1 Flujo 1 – Operación de depósito

Este es el código que modela el flujo:

OperationService.java

@Inject
OperationEventStorage eventStorage;

@Inject
EventBus eventBus;

public String deposit(String accountId, Float quantity) {
	DepositOrderPlaced depositOrder = new DepositOrderPlaced(accountId, quantity);
	eventStorage.add(depositOrder);
	eventBus.produce(depositOrder);
	return depositOrder.getId();
}

En este punto se ha creado un evento para ingresar saldo y es publicado a la cola de eventos. La configuración
actual del productor de mensajes asegura que éste se ha entregado correctamente a la cola, aunque todavía no se ha
procesado la petición.

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaOrderSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "1"); // wait leader broker to get response

Actualmente la aplicación se encuentra en estado de consistencia eventual: se ha realizado un depósito,
pero si en ese mismo instante (antes de que se procese el mensaje) se consulta el estado de la cuenta, el saldo será 0.
La petición REST retorna la cabecera Location, donde se encuentra la información necesaria para consultar el estado del depósito,
que en este momento estará como PLACED.

5.3.2 Flujo 2 – Orden de depósito recibida

Una vez la cola recibe el mensaje la publicará a todos los consumidores, en este caso la aplicación de gestión de cuentas,
que se encargará de:

  • 1. Validar el estado de la cuenta.
  • 2. Generar el evento que indica que el depósito se ha realizado correctamente.
  • 3. Comunicarlo de vuelta a la cola.

Este es el código del consumidor Kafka que se encarga de procesar el evento. La comunicación con el servicio se ejecuta
a partir de eventos CDI (esto no es necesario, podría comunicarse directamente con su propio servicio):

KafkaConsumer.java

	
@Inject
Event<AccountKafkaEvent> orderEvent;

@Resource
ManagedExecutorService executorService;

public void consume(@Observes @Initialized(ApplicationScoped.class) Object init) {
	executorService.submit(() -> {

		Properties props = new Properties();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "operations-consumer");
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaOrderDeserializer.class.getName());
		Consumer<String, AccountKafkaEvent> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(props);
		consumer.subscribe(Collections.singletonList("test"));

		try {
			while (true) {
				System.out.println("WAITING FOR CONSUMING OPERATIONS...");
				ConsumerRecords<String, AccountKafkaEvent> records = consumer.poll(Long.MAX_VALUE);
				for (ConsumerRecord<String, AccountKafkaEvent> record : records) {
					AccountKafkaEvent operation = record.value();
					orderEvent.fire(operation);
				}
			}
		}finally {
			consumer.close();
		}

	});
}

El servicio se encargará de observar eventos CDI internos recepcionando el evento de depósito (entre otros), haciendo las comprobaciones
oportunas, persistiéndolo y comunicando que el depósito se ha realizado de forma correcta o rechazándolo.

AccountService.java

void depositOrder(@Observes DepositOrderPlaced depositOrder) {
	System.out.println("DEPOSIT ORDER PLACED! checking information...");
	String accountId = depositOrder.getAccountId();
	Optional<AccountInfo> info = accountRepository.get(accountId);
	if(info.isPresent()) {
		DepositOrderAccepted externalDepositOrderAccepted = new DepositOrderAccepted(depositOrder.getId());
		com.sergio.model.events.internal.DepositOrderAccepted orderAccepted = new com.sergio.model.events.internal.DepositOrderAccepted(depositOrder.getId(), depositOrder.getQuantity());
		accountRepository.save(accountId, orderAccepted);
		eventBus.produce(externalDepositOrderAccepted);
	} else{
		reject(depositOrder.getId(), "account not found = " + depositOrder.getAccountId());
	}
}

5.3.3 Flujo 3 – Operación de depósito aceptada

Con esto el flujo ha acabo y el depósito (si todo ha ido bien) ha sido aceptado, quedando en estado ACCEPTED. El siguiente paso lógico sería consultar
la cuenta para ver que los cambios quedan reflejados:

curl -i http://localhost:8080/bank-account-event-sourcing/resources/accounts/1178b586-2faf-4549-bc9c-b5c21d506121

HTTP/1.1 200 OK
Connection: keep-alive
X-Powered-By: Undertow/1
Server: WildFly/11
Content-Type: application/json
Content-Length: 94
Date: Sun, 20 May 2018 18:05:57 GMT

{"id":"1178b586-2faf-4549-bc9c-b5c21d506121","owner":"Sergio Verde Caballero","quantity":50.0}

Como ya se ha comentado no se actualiza el dominio sino que se generan
los eventos necesarios para, partiendo de un estado inicial, poder reconstruirlo dejándolo en un estado consistente.
Para ello es necesario recuperar todos los eventos del sistema de más antiguos a más recientes.

private static class EventProcessor {
	private final AccountInfo accountInfo;
	EventProcessor(AccountInfo accountInfo) {
		this.accountInfo = accountInfo;
	}
	void process(List<AccountEvent> allEvents) {
		allEvents.forEach(ev -> ev.apply(accountInfo));
	}
}

public Optional<AccountInfo> get(String accountId) {
	AccountInfo accountInfo = accounts.get(accountId);
	if (accountInfo == null) {
		return Optional.empty();
	} else {
		List<AccountEvent> allEvents = events.getOrDefault(accountId, Collections.emptyList());
		EventProcessor processor = new EventProcessor(accountInfo);
		processor.process(allEvents);
		return Optional.of(accountInfo);
	}
}


6. Conclusiones

Aunque diseñar aplicaciones teniendo en cuenta este patrón puede aportar mayor complejidad, los beneficios
que éste aporta compensa con creces la complejidad. Hoy en día es necesario dar una vuelta de tuerca al diseño de
aplicaciones, aportando la suficiente flexiblidad como para que sean capaces de escalar a medida que la carga crezca.
Event Sourcing es un patrón de arquitectura y no una arquitectura en sí misma, por lo que
puede que encaje en ciertas partes del sistema pero que no lo haga en otras. Saber identificar qué partes deben
ser escalables es vital para que el sistema aguante la gran carga de usuarios y procesos que deben
soportan las apliciones de hoy día.


7. Referencias

3 COMENTARIOS

  1. He hecho algunas pruebas con Spring-boot + Axon. Combinar CQRS y Event sourcing es realmente sencillo con el framework.
    Os recomiendo echarle un vistazo y preparar un minitutorial usándolo. Es tan sencillo cómo poner en la config de la aplicación los datos de conexión a las BBDD y a la cola de mensajería.

  2. Hola:
    Afirmas que NO se actualiza el dominio; es decir, ¿no se actualiza el saldo de la cuenta? ¿Quieres decir que el saldo de la cuenta es virtual (que simular haberse sido actualizada), que no es físico (actualización de la cuenta propiamente dicha)?

    Gran artículo y gracias por traérnoslo. 🙂

    • Buenas Abelardo.

      No sé exáctamente a qué te refieres con que no se actualiza el saldo de la cuenta. El saldo de la cuenta sí que se actualizará, eventualmente. Esto quiere decir que la acción que afecta a la cuenta y el movimiento en cuenta en si no ocurren de forma «atómica» sino en momentos distintos del tiempo. No sé si te refieres a eso.

      Un saludo.

DEJA UNA RESPUESTA

Por favor ingrese su comentario!

He leído y acepto la política de privacidad

Por favor ingrese su nombre aquí

Información básica acerca de la protección de datos

  • Responsable:
  • Finalidad:
  • Legitimación:
  • Destinatarios:
  • Derechos:
  • Más información: Puedes ampliar información acerca de la protección de datos en el siguiente enlace:política de privacidad