Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use io_uring for files on linux #175

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: start work on an io_uring driver
  • Loading branch information
Panke committed Apr 1, 2021
commit b2cec8d2f5cd7a4bc16151651ebaafdd87548dab
29 changes: 15 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@ EpollEventDriver | yes | — | — | — | — | —
WinAPIEventDriver | — | yes | — | — | — | —
KqueueEventDriver | — | — | yes | yes¹ | — | —
LibasyncEventDriver | —¹| —¹| —¹| —¹| — | —
UringEventDriver | —¹| no | no | no | unknown | no

¹ planned, but not currenly implemented

@@ -48,20 +49,20 @@ The following compilers are tested and supported:
Driver development status
-------------------------

Feature \ EventDriver | Select | Epoll | WinAPI | Kqueue | Libasync
----------------------|--------|-------|---------|---------|----------
TCP Sockets | yes | yes | yes | yes | —
UDP Sockets | yes | yes | yes | yes | —
USDS | yes | yes | — | yes | —
DNS | yes | yes | yes | yes | —
Timers | yes | yes | yes | yes | —
Events | yes | yes | yes | yes | —
Unix Signals | yes² | yes | — | — | —
Files | yes | yes | yes | yes | —
UI Integration | yes¹ | yes¹ | yes | yes¹ | —
File watcher | yes² | yes | yes | yes² | —
Pipes | yes | yes | — | yes | —
Processes | yes | yes | — | yes | —
Feature \ EventDriver | Select | Epoll | WinAPI | Kqueue | Libasync | Uring
----------------------|--------|-------|---------|---------|----------|-------
TCP Sockets | yes | yes | yes | yes | — | —
UDP Sockets | yes | yes | yes | yes | — | —
USDS | yes | yes | — | yes | — | —
DNS | yes | yes | yes | yes | — | —
Timers | yes | yes | yes | yes | — | —
Events | yes | yes | yes | yes | — | —
Unix Signals | yes² | yes | — | — | — | —
Files | yes | yes | yes | yes | — | yes
UI Integration | yes¹ | yes¹ | yes | yes¹ | — | yes?
File watcher | yes² | yes | yes | yes² | — | —
Pipes | yes | yes | — | yes | — | —
Processes | yes | yes | — | yes | — | —

¹ Manually, by adopting the X11 display connection socket

3 changes: 2 additions & 1 deletion dub.sdl
Original file line number Diff line number Diff line change
@@ -2,7 +2,8 @@ name "eventcore"
description "Pro-actor based abstraction layer over operating system asynchronous I/O facilities."
license "MIT"
copyright "Copyright © 2016-2018 Sönke Ludwig"

license "MIT"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicate license field

dependency "during" version="~>0.2.1"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be in a new "uring" configuration at the bottom of the file. Is "during" a local package of yours? I don't see it on code.dlang.org.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since I can't test this locally in the current state, my only comment right now is that the integration should be optional as a separate build configuration. Once that is done, everything builds, and the white space changes and Meson build file changes are in separate commits, we could also just document it as experimental and merge it to master, so that improvements can be made in smaller incremental steps.

Great! Than I will polish it up to be suitable for master.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be in a new "uring" configuration at the bottom of the file. Is "during" a local package of yours? I don't see it on code.dlang.org.

It's https://github.com/tchaloupka/during from code.dlang.org

Copy link
Contributor Author

@Panke Panke Mar 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

white space changes

which one?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm it's really just two lines in utils.d, nevermind then, I thought it was more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's https://github.com/tchaloupka/during from code.dlang.org

Interesting, the search is obviously broken w.r.t. package names. Accessing it manually works, but I get zero results for "during".

targetType "library"

libs "resolv" platform="linux"
5 changes: 4 additions & 1 deletion meson.build
Original file line number Diff line number Diff line change
@@ -8,8 +8,11 @@ project_version_suffix = ''
project_version = meson.project_version()
project_version_full = project_version + project_version_suffix

