Auto merge of #3804 - tiif:blockit, r=oli-obk

Support blocking for epoll

This PR adds support for blocking operation in epoll.

The changes introduced by this PR are:
- Refactored part of the logic in ``epoll_wait`` to ``blocking_epoll_callback``
- Added a new field ``thread_id`` in ``Epoll`` that holds the ids of the threads blocked on that epoll instance
- Added a new ``BlockReason::Epoll``
This commit is contained in:
bors 2024-08-27 18:40:47 +00:00
commit 2d69baa6e3
8 changed files with 315 additions and 63 deletions

View File

@ -172,6 +172,8 @@ pub enum BlockReason {
Futex { addr: u64 },
/// Blocked on an InitOnce.
InitOnce(InitOnceId),
/// Blocked on epoll.
Epoll,
}
/// The state of a thread.

View File

@ -278,6 +278,14 @@ impl WeakFileDescriptionRef {
}
}
/// Provenance visiting for weak file description references is deliberately empty.
impl VisitProvenance for WeakFileDescriptionRef {
/// Visits nothing; `_visit` is intentionally unused.
fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {
// A weak reference can never be the only reference to some pointer or place.
// The actual file description is tracked by a strong ref somewhere else, and
// that strong ref is what gets visited, so it is fine for this to be a no-op.
}
}
/// A unique id for file descriptions. While we could use the address, considering that
/// is definitely unique, the address would expose interpreter internal state when used
/// for sorting things. So instead we generate a unique id per file description that stays

View File

