Unable to Read Frames from uStreamer Unix Socket in Golang #308

Open
iamsavani opened this issue Mar 17, 2025 · 4 comments

Comments

@iamsavani

iamsavani commented Mar 17, 2025

I'm trying to integrate uStreamer with a Go (Golang) application to capture raw frames via a Unix socket and send them over WebRTC using Pion. However, I am unable to read frames from the socket, and the connection either closes immediately or returns EOF.

Start uStreamer with the following command:
ustreamer --device=/dev/video1 --encoder=hw --unix=/run/ustreamer.sock --unix-rm --format=yuyv --port=8002 --host=0.0.0.0 --unix-mode=0660

Run my Go code to read frames from the socket:

    conn, err := net.Dial("unix", "/run/ustreamer.sock")
    if err != nil {
        log.Fatalf("Failed to connect to uStreamer socket: %v", err)
    }
    defer conn.Close()

    inboundPacket := make([]byte, frameSize)

    for {
        n, err := conn.Read(inboundPacket)
        if err != nil {
            log.Printf("Error during read: %v", err)
            return
        }
        frame := inboundPacket[:n] // only the first n bytes are valid
        _ = frame                  // send frame to the channel
    }

Running the code above fails with the "Error during read" message.

Log

-- INFO  [42422.581    stream] -- Device fd=8 opened
-- INFO  [42422.581    stream] -- Using input channel: 0
-- ERROR [42422.581    stream] -- Can't set input channel
-- INFO  [42422.582    stream] -- Device fd=8 closed
-- INFO  [42422.582    stream] -- Sleeping 1 seconds before new stream init ...

I am not sure if I am using uStreamer correctly, but the idea is to use it with a different WebRTC library other than Janus.

@mdevaev
Member

mdevaev commented Mar 17, 2025

  • uStreamer does not provide raw interface over unix socket, it's http-over-unix-domain-socket.
  • "Can't set input channel" is a configuration problem.
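
For readers following along in Go, here is a minimal sketch of what "HTTP over a Unix domain socket" means on the client side. It assumes the socket path from the command above and uStreamer's /snapshot endpoint; the helper names newUnixHTTPClient and fetchSnapshot and the placeholder host "ustreamer" are made up for illustration.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net"
	"net/http"
	"time"
)

// newUnixHTTPClient returns an http.Client whose connections always go to
// the Unix domain socket at socketPath, whatever host the URL names.
func newUnixHTTPClient(socketPath string) *http.Client {
	return &http.Client{
		Timeout: 5 * time.Second,
		Transport: &http.Transport{
			DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
				var d net.Dialer
				return d.DialContext(ctx, "unix", socketPath)
			},
		},
	}
}

// fetchSnapshot requests a single JPEG image over HTTP.
// The host part of the URL is a placeholder; only the path matters.
func fetchSnapshot(c *http.Client) ([]byte, error) {
	resp, err := c.Get("http://ustreamer/snapshot")
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status: %s", resp.Status)
	}
	return io.ReadAll(resp.Body)
}
```

Usage would look like fetchSnapshot(newUnixHTTPClient("/run/ustreamer.sock")). A raw conn.Read on the socket fails because the server waits for an HTTP request that never arrives. For a long-lived stream request the 5-second client timeout would need to be removed.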

@iamsavani
Author

Thank you for your response! I appreciate the clarification about uStreamer’s Unix socket being HTTP-over-UDS.

My goal is to integrate uStreamer natively without relying on Janus, allowing for more flexibility in using different programming languages like Go and Node.js. I’d love to explore how to efficiently access and process the video stream directly within these environments.

If there are any recommended approaches or best practices for achieving this, I’d greatly appreciate your insights!

@mdevaev
Member

mdevaev commented Mar 17, 2025

You can use shared memory to access the frames. But I don't have Go experience, so I can't tell whether it will work fine or not.

@iamsavani
Author

iamsavani commented Mar 17, 2025

I’m working on a C library that reads frames from a shared memory segment (/dev/shm/test.h264). I intend to use this with Go via cgo bindings. Below is what I have so far, but I’m unsure if I’m handling shared memory access and frame reading correctly.

Key concerns:

  • Am I correctly mapping and reading frames from shared memory?
  • Is there a better way to handle synchronization or detect frame boundaries?
  • How can I ensure that I always get a complete frame and not partial/corrupt data?
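
As a point of comparison for the mapping question, here is a rough pure-Go sketch of the same idea without cgo. It assumes Linux, a little-endian machine, and that the shared header starts with a 64-bit magic at offset 0 followed by a 32-bit version at offset 8; those offsets are my reading of the struct and are not confirmed by the maintainer. mapSink and checkHeader are hypothetical names.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"os"
	"syscall"
)

const (
	memsinkMagic   = uint64(0xCAFEBABECAFEBABE)
	memsinkVersion = uint32(7)
)

// mapSink maps size bytes of the POSIX shared memory object `name`,
// which Linux exposes as /dev/shm/<name>.
func mapSink(name string, size int) (*os.File, []byte, error) {
	f, err := os.OpenFile("/dev/shm/"+name, os.O_RDWR, 0)
	if err != nil {
		return nil, nil, err
	}
	buf, err := syscall.Mmap(int(f.Fd()), 0, size,
		syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED)
	if err != nil {
		f.Close()
		return nil, nil, err
	}
	return f, buf, nil
}

// checkHeader validates the magic and version fields.
// ASSUMPTION: magic at offset 0, version at offset 8, little-endian.
func checkHeader(buf []byte) error {
	if len(buf) < 12 {
		return errors.New("buffer too small for header")
	}
	if m := binary.LittleEndian.Uint64(buf[0:8]); m != memsinkMagic {
		return fmt.Errorf("bad magic: %#x", m)
	}
	if v := binary.LittleEndian.Uint32(buf[8:12]); v != memsinkVersion {
		return fmt.Errorf("unsupported version: %d", v)
	}
	return nil
}
```

The size passed to mapSink has to match what the writer allocated (header plus data area); touching pages beyond the end of the underlying object raises SIGBUS. Checking the header on every read, as the C code does, also guards against attaching before the writer has initialized the segment.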

Here’s the C code I’ve written:

#include <stdint.h>
#include <stdbool.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <stdatomic.h>
#include <time.h>
#include <pthread.h>
#include <math.h>
#include <sys/file.h>
#include <errno.h>
#include <linux/videodev2.h>
#include "plugin.h"

#define US_MEMSINK_MAGIC	((u64)0xCAFEBABECAFEBABE)
#define US_MEMSINK_VERSION	((u32)7)
#define SINK_NAME "test.h264"
#define DATA_SIZE (2 * 1024 * 1024)
#define SINK_SIZE (sizeof(us_memsink_shared_s) + DATA_SIZE)

#define US_ERROR_COMMON        -1
#define US_ERROR_NO_DATA       -3


#define INLINE inline __attribute__((always_inline))

static atomic_bool _g_key_required = false;
static us_memsink_shared_s *mem = NULL;
int fd = -1;
static atomic_bool video_sink_open = ATOMIC_VAR_INIT(false);
u64 frame_id;

// Function to initialize a frame
us_frame_s *us_frame_init(void) {
	us_frame_s *frame;
	US_CALLOC(frame, 1);
	us_frame_realloc_data(frame, 512 * 1024);
	frame->dma_fd = -1;
	return frame;
}

us_memsink_shared_s *us_memsink_shared_map(int fd, uz data_size) {
	us_memsink_shared_s *mem = mmap(
		NULL,
		sizeof(us_memsink_shared_s) + data_size,
		PROT_READ | PROT_WRITE, MAP_SHARED,
		fd, 0);
	if (mem == MAP_FAILED) {
		return NULL;
	}
	assert(mem != NULL);
	return mem;
}

// Open video sink
int open_video_sink(MemSink *self) {
	if (atomic_exchange(&video_sink_open, true)) {
		return -1; // Video sink already open
	}

	// Initialize every field before any error path can reach close_video_sink()
	self->mem = NULL;
	self->data_size = DATA_SIZE;
	self->frame_id = 0;

	self->fd = shm_open(SINK_NAME, O_RDWR, 0);
	if (self->fd < 0) {
		close_video_sink(self);
		return -1;
	}

	if ((self->mem = us_memsink_shared_map(self->fd, self->data_size)) == NULL) {
		close_video_sink(self);
		return -1;
	}

	self->frame = us_frame_init();

	return 0;
}

// Close video sink
void close_video_sink(MemSink *self) {
    if (atomic_exchange(&video_sink_open, false)) {
        if (self->mem) {
            munmap(self->mem, SINK_SIZE);
        }
        if (self->fd >= 0) {
            close(self->fd);
        }
    }
}

INLINE void us_get_now(clockid_t clk_id, time_t *sec, long *msec) {
	struct timespec ts;
	assert(!clock_gettime(clk_id, &ts));
	*sec = ts.tv_sec;
	*msec = round(ts.tv_nsec / 1.0e6);

	if (*msec > 999) {
		*sec += 1;
		*msec = 0;
	}
}

INLINE double us_get_now_monotonic(void) {
	time_t sec;
	long msec;
	us_get_now(CLOCK_MONOTONIC, &sec, &msec);
	return (double)sec + ((double)msec) / 1000;
}

INLINE int us_flock_timedwait_monotonic(int fd, double timeout) {
	const double deadline_ts = us_get_now_monotonic() + timeout;
	int retval = -1;
	while (true) {
		retval = flock(fd, LOCK_EX | LOCK_NB);
		if (retval == 0 || errno != EWOULDBLOCK || us_get_now_monotonic() > deadline_ts) {
			break;
		}
		if (usleep(1000) < 0) {
			break;
		}
	}
	return retval;
}

// Wait for a new frame
int us_memsink_fd_wait_frame(MemSink *self) {
	const double deadline_ts = us_get_now_monotonic() + 1; // wait_timeout
	double now_ts;
	int locked = -1;
	do {
		locked = us_flock_timedwait_monotonic(self->fd, 1); // lock_timeout
		now_ts = us_get_now_monotonic();
		if (locked < 0) {
			if (errno == EWOULDBLOCK) {
				goto retry;
			}
			goto os_error;
		}

		us_memsink_shared_s *mem = self->mem;
		if (mem->magic != US_MEMSINK_MAGIC || mem->version != US_MEMSINK_VERSION) {
			goto retry;
		}

		mem->last_client_ts = now_ts;
		if (mem->id == self->frame_id) {
			goto retry;
		}

		return 0;

	os_error:
		return -1;
	retry:
		if (locked >= 0 && flock(self->fd, LOCK_UN) < 0) {
			goto os_error;
		}
		if (usleep(1000) < 0) {
			goto os_error;
		}
	} while (now_ts < deadline_ts);
	return US_ERROR_NO_DATA;
}

void us_frame_realloc_data(us_frame_s *frame, uz size) {
	if (frame->allocated < size) {
		US_REALLOC(frame->data, size);
		frame->allocated = size;
	}
}

void us_frame_set_data(us_frame_s *frame, const u8 *data, uz size) {
	us_frame_realloc_data(frame, size);
	memcpy(frame->data, data, size);
	frame->used = size;
}

u8 *us_memsink_get_data(us_memsink_shared_s *mem) {
	return (u8*)(mem) + sizeof(us_memsink_shared_s);
}

// Get a frame from the shared memory
int us_memsink_fd_get_frame(int fd, us_memsink_shared_s *mem, us_frame_s *frame, u64 *frame_id, bool key_required) {
    us_frame_set_data(frame, us_memsink_get_data(mem), mem->used);
    US_FRAME_COPY_META(mem, frame);
    *frame_id = mem->id;
    mem->last_client_ts = us_get_now_monotonic();
	if (key_required) {
		mem->key_requested = true;
	}
	int retval = 0; // int, not bool: a bool would turn -1 into 1
	if (frame->format != V4L2_PIX_FMT_H264) {
		printf("Got non-H264 frame from memsink\n");
		retval = -1;
	}
	if (flock(fd, LOCK_UN) < 0) {
		printf("Can't unlock memsink\n");
		retval = -1;
	}
	return retval;
}

// Retrieve the next frame from shared memory
us_frame_s* next_get_frame(MemSink *self) {
	if (self->mem == NULL || self->fd < 0) { // fd 0 is a valid descriptor
		return NULL;
	}
	if (us_memsink_fd_wait_frame(self) != 0) {
		return NULL; // No frame available
	}
	// us_memsink_fd_wait_frame() returned 0, so the flock is still held here
	us_memsink_shared_s *mem = self->mem;
	us_frame_set_data(self->frame, us_memsink_get_data(mem), mem->used);
	US_FRAME_COPY_META(mem, self->frame);
	self->frame_id = mem->id;
	self->frame_ts = us_get_now_monotonic();
	printf("Frame Format %u\n", self->frame->format);
	// Release the lock; a writer that takes the same lock cannot publish while it is held
	if (flock(self->fd, LOCK_UN) < 0) {
		return NULL;
	}
	return self->frame;
}
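
On the synchronization and complete-frame concerns, the pattern in the C code above boils down to: take the lock, compare the frame ID, copy the payload out, release the lock. A hedged Go sketch of just that pattern follows. It assumes the writer serializes access with flock on the same descriptor, as the C code implies; withLock, copyIfNew and errNoNewFrame are illustrative names, not part of any library.

```go
package main

import (
	"errors"
	"syscall"
)

var errNoNewFrame = errors.New("no new frame")

// withLock runs fn while holding a non-blocking exclusive flock on fd.
// An EWOULDBLOCK error means another process currently holds the lock.
func withLock(fd int, fn func() error) error {
	if err := syscall.Flock(fd, syscall.LOCK_EX|syscall.LOCK_NB); err != nil {
		return err
	}
	// Always release the lock, even when fn fails.
	defer syscall.Flock(fd, syscall.LOCK_UN)
	return fn()
}

// copyIfNew returns a private copy of payload[:used] when id differs
// from lastID, so the caller never keeps a slice into shared memory.
func copyIfNew(id, lastID uint64, payload []byte, used int) ([]byte, error) {
	if id == lastID {
		return nil, errNoNewFrame
	}
	if used < 0 || used > len(payload) {
		return nil, errors.New("used size exceeds mapped payload")
	}
	out := make([]byte, used)
	copy(out, payload[:used])
	return out, nil
}
```

Copying inside the locked section is what makes the frame complete: the writer cannot change the payload halfway through the copy, and the lock is released as soon as the copy is done so the next frame can be published.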

Any guidance or suggestions would be much appreciated!
