Pthreads, Buffer Consumer Producer Problem in C

Asked

Viewed 427 times

0

The Goal of the program is to give in the command line nt=number of tasks that will be created , an integer n, and nbloco, number of interactions that each thread can have to calculate the sum of the squares of n. also there must be a buffer with the same size of nt.

I THINK THE PROBLEM WILL BE IN THE VOIDS

So far I’ve been able to do the threads spwan, but I’m having some problems with the buffer in the wheel calculations but it doesn’t show the desired results.

First I created an accessible structure

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

  int nbloco;                   /* numero de elementos do bloco a processar */
  int nt;                       /* numero de tarefas */
  int n;                        /* numero inteiro a processar */
  int nt_somadoras;             /* caso queira mais que uma somadora */
  int soma_global;              /* a soma total dos elementso calculados */
  int items_buffer;
typedef struct {

/* This structure is passed to the threads and contains all necessary
   information for computations */

  pthread_mutex_t * mutex;      /* Mutex */
  pthread_cond_t * cond;        /* Main Condition variable */
  pthread_cond_t * produzir; // signaled when items are removed
  pthread_cond_t * consumir; // signaled when items are added
  int *buffer;                  /* buffer partilhado */
  int *matriz;                  /* Matriz com os valores a calcular */
  int numero_thread;            /* número das threads */
  int inicio;                   /* Elemento que deve começar o cálculo */


}estrutura_global;

void * calculadora(void *);              /* Declaração da finção calculadora */
void * somadora(void *);                 /* Declaração da tarefa somadora */


int main(int argc, char ** arcv){

    int a, i, z, x, y;                                  /* defenição da variavel i */


    /* ler dados da linha de comendos*/

    nt=atoi(arcv[1]);
    n=atoi(arcv[1]);
    nbloco=atoi(arcv[1]);
    nt_somadoras=1;

    /* Verificação de dados */


    /* Alocação de memorias */


    estrutura_global * estrutura = malloc(sizeof(estrutura_global));
    estrutura->mutex = malloc(sizeof(pthread_mutex_t));
    estrutura->cond = malloc(sizeof(pthread_cond_t));
    estrutura->produzir = malloc(sizeof(pthread_cond_t));
    estrutura->consumir = malloc(sizeof(pthread_cond_t));
    pthread_t * calculadoras = malloc(sizeof(pthread_t)*nt);
    pthread_t * somadoras = malloc(sizeof(pthread_t)*nt_somadoras);
    pthread_attr_t * attr = malloc(sizeof(pthread_attr_t));
    estrutura->matriz = malloc(sizeof(int)*n);
    estrutura->buffer = malloc(sizeof(int)*nt);

    /* Inicializações */

    pthread_mutex_init(estrutura->mutex,NULL);
    pthread_cond_init(estrutura->cond,NULL);
    pthread_cond_init(estrutura->produzir,NULL);
    pthread_cond_init(estrutura->consumir,NULL);
    pthread_attr_init(attr);
    pthread_attr_setscope(attr, PTHREAD_SCOPE_SYSTEM);  /* The thread competes for resources with all other threads in all processes on the system  */
    soma_global=0;
    estrutura->numero_thread=0;
    int items_buffer=0;

    /* Filling arrays with numbers */

    for(a=0;a<n;++a){
    estrutura->matriz[a]=a+1;}


     /*Preparing to spawn threads. Mutex + condition variable are used
     to ensure that the thread is spawned and initialized before
     altering the thread structure variables for the next thread */

    /* Lock the mutex to use pthread_cond_wait */

    pthread_mutex_lock(estrutura->mutex);

    /* Compute the number of elements to calculate for each thread and
     initialize the number to 0 */

    if(nt<nbloco){
    nbloco = nbloco + ((n/nt)-nbloco);}

    if(nt==nbloco && nbloco==n){
    nbloco = 1;}

    /* Spawn threads */

    for(i=0;i<nt_somadoras;++i){
    pthread_create(&somadoras[i],NULL,somadora,estrutura);}

    for(z=0;z<nt;++z){
    pthread_create(&calculadoras[z],NULL,calculadora,estrutura);
    ++estrutura->numero_thread;                                                 /* Setting the thread number, start and count */

     /* Here we deal with the situation when the array size is not a multiple of number of threads */

    if(z==(nt-1) && n%nt){
    nbloco = n-nbloco*z;}

    estrutura->inicio = z*nbloco;

    pthread_cond_wait(estrutura->cond,estrutura->mutex);    

    }

    /*Unlock the mutex, it will be used by the threads to add their partial sums to the sum */

  pthread_mutex_unlock(estrutura->mutex);

Here I hope the therads are finished

  /* Wait until the threads are done. */
  for(x=0;x<nt;++x){
  pthread_join(calculadoras[x],NULL);}

  for(y=0;y<nt_somadoras;++y){
  pthread_join(somadoras[z],NULL);}



  /* Print the result */
  printf("The sum of numbers from 1 to %d is %d\n",n,soma_global);

Here I set the memory free

  free(estrutura->matriz);
  pthread_cond_destroy(estrutura->cond);
  free(estrutura->cond);
  pthread_cond_destroy(estrutura->produzir);
  free(estrutura->produzir);
  pthread_cond_destroy(estrutura->consumir);
  free(estrutura->consumir);
  pthread_attr_destroy(attr);
  free(attr);
  pthread_mutex_destroy(estrutura->mutex);
  free(estrutura->mutex);
  free(calculadoras);
  free(somadoras);
  free(estrutura);

  return 0;
}

