aboutsummaryrefslogtreecommitdiff
path: root/server.c
diff options
context:
space:
mode:
authorsotech117 <michael_foiani@brown.edu>2023-09-21 20:42:11 +0000
committersotech117 <michael_foiani@brown.edu>2023-09-21 20:42:11 +0000
commit97ff64f344c99f76bbed6b934236ee65eda8a9ef (patch)
tree9223eb56d7695899acd6cecf303043a05dbd8f73 /server.c
parentc8ea8ec6748e030e00ba8b4a4ee0d58c58a87b1c (diff)
change model to seek
Diffstat (limited to 'server.c')
-rw-r--r--server.c134
1 files changed, 86 insertions, 48 deletions
diff --git a/server.c b/server.c
index 7246885..333f521 100644
--- a/server.c
+++ b/server.c
@@ -17,10 +17,13 @@
#define LINE_MAX 1024
#define MAX_USERS 1000
#define MAX_PATH 50
+#define MAX_STREAM_RATE 16*1024
typedef struct station {
- int seekIndex;
+ int streamFd;
char* filePath;
+ int fileBufferSize;
+ char fileBuffer[MAX_STREAM_RATE];
} station_t;
typedef struct user {
@@ -58,6 +61,9 @@ void *send_udp_packet_routine(void* arg);
void *select_thread(void* arg);
void *synchronization_thread(void* arg);
+void init_station(int station_num, const char *station_name);
+void seek_stations(int station_num);
+
int parse(char buffer[LINE_MAX], char *tokens[LINE_MAX / 2]);
void *print_info_routine(void *arg);
@@ -85,20 +91,22 @@ main(int argc, char *argv[])
port = argv[1];
num_stations = argc - 2;
+
// init stations
size_t totalSize = 0;
// get size to malloc
for (int i = 2; i < argc; i++)
{
// printf("file: %s\n", argv[i]);
- totalSize += sizeof(int) + strlen(argv[i]);
+ // each "station" has a fd (int), filePath (char*), file_buffer_size (int), buffer_size (MAX_STREAM_RATE)
+ totalSize += sizeof(int) + strlen(argv[i]) + sizeof(int) + MAX_STREAM_RATE;
}
station_data = malloc(totalSize);
if (!station_data) { perror("malloc station data"); return 1; }
// assign the stations
for (int i = 2; i < argc; i++)
{
- station_data[i - 2] = (station_t) { 0, argv[i]};
+ init_station(i - 2, argv[i]);
}
// make array of user data
@@ -308,35 +316,39 @@ void *send_udp_packet_routine(void *arg) {
if (!did_work) {
// sendto a random string of data to the user
- int station_num = user_data[user_index].stationNum;
- char *data = station_data[station_num].filePath;
+ // int station_num = user_data[user_index].stationNum;
+ // char *data = station_data[station_num].filePath;
// printf("load data: thread %d \n", user_index);
// get file path
- char* file_path = station_data[station_num].filePath;
- // get current seek chunk
- int stream_fd = open(file_path, O_RDONLY);
- if (stream_fd == -1) {
- perror("open");
- return (NULL);
- }
- int current_chunk = station_data[station_num].seekIndex;
- if (lseek(stream_fd, current_chunk, SEEK_SET) == -1) {
- perror("fseek");
- return (NULL);
- }
- size_t BYTES_PER_SECOND = 16*1024;
+ // char* file_path = station_data[station_num].filePath;
+ // // get current seek chunk
+ // int stream_fd = open(file_path, O_RDONLY);
+ // if (stream_fd == -1) {
+ // perror("open");
+ // return (NULL);
+ // }
+ // int current_chunk = station_data[station_num].seekIndex;
+ // if (lseek(stream_fd, current_chunk, SEEK_SET) == -1) {
+ // perror("fseek");
+ // return (NULL);
+ // }
// read 1000 bytes of the file
- char file_buffer[BYTES_PER_SECOND];
- int bytes_read = 0;
- if ((bytes_read = read(stream_fd, file_buffer, BYTES_PER_SECOND)) == -1) {
- perror("fread");
- return (NULL);
- }
- close(stream_fd);
+ // char file_buffer[BYTES_PER_SECOND];
+ // int bytes_read = 0;
+ // if ((bytes_read = read(stream_fd, file_buffer, BYTES_PER_SECOND)) == -1) {
+ // perror("fread");
+ // return (NULL);
+ // }
+ // close(stream_fd);
+
+ station_t *station_info = &station_data[station_num];
+ int bytes_read = station_info->fileBufferSize;
+ // potential error here!
+ // printf("station info - bytes read: %d, station_fd: %d, filePath: %s, buffersize: %d\n", bytes_read, station_info->streamFd, station_info->filePath, station_info->fileBufferSize);
- if (send_all_udp(udp_sockfd, file_buffer, &bytes_read, thread_res) == -1)
+ if (send_all_udp(udp_sockfd, station_info->fileBuffer, &bytes_read, thread_res) == -1)
{
perror("send_all_udp");
printf("We only sent %d bytes because of the error!\n", bytes_read);
@@ -385,31 +397,13 @@ void *synchronization_thread(void *arg) {
start_threads = 0;
size_t BYTES_PER_SECOND = 16*1024;
+ // seek each file and read
for (int i = 0; i < num_stations; i++)
{
- // get size of file
- struct stat f_info;
- if (stat(station_data[i].filePath, &f_info) == -1) {
- perror("fstat");
- return (NULL);
- }
-
- size_t size = f_info.st_size;
-
- station_data[i].seekIndex += BYTES_PER_SECOND;
- // if the seek index is greater than the size of the file, reset it
- if (station_data[i].seekIndex >= size)
- {
- station_data[i].seekIndex = 0;
-
- pthread_t send_announce_thread;
- pthread_create(&send_announce_thread, NULL, send_announce_routine, (void *)i);
- }
+ seek_stations(i);
}
-
- usleep(2000);
- usleep(1000000-4000);
+ usleep(1000000-2000);
}
}
@@ -769,7 +763,7 @@ void send_announce_reply(int fd, int station_num) {
void send_invalid_command_reply(int fd, size_t message_size, char* message) {
char *send_buffer = malloc(message_size+2);
if (!send_buffer) {
- perror("malloc in send ancalid command");
+ perror("malloc in send invalid command");
return;
}
@@ -786,6 +780,50 @@ void send_invalid_command_reply(int fd, size_t message_size, char* message) {
free(send_buffer);
}
+void init_station(int station_num, const char* station_name) {
+ station_t *station = &station_data[station_num];
+
+ // open the file
+ int stream_fd = open(station_name, O_RDONLY);
+ if (stream_fd == -1) {
+ perror("open");
+ return;
+ }
+ station->streamFd = stream_fd;
+ station->filePath = station_name;
+
+ // setup file buffer
+ char stream_buffer[MAX_STREAM_RATE];
+ memset(stream_buffer, 0, MAX_STREAM_RATE);
+
+ station->fileBufferSize = MAX_STREAM_RATE;
+ memcpy(&station->fileBufferSize, stream_buffer, MAX_STREAM_RATE);
+
+
+ // load the first buffer into the stations
+ seek_stations(station_num);
+}
+
+void seek_stations(int station_num) {
+ station_t *station_info = &station_data[station_num];
+ memset(&station_info->fileBuffer, 0, MAX_STREAM_RATE);
+ int bytes_read = read(station_info->streamFd, &station_info->fileBuffer, MAX_STREAM_RATE);
+ // printf("station info - bytes read: %d, station_fd: %d, filePath: %s, buffersize: %d\n", bytes_read, station_info->streamFd, station_info->filePath, station_info->fileBufferSize);
+
+ // time to restart the file
+ if (bytes_read == 0) {
+ if (lseek(station_info->streamFd, 0, SEEK_SET) == -1) {
+ perror("fseek");
+ }
+ pthread_t send_announce_thread;
+ pthread_create(&send_announce_thread, NULL, send_announce_routine, (void *)station_num);
+
+ // load first chunk
+ bytes_read = read(station_info->streamFd, &station_info->fileBuffer, MAX_STREAM_RATE);
+ }
+ station_info->fileBufferSize = bytes_read;
+}
+
// Parses a buffer into tokens, from cs33 :)
int parse(char buffer[LINE_MAX], char *tokens[LINE_MAX / 2]) {