@ -2,8 +2,9 @@ use std::cell::RefCell;
use std::collections::BTreeMap;
use std::io;
use std::rc::{Rc, Weak};
use std::time::Duration;
use crate::shims::unix::fd::{FdId, FileDescriptionRef};
use crate::shims::unix::fd::{FdId, FileDescriptionRef, WeakFileDescriptionRef};
use crate::shims::unix::*;
use crate::*;
@ -19,6 +20,8 @@ struct Epoll {
// This is an Rc because EpollInterest need to hold a reference to update
// it.
ready_list: Rc<RefCell<BTreeMap<(FdId, i32), EpollEventInstance>>>,
/// A list of thread ids blocked on this epoll instance.
thread_id: RefCell<Vec<ThreadId>>,
}
/// EpollEventInstance contains information that will be returned by epoll_wait.
@ -58,6 +61,8 @@ pub struct EpollEventInterest {
data: u64,
/// Ready list of the epoll instance under which this EpollEventInterest is registered.
ready_list: Rc<RefCell<BTreeMap<(FdId, i32), EpollEventInstance>>>,
/// The file descriptor value that this EpollEventInterest is registered under.
epfd: i32,
}
/// EpollReadyEvents reflects the readiness of a file description.
@ -338,6 +343,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
events,
data,
ready_list: Rc::clone(ready_list),
epfd: epfd_value,
}));
if op == epoll_ctl_add {
@ -395,7 +401,10 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
/// The `timeout` argument specifies the number of milliseconds that
/// `epoll_wait()` will block. Time is measured against the
/// CLOCK_MONOTONIC clock.
/// CLOCK_MONOTONIC clock. If the timeout is zero, the function will not block,
/// while if the timeout is -1, the function will block
/// until at least one event has been retrieved (or an error
/// occurred).
/// A call to `epoll_wait()` will block until either:
/// • a file descriptor delivers an event;
@ -421,59 +430,100 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
events_op: &OpTy<'tcx>,
maxevents: &OpTy<'tcx>,
timeout: &OpTy<'tcx>,
) -> InterpResult<'tcx, Scalar> {
dest: &MPlaceTy<'tcx>,
) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
let epfd = this.read_scalar(epfd)?.to_i32()?;
let epfd_value = this.read_scalar(epfd)?.to_i32()?;
let events = this.read_immediate(events_op)?;
let maxevents = this.read_scalar(maxevents)?.to_i32()?;
let timeout = this.read_scalar(timeout)?.to_i32()?;
if epfd <= 0 || maxevents <= 0 {
if epfd_value <= 0 || maxevents <= 0 {
let einval = this.eval_libc("EINVAL");
this.set_last_error(einval)?;
return Ok(Scalar::from_i32(-1));
this.write_int(-1, dest)?;
return Ok(());
}
// This needs to come after the maxevents value check, or else maxevents.try_into().unwrap()
// will fail.
let events = this.deref_pointer_as(
let event = this.deref_pointer_as(
&events,
this.libc_array_ty_layout("epoll_event", maxevents.try_into().unwrap()),
)?;
// FIXME: Implement blocking support
if timeout != 0 {
throw_unsup_format!("epoll_wait: timeout value can only be 0");
}
let Some(epfd) = this.machine.fds.get(epfd) else {
return Ok(Scalar::from_i32(this.fd_not_found()?));
let Some(epfd) = this.machine.fds.get(epfd_value) else {
let result_value: i32 = this.fd_not_found()?;
this.write_int(result_value, dest)?;
return Ok(());
};
let epoll_file_description = epfd
.downcast::<Epoll>()
.ok_or_else(|| err_unsup_format!("non-epoll FD passed to `epoll_wait`"))?;
// Create a weak ref of epfd and pass it to the callback so that we can check that epfd
// has not been closed by the time the thread unblocks.
let weak_epfd = epfd.downgrade();
let ready_list = epoll_file_description.get_ready_list();
let mut ready_list = ready_list.borrow_mut();
let mut num_of_events: i32 = 0;
let mut array_iter = this.project_array_fields(&events)?;
while let Some(des) = array_iter.next(this)? {
if let Some(epoll_event_instance) = ready_list_next(this, &mut ready_list) {
this.write_int_fields_named(
&[
("events", epoll_event_instance.events.into()),
("u64", epoll_event_instance.data.into()),
],
&des.1,
)?;
num_of_events = num_of_events.strict_add(1);
} else {
break;
}
// We just need to know if the ready list is empty and borrow the thread_ids out.
// The whole logic is wrapped inside a block so we don't need to manually drop epfd later.
let ready_list_empty;
let mut thread_ids;
{
let epoll_file_description = epfd
.downcast::<Epoll>()
.ok_or_else(|| err_unsup_format!("non-epoll FD passed to `epoll_wait`"))?;
let binding = epoll_file_description.get_ready_list();
ready_list_empty = binding.borrow_mut().is_empty();
thread_ids = epoll_file_description.thread_id.borrow_mut();
}
Ok(Scalar::from_i32(num_of_events))
if timeout == 0 || !ready_list_empty {
// If the ready list is not empty, or the timeout is 0, we can return immediately.
blocking_epoll_callback(epfd_value, weak_epfd, dest, &event, this)?;
} else {
// Blocking
let timeout = match timeout {
0.. => {
let duration = Duration::from_millis(timeout.try_into().unwrap());
Some((TimeoutClock::Monotonic, TimeoutAnchor::Relative, duration))
}
-1 => None,
..-1 => {
throw_unsup_format!(
"epoll_wait: Only timeout values greater than or equal to -1 are supported."
);
}
};
thread_ids.push(this.active_thread());
let dest = dest.clone();
this.block_thread(
BlockReason::Epoll,
timeout,
callback!(
@capture<'tcx> {
epfd_value: i32,
weak_epfd: WeakFileDescriptionRef,
dest: MPlaceTy<'tcx>,
event: MPlaceTy<'tcx>,
}
@unblock = |this| {
blocking_epoll_callback(epfd_value, weak_epfd, &dest, &event, this)?;
Ok(())
}
@timeout = |this| {
// No notification after blocking timeout.
let Some(epfd) = weak_epfd.upgrade() else {
throw_unsup_format!("epoll FD {epfd_value} got closed while blocking.")
};
// Remove the current active thread_id from the blocked thread_id list.
epfd.downcast::<Epoll>()
.ok_or_else(|| err_unsup_format!("non-epoll FD passed to `epoll_wait`"))?
.thread_id.borrow_mut()
.retain(|&id| id != this.active_thread());
this.write_int(0, &dest)?;
Ok(())
}
),
);
}
Ok(())
}
/// For a specific file description, get its ready events and update the corresponding ready
@ -483,17 +533,47 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
///
/// This *will* report an event if anyone is subscribed to it, without any further filtering, so
/// do not call this function when an FD didn't have anything happen to it!
fn check_and_update_readiness(&self, fd_ref: &FileDescriptionRef) -> InterpResult<'tcx, ()> {
let this = self.eval_context_ref();
fn check_and_update_readiness(
&mut self,
fd_ref: &FileDescriptionRef,
) -> InterpResult<'tcx, ()> {
let this = self.eval_context_mut();
let id = fd_ref.get_id();
let mut waiter = Vec::new();
// Get a list of EpollEventInterest that is associated to a specific file description.
if let Some(epoll_interests) = this.machine.epoll_interests.get_epoll_interest(id) {
for weak_epoll_interest in epoll_interests {
if let Some(epoll_interest) = weak_epoll_interest.upgrade() {
check_and_update_one_event_interest(fd_ref, epoll_interest, id, this)?;
let is_updated = check_and_update_one_event_interest(
fd_ref,
epoll_interest.clone(),
id,
this,
)?;
if is_updated {
// An edge-triggered notification only notifies one thread, even if there are
// multiple threads blocked on the same epfd.
let epfd = this.machine.fds.get(epoll_interest.borrow().epfd).unwrap();
// This unwrap can never fail because if the current epoll instance were
// closed and its epfd value reused, the upgrade of weak_epoll_interest
// above would fail. This guarantee holds because only the epoll instance
// holds a strong ref to epoll_interest.
// FIXME: We can randomly pick a thread to unblock.
if let Some(thread_id) =
epfd.downcast::<Epoll>().unwrap().thread_id.borrow_mut().pop()
{
waiter.push(thread_id);
};
}
}
}
}
waiter.sort();
waiter.dedup();
for thread_id in waiter {
this.unblock_thread(thread_id, BlockReason::Epoll)?;
}
Ok(())
}
}
@ -517,14 +597,15 @@ fn ready_list_next(
}
/// This helper function checks whether an epoll notification should be triggered for a specific
/// epoll_interest and, if necessary, triggers the notification. Unlike check_and_update_readiness,
/// this function sends a notification to only one epoll instance.
/// epoll_interest and, if necessary, triggers the notification, and returns whether the
/// notification was added/updated. Unlike check_and_update_readiness, this function sends a
/// notification to only one epoll instance.
fn check_and_update_one_event_interest<'tcx>(
fd_ref: &FileDescriptionRef,
interest: Rc<RefCell<EpollEventInterest>>,
id: FdId,
ecx: &MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
) -> InterpResult<'tcx, bool> {
// Get the bitmask of ready events for a file description.
let ready_events_bitmask = fd_ref.get_epoll_ready_events()?.get_event_bitmask(ecx);
let epoll_event_interest = interest.borrow();
@ -539,6 +620,46 @@ fn check_and_update_one_event_interest<'tcx>(
let event_instance = EpollEventInstance::new(flags, epoll_event_interest.data);
// Triggers the notification by inserting it to the ready list.
ready_list.insert(epoll_key, event_instance);
return Ok(true);
}
return Ok(false);
}
/// Copies ready events into the caller's `events` array and writes the number of
/// copied events to `dest`.
///
/// This is used both when `epoll_wait` can return immediately and as the unblock
/// callback of a blocked `epoll_wait`.
///
/// * `epfd_value` - the epoll file descriptor number, only used in the error message.
/// * `weak_epfd` - weak reference to the epoll file description; if it can no longer
///   be upgraded, the FD was closed in the meantime and an unsupported-operation
///   error is raised.
/// * `dest` - place that receives the `epoll_wait` return value.
/// * `events` - the `epoll_event` array to fill.
fn blocking_epoll_callback<'tcx>(
    epfd_value: i32,
    weak_epfd: WeakFileDescriptionRef,
    dest: &MPlaceTy<'tcx>,
    events: &MPlaceTy<'tcx>,
    ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
    // The epoll instance must still be alive at this point.
    let epfd = match weak_epfd.upgrade() {
        Some(strong) => strong,
        None => throw_unsup_format!("epoll FD {epfd_value} got closed while blocking."),
    };
    let epoll = epfd
        .downcast::<Epoll>()
        .ok_or_else(|| err_unsup_format!("non-epoll FD passed to `epoll_wait`"))?;

    let shared_ready_list = epoll.get_ready_list();
    let mut pending = shared_ready_list.borrow_mut();

    // Fill the output array slot by slot until either the array or the ready list
    // runs out, whichever happens first.
    let mut delivered: i32 = 0;
    let mut slots = ecx.project_array_fields(events)?;
    while let Some((_index, slot)) = slots.next(ecx)? {
        let Some(instance) = ready_list_next(ecx, &mut pending) else {
            break;
        };
        ecx.write_int_fields_named(
            &[("events", instance.events.into()), ("u64", instance.data.into())],
            &slot,
        )?;
        delivered = delivered.strict_add(1);
    }

    // The return value of `epoll_wait` is the number of events written.
    ecx.write_int(delivered, dest)?;
    Ok(())
}