This is my first void of calculating tasks, they when I run they are not doing the required number of interactions. and I don’t know what it is but I don’t think they’re writing for the buffer.

void * calculadora(void * estrutura_calculadora){

int a, soma_parcial;

estrutura_global * const estrutura = estrutura_calculadora;

pthread_mutex_lock(estrutura->mutex);

int * const matriz = estrutura->matriz;
int * const buffer = estrutura->buffer;
int const numero_thread = estrutura->numero_thread;
int const inicio = estrutura->inicio;
pthread_cond_signal(estrutura->cond);

pthread_mutex_unlock(estrutura->mutex);

int const fim = nbloco + inicio;

for(a=inicio;a<fim;++a){

    if(items_buffer == nt) {                                         // full
       pthread_cond_wait(estrutura->produzir, estrutura->mutex);     // wait until some elements are consumed
        }

    soma_parcial= matriz[a]*matriz[a];
    buffer[items_buffer]=soma_parcial;
    printf("Sou a tarefa %d a minha soma parcial é %d e o buffer é %d\n", numero_thread, soma_parcial, items_buffer);

    pthread_cond_signal(estrutura->consumir);
}

pthread_exit(NULL);

}

The second void is the task that adds up, which seems not to be working due to the reading and cleaning of the buffer, which I cannot do.

void * somadora(void * estrutura_somadora){

int c, b;

estrutura_global * const estrutura = estrutura_somadora;

pthread_mutex_lock(estrutura->mutex);

int * const buffer = estrutura->buffer;

pthread_cond_signal(estrutura->cond);

pthread_mutex_unlock(estrutura->mutex);

for(c=0;c<=n;++c){

    pthread_mutex_lock(estrutura->mutex);
    if(items_buffer == 0) {                                     // empty
    pthread_cond_wait(estrutura->consumir, estrutura->mutex);   // wait for new items to be appended to the buffer
    }

    for(b=0;b<=items_buffer;++b){
        soma_global+=buffer[c];
        printf("Sou a tarefa somadora a soma global é %d\n",soma_global);
}
pthread_cond_signal(estrutura->produzir);
pthread_mutex_unlock(estrutura->mutex);
}
pthread_exit(NULL);

}

1 answer

0

The answer to anyone interested

#include <pthread.h>    //utilizado para as pthreads
#include <stdio.h>      //utilizado para os printf
#include <stdlib.h>     //utilizado para o malloc

int nt,n,nbloco;                                                                        // defenição das variáveis base pedidas
int resultado_esperado = 0, *matriz, *buffer,sinal_saida,soma_global, items_buffer=0;   //defenição das variaveis udadas para calculo e controlo do programa

pthread_mutex_t indice_mutex, criacao_thread_mutex,buffer_mutex,soma_final_mutex;       // defenição dos mutex para exclusividade, poderia ser usado um so mutex para garantir que nao exista confusoes de desbloqueio opteou-se por criacao de mutex exclusivos para as operacoes

//CASO USE O PROGRAMA COM A FUNCAO SCHED YIELD
//DESATIVAR AS CONDICOES PRODUZIR E CONSUMIR

pthread_cond_t cond/*, produzir, consumir*/;                                                // defenição de condições para sinalizar threads criadas e sinalizaao de buffer cheio (mais rapido do que usar o sched_yield as duas opcoes estao acessiveis)

static int numero_para_comeco = 0;                                                      //numero de controlo para saber que bloco a thread vai processar

