/*
 * counter.c
 * Adam Carpenter - 2017 - acarpent - acarpenter@email.wm.edu
 * Utilizes code from P. Kearns
 * Utilizes code from linuxthreads demonstration "prodcons.c"
 *
 * A reader (producer) thread, which is main(), reads lines from the files
 * named on the command line and places them in a bounded buffer. Counter
 * (consumer) threads take lines out of the buffer, split them into words on
 * spaces, and record each word in one of two sorted linked lists depending on
 * whether the word has an even or odd number of characters. When all input
 * has been consumed, both lists are printed.
 */

// Make the POSIX declarations used below (nanosleep, strtok_r) visible even
// when compiling in a strict ISO C mode such as -std=c11.
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif

// **** Library inclusions ****
#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

// **** Definitions for counter ****
#define FALSE 0
#define LEN_LINE 2048
#define LEN_FILENAME 4096
#define MAX_THREADS 26
#define TRUE 1
#define USAGE "counter: usage: counter -b num_lines -t max_counters -d file_delay -D thread_delay [file]...\n"

// **** Structure declarations ****
struct prodConBuff {
    /*
     * struct prodConBuff is the bounded buffer shared by the reader and the
     * counter threads. When numLines > 1 it is a ring in which one slot is
     * always left unused, so "full" is (writePos + 1) % numLines == readPos
     * and "empty" is readPos == writePos. When numLines == 1 only slot 0 is
     * used and an empty string in that slot means "empty".
     */
    pthread_mutex_t bufferLock; // Mutex ensures exclusive access to buffer.
    int readPos;                // Position for reading.
    int writePos;               // Position for writing.
    pthread_cond_t notEmpty;    // Signaled when buffer isn't empty.
    pthread_cond_t notFull;     // Signaled when buffer isn't full.
    char **lines;               // numLines strings of LEN_LINE bytes each.
};

struct NODE {
    /*
     * struct NODE is one entry of a word list.
     */
    char *word;        // Heap-allocated, NUL-terminated copy of the word.
    long count;        // Number of times the word has been inserted.
    struct NODE *next; // Next node, or NULL at the end of the list.
};

struct linkedList {
    /*
     * struct linkedList is a singly linked list of words kept in strcmp()
     * order.
     */
    pthread_mutex_t listLock; // Mutex ensures exclusive access to linked list.
    struct NODE *head;        // Pointer to first node in list.
};

// **** Global variables ****
static char nextThread;   // Name for the next counter thread; 'a' + number created.
static int fileDelay;     // Reader delay between lines, in milliseconds.
static int maxCounters;   // Upper bound on the number of counter threads.
static int moreStuff;     // TRUE while the reader may still add lines.
                          // Read and written under buffer.bufferLock once
                          // counter threads exist.
static int numLines;      // Number of slots in the buffer.
static int threadDelay;   // Counter delay between lines, in milliseconds.
static pthread_t threadPool[MAX_THREADS]; // Slots [0, nextThread - 'a') are valid.
struct prodConBuff buffer;
struct linkedList evenList;
struct linkedList oddList;

// **** Small helpers ****
static void outOfMemory(void) {
    /*
     * outOfMemory() reports an allocation failure and terminates the process.
     */
    fprintf(stderr, "counter: out of memory\n");
    exit(1);
}

static struct timespec millisToTimespec(int millis) {
    /*
     * millisToTimespec() converts a non-negative millisecond count into the
     * seconds/nanoseconds form expected by nanosleep().
     */
    struct timespec ts;

    ts.tv_sec = millis / 1000;
    ts.tv_nsec = (long) (millis % 1000) * 1000000L;
    return ts;
}

