aboutsummaryrefslogtreecommitdiff
path: root/snowcast_server_concurrent.c
diff options
context:
space:
mode:
Diffstat (limited to 'snowcast_server_concurrent.c')
-rw-r--r--snowcast_server_concurrent.c87
1 files changed, 57 insertions, 30 deletions
diff --git a/snowcast_server_concurrent.c b/snowcast_server_concurrent.c
index d853507..77072fe 100644
--- a/snowcast_server_concurrent.c
+++ b/snowcast_server_concurrent.c
@@ -10,6 +10,8 @@
#include <sys/stat.h>
#include <fcntl.h>
+#include <sys/types.h>
+
#include "protocol.h"
@@ -206,7 +208,7 @@ void *print_info_routine(void *arg) {
write(print_fd, file_path, strlen(file_path));
for (int j = 0; j < max_active_users; j++) {
- if (user_data[j].sockfd == -1)
+ if (!user_data[j].sockfd || user_data[j].sockfd == -1)
continue;
if (user_data[j].stationNum == i) {
char *localhost_ip = ",127.0.0.1:";
@@ -327,20 +329,24 @@ void *send_udp_packet_routine(void *arg) {
// get file path
char* file_path = station_data[station_num].filePath;
// get current seek chunk
- FILE* file_stream = fopen(file_path, "r");
+ int stream_fd = open(file_path, O_RDONLY);
+ if (stream_fd == -1) {
+ perror("open");
+ return (NULL);
+ }
int current_chunk = station_data[station_num].seekIndex;
- if (fseek(file_stream, current_chunk, SEEK_SET) == -1) {
+ if (lseek(stream_fd, current_chunk, SEEK_SET) == -1) {
perror("fseek");
return (NULL);
}
size_t BYTES_PER_SECOND = 16*1024;
// read 1000 bytes of the file
char file_buffer[BYTES_PER_SECOND];
- if (fread(file_buffer, BYTES_PER_SECOND, 1, file_stream) == -1) {
+ if (read(stream_fd, file_buffer, BYTES_PER_SECOND) == -1) {
perror("fread");
return (NULL);
}
- fclose(file_stream);
+ close(stream_fd);
// printf("send data: thread %d \n", user_index);
// int numbytes;
// if ((numbytes = sendto(udp_sockfd, data, strlen(data), 0,
@@ -381,14 +387,15 @@ void *send_announce_routine(void *arg) {
// if (user_data[i].streamThread == NULL) {
// break;
// }
- if (user_data[i].sockfd == -1)
+ if (user_data[i].sockfd == 0 || user_data[i].sockfd == -1) {
continue;
+ }
// print_user_data(i);
// update the station of each user
if (user_data[i].stationNum == station_num)
{
// printf("sending announce to user %d\n", i);
- send_announce_reply(user_data[i].sockfd, i);
+ send_announce_reply(user_data[i].sockfd, station_num);
}
}
}
@@ -411,14 +418,24 @@ void *synchronization_thread(void *arg) {
{
// printf("checking station %d\n", i);
// get size of file
- FILE* fp = fopen(station_data[i].filePath, "r");
- fseek(fp, 0L, SEEK_END);
- size_t size = ftell(fp);
- fclose(fp);
- if (size == -1) {
- perror("ftell");
+ struct stat f_info;
+ // int file_fd = open(station_data[i].filePath, O_RDONLY);
+ // if (file_fd == -1) {
+ // perror("open");
+ // return (NULL);
+ // }
+ if (stat(station_data[i].filePath, &f_info) == -1) {
+ perror("fstat");
return (NULL);
}
+
+ size_t size = f_info.st_size;
+
+ // fclose(file_fd);
+ // if (size == -1) {
+ // perror("ftell");
+ // return (NULL);
+ // }
station_data[i].seekIndex += BYTES_PER_SECOND;
// if the seek index is greater than the size of the file, reset it
if (station_data[i].seekIndex >= size)
@@ -550,7 +567,7 @@ void *select_thread(void *arg) {
// got error or connection closed by client
if (nbytes == 0) {
// connection closed
- printf("selectserver: socket %d hung up\n", i);
+ // printf("selectserver: socket %d hung up\n", i);
} else {
perror("recv");
}
@@ -620,39 +637,46 @@ void *select_thread(void *arg) {
} // END got new incoming connection
} // END looping through file descriptors
- // broadcast the new files over the udp socket list for each use
-
} // END for(;;)--and you thought it would never end!
}
void *init_user(int sockfd) {
// add the user to the list of user data
pthread_mutex_lock(&mutex_user_data);
-
// this is to save memory space.
// in general, the displacement of 4 is where a user "used to be"
- int user_index = max_active_users++;
+ // int user_index = max_active_users++;
+ // int running_index = 0;
+ // while(running_index < max_active_users)
+ // {
+ // if (user_data[running_index++].sockfd == -1)
+ // {
+ // user_index = running_index;
+ // break;
+ // }
+ // // printf("reusing memory\n");
+ // }
int running_index = 0;
- while(running_index++ < max_active_users)
- {
- if (user_data[running_index].sockfd == -1)
- {
- user_index = running_index;
+ while(running_index < max_active_users) {
+ if (user_data[running_index].sockfd == -1) {
break;
}
- // printf("reusing memory\n");
+ running_index++;
}
- // have to make more memory
- if (user_index == max_active_users) {
- ///printf("making new memory\n");
+ if (running_index == max_active_users) {
+ // printf("reached max active users\n");
+ printf("making new memory\n");
+ max_active_users++;
+ // TODO: FIX SO THAT IT USES CURRENT USERS, NOT MAX_ACTIVE_USERS FOT THE RESIZE
user_t *more_users = realloc(user_data, sizeof(user_t) * max_active_users);
if (!more_users) { perror("realloc"); exit(1); }
user_data = more_users;
}
+
// map TCP sockfd to this user index
- user_data[user_index] = (user_t){-1, -1, sockfd, -1};
- sockfd_to_user[sockfd] = user_index;
+ user_data[running_index] = (user_t){-1, -1, sockfd, -1};
+ sockfd_to_user[sockfd] = running_index;
// free(user_stream_threads);
pthread_mutex_unlock(&mutex_user_data);
}
@@ -675,16 +699,19 @@ void *print_user_data(int index) {
printf("udpPort: %d, stationNum: %d, sockfd: %d, threadId:%d\n",
user_data[index].udpPort, user_data[index].stationNum, user_data[index].sockfd, user_data[index].streamThread);
}
+
void destroy_user(int sockfd) {
pthread_mutex_lock(&mutex_user_data);
-
// stop the thread streaming to the user
+ // TODO: close the FD in the stream thread
pthread_cancel(user_data[sockfd_to_user[sockfd]].streamThread);
+ // close(user_data[sockfd_to_user[sockfd]].udpPort);
// pthread_kill(user_data[sockfd_to_user[sockfd]].streamThread, SIGINT);
// "remove" the user from the list of user data
user_data[sockfd_to_user[sockfd]] = (user_t) {-1, -1, -1, -1};
// map sockfd to -1
sockfd_to_user[sockfd] = -1;
+ // close(sockfd);
pthread_mutex_unlock(&mutex_user_data);
}