taggedalgebraic_dep = dependency('taggedalgebraic', version: ['>=0.10.12', '<0.12'])
taggedalgebraic_dep = dependency('taggedalgebraic',
version: ['>=0.10.12', '<0.12'],
fallback: ['taggedalgebraic', 'taggedalgebraic_source_dep'])

source_root = meson.source_root()
build_root = meson.build_root()
subdir('source/eventcore')
subdir('examples')
1 change: 1 addition & 0 deletions source/eventcore/driver.d
Original file line number Diff line number Diff line change
@@ -1019,6 +1019,7 @@ alias AcceptCallback = void delegate(StreamListenSocketFD, StreamSocketFD, scope
alias IOCallback = void delegate(StreamSocketFD, IOStatus, size_t);
alias DatagramIOCallback = void delegate(DatagramSocketFD, IOStatus, size_t, scope RefAddress);
alias DNSLookupCallback = void delegate(DNSLookupID, DNSStatus, scope RefAddress[]);
alias FileOpenCallback = void delegate(FileFD);
alias FileIOCallback = void delegate(FileFD, IOStatus, size_t);
alias FileCloseCallback = void delegate(FileFD, CloseStatus);
alias PipeIOCallback = void delegate(PipeFD, IOStatus, size_t);
32 changes: 25 additions & 7 deletions source/eventcore/drivers/posix/driver.d
Original file line number Diff line number Diff line change
@@ -14,6 +14,8 @@ import eventcore.drivers.posix.sockets;
import eventcore.drivers.posix.watchers;
import eventcore.drivers.posix.processes;
import eventcore.drivers.posix.pipes;
import eventcore.drivers.posix.io_uring.io_uring : UringEventLoop;
import eventcore.drivers.posix.io_uring.files : UringDriverFiles;
import eventcore.drivers.timer;
import eventcore.drivers.threadedfile;
import eventcore.internal.consumablequeue : ConsumableQueue;
@@ -51,7 +53,8 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
version (Windows) alias DNSDriver = EventDriverDNS_GHBN!(EventsDriver, SignalsDriver);
else version (EventcoreUseGAIA) alias DNSDriver = EventDriverDNS_GAIA!(EventsDriver, SignalsDriver);
else alias DNSDriver = EventDriverDNS_GAI!(EventsDriver, SignalsDriver);
alias FileDriver = ThreadedFileEventDriver!EventsDriver;
version (linux) alias FileDriver = UringDriverFiles;
else alias FileDriver = ThreadedFileEventDriver!EventsDriver;
version (Posix) alias PipeDriver = PosixEventDriverPipes!Loop;
else alias PipeDriver = DummyEventDriverPipes!Loop;
version (linux) alias WatcherDriver = InotifyEventDriverWatchers!EventsDriver;
@@ -61,6 +64,7 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
else alias ProcessDriver = DummyEventDriverProcesses!Loop;

Loop m_loop;
version (linux) UringEventLoop m_uring;
CoreDriver m_core;
EventsDriver m_events;
SignalsDriver m_signals;
@@ -76,15 +80,17 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
this()
@nogc @trusted {
m_loop = mallocT!Loop;
version (linux) m_uring = mallocT!UringEventLoop;
m_sockets = mallocT!SocketsDriver(m_loop);
m_events = mallocT!EventsDriver(m_loop, m_sockets);
m_signals = mallocT!SignalsDriver(m_loop);
m_timers = mallocT!TimerDriver;
m_pipes = mallocT!PipeDriver(m_loop);
m_processes = mallocT!ProcessDriver(m_loop, this);
m_core = mallocT!CoreDriver(m_loop, m_timers, m_events, m_processes);
m_core = mallocT!CoreDriver(m_loop, m_timers, m_events, m_processes, m_uring);
m_dns = mallocT!DNSDriver(m_events, m_signals);
m_files = mallocT!FileDriver(m_events);
version (linux) m_files = mallocT!FileDriver(m_uring);
else m_files = mallocT!FileDriver(m_events);
m_watchers = mallocT!WatcherDriver(m_events);
}

@@ -177,14 +183,16 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
Timers m_timers;
Events m_events;
Processes m_processes;
UringEventLoop m_uring;
bool m_exit = false;
EventID m_wakeupEvent;

shared Mutex m_threadCallbackMutex;
ConsumableQueue!ThreadCallbackEntry m_threadCallbacks;
}

