diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/broevent.c | 85 | ||||
-rw-r--r-- | src/fifoqueue.c | 132 | ||||
-rw-r--r-- | src/midbro.c | 74 |
3 files changed, 291 insertions, 0 deletions
diff --git a/src/broevent.c b/src/broevent.c new file mode 100644 index 0000000..362ed5e --- /dev/null +++ b/src/broevent.c @@ -0,0 +1,85 @@ +#include "fifoqueue.h" +#include "broevent.h" +#ifdef BROCCOLI +#include <broccoli.h> +#endif + +char *host_default = "127.0.0.1"; +char *port_default = "47760"; +Fifo_q * q; + + static void +modbus_register_received(BroConn *conn, void *data, BroRecord *record) +{ + int type = BRO_TYPE_COUNT; + uint64 *address = NULL; + uint64 *value = NULL; + + // TODO: handle regtype + address = bro_record_get_named_val(record, "address", &type); + if (!address) { + // TODO: handle error + return; + } + value = bro_record_get_named_val(record, "register", &type); + if (!value) { + // TODO: handle error + return; + } + #ifdef DEBUG + printf("Received value %"PRIu64" from uid=%"PRIu64"\n",*value,*address); + #endif + + add_to_queue(q, create_sensor_object(*value, *address)); + + #ifdef DEBUG + printf("Added to queue.\n"); + #endif +} + + void * +bro_event_listener(void * args) +{ + q = (Fifo_q *) args; + int fd = -1; + BroConn *bc = NULL; + bro_init(NULL); + char hostname[512]; + + snprintf(hostname, 512, "%s:%s", host_default, port_default); + if (! (bc = bro_conn_new_str(hostname, BRO_CFLAG_RECONNECT | BRO_CFLAG_ALWAYS_QUEUE))) + { + printf("Could not get Bro connection handle.\n"); + exit(-1); + } + bro_debug_calltrace = 0; + bro_debug_messages = 0; + + bro_event_registry_add(bc, "pasad_register_received", + (BroEventFunc) modbus_register_received, NULL); + + if (! bro_conn_connect(bc)) + { + printf("Could not connect to Bro at %s:%s.\n", host_default, + port_default); + exit(-1); + } + + fd =bro_conn_get_fd(bc); + fd_set rfds; + setbuf(stdout,NULL); + + while(true) + { + FD_ZERO(&rfds); + FD_SET(fd,&rfds); + if(select(fd+1,&rfds,NULL,NULL,NULL) == -1){ + printf("select(): Bad file descriptor"); + break; + } + + bro_conn_process_input(bc); + } + + bro_conn_delete(bc); +} diff --git a/src/fifoqueue.c b/src/fifoqueue.c new file mode 100644 index 0000000..e7ed8f6 --- /dev/null +++ b/src/fifoqueue.c @@ -0,0 +1,132 @@ +#include <pthread.h> +#include <stdlib.h> +#include <stdio.h> +#include <semaphore.h> +#include "types.h" +#include "fifoqueue.h" + + Fifo_q * +init_queue(int size) +{ + Fifo_q * q = (Fifo_q *) malloc(sizeof(Fifo_q)); + q->head = NULL; + q->tail = NULL; + q->maxSize = size; + q->currentSize = 0; + q->droppedValues = 0; + q->largestBufferSize = 0; + q->valuesReceived = 0; + q->valuesReleased = 0; + /*Queue empty from the beginning (block)*/ + sem_init(&q->bufferEmptyBlock, 0, 0); + sem_init(&q->bufferFullBlock, 0, size); + sem_init(&q->lock, 0, 1); + return q; +} + + boolean +is_full(Fifo_q * q) +{ + if(q->currentSize < q->maxSize) + return false; + else + return true; +} + + boolean +is_empty(Fifo_q * q) +{ + if(q->head==NULL) + return true; + else + return false; +} + + int +add_to_queue(Fifo_q * q, Sensor_t * sensor) +{ + + if(q == NULL){ + printf("Error: Queue not initialized\n"); + free(sensor); //free if not appended + return -1; + } + /* Drop Least Recently or Drop Most Recently */ + #ifdef DLR + if(is_full(q)){ + pop_from_queue(q); + q->droppedValues++; + return 0; + } + #else + sem_wait(&q->bufferFullBlock); + #endif + sem_wait(&q->lock); + Queue_t * new_elem = (Queue_t *) malloc(sizeof(Queue_t)); + new_elem->next = NULL; + new_elem->sensor = sensor; + if(is_empty(q)){ + q->head = new_elem; + }else + q->tail->next = new_elem; + q->tail = new_elem; + q->currentSize++; + q->valuesReceived++; + if(q->currentSize > q->largestBufferSize) + q->largestBufferSize = q->currentSize; + sem_post(&q->lock); + sem_post(&q->bufferEmptyBlock); + return 1; +} + + Sensor_t * +pop_from_queue(Fifo_q * q) +{ + int semStat; + sem_wait(&q->bufferEmptyBlock); + sem_wait(&q->lock); + Queue_t * head = q->head; + Sensor_t * sensor = head->sensor; + /* If dequeue the last element */ + if(q->currentSize == 1){ + q->head = NULL; + q->tail = NULL; + }else{ + q->head = head->next; + } + free(head); + q->currentSize--; + q->valuesReleased++; + sem_post(&q->lock); + #ifndef DLR + sem_post(&q->bufferFullBlock); + #endif + return sensor; +} + + Sensor_t * +create_sensor_object(int value, int uid) +{ + Sensor_t * sensor = (Sensor_t *) malloc(sizeof(Sensor_t)); + sensor->value = value; + sensor->uid = uid; + return sensor; +} + void +print_queue(Fifo_q * q) +{ + sem_wait(&q->lock); + Queue_t * current = q->head; + printf("\nContent of the queue with size=%d\n",q->currentSize); + if(current == NULL){ + printf("The queue is empty!\n"); + sem_post(&q->lock); + return; + } + while(current != NULL){ + printf("sensor value=%d, sensor uid=%d\n", + current->sensor->value, current->sensor->uid); + current = current->next; + } + sem_post(&q->lock); +} diff --git a/src/midbro.c b/src/midbro.c new file mode 100644 index 0000000..e2f9644 --- /dev/null +++ b/src/midbro.c @@ -0,0 +1,74 @@ +#include <pthread.h> +#include <unistd.h> +#include <signal.h> +#include "fifoqueue.h" +#include "broevent.h" +#include "midbro.h" +#ifdef BROCCOLI +#include <broccoli.h> +#endif + +Fifo_q * queue; +pthread_t event_listener; +sigset_t signal_set; + + void +sigint_handler(int signal) +{ + printf("\nStatistics:\n" + "Total values received: %d\n" + "Total values dropped: %d\n" + "Total values released: %d\n" + "Maximum buffer utilization: %d\n" + "Buffer fixed size: %d\n" + "Buffer size upon termination: %d\n", + queue->valuesReceived, queue->droppedValues, + queue->valuesReleased, queue->largestBufferSize, + queue->maxSize, queue->currentSize); + exit(0); +} + void +request_n_values(int number, int arrayOfValues[]) +{ + int i; + Sensor_t * sensor; + for(i=0; i<number; ++i){ + sensor = pop_from_queue(queue); + arrayOfValues[i] = sensor->value; + free(sensor); + } + printf("Release %d sensor data values\n", number); +} + int +request_value() +{ + int value; + Sensor_t * sensor; + sensor = pop_from_queue(queue); + value = sensor->value; + free(sensor); + printf("Release 1 sensor data value\n"); + return value; +} + + void +start_data_capture() +{ + int res; + queue = init_queue(500); /* Initiate queue with fixed size */ + /* Create producer thread that listen for bro events */ + sigemptyset(&signal_set); + sigaddset(&signal_set, SIGINT); + res = pthread_sigmask(SIG_BLOCK, &signal_set, NULL); + if(res != 0) + perror("SIGINT block"); + res = pthread_create(&event_listener, NULL, bro_event_listener, queue); + if(res){ + perror("Unable to create thread"); + exit(-1); + } + res = pthread_sigmask(SIG_UNBLOCK, &signal_set, NULL); + if(res != 0) + perror("SIGINT unblock"); + signal(SIGINT, sigint_handler); +} |