typedef struct {

    //esta estrutara vai passar os valores para as threads 
    //valores que vão diferir de uma thread para 
    //a outra, dái se encontrarem dentro da estrutura.

  int inicio_inicial;
  int id_threads;

}estrutura_geral;

// esta primeira função representa a função das tarefas
// calculadoras está dividida em duas partes um primeira 
//que processa o bloco associado de inicio conforme a sequencia de numero de ordem
//(cada thread obriagtoriamento computa um bloco) 
// e uma segunda que permite qualquer thread agarrar um novo bloco
// esta funç]ao tem um apontador que direciona para a estrutura

void *calculadoras(void *es)                                        
{
        int i, inicio, fim, contador;                                           //defenicao de variaveis
        int calculo_parcial = 0;                                    //defenicao variavel soma parcial

        pthread_mutex_lock(&criacao_thread_mutex);                  //bloqueio do mutex para garantir a leitura da estrutura e o envio de sinal da thread criada

        estrutura_geral * const estrutura = es;                     //pointer é criado para a estrutura

        int const id_threads = estrutura->id_threads;               //leitura dos elemnetos da estrutura
        int const inicio_inicial = estrutura->inicio_inicial;

        contador=0;                                                 //incializaçao do conatdor

        if ((inicio_inicial+nbloco) < n){                           //verificaçao do tamanho do bloco a calcular
            fim = inicio_inicial + nbloco;
        }else{
            fim = n;
        }

        pthread_cond_signal(&cond);                                 //sinalizacao à main thread que a thread foi criada

        pthread_mutex_unlock(&criacao_thread_mutex);                //desbloqueio do mutex para a variavel de condicao na main thread ser usada

        sched_yield();                                              //nao é necessário estar aqui mas ao fazer garanto que outras threads tenham ainda mais oportunidade.

        //printf ("A thread %d está a calcular o bloco de %d a %d\n", id_threads, inicio_inicial+1, fim); 

        //primeira parte da computacao onde os valores para
        //soma passados orbriatoriamente por oredem de criacao da thread

                //pthread_mutex_lock (&buffer_mutex);                   //(quando usar as condiçoes ou fazer o calculo do bloco completo)estamos a entrar numa regiao onde a computacao vai ser feira precisamos de boquea o mutex para que nao exista perda de dados

                 while(items_buffer==nt){                           

                 //enquanto o buffer for igual ao numero de tarefas
                 //existem duas opçoes ou a thread manda um sinal à thread que
                 //soma os valores do buffer para limpar o buffer
                 //ou entao podemos optar por nao mandar o sinal desbloquear o mutex
                 // e simplesmente libertar a tarefa com sched_yield
                 //o sinal acaba por se mais eficaz pois nao e certo que com o sched yield 
                 //a proxima tarefa seja a somadora

                            //pthread_cond_wait(&produzir,&buffer_mutex);
                            //pthread_mutex_unlock(&buffer_mutex);              //ativar quando o mutex for bloqueado anteriormente
                            sched_yield();                                      //PARA ACTIVAR O SCHED YIELD DESATIVE A CONDIÇAO E ATIVE O MUTEX
                        }

                    for (i = inicio_inicial; i < fim ; i++) {


                        calculo_parcial+= matriz[i]*matriz[i];          //calculo da soma parcial
                        contador=contador+1;                            //conatra elemento somado

                }

                pthread_mutex_lock (&buffer_mutex);                     //regiao critica assegura-se que tem exlusividade para gravar os dados

                buffer[items_buffer+1]=calculo_parcial;                 //envio dos items para o buffer
                items_buffer=items_buffer+1;                            // contador de items no buffer

                //printf("o meu buffer tem %d items a soma parcial é de %d e o buffer tem %d\n",items_buffer,calculo_parcial,buffer[items_buffer]);
                //printf ("A thread %d calculou o bloco de %d a %d\n", id_threads, inicio_inicial+1, fim); 

                pthread_mutex_unlock(&buffer_mutex);                    //desbloqueio do mutex para libertar o processador às outras threads
                //pthread_cond_signal(&consumir);                           // sinalizar a thread somador que existem items no buffer mais uma vez poderiamos usar o sched yield, que nao seria tao eficaz
                sched_yield();                                          //PODE ATIVAR O SCHED YIED PARA ISSO DESATIVE A CONDIÇAO

                //a partir deste momento caso exitam blocos
                //por computar as threads vao agarrar um novo bloco e computalo
                //segue exatamente a mesma estrutura indicada em cima
                //mas agora nao existe obrigatoriedade de cada thread ter um bloco

        while (1) {

                pthread_mutex_lock(&indice_mutex);

                if (numero_para_comeco >= n) {
                        pthread_mutex_unlock(&indice_mutex);
                        break;
                }

                inicio = numero_para_comeco;

                if ((inicio + nbloco) < n)
                        numero_para_comeco = fim = inicio + nbloco;
                else 
                        numero_para_comeco = fim = n;


                pthread_mutex_unlock(&indice_mutex);

                calculo_parcial = 0;                        // inicializaçao da soma parcial de volta a 0                       

                //printf ("A thread %d está a calcular o bloco de %d a %d\n", id_threads, inicio+1, fim); 

                    //pthread_mutex_lock (&buffer_mutex);

                    while(items_buffer==nt){

                            //pthread_cond_wait(&produzir,&buffer_mutex);
                            //pthread_mutex_unlock(&buffer_mutex);
                            sched_yield();                              //PARA ACTIVAR O SCHED YIELD DESATIVE A CONDIÇAO E ATIVE O MUTEX
                        }

                    for (i = inicio; i < fim ; i++) {

                        calculo_parcial+= matriz[i]*matriz[i];
                        contador=contador+1;                            //conatra elemento somado

                }

                pthread_mutex_lock (&buffer_mutex);

                buffer[items_buffer+1]=calculo_parcial;
                items_buffer=items_buffer+1;

                //printf("o meu buffer tem %d items a soma parcial é de %d e o buffer tem %d\n",items_buffer,calculo_parcial,buffer[items_buffer]);
                //printf ("A thread %d calculou o bloco de %d a %d\n", id_threads, inicio+1, fim);

                pthread_mutex_unlock (&buffer_mutex);
                //pthread_cond_signal(&consumir);
                sched_yield();                      //PODE ATIVAR O SCHED YIED PARA ISSO DESATIVE A CONDIÇAO
        }

        sinal_saida=sinal_saida+1;                      //forma de sinalizar que a thread saiu para que a thread de soma e que limpa o buffer saiba que pode acabar

        printf("tarefa %d calculou %d elementos\n",id_threads,contador);
        //printf("tarefa %d de saída\n",id_threads);

        pthread_exit(NULL);

}