static int parseIntArg(const char *text, long min, long max, int *out) {
    /*
     * parseIntArg() parses text as a base-10 integer in [min, max].
     * Returns TRUE and stores the value in *out on success. Returns FALSE,
     * leaving *out untouched, if text is empty, has trailing characters,
     * overflows, or is out of range.
     */
    char *end;
    long value;

    errno = 0;
    value = strtol(text, &end, 10);

    if (errno != 0 || end == text || *end != '\0') {
        return FALSE;
    }

    if (value < min || value > max) {
        return FALSE;
    }

    *out = (int) value;
    return TRUE;
}

// **** Structure initializations ****
static void initBuffer(struct prodConBuff *buffer) {
    /*
     * initBuffer() initializes the lock, condition variables and positions
     * of the buffer and allocates numLines zero-filled slots of LEN_LINE
     * bytes each. Zero-filled slots are empty strings, which is also the
     * "empty" marker used when numLines == 1.
     */
    int i;

    pthread_mutex_init(&buffer->bufferLock, NULL);
    pthread_cond_init(&buffer->notEmpty, NULL);
    pthread_cond_init(&buffer->notFull, NULL);
    buffer->readPos = 0;
    buffer->writePos = 0;

    buffer->lines = calloc((size_t) numLines, sizeof *buffer->lines);

    if (buffer->lines == NULL) {
        outOfMemory();
    }

    for (i = 0; i < numLines; i++) {
        buffer->lines[i] = calloc(LEN_LINE, sizeof **buffer->lines);

        if (buffer->lines[i] == NULL) {
            outOfMemory();
        }
    }
}

static void destroyBuffer(struct prodConBuff *buffer) {
    /*
     * destroyBuffer() releases everything initBuffer() set up. It must only
     * be called once no other thread is using the buffer.
     */
    int i;

    for (i = 0; i < numLines; i++) {
        free(buffer->lines[i]);
    }

    free(buffer->lines);
    buffer->lines = NULL;

    pthread_cond_destroy(&buffer->notFull);
    pthread_cond_destroy(&buffer->notEmpty);
    pthread_mutex_destroy(&buffer->bufferLock);
}

static void initLinkedList(struct linkedList *list) {
    /*
     * initLinkedList() initializes the linked list structure (its lock and
     * its head node.)
     */
    pthread_mutex_init(&list->listLock, NULL);
    list->head = NULL;
}

// **** Modifier and thread functions ****
static void sleeper(const struct timespec *req) {
    /*
     * sleeper() sleeps for the full duration *req, resuming with the
     * remaining time whenever nanosleep() is interrupted by a signal. Any
     * other nanosleep() failure ends the sleep, because the remaining time
     * is only meaningful after EINTR.
     */
    struct timespec wanted = *req;
    struct timespec left;

    while (nanosleep(&wanted, &left) == -1 && errno == EINTR) {
        wanted = left;
    }
}

static void quitThread(void) {
    /*
     * quitThread() terminates the calling counter thread. The caller must
     * not hold any mutex: a mutex held by an exited thread is never
     * released. nextThread is deliberately left alone here because main()
     * uses it to know how many threads to join.
     */
#ifdef atcdebug
    fprintf(stderr, "counter: thread quit because no more stuff.\n");
#endif
    pthread_exit(NULL);
}

