To modify the collection inside the onmessage function, you need to use a reference to the collection itself (self) instead of the variable 'result':
self[event.data[0]] = event.data[1];
Just to complement the answer:
The Parallel.ForEach method in the .NET Framework processes each element of a collection (a map operation), and this processing may or may not be done in parallel.
At execution time, .NET evaluates the available resources (e.g., number of processors and memory) and the processing capacity available on the machine to determine:
- whether it is worth running the map operation in parallel
- how many threads the work will be divided among (if parallelism is advantageous)
Assuming your JavaScript code runs on a web page, you have no way to access all of this system information to decide whether running in parallel is worthwhile.
Even if you prefer to 'force' parallel execution, you need to decide how many threads will do the work, and for that you need to know, for example, how many processors (physical and logical) are available on the system, because performance may degrade if the number of threads exceeds the number of available cores.
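As a partial workaround, browsers that implement navigator.hardwareConcurrency report the number of logical cores, which can serve as a starting point for the thread count. A minimal sketch, assuming that property is available; the helper name chooseThreadCount and the fallback of 4 are hypothetical, not part of the original code:

```javascript
// Hypothetical helper: picks a worker count from the logical core count.
// `nav` defaults to the global navigator when one exists; `fallback` is
// used when the core count is not exposed.
function chooseThreadCount(nav, fallback) {
    if (nav === undefined && typeof navigator !== "undefined") {
        nav = navigator;
    }
    var cores = nav && nav.hardwareConcurrency;
    if (typeof cores === "number" && cores >= 1) {
        return Math.floor(cores);
    }
    return fallback || 4;
}

// Example: var NUMERO_DE_THREADS = chooseThreadCount();
```

This only reports logical cores; it says nothing about memory or the current load on the machine, so it does not replace measuring.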
In your code, you create one thread per array element, which will probably reduce performance, since the cost of creating the threads is much greater than the gain from parallel processing.
A common solution for this type of implementation is to create a thread pool and split the array into parts, each of which is processed separately on its own thread.
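The splitting step can be sketched on its own, without any Workers. The function name splitIntoChunks is hypothetical; it follows the same strategy as the example further down, where every part gets floor(length / parts) elements and the last part also takes the remainder:

```javascript
// Hypothetical helper: splits `array` into `parts` chunks and records
// where each chunk starts, so results can be written back later.
function splitIntoChunks(array, parts) {
    var chunks = [];
    var size = Math.floor(array.length / parts);
    var start = 0;
    for (var i = 0; i < parts; i++) {
        // The last chunk takes whatever is left over
        var end = (i < parts - 1) ? start + size : array.length;
        chunks.push({ start: start, items: array.slice(start, end) });
        start = end;
    }
    return chunks;
}

var chunks = splitIntoChunks([7, 2, 4, 6, 8, 1, 3, 5, 7, 9], 4);
console.log(chunks.map(function (c) { return c.items.length; }));
```

With 10 elements and 4 parts, the first three chunks hold 2 elements each and the last one holds 4, so the load is not perfectly balanced.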
In the onmessage handler, the pool code collects the results and updates the array elements.
After the last thread finishes, the finish callback is called.
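One way to detect that the last thread has finished is a simple countdown, as an alternative to scanning an array of status flags. This is only a sketch; createCompletionTracker is a hypothetical name, and it assumes each Worker reports back exactly once:

```javascript
// Hypothetical helper: returns a function that must be called once per
// finished worker; `onAllDone` runs when the last one reports in.
function createCompletionTracker(total, onAllDone) {
    var remaining = total;
    return function markDone() {
        remaining--;
        if (remaining === 0) {
            onAllDone();
        }
    };
}

// Example: call markDone() at the end of each Worker's onmessage handler.
var markDone = createCompletionTracker(4, function () {
    console.log("all workers finished");
});
```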
Another important point is that communication between the main thread and the Workers is done by copying the parameters, not by passing references, which can cost performance depending on the algorithm you implement.
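The copy semantics can be observed without a Worker: postMessage serializes its argument with the structured clone algorithm, and the global structuredClone function applies the same algorithm. A small sketch, assuming an environment that provides structuredClone (recent browsers and Node.js versions):

```javascript
// The receiving side of postMessage gets a deep copy like this one.
var original = { values: [1, 2, 3] };
var received = structuredClone(original);

// Changing the copy leaves the sender's data untouched
received.values[0] = 99;
console.log(original.values[0]); // 1
console.log(received.values[0]); // 99
```

This is why the Worker has to send its results back with postMessage, and why large arrays pay the copying cost twice (once in each direction).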
Below is a commented example implementing this concept with a 4-thread pool, but I do not recommend using this code in any production environment.
To test the concepts above, I suggest implementing a "stopwatch" and running several tests with different array sizes (small, medium, huge), different thread counts and, if possible, different machines (with different numbers of processors) to observe the implementation's behavior and performance.
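A minimal sketch of such a stopwatch, assuming the global performance.now() timer is available; startStopwatch is a hypothetical name. Because parallelMap reports completion through its finish callback, the elapsed time has to be read inside that callback:

```javascript
// Hypothetical helper: starts timing and returns a function that gives
// the elapsed time in milliseconds each time it is called.
function startStopwatch() {
    var start = performance.now();
    return function elapsedMs() {
        return performance.now() - start;
    };
}

// Usage with the parallelMap example (shown as comments because it
// needs a browser with Worker support):
// var elapsed = startStopwatch();
// numeros.parallelMap(callback, function (resultado) {
//     console.log(elapsed().toFixed(2) + " ms");
// });
```

Running each configuration several times and comparing it against a plain Array.prototype.map over the same data gives a fairer picture than a single measurement.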
// Number of threads in the pool
var NUMERO_DE_THREADS = 4;
// Adds the parallelMap method to arrays
Array.prototype.parallelMap = function (callback, finish) {
    var self = this;
    // Thread pool
    var pool = [];
    // Status of each thread: true => working, false => already finished
    var status = [];
    // Body of the Worker object
    var source = "onmessage = " + function (event) {
        // Start position of this chunk in the original array
        var posicao = event.data[0];
        // Chunk of the array to be processed
        var dados = event.data[1];
        // Processing callback, rebuilt from its source text
        eval("var cb = " + event.data[2]);
        // Thread ID
        var id = event.data[3];
        // Process the chunk (i is the index within the chunk)
        for (var i=0; i<dados.length; i++)
            dados[i] = cb(dados[i], i);
        // Return the result
        postMessage([id, posicao, dados]);
        // Terminate the Worker
        close();
    }.toString();
    var blob = new Blob([source], { type: 'text/javascript' });
    var _url = URL.createObjectURL(blob);
    // Create the pool of workers
    for (var i=0; i<NUMERO_DE_THREADS; i++) {
        // Set the worker status to true => working
        status[i] = true;
        // Create the Worker
        pool[i] = new Worker(_url);
        // Handle the Worker's reply
        pool[i].onmessage = function(e) {
            var id = e.data[0];
            var posicao = e.data[1];
            var resultado = e.data[2];
            // Update the array with the results
            for (var j=0; j<resultado.length; j++)
                self[posicao+j] = resultado[j];
            // Mark this Worker as finished
            status[id] = false;
            // Return if any Worker is still working
            for (var j=0; j<NUMERO_DE_THREADS; j++)
                if (status[j])
                    return;
            // All Workers have finished, call the finish callback
            finish(self);
        };
    }
    // Compute the amount of work for each Worker by dividing the
    // array length by the number of threads
    var tamanhoDoTrabalho = Math.floor(self.length / NUMERO_DE_THREADS);
    // Start the pool and send each part of the array to
    // a Worker
    var posicao = 0;
    for (var i=0; i<NUMERO_DE_THREADS; i++) {
        var trabalho;
        if (i<NUMERO_DE_THREADS-1)
            trabalho = self.slice(posicao, posicao+tamanhoDoTrabalho);
        else
            // The last Worker also takes the remainder
            trabalho = self.slice(posicao);
        pool[i].postMessage([posicao, trabalho, callback.toString(), i]);
        posicao += tamanhoDoTrabalho;
    }
};
var numeros = [7, 2, 4, 6, 8, 1, 3, 5, 7, 9];
numeros.parallelMap(function (numero, indice) {
    return numero + Math.random()*200;
}, function (resultado) {
    console.log(resultado); // the values of the numeros collection should be modified at this point
});
Update:
In response to the comment about not using return inside the callback: in that case the array element must be accessed through the indice parameter, because the element is not passed to the callback function by reference and therefore cannot be updated directly.
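The difference can be seen in plain JavaScript, independent of Workers. In this small illustration the two function names are hypothetical:

```javascript
// Receives a copy of the number: reassigning it changes nothing outside
function tryDirectUpdate(value) {
    value = value + 100;
}

// Receives the array itself plus a position: the element can be updated
function updateThroughIndex(array, index) {
    array[index] = array[index] + 100;
}

var numbers = [1, 2, 3];
tryDirectUpdate(numbers[0]);
console.log(numbers[0]); // 1
updateThroughIndex(numbers, 0);
console.log(numbers[0]); // 101
```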
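Change the line:
dados[i] = cb(dados[i], i);
to:
cb(dados, i);
With this change, the callback function no longer needs a return. Its first parameter now receives the chunk of the array being processed rather than a single element, and indice is the position within that chunk.
The call to parallelMap then looks like this: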
numeros.parallelMap(function (numero, indice) {
    numero[indice] += Math.random()*200;
}, function (resultado) {
    console.log(resultado);
});
Libraries for parallel execution in JavaScript:
- Parallel.js - Parallel Computing with Javascript
- Hamsters.js | Parallel Javascript
- js threads
- Inline Worker
Can you be a little clearer about what your problem is? Do you need parallelForEach to modify the collection? Or is the function parallelMap not passing the modified collection to the callback? – Guilherme Nagatomo
@Guilhermenagatomo, parallelMap is OK; the problem here is modifying the collection in parallelForEach – Tobias Mesquita