quarta-feira, 19 de maio de 2021

Threads - Exemplo de uso para tarefas pesadas: synchronized, wait, notify e mais...

      Olá, nesse post vou mostrar um estudo de caso de como separar uma tarefa pesada – como um processamento de dados que envolva várias entidades e regras de negócio – e dar algumas dicas de como trabalhar essa questão com threads.

 

      Primeiro vamos ao caso:

 

     Temos uma tarefa que consiste em processar um grande volume de dados, (poderia ser uma folha de pagamento, contas de serviços de clientes, etc). Essa atividade hoje é feita da seguinte forma:

 

1.    Em um laço for uma consulta paginada retorna de 1000 em 1000 registros usados no início do processo (poderia ser o id dos colaboradores no caso da folha de pagamento, dos clientes no caso das contas de clientes, etc).

2.    Para cada página, temos um outro laço for que percorre cada um dos 1000 registros e inicia o processo de consultar os demais dados necessários, realizar processamento (todos os descontos, acréscimos, multas, juros, etc...), e gravar o resultado.

3.    Vai para próxima página, busca mais 1000 registros e continua.

 

Veja o código (esse código não compila, é apenas um exemplo didático. Mais à frente o código que uso threads está OK, compila e roda perfeitamente):

 

/*representa a classe que da início ao nosso processo*/

public class SincronizacaoThreads {

 

    public static void main( String[] args ) {

 

        ClasseDeNegocio classeDeNegocio = new ClasseDeNegocio();

        classeDeNegocio.executaTarefaPesada();

    }

 

}

 

public class ClasseDeNegocio {

 

    public void executaTarefaPesada() {

 

        /*

         * Digamos que eu tivesse um grande processamento de dados muito pesado para fazer,

         * Nesse ponto eu prepararia uma consulta paginada para buscar os dados, 1k por pagina.

         * Esse primeiro for representa essa paginação, aqui simulo 3 páginas.

         */

        for ( int j = 0; j < 3; j++ ) {

 

            // aqui buscaríamos os 1000 registros no banco e preencheríamos nossa lista

            System.out.println( "Tarefa pesada rodada número " + j );

 

            // Aqui percorreríamos todos os 1000 registros

            for ( int i = 0; i < lista.lenght; i++ ) {

 

                /*

                 * executando todo processo pesado, item a item...

                 * chamando outras classes de negócio necessárias etc...

                 */

            }

        }

    }

}

 

Vamos otimizar esse processo usando threads e ver algumas questões importantes na construção de soluções usando essa abordagem. A imagem abaixo mostra um rascunho de como era e como ficou o código:

 


Agora vamos ao código. São duas classes apenas, você pode copiar e colocar o código para rodar na sua máquina para facilitar o entendimento (elas estão funcionando rsrsrs):

 

/*representa a classe que da início ao nosso processo*/

public class SincronizacaoThreads {

 

    public static void main( String[] args ) {

 

        ClasseDeNegocio classeDeNegocio = new ClasseDeNegocio();

        classeDeNegocio.executaTarefaPesada();

    }

 

}

 

public class ClasseDeNegocio {

 

    // esse contador serve para validar se posso mudar de página

    private int contadorThreadsFilhasAtivas = 0;

 

    public void executaTarefaPesada() {

 

        /*

         * Digamos que eu tivesse um grande processamento de dados muito pesado para fazer,

         * Nesse ponto eu prepararia uma consulta paginada para buscar os dados, 1k por página.

         * Esse primeiro for representa essa paginação, aqui simulo 3 páginas.

         */

        for ( int j = 0; j < 3; j++ ) {

 

            // mostro o múmero da pagina (0 a 2)

            System.out.println( "Tarefa pesada rodada número " + j );

 

            // Começando o trabalho - Dividindo a tarefa em 10 threads.

            for ( int i = 0; i < 10; i++ ) {

 

                // aqui eu criaria a sublista com 100 elementos

 

                // incremento o contador de threads filhas

                contadorThreadsFilhasAtivas++;

 

                new Thread( new Runnable() {

 

                    @Override

                    public void run() {

 

                        // aqui passaria a sublista com 100 elementos (1k / 10) para o processaDados

                        processaDados();

                    }

                }, "thread " + i ).start();

            }

 

            /*

             * só posso continuar a paginação e buscar mais 1k de dados após todas as 10 threads

             * auxiliares acaberem seu serviço, para evitar sobrecarga no sistema e no banco, já

             * estamos falando de uma tarefa bastante pesada de processamento de dados.

             */

            synchronized ( this ) {

                try {

                    System.out.println( "Esperando a execução das threads filhas para continuar" );

                    while ( contadorThreadsFilhasAtivas != 0 ) {

                        this.wait();

                    }

                } catch ( InterruptedException e ) {

                    e.printStackTrace();

                }

            }

 

            System.out.println( "Fim das threads filhas da página " + j + " e agora posso continuar" );

            System.out.println();

        }

    }

 

