diff options
-rw-r--r-- | snowcast_server_concurrent.c | 42 | ||||
-rwxr-xr-x | test | bin | 52849 -> 52785 bytes |
2 files changed, 19 insertions, 23 deletions
diff --git a/snowcast_server_concurrent.c b/snowcast_server_concurrent.c index 7f70294..0586635 100644 --- a/snowcast_server_concurrent.c +++ b/snowcast_server_concurrent.c @@ -19,6 +19,7 @@ typedef struct user { int udpPort; int stationNum; int sockfd; + pthread_t streamThread; } user_t; #define NUM_STATIONS 2 @@ -37,10 +38,8 @@ int start_threads = 0; int max_active_users = 0; pthread_mutex_t mutex_user_data = PTHREAD_MUTEX_INITIALIZER; -// array from fd to users +// array from index to user_data user_t *user_data; -// array from fd to user's stream thread -pthread_t *user_stream_threads; station_t station_data[NUM_STATIONS]; int sockfd_to_user[MAX_USERS + 4]; @@ -71,11 +70,6 @@ main(int argc, char *argv[]) user_data = malloc(sizeof(user_t) * max_active_users); if (!user_data) { perror("malloc"); return 1; } - user_stream_threads = malloc(sizeof(pthread_t) * max_active_users); - if (!user_stream_threads) - { - perror("malloc"); return 1; - } // thread that manages file descriptors pthread_t s_thread, sync_thread; @@ -121,26 +115,29 @@ void *send_udp_packet_routine(void *arg) { int did_send_data = 0; int did_load_data = 0; + pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER; + int * i = (int *) arg; while (1) { - pthread_mutex_lock(&mutex); + pthread_mutex_lock(&m); if (!start_threads && did_send_data && did_load_data) { did_load_data = 0; did_send_data = 0; } while(!start_threads) { - pthread_cond_wait(&cond, &mutex); + pthread_cond_wait(&cond, &m); } - pthread_mutex_unlock(&mutex); - if(!did_send_data && start_threads) { + pthread_mutex_unlock(&m); + if(!did_send_data) { printf("send data: thread %d \n", i); did_send_data = 1; } - if(!did_load_data && start_threads) { + if(!did_load_data) { printf("load data: thread %d \n", i); did_load_data = 1; } + } return NULL; } @@ -179,9 +176,10 @@ void *synchronization_thread(void *arg) { start_threads = 1; printf("\nbroadcast %d\n", c++); pthread_cond_broadcast(&cond); - usleep(10000); + usleep(1000); start_threads = 0; sleep(1); + } } @@ -408,14 +406,12 @@ void *init_user(int sockfd) { user_t *more_users = realloc(user_data, sizeof(user_t) * max_active_users); if (!more_users) { perror("realloc"); exit(1); } user_data = more_users; - pthread_t *more_stream_threads = realloc(user_stream_threads, sizeof(pthread_t) * max_active_users); - if (!more_stream_threads) { perror("realloc"); exit(1); } - user_stream_threads = more_stream_threads; } // map sockfd to this user index & create its stream thread - user_data[user_index] = (user_t) {-1, -1, sockfd}; + pthread_t user_thread; + pthread_create(&user_thread, NULL, send_udp_packet_routine, (void *)user_index); + user_data[user_index] = (user_t){-1, -1, sockfd, user_thread}; sockfd_to_user[sockfd] = user_index; - pthread_create(&user_stream_threads[user_index], NULL, send_udp_packet_routine, (void *)sockfd); // free(user_stream_threads); pthread_mutex_unlock(&mutex_user_data); } @@ -430,16 +426,16 @@ void *update_user_station(int sockfd, int stationNum) { pthread_mutex_unlock(&mutex_user_data); } void *print_user_data(int index) { - printf("udpPort: %d, stationNum: %d, sockfd: %d\n", - user_data[index].udpPort, user_data[index].stationNum, user_data[index].sockfd); + printf("udpPort: %d, stationNum: %d, sockfd: %d, threadId:%d\n", + user_data[index].udpPort, user_data[index].stationNum, user_data[index].sockfd, user_data[index].streamThread); } void *destroy_user(int sockfd) { pthread_mutex_lock(&mutex_user_data); // stop the thread streaming to the user - pthread_cancel(user_stream_threads[sockfd_to_user[sockfd]]); + pthread_cancel(user_data[sockfd_to_user[sockfd]].streamThread); // "remove" the user from the list of user data - user_data[sockfd_to_user[sockfd]] = (user_t) {-1, -1, -1}; + user_data[sockfd_to_user[sockfd]] = (user_t) {-1, -1, -1, -1}; // map sockfd to -1 sockfd_to_user[sockfd] = -1; |