Deconstructing Send, Arc, and Mutex
The thread::spawn Function
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
    F: FnOnce() -> T,
    F: Send + 'static,
    T: Send + 'static,
{
    // ...
}
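A minimal usage sketch for reference: the closure is F, the value it returns is T.

use std::thread;

fn main() {
    // F is the closure, T is the i32 it returns
    let handle = thread::spawn(|| 2 + 2);
    // join() hands back the T produced by the child thread
    let result = handle.join().unwrap();
    assert_eq!(result, 4);
}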
Quick Primer on Rust Closures
- 3 categories of data:
  - data the closure closes over / captures: Upvars
    - convenient compiler terminology
    - not represented by the closure type signature
  - parameters
  - returned value
Data the closure closes over / captures: Upvars
let data = vec![3, 25, 10, 42];
let upper_threshold = 20;
let outliers: Vec<_> = data.iter().copied().filter(|&n| -> bool {
    // `n` is a parameter, `upper_threshold` is an *upvar*
    n >= upper_threshold
}).collect();
Spawn closure type
F: FnOnce() -> T
- closure doesn't accept any parameters
- closure can consume upvars ("FnOnce"); see the sketch after this list
F: Send + 'static
- applies to upvars
T: Send + 'static
- applies to returned value
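A minimal sketch of the FnOnce case mentioned above: a closure that consumes an upvar can be called only once.

fn main() {
    let name = String::from("worker");
    // calling the closure moves `name` (an upvar) out, so the closure is FnOnce
    let consume = move || name;
    let owned: String = consume();
    // consume(); // a second call would not compile: the upvar has already been moved out
    println!("{owned}");
}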
T: 'static
Two options allowed:
- the type doesn't have any references inside ("Owned data")
struct User { name: String }
- the references inside the type are 'static
struct Db { connection_string: &'static str }
Why F: 'static and T: 'static?
- applies to data passed from the parent thread to the child thread or vice-versa
- prevents passing references to local variables
  - one thread can finish before the other and such references may become invalid
- the + 'static bound avoids this by ensuring any references point to data with the static lifetime (i.e. data that lives forever)
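A minimal sketch of satisfying both bounds by moving owned data into the closure instead of borrowing a local:

use std::thread;

fn main() {
    // owned data: no references to locals are captured
    let name = String::from("worker");
    // `move` transfers ownership of `name` into the closure, so F: 'static is satisfied
    let handle = thread::spawn(move || format!("hello from {name}"));
    // the returned String is owned too, so T: 'static is satisfied
    let greeting = handle.join().unwrap();
    println!("{greeting}");
}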
T: Send
pub unsafe auto trait Send { }
- auto means all types get this trait automatically
  - opt-out instead of opt-in
  - various types in the standard library implement Send or !Send
- unsafe means you have to put the unsafe keyword in front of impl when implementing Send or !Send
  - precautionary measure
Why would one implement Send or !Send?
- Rust pointers (*const T, *mut T, NonNull<T>) are !Send
- use case: what if the pointer comes from an FFI library that assumes that all functions using this pointer are called from the same thread?
- Arc has a NonNull<..> inside and would become !Send automatically
  - to override this behavior, Arc explicitly implements Send
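A minimal sketch of such an opt-in (the Handle type and its thread-safety are hypothetical assumptions here):

use std::ptr::NonNull;

// hypothetical handle around a pointer obtained from an FFI library
struct Handle {
    raw: NonNull<u8>, // NonNull is !Send, so Handle is !Send automatically
}

// SAFETY: sound only if the library guarantees the handle can be used from any thread
unsafe impl Send for Handle {}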
Send in the thread::spawn Function
- F: Send and T: Send mean that all data traveling between the parent thread and the child thread has to be marked as Send
- the Rust compiler has no inherent knowledge of threads, but the use of marker traits and lifetime annotations lets the type / borrow checker prevent data race errors
Sharing data between threads
Example: Message Log for TCP Echo Server
use std::{
    io::{self, BufRead as _, Write as _},
    net, thread,
};

fn handle_client(stream: net::TcpStream) -> Result<(), io::Error> {
    let mut writer = io::BufWriter::new(&stream);
    let reader = io::BufReader::new(&stream);
    for line in reader.lines() {
        let line = line?;
        writeln!(writer, "{}", line)?;
        writer.flush()?;
    }
    Ok(())
}

fn main() -> Result<(), io::Error> {
    let listener = net::TcpListener::bind("0.0.0.0:7878")?;
    for stream in listener.incoming() {
        let stream = stream?;
        thread::spawn(|| {
            let _ = handle_client(stream);
        });
    }
    Ok(())
}
Task
- create a log of lengths of all lines coming from all streams
let mut log = Vec::<usize>::new();
log.push(line.len());
"Dream" API
fn handle_client(stream: net::TcpStream, log: &mut Vec<usize>) -> Result<(), io::Error> {
    // ...
    for line in ... {
        log.push(line.len());
        // ...
    }
    Ok(())
}

fn main() -> Result<(), io::Error> {
    let mut log = vec![];
    for stream in listener.incoming() {
        // ...
        thread::spawn(|| {
            let _ = handle_client(stream, &mut log);
        });
    }
    Ok(())
}
Errors
error[E0373]: closure may outlive the current function, but it borrows `log`, which is owned by the current function
  --> src/main.rs:26:23
   |
26 |         thread::spawn(|| {
   |                       ^^ may outlive borrowed value `log`
27 |             let _ = handle_client(stream.unwrap(), &mut log);
   |                                                          --- `log` is borrowed here
   |
note: function requires argument type to outlive `'static`
Lifetime problem
Problem:
- local data may be cleaned up prematurely
Solution:
- move the decision of when to clean up the data from compile time to run time
- use reference-counting
Attempt 1: Rc
let mut log = Rc::new(vec![]);
let thread_log = log.clone();

- log.clone() now doesn't clone the data, but simply increases the reference count
- both variables now have an owned type and satisfy the F: 'static requirement

error[E0277]: `Rc<Vec<usize>>` cannot be sent between threads safely
Rc in the Rust Standard Library
- uses usize for reference counting
- explicitly marked as !Send
pub struct Rc<T> {
    ptr: NonNull<RcBox<T>>,
}

impl<T> !Send for Rc<T> {}

struct RcBox<T> {
    strong: Cell<usize>,
    weak: Cell<usize>,
    value: T,
}
Arc in the Rust Standard Library
- uses AtomicUsize for reference counting
- explicitly marked as Send
pub struct Arc<T> {
    ptr: NonNull<ArcInner<T>>,
}

unsafe impl<T: ?Sized + Sync + Send> Send for Arc<T> {}

struct ArcInner<T: ?Sized> {
    strong: atomic::AtomicUsize,
    weak: atomic::AtomicUsize,
    data: T,
}
Rc vs Arc
- Arc uses AtomicUsize for reference counting
  - slower
  - safe to increment / decrement from multiple threads
- with the help of the marker trait Send and the trait bounds on thread::spawn, the compiler forces you to use the correct type
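For example, a minimal sketch (read-only access for now) of the clone-and-spawn pattern with Arc instead of Rc:

use std::{sync::Arc, thread};

fn main() {
    let log = Arc::new(vec![1_usize, 2, 3]);
    let thread_log = Arc::clone(&log); // only bumps the atomic reference count
    // Arc<Vec<usize>> is Send, so the closure can be moved to the child thread
    let handle = thread::spawn(move || thread_log.len());
    assert_eq!(handle.join().unwrap(), 3);
}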
Arc / Rc "transparency"
let mut log = Arc::new(Vec::new());
// how does this code work?
log.len();
// and why doesn't this work?
log.push(1);
Deref and DerefMut traits
pub trait Deref {
    type Target: ?Sized;
    fn deref(&self) -> &Self::Target;
}

pub trait DerefMut: Deref {
    fn deref_mut(&mut self) -> &mut Self::Target;
}
Deref coercions
- Deref can convert a &self reference to a reference of another type
  - the conversion function call can be inserted by the compiler for you automatically
  - in most cases the conversion is a no-op or a fixed pointer offset
  - deref functions can be inlined
- Target is an associated type
  - can't deref() into multiple different types
- DerefMut: Deref allows the DerefMut trait to reuse the same Target type
  - read-only and read-write references coerce to references of the same type
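A minimal sketch of the coercion at work with a hypothetical Wrapper type:

use std::ops::Deref;

struct Wrapper(String);

impl Deref for Wrapper {
    type Target = str;
    fn deref(&self) -> &str {
        &self.0
    }
}

fn main() {
    let w = Wrapper(String::from("hello"));
    // &Wrapper coerces to &str, so str methods can be called directly
    assert_eq!(w.len(), 5);
    assert!(w.starts_with("he"));
}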
Arc / Rc "transparency" with Deref
let mut log = Arc::new(Vec::new());
// Arc<T> implements `Deref` from `&Arc<T>` into `&T`
log.len();
// the same as
Vec::len(<Arc<_> as Deref>::deref(&log));
// Arc<T> DOES NOT implement `DerefMut`
// log.push(1);
// the line above would have expanded to:
// Vec::push(<Arc<_> as DerefMut>::deref_mut(&mut log), 1);
Arc and mutability
- the lack of impl DerefMut for Arc prevents accidental creation of multiple &mut to the underlying data
- the solution is to move the mutability decision to runtime

let log = Arc::new(Mutex::new(Vec::new()));

- Arc guarantees availability of the data in memory
  - prevents the memory from being cleaned up prematurely
- Mutex guarantees exclusivity of mutable access
  - provides only one &mut to the underlying data at a time
Mutex in Action
- log is passed as & and is deref-ed from Arc by the compiler
- mutability is localized to the local guard variable
- the Mutex::lock method takes &self
- MutexGuard implements Deref and DerefMut!
- the '_ lifetime annotation is needed only because the guard struct has a &Mutex inside
fn handle_client(..., log: &Mutex<Vec<usize>>) -> ... {
    for line in ... {
        let mut guard: MutexGuard<'_, Vec<usize>> = log.lock().unwrap();
        guard.push(line.len());
        // the line above expands to:
        // Vec::push(<MutexGuard<'_, _> as DerefMut>::deref_mut(&mut guard), line.len());
        writeln!(writer, "{}", line)?;
        writer.flush()?;
    }
}
Mutex locking and unlocking
- we lock the mutex for exclusive access to the underlying data at runtime
- old C APIs used a pair of functions to lock and unlock the mutex
- MutexGuard does the unlocking automatically when it is dropped
  - the time between guard creation and drop is called the critical section
Lock Poisoning
- MutexGuard, in its Drop implementation, checks whether it is being dropped normally or during a panic unwind
  - in the latter case it sets a poison flag on the mutex
- calling lock().unwrap() on a poisoned Mutex causes a panic
  - if the mutex is "popular", poisoning can cause many application threads to panic, too
- PoisonError doesn't provide information about the panic that caused the poisoning
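A minimal sketch of opting out of the panic by recovering the guard from the PoisonError (push_len is a hypothetical helper):

use std::sync::Mutex;

fn push_len(log: &Mutex<Vec<usize>>, len: usize) {
    // recover the guard even if a previous holder panicked while holding the lock
    let mut guard = match log.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };
    guard.push(len);
}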
Critical Section "Hygiene"
- keep it short to reduce the window during which the mutex is locked
- avoid calling functions that can panic while the guard is held
- using a named variable for the Mutex guard helps avoid unexpected temporary-lifetime behavior
Critical Section Example
fn handle_client(..., log: &Mutex<Vec<usize>>) -> ... {
    for line in ... {
        {
            let mut guard: MutexGuard<'_, Vec<usize>> = log.lock().unwrap();
            guard.push(line.len());
        } // critical section ends here, before all the IO
        writeln!(writer, "{}", line)?;
        writer.flush()?;
    }
}
 
- drop(guard) also works, but the extra block nicely highlights the critical section
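For comparison, a sketch of the drop(guard) variant of the same loop body:

let mut guard = log.lock().unwrap();
guard.push(line.len());
drop(guard); // explicitly ends the critical section before the IO below
writeln!(writer, "{}", line)?;
writer.flush()?;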
Lessons Learned
- careful use of traits and trait bounds lets the compiler detect problematic multi-threading code at compile time
- Arc and Mutex let the program ensure data availability and exclusive mutability at runtime, where the compiler can't predict the behavior of the program
- Deref coercions make concurrency primitives virtually invisible and transparent to use
- make invalid state unrepresentable
Full Example
use std::{
    io::{self, BufRead as _, Write as _},
    net,
    sync::{Arc, Mutex},
    thread,
};

fn handle_client(stream: net::TcpStream, log: &Mutex<Vec<usize>>) -> Result<(), io::Error> {
    let mut writer = io::BufWriter::new(&stream);
    let reader = io::BufReader::new(&stream);
    for line in reader.lines() {
        let line = line?;
        {
            let mut guard = log.lock().unwrap();
            guard.push(line.len());
        }
        writeln!(writer, "{}", line)?;
        writer.flush()?;
    }
    Ok(())
}

fn main() -> Result<(), io::Error> {
    let log = Arc::new(Mutex::new(vec![]));
    let listener = net::TcpListener::bind("0.0.0.0:7878")?;
    for stream in listener.incoming() {
        let stream = stream?;
        let thread_log = log.clone();
        thread::spawn(move || {
            let _ = handle_client(stream, &thread_log);
        });
    }
    Ok(())
}