static int putInBuffer(struct prodConBuff *buffer, char *line) {
    /*
     * putInBuffer() copies line into the next free slot of the ring buffer
     * (numLines > 1), blocking while the ring is full.
     * Returns TRUE if the ring was full on entry, which the reader takes as
     * a hint to start another counter thread; FALSE otherwise.
     */
    int newThread;

    newThread = FALSE;
    pthread_mutex_lock(&buffer->bufferLock);

    // If buffer is full, ask the caller to spawn a new counter thread.
    if ((buffer->writePos + 1) % numLines == buffer->readPos) {
        newThread = TRUE;
    }

    // Wait until buffer isn't full anymore.
    while ((buffer->writePos + 1) % numLines == buffer->readPos) {
        pthread_cond_wait(&buffer->notFull, &buffer->bufferLock);
#ifdef atcdebug
        fprintf(stderr, "reader: waiting for space...\n");
#endif
    }

#ifdef atcdebug
    fprintf(stderr, "reader: gonna insert in slot: %d\n", buffer->writePos);
#endif

    // Copy the text into the next slot; snprintf always NUL-terminates.
    snprintf(buffer->lines[buffer->writePos], LEN_LINE, "%s", line);
    buffer->writePos++;

    if (buffer->writePos >= numLines) {
        buffer->writePos = 0;
    }

#ifdef atcdebug
    fprintf(stderr, "reader: buffer is now: { ");
    int i;

    for (i = 0; i < numLines; i++) {
        if (i == buffer->writePos) {
            fprintf(stderr, "*");
        }

        fprintf(stderr, "%s ", buffer->lines[i]);
    }

    fprintf(stderr, "}\n\n");
#endif

    // Signal buffer isn't empty anymore and unlock mutex.
    pthread_cond_signal(&buffer->notEmpty);
    pthread_mutex_unlock(&buffer->bufferLock);
    return newThread;
}

static int putInSingleBuffer(struct prodConBuff *buffer, char *line) {
    /*
     * putInSingleBuffer() is the numLines == 1 variant of putInBuffer(): it
     * copies line into slot 0, blocking while that slot holds a non-empty
     * string.
     * Returns TRUE if the slot was occupied on entry, FALSE otherwise.
     * An empty line leaves the slot looking empty, so counters never see it.
     */
    int newThread;

    pthread_mutex_lock(&buffer->bufferLock);

    // If slot is taken, ask the caller to spawn a new counter thread.
    newThread = (buffer->lines[0][0] != '\0') ? TRUE : FALSE;

    // Wait until slot isn't full anymore.
    while (buffer->lines[0][0] != '\0') {
        pthread_cond_wait(&buffer->notFull, &buffer->bufferLock);
    }

    // Insert line into buffer; snprintf always NUL-terminates.
    snprintf(buffer->lines[0], LEN_LINE, "%s", line);

    // Signal buffer slot isn't empty anymore and unlock mutex.
    pthread_cond_signal(&buffer->notEmpty);
    pthread_mutex_unlock(&buffer->bufferLock);
    return newThread;
}

static void getFromBuffer(struct prodConBuff *buffer, char *line) {
    /*
     * getFromBuffer() removes the oldest line from the ring buffer
     * (numLines > 1) and copies it into line, which must have room for
     * LEN_LINE bytes. It blocks while the ring is empty. If the ring is
     * empty and the reader has announced that no more input is coming, the
     * calling thread exits instead of returning.
     */
    pthread_mutex_lock(&buffer->bufferLock);

    // Wait until buffer isn't empty anymore. The end-of-input flag is
    // re-checked after every wakeup so that threads already waiting when the
    // reader finishes also terminate.
    while (buffer->readPos == buffer->writePos) {
        if (!moreStuff) {
            // Release the lock before exiting so other counters can proceed.
            pthread_mutex_unlock(&buffer->bufferLock);
            quitThread();
        }

#ifdef atcdebug
        fprintf(stderr, "counter: waiting for stuff...\n");
#endif
        pthread_cond_wait(&buffer->notEmpty, &buffer->bufferLock);
    }

#ifdef atcdebug
    fprintf(stderr, "counter: getting from slot: %d\n", buffer->readPos);
#endif

    // Get line from buffer and advance read position.
    snprintf(line, LEN_LINE, "%s", buffer->lines[buffer->readPos]);
    buffer->readPos++;

    if (buffer->readPos >= numLines) {
        buffer->readPos = 0;
    }

#ifdef atcdebug
    fprintf(stderr, "counter: buffer is now: { ");
    int i;

    for (i = 0; i < numLines; i++) {
        if (i == buffer->readPos) {
            fprintf(stderr, "*");
        }

        fprintf(stderr, "%s ", buffer->lines[i]);
    }

    fprintf(stderr, "}\n\n");
#endif

    // Signal buffer isn't full anymore and unlock mutex.
    pthread_cond_signal(&buffer->notFull);
    pthread_mutex_unlock(&buffer->bufferLock);
}