//aqui é apresentada a funcao que soma as somas parcias que estao no buffer e o limpa

void *somadora(void *ts) 
{

    pthread_mutex_lock(&criacao_thread_mutex);          //bloqueamos o mutex para que seja dado o sinal de que a thread foi criada

    //printf("Sou a thread somadora\n");

    pthread_cond_signal(&cond);                         //sinalizamos a main thread que a thread foi criada

    pthread_mutex_unlock(&criacao_thread_mutex);        //desbloqueio do mutex para que as threads estejam á vontade

    pthread_mutex_lock(&buffer_mutex);                  //estramos numa operaçao critica onde os dados nao se podem perder, bloqueamos o mutex

        while(items_buffer==0){

            //emquanto o buffer tiver 0 elemnetos
            //sinalizamos as threads que podem produzir
            //é feita entao uma condicao de espera ou 
            //podemos usar um sched yield

            //pthread_cond_wait(&consumir,&buffer_mutex);
            pthread_mutex_unlock(&buffer_mutex);                //PARA ACTIVAR O SCHED YIELD DESATIVE A CONDIÇAO E ATIVE O MUTEX
            sched_yield();
        }

        while(sinal_saida<nt){                      //enquanto todas as thread nao se extinguirem esta condicao é valida

            while(items_buffer!=0){                 //sempre que o buffer é diferente de 0 é calculado a soma das somas parciais e o buffer é esvaziado

                soma_global+=buffer[items_buffer];  //actualizacao da soma global
                items_buffer=items_buffer-1;        //reduçao do buffer

                //printf("o meu buffer ficou com %d items\n",items_buffer);

            }

            pthread_mutex_unlock(&buffer_mutex);    //computacao realizada podemos desbloquear o mutex
            //pthread_cond_signal(&produzir);           //envio de sinal que as threads podem produzir realizar mais somas parciais
            sched_yield();                      //PODE ATIVAR O SCHED YIED PARA ISSO DESATIVE A CONDIÇAO
        }

        //quando todas as thread terminaram
        //a tarefa soma terá que rodar mais uma
        //para verificar se nao sobraram elementos no buffer_mutex
        //a logica é a mesma apresentada anteriormente

        pthread_mutex_lock(&soma_final_mutex);      

        while(items_buffer!=0){

                soma_global+=buffer[items_buffer];
                items_buffer=items_buffer-1;

                //printf("o meu buffer ficou com %d items\n",items_buffer);
            }

        pthread_mutex_unlock(&soma_final_mutex);

        //printf("Sou a thread somadora estou de saida\n");

        pthread_exit(NULL);

}