    public void processaDados() {

 

        String nome = Thread.currentThread().getName();

        System.out.println( nome + " entrando no processaDados" );

 

        try {

            /*

             * simula o tempo de execução do método.

             * o tempo dos sleep simula o processamento dos 100 dados que o método receberia

             * executando todo processo pesado, item a item...

             * chamando outras classes de negócio necessárias etc...

             */

            Thread.sleep( 9000 );

        } catch ( InterruptedException e ) {

            e.printStackTrace();

        }

 

        System.out.println( nome + " terminando execução do processaDados" );

        System.out.println( nome + " saindo do processaDados" );

 

        synchronized ( this ) {

            // decremento o contador de threads filhas antes de sair

            contadorThreadsFilhasAtivas--;

            this.notify();

        }

    }

}

 

Vamos analisar o código e algumas dicas.

 

A classe ClasseDeNegocio tem apenas 2 métodos: executaTarefaPesada() e processaDados(). Se fosse um código real, o método processaDados() receberia um lista com 100 objetos para ele processar (já que minha paginação pega 1.000 registros e divido em 10 threads no laço).

 

Antes de usarmos threads, o método executaTarefaPesada fazia tudo sozinho. Agora, ele apenas busca os dados e divide para as threads fazerem o trabalho. Você pode se perguntar: por que cada thread não busca seus próprios dados? Porque facilmente as threads poderiam buscar os mesmos dados e isso poderia ocasionar sérios problemas de duplicação de dados entre outras coisas.

 

Fizemos uso de wait() após disparamos as threads que vão fazer o trabalho pesado e deixamos ela em espera até todas as threads terminarem, e então seguimos para a próxima página. Esse controle é feito através da variável contadorThreadsFilhasAtivas que criamos na ClasseDeNegocio e começa com valor 0, porém a cada thread que lançamos esse valor é acrescido de 1, e no final do método processaDados() ela é decrescida em 1.

 

Logo que decrescemos o valor, usamos o notify() método processaDados() que serve para dizer a thread que deixamos parada com o wait() que ela pode continuar seu trabalho. Como o código que chama o wait() está dentro de um laço, ele vai executar o laço mais uma vez e checar se o valor da variável contadorThreadsFilhasAtivas já chegou a 0, caso contrário, ele executa outra chamada a wait() e entra em espera novamente. Isso ocorre até que todas as threads tenha terminado seu trabalho, e o código segue seu fluxo.

 

Quando usamos o wait(), notify() ou notifyAll(), nem sempre ele vai ser chamado a partir do “this”, mas pode ser de um parâmetro passado no método, etc. O ponto de aplicação do wait(), notify() ou notifyAll() é o objeto ou classe de negócio, enfim, aquilo que é de fato acessado de forma paralela. Ao dar um wait() em um objeto, faz-se necessário um notify() ou notifyAll() para o mesmo objeto, afim de que ele volte a execução, caso contrário ficará fadado a espera eterna. Wait(), notify() ou notifyAll() só podem ser usados dentro de um bloco synchronized.

 

Usamos também o synchronized. Ele pode ser usado tanto em blocos específicos de código, como foi o nosso caso, quanto na declaração do método. Qual a diferença? Bem, quando declaramos algo como sincronizado, isso significa que será permitido apenas um acesso por vez aquele trecho de código. Isso é muito ruim, pois cria gargalos... pra quê usar várias threads se eu deixar os métodos com 1 acesso por vez? Não faz sentido! Mas, as vezes é necessário. E isso vai depender de cada regra de negócio. No nosso caso, apenas o trecho que controla a variável contadorThreadsFilhasAtivas  teve essa necessidade.

 

Então, onde usar synchronized? Onde houver chance de duplicação de dados ou erro de calculo em variáveis compartilhadas entre threads como era o caso do nosso contadorThreadsFilhasAtivas ou coisas do tipo. Sincronizar o método ou um trecho? Sempre o menor possível, mantendo integridade e consistência para não perder muita performance.

 

O controle da execução de blocos sincronizados é válido para cada instância da classe. Ou seja, mesmo que anote um método com synchronized, se eu chama-lo a partir de duas instancias distintas, ele será executado ao mesmo tempo.

 

Acredito que para esse post era isso. Espero que tenha ajudado.

Nenhum comentário:

Postar um comentário