Purpose The goal of this assignment is to write a program th
Purpose
The goal of this assignment is to write a program that uses threads and synchronization. As a result, you will get experience in writing a program that actually runs in parallel on Linux using threads (pthreads) and synchronization.
Shared Memory Producer/Consumer Program
You will write a program with four threads, structured like:
Reader->Munch1->Munch2->Writer
The Reader thread will read an input file, one line at a time. Reader will take the each line of the input and pass it to thread Munch1 though a queue of messages.
Munch1 will scan the line and replace each blank character with an asterisk (\"*\") character. It will then pass the line to thread Munch2 through another queue of messages.
Munch2 will scan the line and convert all lower case letters to upper case (e.g., convert \"a\" to \"A\"). It will then pass the line to thread Writer though yet another queue of messages.
Writer will write the line to an output file.
Synchronization and Communication
The threads will communicate through shared memory using thread synchronization.
Program Details
1. Your program will create four threads. Note that you will not be using fork and exec.
2. Thread Reader will read in each input line. If the line is longer than 63 characters, it will truncate it to 63 characters (plus the null byte at the end). Just throw away (flush to end of line) any extra characters.
3. You will read from stdin and write to stdout.
4. See the manual page entry for the function \"index\" to making writing Munch1 easier.
5. See the manual page entries for \"islower\" and \"toupper\" to making writing Munch2 easier. (Read these functions names as \"is lower\" and \"to upper\".
6. Thread Writer will count the number of lines and print this number to stdout.
7. You should develop a module that implements a queue of character string buffers.
o The queue itself will be an array of pointers to strings, with integers that indicate the head and tail of the list.
o The maximum size of the buffer array will be defined by a constant (using #define or constant int definition. This maximum should be set to 10.
o Buffers will be allocated (using malloc) by the Reader thread and deallocated (using free) by the Writer thread.
8. Threads should terminate when there is no more input (end of file).
9. Do not use global variables to signal completion of the input for the various threads. This is a bad idea.
10. Make sure to isolate the queue abstraction in a separate module. You should have functions for allocation, enqueuing and dequeuing.
11. Each queue needs its own synchronization, which can be realized by semaphores. Please study the handout carefully.
Deliverables
To turn in your programs, copy all .C and .h files, the Makefile and a README.
Solution
ThreadSynchronizationService class
------------------------------------------------------
package producerconsumer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ThreadSynchronizationService {
public static void main(String[] args) {
BlockingQueue<String> queue1 = new ArrayBlockingQueue<String>(10);
BlockingQueue<String> queue2 = new ArrayBlockingQueue<String>(10);
BlockingQueue<String> queue3 = new ArrayBlockingQueue<String>(10);
Reader task1 = new Reader(null, queue1);
Munich1 task2 = new Munich1(queue1, queue2);
Munich2 task3 = new Munich2(queue2, queue3);
Writer task4 = new Writer(queue3, null);
Thread reader = new Thread(task1);
reader.setName(\"Reader\");
Thread munch1 = new Thread(task2);
munch1.setName(\"Munch1\");
Thread munch2 = new Thread(task3);
munch2.setName(\"Munch2\");
Thread writer = new Thread(task4);
writer.setName(\"Writer\");
reader.start();
try {
reader.join();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
munch1.start();
try {
munch1.join();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
munch2.start();
try {
munch2.join();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
writer.start();
}
}
Reader class
--------------------------
package producerconsumer;
import java.util.Scanner;
import java.util.concurrent.BlockingQueue;
public class Reader extends Thread {
private BlockingQueue<String> srcQueue = null;
private BlockingQueue<String> destQueue = null;
public Reader(BlockingQueue<String> srcQueue,
BlockingQueue<String> destQueue) {
this.srcQueue = srcQueue;
this.destQueue = destQueue;
}
public void run() {
System.out.println(Thread.currentThread().getName()
+ \" is going to read string...\");
String input = readInput();
System.out.println(Thread.currentThread().getName() + \" read string : \"
+ input);
this.destQueue.add(input);
}
public String readInput() {
Scanner scanner = null;
try {
System.out.println(\"Please enter input here : \ \");
scanner = new Scanner(System.in);
String input = scanner.nextLine();
if (input.length() > 63) {
return input.substring(0, 62);
} else {
return input;
}
} finally {
scanner.close();
}
}
}
Munich1 class
------------------------
package producerconsumer;
import java.util.Scanner;
import java.util.concurrent.BlockingQueue;
public class Munich1 extends Thread {
private BlockingQueue<String> srcQueue = null;
private BlockingQueue<String> destQueue = null;
public Munich1(BlockingQueue<String> srcQueue,
BlockingQueue<String> destQueue) {
this.srcQueue = srcQueue;
this.destQueue = destQueue;
}
public void run() {
System.out
.println(Thread.currentThread().getName()
+ \" is going to replace Blank character with astrisk symbol ...\");
String astriskString = this.srcQueue.poll().replace(\" \", \"*\");
System.out.println(Thread.currentThread().getName()
+ \" string with astrisk symbol ...\" + astriskString);
this.destQueue.add(astriskString);
}
public String readInput() {
Scanner scanner = null;
try {
System.out.println(\"Please enter input here : \ \");
scanner = new Scanner(System.in);
String input = scanner.nextLine();
if (input.length() > 63) {
return input.substring(0, 62);
} else {
return input;
}
} finally {
scanner.close();
}
}
}
Munich2 class
---------------------
package producerconsumer;
import java.util.Scanner;
import java.util.concurrent.BlockingQueue;
public class Munich2 extends Thread {
private BlockingQueue<String> srcQueue = null;
private BlockingQueue<String> destQueue = null;
public Munich2(BlockingQueue<String> srcQueue,
BlockingQueue<String> destQueue) {
this.srcQueue = srcQueue;
this.destQueue = destQueue;
}
public void run() {
System.out.println(Thread.currentThread().getName()
+ \" is going to convert string in uppercase ...\");
String upperCaseString = this.srcQueue.poll().toUpperCase();
System.out.println(\" Upper case string : \" + upperCaseString);
this.destQueue.add(upperCaseString);
}
public String readInput() {
Scanner scanner = null;
try {
System.out.println(\"Please enter input here : \ \");
scanner = new Scanner(System.in);
String input = scanner.nextLine();
if (input.length() > 63) {
return input.substring(0, 62);
} else {
return input;
}
} finally {
scanner.close();
}
}
}
Writer class
-----------------------
package producerconsumer;
import java.util.concurrent.BlockingQueue;
public class Writer extends Thread {
private BlockingQueue<String> srcQueue = null;
private BlockingQueue<String> destQueue = null;
public Writer(BlockingQueue<String> srcQueue,
BlockingQueue<String> destQueue) {
this.srcQueue = srcQueue;
this.destQueue = destQueue;
}
public void run() {
System.out.println(Thread.currentThread().getName()
+ \" going to print output...\");
System.out.println(this.srcQueue.poll());
}
}