//funçao princial

int main(int argc, char *argv[])
{
        int i,z;                            //defeinao de variaveis

        //recolha de elementos da linha de comandos

        nt=atoi(argv[1]);
        n=atoi(argv[2]);
        nbloco=atoi(argv[3]);

        //verificacao dos elementos inceridos pelo utilizador

        if(argc!=4){
            printf("Utilização: ./mtss nt n nbloco\n");
            exit(1);}

        if(nt<1){
            printf("O numero de processos terá que ser pelo menos 1\n");
            exit(1);}

        if(n<1||n>999){
        printf("O n tem que estar comprefimido entre 1 e 999\n");
        exit(1);}

        if(nbloco<1){
        printf("O bloco tem que ser pelo menos 1\n");
        exit(1);
        }

        printf("Soma do quadrado dos %d primeiros numeros naturais com %d tarefas e blocos de %d termos\n",n,nt,nbloco);

        //defeniçao de threads e attributos

        pthread_t threads_calculadora[nt];
        pthread_t thread_soma;
        pthread_attr_t attr;

        //alocacar espaço para a estrutura que vai ser passada às threads

        estrutura_geral * estrutura = malloc(sizeof(estrutura_geral));

        //alocarçao de espaço para a matriz com os valores de calculo e para o buffer

        matriz = malloc(sizeof(int)*n);
        buffer = malloc(sizeof(int)*nt);

        // preenchimento da matriz com os valores de n

        for(z=0;z<n;z++){

            matriz[z]=z+1;

        }

        //inicializaçao dos mutex

        pthread_mutex_init(&indice_mutex, NULL);
        pthread_mutex_init(&criacao_thread_mutex,NULL);
        pthread_mutex_init(&soma_final_mutex,NULL);
        pthread_mutex_init(&buffer_mutex,NULL);

        //inicializaçao das condicoes

        pthread_cond_init(&cond,NULL);
        //pthread_cond_init(&produzir,NULL);                    //DESTIVAR EM CASO DE USO DO SCHED YIELD
        //pthread_cond_init(&consumir,NULL);                    //DESTIVAR EM CASO DE USO DO SCHED YIELD

        // inicializacao e defenicao de atributos

        pthread_attr_init(&attr);
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); //este atributo já é predefenido mas nunca é demais garantir

        numero_para_comeco=nbloco*nt;       //defenicao da variavel que controla o numero para a thread começar quando está no loop while(1)
        estrutura->inicio_inicial=0;        //inicializaçao da variavel

        //criaçao da thread soma que usara a funcao somadora e reduzira o buffer

        pthread_create(&thread_soma, &attr, somadora,estrutura);
        pthread_cond_wait(&cond,&criacao_thread_mutex);             //espera o sinal que a thread está criada

        //criaçao das threads calculadoras

        for (i=0; i<nt; i++) {

                ++estrutura->id_threads;            //numero de ordem da thread

                pthread_create(&threads_calculadora[i], &attr, calculadoras,estrutura);     //cria a thread

                estrutura->inicio_inicial=i*nbloco; //define o inicio da thread

                pthread_cond_wait(&cond,&criacao_thread_mutex); //espera que seja sinalizada que a thread foi criada
        }

        //espera que todas a threads terminem

        for (i=0; i<nt; i++) {

                pthread_join(threads_calculadora[i], NULL);

        }

        pthread_join(thread_soma, NULL);

        resultado_esperado = (n*(n+1)*((2*n)+1))/6;

        printf("Soma Total= %d\n",soma_global);
        printf("Resultado esperado = %d\n",resultado_esperado);

        //Libertar memória

        pthread_attr_destroy(&attr);
        pthread_mutex_destroy(&indice_mutex);
        pthread_mutex_destroy(&criacao_thread_mutex);
        pthread_mutex_destroy(&soma_final_mutex);
        pthread_mutex_destroy(&buffer_mutex);
        //pthread_cond_destroy(&produzir);                      //DESTIVAR EM CASO DE USO DO SCHED YIELD
        //pthread_cond_destroy(&consumir);                      //DESTIVAR EM CASO DE USO DO SCHED YIELD

        return 0;
}

Browser other questions tagged

You are not signed in. Login or sign up in order to post.