This program is supposed to create a producer and two consumer threads.
/*
This program is supposed to create one producer thread and two consumer threads.
When the producer has filled the buffer, it should wait.
When the buffer is empty, a consumer should wait for the producer.
This program requires condition variables to coordinate the producer and the consumers.
The problem with the program is that, when running the "top" command in another window
while the program is running, a.out shows 100% CPU usage.
That is what we need to fix by using condition variables.
*/
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
// capacity of the shared ring buffer, in items
#define NITEMS 10
// state shared between the producer and the consumer threads
char shared_buffer[NITEMS];// ring buffer holding the keys waiting to be echoed
int shared_count;// number of items currently stored in shared_buffer
//******************************************
// Condition variables used to coordinate the producer and the consumers.
// Both are initialised statically with PTHREAD_COND_INITIALIZER.
//******************************************
pthread_cond_t c1 = PTHREAD_COND_INITIALIZER; // the producer waits on this while the buffer is full
pthread_cond_t c2 = PTHREAD_COND_INITIALIZER; // used on the consumer side when the buffer is empty
pthread_mutex_t mutex; // protects the shared state; initialised in main() with pthread_mutex_init()
unsigned int prod_index = 0; // next slot of shared_buffer the producer writes to
unsigned int cons_index = 0; // next slot of shared_buffer a consumer reads from
//******************************************
// function prototypes
//******************************************
void * producer(void *arg);
void * consumer(void *arg);
//********************************************
//MAIN FUNCTION
//********************************************
/*
 * Entry point: starts one producer thread and two consumer threads, then
 * waits for all three to finish before releasing the synchronisation objects.
 *
 * c1 and c2 are defined with PTHREAD_COND_INITIALIZER, so they are NOT passed
 * to pthread_cond_init() again here: initialising an already initialised
 * condition variable is undefined behaviour.
 *
 * Returns 0 on success, 1 if the mutex or one of the threads could not be
 * created (returning from main ends the process, including any threads that
 * were already started).
 */
