Question

OBJECTIVE

The purpose of this programming project is to explore process synchronization. This will be accomplished by writing a simulation of the Producer / Consumer problem described below. Your simulation will be implemented using Pthreads. This simulation is a modification of the programming project found at the end of Chapter 6, and most of the text of this project specification is taken directly from that project description. Name the program osproj3.cpp or osproj3.c and the executable osproj3.

THE PRODUCER / CONSUMER PROBLEM

In Chapter 3, we developed a model of a system consisting of cooperating sequential processes or threads, all running asynchronously and possibly sharing data. We illustrated this model with the producer - consumer problem, which is representative of operating systems. Specifically, in Section 3.4.1 we discussed how a "bounded buffer" could be used to enable processes to share memory.

In Chapter 3, we described a technique using a circular buffer that can hold BUFFER_SIZE-1 items. By using a shared memory location count, the buffer can hold all BUFFER_SIZE items. This count is initialized to 0 and is incremented every time an item is placed into the buffer and decremented every time an item is removed from the buffer. The count data item can also be implemented as a counting semaphore.
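To make the role of count concrete, here is a minimal single-threaded sketch of a circular array that uses a count field so that all BUFFER_SIZE slots are usable. The struct and member names (CircularBuffer, in, out, put, get) are made up for this illustration and are not taken from the textbook; synchronization is deliberately left out.

```cpp
#include <cstddef>

#define BUFFER_SIZE 5

// Illustrative circular queue; not thread safe on its own.
struct CircularBuffer {
    int items[BUFFER_SIZE];
    std::size_t in = 0;     // next slot to write
    std::size_t out = 0;    // next slot to read
    std::size_t count = 0;  // number of occupied slots

    // Stores value and returns true, or returns false when the buffer is full.
    bool put(int value) {
        if (count == BUFFER_SIZE) return false;
        items[in] = value;
        in = (in + 1) % BUFFER_SIZE;
        ++count;
        return true;
    }

    // Fetches the oldest value and returns true, or returns false when empty.
    bool get(int *value) {
        if (count == 0) return false;
        *value = items[out];
        out = (out + 1) % BUFFER_SIZE;
        --count;
        return true;
    }
};
```

Without count, the condition in == out could mean either "full" or "empty", which is why the version without a counter gives up one slot.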

The producer can place items into the buffer only if the buffer has a free memory location to store the item. The producer cannot add items to a full buffer. The consumer can remove items from the buffer if the buffer is not empty. The consumer must wait to consume items if the buffer is empty.

The "items" stored in this buffer will be integers. Your producer process will have to insert random numbers into the buffer. The consumer process will consume a number and detect if the number is prime.

PROJECT SPECIFICATIONS

The buffer used between producer and consumer processes will consist of a fixed-size array of type buffer_item. The queue of buffer_item objects will be manipulated using a circular array. The buffer will be manipulated with two functions, buffer_insert_item() and buffer_remove_item(), which are called by the producer and consumer threads, respectively. A skeleton outlining these functions can be found in buffer.h.

The buffer_insert_item() and buffer_remove_item() functions will synchronize the producer and consumer using algorithms similar to those in Figures 6.10 and 6.11. The buffer will also require an initialization function (not supplied in buffer.h) that initializes the mutual exclusion object "mutex" along with the "empty" and "full" semaphores.
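As a rough sketch of that structure (not the textbook figures verbatim), the insert and remove functions can place the array update between a semaphore wait, a mutex lock, and the matching semaphore post. The names buffer_init, in_index, out_index, empty_sem, full_sem, and buffer_mutex are assumptions made for this example; the last three play the roles of "empty", "full", and "mutex" from the text.

```cpp
#include <pthread.h>
#include <semaphore.h>

typedef int buffer_item;
#define BUFFER_SIZE 5

static buffer_item buffer[BUFFER_SIZE];
static int in_index = 0;              // hypothetical name: next slot to fill
static int out_index = 0;             // hypothetical name: next slot to drain
static sem_t empty_sem;               // counts free slots ("empty")
static sem_t full_sem;                // counts filled slots ("full")
static pthread_mutex_t buffer_mutex;  // protects buffer and indexes ("mutex")

// Hypothetical initializer; the spec says buffer.h does not supply one.
void buffer_init() {
    sem_init(&empty_sem, 0, BUFFER_SIZE);
    sem_init(&full_sem, 0, 0);
    pthread_mutex_init(&buffer_mutex, NULL);
    in_index = out_index = 0;
}

bool buffer_insert_item(buffer_item item) {
    sem_wait(&empty_sem);               // block while no slot is free
    pthread_mutex_lock(&buffer_mutex);
    buffer[in_index] = item;
    in_index = (in_index + 1) % BUFFER_SIZE;
    pthread_mutex_unlock(&buffer_mutex);
    sem_post(&full_sem);                // announce one more filled slot
    return true;
}

bool buffer_remove_item(buffer_item *item) {
    sem_wait(&full_sem);                // block while nothing is stored
    pthread_mutex_lock(&buffer_mutex);
    *item = buffer[out_index];
    out_index = (out_index + 1) % BUFFER_SIZE;
    pthread_mutex_unlock(&buffer_mutex);
    sem_post(&empty_sem);               // announce one more free slot
    return true;
}
```

The key point is the pairing: the producer waits on the free-slot semaphore and posts the filled-slot semaphore, while the consumer does the reverse.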

The producer thread will alternate between sleeping for a random period of time and generating an integer and trying to insert it into the buffer. Random numbers will be generated using the rand_r() function. The sleep function used must be a "thread safe" sleep function. See the text on page 276 for an overview of the producer algorithm.
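A small sketch of those two ingredients, assuming a POSIX system: rand_r() keeps its state in a seed the caller owns, so each thread can hold its own seed, and nanosleep() suspends only the calling thread. The helper names (next_delay, next_item, sleep_seconds) and the 0 to 9999 item range are hypothetical choices for this example.

```cpp
#include <stdlib.h>  // rand_r (POSIX)
#include <time.h>    // nanosleep, struct timespec (POSIX)

// Hypothetical helper: pick a delay in [0, max_seconds] from the caller's seed.
int next_delay(unsigned int *seed, int max_seconds) {
    if (max_seconds <= 0) return 0;
    return rand_r(seed) % (max_seconds + 1);
}

// Hypothetical helper: pick a buffer item in [0, 9999] from the caller's seed.
int next_item(unsigned int *seed) {
    return rand_r(seed) % 10000;
}

// Hypothetical helper: suspend only the calling thread for whole seconds.
void sleep_seconds(int seconds) {
    struct timespec ts;
    ts.tv_sec = seconds;
    ts.tv_nsec = 0;
    nanosleep(&ts, NULL);
}
```

Each thread would declare its own unsigned int seed as a local variable and pass its address on every call, so no random-number state is shared between threads.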

The consumer thread will alternate between sleeping for a random period of time (thread safe, of course) and trying to remove a number from the buffer. The number removed will then be tested to determine whether it is prime. See the text on page 276 for an overview of the consumer algorithm.

The main function will initialize the buffer and create the separate producer and consumer threads. Once it has created the producer and consumer threads, the main() function will sleep (thread safe) for the duration of the simulation. Upon awakening, the main thread will signal the other threads to quit by setting a simulation flag, which is a global variable. The main thread will join with the other threads and then display the simulation statistics. The main() function will be passed five parameters on the command line:

The length of time the main thread is to sleep before terminating (simulation length in seconds)

The maximum length of time the producer and consumer threads will sleep prior to producing or consuming a buffer_item

The number of producer threads

The number of consumer threads

A "yes" or "no" to output the individual buffer snapshots for each item produced and consumed

A skeleton for the main function appears as:

Creating Pthreads using the Pthreads API is discussed in Chapter 4 and in supplemental notes provided online. Please refer to those references for specific instructions regarding creation of the producer and consumer Pthreads.
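For orientation, here is a minimal sketch of creating threads and collecting a value from each one at join time. The function names (worker, run_workers) and the idea of returning a heap-allocated integer are assumptions for this example, not part of the project skeleton. Note that pthread_join() itself returns an error code; the thread's own return value arrives through the second argument.

```cpp
#include <pthread.h>
#include <vector>

// Hypothetical worker: returns a heap-allocated result to whoever joins it.
void *worker(void *param) {
    int id = *static_cast<int *>(param);
    int *result = new int(id * 10);
    return result;  // same effect as calling pthread_exit(result)
}

// Starts n workers, joins them, and returns the sum of their results.
int run_workers(int n) {
    std::vector<pthread_t> tids(n);
    std::vector<int> ids(n);
    for (int i = 0; i < n; ++i) {
        ids[i] = i + 1;
        pthread_create(&tids[i], NULL, worker, &ids[i]);
    }
    int sum = 0;
    for (int i = 0; i < n; ++i) {
        void *ret = NULL;
        // A return of 0 means the join succeeded and ret holds the result.
        if (pthread_join(tids[i], &ret) == 0 && ret != NULL) {
            int *value = static_cast<int *>(ret);
            sum += *value;
            delete value;  // the joiner owns the result
        }
    }
    return sum;
}
```

The same pattern works for per-thread statistics: each thread returns a pointer to its counters and the main thread adds them up after joining.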

The following code sample illustrates how mutex locks available in the Pthread API can be used to protect a critical section:

Pthreads uses the pthread_mutex_t data type for mutex locks. A mutex is created with the pthread_mutex_init() function, with the first parameter being a pointer to the mutex. By passing NULL as a second parameter, we initialize the mutex to its default attributes. The mutex is acquired and released with the pthread_mutex_lock() and pthread_mutex_unlock() functions. If the mutex lock is unavailable when pthread_mutex_lock() is invoked, the calling thread is blocked until the owner invokes pthread_mutex_unlock(). All mutex functions return a value of 0 with correct operation; if an error occurs, these functions return a nonzero error code.
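A minimal sketch of the mutex calls described above, assuming four threads that each add to a shared counter. The names counter_mutex, shared_counter, add_many, and run_mutex_demo are hypothetical and exist only for this illustration.

```cpp
#include <pthread.h>

static pthread_mutex_t counter_mutex;  // hypothetical name
static long shared_counter = 0;        // data protected by counter_mutex

// Each thread increments the shared counter 1000 times.
void *add_many(void *) {
    for (int i = 0; i < 1000; ++i) {
        pthread_mutex_lock(&counter_mutex);    // acquire the lock
        ++shared_counter;                      // critical section
        pthread_mutex_unlock(&counter_mutex);  // release the lock
    }
    return NULL;
}

// Runs four threads and returns the final counter, or -1 if init fails.
long run_mutex_demo() {
    shared_counter = 0;
    if (pthread_mutex_init(&counter_mutex, NULL) != 0) return -1;
    pthread_t t[4];
    for (int i = 0; i < 4; ++i) pthread_create(&t[i], NULL, add_many, NULL);
    for (int i = 0; i < 4; ++i) pthread_join(t[i], NULL);
    pthread_mutex_destroy(&counter_mutex);
    return shared_counter;
}
```

Without the lock and unlock calls, the increments from different threads could interleave and the final count could come out lower than 4000.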

Pthreads provides two types of semaphores: named and unnamed. For this project, we will use unnamed semaphores. The code below illustrates how a semaphore is created:

The sem_init() function creates and initializes a semaphore. This function is passed three parameters: A pointer to the semaphore, a flag indicating the level of sharing, and the semaphore's initial value. In this example, by passing the flag 0, we are indicating that this semaphore can only be shared by threads belonging to the same process that created the semaphore. A nonzero value would allow other processes to access the semaphore as well. In this example, we initialize the semaphore to the value 5.
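A minimal sketch of the call described above, assuming a Linux-like system where unnamed POSIX semaphores are supported. The helper name make_and_read is hypothetical; it creates a semaphore private to the process, reads its value back with sem_getvalue(), and destroys it.

```cpp
#include <semaphore.h>

// Creates an unnamed semaphore with the given initial value and returns the
// value read back from it, or -1 if sem_init() fails.
int make_and_read(unsigned int initial) {
    sem_t sem;
    // Second argument 0: shared only among threads of this process.
    if (sem_init(&sem, 0, initial) != 0) return -1;
    int value = -1;
    sem_getvalue(&sem, &value);
    sem_destroy(&sem);
    return value;
}
```

Calling make_and_read(5) mirrors the description in the text, where the semaphore starts at the value 5.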

In Section 6.5, we described the classical wait() and signal() semaphore operations. Pthreads names the wait() and signal() operations sem_wait() and sem_post(), respectively. The code example below creates a binary semaphore mutex with an initial value of 1 and illustrates its use in protecting a critical section:
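A minimal sketch of a binary semaphore guarding a critical section, under the same assumption that unnamed POSIX semaphores are available. The semaphore is called bin_sem here (a hypothetical name) to keep it distinct from other identifiers; deposit, total, and run_semaphore_demo are likewise made up for the example.

```cpp
#include <pthread.h>
#include <semaphore.h>

static sem_t bin_sem;  // hypothetical name; binary semaphore, initial value 1
static int total = 0;  // data protected by bin_sem

// Each thread adds 1 to total 500 times.
void *deposit(void *) {
    for (int i = 0; i < 500; ++i) {
        sem_wait(&bin_sem);  // wait(): acquire
        ++total;             // critical section
        sem_post(&bin_sem);  // signal(): release
    }
    return NULL;
}

// Runs two threads and returns the final total, or -1 if sem_init() fails.
int run_semaphore_demo() {
    total = 0;
    if (sem_init(&bin_sem, 0, 1) != 0) return -1;
    pthread_t a, b;
    pthread_create(&a, NULL, deposit, NULL);
    pthread_create(&b, NULL, deposit, NULL);
    pthread_join(a, NULL);
    pthread_join(b, NULL);
    sem_destroy(&bin_sem);
    return total;
}
```

With an initial value of 1, at most one thread can be between sem_wait() and sem_post() at a time, so the semaphore behaves like a mutex.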

PROGRAM OUTPUT

Output for this simulation is critical to verify that your simulation program is working correctly. Use this sample to determine what your simulation should output when various conditions occur (buffer empty/full, location of next producer/consumer, etc.). Your program output format should be identical to the following:

The following is buffer.h:

#ifndef _BUFFER_H_DEFINED_
#define _BUFFER_H_DEFINED_

typedef int buffer_item;

#define BUFFER_SIZE 5

bool buffer_insert_item( buffer_item item );

bool buffer_remove_item( buffer_item *item );

#endif // _BUFFER_H_DEFINED_

I'm really having trouble getting started on this. If I could get some help, I would really appreciate it. Thanks!

Explanation / Answer

main.cpp

#include <iostream>
#include <cstring>
#include <string>
#include <cstdlib>
#include "buffer.h"
#include "stats.h"
#include <semaphore.h>
#include <pthread.h>
#include <unistd.h>
using namespace std;

//Structs
struct ARGS //struct to hold args from command line
{
   int simLength; //simulation length
   int maxThreadSleep; //maximum sleep time for thread
   int numProducers; //number of producers
   int numConsumers; //number of consumers
   bool outputBufferSnapshots; //true/false output buffer snapshots
};

//Global args
ARGS global_args; //create an instance of arguments struct
volatile bool runSim = true; //simulation keeps running while true; volatile stops the compiler from caching the flag (std::atomic<bool> would be the stricter choice)

//function prototype
void arguments(int, char **);
bool checkPrime(buffer_item);
void *producer(void * param);
void *consumer(void * param);

int main( int argc, char *argv[] )
{
   //Get command line arguments
   arguments(argc, argv);
  
   //Store simulation statistics
   Stats simulationStats(global_args.simLength, global_args.maxThreadSleep,
           global_args.numProducers, global_args.numConsumers,
           BUFFER_SIZE);

   //Initialize the buffer
   buffer_initialize();

   //========= Threads =========
   //Note: arrays sized at run time are a compiler extension in C++ (accepted by g++)
   pthread_t tidP[global_args.numProducers]; //producer thread identifier array
   pthread_t tidC[global_args.numConsumers]; //consumer thread identifier array
   pthread_attr_t attr; //set of thread attributes
   pthread_attr_init( &attr ); //get the default attributes
  
   cout << "Starting Threads..." << endl;
  
   //Create producer thread(s)
   for(int i=0; i < global_args.numProducers; i++)
   {
       pthread_create( &tidP[i], &attr, producer, NULL);
   }

   //Create consumer thread(s)
   for(int i=0; i < global_args.numConsumers; i++)
   {
       pthread_create( &tidC[i], &attr, consumer, NULL);
   }
  
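   //Sleep for the length of the simulation
   sleep(global_args.simLength);

   //Signal threads to quit
   runSim = false;

   //Wake any thread still blocked on a semaphore so it can see the flag and exit
   for(int i=0; i < global_args.numProducers; i++)
       sem_post( &bufferEmpty );
   for(int i=0; i < global_args.numConsumers; i++)
       sem_post( &bufferFull );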

   //Used to receive the stats array returned by each thread
   void *threadResult = NULL;
   int *tempStats;

   //Join producer threads
   for(int i=0; i < global_args.numProducers; i++)
   {
       //pthread_join returns an error code; the thread's return value
       //comes back through the second argument
       if(pthread_join( tidP[i], &threadResult ) != 0 || threadResult == NULL)
           continue;
       tempStats = (int*) threadResult;

       //Store stats for each thread
       simulationStats.totalThreadNumProduced[i] = tempStats[0];
       simulationStats.numTimesBufferFull += tempStats[1];

       delete [] tempStats; //allocated by the thread with new[]
   }

   //Join consumer threads
   for(int i=0; i < global_args.numConsumers; i++)
   {
       if(pthread_join( tidC[i], &threadResult ) != 0 || threadResult == NULL)
           continue;
       tempStats = (int*) threadResult;

       //Store stats for each thread
       simulationStats.totalThreadNumConsumed[i] = tempStats[0];
       simulationStats.numTimesBufferEmpty += tempStats[1];

       delete [] tempStats; //allocated by the thread with new[]
   }
  
   //Grab number of items remaining in buffer
   simulationStats.numItemsRemaining = bufferCount;

   //Display Statistics
   simulationStats.print();

   //Exit
   return 0;
}

/********************************************************************
//
// Arguments function
//
// This function takes in the command line arguments and
// places them into a struct.
//
// Return Value
// ------------
// void
//
// Value Parameters
// ----------------
// argc       int       Number of arguments in argv
//
// Reference Parameters
// --------------------
// argv    char        Array of char arrays. Provides command line args
//
*******************************************************************/
void arguments(int argc, char *argv[])
{
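   if(argc < 6)
   {
       cerr << "5 arguments are needed. The arguments should be" << endl;
       cerr << "in the following order: simulation length," << endl;
       cerr << "max thread sleep, number of producers, number" << endl;
       cerr << "of consumers, yes or no for outputting buffer snapshots" << endl;
       exit(-1);
   }

   global_args.simLength = atoi(argv[1]);
   global_args.maxThreadSleep = atoi(argv[2]);
   global_args.numProducers = atoi(argv[3]);
   global_args.numConsumers = atoi(argv[4]);

   //Reject values the simulation cannot use (a zero sleep limit would
   //cause a division by zero when the random sleep time is computed)
   if(global_args.simLength < 0 || global_args.maxThreadSleep < 1 ||
      global_args.numProducers < 0 || global_args.numConsumers < 0)
   {
       cerr << "Numeric arguments must not be negative and the max thread sleep must be at least 1" << endl;
       exit(-1);
   }
  
   if(strcmp(argv[5],"yes") == 0)
       global_args.outputBufferSnapshots = true;
   else if(strcmp(argv[5],"no") == 0)
       global_args.outputBufferSnapshots = false;
   else
   {
       cerr << "The last argument must be 'yes' or 'no'" << endl;
       exit(-1);
   }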


}

/********************************************************************
//
// Check Prime Function
//
// This function determines if a number is prime
//
// Return Value
// ------------
// bool                         True/False if number is prime
//
// Value Parameters
// ----------------
// in        buffer_item   The value
//
// Local Variables
// ---------------
// lim       int       Limit to go to checking for a prime number
// result   int       Temporary result of mod operation
//
*******************************************************************/
bool checkPrime(buffer_item in)
{
   //0, 1, and negative values are not prime
   if(in < 2)
       return false;

   int lim = in/2;
   int result;

   for(int i=2; i<=lim; i++)
   {
       result = in % i;
       if(result == 0)
           return false;
   }

   return true;

}

/********************************************************************
//
// Producer function
//
// Producer function to produce numbers and put them into the buffer
//
// Return Value
// ------------
// void
//
// Reference Parameters
// --------------------
// void
// thread returns stats value
//
// Local Variables
// ---------------
// stats    int        holds stats for # of times buffer is full
//                 & number of items produced.
//
*******************************************************************/
void *producer(void * param)
{
   //Variables
   int * stats = new int[2]; //used to return stats
   int sleepTime; //holds random number for time to sleep
   stats[0] = 0; //holds # of items produced
   stats[1] = 0; //holds # of times buffer is full
   buffer_item bItem; // this is my item
   bool inserted; //true when this pass stored an item

   //Per-thread seed for rand_r(); the address of stats differs per thread
   unsigned int seed = (unsigned int)(size_t) stats ^ (unsigned int) getpid();

   while(runSim)
   {
       //Sleep for a random amount of time
       sleepTime = 0;
       if(global_args.maxThreadSleep > 0)
           sleepTime = rand_r( &seed ) % global_args.maxThreadSleep;

       sleep(sleepTime);

       //Generate a random number
       bItem = rand_r( &seed ) % 9999;

       //Try to claim a free slot; if none is free the buffer is full
       if(sem_trywait( &bufferEmpty ) != 0)
       {
           cout << "All buffers full. Producer " << pthread_self();
           cout << " waits." << endl;
           stats[1]=(stats[1] + 1);

           //Wait for a free slot
           sem_wait( &bufferEmpty );
       }

       //Lock
       pthread_mutex_lock( &mutex );

       //Produce - CRITICAL SECTION
       inserted = runSim && buffer_insert_item(bItem);
       if(inserted)
       {
           cout << "Producer " << pthread_self() << " writes ";
           cout << bItem << endl;

           stats[0]= (stats[0] + 1); //add # of items produced

           if(global_args.outputBufferSnapshots) //if snapshots enabled
               buffer_print(); //print snapshots
       }

       //unlock
       pthread_mutex_unlock( &mutex );

       //signal: a stored item fills a slot; otherwise hand the free slot back
       if(inserted)
           sem_post( &bufferFull );
       else
           sem_post( &bufferEmpty );

   }

   //Wake one consumer that may still be blocked so it can see the stop flag
   sem_post( &bufferFull );

   pthread_exit( (void*)stats ); //return stats array
}  

/********************************************************************
//
// Consumer function
//
// Consumer function to consume numbers and remove them from the buffer
//
// Return Value
// ------------
// void
//
// Reference Parameters
// --------------------
// void
// thread returns stats value
//
// Local Variables
// ---------------
// stats    int        holds stats for # of times buffer is empty
//                 & number of items consumed.
//
*******************************************************************/
void *consumer(void * param)
{
   //Variables
   buffer_item bItem; //bItem to store received item
   int sleepTime; //int to hold random time to sleep
   int * stats = new int[2]; //array explained on next two lines
   stats[0] = 0; //holds # of items consumed
   stats[1] = 0; //holds # of times buffer was empty
   bool removed; //true when this pass took an item

   //Per-thread seed for rand_r(); the address of stats differs per thread
   unsigned int seed = (unsigned int)(size_t) stats ^ (unsigned int) getpid();

   while(runSim)
   {
       //Sleep for a random amount of time
       sleepTime = 0;
       if(global_args.maxThreadSleep > 0)
           sleepTime = rand_r( &seed ) % global_args.maxThreadSleep;
       sleep(sleepTime);

       //Try to claim a filled slot; if none exists the buffer is empty
       if(sem_trywait( &bufferFull ) != 0)
       {
           cout << "All buffers empty. Consumer ";
           cout << pthread_self();
           cout << " waits." << endl;

           stats[1]= (stats[1] + 1); //add one time buffer was empty

           //Wait for an item
           sem_wait( &bufferFull );
       }

       //Lock
       pthread_mutex_lock( &mutex );

       //Consume - CRITICAL SECTION
       removed = runSim && buffer_remove_item( &bItem );
       if(removed)
       {
           //line 1
           cout << "Consumer " << pthread_self() << " reads ";
           cout << bItem;

           stats[0] = (stats[0]+1); //increase # of items consumed

           if(checkPrime(bItem)) //find if # was prime
               cout << "   * * * PRIME * * *";
           cout << endl;

           //if snapshots are enabled
           if(global_args.outputBufferSnapshots)
               buffer_print();
       }

       //Unlock
       pthread_mutex_unlock( &mutex );

       //Signal: a removed item frees a slot; otherwise hand the filled slot back
       if(removed)
           sem_post( &bufferEmpty );
       else
           sem_post( &bufferFull );

   }

   //Wake one producer that may still be blocked so it can see the stop flag
   sem_post( &bufferEmpty );

   pthread_exit( (void*)stats ); //return stats for this thread

}


buffer.h


/********************************************************************
//
// Christopher Carlisle
// Operating Systems
// Programming Project #3: Process Synchronization Using Pthreads:
//                The Producer / Consumer Problem With
//                Prime Number Detector
// November 23, 2009
// Instructor: Dr. Ajay K. Katangur
//
********************************************************************/
#ifndef _BUFFER_H_DEFINED_
#define _BUFFER_H_DEFINED_

#include <iostream>
#include <iomanip>
#include <pthread.h>
#include <semaphore.h>
using namespace std;

typedef int buffer_item;

#define BUFFER_SIZE 5
buffer_item global_buffer[BUFFER_SIZE];
unsigned int bufferCount=0; //keeps track of number of items in buffer
unsigned int buffer_In_index=0; //index position for putting stuff in buffer
unsigned int buffer_Out_index=0; //index position for pulling stuff out of buffer

//function prototypes
bool buffer_insert_item( buffer_item item );
bool buffer_remove_item( buffer_item *item );
void buffer_print();
void buffer_initialize();

//Semaphores and mutexes
sem_t bufferEmpty;
sem_t bufferFull;
pthread_mutex_t mutex;

/********************************************************************
//
// Buffer Initialize
//
// This function initializes the semaphores and the mutex, and sets
// every buffer position to -1
//
// Return Value
// ------------
// void
//
*******************************************************************/
void buffer_initialize()
{
   sem_init(&bufferEmpty, 0, BUFFER_SIZE );
   sem_init( &bufferFull, 0, 0 );
   pthread_mutex_init( &mutex, NULL );
   for(int i=0 ; i < BUFFER_SIZE; i++)
   {
       global_buffer[i] = -1; //sets all buffer positions to -1
   }
}

/********************************************************************
//
// Buffer Insert Item Function
//
// This function inserts an item into the buffer, moves
// the buffer index, and adds one to the count of the buffer
//
// Return Value
// ------------
// bool                         True that function succeeded
//
// Value Parameters
// ----------------
// item       buffer_item   Value to place into buffer
//
*******************************************************************/
bool buffer_insert_item( buffer_item item )
{
   //cout << "Item: " << (int) item << endl; //debug print statement, disabled so it does not disturb the required output format

   //put item into buffer
   global_buffer[buffer_In_index] = item;

   //move buffer in index plus one in circular fashion
   buffer_In_index = (buffer_In_index + 1) % BUFFER_SIZE;
   //cout << bufferCount << endl; //debug output buffercount

   //increase buffer count since item was inserted
   bufferCount++;

   return true;
}

/********************************************************************
//
// Buffer Remove Item Function
//
// This function removes an item from the buffer, moves
// the buffer index, and subtracts one from the count of the buffer
//
// Return Value
// ------------
// bool                         True that function succeeded
//
// Reference Parameters
// ----------------
// item       buffer_item   Value to remove from buffer
//
*******************************************************************/
bool buffer_remove_item( buffer_item *item )
{
   //Grab item from buffer
   *item = global_buffer[buffer_Out_index];

   //Move out to next index position in circular fashion
   buffer_Out_index = ( buffer_Out_index + 1) % BUFFER_SIZE;

   //Decrease number of items in buffer
   bufferCount--;

   return true;
}

/********************************************************************
//
// Buffer Print Function
//
// This function prints the number of buffers occupied and
// the buffer contents. It also prints the positions of
// the in and out indexes.
//
// Return Value
// ------------
// void
//
*******************************************************************/
void buffer_print()
{
   //line 1
   cout << "(buffers occupied: " << bufferCount << ")" << endl;

   //line 2
   cout << "buffers: ";

   //print each slot as a signed value in a 5-character column (unused slots hold -1)
   for(int i=0; i < BUFFER_SIZE; i++)
       cout << " " << setw(4) << global_buffer[i]; //print #'s in buffer
   cout << endl; //start a new line

   // Third line of output. puts dashes under numbers
   cout << "         ";
   for(int i=0; i < BUFFER_SIZE; i++)
       cout << " ----";
   cout << endl;

   // Fourth line of output. Shows position of in & out indexes
   cout << "         ";
   for(int i=0; i < BUFFER_SIZE; i++)
   {
       cout << " ";

       if(buffer_In_index == i || buffer_Out_index == i)
       {
           if( bufferCount == BUFFER_SIZE )
               cout << "WR";
           else if( bufferCount == 0 )
               cout << "RW";
           else if( buffer_In_index == i )
               cout << "W ";
           else
               cout << "R ";

           cout << " ";
       }
       else
           cout << "    ";
   }
   cout << endl << endl;


}

#endif // _BUFFER_H_DEFINED_


stats.h

#ifndef STATS_H
#define STATS_H

#include <iostream>
#include <iomanip> //setw, used by Stats::print
using namespace std;

class Stats
{
   private:
       //vars
       int simulationTime; //holds simulation time
       int maxThreadSleepTime; //holds max sleep time for threads
       int numProducerThreads; //holds # of producer threads
       int numConsumerThreads; //holds # of consumer threads
       int sizeOfBuffer; //holds size of the buffer
       int totalNumConsumed; //gets # of items consumed
       int totalNumProduced; //gets # of items produced

   public:
       //vars
       int *totalThreadNumProduced; //holds # of items produced in array for each thread
       int *totalThreadNumConsumed; //holds # of items consumed in array for each thread
       int numTimesBufferFull; //total # of times the buffer was full
       int numTimesBufferEmpty; //total # of times the buffer was empty
       int numItemsRemaining; //total # of items that were left in the buffer

       //functions
       void print(); //does the output at the end
       Stats(int, int, int, int, int); //Constructor to set all the stuff we get from cli
       ~Stats(); //destructor that gets rid of dynamic elements
};

/********************************************************************
//
// Stats Constructor Function
//
// This function initializes all of the values passed in by the
// command line arguments
//
// Return Value
// ------------
// none
//
// Value Parameters
// ----------------
// _simTime        int    simulation time
// _maxSleep        int    maximum sleep time
// _numProducerThreads    int    number of producer threads
// _numConsumerThreads    int    number of consumer threads
// _bufferSize        int    size of buffer
//
*******************************************************************/
Stats::Stats(int _simTime, int _maxSleep, int _numProducerThreads, int _numConsumerThreads, int _bufferSize)
{
   //Vars
   simulationTime = _simTime;
   maxThreadSleepTime = _maxSleep;
   numProducerThreads = _numProducerThreads;
   numConsumerThreads = _numConsumerThreads;
   sizeOfBuffer = _bufferSize;

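   //Start the running totals at zero; main() adds to them with +=
   totalNumConsumed = 0;
   totalNumProduced = 0;
   numTimesBufferFull = 0;
   numTimesBufferEmpty = 0;
   numItemsRemaining = 0;

   //Allocate zero-initialized memory
   totalThreadNumProduced = new int[_numProducerThreads]();
   totalThreadNumConsumed = new int[_numConsumerThreads]();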

}

/********************************************************************
//
// ~Stats Function
//
// This function deletes pointers
//
// Return Value
// ------------
// none
//
*******************************************************************/
Stats::~Stats()
{
   delete [] totalThreadNumProduced;
   delete [] totalThreadNumConsumed;
}

/********************************************************************
//
// Print Function
//
// This function prints out a summary of stats at the end of
// the program
//
// Return Value
// ------------
// void
//
*******************************************************************/
void Stats::print()
{

   cout << "PRODUCER / CONSUMER SIMULATION COMPLETE" << endl;
   cout << "=======================================" << endl;
   cout << "Simulation Time:" << setw (24) << simulationTime << endl;
   cout << "Maximum Thread Sleep Time:" << setw(16) << maxThreadSleepTime << endl;
   cout << "Number of Producer Threads:" << setw(16) << numProducerThreads << endl;
   cout << "Number of Consumer Threads:" << setw(16) << numConsumerThreads << endl;
   cout << "Size of Buffer:" << setw(24) << sizeOfBuffer << endl << endl;
  
   //Producer threads
   int totalProd = 0;

   //-----sum up the amount of items produced by all threads
   for(int i=0; i < numProducerThreads; i++)
       totalProd += totalThreadNumProduced[i];

   //-----display total found above
   cout << "Total Number of Items Produced:" << setw(11) << totalProd << endl;

   //-----for each producer thread show # of items produced
   for(int i=0; i < numProducerThreads; i++)
   {
       cout << "   Thread " << setw(2);
       cout << (i+1) << ":" << setw(30);
       cout << totalThreadNumProduced[i] << endl;
   }

   cout << endl;

   //Consumer threads
   int totalCons = 0;

   //------sum up the amount of items consumed by all threads
   for(int i=0; i < numConsumerThreads; i++)
       totalCons += totalThreadNumConsumed[i];

   //------display total # of items consumed
   cout << "Total Number of Items Consumed:" << setw(11) << totalCons << endl;

   //-----for each consumer thread show # of items consumed
   for(int i=0; i < numConsumerThreads; i++)
   {
       cout << "   Thread " << setw(2);
       cout << (i+numProducerThreads+1) << ":" << setw(30);
       cout << totalThreadNumConsumed[i] << endl;
   }

   cout << endl;

   cout << "Number Of Items Remaining in Buffer:" << setw(6) << numItemsRemaining << endl;
   cout << "Number Of Times Buffer Was Full:" << setw(10) << numTimesBufferFull << endl;
   cout << "Number Of Times Buffer Was Empty:" << setw(9) << numTimesBufferEmpty << endl;

}
/*
=======================================
Simulation Time:           30
Maximum Thread Sleep Time:       3
Number of Producer Threads:       2
Number of Consumer Threads:       2
Size of Buffer:               5

Total Number of Items Produced:       50000
Thread 1:               30000
Thread 2:               20000

Total Number of Items Consumed:       60000  
Thread 3:               40000
Thread 4:               20000

Number Of Items Remaining in Buffer:    0
Number Of Times Buffer Was Full:   3
Number Of Times Buffer Was Empty:   4
      
*/
#endif