static void getFromSingleBuffer(struct prodConBuff *buffer, char *line) {
    /*
     * getFromSingleBuffer() is the numLines == 1 variant of getFromBuffer():
     * it copies slot 0 into line (room for LEN_LINE bytes required) and marks
     * the slot empty. It blocks while the slot is empty, and exits the
     * calling thread if the slot is empty and no more input is coming.
     */
    pthread_mutex_lock(&buffer->bufferLock);

    // Wait until buffer isn't empty anymore.
    while (buffer->lines[0][0] == '\0') {
        if (!moreStuff) {
            // Release the lock before exiting so other counters can proceed.
            pthread_mutex_unlock(&buffer->bufferLock);
            quitThread();
        }

        pthread_cond_wait(&buffer->notEmpty, &buffer->bufferLock);
    }

    // Get line from buffer slot and mark the slot empty.
    snprintf(line, LEN_LINE, "%s", buffer->lines[0]);
    buffer->lines[0][0] = '\0';

    // Signal buffer slot isn't full anymore and unlock mutex.
    pthread_cond_signal(&buffer->notFull);
    pthread_mutex_unlock(&buffer->bufferLock);
}

static struct NODE *newNode(const char *word, struct NODE *next) {
    /*
     * newNode() allocates a node holding a private copy of word, with a
     * count of 1 and the given successor. Terminates the process if memory
     * is exhausted.
     */
    size_t size = strlen(word) + 1; // +1 for the terminating NUL.
    struct NODE *node = malloc(sizeof *node);

    if (node == NULL) {
        outOfMemory();
    }

    node->word = malloc(size);

    if (node->word == NULL) {
        outOfMemory();
    }

    memcpy(node->word, word, size);
    node->count = 1;
    node->next = next;
    return node;
}

static void putInList(struct linkedList *list, char *word) {
    /*
     * putInList() records one occurrence of word in list. If the word is
     * already present its count is incremented; otherwise a new node is
     * inserted so that the list stays in ascending strcmp() order (including
     * in front of the current head). The list lock is held throughout.
     */
    struct NODE **link;
    int found;

    pthread_mutex_lock(&list->listLock);

#ifdef atcdebug
    fprintf(stderr, "counter: gonna insert into list\n");
#endif

    // link always points at the pointer that would have to be rewritten to
    // insert in front of the node currently being examined.
    found = FALSE;

    for (link = &list->head; *link != NULL; link = &(*link)->next) {
        int order = strcmp(word, (*link)->word);

        if (order == 0) {
            // Word already in list; update count.
            (*link)->count++;
            found = TRUE;
            break;
        }

        if (order < 0) {
            // First lexicographically greater word; insert in front of it.
            break;
        }
    }

    if (!found) {
        *link = newNode(word, *link);
    }

#ifdef atcdebug
    fprintf(stderr, "counter: list is now: {");
    struct NODE *current = list->head;

    while (current != NULL) {
        fprintf(stderr, "%s->", current->word);
        current = current->next;
    }

    fprintf(stderr, "}\n\n");
#endif

    pthread_mutex_unlock(&list->listLock);
}

static void printList(struct linkedList *list) {
    /*
     * printList() writes one "Count: N Word: W" line to stdout for every
     * node in list, in list order, while holding the list lock.
     */
    struct NODE *current;

    pthread_mutex_lock(&list->listLock);

    for (current = list->head; current != NULL; current = current->next) {
        printf("Count: %ld Word: %s\n", current->count, current->word);
    }

    pthread_mutex_unlock(&list->listLock);
}