int main(void){
    pthread_t prod_tid, cons_tid1, cons_tid2;
    int rc;

    // the mutex has no static initialiser, so it is set up at run time
    rc = pthread_mutex_init(&mutex, NULL);
    if (rc != 0){
        fprintf(stderr, "pthread_mutex_init failed (error %d)\n", rc);
        return 1;
    }

    // start producer thread
    rc = pthread_create(&prod_tid, NULL, producer, NULL);
    if (rc != 0){
        fprintf(stderr, "cannot create producer thread (error %d)\n", rc);
        return 1;
    }

    // start consumer threads
    rc = pthread_create(&cons_tid1, NULL, consumer, NULL);
    if (rc != 0){
        fprintf(stderr, "cannot create consumer thread 1 (error %d)\n", rc);
        return 1;
    }
    rc = pthread_create(&cons_tid2, NULL, consumer, NULL);
    if (rc != 0){
        fprintf(stderr, "cannot create consumer thread 2 (error %d)\n", rc);
        return 1;
    }

    // wait for threads to finish
    pthread_join(prod_tid, NULL);
    pthread_join(cons_tid1, NULL);
    pthread_join(cons_tid2, NULL);

    // clean up: no thread can still be using these once all joins returned
    pthread_cond_destroy(&c1);
    pthread_cond_destroy(&c2);
    pthread_mutex_destroy(&mutex);
    return 0;
}
//******************************************
// This function executes the producer thread
//******************************************
void * producer(void *arg){
char key;
while (1){
// read input key
scanf(\"%c\", &key);
while (1){
printf(\"INSIDE PRODUCER SECOND WHILE\ \");
// acquire mutex lock
pthread_mutex_lock(&mutex);
//*******************************************************
// if buffer is full, release mutex lock and check again
//*******************************************************
if (shared_count == NITEMS){
printf(\"shared_consumer == NITEMS ON PRODUCER\ \");
//IF THIS IS TRUE, I NEED TO WAIT UNTIL THE CONSONSUMER TAKES AN ITEM OFF.
//THIS MEANS THAT PRODUCER NEEDS TO SLEE OR WAIT.
pthread_cond_wait(&c1, &mutex);
pthread_mutex_lock(&mutex);
}
else{
break;
}
}
// store key in shared buffer
shared_buffer[prod_index] = key;
// update shared count variable
shared_count++;
// update producer index
if (prod_index == NITEMS - 1){
prod_index = 0;
}
else
prod_index++;
// release mutex lock
pthread_mutex_unlock(&mutex);
}
return NULL;
}
//******************************************
// consumer thread executes this function
//******************************************
void * consumer(void *arg){
char key;
int id = (int)pthread_self();
while (1){
while (1){
// acquire mutex lock
pthread_mutex_lock(&mutex);
//*******************************************************
// if buffer is empty, release mutex lock and check again
//*******************************************************
if (shared_count == 0){
printf(\"shared_consumer == 0 ON CONSUMER\ \");
//IF THE BUFFER IS EMPTY, I NEED TO WAIT UNTILL PRODUCER PUTS SOMETHING IN.
//THIS MEANS I NEED TO SLEEP OR WAIT UNTILL SOMETHING IS PUT INTO THE BUFFER.
//AND I ALSO NEED TO SIGNAL THE PRODUCER TO PRODUCE SOMETHING
pthread_cond_signal(&c2);
}
else{
break; //REMOVED THIS
}
}
// read key from shared buffer
key = shared_buffer[cons_index];
// echo key
printf(\"consumer %d %c\ \", id, key);
// update shared count variable
shared_count--;
// update consumer index
if (cons_index == NITEMS - 1){
cons_index = 0;
}
else
cons_index++;
// release mutex lock
pthread_mutex_unlock(&mutex);
}
return NULL;
}
Solution
/* Request the thread-safe (reentrant) library interfaces. The macro system
   headers look for is spelled _REENTRANT, and it must be defined before the
   first #include. */
#define _REENTRANT
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <thread.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/uio.h>
/* Size in bytes of one transfer buffer. */
#define BUFSIZE 512
/* Number of transfer buffers in the ring. */
#define BUFCNT 4

/*
 * State shared between the producer (the main thread) and the consumer
 * thread: a ring of BUFCNT buffers plus the bookkeeping and synchronisation
 * objects that go with it.
 */
struct {
    char buffer[BUFCNT][BUFSIZE]; /* the data buffers */
    int byteinbuf[BUFCNT];        /* byte count stored for each buffer */
    mutex_t buflock;              /* held while the ring state is accessed */
    mutex_t donelock;             /* taken by the producer while it sets done */
    cond_t adddata;               /* signalled by the producer after it adds data */
    cond_t remdata;               /* signalled by the consumer after it removes data */
    int nextadd;                  /* index of the next buffer to fill */
    int nextrem;                  /* index of the next buffer to write out */
    int occ;                      /* number of buffers currently filled */
    int done;                     /* set to 1 once the producer has finished reading */
} Buf;
/* function prototype */
void *consumer(void *);
main(int argc, char **argv)
{
int ifd, ofd;
thread_t cons_thr;
/* check the command line arguments */
if (argc != 3)
printf(\"Usage: %s <infile> <outfile>\ \", argv[0]), exit(0);
/* open the input file for the producer to use */
if ((ifd = open(argv[1], O_RDONLY)) == -1)
{
fprintf(stderr, \"Can\'t open file %s\ \", argv[1]);
exit(1);
}
/* open the output file for the consumer to use */
if ((ofd = open(argv[2], O_WRONLY|O_CREAT, 0666)) == -1)
{
fprintf(stderr, \"Can\'t open file %s\ \", argv[2]);
exit(1);
}
/* zero the counters */
Buf.nextadd = Buf.nextrem = Buf.occ = Buf.done = 0;
/* set the thread concurrency to 2 so the producer and consumer can
run concurrently */
thr_setconcurrency(2);
/* create the consumer thread */
thr_create(NULL, 0, consumer, (void *)ofd, NULL, &cons_thr);
/* the producer ! */
while (1) {
/* lock the mutex */
mutex_lock(&Buf.buflock);
/* check to see if any buffers are empty */
/* If not then wait for that condition to become true */
while (Buf.occ == BUFCNT)
cond_wait(&Buf.remdata, &Buf.buflock);
/* read from the file and put data into a buffer */
Buf.byteinbuf[Buf.nextadd] = read(ifd,Buf.buffer[Buf.nextadd],BUFSIZE);
/* check to see if done reading */
if (Buf.byteinbuf[Buf.nextadd] == 0) {
/* lock the done lock */
mutex_lock(&Buf.donelock);
/* set the done flag and release the mutex lock */
Buf.done = 1;
mutex_unlock(&Buf.donelock);
/* signal the consumer to start consuming */
cond_signal(&Buf.adddata);
/* release the buffer mutex */
mutex_unlock(&Buf.buflock);
/* leave the while looop */
break;
}
/* set the next buffer to fill */
Buf.nextadd = ++Buf.nextadd % BUFCNT;
/* increment the number of buffers that are filled */
Buf.occ++;
/* signal the consumer to start consuming */
cond_signal(&Buf.adddata);
/* release the mutex */
mutex_unlock(&Buf.buflock);
}
close(ifd);
/* wait for the consumer to finish */
thr_join(cons_thr, 0, NULL);
/* exit the program */
return(0);
}
/*
 * The consumer thread.
 *
 * arg carries the output file descriptor, stored in the pointer value itself.
 * The thread takes filled buffers out of the Buf ring in order, writes each
 * one to the descriptor and signals the producer that a buffer became free.
 * It ends, with exit value 0, once the ring is empty and Buf.done is set.
 *
 * The "finished" test is made AFTER the wait loop. Testing only before the
 * wait lets a thread that is woken with occ == 0 and done set fall through,
 * decrement occ below zero and then loop forever over stale buffers.
 */
void *consumer(void *arg)
{
    int fd = (int)(intptr_t)arg;

    while (1) {
        const char *src;
        int left;

        /* lock the mutex */
        mutex_lock(&Buf.buflock);

        /* sleep until a buffer is filled or the producer is done */
        while (Buf.occ == 0 && !Buf.done)
            cond_wait(&Buf.adddata, &Buf.buflock);

        /* nothing left to write and nothing more will arrive */
        if (Buf.occ == 0) {
            mutex_unlock(&Buf.buflock);
            break;
        }

        /* write the data from the buffer to the file; write() may accept
           fewer bytes than requested, so continue until all are written */
        src = Buf.buffer[Buf.nextrem];
        left = Buf.byteinbuf[Buf.nextrem];
        while (left > 0) {
            ssize_t n = write(fd, src, (size_t)left);
            if (n <= 0) {
                /* report the failure but keep draining the ring, otherwise
                   the producer would block forever waiting for a free buffer */
                perror("write");
                break;
            }
            src += n;
            left -= (int)n;
        }

        /* set the next buffer to write from; written without ++ because
           modifying and assigning the same object in one expression is
           undefined */
        Buf.nextrem = (Buf.nextrem + 1) % BUFCNT;

        /* decrement the number of buffers that are full */
        Buf.occ--;

        /* signal the producer that a buffer is empty */
        cond_signal(&Buf.remdata);

        /* release the mutex */
        mutex_unlock(&Buf.buflock);
    }

    /* exit the thread */
    thr_exit((void *)0);
    return NULL; /* not reached: thr_exit() does not return */
}




