Usando o padrão observer com CDI para lidar com o Kafka

Há pouco tempo passei por um problema que é rotina na vida de todo desenvolvedor: acoplamento. Identificamos que um sistema está acoplado quando é difícil mudar a implementação da solução que foi escolhida para determinado problema, quando uma classe depende de várias outras para funcionar, quando há muito o uso do design patterns RCP, dentre outros.

No meu caso, o problema era retirar mensagens de um servidor de mensageria. Teria que fazer da forma menos acoplada possível, pois a probabilidade de trocar esse servidor é grande. Não deveria nem ter relacionamento com a especificação padrão JMS, haja vista que escolhemos o Kafka como servidor e ele, por uma série de motivos, incluindo desempenho, não segue a especificação.

Talvez haja mais servidores de mensageria do que de aplicação JavaEE, então o analista tem muitos trade-offs para avaliar. De qualquer forma, mesmo que futuramente haja uma troca para um servidor JMS, as partes dos sistema que usam esse serviço não devem ser alteradas, aí que está o desafio do acoplamento.

The more coupling we have between objects, components, modules, or systems, the more we experience many consequences. These consequences include but are not limited to difficulty of modification, propagation of failure, inability to scale because of contention, and performance issues due to dependent actions. 

Encapsulando a API.

A API do Kafka é bem tranquila de usar, baseada em Producer e Consumer, portanto o primeiro desafio era encapsular o uso em uma interface mais simples ainda. Outro problema aparece, o encapsulamento. Nenhuma das partes do sistema devem saber sobre os detalhes de como foi implementado o serviço de mensageria, devem apenas saber o mínimo possível para utilizá-lo.

public interface ServicoMensageria {
       public void assinar(String topico);

       public void enviarMensagem(String topico, String mensagem);
}

Da interface acima, poderíamos diminuir ainda mais, como omitir método assinar, identificando que o contexto da aplicação foi inicializado e fazendo a assinatura automática.

Agora basta usar um framework de injeção de dependências para usar o serviço de mensageria, sem saber quem é o responsável por prover a implementação.

    @Inject
    private ServicoMensageria servicoMensageria;

    public void envia(){
         servicoMensageria.enviarMensagem("meutopico","mensagem")
    }

O padrão Observer

Agora passamos para o próximo problema, o cliente da API já fez a assinatura de um fila de mensagens e precisa ser notificado quando houver alguma mensagem na fila. Para isso, nada mais adequado do que implementar o padrão Observer. Ao invés de implementar o padrão, usei uma api que já implementa, inclusive discutida pelo Sérgio Lopes anteriormente, o CDI.

 

public class OuvidorMensageriaNFe {

 /**
 *
 * SerieMensagens é uma wrapper que encapsula uma List <Mensagem>;
 **/
 public void ouvindoMensagens(@Observes SerieMensagens serieMensagens) {

     System.out.println("mensagem recebida "+ serieMensagens);
 }
}

Lidando com o sincronismo.

Agora precisamos retirar mensagens da fila e produzir eventos do CDI. Só que aqui temos um sério problema: a API de Consumer do Kafka fica em um loop infinito while true e caso haja mensagem um evento deve ser disparado.

  try (KafkaConsumer<String, String> consumer = new KafkaConsumer<> (consumerProperties);) {

        consumer.subscribe("meutopico");

        while (true) {

           List < Mensagem > mensagens = new ArrayList <> ();
           ConsumerRecords < String, String > records = consumer.poll(pollPadrao);

           for (ConsumerRecord < String, String > record : records) {

              mensagens.add(new Mensagem(record.topic(), record.value()));
           }
             if (!mensagens.isEmpty()) {

                //cuidado redobrado aqui
                evento.fire(new SerieMensagens(mensagens));

                mensagens = new ArrayList<>();
            }

      }
  }

O problema aqui é que essa solução não escala e será muito lenta, devido a basicamente um limitador: o CDI só trabalha com eventos síncronos, ou seja, quando a Thread chega na linha evento.fire(), ela só passa para o próximo passo quando todos os responsáveis por ouvir o evento @Observer SerieMensagens terminam o seu processamento. Há como contornar esse “problema”, tanto no produtor do evento como no observador do evento. Neste, poderíamos utilizar o Asynchonous e naquele bastaria lançar o evento em uma thread separada.

 


if (!mensagens.isEmpty()) {

     final List < Mensagem > mensagensParaEnviar = new ArrayList <> (mensagens);
 
     new Thread(() -> { evento.fire(new SerieMensagens(mensagensParaEnviar));}).start();

     mensagens = new ArrayList <> ();
}

Concluindo.

Pensar em baixo acoplamento, alta coesão e encapsulamento é problema diário de qualquer desenvolvedor. Você pode até não perceber de imediato, mas o preço da manutenibilidade chega logo (Design payoff). Por fim, há várias outras features do CDI que você deveria ter no seu leque de opções e talvez no futuro, a invocação de métodos assíncronos faça parte da especificação. Se bem que o JavaEE 8 não está tão perto assim de sair, no momento passa por uma crise interna.

Tags: , ,

4 Comentários

  1. Danilo Siqueira 15/06/2016 at 07:43 #

    Muito bom!

  2. Kennedy Oliveira 15/06/2016 at 23:07 #

    Bom artigo, apenas uma sugestão, não é a melhor opção criar uma thread para lançar um evento manualmente, seria melhor ter um ThreadPool para reaproveitar as threads criadas, e também limitar a quantidade de threads que podem ser criadas, pois se tiver threads demais, vai gerar tanto context switch e overhead que essa solução não vai ser tão escalável também, eu sei que ficaria mais verboso colocar um código para gerenciar isso, mas pelo menos acho valido mencionar que o que foi feito ali não é a melhor maneira, apenas foi feito para facilitar o entendimento e deixar algum link para documentação de uma sugestão alternatia como o ThreadPool que eu sugeri.
    Abraços.

  3. Kennedy Oliveira 15/06/2016 at 23:11 #

    Complementando o comentário anterior, esqueci de mencionar, além do fato de não ser a melhor maneira criar a thread manualmente, o thread pool nesse caso deve utilizar uma ThreadFactory gerenciada pelo container JEE ou utilizar um ExecutorService, pois de acordo com a especificação do JEE, a aplicação não deve criar threads que ela mesmo gerencia, isso deve ser feito pelo Container.
    Abraços.

  4. Raphael Lacerda 16/06/2016 at 00:58 #

    Kennedy, excelente observação!!!

    No meu código eu usei o FixedPool, mas ainda vou fazer a monitoração disso. Talvez tenha que mudar para o CachedPool mesmo.

    Fica um link aí pra galera

    http://www.javaworld.com/article/2078809/java-concurrency/java-concurrency-java-101-the-next-generation-java-concurrency-without-the-pain-part-1.html

Deixe uma resposta