allocator_thread: rework message sending structures

simplify code by adding a new at_msg struct that contains both
the message header and the message body.
this allow e.g. atomic writes of the whole message instead of doing
it in 2 steps. unfortunately reads still have to be done in at least
2 steps, since there are no atomicity guarantees[0].
additionally the message header shrunk from 8 to 4 bytes.

[0]: https://stackoverflow.com/questions/14661708/under-what-conditions-are-pipe-reads-atomic
pull/348/head
rofl0r 2020-09-20 17:06:37 +01:00
parent 12e5da1b90
commit ed8f8444ab
2 changed files with 68 additions and 47 deletions

View File

@ -20,6 +20,7 @@
#include "ip_type.h" #include "ip_type.h"
#include "mutex.h" #include "mutex.h"
#include "hash.h" #include "hash.h"
#include "remotedns.h"
/* stuff for our internal translation table */ /* stuff for our internal translation table */
@ -117,7 +118,7 @@ static ip_type4 ip_from_internal_list(char* name, size_t len) {
internal_ips->list[internal_ips->counter] = new_mem; internal_ips->list[internal_ips->counter] = new_mem;
internal_ips->list[internal_ips->counter]->hash = hash; internal_ips->list[internal_ips->counter]->hash = hash;
new_mem = dumpstring((char*) name, len + 1); new_mem = dumpstring((char*) name, len);
if(!new_mem) { if(!new_mem) {
internal_ips->list[internal_ips->counter] = 0; internal_ips->list[internal_ips->counter] = 0;
@ -138,23 +139,12 @@ static ip_type4 ip_from_internal_list(char* name, size_t len) {
/* stuff for communication with the allocator thread */ /* stuff for communication with the allocator thread */
enum at_msgtype {
ATM_GETIP,
ATM_GETNAME,
ATM_EXIT,
};
enum at_direction { enum at_direction {
ATD_SERVER = 0, ATD_SERVER = 0,
ATD_CLIENT, ATD_CLIENT,
ATD_MAX, ATD_MAX,
}; };
struct at_msghdr {
enum at_msgtype msgtype;
size_t datalen;
};
static pthread_t allocator_thread; static pthread_t allocator_thread;
int req_pipefd[2]; int req_pipefd[2];
int resp_pipefd[2]; int resp_pipefd[2];
@ -198,13 +188,11 @@ again:
} }
} }
static int sendmessage(enum at_direction dir, struct at_msghdr *hdr, void* data) { static int sendmessage(enum at_direction dir, struct at_msg *msg) {
static int* destfd[ATD_MAX] = { [ATD_SERVER] = &req_pipefd[1], [ATD_CLIENT] = &resp_pipefd[1] }; static int* destfd[ATD_MAX] = { [ATD_SERVER] = &req_pipefd[1], [ATD_CLIENT] = &resp_pipefd[1] };
int ret = trywrite(*destfd[dir], hdr, sizeof *hdr); assert(msg->h.datalen <= MSG_LEN_MAX);
if(ret && hdr->datalen) { int ret = trywrite(*destfd[dir], msg, sizeof (msg->h)+msg->h.datalen);
assert(hdr->datalen <= MSG_LEN_MAX); assert(msg->h.datalen <= MSG_LEN_MAX);
ret = trywrite(*destfd[dir], data, hdr->datalen);
}
return ret; return ret;
} }
@ -225,17 +213,19 @@ again:
goto again; goto again;
} }
} }
static int readmsg(int fd, struct at_msg *msg) {
int ret = tryread(fd, msg, sizeof(msg->h));
if(ret != 1) return ret;
return tryread(fd, &msg->m, msg->h.datalen);
}
static int getmessage(enum at_direction dir, struct at_msghdr *hdr, void* data) { static int getmessage(enum at_direction dir, struct at_msg *msg) {
static int* readfd[ATD_MAX] = { [ATD_SERVER] = &req_pipefd[0], [ATD_CLIENT] = &resp_pipefd[0] }; static int* readfd[ATD_MAX] = { [ATD_SERVER] = &req_pipefd[0], [ATD_CLIENT] = &resp_pipefd[0] };
ssize_t ret; ssize_t ret;
if((ret = wait_data(*readfd[dir]))) { if((ret = wait_data(*readfd[dir]))) {
if(!tryread(*readfd[dir], hdr, sizeof *hdr)) if(!readmsg(*readfd[dir], msg))
return 0; return 0;
assert(hdr->datalen <= MSG_LEN_MAX); assert(msg->h.datalen <= MSG_LEN_MAX);
if(hdr->datalen) {
ret = tryread(*readfd[dir], data, hdr->datalen);
}
} }
return ret; return ret;
} }
@ -243,26 +233,24 @@ static int getmessage(enum at_direction dir, struct at_msghdr *hdr, void* data)
static void* threadfunc(void* x) { static void* threadfunc(void* x) {
(void) x; (void) x;
int ret; int ret;
struct at_msghdr msg; struct at_msg msg;
union { while((ret = getmessage(ATD_SERVER, &msg))) {
char host[MSG_LEN_MAX]; switch(msg.h.msgtype) {
ip_type4 ip;
} readbuf;
while((ret = getmessage(ATD_SERVER, &msg, &readbuf))) {
switch(msg.msgtype) {
case ATM_GETIP: case ATM_GETIP:
/* client wants an ip for a DNS name. iterate our list and check if we have an existing entry. /* client wants an ip for a DNS name. iterate our list and check if we have an existing entry.
* if not, create a new one. */ * if not, create a new one. */
readbuf.ip = ip_from_internal_list(readbuf.host, msg.datalen - 1); msg.m.ip = ip_from_internal_list(msg.m.host, msg.h.datalen);
msg.datalen = sizeof(ip_type4); msg.h.datalen = sizeof(ip_type4);
break; break;
case ATM_GETNAME: { case ATM_GETNAME: {
char *host = string_from_internal_ip(readbuf.ip); char *host = string_from_internal_ip(msg.m.ip);
if(host) { if(host) {
size_t l = strlen(host); size_t l = strlen(host);
assert(l < MSG_LEN_MAX); assert(l+1 < MSG_LEN_MAX);
memcpy(readbuf.host, host, l + 1); memcpy(msg.m.host, host, l + 1);
msg.datalen = l + 1; msg.h.datalen = l + 1;
} else {
msg.h.datalen = 0;
} }
break; break;
} }
@ -271,7 +259,7 @@ static void* threadfunc(void* x) {
default: default:
abort(); abort();
} }
ret = sendmessage(ATD_CLIENT, &msg, &readbuf); ret = sendmessage(ATD_CLIENT, &msg);
} }
return 0; return 0;
} }
@ -282,27 +270,31 @@ ip_type4 at_get_ip_for_host(char* host, size_t len) {
ip_type4 readbuf; ip_type4 readbuf;
MUTEX_LOCK(internal_ips_lock); MUTEX_LOCK(internal_ips_lock);
if(len > MSG_LEN_MAX) goto inv; if(len > MSG_LEN_MAX) goto inv;
struct at_msghdr msg = {.msgtype = ATM_GETIP, .datalen = len + 1 }; struct at_msg msg = {.h.msgtype = ATM_GETIP, .h.datalen = len + 1 };
if(sendmessage(ATD_SERVER, &msg, host) && memcpy(msg.m.host, host, len+1);
getmessage(ATD_CLIENT, &msg, &readbuf)); if(sendmessage(ATD_SERVER, &msg) &&
getmessage(ATD_CLIENT, &msg)) readbuf = msg.m.ip;
else { else {
inv: inv:
readbuf = ip_type_invalid.addr.v4; readbuf = ip_type_invalid.addr.v4;
} }
assert(msg.msgtype == ATM_GETIP); assert(msg.h.msgtype == ATM_GETIP);
MUTEX_UNLOCK(internal_ips_lock); MUTEX_UNLOCK(internal_ips_lock);
return readbuf; return readbuf;
} }
size_t at_get_host_for_ip(ip_type4 ip, char* readbuf) { size_t at_get_host_for_ip(ip_type4 ip, char* readbuf) {
struct at_msghdr msg = {.msgtype = ATM_GETNAME, .datalen = sizeof(ip_type4) }; struct at_msg msg = {.h.msgtype = ATM_GETNAME, .h.datalen = sizeof(ip_type4), .m.ip = ip };
size_t res = 0; size_t res = 0;
MUTEX_LOCK(internal_ips_lock); MUTEX_LOCK(internal_ips_lock);
if(sendmessage(ATD_SERVER, &msg, &ip) && getmessage(ATD_CLIENT, &msg, readbuf)) { if(sendmessage(ATD_SERVER, &msg) && getmessage(ATD_CLIENT, &msg)) {
if((ptrdiff_t) msg.datalen <= 0) res = 0; if((int16_t) msg.h.datalen <= 0) res = 0;
else res = msg.datalen - 1; else {
memcpy(readbuf, msg.m.host, msg.h.datalen);
res = msg.h.datalen - 1;
}
} }
assert(msg.msgtype == ATM_GETNAME); assert(msg.h.msgtype == ATM_GETNAME);
MUTEX_UNLOCK(internal_ips_lock); MUTEX_UNLOCK(internal_ips_lock);
return res; return res;
} }

29
src/remotedns.h Normal file
View File

@ -0,0 +1,29 @@
#ifndef REMOTEDNS_H
#define REMOTEDNS_H
#include <unistd.h>
#include "ip_type.h"
enum at_msgtype {
ATM_GETIP = 0,
ATM_GETNAME,
ATM_FAIL,
ATM_EXIT,
};
struct at_msghdr {
unsigned char msgtype; /* at_msgtype */
char reserved;
unsigned short datalen;
};
struct at_msg {
struct at_msghdr h;
union {
char host[260];
ip_type4 ip;
} m;
};
#endif