this(Loop loop, Timers timers, Events events, Processes processes)
this(Loop loop, Timers timers, Events events, Processes processes,
UringEventLoop uring)
@nogc {
m_loop = loop;
m_timers = timers;
@@ -194,6 +202,8 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
m_threadCallbackMutex = mallocT!(shared(Mutex));
m_threadCallbacks = mallocT!(ConsumableQueue!ThreadCallbackEntry);
m_threadCallbacks.reserve(1000);
m_uring = uring;
m_uring.registerEventID(m_wakeupEvent);
}

final void dispose()
@@ -207,7 +217,9 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
} catch (Exception e) assert(false, e.msg);
}

@property size_t waiterCount() const { return m_loop.m_waiterCount + m_timers.pendingCount + m_processes.pendingCount; }
@property size_t waiterCount() const {
return m_loop.m_waiterCount + m_timers.pendingCount + m_processes.pendingCount + m_uring.waiterCount;
}

final override ExitReason processEvents(Duration timeout)
{
@@ -223,12 +235,17 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
if (!waiterCount) {
return ExitReason.outOfWaiters;
}

version (linux) {
// this is required to make the kernel aware of
// submitted SEQ, otherwise the first call to
// process events could stall
m_uring.submit;
}
bool got_events;

if (timeout <= 0.seconds) {
got_events = m_loop.doProcessEvents(0.seconds);
m_timers.process(MonoTime.currTime);
version (linux) got_events |= m_uring.doProcessEvents(0.seconds);
} else {
auto now = MonoTime.currTime;
do {
@@ -237,6 +254,7 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
auto prev_step = now;
now = MonoTime.currTime;
got_events |= m_timers.process(now);
version(linux) got_events |= m_uring.doProcessEvents(0.seconds);
if (timeout != Duration.max)
timeout -= now - prev_step;
} while (timeout > 0.seconds && !m_exit && !got_events);
5 changes: 3 additions & 2 deletions source/eventcore/drivers/posix/events.d
Original file line number Diff line number Diff line change
@@ -80,7 +80,6 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
if (!() @trusted {
fd[1] = socket(AF_INET, SOCK_DGRAM, 0);
int nl = addr.nameLen;
import eventcore.internal.utils : print;
if (bind(fd[1], addr.name, addr.nameLen) != 0)
return false;
assert(nl == addr.nameLen);
@@ -167,7 +166,9 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
@trusted {
EventID event = cast(EventID)fd;
ulong cnt;
() @trusted { .read(cast(int)event, &cnt, cnt.sizeof); } ();
() @trusted {
.read(cast(int)event, &cnt, cnt.sizeof);
} ();
trigger(event, cnt > 0);
}
} else {
259 changes: 259 additions & 0 deletions source/eventcore/drivers/posix/io_uring/files.d
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
module eventcore.drivers.posix.io_uring.files;

import eventcore.internal.utils;

import eventcore.driver;
import eventcore.drivers.posix.io_uring.io_uring;
import core.sys.posix.sys.types;
import core.sys.posix.sys.stat;
import core.sys.linux.fcntl;
import during;

import taggedalgebraic;

final class UringDriverFiles : EventDriverFiles
{
nothrow:
private {
UringEventLoop m_loop;
int m_idx;
}

this(UringEventLoop loop) @nogc nothrow
{
m_loop = loop;
}

void dispose() @safe {}

//
FileFD open(string path, FileOpenMode mode)
{
import std.string : toStringz;
import std.conv : octal;

int flags;
int amode;

final switch (mode) {
case FileOpenMode.read: flags = O_RDONLY; break;
case FileOpenMode.readWrite: flags = O_RDWR; break;
case FileOpenMode.createTrunc: flags = O_RDWR|O_CREAT|O_TRUNC; amode = octal!644; break;
case FileOpenMode.append: flags = O_WRONLY|O_CREAT|O_APPEND; amode = octal!644; break;
}
auto fd = () @trusted { return .open(path.toStringz(), flags, amode); } ();
if (fd < 0) return FileFD.init;
return adopt(fd);
}

void open(string path, FileOpenMode mode, FileOpenCallback)
{
//TODO how do we handle ops without a prior resource/fd?
}

FileFD adopt(int system_file_handle)
{
auto flags = () @trusted { return fcntl(system_file_handle, F_GETFD); } ();
if (flags == -1) return FileFD.invalid;
return () @trusted { return m_loop.initFD!FileFD(system_file_handle); }();
}

/** Disallows any reads/writes and removes any exclusive locks.
Note that the file handle may become invalid at any point after the
call to `close`, regardless of its current reference count. Any
operations on the handle will not have an effect.
*/
void close(FileFD file, FileCloseCallback onClosed) @trusted
{
if (!isValid(file)) {
onClosed(file, CloseStatus.invalidHandle);
return;
}
SubmissionEntry e;
e.prepClose(cast(int) file);
m_loop.put(cast(FD) file, EventType.status, e, &handleClose,
cast(UserCallback) onClosed);
}

private void onCompletionEvent(int fd, EventType type, ref const(CompletionEntry) e,
UserCallback cb)
{
}

private void handleClose(FD fd, ref const(CompletionEntry) e, UserCallback userCb)
nothrow
{
FileCloseCallback cb = cast(FileCloseCallback) userCb;
if (e.res < 0)
cb(cast(FileFD) fd, CloseStatus.ioError);
else
cb(cast(FileFD) fd, CloseStatus.ok);
}

private void handleOpen(FD fd, ref const(CompletionEntry) e, UserCallback cb)
{

}

ulong getSize(FileFD file)
{
import core.sys.linux.unistd : lseek64;
import core.sys.posix.stdio : SEEK_END;
if (!isValid(file)) return ulong.max;

// stat_t seems to be defined wrong on linux/64
return lseek64(cast(int)file, 0, SEEK_END);
}

/** Shrinks or extends a file to the specified size.
Params:
file = Handle of the file to resize
size = Desired file size in bytes
on_finish = Called when the operation finishes - the `size`
parameter is always set to zero
Note: this is a blocking call, since io_uring does not support
this yet.
*/
void truncate(FileFD file, ulong size, FileIOCallback on_finish)
{
import core.sys.linux.unistd : ftruncate;
// currently not supported by uring
if (ftruncate(cast(int)file, size) != 0) {
on_finish(file, IOStatus.error, 0);
return;
}
on_finish(file, IOStatus.ok, 0);
}

/** Writes data to a file
Note that only a single write operation is allowed at once. The caller
needs to make sure that either `on_write_finish` got called, or
`cancelWrite` was called before issuing the next call to `write`.
Note: IOMode is ignored
*/
void write(FileFD file, ulong offset, const(ubyte)[] buffer, IOMode mode, FileIOCallback on_write_finish)
@trusted
{
if (!isValid(file))
{
on_write_finish(file, IOStatus.invalidHandle, 0);
return;
}

SubmissionEntry e;
e.prepWrite(cast(int) file, buffer, offset);
m_loop.put(file, EventType.write, e, &handleIO,
cast(UserCallback) on_write_finish);
}

private void handleIO(FD file, ref const(CompletionEntry) e, UserCallback cb)
nothrow
{
import std.algorithm : max;
FileIOCallback ioCb = cast(FileIOCallback) cb;
IOStatus status = e.res < 0 ? IOStatus.error : IOStatus.ok;
size_t written = max(0, e.res);

ioCb(cast(FileFD) file, status, written);
}

/** Cancels an ongoing write operation.
After this function has been called, the `FileIOCallback` specified in
the call to `write` is guaranteed not to be called.
*/
void cancelWrite(FileFD file)
{
m_loop.cancelOp(file, EventType.write);
}

/** Reads data from a file.
Note that only a single read operation is allowed at once. The caller
needs to make sure that either `on_read_finish` got called, or
`cancelRead` was called before issuing the next call to `read`.
Note: `mode` is ignored for files
*/
void read(FileFD file, ulong offset, ubyte[] buffer, IOMode, FileIOCallback on_read_finish)
@trusted
{
if (!isValid(file))
{
on_read_finish(file, IOStatus.invalidHandle, 0);
return;
}
SubmissionEntry e;
e.prepRead(cast(int) file, buffer, offset);
m_loop.put(file, EventType.read, e, &handleIO,
cast(UserCallback) on_read_finish);
}

/** Cancels an ongoing read operation.
After this function has been called, the `FileIOCallback` specified in
the call to `read` is guaranteed not to be called.
*/
void cancelRead(FileFD file)
{
m_loop.cancelOp(file, EventType.read);
}

/** Determines whether the given file handle is valid.
A handle that is invalid will result in no operations being carried out
when used. In particular `addRef`/`releaseRef` will have no effect, but
can safely be called and I/O operations will result in
`IOStatus.invalidHandle`.
A valid handle gets invalid when either the reference count drops to
zero, or after the file was explicitly closed.
*/
bool isValid(FileFD handle) const @nogc
{
return m_loop.isValid(handle);
}

/** Increments the reference count of the given file.
*/
void addRef(FileFD descriptor)
{
m_loop.addRef(descriptor);
}

/** Decrements the reference count of the given file.
Once the reference count reaches zero, all associated resources will be
freed and the resource descriptor gets invalidated.
Returns:
Returns `false` $(I iff) the last reference was removed by this call.
Passing an invalid handle will result in a return value of `true`.
*/
bool releaseRef(FileFD descriptor)
{
return m_loop.releaseRef(descriptor);
}

/** Retrieves a reference to a user-defined value associated with a descriptor.
*/
@property final ref T userData(T)(FileFD descriptor)
@trusted {
import std.conv : emplace;
static void init(void* ptr) @nogc { emplace(cast(T*)ptr); }
static void destr(void* ptr) @nogc { destroy(*cast(T*)ptr); }
return *cast(T*)rawUserData(descriptor, T.sizeof, &init, &destr);
}

/// Low-level user data access. Use `userData` instead.
protected void* rawUserData(FileFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system nothrow
{
return m_loop.rawUserData(descriptor, size, initialize, destroy);
}

}
312 changes: 312 additions & 0 deletions source/eventcore/drivers/posix/io_uring/io_uring.d
Original file line number Diff line number Diff line change
@@ -0,0 +1,312 @@
/**
Linux io_uring based event driver implementation.
io_uring is an efficient API for asynchronous I/O on Linux, suitable for
large numbers of concurrently open sockets.
It beats epoll if you have more than 10'000 connections, has a smaller
systemcall overhead and supports sockets as well as timers and files.
io_uring works differently from epoll. Poll/epoll/kqueue based solutions
will tell you when it is safe to read or .write from a descriptor
without blocking. Upon such a notification one has to do it manually by
envoking the resp. system call.
On the other hand in io_uring one asynchronously issues a command to
read or write to a descriptor and gets a notification back once the
operation is complete. Issuing a command is done by placing a
Submission Queue Entry (SQE) in a ringbuffer shared by user space and
kernel. Upon completion a Completion Queue Entry (CQE) is placed
by the kernel into another shared ringbuffer.
This has implications on the layout of the internal data structures. While
the posix event loops center everything around the descriptors, the io_uring
loop tracks individual operations (that reference descriptors).
1. For some operations (e.g. opening a file) the descriptor is only
available after the operation completes.
2. After an operation is cancelled, it will result in an CQE nontheless,
and we cannot reliably handle this without tracking individual operations
(and multiple operations of the same kind per descriptor)
We still have to track information per descriptor, e.g. to ensure that
only one operation per kind is ongoing at the time.
This implementation tries to integrate with an sockets based event loop
(the base loop) for the primary reason that not all functionality
(e.g. signalfd) is currently available via io_uring.
We do this by registering an eventfd with the base loop that triggers
whenever new completions are available and thus wakes up the base loop.
*/
module eventcore.drivers.posix.io_uring.io_uring;

version (linux):

import eventcore.driver;
import eventcore.internal.utils;

import core.time : Duration;

import during;
import std.stdio;

/// eventcore allows exactly one simultaneous operation per kind per
/// descriptor.
enum EventType {
none,
read,
write,
status
}

private void assumeSafeNoGC(scope void delegate() nothrow doit) nothrow @nogc
@trusted {
(cast(void delegate() nothrow @nogc)doit)();
}

// this is the callback provided by the user to e.g. EventDriverFiles
alias UserCallback = void delegate() nothrow;

// this is provided by i.e. EventDriverFiles to UringEventLoop. The event loop
// will call OpCallback which should in turn call UserCallback. This is useful
// to decouble the uring stuff from the actual drivers.
alias OpCallback = void delegate(FD, ref const(CompletionEntry), UserCallback) nothrow;

// data stored per operation. Since after completion the `OpCallback`
// is called with the descriptor and CompletionEntry, all necessary information
// should be available without creating a real closure for OpCallback.
struct OpData
{
OpCallback opCb;
UserCallback userCb;
}

// information regarding a specific resource / descriptor
private struct ResourceData
{
// from the os, file descriptor, socket etc
int descriptor;
// ref count, we'll clean up internal data structures
// if this reaches zero
uint refCount;
// all data structures are reused and the validation
// counter makes sure an user cannot access a reused
// slot with an old handle
uint validationCounter;

// to track that at most one op per EventType is ongoing
OpData[EventType.max+1] runningOps;
}

// the user can store extra information per resource
// which is kept in a sep. array
private struct UserData
{
DataInitializer userDataDestructor;
ubyte[16*size_t.sizeof] userData;
}

///
final class UringEventLoop
{
import eventcore.internal.consumablequeue;
import std.typecons : Tuple, tuple;

private {
Uring m_io;
ChoppedVector!(ResourceData) m_fds;
int m_runningOps;
}

nothrow @nogc
this()
{
assumeSafeNoGC({
int res = m_io.setup();
debug(UringEventLoopDebug)
{
if (res < 0)
print("Setting up uring failed: %s", -res);
}
});
}

void registerEventID(EventID id) nothrow @trusted @nogc
{
m_io.registerEventFD(cast(int) id);
}

bool isValid(FD fd) const @nogc nothrow @safe
{
return fd.value < m_fds.length
&& m_fds[fd].validationCounter == fd.validationCounter;
}

void addRef(FD fd) @nogc nothrow @safe
{
if (!isValid(fd))
return;
m_fds[fd].refCount += 1;
}

bool releaseRef(FD fd) @nogc nothrow @trusted
{
if (!isValid(fd))
return true;

ResourceData* fds = &m_fds[fd];
fds.refCount -= 1;
if (fds.refCount == 0)
{
import std.traits : EnumMembers;

// cancel all pendings ops
foreach (type; EnumMembers!EventType)
{
if (fds.runningOps[type].opCb != null)
cancelOp(fd, type);
}
}
return fds.refCount == 0;
}

void submit() nothrow @trusted @nogc
{
m_io.submit(0);
}

bool doProcessEvents(Duration timeout, bool dontWait = true) nothrow @trusted
{
import std.algorithm : max;

// we add a timeout so that we do not wait indef.
if (!dontWait && timeout != Duration.max)
{
KernelTimespec timespec;
timeout.split!("seconds", "nsecs")(timespec.tv_sec, timespec.tv_nsec);
m_io.putWith!((ref SubmissionEntry e, KernelTimespec timespec)
{
// wait for timeout or first completion
e.prepTimeout(timespec, 1);
e.user_data = ulong.max;
})(timespec);
}
int res = m_io.submit(dontWait ? 0 : 1);
if (res < 0)
{
return false;
}
bool gotEvents = !m_io.empty;
foreach (ref CompletionEntry e; m_io)
{
import eventcore.internal.utils : print;
import std.algorithm : all;
// internally used timeouts
if (e.user_data == ulong.max)
continue;
int fd;
EventType type;
// let the specific driver handle the rest
splitKey(e.user_data, fd, type);
assert (fd < m_fds.length);
OpData* op = &m_fds[fd].runningOps[type];
// cb might be null, if the operation was cancelled
if (op.opCb)
{
m_runningOps -= 1;
op.opCb(FD(fd, m_fds[fd].validationCounter), e, op.userCb);
*op = OpData.init;
} else if (m_fds[fd].runningOps[].all!(x => x.opCb == null))
{
resetFD(m_fds[fd]);
}
}
return gotEvents;
}

void resetFD(ref ResourceData data) nothrow @nogc
{
data.descriptor = 0;
}

@property size_t waiterCount() const nothrow @safe { return m_runningOps; }

package FDType initFD(FDType)(size_t fd)
{
auto slot = &m_fds[fd];
assert (slot.refCount == 0, "Initializing referenced file descriptor slot.");
assert (slot.descriptor == 0, "Initializing referenced file descriptor slot.");
slot.refCount = 1;
return FDType(fd, slot.validationCounter);
}

package void put(in FD fd, in EventType type, SubmissionEntry e,
OpCallback cb, UserCallback userCb) nothrow
{
m_runningOps += 1;
ulong userData = combineKey(cast(int) fd, type);
e.user_data = userData;
m_io.put(e);
assert (m_fds[fd.value].runningOps[type].opCb == null);
m_fds[fd.value].runningOps[type] = OpData(cb, userCb);
}

void cancelOp(FD fd, EventType type) @trusted nothrow @nogc
{
if (!isValid(fd))
{
print("cancel for invalid fd");
return;
}
if (m_fds[fd].runningOps[type] == OpData.init)
{
print("cancelling op that's not running");
return;
}
ulong op = combineKey(cast(int) fd, type);
m_io.putWith!((ref SubmissionEntry e, ulong op)
{
// result is ignored (own userData is ulong.max)
prepCancel(e, op);
e.user_data = ulong.max;
})(op);
m_runningOps -= 1;
m_fds[fd].runningOps[type] = OpData.init;
}

package void* rawUserData(FD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
nothrow @system
{
return null;
}

package final void* rawUserDataImpl(size_t descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
@system @nogc nothrow
{
return null;
}
}

void splitKey(ulong key, out int fd, out EventType type) @nogc nothrow
{
fd = cast(int) (key >> 32);
type = cast(EventType) ((key << 32) >>> 32);
}

ulong combineKey(int fd, EventType type) @nogc nothrow
{
return cast(ulong)(fd) << 32 | cast(int) type;
}

@nogc nothrow
unittest
{
ulong orig = 0xDEAD0001;
int fd;
EventType type;
splitKey(orig, fd, type);
assert (type == cast(EventType)1);
assert (fd == 0xDEAD);
assert (orig == combineKey(driver, op));
}
2 changes: 2 additions & 0 deletions source/eventcore/meson.build
Original file line number Diff line number Diff line change
@@ -6,6 +6,8 @@ eventcore_src = [
'drivers/posix/driver.d',
'drivers/posix/epoll.d',
'drivers/posix/events.d',
'drivers/posix/io_uring/io_uring.d',
'drivers/posix/io_uring/files.d',
'drivers/posix/kqueue.d',
'drivers/posix/pipes.d',
'drivers/posix/processes.d',