Разлика между thread::spawn
и thread::Builder::new().spawn()
use std::thread;
let handle = thread::spawn(|| println!("изненада!")); // JoinHandle<()>
let probably_handle = thread::Builder::new()
.name(String::from("safe"))
.spawn(|| println!("сюрприз!")); // Result<JoinHandle<()>>
handle.join();
probably_handle.unwrap().join();
// match probably_handle { ... }
Какво може да правим с JoinHandle
?
let handle = thread::spawn(|| 2_i32 + 2); // JoinHandle<i32>
match handle.join() {
Ok(value) => println!("{}", value),
Err(e) => panic!("Изчислението беше ТВЪРДЕ СЛОЖНО! {:?}", e)
}
let counter = Arc::new(Mutex::new(0_u32)); // Arc<Mutex<u32>>
let mut handles = Vec::new();
for _ in 0..10 {
let counter = Arc::clone(&counter); // Arc<Mutex<u32>>
let handle = thread::spawn(move || {
let mut num = counter.
lock(). // LockResult<MutexGuard<T>>
unwrap(); // MutexGuard<T>
// *MutexGuard<T> -> *&T
*num += 1;
});
handles.push(handle);
}
Send
-типове могат да се move-ват между нишки. Arc
е Send
, Rc
не е Send
.
Sync
-типове могат да се reference-ват между нишки. Reference към споделен тип може да се вземе от няколко нишки (спазвайки обичайните правила). RefCell
не е Sync
. Mutex
е Sync
.
T
е Sync
, ако &T
е Send
.AtomicU32 ~~ Mutex<u32>
use std::sync::{Arc, Barrier};
use std::thread;
let mut handles = Vec::with_capacity(10);
// създаваме бариера, която ще блокира първите 9 нишки, които извикат .wait()
// и ще събуди всички когато 10-тата извика .wait()
let barrier = Arc::new(Barrier::new(10));
for _ in 0..10 {
let c = Arc::clone(&barrier);
// Еднаквите съобщения ще се принтират заедно
handles.push(thread::spawn(move|| {
println!("before wait");
c.wait();
println!("after wait");
}));
}
for handle in handles {
handle.join().unwrap();
}
fn main() {
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = Arc::clone(&pair);
thread::spawn(move || {
let &(ref mutex, ref cvar) = &*pair2;
let mut started = mutex.lock().unwrap();
*started = true;
// Казваме на `cvar` че сме променили стойността.
// Ако има чакаща нишка с `cvar.wait()` тя ще бъде събудена
// Ако няма нищо няма да се случи - извиквания на `notify_one` или `notify_all` не се бъферират
cvar.notify_one();
});
// ...
fn main() {
// ...
let &(ref mutex, ref cvar) = &*pair;
let mut started = mutex.lock().unwrap();
// Изчакваме новата нишка да започне
// Възможно е тази нишката да бъде събудена спонтанно, без да е извикано `cvar.notify_*()`,
// затова проверяваме дали е изпълнено условието в цикъл
while !*started {
// `cvar.wait()` прави следното
// - отключва мутекса `started`
// - слага нишката да спи
// - когато нишката се събуди, заключва мутекса и го връща
started = cvar.wait(started).unwrap();
}
}
let lock = RwLock::new(5);
// many reader locks can be held at once
{
let r1 = lock.read().unwrap();
let r2 = lock.read().unwrap();
assert_eq!(*r1, 5);
assert_eq!(*r2, 5);
} // read locks are dropped at this point
// only one write lock may be held, however
{
let mut w = lock.write().unwrap();
*w += 1;
assert_eq!(*w, 6);
} // write lock is dropped here
Mutex
, но различава между заключване за писане и за четене
RwLock
-а заключен за писане, RwLock
-а ще бъде отровен
Don't communicate by sharing memory,
share memory by communicating
-- go-lang
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
sender.send(10).unwrap();
});
println!("received {}", receiver.recv().unwrap()); // 10
}
std::sync::mpsc::channel()
(Sender, Receiver)
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
sender.send(1).unwrap();
sender.send(2).unwrap();
sender.send(3).unwrap();
});
assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
assert_eq!(receiver.recv().unwrap(), 3);
std::sync::mpsc::sync_channel(k)
(SyncSender, Receiver)
k
съобщения
k = 0
става "rendezvous" каналlet (sender, receiver) = mpsc::sync_channel(1);
thread::spawn(move || {
// записва съобщението и връща веднага
sender.send(1).unwrap();
// ще блокира докато главната нишка не извика `receiver.recv()`
sender.send(2).unwrap();
});
assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
let (sender, receiver) = mpsc::channel();
let shared_sender = Arc::new(sender);
thread::spawn(move || {
shared_sender.send(1).unwrap();
shared_sender.send(2).unwrap();
});
thread::spawn(move || {
shared_sender.send(3).unwrap();
shared_sender.send(4).unwrap();
});
println!("{} {} {} {}", receiver.recv().unwrap(), receiver.recv().unwrap(),
receiver.recv().unwrap(), receiver.recv().unwrap());
error[E0277]: the trait bound `std::sync::mpsc::Sender: std::marker::Sync` is not satisfied --> src/main.rs:9:5 | 9 | thread::spawn(move || { | ^^^^^^^^^^^^^ `std::sync::mpsc::Sender ` cannot be shared between threads safely |
Sender
е Send
, но не е Sync
clone()
let (sender, receiver) = mpsc::channel();
let sender2 = sender.clone();
thread::spawn(move || {
sender.send(1).unwrap();
sender.send(2).unwrap();
});
thread::spawn(move || {
sender2.send(3).unwrap();
sender2.send(4).unwrap();
});
println!("{} {} {} {}", receiver.recv().unwrap(), receiver.recv().unwrap(),
receiver.recv().unwrap(), receiver.recv().unwrap());
// примерен изход: 3 4 1 2
fn send(&self, t: T) -> Result<(), SendError<T>>
t
let (sender, receiver) = mpsc::channel();
assert_eq!(sender.send(12), Ok(()));
// унищожаваме получателя
// съобщението `12` никога няма да бъде получено
mem::drop(receiver);
// грешка - получателя е унищожен
// можем да си върнем съобщението `23` от грешката
assert_eq!(sender.send(23), Err(SendError(23)));
fn send(&self, t: T) -> Result<(), SendError<T>>
fn try_send(&self, t: T) -> Result<(), TrySendError<T>>
send
изпраща t
и блокира ако буфера е пълен
try_send
изпраща t
и не блокира
let (sender, receiver) = mpsc::sync_channel(1);
assert_eq!(sender.try_send(12), Ok(()));
assert_eq!(sender.try_send(23), Err(TrySendError::Full(23)));
mem::drop(receiver);
assert_eq!(sender.try_send(23), Err(TrySendError::Disconnected(23)));
Receiver
не може да се клонира
Receiver
e Send
, но не е Sync
Ако много искаме можем да си развием въображението
Arc<Mutex<Receiver>>
?
// блокира докато не получи съобщение
// връща грешка ако всички изпращачи са унищожени
fn recv(&self) -> Result<T, RecvError>
// не блокира
// връща грешка ако всички изпращачи са унищожени или няма съобщение в опашката
fn try_recv(&self) -> Result<T, TryRecvError>
// блокира за определено време
// връща грешка ако всички изпращачи са унищожени или е изтекло времето
fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError>
let (sender, receiver) = mpsc::channel();
let sender2 = sender.clone();
sender.send(12).unwrap();
sender2.send(23).unwrap();
mem::drop(sender);
assert_eq!(receiver.try_recv(), Ok(12));
assert_eq!(receiver.try_recv(), Ok(23));
assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));
mem::drop(sender2);
assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
for i in (0..50).rev() {
sender.send(i).unwrap();
}
});
while let Ok(msg) = receiver.recv() {
println!("received {}", msg);
}
for msg in receiver.iter() {
// ...
}
for msg in receiver.try_iter() {
// ...
}
while let Ok(msg) = receiver.recv() {
// ...
}
while let Ok(msg) = receiver.try_recv() {
// ...
}
Стандартната библиотека имплементира networking примитиви в модула std::net
use std::net::UdpSocket;
// сокета се затваря на края на scope-a
{
let mut socket = UdpSocket::bind("127.0.0.1:34254")?;
// Получава една дейтаграма от сокета. Ако буфера е прекалено малък за съобщението,
// то ще бъде орязано.
let mut buf = [0; 10];
let (amt, src) = socket.recv_from(&mut buf)?;
// Редекларира `buf` като слайс от получените данни и ги праща в обратен ред.
let buf = &mut buf[..amt];
buf.reverse();
socket.send_to(buf, &src)?;
}
use std::io::prelude::*;
use std::net::TcpStream;
// стриймът се затваря на края на scope-a
{
let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();
let _ = stream.write(&[1]);
let _ = stream.read(&mut [0; 128]);
}
use std::net::{TcpListener, TcpStream};
fn handle_client(stream: TcpStream) {
// ...
}
let listener = TcpListener::bind("127.0.0.1:80").unwrap();
// примера конекции и ги обработва
for stream in listener.incoming() {
handle_client(stream?);
}
Ще разгледаме проста чат система за демонстрация на нишки, канали и TCP
Пълния код може да се разгледа в Github
Какво няма да обхванем:
const LOCALHOST: &str = "127.0.0.1:1234";
const MESSAGE_SIZE: usize = 32;
fn main() {
let server = TcpListener::bind(LOCALHOST).expect("Listener failed to bind");
server.set_nonblocking(true).expect("Failed to initialize nonblocking");
// Stores client sockets
let mut clients = Vec::new();
let (sx, rx) = mpsc::channel::<String>();
loop {
/* accept */
/* broadcast */
thread::sleep(Duration::from_millis(100));
}
}
Използваме nonblocking sockets, за да може да не правим само accept в главната нишка
// Try to accept a client
if let Ok((mut socket, addr)) = server.accept() {
println!("Client {} connected", addr);
clients.push(socket.try_clone().expect("Failed to clone client"));
let sx = sx.clone();
thread::spawn(move || loop {
/* try recv */
thread::sleep(Duration::from_millis(100));
});
}
/* message broadcast */
if let Ok(msg) = rx.try_recv() {
// Try to send message from master channel
clients = clients.into_iter().filter_map(|mut client| {
let mut buf = msg.clone().into_bytes();
buf.resize(MESSAGE_SIZE, 0);
client.write_all(&buf).map(|_| client).ok()
}).collect::<Vec<_>>();
}
let mut buf = vec![0; MESSAGE_SIZE];
// Try to receive message from client
match socket.read_exact(&mut buf) {
Ok(_) => {
let msg = buf.into_iter().take_while(|&x| x != 0).collect::<Vec<_>>();
let msg = String::from_utf8(msg).expect("Invalid utf8 message");
println!("{}: {:?}", addr, msg);
sx.send(msg).expect("Send to master channel failed");
},
Err(ref err) if err.kind() == ErrorKind::WouldBlock => (),
Err(_) => {
println!("Closing connection with: {}", addr);
break;
}
}
const LOCALHOST: &str = "127.0.0.1:1234";
const MESSAGE_SIZE: usize = 32;
fn main() {
let mut client = TcpStream::connect(LOCALHOST).expect("Stream failed to connect");
client.set_nonblocking(true).expect("Failed to initialize nonblocking");
let (sx, rx) = mpsc::channel::<String>();
thread::spawn(move || loop {
/* try recv */
/* try send */
sleep();
});
/* repl */
}
println!("Welcome!");
loop {
let mut buf = String::new();
io::stdin().read_line(&mut buf).expect("Reading form stdin failed");
let msg = buf.trim().to_string();
if msg == ":q" || sx.send(msg).is_err() { break }
}
println!("Bye!");
Прилича много на кода в server-а
let mut buf = vec![0; MESSAGE_SIZE];
// Try to receive message from server
match client.read_exact(&mut buf) {
Ok(_) => {
let msg = buf.into_iter().take_while(|&x| x != 0).collect::<Vec<_>>();
let msg = String::from_utf8(msg).expect("Invalid utf8 message");
println!("message recv {:?}", msg);
},
Err(ref err) if err.kind() == ErrorKind::WouldBlock => (),
Err(_) => {
println!("Connection with the server closed");
break;
}
}
let mut buf = vec![0; MESSAGE_SIZE];
// Try to send message
match rx.try_recv() {
Ok(msg) => {
let mut buf = msg.clone().into_bytes();
buf.resize(MESSAGE_SIZE, 0);
client.write_all(&buf).expect("Writing to socket failed");
println!("message sent {:?}", msg);
},
Err(TryRecvError::Empty) => (),
Err(TryRecvError::Disconnected) => break
}