View File

@ -62,8 +62,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
"epoll_wait" => {
let [epfd, events, maxevents, timeout] =
this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?;
let result = this.epoll_wait(epfd, events, maxevents, timeout)?;
this.write_scalar(result, dest)?;
this.epoll_wait(epfd, events, maxevents, timeout, dest)?;
}
"eventfd" => {
let [val, flag] =

View File

@ -1,15 +0,0 @@
error: unsupported operation: epoll_wait: timeout value can only be 0
--> CARGO_REGISTRY/.../epoll.rs:LL:CC
|
LL | / syscall!(epoll_wait(
LL | | self.ep.as_raw_fd(),
LL | | events.as_mut_ptr(),
LL | | events.capacity() as i32,
LL | | timeout,
LL | | ))
| |__________^ epoll_wait: timeout value can only be 0
|
= help: this is likely not a bug in the program; it indicates that the program performed an operation that Miri does not support
error: aborting due to 1 previous error

View File

@ -0,0 +1,139 @@
//@only-target-linux
// test_epoll_block_then_unblock depends on a deterministic schedule.
//@compile-flags: -Zmiri-preemption-rate=0
use std::convert::TryInto;
use std::thread;
use std::thread::spawn;
// This is a set of testcases for blocking epoll.
fn main() {
test_epoll_block_without_notification();
test_epoll_block_then_unblock();
test_notification_after_timeout();
}
// Using `as` cast since `EPOLLET` wraps around
const EPOLL_IN_OUT_ET: u32 = (libc::EPOLLIN | libc::EPOLLOUT | libc::EPOLLET) as _;
// Calls `epoll_wait` on `epfd` with room for `N` events and the given `timeout`,
// then checks that the returned notifications match `expected_notifications`
// (pairs of `(events, data)`), both in number and in order.
// Panics if `epoll_wait` reports an error.
#[track_caller]
fn check_epoll_wait<const N: usize>(
    epfd: i32,
    expected_notifications: &[(u32, u64)],
    timeout: i32,
) {
    // Zero-initialised output buffer that `epoll_wait` fills in.
    let mut buffer = [libc::epoll_event { events: 0, u64: 0 }; N];
    let buffer_ptr = buffer.as_mut_ptr();

    let returned =
        unsafe { libc::epoll_wait(epfd, buffer_ptr, N.try_into().unwrap(), timeout) };
    if returned < 0 {
        panic!("epoll_wait failed: {}", std::io::Error::last_os_error());
    }
    assert_eq!(
        returned,
        expected_notifications.len().try_into().unwrap(),
        "got wrong number of notifications"
    );

    // Only the first `returned` entries of the buffer were written.
    let received = unsafe { std::slice::from_raw_parts(buffer_ptr, returned.try_into().unwrap()) };
    for (got, want) in received.iter().zip(expected_notifications) {
        // Copy the fields into locals before comparing, so no reference to a field
        // of `epoll_event` is taken (presumably because the struct is packed on
        // some targets).
        let (events, data) = (got.events, got.u64);
        assert_eq!(events, want.0, "got wrong events");
        assert_eq!(data, want.1, "got wrong data");
    }
}
// Lets epoll_wait block with nothing pending and expects it to return with zero
// notifications once the timeout expires.
fn test_epoll_block_without_notification() {
    // Set up the epoll instance.
    let epfd = unsafe { libc::epoll_create1(0) };
    assert_ne!(epfd, -1);

    // Set up an eventfd instance.
    let fd = unsafe { libc::eventfd(0, libc::EFD_NONBLOCK | libc::EFD_CLOEXEC) };

    // Subscribe the eventfd to the epoll instance, using the fd number as user data.
    let token = fd as u64;
    let mut interest = libc::epoll_event { events: EPOLL_IN_OUT_ET, u64: token };
    let ctl_res = unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, fd, &mut interest) };
    assert_eq!(ctl_res, 0);

    // A non-blocking epoll_wait consumes the pending EPOLLOUT notification.
    let writable = u32::try_from(libc::EPOLLOUT).unwrap();
    check_epoll_wait::<1>(epfd, &[(writable, token)], 0);

    // Nothing is pending now, so this call blocks and then times out empty-handed.
    check_epoll_wait::<1>(epfd, &[], 5);
}
// Blocks in epoll_wait and has a second thread trigger a notification, so the
// wait is unblocked before its timeout expires.
fn test_epoll_block_then_unblock() {
    // Set up the epoll instance.
    let epfd = unsafe { libc::epoll_create1(0) };
    assert_ne!(epfd, -1);

    // Set up a socketpair.
    let mut fds = [-1, -1];
    let pair_res =
        unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) };
    assert_eq!(pair_res, 0);
    let [watched, peer] = fds;
    let token = watched as u64;

    // Subscribe one end of the socketpair to the epoll instance.
    let mut interest = libc::epoll_event { events: EPOLL_IN_OUT_ET, u64: token };
    let ctl_res = unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, watched, &mut interest) };
    assert_eq!(ctl_res, 0);

    // A non-blocking epoll_wait consumes the pending EPOLLOUT notification.
    let writable = u32::try_from(libc::EPOLLOUT).unwrap();
    check_epoll_wait::<1>(epfd, &[(writable, token)], 0);

    // The writer thread yields first, so the main thread is already blocked in
    // epoll_wait when the write to the other end of the socketpair happens.
    let readable_and_writable = u32::try_from(libc::EPOLLIN | libc::EPOLLOUT).unwrap();
    let writer = spawn(move || {
        thread::yield_now();
        let payload = "abcde".as_bytes();
        let written = unsafe { libc::write(peer, payload.as_ptr() as *const libc::c_void, 5) };
        assert_eq!(written, 5);
    });
    check_epoll_wait::<1>(epfd, &[(readable_and_writable, token)], 10);
    writer.join().unwrap();
}
// Lets epoll_wait time out first, then triggers a notification and checks that a
// later epoll_wait still observes it.
fn test_notification_after_timeout() {
    // Set up the epoll instance.
    let epfd = unsafe { libc::epoll_create1(0) };
    assert_ne!(epfd, -1);

    // Set up a socketpair.
    let mut fds = [-1, -1];
    let pair_res =
        unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) };
    assert_eq!(pair_res, 0);
    let [watched, peer] = fds;
    let token = watched as u64;

    // Subscribe one end of the socketpair to the epoll instance.
    let mut interest = libc::epoll_event { events: EPOLL_IN_OUT_ET, u64: token };
    let ctl_res = unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, watched, &mut interest) };
    assert_eq!(ctl_res, 0);

    // A non-blocking epoll_wait consumes the pending EPOLLOUT notification.
    let writable = u32::try_from(libc::EPOLLOUT).unwrap();
    check_epoll_wait::<1>(epfd, &[(writable, token)], 0);

    // Nothing is pending, so this call blocks until the timeout and returns nothing.
    check_epoll_wait::<1>(epfd, &[], 10);

    // Only now write to the other end, which triggers a fresh notification.
    let payload = "abcde".as_bytes();
    let written = unsafe { libc::write(peer, payload.as_ptr() as *const libc::c_void, 5) };
    assert_eq!(written, 5);

    // The notification triggered after the timeout must be reported by the next wait.
    let readable_and_writable = u32::try_from(libc::EPOLLIN | libc::EPOLLOUT).unwrap();
    check_epoll_wait::<1>(epfd, &[(readable_and_writable, token)], 10);
}

View File

@ -1,14 +1,12 @@
//@compile-flags: -Zmiri-permissive-provenance -Zmiri-backtrace=full
//@only-target-x86_64-unknown-linux: support for tokio only on linux and x86
//@error-in-other-file: timeout value can only be 0
//@normalize-stderr-test: " += note:.*\n" -> ""
use tokio::time::{sleep, Duration, Instant};
#[tokio::main]
async fn main() {
let start = Instant::now();
sleep(Duration::from_secs(1)).await;
sleep(Duration::from_millis(100)).await;
let time_elapsed = &start.elapsed().as_millis();
assert!((1000..1100).contains(time_elapsed), "{}", time_elapsed);
assert!((100..1000).contains(time_elapsed), "{}", time_elapsed);
}