static void freeList(struct linkedList *list) {
    /*
     * freeList() frees every node of list together with its word and leaves
     * the list empty (head == NULL).
     */
    struct NODE *current;
    struct NODE *temp;

    pthread_mutex_lock(&list->listLock);

    current = list->head;

    while (current != NULL) {
        temp = current;
        current = current->next;
        free(temp->word);
        free(temp);
    }

    list->head = NULL;

    pthread_mutex_unlock(&list->listLock);
}

static void *counterThread(void *arg) {
    /*
     * counterThread() is the body of a counter (consumer) thread. arg
     * carries the thread's one-character name, packed into the pointer
     * value by createThread(). The thread repeatedly sleeps for threadDelay
     * milliseconds, takes one line from the buffer, and files each
     * space-separated word in evenList or oddList by word length, printing
     * its name once per word. It terminates from inside the get function
     * when the buffer is empty and no more input is coming.
     */
    char name;
    char line[LEN_LINE];
    char *savePtr;
    char *token;
    struct timespec requestedTime;

    name = (char) (intptr_t) arg;
    requestedTime = millisToTimespec(threadDelay);

#ifdef atcdebug
    fprintf(stderr, "counter: name == %c, numLines == %d, threadDelay == %d\n",
            name, numLines, threadDelay);
#endif

    while (TRUE) {
        // Sleep for allotted time.
        sleeper(&requestedTime);

        if (numLines == 1) {
            getFromSingleBuffer(&buffer, line);
        } else {
            getFromBuffer(&buffer, line);
        }

        // strtok_r keeps its position in savePtr, so concurrent counter
        // threads do not disturb each other's tokenizing.
        for (token = strtok_r(line, " ", &savePtr);
             token != NULL;
             token = strtok_r(NULL, " ", &savePtr)) {
            if (strlen(token) % 2 == 0) {
                putInList(&evenList, token);
            } else {
                putInList(&oddList, token);
            }

#ifndef atcdebug
            printf("%c ", name);
#endif
        }
    }

    return NULL; // Not reached; the thread leaves through quitThread().
}

static void createThread(void) {
    /*
     * createThread() starts one more counter thread, named with the next
     * unused letter, unless maxCounters threads have already been created.
     * nextThread only advances when the thread was actually created, so
     * (nextThread - 'a') is always the number of joinable threads.
     */
    int slot = nextThread - 'a';
    int err;

    // Would a new thread put us over the maximum allowed?
    if (slot >= maxCounters) {
        return;
    }

#ifdef atcdebug
    fprintf(stderr, "reader: creating thread, name == %c\n", nextThread);
#endif

    err = pthread_create(&threadPool[slot], NULL, counterThread,
                         (void *) (intptr_t) nextThread);

    if (err != 0) {
        fprintf(stderr, "counter: could not create thread: %s\n", strerror(err));
        return;
    }

    nextThread++;
}

