How do I manage my messaging flow?

Asked

Viewed 161 times

8

I have this Javascript code that represents my connection to the Websockets server, where it receives and sends messages to the server:

var connection = new WebSocket('ws://127.0.0.1:2333');

connection.onopen = function () { // abriu a conexão com o servidor 
};

connection.onmessage = function (e) { // recebe do servidor
};

function sendToServer(data) { // envia mensagens para o servidor
   connection.send(data)
}

How do I make sure that messages are not sent and received simultaneously, for example if the entrance gate onmessage is busy receiving messages from the exit gate sendToServer() will not send anything until all entries are completed. With these three criteria:

  • No message should be lost
  • Everything has to happen automatically
  • The sequence must be respected

When I say that no message should be lost I say that if a server message comes while an output message is being worked on it will be "saved" and be "worked on" as soon as possible.

When I say that everything has to happen automatically I mean that nothing can stand still, if a message was saved to be executed as soon as possible, it has to be executed as soon as possible, it cannot be "forgotten"..

And finally the sequence, the sequence must be respected, but the sequence in which messages arrive/leave inside the script. Assuming a message A, coming from the server and then while the message A is "worked" comes another message from named server B and after B another from the called server arrives C and while all these, (A, B and C), wait in the queue to be "worked" the user orders that three other messages should be sent to the server, first D, afterward E and then F, in that order respectively.

Assuming all this, the final output/input or "job" sequence of this script should be:

A -> B -> C -> D -> E -> F

i think of it as an airport.. but it’s kind of complex for me. I tried to apply the semaphore but I did not succeed so I come to appeal..

"job" = send/receive to/from server

  • In what situations is this occurring? Javascript is single-threaded by default (except in some degenerate cases, or if you use web Workers), so that when one of the above codes starts running others will not start until the first one has finished. Or is your code stream another? Like, there are multiple receipts on onmessage, and only when something special happens (an "end of entries" message for example) is that the sendToServer can perform. It’s something like that?

  • Seeing Edney Pitta’s answer I came up with another question: when you send a message you need to wait for one answer before sending another? Or can you send several in sequence as long as the system is in the appropriate state? If you need to wait, my answer below is not enough, it would need to be adapted.

  • @mgibsonbr when messages are received they go through normal codes anyway, no webworkers because the codes are simple nothing of complex executions that take too long (that pass of 1 second). If a message is received now, the doors must be closed (from receiving and from "sending") until that stream is handled, after it is handled it can continue to receive messages and send, and also a message should not be sent at the same time as it receives a message. It has to be one at a time not to cause conflict in other things..

  • @mgibsonbr no, it is not necessary to expect a response from the server.. can continue normally.. if the server sends a reply it should be treated in the way I told you..

  • @mgibsonbr but if it is legal to put my code that treats receiving/sending inside a webworker I can put.

  • No, webworkers only complicate things... If not necessary, do not use. As for the other points, I believe that my reply would then work for your case: when receiving the message, mark podeEnviar as false, treat the message and at the end mark it again as true and clear the line. Note that if all treatment occurs within a single function (no events, setInterval, etc.) then neither is this strictly necessary - given the nature single-threaded of browser. But it doesn’t bother you, so you can do it anyway...

  • I suggest rephrasing the question, it was very confusing, both the title and the content.

Show 2 more comments

2 answers

10


You need a queue: whenever the sendToServer is called, put the data in the queue instead of sending it straight. At the end, of the two:

  • If the system is in a state that allows messages to be sent, send them at once (only one);
  • Otherwise do not send.

