Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions rawspec.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,25 @@ ssize_t read_fully(int fd, void * buf, size_t bytes_to_read)
return total_bytes_read;
}

ssize_t read_to_skip(int fd, size_t bytes_to_read) {
char tmp_buffer[8192];

ssize_t remaining_bytes = bytes_to_read;
ssize_t bytes_read, next_read;

while(remaining_bytes > 0) {
next_read = ((remaining_bytes - 8192) > 0) ? 8192 : remaining_bytes;
if((bytes_read = read_fully(fd, tmp_buffer, next_read)) != next_read) {
printf("read_to_skip: failed to read requested %ld bytes (%ld bytes read)\n", next_read, bytes_read);
return bytes_read;
}
remaining_bytes -= bytes_read;
}

return bytes_to_read - remaining_bytes;

}

static struct option long_opts[] = {
{"dest", 1, NULL, 'd'},
{"ffts", 1, NULL, 'f'},
Expand Down Expand Up @@ -703,15 +722,28 @@ char tmp[16];
} // irregular pktidx step

// Seek past first schan channel
lseek(fdin, 2 * ctx.Np * schan * (ctx.Nbps/8) * ctx.Ntpb, SEEK_CUR);
if(!raw_hdr.fifo) {
lseek(fdin, 2 * ctx.Np * schan * (ctx.Nbps / 8) * ctx.Ntpb, SEEK_CUR);
} else {
if(read_to_skip(fdin, 2 * ctx.Np * schan * (ctx.Nbps / 8) * ctx.Ntpb) < 0) {
break; // Give up on this stem and go to next stem
}
}


// Read ctx.Nc coarse channels from this block
bytes_read = read_fully(fdin,
ctx.h_blkbufs[bi % ctx.Nb_host],
2 * ctx.Np * ctx.Nc * (ctx.Nbps/8) * ctx.Ntpb);

// Seek past channels after schan+nchan
lseek(fdin, 2 * ctx.Np * (raw_hdr.obsnchan-(schan+Nc)) * (ctx.Nbps/8) * ctx.Ntpb, SEEK_CUR);
if(!raw_hdr.fifo) {
lseek(fdin, 2 * ctx.Np * (raw_hdr.obsnchan - (schan + Nc)) * (ctx.Nbps / 8) * ctx.Ntpb, SEEK_CUR);
} else {
if(read_to_skip(fdin, 2 * ctx.Np * (raw_hdr.obsnchan - (schan + Nc)) * (ctx.Nbps / 8) * ctx.Ntpb) < 0) {
break; // Give up on this stem and go to next stem
}
}

if(bytes_read == -1) {
perror("read");
Expand Down
58 changes: 52 additions & 6 deletions rawspec_rawutils.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
#include <unistd.h>
#include <stdint.h>
#include <string.h>
#include <sys/stat.h>
#include <errno.h>

#include "rawspec_rawutils.h"
#include "hget.h"


int32_t rawspec_raw_get_s32(const char * buf, const char * key, int32_t def)
{
char tmpstr[48];
Expand Down Expand Up @@ -137,7 +140,7 @@ int rawspec_raw_header_size(char * hdr, size_t len, int directio)
// Loop over the 80-byte records
for(i=0; i<len; i += 80) {
// If we found the "END " record
if(!strncmp(hdr+i, "END ", 4)) {
if(!strncmp(hdr+i, RAWPSEC_HEADER_END_KEY, 4)) {
//printf("header_size: found END at record %d\n", i);
// Move to just after END record
i += 80;
Expand Down Expand Up @@ -196,11 +199,50 @@ off_t rawspec_raw_read_header(int fd, rawspec_raw_hdr_t * raw_hdr)
// Ensure that hdr is aligned to a 512-byte boundary so that it can be used
// with files opened with O_DIRECT.
char hdr[MAX_RAW_HDR_SIZE] __attribute__ ((aligned (512)));
int hdr_size;
off_t pos = lseek(fd, 0, SEEK_CUR);
int hdr_size = 0;

if(raw_hdr->hdr_size == 0) {
struct stat file_stat;
if(fstat(fd, &file_stat) < 0) {
printf("fstat failed [%d]\n", errno);
return -1;
}

if(S_ISFIFO(file_stat.st_mode)) {
raw_hdr->fifo = 1;
}

// Read header (plus some data, probably)
hdr_size = read(fd, hdr, MAX_RAW_HDR_SIZE);
}
off_t pos;

if(!raw_hdr->fifo) {
pos = lseek(fd, 0, SEEK_CUR);

// Read header (plus some data, probably)
hdr_size = read(fd, hdr, MAX_RAW_HDR_SIZE);
} else {
pos = 0;
size_t read_len;
int key_length = strlen(RAWPSEC_HEADER_END_KEY);
while((read_len = read(fd, &(hdr[hdr_size]), RAWSPEC_HEADER_ENTRY_LEN)) == RAWSPEC_HEADER_ENTRY_LEN) {
if(strncmp(&(hdr[hdr_size]), RAWPSEC_HEADER_END_KEY, key_length) == 0) {
break;
}
hdr_size += read_len;

if(hdr_size > MAX_RAW_HDR_SIZE) {
printf("FIFO passed max header size (%d)\n", MAX_RAW_HDR_SIZE);
return -1;
}
}

if(read_len != RAWSPEC_HEADER_ENTRY_LEN) {
printf("last read was not the expected size");
return -1;
}

hdr_size += read_len;
}

if(hdr_size == -1) {
return -1;
Expand Down Expand Up @@ -253,7 +295,11 @@ off_t rawspec_raw_read_header(int fd, rawspec_raw_hdr_t * raw_hdr)
//printf("RRP: hdr=%lu\n", hdr_size);

// Seek forward from original position past header (and any padding)
pos = lseek(fd, pos + hdr_size, SEEK_SET);
if(!raw_hdr->fifo) {
pos = lseek(fd, pos + hdr_size, SEEK_SET);
} else {
pos = hdr_size;
}
//printf("RRP: seek=%ld\n", pos);

return pos;
Expand Down
7 changes: 6 additions & 1 deletion rawspec_rawutils.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

typedef struct {
int directio;
int fifo;
size_t blocsize;
unsigned int npol;
unsigned int obsnchan;
Expand All @@ -26,8 +27,12 @@ typedef struct {
size_t hdr_size; // Size of header in bytes (not including DIRECTIO padding)
} rawspec_raw_hdr_t;


#define RAWSPEC_HEADER_ENTRY_LEN (80)
// Multiple of 80 and 512
#define MAX_RAW_HDR_SIZE (25600)
#define MAX_RAW_HDR_SIZE (512 * RAWSPEC_HEADER_ENTRY_LEN)

#define RAWPSEC_HEADER_END_KEY "END "

#ifdef __cplusplus
extern "C" {
Expand Down