From f4a7033d3629976f9980cb956e2c29315c6306c0 Mon Sep 17 00:00:00 2001 From: Robert Gustafsson Date: Tue, 3 Oct 2017 09:14:55 +0200 Subject: Some refactoring --- broccoli/src/broevent.c | 65 ++++++++++++++++++++++++++ broccoli/src/fifoqueue.c | 114 +++++++++++++++++++++++++++++++++++++++++++++ broccoli/src/midbropasad.c | 33 +++++++++++++ 3 files changed, 212 insertions(+) create mode 100644 broccoli/src/broevent.c create mode 100644 broccoli/src/fifoqueue.c create mode 100644 broccoli/src/midbropasad.c (limited to 'broccoli/src') diff --git a/broccoli/src/broevent.c b/broccoli/src/broevent.c new file mode 100644 index 0000000..3813270 --- /dev/null +++ b/broccoli/src/broevent.c @@ -0,0 +1,65 @@ +#include "fifoqueue.h" +#include "broevent.h" +#ifdef BROCCOLI +#include +#endif + +char *host_default = "127.0.0.1"; +char *port_default = "47760"; +Fifo_q * q; + + static void +bro_response(BroConn *conn, void *data, uint64* registers, uint64* uid) +{ + add_to_queue(q,create_sensor_object(*registers,*uid)); + //printf("Received value %"PRIu64" from uid=%"PRIu64"\n",*registers,*uid); + + conn = NULL; + data = NULL; +} + + void * +bro_event_listener(void * args) +{ + q = (Fifo_q *) args; + int fd = -1; + BroConn *bc = NULL; + bro_init(NULL); + char hostname[512]; + + snprintf(hostname, 512, "%s:%s", host_default, port_default); + if (! (bc = bro_conn_new_str(hostname, BRO_CFLAG_RECONNECT | BRO_CFLAG_ALWAYS_QUEUE))) + { + printf("Could not get Bro connection handle.\n"); + exit(-1); + } + bro_debug_calltrace = 0; + bro_debug_messages = 0; + + bro_event_registry_add(bc, "response",(BroEventFunc) bro_response, NULL); + + if (! bro_conn_connect(bc)) + { + printf("Could not connect to Bro at %s:%s.\n", host_default, + port_default); + exit(-1); + } + + fd =bro_conn_get_fd(bc); + fd_set rfds; + setbuf(stdout,NULL); + + while(true) + { + FD_ZERO(&rfds); + FD_SET(fd,&rfds); + if(select(fd+1,&rfds,NULL,NULL,NULL) == -1){ + perror("select()"); + break; + } + + bro_conn_process_input(bc); + } + + bro_conn_delete(bc); +} diff --git a/broccoli/src/fifoqueue.c b/broccoli/src/fifoqueue.c new file mode 100644 index 0000000..9b972e7 --- /dev/null +++ b/broccoli/src/fifoqueue.c @@ -0,0 +1,114 @@ +#include +#include +#include +#include "types.h" +#include "fifoqueue.h" + +pthread_mutex_t lock; +pthread_mutex_t bufferEmptyBlock; + + Fifo_q * +init_queue(int size) +{ + Fifo_q * q = (Fifo_q *) malloc(sizeof(Fifo_q)); + q->head = NULL; + q->tail = NULL; + q->maxSize = size; + q->currentSize = 0; + if (pthread_mutex_init(&lock, NULL) != 0) + { + printf("WARNING: Couldn't initialize lock\n"); + } + if (pthread_mutex_init(&bufferEmptyBlock, NULL) != 0) + { + printf("WARNING: Couldn't initialize blocking lock\n"); + } + pthread_mutex_lock(&bufferEmptyBlock); + return q; +} + + boolean +is_full(Fifo_q * q) +{ + if(q->currentSize < q->maxSize) + return false; + else + return true; +} + + boolean +is_empty(Fifo_q * q) +{ + if(q->head==NULL) + return true; + else + return false; +} + + int +add_to_queue(Fifo_q * q, Sensor_t * sensor) +{ + + pthread_mutex_lock(&lock); + /* TODO delete first one if full */ + if(q == NULL){ + return -1; + } + else if(is_full(q)){ + return -1; + } + Queue_t * new_elem = (Queue_t *) malloc(sizeof(Queue_t)); + new_elem->next = NULL; + new_elem->sensor = sensor; + if(is_empty(q)){ + q->head = new_elem; + pthread_mutex_unlock(&bufferEmptyBlock); + }else + q->tail->next = new_elem; + q->tail = new_elem; + q->currentSize++; + pthread_mutex_unlock(&lock); + return 1; +} + + Sensor_t * +pop_from_queue(Fifo_q * q) +{ + + if(is_empty(q)){ + perror("The queue is empty"); + pthread_mutex_lock(&bufferEmptyBlock); + } + pthread_mutex_lock(&lock); + Queue_t * head = q->head; + q->head = head->next; + Sensor_t * sensor = head->sensor; + free(head); + q->currentSize--; + pthread_mutex_unlock(&lock); + return sensor; +} + + Sensor_t * +create_sensor_object(int value, int uid){ + Sensor_t * sensor = (Sensor_t *) malloc(sizeof(Sensor_t)); + sensor->value = value; + sensor->uid = uid; + return sensor; +} + void +print_queue(Fifo_q * q) +{ + pthread_mutex_lock(&lock); + Queue_t * current = q->head; + if(current == NULL){ + printf("The queue is empty!"); + return; + } + while(current != NULL){ + printf("sensor value=%d, sensor uid=%d\n", + current->sensor->value, current->sensor->uid); + current = current->next; + } + pthread_mutex_unlock(&lock); +} diff --git a/broccoli/src/midbropasad.c b/broccoli/src/midbropasad.c new file mode 100644 index 0000000..756e558 --- /dev/null +++ b/broccoli/src/midbropasad.c @@ -0,0 +1,33 @@ +#include +#include +#include "fifoqueue.h" +#include "broevent.h" +#ifdef BROCCOLI +#include +#endif + + void +start_data_capture(Fifo_q * q) +{ + int res; + pthread_t event_listener; + res = pthread_create(&event_listener, NULL, bro_event_listener, q); + if(res){ + perror("Unable to create thread"); + exit(-1); + } +} + + int +main(int argc, char **argv) +{ + Fifo_q * q = init_queue(50); + start_data_capture(q); + while(true){ + printf("Main thread\n"); + sleep(10); + print_queue(q); + } + free(q); + return 0; +} -- cgit v1.2.1