When the system changes from "locked" to "available" status, send everything in the queue (so you don’t have to wait for the next call sendToServer.

var connection = new WebSocket('ws://127.0.0.1:2333');

    var fila = [];
    function esvaziaFila() {
        while ( fila.length > 0 ) {
            if ( !podeProcessar )
                return;
          
            var proximo = fila.shift();
            if ( proximo.envio )
                connection.send(proximo.dados);
            if ( proximo.recebimento )
                processar(proximo.evento);
        }
    }
    var podeProcessar = false;
    
    connection.onopen = function () { // abriu a conexão com o servidor 
        document.body.innerHTML += "<pre>Abriu a conexão</pre>";
        podeProcessar = true;
    };
    
    connection.onerror = function (error) { // ocorreu um erro na comunicação
    };
    
    connection.onmessage = function (e) { // recebe do servidor
        rec("cliente", e);
        fila.push({ recebimento:true, evento:e });
        esvaziaFila();
        
    };

    function processar(evento) { // recebe mensagens do servidor
      document.body.innerHTML += "<pre>Cliente vai processar " + JSON.stringify(evento) + "; não faz mais nada até estar pronto!";
      podeProcessar = false;
      
      tarefaDemorada(evento);
    }
    
    function sendToServer(data) { // envia mensagens para o servidor
       fila.push({envio:true, dados:data});
       esvaziaFila();
    }

/********** Mockups para testar ***************/
function rec(classe, dados) {
    document.body.innerHTML += "<pre class='" + classe + "'>" + classe + " recebeu: " + JSON.stringify(dados) + "</pre>";
}
function env(classe, dados) {
    document.body.innerHTML += "<pre class='" + classe + "'>" + classe + " enviou: " + JSON.stringify(dados) + "</pre>";
}

function WebSocket() {
  this.send = function(data) {
    rec("servidor", data);
  }
  // Manda algumas mensagens no futuro
  var self = this;
  setTimeout(function() { self.onopen() }, 500);
  setTimeout(function() { env("servidor","A"); self.onmessage("A") }, 2000);
  setTimeout(function() { env("servidor","B"); self.onmessage("B") }, 4000);
  setTimeout(function() { env("servidor","C"); self.onmessage("C") }, 6000);
}

setTimeout(function() { env("cliente","D"); sendToServer("D"); }, 8000);
setTimeout(function() { env("cliente","E"); sendToServer("E"); }, 10000);

function tarefaDemorada(evento) {
    setTimeout(function() {
        document.body.innerHTML += "<pre>Terminou de processar " + JSON.stringify(evento) + "; pode processar o resto.";
        podeProcessar = true;
        esvaziaFila();
    }, evento == "B" ? 5500 : 1250);
}
.servidor {
  color: blue;
}

.cliente {
  color: red;
}

Alternative: graph of dependencies

If you have a complex sending/receiving flow, one way to manage this complexity is to create a dependency graph between your messages, both incoming and outgoing. I suggest this - not a total order - as it is complicated to say that "event A occurred before event B" when A originated on a machine (e.g.: client) and B in another (e.g.: the server)[1].

An example combining the queue technique with a dependency graph would be the following:

var fila = [];
function processaFila() {
    while ( fila.length > 0 ) {
        var proximo = fila[fila.length-1];

        if ( !proximo.pronto ) // Se está aguardando outro processamento
            break;             // Não processa mais nada

        fila.pop();
        if ( proximo.envio ) // Se está pronta pra ser enviada
             connection.send(proximo.dados);
        if ( proximo.recebimento ) // Se foi recebida e tem de ser processada
            proximo.processar(proximo.evento);
    }
}

function incluiNaFila(job) {
    var i = fila.length;
    while ( i > 0 ) {
        // Se o próximo da fila precisa ser executado depois da tarefa atual
        if ( dependeDe(fila[i-1], job) )
            break; // Coloca a atual no começo da fila

        // Senão, coloca a atual depois do primeiro da fila
        fila[i] = fila[i-1];
        i--; // Repete a lógica pro segundo da fila, etc
    }
    fila[i] = job;
}

connection.onmessage = function (e) {
    incluiNaFila({
        recebimento:true, pronto:true,
        evento:e,
        processar:function(e) { ... }, // Função que processa a entrada
        ... // outros campos que determinam a dependência
    });
    processaFila();
}

function sendToServer(data) {
    incluiNaFila({
        envio: true, pronto:true,
        dados: data,
        ... // Outros campos que determinam a dependência
    });
    processarFila();
}

function dependeDe(tarefaA, tarefaB) {
    /* Aqui entra sua lógica específica.
       Ela deve retornar false se a tarefa B pode ser executada depois
       de A (B é uma tarefa mais nova que A), ou true se A precisa
       esperar B estar pronta pra executar.
    */
}

After a quick read on your example in Pastebin (in the comments), I would suggest something like starting point:

var sequencial = 0; // Cada mensagem de iniciativa do servidor ou do cliente
                    // recebe um id sequencial; mensagens que são respostas
                    // a outra mensagem recebem o mesmo id da mensagem original.

connection.onmessage = function(e) {
    var id = sequencial++;
    incluiNaFila({
        recebimento:true,
        evento:e,
        processar:function(e) {
            var aguradando = {
                recebimento:true, processar:function(){},
                pronto:false, id:id
            }

            incluiNaFila(aguardando);
            funcaoquetrataamensagem(.......).then(function() {
                aguardando.pronto = true;
                processaFila();
            });
        },
        id:id
    });
    processaFila();
}

// Envia uma mensagem, sem impor nenhuma ordem específica (iniciativa do cliente)
function sendToServer(data) {
    incluiNaFila({
        envio: true, pronto:true,
        dados: data,
        id:sequencial++
    });
    processarFila();        
}

// Envia uma mensagem como resposta a uma mensagem recebida
function respondToServer(idMensagemOriginal, data) {
    incluiNaFila({
        envio: true, pronto:true,
        dados: data,
        id:idMensagemOriginal+1 // Garante que só será enviada quando a
                                // mensagem original tiver sido processada
    });
    processarFila();        
}

function dependeDe(a, b) {
    return ( a.id > b.id );
}

In this example the messages received and the messages sent on the client’s initiative will be processed in the order (queue), while messages that are reply to another message will be processed only after the original message has been fully processed. Any message that needs to "do more things" before finishing its processing blocks the entire queue - by inserting a new task into it aguardando with the same id, and hence the same priority, of the original message.


[1]: There are several attempts to solve this problem, but none perfect. Choosing a side of communication to be authoritative can cause starvation on the other; The technique Operational Tranform is promisory, but does not apply to all cases; in some scenarios (e.g., the bitcoin network), the Proof of work can be used to reduce the problem, but does not eliminate it altogether. Etc.

  • Hi, I updated the question trying to explain as much as I can. I ran your script to test raw and then I created two functions SR to simulate a receipt and SE to simulate sending to the server. No onmessage I put in a function that takes time, to see if the line really worked. The problem was that he grouped the events in the queue (at the time of execution) and gave preference to the receipts, coming from the SR. See code http://pastebin.com/PpneGPBm (sorry not to use jsfiddle, it does not work properly on my machine)

  • just to point out that in this test I used a test server, which sends a message as soon as the user logs in, to test the queue. You can see this print of what I said: http://s18.postg.org/p10kn9bnd/image.png

  • Look, you can’t help debug a program without knowing what it’s doing... Only with your code in Pastebin, but incomplete, without the server part, I have no idea what might be wrong. Have you tried creating a Minimum, Complete and Verifiable Example, that you don’t need a server, that people can just run and see what’s going on? I’ll edit my answer by putting my code into Stacksnippets so it’s easy to test. It’ll take a little while, I’ll notify you by comment as soon as I’m done, okay?

  • @Elaine Pronto! Take a look at the first example (the one that uses the queue, without this question of dependencies) and see if it does what you want. The messages were "worked" in the order, none was lost. The key point is to block queue handling (podeProcessar = false;) when a job starts, and release this treatment and clear the queue (podeProcessar = true; esvaziarFila();) when the job is over - it doesn’t matter if it happened in the code itself, in response to an event, a timeout, on then of a promise, etc. Try to adapt this to your real code.

2

I don’t know if the Websocket object has some kind of control, but you can control it manually

var connection = new WebSocket('ws://127.0.0.1:2333');
var available = true;

connection.onopen = function () { // abriu a conexão com o servidor 
    // mantém available = false
};

connection.onerror = function (error) { // ocorreu um erro na comunicação
    available = true;
};

connection.onmessage = function (e) { // recebe do servidor
    processa(e);
    available = true;
};

function sendToServer(data) { // envia mensagens para o servidor 
    var interval = setInterval(function () {
        if (available) {
            clearInterval(interval);
            available = false;
            connection.send(data);
        }
    }, 1000)
}
  • 1

    I find it very dangerous. If the sent for false as many calls are made to the function sendToServer(), many would be created while running endlessly. You can easily lock the browser up because the while wasn’t made for it. Your idea is good but dangerous. But the while could be exchanged for a setInterval for example, what would prevent this problem.

  • @Dontvotemedown Well pointed out, you’re right

  • For a single message, this works, but if you have several such code will cause them to be sent [potentially] out of order.

Browser other questions tagged

You are not signed in. Login or sign up in order to post.