// **** Main ****
int main(int argc, char *argv[]) {
    /*
     * main() acts as the reader thread, and therefore the producer.
     *
     * Command line: the first eight arguments must be the four options
     * -b num_lines, -t max_counters, -d file_delay and -D thread_delay (in
     * any order), followed by at least one file name. Delays are in
     * milliseconds. Exits with status 1 on a usage error and 0 otherwise.
     */
    char line[LEN_LINE];
    int i;
    int created;
    FILE *textFile;
    struct timespec requestedTime;

    // Get command line args.
    if (argc < 10) {
        fputs(USAGE, stderr);
        exit(1);
    }

    numLines = maxCounters = fileDelay = threadDelay = -1;
    i = 1;

    while (i < 9 && argv[i]) {
        const char *flag = argv[i];
        const char *value = argv[i + 1]; // argv[argc] is NULL, so this is safe.

        if (strcmp(flag, "-b") != 0 && strcmp(flag, "-t") != 0
                && strcmp(flag, "-d") != 0 && strcmp(flag, "-D") != 0) {
            // A non-option this early means an option is missing.
            fputs(USAGE, stderr);
            exit(1);
        }

        if (value == NULL) {
            fputs(USAGE, stderr);
            exit(1);
        }

        if (strcmp(flag, "-b") == 0) {
            // Get numLines.
            if (!parseIntArg(value, 1, INT_MAX, &numLines)) {
                fprintf(stderr, "counter: num_lines must be a positive, non-zero integer.\n");
                exit(1);
            }
        } else if (strcmp(flag, "-t") == 0) {
            // Get maxCounters.
            if (!parseIntArg(value, 1, MAX_THREADS, &maxCounters)) {
                fprintf(stderr, "counter: max_counters must be a positive, non-zero integer no greater than 26.\n");
                exit(1);
            }
        } else if (strcmp(flag, "-d") == 0) {
            // Get fileDelay.
            if (!parseIntArg(value, 0, INT_MAX, &fileDelay)) {
                fprintf(stderr, "counter: file_delay must be a positive integer.\n");
                exit(1);
            }
        } else {
            // Get threadDelay.
            if (!parseIntArg(value, 0, INT_MAX, &threadDelay)) {
                fprintf(stderr, "counter: thread_delay must be a positive integer.\n");
                exit(1);
            }
        }

        i += 2;
    }

    // A repeated option leaves another one unset; refuse to run with it.
    if (numLines < 0 || maxCounters < 0 || fileDelay < 0 || threadDelay < 0) {
        fputs(USAGE, stderr);
        exit(1);
    }

#ifdef atcdebug
    fprintf(stderr, "reader: numLines = %d, maxCounters = %d, fileDelay = %d, "
            "threadDelay = %d\n", numLines, maxCounters, fileDelay, threadDelay);
#endif

    // Set up thread environment.
    initBuffer(&buffer);
    initLinkedList(&evenList);
    initLinkedList(&oddList);
    nextThread = 'a';
    moreStuff = TRUE;

    // Start up first counter (consumer) thread. Without at least one
    // consumer the reader would block forever on a full buffer.
    createThread();

    if (nextThread == 'a') {
        exit(1);
    }

    // **** Reader (producer) thread ****
    requestedTime = millisToTimespec(fileDelay);

    for (; argv[i]; i++) {
        // Open each file given in argv[].
        textFile = fopen(argv[i], "r");

        if (textFile == NULL) {
            fprintf(stderr, "counter: could not open %s\n", argv[i]);
            continue;
        }

        // Read each line in the file and insert it into the buffer.
        while (fgets(line, LEN_LINE, textFile)) {
            int wasFull;

            // Strip newline.
            line[strcspn(line, "\n")] = '\0';

            if (numLines == 1) {
                wasFull = putInSingleBuffer(&buffer, line);
            } else {
                wasFull = putInBuffer(&buffer, line);
            }

            // Attempt to create a new thread if the put function says the
            // buffer was full.
            if (wasFull == TRUE) {
                createThread();
            }

            // Sleep for allotted time.
            sleeper(&requestedTime);
        }

        fclose(textFile);
    }

    // Let counter threads know that there won't be any more input. The flag
    // is changed under the buffer lock and every waiting counter is woken so
    // each one can drain what is left and then exit.
    pthread_mutex_lock(&buffer.bufferLock);
    moreStuff = FALSE;
    pthread_cond_broadcast(&buffer.notEmpty);
    pthread_mutex_unlock(&buffer.bufferLock);

    // Wait for every counter thread to finish before printing.
    created = nextThread - 'a';

    for (i = 0; i < created; i++) {
        pthread_join(threadPool[i], NULL);
    }

    printf("\n==== Words with an even number of letters ====\n");
    printList(&evenList);
    printf("\n==== Words with an odd number of letters ====\n");
    printList(&oddList);

    freeList(&evenList);
    freeList(&oddList);
    destroyBuffer(&buffer);
    exit(0);
}